summary refs log tree commit diff
path: root/fs/nfsd
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@linux-foundation.org>2009-04-06 13:25:56 -0700
committerLinus Torvalds <torvalds@linux-foundation.org>2009-04-06 13:25:56 -0700
commita63856252d2112e7c452696037a86ceb12f47f80 (patch)
treeb1ad03fe441349069f80e58de425b3f72af9e5b7 /fs/nfsd
parentb24241a09208874d5d770bee30791daae41ad762 (diff)
parent04826f43d4f0a4d56423eb8abb9f2ec9987df5b5 (diff)
downloadlinux-a63856252d2112e7c452696037a86ceb12f47f80.tar.gz
Merge branch 'for-2.6.30' of git://linux-nfs.org/~bfields/linux
* 'for-2.6.30' of git://linux-nfs.org/~bfields/linux: (81 commits)
  nfsd41: define nfsd4_set_statp as noop for !CONFIG_NFSD_V4
  nfsd41: define NFSD_DRC_SIZE_SHIFT in set_max_drc
  nfsd41: Documentation/filesystems/nfs41-server.txt
  nfsd41: CREATE_EXCLUSIVE4_1
  nfsd41: SUPPATTR_EXCLCREAT attribute
  nfsd41: support for 3-word long attribute bitmask
  nfsd: dynamically skip encoded fattr bitmap in _nfsd4_verify
  nfsd41: pass writable attrs mask to nfsd4_decode_fattr
  nfsd41: provide support for minor version 1 at rpc level
  nfsd41: control nfsv4.1 svc via /proc/fs/nfsd/versions
  nfsd41: add OPEN4_SHARE_ACCESS_WANT nfs4_stateid bmap
  nfsd41: access_valid
  nfsd41: clientid handling
  nfsd41: check encode size for sessions maxresponse cached
  nfsd41: stateid handling
  nfsd: pass nfsd4_compound_state* to nfs4_preprocess_{state,seq}id_op
  nfsd41: destroy_session operation
  nfsd41: non-page DRC for solo sequence responses
  nfsd41: Add a create session replay cache
  nfsd41: create_session operation
  ...
Diffstat (limited to 'fs/nfsd')
-rw-r--r--fs/nfsd/Kconfig1
-rw-r--r--fs/nfsd/nfs3proc.c10
-rw-r--r--fs/nfsd/nfs4callback.c47
-rw-r--r--fs/nfsd/nfs4proc.c246
-rw-r--r--fs/nfsd/nfs4recover.c74
-rw-r--r--fs/nfsd/nfs4state.c1196
-rw-r--r--fs/nfsd/nfs4xdr.c633
-rw-r--r--fs/nfsd/nfsctl.c38
-rw-r--r--fs/nfsd/nfsproc.c3
-rw-r--r--fs/nfsd/nfssvc.c88
-rw-r--r--fs/nfsd/vfs.c37
11 files changed, 1999 insertions, 374 deletions
diff --git a/fs/nfsd/Kconfig b/fs/nfsd/Kconfig
index 44d7d04dab95..503b9da159a3 100644
--- a/fs/nfsd/Kconfig
+++ b/fs/nfsd/Kconfig
@@ -1,6 +1,7 @@
 config NFSD
 	tristate "NFS server support"
 	depends on INET
+	depends on FILE_LOCKING
 	select LOCKD
 	select SUNRPC
 	select EXPORTFS
diff --git a/fs/nfsd/nfs3proc.c b/fs/nfsd/nfs3proc.c
index 9dbd2eb91281..7c9fe838f038 100644
--- a/fs/nfsd/nfs3proc.c
+++ b/fs/nfsd/nfs3proc.c
@@ -18,6 +18,7 @@
 #include <linux/unistd.h>
 #include <linux/slab.h>
 #include <linux/major.h>
+#include <linux/magic.h>
 
 #include <linux/sunrpc/svc.h>
 #include <linux/nfsd/nfsd.h>
@@ -202,6 +203,7 @@ nfsd3_proc_write(struct svc_rqst *rqstp, struct nfsd3_writeargs *argp,
 					 struct nfsd3_writeres  *resp)
 {
 	__be32	nfserr;
+	unsigned long cnt = argp->len;
 
 	dprintk("nfsd: WRITE(3)    %s %d bytes at %ld%s\n",
 				SVCFH_fmt(&argp->fh),
@@ -214,9 +216,9 @@ nfsd3_proc_write(struct svc_rqst *rqstp, struct nfsd3_writeargs *argp,
 	nfserr = nfsd_write(rqstp, &resp->fh, NULL,
 				   argp->offset,
 				   rqstp->rq_vec, argp->vlen,
-				   argp->len,
+				   &cnt,
 				   &resp->committed);
-	resp->count = argp->count;
+	resp->count = cnt;
 	RETURN_STATUS(nfserr);
 }
 
@@ -569,7 +571,7 @@ nfsd3_proc_fsinfo(struct svc_rqst * rqstp, struct nfsd_fhandle    *argp,
 		struct super_block *sb = argp->fh.fh_dentry->d_inode->i_sb;
 
 		/* Note that we don't care for remote fs's here */
-		if (sb->s_magic == 0x4d44 /* MSDOS_SUPER_MAGIC */) {
+		if (sb->s_magic == MSDOS_SUPER_MAGIC) {
 			resp->f_properties = NFS3_FSF_BILLYBOY;
 		}
 		resp->f_maxfilesize = sb->s_maxbytes;
@@ -610,7 +612,7 @@ nfsd3_proc_pathconf(struct svc_rqst * rqstp, struct nfsd_fhandle      *argp,
 			resp->p_link_max = EXT2_LINK_MAX;
 			resp->p_name_max = EXT2_NAME_LEN;
 			break;
-		case 0x4d44:	/* MSDOS_SUPER_MAGIC */
+		case MSDOS_SUPER_MAGIC:
 			resp->p_case_insensitive = 1;
 			resp->p_case_preserving  = 0;
 			break;
diff --git a/fs/nfsd/nfs4callback.c b/fs/nfsd/nfs4callback.c
index c464181b5994..290289bd44f7 100644
--- a/fs/nfsd/nfs4callback.c
+++ b/fs/nfsd/nfs4callback.c
@@ -218,7 +218,7 @@ static int
 encode_cb_recall(struct xdr_stream *xdr, struct nfs4_cb_recall *cb_rec)
 {
 	__be32 *p;
-	int len = cb_rec->cbr_fhlen;
+	int len = cb_rec->cbr_fh.fh_size;
 
 	RESERVE_SPACE(12+sizeof(cb_rec->cbr_stateid) + len);
 	WRITE32(OP_CB_RECALL);
@@ -226,7 +226,7 @@ encode_cb_recall(struct xdr_stream *xdr, struct nfs4_cb_recall *cb_rec)
 	WRITEMEM(&cb_rec->cbr_stateid.si_opaque, sizeof(stateid_opaque_t));
 	WRITE32(cb_rec->cbr_trunc);
 	WRITE32(len);
-	WRITEMEM(cb_rec->cbr_fhval, len);
+	WRITEMEM(&cb_rec->cbr_fh.fh_base, len);
 	return 0;
 }
 
@@ -361,9 +361,8 @@ static struct rpc_program cb_program = {
 /* Reference counting, callback cleanup, etc., all look racy as heck.
  * And why is cb_set an atomic? */
 
-static int do_probe_callback(void *data)
+static struct rpc_clnt *setup_callback_client(struct nfs4_client *clp)
 {
-	struct nfs4_client *clp = data;
 	struct sockaddr_in	addr;
 	struct nfs4_callback    *cb = &clp->cl_callback;
 	struct rpc_timeout	timeparms = {
@@ -384,17 +383,10 @@ static int do_probe_callback(void *data)
 		.flags		= (RPC_CLNT_CREATE_NOPING | RPC_CLNT_CREATE_QUIET),
 		.client_name    = clp->cl_principal,
 	};
-	struct rpc_message msg = {
-		.rpc_proc       = &nfs4_cb_procedures[NFSPROC4_CLNT_CB_NULL],
-		.rpc_argp       = clp,
-	};
 	struct rpc_clnt *client;
-	int status;
 
-	if (!clp->cl_principal && (clp->cl_flavor >= RPC_AUTH_GSS_KRB5)) {
-		status = nfserr_cb_path_down;
-		goto out_err;
-	}
+	if (!clp->cl_principal && (clp->cl_flavor >= RPC_AUTH_GSS_KRB5))
+		return ERR_PTR(-EINVAL);
 
 	/* Initialize address */
 	memset(&addr, 0, sizeof(addr));
@@ -404,9 +396,29 @@ static int do_probe_callback(void *data)
 
 	/* Create RPC client */
 	client = rpc_create(&args);
+	if (IS_ERR(client))
+		dprintk("NFSD: couldn't create callback client: %ld\n",
+			PTR_ERR(client));
+	return client;
+
+}
+
+static int do_probe_callback(void *data)
+{
+	struct nfs4_client *clp = data;
+	struct nfs4_callback    *cb = &clp->cl_callback;
+	struct rpc_message msg = {
+		.rpc_proc       = &nfs4_cb_procedures[NFSPROC4_CLNT_CB_NULL],
+		.rpc_argp       = clp,
+	};
+	struct rpc_clnt *client;
+	int status;
+
+	client = setup_callback_client(clp);
 	if (IS_ERR(client)) {
-		dprintk("NFSD: couldn't create callback client\n");
 		status = PTR_ERR(client);
+		dprintk("NFSD: couldn't create callback client: %d\n",
+								status);
 		goto out_err;
 	}
 
@@ -422,10 +434,10 @@ static int do_probe_callback(void *data)
 out_release_client:
 	rpc_shutdown_client(client);
 out_err:
-	dprintk("NFSD: warning: no callback path to client %.*s\n",
-		(int)clp->cl_name.len, clp->cl_name.data);
+	dprintk("NFSD: warning: no callback path to client %.*s: error %d\n",
+		(int)clp->cl_name.len, clp->cl_name.data, status);
 	put_nfs4_client(clp);
-	return status;
+	return 0;
 }
 
 /*
@@ -451,7 +463,6 @@ nfsd4_probe_callback(struct nfs4_client *clp)
 
 /*
  * called with dp->dl_count inc'ed.
- * nfs4_lock_state() may or may not have been called.
  */
 void
 nfsd4_cb_recall(struct nfs4_delegation *dp)
diff --git a/fs/nfsd/nfs4proc.c b/fs/nfsd/nfs4proc.c
index 9fa60a3ad48c..b2883e9c6381 100644
--- a/fs/nfsd/nfs4proc.c
+++ b/fs/nfsd/nfs4proc.c
@@ -93,6 +93,21 @@ do_open_lookup(struct svc_rqst *rqstp, struct svc_fh *current_fh, struct nfsd4_o
 	open->op_truncate = 0;
 
 	if (open->op_create) {
+		/* FIXME: check session persistence and pnfs flags.
+		 * The nfsv4.1 spec requires the following semantics:
+		 *
+		 * Persistent   | pNFS   | Server REQUIRED | Client Allowed
+		 * Reply Cache  | server |                 |
+		 * -------------+--------+-----------------+--------------------
+		 * no           | no     | EXCLUSIVE4_1    | EXCLUSIVE4_1
+		 *              |        |                 | (SHOULD)
+		 *              |        | and EXCLUSIVE4  | or EXCLUSIVE4
+		 *              |        |                 | (SHOULD NOT)
+		 * no           | yes    | EXCLUSIVE4_1    | EXCLUSIVE4_1
+		 * yes          | no     | GUARDED4        | GUARDED4
+		 * yes          | yes    | GUARDED4        | GUARDED4
+		 */
+
 		/*
 		 * Note: create modes (UNCHECKED,GUARDED...) are the same
 		 * in NFSv4 as in v3.
@@ -103,11 +118,13 @@ do_open_lookup(struct svc_rqst *rqstp, struct svc_fh *current_fh, struct nfsd4_o
 					(u32 *)open->op_verf.data,
 					&open->op_truncate, &created);
 
-		/* If we ever decide to use different attrs to store the
-		 * verifier in nfsd_create_v3, then we'll need to change this
+		/*
+		 * Following rfc 3530 14.2.16, use the returned bitmask
+		 * to indicate which attributes we used to store the
+		 * verifier:
 		 */
 		if (open->op_createmode == NFS4_CREATE_EXCLUSIVE && status == 0)
-			open->op_bmval[1] |= (FATTR4_WORD1_TIME_ACCESS |
+			open->op_bmval[1] = (FATTR4_WORD1_TIME_ACCESS |
 						FATTR4_WORD1_TIME_MODIFY);
 	} else {
 		status = nfsd_lookup(rqstp, current_fh,
@@ -118,13 +135,11 @@ do_open_lookup(struct svc_rqst *rqstp, struct svc_fh *current_fh, struct nfsd4_o
 		goto out;
 
 	set_change_info(&open->op_cinfo, current_fh);
-
-	/* set reply cache */
 	fh_dup2(current_fh, &resfh);
-	open->op_stateowner->so_replay.rp_openfh_len = resfh.fh_handle.fh_size;
-	memcpy(open->op_stateowner->so_replay.rp_openfh,
-			&resfh.fh_handle.fh_base, resfh.fh_handle.fh_size);
 
+	/* set reply cache */
+	fh_copy_shallow(&open->op_stateowner->so_replay.rp_openfh,
+			&resfh.fh_handle);
 	if (!created)
 		status = do_open_permission(rqstp, current_fh, open,
 					    NFSD_MAY_NOP);
@@ -150,10 +165,8 @@ do_open_fhandle(struct svc_rqst *rqstp, struct svc_fh *current_fh, struct nfsd4_
 	memset(&open->op_cinfo, 0, sizeof(struct nfsd4_change_info));
 
 	/* set replay cache */
-	open->op_stateowner->so_replay.rp_openfh_len = current_fh->fh_handle.fh_size;
-	memcpy(open->op_stateowner->so_replay.rp_openfh,
-		&current_fh->fh_handle.fh_base,
-		current_fh->fh_handle.fh_size);
+	fh_copy_shallow(&open->op_stateowner->so_replay.rp_openfh,
+			&current_fh->fh_handle);
 
 	open->op_truncate = (open->op_iattr.ia_valid & ATTR_SIZE) &&
 		(open->op_iattr.ia_size == 0);
@@ -164,12 +177,23 @@ do_open_fhandle(struct svc_rqst *rqstp, struct svc_fh *current_fh, struct nfsd4_
 	return status;
 }
 
+static void
+copy_clientid(clientid_t *clid, struct nfsd4_session *session)
+{
+	struct nfsd4_sessionid *sid =
+			(struct nfsd4_sessionid *)session->se_sessionid.data;
+
+	clid->cl_boot = sid->clientid.cl_boot;
+	clid->cl_id = sid->clientid.cl_id;
+}
 
 static __be32
 nfsd4_open(struct svc_rqst *rqstp, struct nfsd4_compound_state *cstate,
 	   struct nfsd4_open *open)
 {
 	__be32 status;
+	struct nfsd4_compoundres *resp;
+
 	dprintk("NFSD: nfsd4_open filename %.*s op_stateowner %p\n",
 		(int)open->op_fname.len, open->op_fname.data,
 		open->op_stateowner);
@@ -178,16 +202,19 @@ nfsd4_open(struct svc_rqst *rqstp, struct nfsd4_compound_state *cstate,
 	if (open->op_create && open->op_claim_type != NFS4_OPEN_CLAIM_NULL)
 		return nfserr_inval;
 
+	if (nfsd4_has_session(cstate))
+		copy_clientid(&open->op_clientid, cstate->session);
+
 	nfs4_lock_state();
 
 	/* check seqid for replay. set nfs4_owner */
-	status = nfsd4_process_open1(open);
+	resp = rqstp->rq_resp;
+	status = nfsd4_process_open1(&resp->cstate, open);
 	if (status == nfserr_replay_me) {
 		struct nfs4_replay *rp = &open->op_stateowner->so_replay;
 		fh_put(&cstate->current_fh);
-		cstate->current_fh.fh_handle.fh_size = rp->rp_openfh_len;
-		memcpy(&cstate->current_fh.fh_handle.fh_base, rp->rp_openfh,
-				rp->rp_openfh_len);
+		fh_copy_shallow(&cstate->current_fh.fh_handle,
+				&rp->rp_openfh);
 		status = fh_verify(rqstp, &cstate->current_fh, 0, NFSD_MAY_NOP);
 		if (status)
 			dprintk("nfsd4_open: replay failed"
@@ -209,10 +236,6 @@ nfsd4_open(struct svc_rqst *rqstp, struct nfsd4_compound_state *cstate,
 
 	switch (open->op_claim_type) {
 		case NFS4_OPEN_CLAIM_DELEGATE_CUR:
-			status = nfserr_inval;
-			if (open->op_create)
-				goto out;
-			/* fall through */
 		case NFS4_OPEN_CLAIM_NULL:
 			/*
 			 * (1) set CURRENT_FH to the file being opened,
@@ -455,8 +478,9 @@ nfsd4_getattr(struct svc_rqst *rqstp, struct nfsd4_compound_state *cstate,
 	if (getattr->ga_bmval[1] & NFSD_WRITEONLY_ATTRS_WORD1)
 		return nfserr_inval;
 
-	getattr->ga_bmval[0] &= NFSD_SUPPORTED_ATTRS_WORD0;
-	getattr->ga_bmval[1] &= NFSD_SUPPORTED_ATTRS_WORD1;
+	getattr->ga_bmval[0] &= nfsd_suppattrs0(cstate->minorversion);
+	getattr->ga_bmval[1] &= nfsd_suppattrs1(cstate->minorversion);
+	getattr->ga_bmval[2] &= nfsd_suppattrs2(cstate->minorversion);
 
 	getattr->ga_fhp = &cstate->current_fh;
 	return nfs_ok;
@@ -520,9 +544,8 @@ nfsd4_read(struct svc_rqst *rqstp, struct nfsd4_compound_state *cstate,
 
 	nfs4_lock_state();
 	/* check stateid */
-	if ((status = nfs4_preprocess_stateid_op(&cstate->current_fh,
-				&read->rd_stateid,
-				CHECK_FH | RD_STATE, &read->rd_filp))) {
+	if ((status = nfs4_preprocess_stateid_op(cstate, &read->rd_stateid,
+						 RD_STATE, &read->rd_filp))) {
 		dprintk("NFSD: nfsd4_read: couldn't process stateid!\n");
 		goto out;
 	}
@@ -548,8 +571,9 @@ nfsd4_readdir(struct svc_rqst *rqstp, struct nfsd4_compound_state *cstate,
 	if (readdir->rd_bmval[1] & NFSD_WRITEONLY_ATTRS_WORD1)
 		return nfserr_inval;
 
-	readdir->rd_bmval[0] &= NFSD_SUPPORTED_ATTRS_WORD0;
-	readdir->rd_bmval[1] &= NFSD_SUPPORTED_ATTRS_WORD1;
+	readdir->rd_bmval[0] &= nfsd_suppattrs0(cstate->minorversion);
+	readdir->rd_bmval[1] &= nfsd_suppattrs1(cstate->minorversion);
+	readdir->rd_bmval[2] &= nfsd_suppattrs2(cstate->minorversion);
 
 	if ((cookie > ~(u32)0) || (cookie == 1) || (cookie == 2) ||
 	    (cookie == 0 && memcmp(readdir->rd_verf.data, zeroverf.data, NFS4_VERIFIER_SIZE)))
@@ -653,8 +677,8 @@ nfsd4_setattr(struct svc_rqst *rqstp, struct nfsd4_compound_state *cstate,
 
 	if (setattr->sa_iattr.ia_valid & ATTR_SIZE) {
 		nfs4_lock_state();
-		status = nfs4_preprocess_stateid_op(&cstate->current_fh,
-			&setattr->sa_stateid, CHECK_FH | WR_STATE, NULL);
+		status = nfs4_preprocess_stateid_op(cstate,
+			&setattr->sa_stateid, WR_STATE, NULL);
 		nfs4_unlock_state();
 		if (status) {
 			dprintk("NFSD: nfsd4_setattr: couldn't process stateid!\n");
@@ -685,6 +709,7 @@ nfsd4_write(struct svc_rqst *rqstp, struct nfsd4_compound_state *cstate,
 	struct file *filp = NULL;
 	u32 *p;
 	__be32 status = nfs_ok;
+	unsigned long cnt;
 
 	/* no need to check permission - this will be done in nfsd_write() */
 
@@ -692,8 +717,7 @@ nfsd4_write(struct svc_rqst *rqstp, struct nfsd4_compound_state *cstate,
 		return nfserr_inval;
 
 	nfs4_lock_state();
-	status = nfs4_preprocess_stateid_op(&cstate->current_fh, stateid,
-					CHECK_FH | WR_STATE, &filp);
+	status = nfs4_preprocess_stateid_op(cstate, stateid, WR_STATE, &filp);
 	if (filp)
 		get_file(filp);
 	nfs4_unlock_state();
@@ -703,7 +727,7 @@ nfsd4_write(struct svc_rqst *rqstp, struct nfsd4_compound_state *cstate,
 		return status;
 	}
 
-	write->wr_bytes_written = write->wr_buflen;
+	cnt = write->wr_buflen;
 	write->wr_how_written = write->wr_stable_how;
 	p = (u32 *)write->wr_verifier.data;
 	*p++ = nfssvc_boot.tv_sec;
@@ -711,10 +735,12 @@ nfsd4_write(struct svc_rqst *rqstp, struct nfsd4_compound_state *cstate,
 
 	status =  nfsd_write(rqstp, &cstate->current_fh, filp,
 			     write->wr_offset, rqstp->rq_vec, write->wr_vlen,
-			     write->wr_buflen, &write->wr_how_written);
+			     &cnt, &write->wr_how_written);
 	if (filp)
 		fput(filp);
 
+	write->wr_bytes_written = cnt;
+
 	if (status == nfserr_symlink)
 		status = nfserr_inval;
 	return status;
@@ -737,8 +763,9 @@ _nfsd4_verify(struct svc_rqst *rqstp, struct nfsd4_compound_state *cstate,
 	if (status)
 		return status;
 
-	if ((verify->ve_bmval[0] & ~NFSD_SUPPORTED_ATTRS_WORD0)
-	    || (verify->ve_bmval[1] & ~NFSD_SUPPORTED_ATTRS_WORD1))
+	if ((verify->ve_bmval[0] & ~nfsd_suppattrs0(cstate->minorversion))
+	    || (verify->ve_bmval[1] & ~nfsd_suppattrs1(cstate->minorversion))
+	    || (verify->ve_bmval[2] & ~nfsd_suppattrs2(cstate->minorversion)))
 		return nfserr_attrnotsupp;
 	if ((verify->ve_bmval[0] & FATTR4_WORD0_RDATTR_ERROR)
 	    || (verify->ve_bmval[1] & NFSD_WRITEONLY_ATTRS_WORD1))
@@ -766,7 +793,8 @@ _nfsd4_verify(struct svc_rqst *rqstp, struct nfsd4_compound_state *cstate,
 	if (status)
 		goto out_kfree;
 
-	p = buf + 3;
+	/* skip bitmap */
+	p = buf + 1 + ntohl(buf[0]);
 	status = nfserr_not_same;
 	if (ntohl(*p++) != verify->ve_attrlen)
 		goto out_kfree;
@@ -813,39 +841,17 @@ static inline void nfsd4_increment_op_stats(u32 opnum)
 		nfsdstats.nfs4_opcount[opnum]++;
 }
 
-static void cstate_free(struct nfsd4_compound_state *cstate)
-{
-	if (cstate == NULL)
-		return;
-	fh_put(&cstate->current_fh);
-	fh_put(&cstate->save_fh);
-	BUG_ON(cstate->replay_owner);
-	kfree(cstate);
-}
-
-static struct nfsd4_compound_state *cstate_alloc(void)
-{
-	struct nfsd4_compound_state *cstate;
-
-	cstate = kmalloc(sizeof(struct nfsd4_compound_state), GFP_KERNEL);
-	if (cstate == NULL)
-		return NULL;
-	fh_init(&cstate->current_fh, NFS4_FHSIZE);
-	fh_init(&cstate->save_fh, NFS4_FHSIZE);
-	cstate->replay_owner = NULL;
-	return cstate;
-}
-
 typedef __be32(*nfsd4op_func)(struct svc_rqst *, struct nfsd4_compound_state *,
 			      void *);
+enum nfsd4_op_flags {
+	ALLOWED_WITHOUT_FH = 1 << 0,	/* No current filehandle required */
+	ALLOWED_ON_ABSENT_FS = 2 << 0,	/* ops processed on absent fs */
+	ALLOWED_AS_FIRST_OP = 3 << 0,	/* ops reqired first in compound */
+};
 
 struct nfsd4_operation {
 	nfsd4op_func op_func;
 	u32 op_flags;
-/* Most ops require a valid current filehandle; a few don't: */
-#define ALLOWED_WITHOUT_FH 1
-/* GETATTR and ops not listed as returning NFS4ERR_MOVED: */
-#define ALLOWED_ON_ABSENT_FS 2
 	char *op_name;
 };
 
@@ -854,6 +860,51 @@ static struct nfsd4_operation nfsd4_ops[];
 static const char *nfsd4_op_name(unsigned opnum);
 
 /*
+ * This is a replay of a compound for which no cache entry pages
+ * were used. Encode the sequence operation, and if cachethis is FALSE
+ * encode the uncache rep error on the next operation.
+ */
+static __be32
+nfsd4_enc_uncached_replay(struct nfsd4_compoundargs *args,
+			 struct nfsd4_compoundres *resp)
+{
+	struct nfsd4_op *op;
+
+	dprintk("--> %s resp->opcnt %d ce_cachethis %u \n", __func__,
+		resp->opcnt, resp->cstate.slot->sl_cache_entry.ce_cachethis);
+
+	/* Encode the replayed sequence operation */
+	BUG_ON(resp->opcnt != 1);
+	op = &args->ops[resp->opcnt - 1];
+	nfsd4_encode_operation(resp, op);
+
+	/*return nfserr_retry_uncached_rep in next operation. */
+	if (resp->cstate.slot->sl_cache_entry.ce_cachethis == 0) {
+		op = &args->ops[resp->opcnt++];
+		op->status = nfserr_retry_uncached_rep;
+		nfsd4_encode_operation(resp, op);
+	}
+	return op->status;
+}
+
+/*
+ * Enforce NFSv4.1 COMPOUND ordering rules.
+ *
+ * TODO:
+ * - enforce NFS4ERR_NOT_ONLY_OP,
+ * - DESTROY_SESSION MUST be the final operation in the COMPOUND request.
+ */
+static bool nfs41_op_ordering_ok(struct nfsd4_compoundargs *args)
+{
+	if (args->minorversion && args->opcnt > 0) {
+		struct nfsd4_op *op = &args->ops[0];
+		return (op->status == nfserr_op_illegal) ||
+		       (nfsd4_ops[op->opnum].op_flags & ALLOWED_AS_FIRST_OP);
+	}
+	return true;
+}
+
+/*
  * COMPOUND call.
  */
 static __be32
@@ -863,12 +914,13 @@ nfsd4_proc_compound(struct svc_rqst *rqstp,
 {
 	struct nfsd4_op	*op;
 	struct nfsd4_operation *opdesc;
-	struct nfsd4_compound_state *cstate = NULL;
+	struct nfsd4_compound_state *cstate = &resp->cstate;
 	int		slack_bytes;
 	__be32		status;
 
 	resp->xbuf = &rqstp->rq_res;
-	resp->p = rqstp->rq_res.head[0].iov_base + rqstp->rq_res.head[0].iov_len;
+	resp->p = rqstp->rq_res.head[0].iov_base +
+						rqstp->rq_res.head[0].iov_len;
 	resp->tagp = resp->p;
 	/* reserve space for: taglen, tag, and opcnt */
 	resp->p += 2 + XDR_QUADLEN(args->taglen);
@@ -877,18 +929,25 @@ nfsd4_proc_compound(struct svc_rqst *rqstp,
 	resp->tag = args->tag;
 	resp->opcnt = 0;
 	resp->rqstp = rqstp;
+	resp->cstate.minorversion = args->minorversion;
+	resp->cstate.replay_owner = NULL;
+	fh_init(&resp->cstate.current_fh, NFS4_FHSIZE);
+	fh_init(&resp->cstate.save_fh, NFS4_FHSIZE);
+	/* Use the deferral mechanism only for NFSv4.0 compounds */
+	rqstp->rq_usedeferral = (args->minorversion == 0);
 
 	/*
 	 * According to RFC3010, this takes precedence over all other errors.
 	 */
 	status = nfserr_minor_vers_mismatch;
-	if (args->minorversion > NFSD_SUPPORTED_MINOR_VERSION)
+	if (args->minorversion > nfsd_supported_minorversion)
 		goto out;
 
-	status = nfserr_resource;
-	cstate = cstate_alloc();
-	if (cstate == NULL)
-		goto out;
+	if (!nfs41_op_ordering_ok(args)) {
+		op = &args->ops[0];
+		op->status = nfserr_sequence_pos;
+		goto encode_op;
+	}
 
 	status = nfs_ok;
 	while (!status && resp->opcnt < args->opcnt) {
@@ -897,7 +956,6 @@ nfsd4_proc_compound(struct svc_rqst *rqstp,
 		dprintk("nfsv4 compound op #%d/%d: %d (%s)\n",
 			resp->opcnt, args->opcnt, op->opnum,
 			nfsd4_op_name(op->opnum));
-
 		/*
 		 * The XDR decode routines may have pre-set op->status;
 		 * for example, if there is a miscellaneous XDR error
@@ -938,6 +996,15 @@ nfsd4_proc_compound(struct svc_rqst *rqstp,
 			BUG_ON(op->status == nfs_ok);
 
 encode_op:
+		/* Only from SEQUENCE or CREATE_SESSION */
+		if (resp->cstate.status == nfserr_replay_cache) {
+			dprintk("%s NFS4.1 replay from cache\n", __func__);
+			if (nfsd4_not_cached(resp))
+				status = nfsd4_enc_uncached_replay(args, resp);
+			else
+				status = op->status;
+			goto out;
+		}
 		if (op->status == nfserr_replay_me) {
 			op->replay = &cstate->replay_owner->so_replay;
 			nfsd4_encode_replay(resp, op);
@@ -961,15 +1028,24 @@ encode_op:
 
 		nfsd4_increment_op_stats(op->opnum);
 	}
+	if (!rqstp->rq_usedeferral && status == nfserr_dropit) {
+		dprintk("%s Dropit - send NFS4ERR_DELAY\n", __func__);
+		status = nfserr_jukebox;
+	}
 
-	cstate_free(cstate);
+	resp->cstate.status = status;
+	fh_put(&resp->cstate.current_fh);
+	fh_put(&resp->cstate.save_fh);
+	BUG_ON(resp->cstate.replay_owner);
 out:
 	nfsd4_release_compoundargs(args);
+	/* Reset deferral mechanism for RPC deferrals */
+	rqstp->rq_usedeferral = 1;
 	dprintk("nfsv4 compound returned %d\n", ntohl(status));
 	return status;
 }
 
-static struct nfsd4_operation nfsd4_ops[OP_RELEASE_LOCKOWNER+1] = {
+static struct nfsd4_operation nfsd4_ops[] = {
 	[OP_ACCESS] = {
 		.op_func = (nfsd4op_func)nfsd4_access,
 		.op_name = "OP_ACCESS",
@@ -1045,7 +1121,7 @@ static struct nfsd4_operation nfsd4_ops[OP_RELEASE_LOCKOWNER+1] = {
 		.op_name = "OP_PUTFH",
 	},
 	[OP_PUTPUBFH] = {
-		/* unsupported, just for future reference: */
+		.op_func = (nfsd4op_func)nfsd4_putrootfh,
 		.op_flags = ALLOWED_WITHOUT_FH | ALLOWED_ON_ABSENT_FS,
 		.op_name = "OP_PUTPUBFH",
 	},
@@ -1119,6 +1195,28 @@ static struct nfsd4_operation nfsd4_ops[OP_RELEASE_LOCKOWNER+1] = {
 		.op_flags = ALLOWED_WITHOUT_FH | ALLOWED_ON_ABSENT_FS,
 		.op_name = "OP_RELEASE_LOCKOWNER",
 	},
+
+	/* NFSv4.1 operations */
+	[OP_EXCHANGE_ID] = {
+		.op_func = (nfsd4op_func)nfsd4_exchange_id,
+		.op_flags = ALLOWED_WITHOUT_FH | ALLOWED_AS_FIRST_OP,
+		.op_name = "OP_EXCHANGE_ID",
+	},
+	[OP_CREATE_SESSION] = {
+		.op_func = (nfsd4op_func)nfsd4_create_session,
+		.op_flags = ALLOWED_WITHOUT_FH | ALLOWED_AS_FIRST_OP,
+		.op_name = "OP_CREATE_SESSION",
+	},
+	[OP_DESTROY_SESSION] = {
+		.op_func = (nfsd4op_func)nfsd4_destroy_session,
+		.op_flags = ALLOWED_WITHOUT_FH | ALLOWED_AS_FIRST_OP,
+		.op_name = "OP_DESTROY_SESSION",
+	},
+	[OP_SEQUENCE] = {
+		.op_func = (nfsd4op_func)nfsd4_sequence,
+		.op_flags = ALLOWED_WITHOUT_FH | ALLOWED_AS_FIRST_OP,
+		.op_name = "OP_SEQUENCE",
+	},
 };
 
 static const char *nfsd4_op_name(unsigned opnum)
diff --git a/fs/nfsd/nfs4recover.c b/fs/nfsd/nfs4recover.c
index 74f7b67567fd..3444c0052a87 100644
--- a/fs/nfsd/nfs4recover.c
+++ b/fs/nfsd/nfs4recover.c
@@ -182,36 +182,26 @@ out_unlock:
 
 typedef int (recdir_func)(struct dentry *, struct dentry *);
 
-struct dentry_list {
-	struct dentry *dentry;
+struct name_list {
+	char name[HEXDIR_LEN];
 	struct list_head list;
 };
 
-struct dentry_list_arg {
-	struct list_head dentries;
-	struct dentry *parent;
-};
-
 static int
-nfsd4_build_dentrylist(void *arg, const char *name, int namlen,
+nfsd4_build_namelist(void *arg, const char *name, int namlen,
 		loff_t offset, u64 ino, unsigned int d_type)
 {
-	struct dentry_list_arg *dla = arg;
-	struct list_head *dentries = &dla->dentries;
-	struct dentry *parent = dla->parent;
-	struct dentry *dentry;
-	struct dentry_list *child;
+	struct list_head *names = arg;
+	struct name_list *entry;
 
-	if (name && isdotent(name, namlen))
+	if (namlen != HEXDIR_LEN - 1)
 		return 0;
-	dentry = lookup_one_len(name, parent, namlen);
-	if (IS_ERR(dentry))
-		return PTR_ERR(dentry);
-	child = kmalloc(sizeof(*child), GFP_KERNEL);
-	if (child == NULL)
+	entry = kmalloc(sizeof(struct name_list), GFP_KERNEL);
+	if (entry == NULL)
 		return -ENOMEM;
-	child->dentry = dentry;
-	list_add(&child->list, dentries);
+	memcpy(entry->name, name, HEXDIR_LEN - 1);
+	entry->name[HEXDIR_LEN - 1] = '\0';
+	list_add(&entry->list, names);
 	return 0;
 }
 
@@ -220,11 +210,9 @@ nfsd4_list_rec_dir(struct dentry *dir, recdir_func *f)
 {
 	const struct cred *original_cred;
 	struct file *filp;
-	struct dentry_list_arg dla = {
-		.parent = dir,
-	};
-	struct list_head *dentries = &dla.dentries;
-	struct dentry_list *child;
+	LIST_HEAD(names);
+	struct name_list *entry;
+	struct dentry *dentry;
 	int status;
 
 	if (!rec_dir_init)
@@ -233,31 +221,34 @@ nfsd4_list_rec_dir(struct dentry *dir, recdir_func *f)
 	status = nfs4_save_creds(&original_cred);
 	if (status < 0)
 		return status;
-	INIT_LIST_HEAD(dentries);
 
 	filp = dentry_open(dget(dir), mntget(rec_dir.mnt), O_RDONLY,
 			   current_cred());
 	status = PTR_ERR(filp);
 	if (IS_ERR(filp))
 		goto out;
-	INIT_LIST_HEAD(dentries);
-	status = vfs_readdir(filp, nfsd4_build_dentrylist, &dla);
+	status = vfs_readdir(filp, nfsd4_build_namelist, &names);
 	fput(filp);
-	while (!list_empty(dentries)) {
-		child = list_entry(dentries->next, struct dentry_list, list);
-		status = f(dir, child->dentry);
+	while (!list_empty(&names)) {
+		entry = list_entry(names.next, struct name_list, list);
+
+		dentry = lookup_one_len(entry->name, dir, HEXDIR_LEN-1);
+		if (IS_ERR(dentry)) {
+			status = PTR_ERR(dentry);
+			goto out;
+		}
+		status = f(dir, dentry);
+		dput(dentry);
 		if (status)
 			goto out;
-		list_del(&child->list);
-		dput(child->dentry);
-		kfree(child);
+		list_del(&entry->list);
+		kfree(entry);
 	}
 out:
-	while (!list_empty(dentries)) {
-		child = list_entry(dentries->next, struct dentry_list, list);
-		list_del(&child->list);
-		dput(child->dentry);
-		kfree(child);
+	while (!list_empty(&names)) {
+		entry = list_entry(names.next, struct name_list, list);
+		list_del(&entry->list);
+		kfree(entry);
 	}
 	nfs4_reset_creds(original_cred);
 	return status;
@@ -353,7 +344,8 @@ purge_old(struct dentry *parent, struct dentry *child)
 {
 	int status;
 
-	if (nfs4_has_reclaimed_state(child->d_name.name))
+	/* note: we currently use this path only for minorversion 0 */
+	if (nfs4_has_reclaimed_state(child->d_name.name, false))
 		return 0;
 
 	status = nfsd4_clear_clid_dir(parent, child);
diff --git a/fs/nfsd/nfs4state.c b/fs/nfsd/nfs4state.c
index b6f60f48e94b..c65a27b76a9d 100644
--- a/fs/nfsd/nfs4state.c
+++ b/fs/nfsd/nfs4state.c
@@ -68,6 +68,7 @@ static u32 current_delegid = 1;
 static u32 nfs4_init;
 static stateid_t zerostateid;             /* bits all 0 */
 static stateid_t onestateid;              /* bits all 1 */
+static u64 current_sessionid = 1;
 
 #define ZERO_STATEID(stateid) (!memcmp((stateid), &zerostateid, sizeof(stateid_t)))
 #define ONE_STATEID(stateid)  (!memcmp((stateid), &onestateid, sizeof(stateid_t)))
@@ -75,18 +76,21 @@ static stateid_t onestateid;              /* bits all 1 */
 /* forward declarations */
 static struct nfs4_stateid * find_stateid(stateid_t *stid, int flags);
 static struct nfs4_delegation * find_delegation_stateid(struct inode *ino, stateid_t *stid);
-static void release_stateid_lockowners(struct nfs4_stateid *open_stp);
 static char user_recovery_dirname[PATH_MAX] = "/var/lib/nfs/v4recovery";
 static void nfs4_set_recdir(char *recdir);
 
-/* Locking:
- *
- * client_mutex:
- * 	protects clientid_hashtbl[], clientstr_hashtbl[],
- * 	unconfstr_hashtbl[], uncofid_hashtbl[].
- */
+/* Locking: */
+
+/* Currently used for almost all code touching nfsv4 state: */
 static DEFINE_MUTEX(client_mutex);
 
+/*
+ * Currently used for the del_recall_lru and file hash table.  In an
+ * effort to decrease the scope of the client_mutex, this spinlock may
+ * eventually cover more:
+ */
+static DEFINE_SPINLOCK(recall_lock);
+
 static struct kmem_cache *stateowner_slab = NULL;
 static struct kmem_cache *file_slab = NULL;
 static struct kmem_cache *stateid_slab = NULL;
@@ -117,37 +121,23 @@ opaque_hashval(const void *ptr, int nbytes)
 	return x;
 }
 
-/* forward declarations */
-static void release_stateowner(struct nfs4_stateowner *sop);
-static void release_stateid(struct nfs4_stateid *stp, int flags);
-
-/*
- * Delegation state
- */
-
-/* recall_lock protects the del_recall_lru */
-static DEFINE_SPINLOCK(recall_lock);
 static struct list_head del_recall_lru;
 
-static void
-free_nfs4_file(struct kref *kref)
-{
-	struct nfs4_file *fp = container_of(kref, struct nfs4_file, fi_ref);
-	list_del(&fp->fi_hash);
-	iput(fp->fi_inode);
-	kmem_cache_free(file_slab, fp);
-}
-
 static inline void
 put_nfs4_file(struct nfs4_file *fi)
 {
-	kref_put(&fi->fi_ref, free_nfs4_file);
+	if (atomic_dec_and_lock(&fi->fi_ref, &recall_lock)) {
+		list_del(&fi->fi_hash);
+		spin_unlock(&recall_lock);
+		iput(fi->fi_inode);
+		kmem_cache_free(file_slab, fi);
+	}
 }
 
 static inline void
 get_nfs4_file(struct nfs4_file *fi)
 {
-	kref_get(&fi->fi_ref);
+	atomic_inc(&fi->fi_ref);
 }
 
 static int num_delegations;
@@ -220,9 +210,7 @@ alloc_init_deleg(struct nfs4_client *clp, struct nfs4_stateid *stp, struct svc_f
 	dp->dl_stateid.si_stateownerid = current_delegid++;
 	dp->dl_stateid.si_fileid = 0;
 	dp->dl_stateid.si_generation = 0;
-	dp->dl_fhlen = current_fh->fh_handle.fh_size;
-	memcpy(dp->dl_fhval, &current_fh->fh_handle.fh_base,
-		        current_fh->fh_handle.fh_size);
+	fh_copy_shallow(&dp->dl_fh, &current_fh->fh_handle);
 	dp->dl_time = 0;
 	atomic_set(&dp->dl_count, 1);
 	list_add(&dp->dl_perfile, &fp->fi_delegations);
@@ -311,6 +299,291 @@ static struct list_head	unconf_id_hashtbl[CLIENT_HASH_SIZE];
 static struct list_head client_lru;
 static struct list_head close_lru;
 
+static void unhash_generic_stateid(struct nfs4_stateid *stp)
+{
+	list_del(&stp->st_hash);
+	list_del(&stp->st_perfile);
+	list_del(&stp->st_perstateowner);
+}
+
+static void free_generic_stateid(struct nfs4_stateid *stp)
+{
+	put_nfs4_file(stp->st_file);
+	kmem_cache_free(stateid_slab, stp);
+}
+
+static void release_lock_stateid(struct nfs4_stateid *stp)
+{
+	unhash_generic_stateid(stp);
+	locks_remove_posix(stp->st_vfs_file, (fl_owner_t)stp->st_stateowner);
+	free_generic_stateid(stp);
+}
+
+static void unhash_lockowner(struct nfs4_stateowner *sop)
+{
+	struct nfs4_stateid *stp;
+
+	list_del(&sop->so_idhash);
+	list_del(&sop->so_strhash);
+	list_del(&sop->so_perstateid);
+	while (!list_empty(&sop->so_stateids)) {
+		stp = list_first_entry(&sop->so_stateids,
+				struct nfs4_stateid, st_perstateowner);
+		release_lock_stateid(stp);
+	}
+}
+
+static void release_lockowner(struct nfs4_stateowner *sop)
+{
+	unhash_lockowner(sop);
+	nfs4_put_stateowner(sop);
+}
+
+static void
+release_stateid_lockowners(struct nfs4_stateid *open_stp)
+{
+	struct nfs4_stateowner *lock_sop;
+
+	while (!list_empty(&open_stp->st_lockowners)) {
+		lock_sop = list_entry(open_stp->st_lockowners.next,
+				struct nfs4_stateowner, so_perstateid);
+		/* list_del(&open_stp->st_lockowners);  */
+		BUG_ON(lock_sop->so_is_open_owner);
+		release_lockowner(lock_sop);
+	}
+}
+
+static void release_open_stateid(struct nfs4_stateid *stp)
+{
+	unhash_generic_stateid(stp);
+	release_stateid_lockowners(stp);
+	nfsd_close(stp->st_vfs_file);
+	free_generic_stateid(stp);
+}
+
+static void unhash_openowner(struct nfs4_stateowner *sop)
+{
+	struct nfs4_stateid *stp;
+
+	list_del(&sop->so_idhash);
+	list_del(&sop->so_strhash);
+	list_del(&sop->so_perclient);
+	list_del(&sop->so_perstateid); /* XXX: necessary? */
+	while (!list_empty(&sop->so_stateids)) {
+		stp = list_first_entry(&sop->so_stateids,
+				struct nfs4_stateid, st_perstateowner);
+		release_open_stateid(stp);
+	}
+}
+
+static void release_openowner(struct nfs4_stateowner *sop)
+{
+	unhash_openowner(sop);
+	list_del(&sop->so_close_lru);
+	nfs4_put_stateowner(sop);
+}
+
+static DEFINE_SPINLOCK(sessionid_lock);
+#define SESSION_HASH_SIZE	512
+static struct list_head sessionid_hashtbl[SESSION_HASH_SIZE];
+
+static inline int
+hash_sessionid(struct nfs4_sessionid *sessionid)
+{
+	struct nfsd4_sessionid *sid = (struct nfsd4_sessionid *)sessionid;
+
+	return sid->sequence % SESSION_HASH_SIZE;
+}
+
+static inline void
+dump_sessionid(const char *fn, struct nfs4_sessionid *sessionid)
+{
+	u32 *ptr = (u32 *)(&sessionid->data[0]);
+	dprintk("%s: %u:%u:%u:%u\n", fn, ptr[0], ptr[1], ptr[2], ptr[3]);
+}
+
+static void
+gen_sessionid(struct nfsd4_session *ses)
+{
+	struct nfs4_client *clp = ses->se_client;
+	struct nfsd4_sessionid *sid;
+
+	sid = (struct nfsd4_sessionid *)ses->se_sessionid.data;
+	sid->clientid = clp->cl_clientid;
+	sid->sequence = current_sessionid++;
+	sid->reserved = 0;
+}
+
+/*
+ * Give the client the number of slots it requests bound by
+ * NFSD_MAX_SLOTS_PER_SESSION and by sv_drc_max_pages.
+ *
+ * If we run out of pages (sv_drc_pages_used == sv_drc_max_pages) we
+ * should (up to a point) re-negotiate active sessions and reduce their
+ * slot usage to make rooom for new connections. For now we just fail the
+ * create session.
+ */
+static int set_forechannel_maxreqs(struct nfsd4_channel_attrs *fchan)
+{
+	int status = 0, np = fchan->maxreqs * NFSD_PAGES_PER_SLOT;
+
+	spin_lock(&nfsd_serv->sv_lock);
+	if (np + nfsd_serv->sv_drc_pages_used > nfsd_serv->sv_drc_max_pages)
+		np = nfsd_serv->sv_drc_max_pages - nfsd_serv->sv_drc_pages_used;
+	nfsd_serv->sv_drc_pages_used += np;
+	spin_unlock(&nfsd_serv->sv_lock);
+
+	if (np <= 0) {
+		status = nfserr_resource;
+		fchan->maxreqs = 0;
+	} else
+		fchan->maxreqs = np / NFSD_PAGES_PER_SLOT;
+
+	return status;
+}
+
+/*
+ * fchan holds the client values on input, and the server values on output
+ */
+static int init_forechannel_attrs(struct svc_rqst *rqstp,
+				    struct nfsd4_session *session,
+				    struct nfsd4_channel_attrs *fchan)
+{
+	int status = 0;
+	__u32   maxcount = svc_max_payload(rqstp);
+
+	/* headerpadsz set to zero in encode routine */
+
+	/* Use the client's max request and max response size if possible */
+	if (fchan->maxreq_sz > maxcount)
+		fchan->maxreq_sz = maxcount;
+	session->se_fmaxreq_sz = fchan->maxreq_sz;
+
+	if (fchan->maxresp_sz > maxcount)
+		fchan->maxresp_sz = maxcount;
+	session->se_fmaxresp_sz = fchan->maxresp_sz;
+
+	/* Set the max response cached size our default which is
+	 * a multiple of PAGE_SIZE and small */
+	session->se_fmaxresp_cached = NFSD_PAGES_PER_SLOT * PAGE_SIZE;
+	fchan->maxresp_cached = session->se_fmaxresp_cached;
+
+	/* Use the client's maxops if possible */
+	if (fchan->maxops > NFSD_MAX_OPS_PER_COMPOUND)
+		fchan->maxops = NFSD_MAX_OPS_PER_COMPOUND;
+	session->se_fmaxops = fchan->maxops;
+
+	/* try to use the client requested number of slots */
+	if (fchan->maxreqs > NFSD_MAX_SLOTS_PER_SESSION)
+		fchan->maxreqs = NFSD_MAX_SLOTS_PER_SESSION;
+
+	/* FIXME: Error means no more DRC pages so the server should
+	 * recover pages from existing sessions. For now fail session
+	 * creation.
+	 */
+	status = set_forechannel_maxreqs(fchan);
+
+	session->se_fnumslots = fchan->maxreqs;
+	return status;
+}
+
+static int
+alloc_init_session(struct svc_rqst *rqstp, struct nfs4_client *clp,
+		   struct nfsd4_create_session *cses)
+{
+	struct nfsd4_session *new, tmp;
+	int idx, status = nfserr_resource, slotsize;
+
+	memset(&tmp, 0, sizeof(tmp));
+
+	/* FIXME: For now, we just accept the client back channel attributes. */
+	status = init_forechannel_attrs(rqstp, &tmp, &cses->fore_channel);
+	if (status)
+		goto out;
+
+	/* allocate struct nfsd4_session and slot table in one piece */
+	slotsize = tmp.se_fnumslots * sizeof(struct nfsd4_slot);
+	new = kzalloc(sizeof(*new) + slotsize, GFP_KERNEL);
+	if (!new)
+		goto out;
+
+	memcpy(new, &tmp, sizeof(*new));
+
+	new->se_client = clp;
+	gen_sessionid(new);
+	idx = hash_sessionid(&new->se_sessionid);
+	memcpy(clp->cl_sessionid.data, new->se_sessionid.data,
+	       NFS4_MAX_SESSIONID_LEN);
+
+	new->se_flags = cses->flags;
+	kref_init(&new->se_ref);
+	spin_lock(&sessionid_lock);
+	list_add(&new->se_hash, &sessionid_hashtbl[idx]);
+	list_add(&new->se_perclnt, &clp->cl_sessions);
+	spin_unlock(&sessionid_lock);
+
+	status = nfs_ok;
+out:
+	return status;
+}
+
+/* caller must hold sessionid_lock */
+static struct nfsd4_session *
+find_in_sessionid_hashtbl(struct nfs4_sessionid *sessionid)
+{
+	struct nfsd4_session *elem;
+	int idx;
+
+	dump_sessionid(__func__, sessionid);
+	idx = hash_sessionid(sessionid);
+	dprintk("%s: idx is %d\n", __func__, idx);
+	/* Search in the appropriate list */
+	list_for_each_entry(elem, &sessionid_hashtbl[idx], se_hash) {
+		dump_sessionid("list traversal", &elem->se_sessionid);
+		if (!memcmp(elem->se_sessionid.data, sessionid->data,
+			    NFS4_MAX_SESSIONID_LEN)) {
+			return elem;
+		}
+	}
+
+	dprintk("%s: session not found\n", __func__);
+	return NULL;
+}
+
+/* caller must hold sessionid_lock */
+static void
+unhash_session(struct nfsd4_session *ses)
+{
+	list_del(&ses->se_hash);
+	list_del(&ses->se_perclnt);
+}
+
+static void
+release_session(struct nfsd4_session *ses)
+{
+	spin_lock(&sessionid_lock);
+	unhash_session(ses);
+	spin_unlock(&sessionid_lock);
+	nfsd4_put_session(ses);
+}
+
+static void nfsd4_release_respages(struct page **respages, short resused);
+
+void
+free_session(struct kref *kref)
+{
+	struct nfsd4_session *ses;
+	int i;
+
+	ses = container_of(kref, struct nfsd4_session, se_ref);
+	for (i = 0; i < ses->se_fnumslots; i++) {
+		struct nfsd4_cache_entry *e = &ses->se_slots[i].sl_cache_entry;
+		nfsd4_release_respages(e->ce_respages, e->ce_resused);
+	}
+	kfree(ses->se_slots);
+	kfree(ses);
+}
+
 static inline void
 renew_client(struct nfs4_client *clp)
 {
@@ -330,8 +603,8 @@ STALE_CLIENTID(clientid_t *clid)
 {
 	if (clid->cl_boot == boot_time)
 		return 0;
-	dprintk("NFSD stale clientid (%08x/%08x)\n", 
-			clid->cl_boot, clid->cl_id);
+	dprintk("NFSD stale clientid (%08x/%08x) boot_time %08lx\n",
+		clid->cl_boot, clid->cl_id, boot_time);
 	return 1;
 }
 
@@ -376,6 +649,8 @@ static inline void
 free_client(struct nfs4_client *clp)
 {
 	shutdown_callback_client(clp);
+	nfsd4_release_respages(clp->cl_slot.sl_cache_entry.ce_respages,
+			     clp->cl_slot.sl_cache_entry.ce_resused);
 	if (clp->cl_cred.cr_group_info)
 		put_group_info(clp->cl_cred.cr_group_info);
 	kfree(clp->cl_principal);
@@ -420,7 +695,13 @@ expire_client(struct nfs4_client *clp)
 	list_del(&clp->cl_lru);
 	while (!list_empty(&clp->cl_openowners)) {
 		sop = list_entry(clp->cl_openowners.next, struct nfs4_stateowner, so_perclient);
-		release_stateowner(sop);
+		release_openowner(sop);
+	}
+	while (!list_empty(&clp->cl_sessions)) {
+		struct nfsd4_session  *ses;
+		ses = list_entry(clp->cl_sessions.next, struct nfsd4_session,
+				 se_perclnt);
+		release_session(ses);
 	}
 	put_nfs4_client(clp);
 }
@@ -439,6 +720,7 @@ static struct nfs4_client *create_client(struct xdr_netobj name, char *recdir)
 	INIT_LIST_HEAD(&clp->cl_strhash);
 	INIT_LIST_HEAD(&clp->cl_openowners);
 	INIT_LIST_HEAD(&clp->cl_delegations);
+	INIT_LIST_HEAD(&clp->cl_sessions);
 	INIT_LIST_HEAD(&clp->cl_lru);
 	return clp;
 }
@@ -568,25 +850,45 @@ find_unconfirmed_client(clientid_t *clid)
 	return NULL;
 }
 
+/*
+ * Return 1 iff clp's clientid establishment method matches the use_exchange_id
+ * parameter. Matching is based on the fact the at least one of the
+ * EXCHGID4_FLAG_USE_{NON_PNFS,PNFS_MDS,PNFS_DS} flags must be set for v4.1
+ *
+ * FIXME: we need to unify the clientid namespaces for nfsv4.x
+ * and correctly deal with client upgrade/downgrade in EXCHANGE_ID
+ * and SET_CLIENTID{,_CONFIRM}
+ */
+static inline int
+match_clientid_establishment(struct nfs4_client *clp, bool use_exchange_id)
+{
+	bool has_exchange_flags = (clp->cl_exchange_flags != 0);
+	return use_exchange_id == has_exchange_flags;
+}
+
 static struct nfs4_client *
-find_confirmed_client_by_str(const char *dname, unsigned int hashval)
+find_confirmed_client_by_str(const char *dname, unsigned int hashval,
+			     bool use_exchange_id)
 {
 	struct nfs4_client *clp;
 
 	list_for_each_entry(clp, &conf_str_hashtbl[hashval], cl_strhash) {
-		if (same_name(clp->cl_recdir, dname))
+		if (same_name(clp->cl_recdir, dname) &&
+		    match_clientid_establishment(clp, use_exchange_id))
 			return clp;
 	}
 	return NULL;
 }
 
 static struct nfs4_client *
-find_unconfirmed_client_by_str(const char *dname, unsigned int hashval)
+find_unconfirmed_client_by_str(const char *dname, unsigned int hashval,
+			       bool use_exchange_id)
 {
 	struct nfs4_client *clp;
 
 	list_for_each_entry(clp, &unconf_str_hashtbl[hashval], cl_strhash) {
-		if (same_name(clp->cl_recdir, dname))
+		if (same_name(clp->cl_recdir, dname) &&
+		    match_clientid_establishment(clp, use_exchange_id))
 			return clp;
 	}
 	return NULL;
@@ -685,6 +987,534 @@ out_err:
 	return;
 }
 
+void
+nfsd4_set_statp(struct svc_rqst *rqstp, __be32 *statp)
+{
+	struct nfsd4_compoundres *resp = rqstp->rq_resp;
+
+	resp->cstate.statp = statp;
+}
+
+/*
+ * Dereference the result pages.
+ */
+static void
+nfsd4_release_respages(struct page **respages, short resused)
+{
+	int i;
+
+	dprintk("--> %s\n", __func__);
+	for (i = 0; i < resused; i++) {
+		if (!respages[i])
+			continue;
+		put_page(respages[i]);
+		respages[i] = NULL;
+	}
+}
+
+static void
+nfsd4_copy_pages(struct page **topages, struct page **frompages, short count)
+{
+	int i;
+
+	for (i = 0; i < count; i++) {
+		topages[i] = frompages[i];
+		if (!topages[i])
+			continue;
+		get_page(topages[i]);
+	}
+}
+
+/*
+ * Cache the reply pages up to NFSD_PAGES_PER_SLOT + 1, clearing the previous
+ * pages. We add a page to NFSD_PAGES_PER_SLOT for the case where the total
+ * length of the XDR response is less than se_fmaxresp_cached
+ * (NFSD_PAGES_PER_SLOT * PAGE_SIZE) but the xdr_buf pages is used for a
+ * of the reply (e.g. readdir).
+ *
+ * Store the base and length of the rq_req.head[0] page
+ * of the NFSv4.1 data, just past the rpc header.
+ */
+void
+nfsd4_store_cache_entry(struct nfsd4_compoundres *resp)
+{
+	struct nfsd4_cache_entry *entry = &resp->cstate.slot->sl_cache_entry;
+	struct svc_rqst *rqstp = resp->rqstp;
+	struct nfsd4_compoundargs *args = rqstp->rq_argp;
+	struct nfsd4_op *op = &args->ops[resp->opcnt];
+	struct kvec *resv = &rqstp->rq_res.head[0];
+
+	dprintk("--> %s entry %p\n", __func__, entry);
+
+	/* Don't cache a failed OP_SEQUENCE. */
+	if (resp->opcnt == 1 && op->opnum == OP_SEQUENCE && resp->cstate.status)
+		return;
+
+	nfsd4_release_respages(entry->ce_respages, entry->ce_resused);
+	entry->ce_opcnt = resp->opcnt;
+	entry->ce_status = resp->cstate.status;
+
+	/*
+	 * Don't need a page to cache just the sequence operation - the slot
+	 * does this for us!
+	 */
+
+	if (nfsd4_not_cached(resp)) {
+		entry->ce_resused = 0;
+		entry->ce_rpchdrlen = 0;
+		dprintk("%s Just cache SEQUENCE. ce_cachethis %d\n", __func__,
+			resp->cstate.slot->sl_cache_entry.ce_cachethis);
+		return;
+	}
+	entry->ce_resused = rqstp->rq_resused;
+	if (entry->ce_resused > NFSD_PAGES_PER_SLOT + 1)
+		entry->ce_resused = NFSD_PAGES_PER_SLOT + 1;
+	nfsd4_copy_pages(entry->ce_respages, rqstp->rq_respages,
+			 entry->ce_resused);
+	entry->ce_datav.iov_base = resp->cstate.statp;
+	entry->ce_datav.iov_len = resv->iov_len - ((char *)resp->cstate.statp -
+				(char *)page_address(rqstp->rq_respages[0]));
+	/* Current request rpc header length*/
+	entry->ce_rpchdrlen = (char *)resp->cstate.statp -
+				(char *)page_address(rqstp->rq_respages[0]);
+}
+
+/*
+ * We keep the rpc header, but take the nfs reply from the replycache.
+ */
+static int
+nfsd41_copy_replay_data(struct nfsd4_compoundres *resp,
+			struct nfsd4_cache_entry *entry)
+{
+	struct svc_rqst *rqstp = resp->rqstp;
+	struct kvec *resv = &resp->rqstp->rq_res.head[0];
+	int len;
+
+	/* Current request rpc header length*/
+	len = (char *)resp->cstate.statp -
+			(char *)page_address(rqstp->rq_respages[0]);
+	if (entry->ce_datav.iov_len + len > PAGE_SIZE) {
+		dprintk("%s v41 cached reply too large (%Zd).\n", __func__,
+			entry->ce_datav.iov_len);
+		return 0;
+	}
+	/* copy the cached reply nfsd data past the current rpc header */
+	memcpy((char *)resv->iov_base + len, entry->ce_datav.iov_base,
+		entry->ce_datav.iov_len);
+	resv->iov_len = len + entry->ce_datav.iov_len;
+	return 1;
+}
+
+/*
+ * Keep the first page of the replay. Copy the NFSv4.1 data from the first
+ * cached page.  Replace any futher replay pages from the cache.
+ */
+__be32
+nfsd4_replay_cache_entry(struct nfsd4_compoundres *resp,
+			 struct nfsd4_sequence *seq)
+{
+	struct nfsd4_cache_entry *entry = &resp->cstate.slot->sl_cache_entry;
+	__be32 status;
+
+	dprintk("--> %s entry %p\n", __func__, entry);
+
+	/*
+	 * If this is just the sequence operation, we did not keep
+	 * a page in the cache entry because we can just use the
+	 * slot info stored in struct nfsd4_sequence that was checked
+	 * against the slot in nfsd4_sequence().
+	 *
+	 * This occurs when seq->cachethis is FALSE, or when the client
+	 * session inactivity timer fires and a solo sequence operation
+	 * is sent (lease renewal).
+	 */
+	if (seq && nfsd4_not_cached(resp)) {
+		seq->maxslots = resp->cstate.session->se_fnumslots;
+		return nfs_ok;
+	}
+
+	if (!nfsd41_copy_replay_data(resp, entry)) {
+		/*
+		 * Not enough room to use the replay rpc header, send the
+		 * cached header. Release all the allocated result pages.
+		 */
+		svc_free_res_pages(resp->rqstp);
+		nfsd4_copy_pages(resp->rqstp->rq_respages, entry->ce_respages,
+			entry->ce_resused);
+	} else {
+		/* Release all but the first allocated result page */
+
+		resp->rqstp->rq_resused--;
+		svc_free_res_pages(resp->rqstp);
+
+		nfsd4_copy_pages(&resp->rqstp->rq_respages[1],
+				 &entry->ce_respages[1],
+				 entry->ce_resused - 1);
+	}
+
+	resp->rqstp->rq_resused = entry->ce_resused;
+	resp->opcnt = entry->ce_opcnt;
+	resp->cstate.iovlen = entry->ce_datav.iov_len + entry->ce_rpchdrlen;
+	status = entry->ce_status;
+
+	return status;
+}
+
+/*
+ * Set the exchange_id flags returned by the server.
+ */
+static void
+nfsd4_set_ex_flags(struct nfs4_client *new, struct nfsd4_exchange_id *clid)
+{
+	/* pNFS is not supported */
+	new->cl_exchange_flags |= EXCHGID4_FLAG_USE_NON_PNFS;
+
+	/* Referrals are supported, Migration is not. */
+	new->cl_exchange_flags |= EXCHGID4_FLAG_SUPP_MOVED_REFER;
+
+	/* set the wire flags to return to client. */
+	clid->flags = new->cl_exchange_flags;
+}
+
+__be32
+nfsd4_exchange_id(struct svc_rqst *rqstp,
+		  struct nfsd4_compound_state *cstate,
+		  struct nfsd4_exchange_id *exid)
+{
+	struct nfs4_client *unconf, *conf, *new;
+	int status;
+	unsigned int		strhashval;
+	char			dname[HEXDIR_LEN];
+	nfs4_verifier		verf = exid->verifier;
+	u32			ip_addr = svc_addr_in(rqstp)->sin_addr.s_addr;
+
+	dprintk("%s rqstp=%p exid=%p clname.len=%u clname.data=%p "
+		" ip_addr=%u flags %x, spa_how %d\n",
+		__func__, rqstp, exid, exid->clname.len, exid->clname.data,
+		ip_addr, exid->flags, exid->spa_how);
+
+	if (!check_name(exid->clname) || (exid->flags & ~EXCHGID4_FLAG_MASK_A))
+		return nfserr_inval;
+
+	/* Currently only support SP4_NONE */
+	switch (exid->spa_how) {
+	case SP4_NONE:
+		break;
+	case SP4_SSV:
+		return nfserr_encr_alg_unsupp;
+	default:
+		BUG();				/* checked by xdr code */
+	case SP4_MACH_CRED:
+		return nfserr_serverfault;	/* no excuse :-/ */
+	}
+
+	status = nfs4_make_rec_clidname(dname, &exid->clname);
+
+	if (status)
+		goto error;
+
+	strhashval = clientstr_hashval(dname);
+
+	nfs4_lock_state();
+	status = nfs_ok;
+
+	conf = find_confirmed_client_by_str(dname, strhashval, true);
+	if (conf) {
+		if (!same_verf(&verf, &conf->cl_verifier)) {
+			/* 18.35.4 case 8 */
+			if (exid->flags & EXCHGID4_FLAG_UPD_CONFIRMED_REC_A) {
+				status = nfserr_not_same;
+				goto out;
+			}
+			/* Client reboot: destroy old state */
+			expire_client(conf);
+			goto out_new;
+		}
+		if (!same_creds(&conf->cl_cred, &rqstp->rq_cred)) {
+			/* 18.35.4 case 9 */
+			if (exid->flags & EXCHGID4_FLAG_UPD_CONFIRMED_REC_A) {
+				status = nfserr_perm;
+				goto out;
+			}
+			expire_client(conf);
+			goto out_new;
+		}
+		if (ip_addr != conf->cl_addr &&
+		    !(exid->flags & EXCHGID4_FLAG_UPD_CONFIRMED_REC_A)) {
+			/* Client collision. 18.35.4 case 3 */
+			status = nfserr_clid_inuse;
+			goto out;
+		}
+		/*
+		 * Set bit when the owner id and verifier map to an already
+		 * confirmed client id (18.35.3).
+		 */
+		exid->flags |= EXCHGID4_FLAG_CONFIRMED_R;
+
+		/*
+		 * Falling into 18.35.4 case 2, possible router replay.
+		 * Leave confirmed record intact and return same result.
+		 */
+		copy_verf(conf, &verf);
+		new = conf;
+		goto out_copy;
+	} else {
+		/* 18.35.4 case 7 */
+		if (exid->flags & EXCHGID4_FLAG_UPD_CONFIRMED_REC_A) {
+			status = nfserr_noent;
+			goto out;
+		}
+	}
+
+	unconf  = find_unconfirmed_client_by_str(dname, strhashval, true);
+	if (unconf) {
+		/*
+		 * Possible retry or client restart.  Per 18.35.4 case 4,
+		 * a new unconfirmed record should be generated regardless
+		 * of whether any properties have changed.
+		 */
+		expire_client(unconf);
+	}
+
+out_new:
+	/* Normal case */
+	new = create_client(exid->clname, dname);
+	if (new == NULL) {
+		status = nfserr_resource;
+		goto out;
+	}
+
+	copy_verf(new, &verf);
+	copy_cred(&new->cl_cred, &rqstp->rq_cred);
+	new->cl_addr = ip_addr;
+	gen_clid(new);
+	gen_confirm(new);
+	add_to_unconfirmed(new, strhashval);
+out_copy:
+	exid->clientid.cl_boot = new->cl_clientid.cl_boot;
+	exid->clientid.cl_id = new->cl_clientid.cl_id;
+
+	new->cl_slot.sl_seqid = 0;
+	exid->seqid = 1;
+	nfsd4_set_ex_flags(new, exid);
+
+	dprintk("nfsd4_exchange_id seqid %d flags %x\n",
+		new->cl_slot.sl_seqid, new->cl_exchange_flags);
+	status = nfs_ok;
+
+out:
+	nfs4_unlock_state();
+error:
+	dprintk("nfsd4_exchange_id returns %d\n", ntohl(status));
+	return status;
+}
+
+static int
+check_slot_seqid(u32 seqid, struct nfsd4_slot *slot)
+{
+	dprintk("%s enter. seqid %d slot->sl_seqid %d\n", __func__, seqid,
+		slot->sl_seqid);
+
+	/* The slot is in use, and no response has been sent. */
+	if (slot->sl_inuse) {
+		if (seqid == slot->sl_seqid)
+			return nfserr_jukebox;
+		else
+			return nfserr_seq_misordered;
+	}
+	/* Normal */
+	if (likely(seqid == slot->sl_seqid + 1))
+		return nfs_ok;
+	/* Replay */
+	if (seqid == slot->sl_seqid)
+		return nfserr_replay_cache;
+	/* Wraparound */
+	if (seqid == 1 && (slot->sl_seqid + 1) == 0)
+		return nfs_ok;
+	/* Misordered replay or misordered new request */
+	return nfserr_seq_misordered;
+}
+
+__be32
+nfsd4_create_session(struct svc_rqst *rqstp,
+		     struct nfsd4_compound_state *cstate,
+		     struct nfsd4_create_session *cr_ses)
+{
+	u32 ip_addr = svc_addr_in(rqstp)->sin_addr.s_addr;
+	struct nfsd4_compoundres *resp = rqstp->rq_resp;
+	struct nfs4_client *conf, *unconf;
+	struct nfsd4_slot *slot = NULL;
+	int status = 0;
+
+	nfs4_lock_state();
+	unconf = find_unconfirmed_client(&cr_ses->clientid);
+	conf = find_confirmed_client(&cr_ses->clientid);
+
+	if (conf) {
+		slot = &conf->cl_slot;
+		status = check_slot_seqid(cr_ses->seqid, slot);
+		if (status == nfserr_replay_cache) {
+			dprintk("Got a create_session replay! seqid= %d\n",
+				slot->sl_seqid);
+			cstate->slot = slot;
+			cstate->status = status;
+			/* Return the cached reply status */
+			status = nfsd4_replay_cache_entry(resp, NULL);
+			goto out;
+		} else if (cr_ses->seqid != conf->cl_slot.sl_seqid + 1) {
+			status = nfserr_seq_misordered;
+			dprintk("Sequence misordered!\n");
+			dprintk("Expected seqid= %d but got seqid= %d\n",
+				slot->sl_seqid, cr_ses->seqid);
+			goto out;
+		}
+		conf->cl_slot.sl_seqid++;
+	} else if (unconf) {
+		if (!same_creds(&unconf->cl_cred, &rqstp->rq_cred) ||
+		    (ip_addr != unconf->cl_addr)) {
+			status = nfserr_clid_inuse;
+			goto out;
+		}
+
+		slot = &unconf->cl_slot;
+		status = check_slot_seqid(cr_ses->seqid, slot);
+		if (status) {
+			/* an unconfirmed replay returns misordered */
+			status = nfserr_seq_misordered;
+			goto out;
+		}
+
+		slot->sl_seqid++; /* from 0 to 1 */
+		move_to_confirmed(unconf);
+
+		/*
+		 * We do not support RDMA or persistent sessions
+		 */
+		cr_ses->flags &= ~SESSION4_PERSIST;
+		cr_ses->flags &= ~SESSION4_RDMA;
+
+		conf = unconf;
+	} else {
+		status = nfserr_stale_clientid;
+		goto out;
+	}
+
+	status = alloc_init_session(rqstp, conf, cr_ses);
+	if (status)
+		goto out;
+
+	memcpy(cr_ses->sessionid.data, conf->cl_sessionid.data,
+	       NFS4_MAX_SESSIONID_LEN);
+	cr_ses->seqid = slot->sl_seqid;
+
+	slot->sl_inuse = true;
+	cstate->slot = slot;
+	/* Ensure a page is used for the cache */
+	slot->sl_cache_entry.ce_cachethis = 1;
+out:
+	nfs4_unlock_state();
+	dprintk("%s returns %d\n", __func__, ntohl(status));
+	return status;
+}
+
+__be32
+nfsd4_destroy_session(struct svc_rqst *r,
+		      struct nfsd4_compound_state *cstate,
+		      struct nfsd4_destroy_session *sessionid)
+{
+	struct nfsd4_session *ses;
+	u32 status = nfserr_badsession;
+
+	/* Notes:
+	 * - The confirmed nfs4_client->cl_sessionid holds destroyed sessinid
+	 * - Should we return nfserr_back_chan_busy if waiting for
+	 *   callbacks on to-be-destroyed session?
+	 * - Do we need to clear any callback info from previous session?
+	 */
+
+	dump_sessionid(__func__, &sessionid->sessionid);
+	spin_lock(&sessionid_lock);
+	ses = find_in_sessionid_hashtbl(&sessionid->sessionid);
+	if (!ses) {
+		spin_unlock(&sessionid_lock);
+		goto out;
+	}
+
+	unhash_session(ses);
+	spin_unlock(&sessionid_lock);
+
+	/* wait for callbacks */
+	shutdown_callback_client(ses->se_client);
+	nfsd4_put_session(ses);
+	status = nfs_ok;
+out:
+	dprintk("%s returns %d\n", __func__, ntohl(status));
+	return status;
+}
+
+__be32
+nfsd4_sequence(struct svc_rqst *rqstp,
+	       struct nfsd4_compound_state *cstate,
+	       struct nfsd4_sequence *seq)
+{
+	struct nfsd4_compoundres *resp = rqstp->rq_resp;
+	struct nfsd4_session *session;
+	struct nfsd4_slot *slot;
+	int status;
+
+	if (resp->opcnt != 1)
+		return nfserr_sequence_pos;
+
+	spin_lock(&sessionid_lock);
+	status = nfserr_badsession;
+	session = find_in_sessionid_hashtbl(&seq->sessionid);
+	if (!session)
+		goto out;
+
+	status = nfserr_badslot;
+	if (seq->slotid >= session->se_fnumslots)
+		goto out;
+
+	slot = &session->se_slots[seq->slotid];
+	dprintk("%s: slotid %d\n", __func__, seq->slotid);
+
+	status = check_slot_seqid(seq->seqid, slot);
+	if (status == nfserr_replay_cache) {
+		cstate->slot = slot;
+		cstate->session = session;
+		/* Return the cached reply status and set cstate->status
+		 * for nfsd4_svc_encode_compoundres processing */
+		status = nfsd4_replay_cache_entry(resp, seq);
+		cstate->status = nfserr_replay_cache;
+		goto replay_cache;
+	}
+	if (status)
+		goto out;
+
+	/* Success! bump slot seqid */
+	slot->sl_inuse = true;
+	slot->sl_seqid = seq->seqid;
+	slot->sl_cache_entry.ce_cachethis = seq->cachethis;
+	/* Always set the cache entry cachethis for solo sequence */
+	if (nfsd4_is_solo_sequence(resp))
+		slot->sl_cache_entry.ce_cachethis = 1;
+
+	cstate->slot = slot;
+	cstate->session = session;
+
+replay_cache:
+	/* Renew the clientid on success and on replay.
+	 * Hold a session reference until done processing the compound:
+	 * nfsd4_put_session called only if the cstate slot is set.
+	 */
+	renew_client(session->se_client);
+	nfsd4_get_session(session);
+out:
+	spin_unlock(&sessionid_lock);
+	dprintk("%s: return %d\n", __func__, ntohl(status));
+	return status;
+}
+
 __be32
 nfsd4_setclientid(struct svc_rqst *rqstp, struct nfsd4_compound_state *cstate,
 		  struct nfsd4_setclientid *setclid)
@@ -716,14 +1546,13 @@ nfsd4_setclientid(struct svc_rqst *rqstp, struct nfsd4_compound_state *cstate,
 	strhashval = clientstr_hashval(dname);
 
 	nfs4_lock_state();
-	conf = find_confirmed_client_by_str(dname, strhashval);
+	conf = find_confirmed_client_by_str(dname, strhashval, false);
 	if (conf) {
 		/* RFC 3530 14.2.33 CASE 0: */
 		status = nfserr_clid_inuse;
-		if (!same_creds(&conf->cl_cred, &rqstp->rq_cred)
-				|| conf->cl_addr != sin->sin_addr.s_addr) {
-			dprintk("NFSD: setclientid: string in use by clientat %pI4\n",
-				&conf->cl_addr);
+		if (!same_creds(&conf->cl_cred, &rqstp->rq_cred)) {
+			dprintk("NFSD: setclientid: string in use by client"
+				" at %pI4\n", &conf->cl_addr);
 			goto out;
 		}
 	}
@@ -732,7 +1561,7 @@ nfsd4_setclientid(struct svc_rqst *rqstp, struct nfsd4_compound_state *cstate,
 	 * has a description of SETCLIENTID request processing consisting
 	 * of 5 bullet points, labeled as CASE0 - CASE4 below.
 	 */
-	unconf = find_unconfirmed_client_by_str(dname, strhashval);
+	unconf = find_unconfirmed_client_by_str(dname, strhashval, false);
 	status = nfserr_resource;
 	if (!conf) {
 		/*
@@ -887,7 +1716,7 @@ nfsd4_setclientid_confirm(struct svc_rqst *rqstp,
 			unsigned int hash =
 				clientstr_hashval(unconf->cl_recdir);
 			conf = find_confirmed_client_by_str(unconf->cl_recdir,
-									hash);
+							    hash, false);
 			if (conf) {
 				nfsd4_remove_clid_dir(conf);
 				expire_client(conf);
@@ -923,11 +1752,13 @@ alloc_init_file(struct inode *ino)
 
 	fp = kmem_cache_alloc(file_slab, GFP_KERNEL);
 	if (fp) {
-		kref_init(&fp->fi_ref);
+		atomic_set(&fp->fi_ref, 1);
 		INIT_LIST_HEAD(&fp->fi_hash);
 		INIT_LIST_HEAD(&fp->fi_stateids);
 		INIT_LIST_HEAD(&fp->fi_delegations);
+		spin_lock(&recall_lock);
 		list_add(&fp->fi_hash, &file_hashtbl[hashval]);
+		spin_unlock(&recall_lock);
 		fp->fi_inode = igrab(ino);
 		fp->fi_id = current_fileid++;
 		fp->fi_had_conflict = false;
@@ -1037,48 +1868,6 @@ alloc_init_open_stateowner(unsigned int strhashval, struct nfs4_client *clp, str
 	return sop;
 }
 
-static void
-release_stateid_lockowners(struct nfs4_stateid *open_stp)
-{
-	struct nfs4_stateowner *lock_sop;
-
-	while (!list_empty(&open_stp->st_lockowners)) {
-		lock_sop = list_entry(open_stp->st_lockowners.next,
-				struct nfs4_stateowner, so_perstateid);
-		/* list_del(&open_stp->st_lockowners);  */
-		BUG_ON(lock_sop->so_is_open_owner);
-		release_stateowner(lock_sop);
-	}
-}
-
-static void
-unhash_stateowner(struct nfs4_stateowner *sop)
-{
-	struct nfs4_stateid *stp;
-
-	list_del(&sop->so_idhash);
-	list_del(&sop->so_strhash);
-	if (sop->so_is_open_owner)
-		list_del(&sop->so_perclient);
-	list_del(&sop->so_perstateid);
-	while (!list_empty(&sop->so_stateids)) {
-		stp = list_entry(sop->so_stateids.next,
-			struct nfs4_stateid, st_perstateowner);
-		if (sop->so_is_open_owner)
-			release_stateid(stp, OPEN_STATE);
-		else
-			release_stateid(stp, LOCK_STATE);
-	}
-}
-
-static void
-release_stateowner(struct nfs4_stateowner *sop)
-{
-	unhash_stateowner(sop);
-	list_del(&sop->so_close_lru);
-	nfs4_put_stateowner(sop);
-}
-
 static inline void
 init_stateid(struct nfs4_stateid *stp, struct nfs4_file *fp, struct nfsd4_open *open) {
 	struct nfs4_stateowner *sop = open->op_stateowner;
@@ -1100,30 +1889,13 @@ init_stateid(struct nfs4_stateid *stp, struct nfs4_file *fp, struct nfsd4_open *
 	stp->st_stateid.si_generation = 0;
 	stp->st_access_bmap = 0;
 	stp->st_deny_bmap = 0;
-	__set_bit(open->op_share_access, &stp->st_access_bmap);
+	__set_bit(open->op_share_access & ~NFS4_SHARE_WANT_MASK,
+		  &stp->st_access_bmap);
 	__set_bit(open->op_share_deny, &stp->st_deny_bmap);
 	stp->st_openstp = NULL;
 }
 
 static void
-release_stateid(struct nfs4_stateid *stp, int flags)
-{
-	struct file *filp = stp->st_vfs_file;
-
-	list_del(&stp->st_hash);
-	list_del(&stp->st_perfile);
-	list_del(&stp->st_perstateowner);
-	if (flags & OPEN_STATE) {
-		release_stateid_lockowners(stp);
-		stp->st_vfs_file = NULL;
-		nfsd_close(filp);
-	} else if (flags & LOCK_STATE)
-		locks_remove_posix(filp, (fl_owner_t) stp->st_stateowner);
-	put_nfs4_file(stp->st_file);
-	kmem_cache_free(stateid_slab, stp);
-}
-
-static void
 move_to_close_lru(struct nfs4_stateowner *sop)
 {
 	dprintk("NFSD: move_to_close_lru nfs4_stateowner %p\n", sop);
@@ -1160,20 +1932,33 @@ find_file(struct inode *ino)
 	unsigned int hashval = file_hashval(ino);
 	struct nfs4_file *fp;
 
+	spin_lock(&recall_lock);
 	list_for_each_entry(fp, &file_hashtbl[hashval], fi_hash) {
 		if (fp->fi_inode == ino) {
 			get_nfs4_file(fp);
+			spin_unlock(&recall_lock);
 			return fp;
 		}
 	}
+	spin_unlock(&recall_lock);
 	return NULL;
 }
 
-static inline int access_valid(u32 x)
+static inline int access_valid(u32 x, u32 minorversion)
 {
-	if (x < NFS4_SHARE_ACCESS_READ)
+	if ((x & NFS4_SHARE_ACCESS_MASK) < NFS4_SHARE_ACCESS_READ)
 		return 0;
-	if (x > NFS4_SHARE_ACCESS_BOTH)
+	if ((x & NFS4_SHARE_ACCESS_MASK) > NFS4_SHARE_ACCESS_BOTH)
+		return 0;
+	x &= ~NFS4_SHARE_ACCESS_MASK;
+	if (minorversion && x) {
+		if ((x & NFS4_SHARE_WANT_MASK) > NFS4_SHARE_WANT_CANCEL)
+			return 0;
+		if ((x & NFS4_SHARE_WHEN_MASK) > NFS4_SHARE_PUSH_DELEG_WHEN_UNCONTENDED)
+			return 0;
+		x &= ~(NFS4_SHARE_WANT_MASK | NFS4_SHARE_WHEN_MASK);
+	}
+	if (x)
 		return 0;
 	return 1;
 }
@@ -1409,7 +2194,8 @@ static struct lock_manager_operations nfsd_lease_mng_ops = {
 
 
 __be32
-nfsd4_process_open1(struct nfsd4_open *open)
+nfsd4_process_open1(struct nfsd4_compound_state *cstate,
+		    struct nfsd4_open *open)
 {
 	clientid_t *clientid = &open->op_clientid;
 	struct nfs4_client *clp = NULL;
@@ -1432,10 +2218,13 @@ nfsd4_process_open1(struct nfsd4_open *open)
 			return nfserr_expired;
 		goto renew;
 	}
+	/* When sessions are used, skip open sequenceid processing */
+	if (nfsd4_has_session(cstate))
+		goto renew;
 	if (!sop->so_confirmed) {
 		/* Replace unconfirmed owners without checking for replay. */
 		clp = sop->so_client;
-		release_stateowner(sop);
+		release_openowner(sop);
 		open->op_stateowner = NULL;
 		goto renew;
 	}
@@ -1709,6 +2498,7 @@ out:
 __be32
 nfsd4_process_open2(struct svc_rqst *rqstp, struct svc_fh *current_fh, struct nfsd4_open *open)
 {
+	struct nfsd4_compoundres *resp = rqstp->rq_resp;
 	struct nfs4_file *fp = NULL;
 	struct inode *ino = current_fh->fh_dentry->d_inode;
 	struct nfs4_stateid *stp = NULL;
@@ -1716,7 +2506,7 @@ nfsd4_process_open2(struct svc_rqst *rqstp, struct svc_fh *current_fh, struct nf
 	__be32 status;
 
 	status = nfserr_inval;
-	if (!access_valid(open->op_share_access)
+	if (!access_valid(open->op_share_access, resp->cstate.minorversion)
 			|| !deny_valid(open->op_share_deny))
 		goto out;
 	/*
@@ -1764,12 +2554,17 @@ nfsd4_process_open2(struct svc_rqst *rqstp, struct svc_fh *current_fh, struct nf
 		init_stateid(stp, fp, open);
 		status = nfsd4_truncate(rqstp, current_fh, open);
 		if (status) {
-			release_stateid(stp, OPEN_STATE);
+			release_open_stateid(stp);
 			goto out;
 		}
+		if (nfsd4_has_session(&resp->cstate))
+			update_stateid(&stp->st_stateid);
 	}
 	memcpy(&open->op_stateid, &stp->st_stateid, sizeof(stateid_t));
 
+	if (nfsd4_has_session(&resp->cstate))
+		open->op_stateowner->so_confirmed = 1;
+
 	/*
 	* Attempt to hand out a delegation. No error return, because the
 	* OPEN succeeds even if we fail.
@@ -1790,7 +2585,8 @@ out:
 	* To finish the open response, we just need to set the rflags.
 	*/
 	open->op_rflags = NFS4_OPEN_RESULT_LOCKTYPE_POSIX;
-	if (!open->op_stateowner->so_confirmed)
+	if (!open->op_stateowner->so_confirmed &&
+	    !nfsd4_has_session(&resp->cstate))
 		open->op_rflags |= NFS4_OPEN_RESULT_CONFIRM;
 
 	return status;
@@ -1898,7 +2694,7 @@ nfs4_laundromat(void)
 		}
 		dprintk("NFSD: purging unused open stateowner (so_id %d)\n",
 			sop->so_id);
-		release_stateowner(sop);
+		release_openowner(sop);
 	}
 	if (clientid_val < NFSD_LAUNDROMAT_MINTIMEOUT)
 		clientid_val = NFSD_LAUNDROMAT_MINTIMEOUT;
@@ -1983,10 +2779,7 @@ out:
 static inline __be32
 check_special_stateids(svc_fh *current_fh, stateid_t *stateid, int flags)
 {
-	/* Trying to call delegreturn with a special stateid? Yuch: */
-	if (!(flags & (RD_STATE | WR_STATE)))
-		return nfserr_bad_stateid;
-	else if (ONE_STATEID(stateid) && (flags & RD_STATE))
+	if (ONE_STATEID(stateid) && (flags & RD_STATE))
 		return nfs_ok;
 	else if (locks_in_grace()) {
 		/* Answer in remaining cases depends on existance of
@@ -2005,14 +2798,20 @@ check_special_stateids(svc_fh *current_fh, stateid_t *stateid, int flags)
  * that are not able to provide mandatory locking.
  */
 static inline int
-io_during_grace_disallowed(struct inode *inode, int flags)
+grace_disallows_io(struct inode *inode)
 {
-	return locks_in_grace() && (flags & (RD_STATE | WR_STATE))
-		&& mandatory_lock(inode);
+	return locks_in_grace() && mandatory_lock(inode);
 }
 
-static int check_stateid_generation(stateid_t *in, stateid_t *ref)
+static int check_stateid_generation(stateid_t *in, stateid_t *ref, int flags)
 {
+	/*
+	 * When sessions are used the stateid generation number is ignored
+	 * when it is zero.
+	 */
+	if ((flags & HAS_SESSION) && in->si_generation == 0)
+		goto out;
+
 	/* If the client sends us a stateid from the future, it's buggy: */
 	if (in->si_generation > ref->si_generation)
 		return nfserr_bad_stateid;
@@ -2028,74 +2827,77 @@ static int check_stateid_generation(stateid_t *in, stateid_t *ref)
 	 */
 	if (in->si_generation < ref->si_generation)
 		return nfserr_old_stateid;
+out:
 	return nfs_ok;
 }
 
+static int is_delegation_stateid(stateid_t *stateid)
+{
+	return stateid->si_fileid == 0;
+}
+
 /*
 * Checks for stateid operations
 */
 __be32
-nfs4_preprocess_stateid_op(struct svc_fh *current_fh, stateid_t *stateid, int flags, struct file **filpp)
+nfs4_preprocess_stateid_op(struct nfsd4_compound_state *cstate,
+			   stateid_t *stateid, int flags, struct file **filpp)
 {
 	struct nfs4_stateid *stp = NULL;
 	struct nfs4_delegation *dp = NULL;
-	stateid_t *stidp;
+	struct svc_fh *current_fh = &cstate->current_fh;
 	struct inode *ino = current_fh->fh_dentry->d_inode;
 	__be32 status;
 
-	dprintk("NFSD: preprocess_stateid_op: stateid = (%08x/%08x/%08x/%08x)\n",
-		stateid->si_boot, stateid->si_stateownerid, 
-		stateid->si_fileid, stateid->si_generation); 
 	if (filpp)
 		*filpp = NULL;
 
-	if (io_during_grace_disallowed(ino, flags))
+	if (grace_disallows_io(ino))
 		return nfserr_grace;
 
+	if (nfsd4_has_session(cstate))
+		flags |= HAS_SESSION;
+
 	if (ZERO_STATEID(stateid) || ONE_STATEID(stateid))
 		return check_special_stateids(current_fh, stateid, flags);
 
-	/* STALE STATEID */
 	status = nfserr_stale_stateid;
 	if (STALE_STATEID(stateid)) 
 		goto out;
 
-	/* BAD STATEID */
 	status = nfserr_bad_stateid;
-	if (!stateid->si_fileid) { /* delegation stateid */
-		if(!(dp = find_delegation_stateid(ino, stateid))) {
-			dprintk("NFSD: delegation stateid not found\n");
+	if (is_delegation_stateid(stateid)) {
+		dp = find_delegation_stateid(ino, stateid);
+		if (!dp)
 			goto out;
-		}
-		stidp = &dp->dl_stateid;
+		status = check_stateid_generation(stateid, &dp->dl_stateid,
+						  flags);
+		if (status)
+			goto out;
+		status = nfs4_check_delegmode(dp, flags);
+		if (status)
+			goto out;
+		renew_client(dp->dl_client);
+		if (filpp)
+			*filpp = dp->dl_vfs_file;
 	} else { /* open or lock stateid */
-		if (!(stp = find_stateid(stateid, flags))) {
-			dprintk("NFSD: open or lock stateid not found\n");
+		stp = find_stateid(stateid, flags);
+		if (!stp)
 			goto out;
-		}
-		if ((flags & CHECK_FH) && nfs4_check_fh(current_fh, stp))
+		if (nfs4_check_fh(current_fh, stp))
 			goto out;
 		if (!stp->st_stateowner->so_confirmed)
 			goto out;
-		stidp = &stp->st_stateid;
-	}
-	status = check_stateid_generation(stateid, stidp);
-	if (status)
-		goto out;
-	if (stp) {
-		if ((status = nfs4_check_openmode(stp,flags)))
+		status = check_stateid_generation(stateid, &stp->st_stateid,
+						  flags);
+		if (status)
+			goto out;
+		status = nfs4_check_openmode(stp, flags);
+		if (status)
 			goto out;
 		renew_client(stp->st_stateowner->so_client);
 		if (filpp)
 			*filpp = stp->st_vfs_file;
-	} else {
-		if ((status = nfs4_check_delegmode(dp, flags)))
-			goto out;
-		renew_client(dp->dl_client);
-		if (flags & DELEG_RET)
-			unhash_delegation(dp);
-		if (filpp)
-			*filpp = dp->dl_vfs_file;
 	}
 	status = nfs_ok;
 out:
@@ -2113,10 +2915,14 @@ setlkflg (int type)
  * Checks for sequence id mutating operations. 
  */
 static __be32
-nfs4_preprocess_seqid_op(struct svc_fh *current_fh, u32 seqid, stateid_t *stateid, int flags, struct nfs4_stateowner **sopp, struct nfs4_stateid **stpp, struct nfsd4_lock *lock)
+nfs4_preprocess_seqid_op(struct nfsd4_compound_state *cstate, u32 seqid,
+			 stateid_t *stateid, int flags,
+			 struct nfs4_stateowner **sopp,
+			 struct nfs4_stateid **stpp, struct nfsd4_lock *lock)
 {
 	struct nfs4_stateid *stp;
 	struct nfs4_stateowner *sop;
+	struct svc_fh *current_fh = &cstate->current_fh;
 	__be32 status;
 
 	dprintk("NFSD: preprocess_seqid_op: seqid=%d " 
@@ -2134,6 +2940,10 @@ nfs4_preprocess_seqid_op(struct svc_fh *current_fh, u32 seqid, stateid_t *statei
 
 	if (STALE_STATEID(stateid))
 		return nfserr_stale_stateid;
+
+	if (nfsd4_has_session(cstate))
+		flags |= HAS_SESSION;
+
 	/*
 	* We return BAD_STATEID if filehandle doesn't match stateid, 
 	* the confirmed flag is incorrecly set, or the generation 
@@ -2166,8 +2976,9 @@ nfs4_preprocess_seqid_op(struct svc_fh *current_fh, u32 seqid, stateid_t *statei
 		if (lock->lk_is_new) {
 			if (!sop->so_is_open_owner)
 				return nfserr_bad_stateid;
-			if (!same_clid(&clp->cl_clientid, lockclid))
-			       return nfserr_bad_stateid;
+			if (!(flags & HAS_SESSION) &&
+			    !same_clid(&clp->cl_clientid, lockclid))
+				return nfserr_bad_stateid;
 			/* stp is the open stateid */
 			status = nfs4_check_openmode(stp, lkflg);
 			if (status)
@@ -2190,7 +3001,7 @@ nfs4_preprocess_seqid_op(struct svc_fh *current_fh, u32 seqid, stateid_t *statei
 	*  For the moment, we ignore the possibility of 
 	*  generation number wraparound.
 	*/
-	if (seqid != sop->so_seqid)
+	if (!(flags & HAS_SESSION) && seqid != sop->so_seqid)
 		goto check_replay;
 
 	if (sop->so_confirmed && flags & CONFIRM) {
@@ -2203,7 +3014,7 @@ nfs4_preprocess_seqid_op(struct svc_fh *current_fh, u32 seqid, stateid_t *statei
 				" confirmed yet!\n");
 		return nfserr_bad_stateid;
 	}
-	status = check_stateid_generation(stateid, &stp->st_stateid);
+	status = check_stateid_generation(stateid, &stp->st_stateid, flags);
 	if (status)
 		return status;
 	renew_client(sop->so_client);
@@ -2239,7 +3050,7 @@ nfsd4_open_confirm(struct svc_rqst *rqstp, struct nfsd4_compound_state *cstate,
 
 	nfs4_lock_state();
 
-	if ((status = nfs4_preprocess_seqid_op(&cstate->current_fh,
+	if ((status = nfs4_preprocess_seqid_op(cstate,
 					oc->oc_seqid, &oc->oc_req_stateid,
 					CONFIRM | OPEN_STATE,
 					&oc->oc_stateowner, &stp, NULL)))
@@ -2304,12 +3115,12 @@ nfsd4_open_downgrade(struct svc_rqst *rqstp,
 			(int)cstate->current_fh.fh_dentry->d_name.len,
 			cstate->current_fh.fh_dentry->d_name.name);
 
-	if (!access_valid(od->od_share_access)
+	if (!access_valid(od->od_share_access, cstate->minorversion)
 			|| !deny_valid(od->od_share_deny))
 		return nfserr_inval;
 
 	nfs4_lock_state();
-	if ((status = nfs4_preprocess_seqid_op(&cstate->current_fh,
+	if ((status = nfs4_preprocess_seqid_op(cstate,
 					od->od_seqid,
 					&od->od_stateid, 
 					OPEN_STATE,
@@ -2362,7 +3173,7 @@ nfsd4_close(struct svc_rqst *rqstp, struct nfsd4_compound_state *cstate,
 
 	nfs4_lock_state();
 	/* check close_lru for replay */
-	if ((status = nfs4_preprocess_seqid_op(&cstate->current_fh,
+	if ((status = nfs4_preprocess_seqid_op(cstate,
 					close->cl_seqid,
 					&close->cl_stateid, 
 					OPEN_STATE | CLOSE_STATE,
@@ -2373,7 +3184,7 @@ nfsd4_close(struct svc_rqst *rqstp, struct nfsd4_compound_state *cstate,
 	memcpy(&close->cl_stateid, &stp->st_stateid, sizeof(stateid_t));
 
 	/* release_stateid() calls nfsd_close() if needed */
-	release_stateid(stp, OPEN_STATE);
+	release_open_stateid(stp);
 
 	/* place unused nfs4_stateowners on so_close_lru list to be
 	 * released by the laundromat service after the lease period
@@ -2394,16 +3205,40 @@ __be32
 nfsd4_delegreturn(struct svc_rqst *rqstp, struct nfsd4_compound_state *cstate,
 		  struct nfsd4_delegreturn *dr)
 {
+	struct nfs4_delegation *dp;
+	stateid_t *stateid = &dr->dr_stateid;
+	struct inode *inode;
 	__be32 status;
+	int flags = 0;
 
 	if ((status = fh_verify(rqstp, &cstate->current_fh, S_IFREG, 0)))
-		goto out;
+		return status;
+	inode = cstate->current_fh.fh_dentry->d_inode;
 
+	if (nfsd4_has_session(cstate))
+		flags |= HAS_SESSION;
 	nfs4_lock_state();
-	status = nfs4_preprocess_stateid_op(&cstate->current_fh,
-					    &dr->dr_stateid, DELEG_RET, NULL);
-	nfs4_unlock_state();
+	status = nfserr_bad_stateid;
+	if (ZERO_STATEID(stateid) || ONE_STATEID(stateid))
+		goto out;
+	status = nfserr_stale_stateid;
+	if (STALE_STATEID(stateid))
+		goto out;
+	status = nfserr_bad_stateid;
+	if (!is_delegation_stateid(stateid))
+		goto out;
+	dp = find_delegation_stateid(inode, stateid);
+	if (!dp)
+		goto out;
+	status = check_stateid_generation(stateid, &dp->dl_stateid, flags);
+	if (status)
+		goto out;
+	renew_client(dp->dl_client);
+
+	unhash_delegation(dp);
 out:
+	nfs4_unlock_state();
+
 	return status;
 }
 
@@ -2684,11 +3519,12 @@ nfsd4_lock(struct svc_rqst *rqstp, struct nfsd4_compound_state *cstate,
 		struct nfs4_file *fp;
 		
 		status = nfserr_stale_clientid;
-		if (STALE_CLIENTID(&lock->lk_new_clientid))
+		if (!nfsd4_has_session(cstate) &&
+		    STALE_CLIENTID(&lock->lk_new_clientid))
 			goto out;
 
 		/* validate and update open stateid and open seqid */
-		status = nfs4_preprocess_seqid_op(&cstate->current_fh,
+		status = nfs4_preprocess_seqid_op(cstate,
 				        lock->lk_new_open_seqid,
 		                        &lock->lk_new_open_stateid,
 					OPEN_STATE,
@@ -2715,7 +3551,7 @@ nfsd4_lock(struct svc_rqst *rqstp, struct nfsd4_compound_state *cstate,
 			goto out;
 	} else {
 		/* lock (lock owner + lock stateid) already exists */
-		status = nfs4_preprocess_seqid_op(&cstate->current_fh,
+		status = nfs4_preprocess_seqid_op(cstate,
 				       lock->lk_old_lock_seqid, 
 				       &lock->lk_old_lock_stateid, 
 				       LOCK_STATE,
@@ -2788,7 +3624,7 @@ nfsd4_lock(struct svc_rqst *rqstp, struct nfsd4_compound_state *cstate,
 	}
 out:
 	if (status && lock->lk_is_new && lock_sop)
-		release_stateowner(lock_sop);
+		release_lockowner(lock_sop);
 	if (lock->lk_replay_owner) {
 		nfs4_get_stateowner(lock->lk_replay_owner);
 		cstate->replay_owner = lock->lk_replay_owner;
@@ -2838,7 +3674,7 @@ nfsd4_lockt(struct svc_rqst *rqstp, struct nfsd4_compound_state *cstate,
 	nfs4_lock_state();
 
 	status = nfserr_stale_clientid;
-	if (STALE_CLIENTID(&lockt->lt_clientid))
+	if (!nfsd4_has_session(cstate) && STALE_CLIENTID(&lockt->lt_clientid))
 		goto out;
 
 	if ((status = fh_verify(rqstp, &cstate->current_fh, S_IFREG, 0))) {
@@ -2911,7 +3747,7 @@ nfsd4_locku(struct svc_rqst *rqstp, struct nfsd4_compound_state *cstate,
 
 	nfs4_lock_state();
 									        
-	if ((status = nfs4_preprocess_seqid_op(&cstate->current_fh,
+	if ((status = nfs4_preprocess_seqid_op(cstate,
 					locku->lu_seqid, 
 					&locku->lu_stateid, 
 					LOCK_STATE,
@@ -3037,7 +3873,7 @@ nfsd4_release_lockowner(struct svc_rqst *rqstp,
 		/* unhash_stateowner deletes so_perclient only
 		 * for openowners. */
 		list_del(&sop->so_perclient);
-		release_stateowner(sop);
+		release_lockowner(sop);
 	}
 out:
 	nfs4_unlock_state();
@@ -3051,12 +3887,12 @@ alloc_reclaim(void)
 }
 
 int
-nfs4_has_reclaimed_state(const char *name)
+nfs4_has_reclaimed_state(const char *name, bool use_exchange_id)
 {
 	unsigned int strhashval = clientstr_hashval(name);
 	struct nfs4_client *clp;
 
-	clp = find_confirmed_client_by_str(name, strhashval);
+	clp = find_confirmed_client_by_str(name, strhashval, use_exchange_id);
 	return clp ? 1 : 0;
 }
 
@@ -3153,6 +3989,8 @@ nfs4_state_init(void)
 		INIT_LIST_HEAD(&unconf_str_hashtbl[i]);
 		INIT_LIST_HEAD(&unconf_id_hashtbl[i]);
 	}
+	for (i = 0; i < SESSION_HASH_SIZE; i++)
+		INIT_LIST_HEAD(&sessionid_hashtbl[i]);
 	for (i = 0; i < FILE_HASH_SIZE; i++) {
 		INIT_LIST_HEAD(&file_hashtbl[i]);
 	}
diff --git a/fs/nfsd/nfs4xdr.c b/fs/nfsd/nfs4xdr.c
index 9250067943d8..b820c311931c 100644
--- a/fs/nfsd/nfs4xdr.c
+++ b/fs/nfsd/nfs4xdr.c
@@ -45,6 +45,7 @@
 #include <linux/fs.h>
 #include <linux/namei.h>
 #include <linux/vfs.h>
+#include <linux/utsname.h>
 #include <linux/sunrpc/xdr.h>
 #include <linux/sunrpc/svc.h>
 #include <linux/sunrpc/clnt.h>
@@ -188,6 +189,11 @@ static __be32 *read_buf(struct nfsd4_compoundargs *argp, u32 nbytes)
 	return p;
 }
 
+static int zero_clientid(clientid_t *clid)
+{
+	return (clid->cl_boot == 0) && (clid->cl_id == 0);
+}
+
 static int
 defer_free(struct nfsd4_compoundargs *argp,
 		void (*release)(const void *), void *p)
@@ -230,6 +236,7 @@ nfsd4_decode_bitmap(struct nfsd4_compoundargs *argp, u32 *bmval)
 
 	bmval[0] = 0;
 	bmval[1] = 0;
+	bmval[2] = 0;
 
 	READ_BUF(4);
 	READ32(bmlen);
@@ -241,13 +248,27 @@ nfsd4_decode_bitmap(struct nfsd4_compoundargs *argp, u32 *bmval)
 		READ32(bmval[0]);
 	if (bmlen > 1)
 		READ32(bmval[1]);
+	if (bmlen > 2)
+		READ32(bmval[2]);
 
 	DECODE_TAIL;
 }
 
+static u32 nfsd_attrmask[] = {
+	NFSD_WRITEABLE_ATTRS_WORD0,
+	NFSD_WRITEABLE_ATTRS_WORD1,
+	NFSD_WRITEABLE_ATTRS_WORD2
+};
+
+static u32 nfsd41_ex_attrmask[] = {
+	NFSD_SUPPATTR_EXCLCREAT_WORD0,
+	NFSD_SUPPATTR_EXCLCREAT_WORD1,
+	NFSD_SUPPATTR_EXCLCREAT_WORD2
+};
+
 static __be32
-nfsd4_decode_fattr(struct nfsd4_compoundargs *argp, u32 *bmval, struct iattr *iattr,
-    struct nfs4_acl **acl)
+nfsd4_decode_fattr(struct nfsd4_compoundargs *argp, u32 *bmval, u32 *writable,
+		   struct iattr *iattr, struct nfs4_acl **acl)
 {
 	int expected_len, len = 0;
 	u32 dummy32;
@@ -263,9 +284,12 @@ nfsd4_decode_fattr(struct nfsd4_compoundargs *argp, u32 *bmval, struct iattr *ia
 	 * According to spec, unsupported attributes return ERR_ATTRNOTSUPP;
 	 * read-only attributes return ERR_INVAL.
 	 */
-	if ((bmval[0] & ~NFSD_SUPPORTED_ATTRS_WORD0) || (bmval[1] & ~NFSD_SUPPORTED_ATTRS_WORD1))
+	if ((bmval[0] & ~nfsd_suppattrs0(argp->minorversion)) ||
+	    (bmval[1] & ~nfsd_suppattrs1(argp->minorversion)) ||
+	    (bmval[2] & ~nfsd_suppattrs2(argp->minorversion)))
 		return nfserr_attrnotsupp;
-	if ((bmval[0] & ~NFSD_WRITEABLE_ATTRS_WORD0) || (bmval[1] & ~NFSD_WRITEABLE_ATTRS_WORD1))
+	if ((bmval[0] & ~writable[0]) || (bmval[1] & ~writable[1]) ||
+	    (bmval[2] & ~writable[2]))
 		return nfserr_inval;
 
 	READ_BUF(4);
@@ -400,6 +424,7 @@ nfsd4_decode_fattr(struct nfsd4_compoundargs *argp, u32 *bmval, struct iattr *ia
 			goto xdr_error;
 		}
 	}
+	BUG_ON(bmval[2]);	/* no such writeable attr supported yet */
 	if (len != expected_len)
 		goto xdr_error;
 
@@ -493,7 +518,9 @@ nfsd4_decode_create(struct nfsd4_compoundargs *argp, struct nfsd4_create *create
 	if ((status = check_filename(create->cr_name, create->cr_namelen, nfserr_inval)))
 		return status;
 
-	if ((status = nfsd4_decode_fattr(argp, create->cr_bmval, &create->cr_iattr, &create->cr_acl)))
+	status = nfsd4_decode_fattr(argp, create->cr_bmval, nfsd_attrmask,
+				    &create->cr_iattr, &create->cr_acl);
+	if (status)
 		goto out;
 
 	DECODE_TAIL;
@@ -583,6 +610,8 @@ nfsd4_decode_lockt(struct nfsd4_compoundargs *argp, struct nfsd4_lockt *lockt)
 	READ_BUF(lockt->lt_owner.len);
 	READMEM(lockt->lt_owner.data, lockt->lt_owner.len);
 
+	if (argp->minorversion && !zero_clientid(&lockt->lt_clientid))
+		return nfserr_inval;
 	DECODE_TAIL;
 }
 
@@ -652,13 +681,26 @@ nfsd4_decode_open(struct nfsd4_compoundargs *argp, struct nfsd4_open *open)
 		switch (open->op_createmode) {
 		case NFS4_CREATE_UNCHECKED:
 		case NFS4_CREATE_GUARDED:
-			if ((status = nfsd4_decode_fattr(argp, open->op_bmval, &open->op_iattr, &open->op_acl)))
+			status = nfsd4_decode_fattr(argp, open->op_bmval,
+				nfsd_attrmask, &open->op_iattr, &open->op_acl);
+			if (status)
 				goto out;
 			break;
 		case NFS4_CREATE_EXCLUSIVE:
 			READ_BUF(8);
 			COPYMEM(open->op_verf.data, 8);
 			break;
+		case NFS4_CREATE_EXCLUSIVE4_1:
+			if (argp->minorversion < 1)
+				goto xdr_error;
+			READ_BUF(8);
+			COPYMEM(open->op_verf.data, 8);
+			status = nfsd4_decode_fattr(argp, open->op_bmval,
+				nfsd41_ex_attrmask, &open->op_iattr,
+				&open->op_acl);
+			if (status)
+				goto out;
+			break;
 		default:
 			goto xdr_error;
 		}
@@ -851,7 +893,7 @@ nfsd4_decode_setattr(struct nfsd4_compoundargs *argp, struct nfsd4_setattr *seta
 	status = nfsd4_decode_stateid(argp, &setattr->sa_stateid);
 	if (status)
 		return status;
-	return nfsd4_decode_fattr(argp, setattr->sa_bmval,
+	return nfsd4_decode_fattr(argp, setattr->sa_bmval, nfsd_attrmask,
 				  &setattr->sa_iattr, &setattr->sa_acl);
 }
 
@@ -993,6 +1035,241 @@ nfsd4_decode_release_lockowner(struct nfsd4_compoundargs *argp, struct nfsd4_rel
 	READ_BUF(rlockowner->rl_owner.len);
 	READMEM(rlockowner->rl_owner.data, rlockowner->rl_owner.len);
 
+	if (argp->minorversion && !zero_clientid(&rlockowner->rl_clientid))
+		return nfserr_inval;
+	DECODE_TAIL;
+}
+
+static __be32
+nfsd4_decode_exchange_id(struct nfsd4_compoundargs *argp,
+			 struct nfsd4_exchange_id *exid)
+{
+	int dummy;
+	DECODE_HEAD;
+
+	READ_BUF(NFS4_VERIFIER_SIZE);
+	COPYMEM(exid->verifier.data, NFS4_VERIFIER_SIZE);
+
+	READ_BUF(4);
+	READ32(exid->clname.len);
+
+	READ_BUF(exid->clname.len);
+	SAVEMEM(exid->clname.data, exid->clname.len);
+
+	READ_BUF(4);
+	READ32(exid->flags);
+
+	/* Ignore state_protect4_a */
+	READ_BUF(4);
+	READ32(exid->spa_how);
+	switch (exid->spa_how) {
+	case SP4_NONE:
+		break;
+	case SP4_MACH_CRED:
+		/* spo_must_enforce */
+		READ_BUF(4);
+		READ32(dummy);
+		READ_BUF(dummy * 4);
+		p += dummy;
+
+		/* spo_must_allow */
+		READ_BUF(4);
+		READ32(dummy);
+		READ_BUF(dummy * 4);
+		p += dummy;
+		break;
+	case SP4_SSV:
+		/* ssp_ops */
+		READ_BUF(4);
+		READ32(dummy);
+		READ_BUF(dummy * 4);
+		p += dummy;
+
+		READ_BUF(4);
+		READ32(dummy);
+		READ_BUF(dummy * 4);
+		p += dummy;
+
+		/* ssp_hash_algs<> */
+		READ_BUF(4);
+		READ32(dummy);
+		READ_BUF(dummy);
+		p += XDR_QUADLEN(dummy);
+
+		/* ssp_encr_algs<> */
+		READ_BUF(4);
+		READ32(dummy);
+		READ_BUF(dummy);
+		p += XDR_QUADLEN(dummy);
+
+		/* ssp_window and ssp_num_gss_handles */
+		READ_BUF(8);
+		READ32(dummy);
+		READ32(dummy);
+		break;
+	default:
+		goto xdr_error;
+	}
+
+	/* Ignore Implementation ID */
+	READ_BUF(4);    /* nfs_impl_id4 array length */
+	READ32(dummy);
+
+	if (dummy > 1)
+		goto xdr_error;
+
+	if (dummy == 1) {
+		/* nii_domain */
+		READ_BUF(4);
+		READ32(dummy);
+		READ_BUF(dummy);
+		p += XDR_QUADLEN(dummy);
+
+		/* nii_name */
+		READ_BUF(4);
+		READ32(dummy);
+		READ_BUF(dummy);
+		p += XDR_QUADLEN(dummy);
+
+		/* nii_date */
+		READ_BUF(12);
+		p += 3;
+	}
+	DECODE_TAIL;
+}
+
+static __be32
+nfsd4_decode_create_session(struct nfsd4_compoundargs *argp,
+			    struct nfsd4_create_session *sess)
+{
+	DECODE_HEAD;
+
+	u32 dummy;
+	char *machine_name;
+	int i;
+	int nr_secflavs;
+
+	READ_BUF(16);
+	COPYMEM(&sess->clientid, 8);
+	READ32(sess->seqid);
+	READ32(sess->flags);
+
+	/* Fore channel attrs */
+	READ_BUF(28);
+	READ32(dummy); /* headerpadsz is always 0 */
+	READ32(sess->fore_channel.maxreq_sz);
+	READ32(sess->fore_channel.maxresp_sz);
+	READ32(sess->fore_channel.maxresp_cached);
+	READ32(sess->fore_channel.maxops);
+	READ32(sess->fore_channel.maxreqs);
+	READ32(sess->fore_channel.nr_rdma_attrs);
+	if (sess->fore_channel.nr_rdma_attrs == 1) {
+		READ_BUF(4);
+		READ32(sess->fore_channel.rdma_attrs);
+	} else if (sess->fore_channel.nr_rdma_attrs > 1) {
+		dprintk("Too many fore channel attr bitmaps!\n");
+		goto xdr_error;
+	}
+
+	/* Back channel attrs */
+	READ_BUF(28);
+	READ32(dummy); /* headerpadsz is always 0 */
+	READ32(sess->back_channel.maxreq_sz);
+	READ32(sess->back_channel.maxresp_sz);
+	READ32(sess->back_channel.maxresp_cached);
+	READ32(sess->back_channel.maxops);
+	READ32(sess->back_channel.maxreqs);
+	READ32(sess->back_channel.nr_rdma_attrs);
+	if (sess->back_channel.nr_rdma_attrs == 1) {
+		READ_BUF(4);
+		READ32(sess->back_channel.rdma_attrs);
+	} else if (sess->back_channel.nr_rdma_attrs > 1) {
+		dprintk("Too many back channel attr bitmaps!\n");
+		goto xdr_error;
+	}
+
+	READ_BUF(8);
+	READ32(sess->callback_prog);
+
+	/* callback_sec_params4 */
+	READ32(nr_secflavs);
+	for (i = 0; i < nr_secflavs; ++i) {
+		READ_BUF(4);
+		READ32(dummy);
+		switch (dummy) {
+		case RPC_AUTH_NULL:
+			/* Nothing to read */
+			break;
+		case RPC_AUTH_UNIX:
+			READ_BUF(8);
+			/* stamp */
+			READ32(dummy);
+
+			/* machine name */
+			READ32(dummy);
+			READ_BUF(dummy);
+			SAVEMEM(machine_name, dummy);
+
+			/* uid, gid */
+			READ_BUF(8);
+			READ32(sess->uid);
+			READ32(sess->gid);
+
+			/* more gids */
+			READ_BUF(4);
+			READ32(dummy);
+			READ_BUF(dummy * 4);
+			for (i = 0; i < dummy; ++i)
+				READ32(dummy);
+			break;
+		case RPC_AUTH_GSS:
+			dprintk("RPC_AUTH_GSS callback secflavor "
+				"not supported!\n");
+			READ_BUF(8);
+			/* gcbp_service */
+			READ32(dummy);
+			/* gcbp_handle_from_server */
+			READ32(dummy);
+			READ_BUF(dummy);
+			p += XDR_QUADLEN(dummy);
+			/* gcbp_handle_from_client */
+			READ_BUF(4);
+			READ32(dummy);
+			READ_BUF(dummy);
+			p += XDR_QUADLEN(dummy);
+			break;
+		default:
+			dprintk("Illegal callback secflavor\n");
+			return nfserr_inval;
+		}
+	}
+	DECODE_TAIL;
+}
+
+static __be32
+nfsd4_decode_destroy_session(struct nfsd4_compoundargs *argp,
+			     struct nfsd4_destroy_session *destroy_session)
+{
+	DECODE_HEAD;
+	READ_BUF(NFS4_MAX_SESSIONID_LEN);
+	COPYMEM(destroy_session->sessionid.data, NFS4_MAX_SESSIONID_LEN);
+
+	DECODE_TAIL;
+}
+
+static __be32
+nfsd4_decode_sequence(struct nfsd4_compoundargs *argp,
+		      struct nfsd4_sequence *seq)
+{
+	DECODE_HEAD;
+
+	READ_BUF(NFS4_MAX_SESSIONID_LEN + 16);
+	COPYMEM(seq->sessionid.data, NFS4_MAX_SESSIONID_LEN);
+	READ32(seq->seqid);
+	READ32(seq->slotid);
+	READ32(seq->maxslots);
+	READ32(seq->cachethis);
+
 	DECODE_TAIL;
 }
 
@@ -1005,7 +1282,7 @@ nfsd4_decode_noop(struct nfsd4_compoundargs *argp, void *p)
 static __be32
 nfsd4_decode_notsupp(struct nfsd4_compoundargs *argp, void *p)
 {
-	return nfserr_opnotsupp;
+	return nfserr_notsupp;
 }
 
 typedef __be32(*nfsd4_dec)(struct nfsd4_compoundargs *argp, void *);
@@ -1031,7 +1308,7 @@ static nfsd4_dec nfsd4_dec_ops[] = {
 	[OP_OPEN_CONFIRM]	= (nfsd4_dec)nfsd4_decode_open_confirm,
 	[OP_OPEN_DOWNGRADE]	= (nfsd4_dec)nfsd4_decode_open_downgrade,
 	[OP_PUTFH]		= (nfsd4_dec)nfsd4_decode_putfh,
-	[OP_PUTPUBFH]		= (nfsd4_dec)nfsd4_decode_notsupp,
+	[OP_PUTPUBFH]		= (nfsd4_dec)nfsd4_decode_noop,
 	[OP_PUTROOTFH]		= (nfsd4_dec)nfsd4_decode_noop,
 	[OP_READ]		= (nfsd4_dec)nfsd4_decode_read,
 	[OP_READDIR]		= (nfsd4_dec)nfsd4_decode_readdir,
@@ -1050,6 +1327,67 @@ static nfsd4_dec nfsd4_dec_ops[] = {
 	[OP_RELEASE_LOCKOWNER]	= (nfsd4_dec)nfsd4_decode_release_lockowner,
 };
 
+static nfsd4_dec nfsd41_dec_ops[] = {
+	[OP_ACCESS]		(nfsd4_dec)nfsd4_decode_access,
+	[OP_CLOSE]		(nfsd4_dec)nfsd4_decode_close,
+	[OP_COMMIT]		(nfsd4_dec)nfsd4_decode_commit,
+	[OP_CREATE]		(nfsd4_dec)nfsd4_decode_create,
+	[OP_DELEGPURGE]		(nfsd4_dec)nfsd4_decode_notsupp,
+	[OP_DELEGRETURN]	(nfsd4_dec)nfsd4_decode_delegreturn,
+	[OP_GETATTR]		(nfsd4_dec)nfsd4_decode_getattr,
+	[OP_GETFH]		(nfsd4_dec)nfsd4_decode_noop,
+	[OP_LINK]		(nfsd4_dec)nfsd4_decode_link,
+	[OP_LOCK]		(nfsd4_dec)nfsd4_decode_lock,
+	[OP_LOCKT]		(nfsd4_dec)nfsd4_decode_lockt,
+	[OP_LOCKU]		(nfsd4_dec)nfsd4_decode_locku,
+	[OP_LOOKUP]		(nfsd4_dec)nfsd4_decode_lookup,
+	[OP_LOOKUPP]		(nfsd4_dec)nfsd4_decode_noop,
+	[OP_NVERIFY]		(nfsd4_dec)nfsd4_decode_verify,
+	[OP_OPEN]		(nfsd4_dec)nfsd4_decode_open,
+	[OP_OPENATTR]		(nfsd4_dec)nfsd4_decode_notsupp,
+	[OP_OPEN_CONFIRM]	(nfsd4_dec)nfsd4_decode_notsupp,
+	[OP_OPEN_DOWNGRADE]	(nfsd4_dec)nfsd4_decode_open_downgrade,
+	[OP_PUTFH]		(nfsd4_dec)nfsd4_decode_putfh,
+	[OP_PUTPUBFH]		(nfsd4_dec)nfsd4_decode_notsupp,
+	[OP_PUTROOTFH]		(nfsd4_dec)nfsd4_decode_noop,
+	[OP_READ]		(nfsd4_dec)nfsd4_decode_read,
+	[OP_READDIR]		(nfsd4_dec)nfsd4_decode_readdir,
+	[OP_READLINK]		(nfsd4_dec)nfsd4_decode_noop,
+	[OP_REMOVE]		(nfsd4_dec)nfsd4_decode_remove,
+	[OP_RENAME]		(nfsd4_dec)nfsd4_decode_rename,
+	[OP_RENEW]		(nfsd4_dec)nfsd4_decode_notsupp,
+	[OP_RESTOREFH]		(nfsd4_dec)nfsd4_decode_noop,
+	[OP_SAVEFH]		(nfsd4_dec)nfsd4_decode_noop,
+	[OP_SECINFO]		(nfsd4_dec)nfsd4_decode_secinfo,
+	[OP_SETATTR]		(nfsd4_dec)nfsd4_decode_setattr,
+	[OP_SETCLIENTID]	(nfsd4_dec)nfsd4_decode_notsupp,
+	[OP_SETCLIENTID_CONFIRM](nfsd4_dec)nfsd4_decode_notsupp,
+	[OP_VERIFY]		(nfsd4_dec)nfsd4_decode_verify,
+	[OP_WRITE]		(nfsd4_dec)nfsd4_decode_write,
+	[OP_RELEASE_LOCKOWNER]	(nfsd4_dec)nfsd4_decode_notsupp,
+
+	/* new operations for NFSv4.1 */
+	[OP_BACKCHANNEL_CTL]	(nfsd4_dec)nfsd4_decode_notsupp,
+	[OP_BIND_CONN_TO_SESSION](nfsd4_dec)nfsd4_decode_notsupp,
+	[OP_EXCHANGE_ID]	(nfsd4_dec)nfsd4_decode_exchange_id,
+	[OP_CREATE_SESSION]	(nfsd4_dec)nfsd4_decode_create_session,
+	[OP_DESTROY_SESSION]	(nfsd4_dec)nfsd4_decode_destroy_session,
+	[OP_FREE_STATEID]	(nfsd4_dec)nfsd4_decode_notsupp,
+	[OP_GET_DIR_DELEGATION]	(nfsd4_dec)nfsd4_decode_notsupp,
+	[OP_GETDEVICEINFO]	(nfsd4_dec)nfsd4_decode_notsupp,
+	[OP_GETDEVICELIST]	(nfsd4_dec)nfsd4_decode_notsupp,
+	[OP_LAYOUTCOMMIT]	(nfsd4_dec)nfsd4_decode_notsupp,
+	[OP_LAYOUTGET]		(nfsd4_dec)nfsd4_decode_notsupp,
+	[OP_LAYOUTRETURN]	(nfsd4_dec)nfsd4_decode_notsupp,
+	[OP_SECINFO_NO_NAME]	(nfsd4_dec)nfsd4_decode_notsupp,
+	[OP_SEQUENCE]		(nfsd4_dec)nfsd4_decode_sequence,
+	[OP_SET_SSV]		(nfsd4_dec)nfsd4_decode_notsupp,
+	[OP_TEST_STATEID]	(nfsd4_dec)nfsd4_decode_notsupp,
+	[OP_WANT_DELEGATION]	(nfsd4_dec)nfsd4_decode_notsupp,
+	[OP_DESTROY_CLIENTID]	(nfsd4_dec)nfsd4_decode_notsupp,
+	[OP_RECLAIM_COMPLETE]	(nfsd4_dec)nfsd4_decode_notsupp,
+};
+
 struct nfsd4_minorversion_ops {
 	nfsd4_dec *decoders;
 	int nops;
@@ -1057,6 +1395,7 @@ struct nfsd4_minorversion_ops {
 
 static struct nfsd4_minorversion_ops nfsd4_minorversion[] = {
 	[0] = { nfsd4_dec_ops, ARRAY_SIZE(nfsd4_dec_ops) },
+	[1] = { nfsd41_dec_ops, ARRAY_SIZE(nfsd41_dec_ops) },
 };
 
 static __be32
@@ -1412,6 +1751,7 @@ nfsd4_encode_fattr(struct svc_fh *fhp, struct svc_export *exp,
 {
 	u32 bmval0 = bmval[0];
 	u32 bmval1 = bmval[1];
+	u32 bmval2 = bmval[2];
 	struct kstat stat;
 	struct svc_fh tempfh;
 	struct kstatfs statfs;
@@ -1425,12 +1765,16 @@ nfsd4_encode_fattr(struct svc_fh *fhp, struct svc_export *exp,
 	int err;
 	int aclsupport = 0;
 	struct nfs4_acl *acl = NULL;
+	struct nfsd4_compoundres *resp = rqstp->rq_resp;
+	u32 minorversion = resp->cstate.minorversion;
 
 	BUG_ON(bmval1 & NFSD_WRITEONLY_ATTRS_WORD1);
-	BUG_ON(bmval0 & ~NFSD_SUPPORTED_ATTRS_WORD0);
-	BUG_ON(bmval1 & ~NFSD_SUPPORTED_ATTRS_WORD1);
+	BUG_ON(bmval0 & ~nfsd_suppattrs0(minorversion));
+	BUG_ON(bmval1 & ~nfsd_suppattrs1(minorversion));
+	BUG_ON(bmval2 & ~nfsd_suppattrs2(minorversion));
 
 	if (exp->ex_fslocs.migrated) {
+		BUG_ON(bmval[2]);
 		status = fattr_handle_absent_fs(&bmval0, &bmval1, &rdattr_err);
 		if (status)
 			goto out;
@@ -1476,22 +1820,42 @@ nfsd4_encode_fattr(struct svc_fh *fhp, struct svc_export *exp,
 	if ((buflen -= 16) < 0)
 		goto out_resource;
 
-	WRITE32(2);
-	WRITE32(bmval0);
-	WRITE32(bmval1);
+	if (unlikely(bmval2)) {
+		WRITE32(3);
+		WRITE32(bmval0);
+		WRITE32(bmval1);
+		WRITE32(bmval2);
+	} else if (likely(bmval1)) {
+		WRITE32(2);
+		WRITE32(bmval0);
+		WRITE32(bmval1);
+	} else {
+		WRITE32(1);
+		WRITE32(bmval0);
+	}
 	attrlenp = p++;                /* to be backfilled later */
 
 	if (bmval0 & FATTR4_WORD0_SUPPORTED_ATTRS) {
-		u32 word0 = NFSD_SUPPORTED_ATTRS_WORD0;
+		u32 word0 = nfsd_suppattrs0(minorversion);
+		u32 word1 = nfsd_suppattrs1(minorversion);
+		u32 word2 = nfsd_suppattrs2(minorversion);
+
 		if ((buflen -= 12) < 0)
 			goto out_resource;
 		if (!aclsupport)
 			word0 &= ~FATTR4_WORD0_ACL;
 		if (!exp->ex_fslocs.locations)
 			word0 &= ~FATTR4_WORD0_FS_LOCATIONS;
-		WRITE32(2);
-		WRITE32(word0);
-		WRITE32(NFSD_SUPPORTED_ATTRS_WORD1);
+		if (!word2) {
+			WRITE32(2);
+			WRITE32(word0);
+			WRITE32(word1);
+		} else {
+			WRITE32(3);
+			WRITE32(word0);
+			WRITE32(word1);
+			WRITE32(word2);
+		}
 	}
 	if (bmval0 & FATTR4_WORD0_TYPE) {
 		if ((buflen -= 4) < 0)
@@ -1801,6 +2165,13 @@ out_acl:
 		}
 		WRITE64(stat.ino);
 	}
+	if (bmval2 & FATTR4_WORD2_SUPPATTR_EXCLCREAT) {
+		WRITE32(3);
+		WRITE32(NFSD_SUPPATTR_EXCLCREAT_WORD0);
+		WRITE32(NFSD_SUPPATTR_EXCLCREAT_WORD1);
+		WRITE32(NFSD_SUPPATTR_EXCLCREAT_WORD2);
+	}
+
 	*attrlenp = htonl((char *)p - (char *)attrlenp - 4);
 	*countp = p - buffer;
 	status = nfs_ok;
@@ -2572,6 +2943,143 @@ nfsd4_encode_write(struct nfsd4_compoundres *resp, __be32 nfserr, struct nfsd4_w
 }
 
 static __be32
+nfsd4_encode_exchange_id(struct nfsd4_compoundres *resp, int nfserr,
+			 struct nfsd4_exchange_id *exid)
+{
+	ENCODE_HEAD;
+	char *major_id;
+	char *server_scope;
+	int major_id_sz;
+	int server_scope_sz;
+	uint64_t minor_id = 0;
+
+	if (nfserr)
+		return nfserr;
+
+	major_id = utsname()->nodename;
+	major_id_sz = strlen(major_id);
+	server_scope = utsname()->nodename;
+	server_scope_sz = strlen(server_scope);
+
+	RESERVE_SPACE(
+		8 /* eir_clientid */ +
+		4 /* eir_sequenceid */ +
+		4 /* eir_flags */ +
+		4 /* spr_how (SP4_NONE) */ +
+		8 /* so_minor_id */ +
+		4 /* so_major_id.len */ +
+		(XDR_QUADLEN(major_id_sz) * 4) +
+		4 /* eir_server_scope.len */ +
+		(XDR_QUADLEN(server_scope_sz) * 4) +
+		4 /* eir_server_impl_id.count (0) */);
+
+	WRITEMEM(&exid->clientid, 8);
+	WRITE32(exid->seqid);
+	WRITE32(exid->flags);
+
+	/* state_protect4_r. Currently only support SP4_NONE */
+	BUG_ON(exid->spa_how != SP4_NONE);
+	WRITE32(exid->spa_how);
+
+	/* The server_owner struct */
+	WRITE64(minor_id);      /* Minor id */
+	/* major id */
+	WRITE32(major_id_sz);
+	WRITEMEM(major_id, major_id_sz);
+
+	/* Server scope */
+	WRITE32(server_scope_sz);
+	WRITEMEM(server_scope, server_scope_sz);
+
+	/* Implementation id */
+	WRITE32(0);	/* zero length nfs_impl_id4 array */
+	ADJUST_ARGS();
+	return 0;
+}
+
+static __be32
+nfsd4_encode_create_session(struct nfsd4_compoundres *resp, int nfserr,
+			    struct nfsd4_create_session *sess)
+{
+	ENCODE_HEAD;
+
+	if (nfserr)
+		return nfserr;
+
+	RESERVE_SPACE(24);
+	WRITEMEM(sess->sessionid.data, NFS4_MAX_SESSIONID_LEN);
+	WRITE32(sess->seqid);
+	WRITE32(sess->flags);
+	ADJUST_ARGS();
+
+	RESERVE_SPACE(28);
+	WRITE32(0); /* headerpadsz */
+	WRITE32(sess->fore_channel.maxreq_sz);
+	WRITE32(sess->fore_channel.maxresp_sz);
+	WRITE32(sess->fore_channel.maxresp_cached);
+	WRITE32(sess->fore_channel.maxops);
+	WRITE32(sess->fore_channel.maxreqs);
+	WRITE32(sess->fore_channel.nr_rdma_attrs);
+	ADJUST_ARGS();
+
+	if (sess->fore_channel.nr_rdma_attrs) {
+		RESERVE_SPACE(4);
+		WRITE32(sess->fore_channel.rdma_attrs);
+		ADJUST_ARGS();
+	}
+
+	RESERVE_SPACE(28);
+	WRITE32(0); /* headerpadsz */
+	WRITE32(sess->back_channel.maxreq_sz);
+	WRITE32(sess->back_channel.maxresp_sz);
+	WRITE32(sess->back_channel.maxresp_cached);
+	WRITE32(sess->back_channel.maxops);
+	WRITE32(sess->back_channel.maxreqs);
+	WRITE32(sess->back_channel.nr_rdma_attrs);
+	ADJUST_ARGS();
+
+	if (sess->back_channel.nr_rdma_attrs) {
+		RESERVE_SPACE(4);
+		WRITE32(sess->back_channel.rdma_attrs);
+		ADJUST_ARGS();
+	}
+	return 0;
+}
+
+static __be32
+nfsd4_encode_destroy_session(struct nfsd4_compoundres *resp, int nfserr,
+			     struct nfsd4_destroy_session *destroy_session)
+{
+	return nfserr;
+}
+
+__be32
+nfsd4_encode_sequence(struct nfsd4_compoundres *resp, int nfserr,
+		      struct nfsd4_sequence *seq)
+{
+	ENCODE_HEAD;
+
+	if (nfserr)
+		return nfserr;
+
+	RESERVE_SPACE(NFS4_MAX_SESSIONID_LEN + 20);
+	WRITEMEM(seq->sessionid.data, NFS4_MAX_SESSIONID_LEN);
+	WRITE32(seq->seqid);
+	WRITE32(seq->slotid);
+	WRITE32(seq->maxslots);
+	/*
+	 * FIXME: for now:
+	 *   target_maxslots = maxslots
+	 *   status_flags = 0
+	 */
+	WRITE32(seq->maxslots);
+	WRITE32(0);
+
+	ADJUST_ARGS();
+	return 0;
+}
+
+static __be32
 nfsd4_encode_noop(struct nfsd4_compoundres *resp, __be32 nfserr, void *p)
 {
 	return nfserr;
@@ -2579,6 +3087,11 @@ nfsd4_encode_noop(struct nfsd4_compoundres *resp, __be32 nfserr, void *p)
 
 typedef __be32(* nfsd4_enc)(struct nfsd4_compoundres *, __be32, void *);
 
+/*
+ * Note: nfsd4_enc_ops vector is shared for v4.0 and v4.1
+ * since we don't need to filter out obsolete ops as this is
+ * done in the decoding phase.
+ */
 static nfsd4_enc nfsd4_enc_ops[] = {
 	[OP_ACCESS]		= (nfsd4_enc)nfsd4_encode_access,
 	[OP_CLOSE]		= (nfsd4_enc)nfsd4_encode_close,
@@ -2617,8 +3130,77 @@ static nfsd4_enc nfsd4_enc_ops[] = {
 	[OP_VERIFY]		= (nfsd4_enc)nfsd4_encode_noop,
 	[OP_WRITE]		= (nfsd4_enc)nfsd4_encode_write,
 	[OP_RELEASE_LOCKOWNER]	= (nfsd4_enc)nfsd4_encode_noop,
+
+	/* NFSv4.1 operations */
+	[OP_BACKCHANNEL_CTL]	= (nfsd4_enc)nfsd4_encode_noop,
+	[OP_BIND_CONN_TO_SESSION] = (nfsd4_enc)nfsd4_encode_noop,
+	[OP_EXCHANGE_ID]	= (nfsd4_enc)nfsd4_encode_exchange_id,
+	[OP_CREATE_SESSION]	= (nfsd4_enc)nfsd4_encode_create_session,
+	[OP_DESTROY_SESSION]	= (nfsd4_enc)nfsd4_encode_destroy_session,
+	[OP_FREE_STATEID]	= (nfsd4_enc)nfsd4_encode_noop,
+	[OP_GET_DIR_DELEGATION]	= (nfsd4_enc)nfsd4_encode_noop,
+	[OP_GETDEVICEINFO]	= (nfsd4_enc)nfsd4_encode_noop,
+	[OP_GETDEVICELIST]	= (nfsd4_enc)nfsd4_encode_noop,
+	[OP_LAYOUTCOMMIT]	= (nfsd4_enc)nfsd4_encode_noop,
+	[OP_LAYOUTGET]		= (nfsd4_enc)nfsd4_encode_noop,
+	[OP_LAYOUTRETURN]	= (nfsd4_enc)nfsd4_encode_noop,
+	[OP_SECINFO_NO_NAME]	= (nfsd4_enc)nfsd4_encode_noop,
+	[OP_SEQUENCE]		= (nfsd4_enc)nfsd4_encode_sequence,
+	[OP_SET_SSV]		= (nfsd4_enc)nfsd4_encode_noop,
+	[OP_TEST_STATEID]	= (nfsd4_enc)nfsd4_encode_noop,
+	[OP_WANT_DELEGATION]	= (nfsd4_enc)nfsd4_encode_noop,
+	[OP_DESTROY_CLIENTID]	= (nfsd4_enc)nfsd4_encode_noop,
+	[OP_RECLAIM_COMPLETE]	= (nfsd4_enc)nfsd4_encode_noop,
 };
 
+/*
+ * Calculate the total amount of memory that the compound response has taken
+ * after encoding the current operation.
+ *
+ * pad: add on 8 bytes for the next operation's op_code and status so that
+ * there is room to cache a failure on the next operation.
+ *
+ * Compare this length to the session se_fmaxresp_cached.
+ *
+ * Our se_fmaxresp_cached will always be a multiple of PAGE_SIZE, and so
+ * will be at least a page and will therefore hold the xdr_buf head.
+ */
+static int nfsd4_check_drc_limit(struct nfsd4_compoundres *resp)
+{
+	int status = 0;
+	struct xdr_buf *xb = &resp->rqstp->rq_res;
+	struct nfsd4_compoundargs *args = resp->rqstp->rq_argp;
+	struct nfsd4_session *session = NULL;
+	struct nfsd4_slot *slot = resp->cstate.slot;
+	u32 length, tlen = 0, pad = 8;
+
+	if (!nfsd4_has_session(&resp->cstate))
+		return status;
+
+	session = resp->cstate.session;
+	if (session == NULL || slot->sl_cache_entry.ce_cachethis == 0)
+		return status;
+
+	if (resp->opcnt >= args->opcnt)
+		pad = 0; /* this is the last operation */
+
+	if (xb->page_len == 0) {
+		length = (char *)resp->p - (char *)xb->head[0].iov_base + pad;
+	} else {
+		if (xb->tail[0].iov_base && xb->tail[0].iov_len > 0)
+			tlen = (char *)resp->p - (char *)xb->tail[0].iov_base;
+
+		length = xb->head[0].iov_len + xb->page_len + tlen + pad;
+	}
+	dprintk("%s length %u, xb->page_len %u tlen %u pad %u\n", __func__,
+		length, xb->page_len, tlen, pad);
+
+	if (length <= session->se_fmaxresp_cached)
+		return status;
+	else
+		return nfserr_rep_too_big_to_cache;
+}
+
 void
 nfsd4_encode_operation(struct nfsd4_compoundres *resp, struct nfsd4_op *op)
 {
@@ -2635,6 +3217,9 @@ nfsd4_encode_operation(struct nfsd4_compoundres *resp, struct nfsd4_op *op)
 	BUG_ON(op->opnum < 0 || op->opnum >= ARRAY_SIZE(nfsd4_enc_ops) ||
 	       !nfsd4_enc_ops[op->opnum]);
 	op->status = nfsd4_enc_ops[op->opnum](resp, op->status, &op->u);
+	/* nfsd4_check_drc_limit guarantees enough room for error status */
+	if (!op->status && nfsd4_check_drc_limit(resp))
+		op->status = nfserr_rep_too_big_to_cache;
 status:
 	/*
 	 * Note: We write the status directly, instead of using WRITE32(),
@@ -2735,6 +3320,18 @@ nfs4svc_encode_compoundres(struct svc_rqst *rqstp, __be32 *p, struct nfsd4_compo
 		iov = &rqstp->rq_res.head[0];
 	iov->iov_len = ((char*)resp->p) - (char*)iov->iov_base;
 	BUG_ON(iov->iov_len > PAGE_SIZE);
+	if (nfsd4_has_session(&resp->cstate)) {
+		if (resp->cstate.status == nfserr_replay_cache &&
+				!nfsd4_not_cached(resp)) {
+			iov->iov_len = resp->cstate.iovlen;
+		} else {
+			nfsd4_store_cache_entry(resp);
+			dprintk("%s: SET SLOT STATE TO AVAILABLE\n", __func__);
+			resp->cstate.slot->sl_inuse = 0;
+		}
+		if (resp->cstate.session)
+			nfsd4_put_session(resp->cstate.session);
+	}
 	return 1;
 }
 
diff --git a/fs/nfsd/nfsctl.c b/fs/nfsd/nfsctl.c
index a4ed8644d69c..af16849d243a 100644
--- a/fs/nfsd/nfsctl.c
+++ b/fs/nfsd/nfsctl.c
@@ -60,6 +60,7 @@ enum {
 	NFSD_FO_UnlockFS,
 	NFSD_Threads,
 	NFSD_Pool_Threads,
+	NFSD_Pool_Stats,
 	NFSD_Versions,
 	NFSD_Ports,
 	NFSD_MaxBlkSize,
@@ -172,6 +173,16 @@ static const struct file_operations exports_operations = {
 	.owner		= THIS_MODULE,
 };
 
+extern int nfsd_pool_stats_open(struct inode *inode, struct file *file);
+
+static struct file_operations pool_stats_operations = {
+	.open		= nfsd_pool_stats_open,
+	.read		= seq_read,
+	.llseek		= seq_lseek,
+	.release	= seq_release,
+	.owner		= THIS_MODULE,
+};
+
 /*----------------------------------------------------------------------------*/
 /*
  * payload - write methods
@@ -781,8 +792,9 @@ out_free:
 static ssize_t __write_versions(struct file *file, char *buf, size_t size)
 {
 	char *mesg = buf;
-	char *vers, sign;
+	char *vers, *minorp, sign;
 	int len, num;
+	unsigned minor;
 	ssize_t tlen = 0;
 	char *sep;
 
@@ -803,9 +815,20 @@ static ssize_t __write_versions(struct file *file, char *buf, size_t size)
 		do {
 			sign = *vers;
 			if (sign == '+' || sign == '-')
-				num = simple_strtol((vers+1), NULL, 0);
+				num = simple_strtol((vers+1), &minorp, 0);
 			else
-				num = simple_strtol(vers, NULL, 0);
+				num = simple_strtol(vers, &minorp, 0);
+			if (*minorp == '.') {
+				if (num < 4)
+					return -EINVAL;
+				minor = simple_strtoul(minorp+1, NULL, 0);
+				if (minor == 0)
+					return -EINVAL;
+				if (nfsd_minorversion(minor, sign == '-' ?
+						     NFSD_CLEAR : NFSD_SET) < 0)
+					return -EINVAL;
+				goto next;
+			}
 			switch(num) {
 			case 2:
 			case 3:
@@ -815,6 +838,7 @@ static ssize_t __write_versions(struct file *file, char *buf, size_t size)
 			default:
 				return -EINVAL;
 			}
+		next:
 			vers += len + 1;
 			tlen += len;
 		} while ((len = qword_get(&mesg, vers, size)) > 0);
@@ -833,6 +857,13 @@ static ssize_t __write_versions(struct file *file, char *buf, size_t size)
 				       num);
 			sep = " ";
 		}
+	if (nfsd_vers(4, NFSD_AVAIL))
+		for (minor = 1; minor <= NFSD_SUPPORTED_MINOR_VERSION; minor++)
+			len += sprintf(buf+len, " %c4.%u",
+					(nfsd_vers(4, NFSD_TEST) &&
+					 nfsd_minorversion(minor, NFSD_TEST)) ?
+						'+' : '-',
+					minor);
 	len += sprintf(buf+len, "\n");
 	return len;
 }
@@ -1248,6 +1279,7 @@ static int nfsd_fill_super(struct super_block * sb, void * data, int silent)
 		[NFSD_Fh] = {"filehandle", &transaction_ops, S_IWUSR|S_IRUSR},
 		[NFSD_Threads] = {"threads", &transaction_ops, S_IWUSR|S_IRUSR},
 		[NFSD_Pool_Threads] = {"pool_threads", &transaction_ops, S_IWUSR|S_IRUSR},
+		[NFSD_Pool_Stats] = {"pool_stats", &pool_stats_operations, S_IRUGO},
 		[NFSD_Versions] = {"versions", &transaction_ops, S_IWUSR|S_IRUSR},
 		[NFSD_Ports] = {"portlist", &transaction_ops, S_IWUSR|S_IRUGO},
 		[NFSD_MaxBlkSize] = {"max_block_size", &transaction_ops, S_IWUSR|S_IRUGO},
diff --git a/fs/nfsd/nfsproc.c b/fs/nfsd/nfsproc.c
index 6f7f26351227..e298e260b5f1 100644
--- a/fs/nfsd/nfsproc.c
+++ b/fs/nfsd/nfsproc.c
@@ -180,6 +180,7 @@ nfsd_proc_write(struct svc_rqst *rqstp, struct nfsd_writeargs *argp,
 {
 	__be32	nfserr;
 	int	stable = 1;
+	unsigned long cnt = argp->len;
 
 	dprintk("nfsd: WRITE    %s %d bytes at %d\n",
 		SVCFH_fmt(&argp->fh),
@@ -188,7 +189,7 @@ nfsd_proc_write(struct svc_rqst *rqstp, struct nfsd_writeargs *argp,
 	nfserr = nfsd_write(rqstp, fh_copy(&resp->fh, &argp->fh), NULL,
 				   argp->offset,
 				   rqstp->rq_vec, argp->vlen,
-				   argp->len,
+			           &cnt,
 				   &stable);
 	return nfsd_return_attrs(nfserr, resp);
 }
diff --git a/fs/nfsd/nfssvc.c b/fs/nfsd/nfssvc.c
index 7c09852be713..cbba4a935786 100644
--- a/fs/nfsd/nfssvc.c
+++ b/fs/nfsd/nfssvc.c
@@ -22,6 +22,7 @@
 #include <linux/freezer.h>
 #include <linux/fs_struct.h>
 #include <linux/kthread.h>
+#include <linux/swap.h>
 
 #include <linux/sunrpc/types.h>
 #include <linux/sunrpc/stats.h>
@@ -40,9 +41,6 @@
 extern struct svc_program	nfsd_program;
 static int			nfsd(void *vrqstp);
 struct timeval			nfssvc_boot;
-static atomic_t			nfsd_busy;
-static unsigned long		nfsd_last_call;
-static DEFINE_SPINLOCK(nfsd_call_lock);
 
 /*
  * nfsd_mutex protects nfsd_serv -- both the pointer itself and the members
@@ -123,6 +121,8 @@ struct svc_program		nfsd_program = {
 
 };
 
+u32 nfsd_supported_minorversion;
+
 int nfsd_vers(int vers, enum vers_op change)
 {
 	if (vers < NFSD_MINVERS || vers >= NFSD_NRVERS)
@@ -149,6 +149,28 @@ int nfsd_vers(int vers, enum vers_op change)
 	}
 	return 0;
 }
+
+int nfsd_minorversion(u32 minorversion, enum vers_op change)
+{
+	if (minorversion > NFSD_SUPPORTED_MINOR_VERSION)
+		return -1;
+	switch(change) {
+	case NFSD_SET:
+		nfsd_supported_minorversion = minorversion;
+		break;
+	case NFSD_CLEAR:
+		if (minorversion == 0)
+			return -1;
+		nfsd_supported_minorversion = minorversion - 1;
+		break;
+	case NFSD_TEST:
+		return minorversion <= nfsd_supported_minorversion;
+	case NFSD_AVAIL:
+		return minorversion <= NFSD_SUPPORTED_MINOR_VERSION;
+	}
+	return 0;
+}
+
 /*
  * Maximum number of nfsd processes
  */
@@ -200,6 +222,28 @@ void nfsd_reset_versions(void)
 	}
 }
 
+/*
+ * Each session guarantees a negotiated per slot memory cache for replies
+ * which in turn consumes memory beyond the v2/v3/v4.0 server. A dedicated
+ * NFSv4.1 server might want to use more memory for a DRC than a machine
+ * with mutiple services.
+ *
+ * Impose a hard limit on the number of pages for the DRC which varies
+ * according to the machines free pages. This is of course only a default.
+ *
+ * For now this is a #defined shift which could be under admin control
+ * in the future.
+ */
+static void set_max_drc(void)
+{
+	/* The percent of nr_free_buffer_pages used by the V4.1 server DRC */
+	#define NFSD_DRC_SIZE_SHIFT	7
+	nfsd_serv->sv_drc_max_pages = nr_free_buffer_pages()
+						>> NFSD_DRC_SIZE_SHIFT;
+	nfsd_serv->sv_drc_pages_used = 0;
+	dprintk("%s svc_drc_max_pages %u\n", __func__,
+		nfsd_serv->sv_drc_max_pages);
+}
 
 int nfsd_create_serv(void)
 {
@@ -227,11 +271,12 @@ int nfsd_create_serv(void)
 			nfsd_max_blksize /= 2;
 	}
 
-	atomic_set(&nfsd_busy, 0);
 	nfsd_serv = svc_create_pooled(&nfsd_program, nfsd_max_blksize,
 				      nfsd_last_thread, nfsd, THIS_MODULE);
 	if (nfsd_serv == NULL)
 		err = -ENOMEM;
+	else
+		set_max_drc();
 
 	do_gettimeofday(&nfssvc_boot);		/* record boot time */
 	return err;
@@ -375,26 +420,6 @@ nfsd_svc(unsigned short port, int nrservs)
 	return error;
 }
 
-static inline void
-update_thread_usage(int busy_threads)
-{
-	unsigned long prev_call;
-	unsigned long diff;
-	int decile;
-
-	spin_lock(&nfsd_call_lock);
-	prev_call = nfsd_last_call;
-	nfsd_last_call = jiffies;
-	decile = busy_threads*10/nfsdstats.th_cnt;
-	if (decile>0 && decile <= 10) {
-		diff = nfsd_last_call - prev_call;
-		if ( (nfsdstats.th_usage[decile-1] += diff) >= NFSD_USAGE_WRAP)
-			nfsdstats.th_usage[decile-1] -= NFSD_USAGE_WRAP;
-		if (decile == 10)
-			nfsdstats.th_fullcnt++;
-	}
-	spin_unlock(&nfsd_call_lock);
-}
 
 /*
  * This is the NFS server kernel thread
@@ -460,8 +485,6 @@ nfsd(void *vrqstp)
 			continue;
 		}
 
-		update_thread_usage(atomic_read(&nfsd_busy));
-		atomic_inc(&nfsd_busy);
 
 		/* Lock the export hash tables for reading. */
 		exp_readlock();
@@ -470,8 +493,6 @@ nfsd(void *vrqstp)
 
 		/* Unlock export hash tables */
 		exp_readunlock();
-		update_thread_usage(atomic_read(&nfsd_busy));
-		atomic_dec(&nfsd_busy);
 	}
 
 	/* Clear signals before calling svc_exit_thread() */
@@ -539,6 +560,10 @@ nfsd_dispatch(struct svc_rqst *rqstp, __be32 *statp)
 		+ rqstp->rq_res.head[0].iov_len;
 	rqstp->rq_res.head[0].iov_len += sizeof(__be32);
 
+	/* NFSv4.1 DRC requires statp */
+	if (rqstp->rq_vers == 4)
+		nfsd4_set_statp(rqstp, statp);
+
 	/* Now call the procedure handler, and encode NFS status. */
 	nfserr = proc->pc_func(rqstp, rqstp->rq_argp, rqstp->rq_resp);
 	nfserr = map_new_errors(rqstp->rq_vers, nfserr);
@@ -570,3 +595,10 @@ nfsd_dispatch(struct svc_rqst *rqstp, __be32 *statp)
 	nfsd_cache_update(rqstp, proc->pc_cachetype, statp + 1);
 	return 1;
 }
+
+int nfsd_pool_stats_open(struct inode *inode, struct file *file)
+{
+	if (nfsd_serv == NULL)
+		return -ENODEV;
+	return svc_pool_stats_open(nfsd_serv, file);
+}
diff --git a/fs/nfsd/vfs.c b/fs/nfsd/vfs.c
index 78376b6c0236..ab93fcfef254 100644
--- a/fs/nfsd/vfs.c
+++ b/fs/nfsd/vfs.c
@@ -366,8 +366,9 @@ nfsd_setattr(struct svc_rqst *rqstp, struct svc_fh *fhp, struct iattr *iap,
 	}
 
 	/* Revoke setuid/setgid on chown */
-	if (((iap->ia_valid & ATTR_UID) && iap->ia_uid != inode->i_uid) ||
-	    ((iap->ia_valid & ATTR_GID) && iap->ia_gid != inode->i_gid)) {
+	if (!S_ISDIR(inode->i_mode) &&
+	    (((iap->ia_valid & ATTR_UID) && iap->ia_uid != inode->i_uid) ||
+	     ((iap->ia_valid & ATTR_GID) && iap->ia_gid != inode->i_gid))) {
 		iap->ia_valid |= ATTR_KILL_PRIV;
 		if (iap->ia_valid & ATTR_MODE) {
 			/* we're setting mode too, just clear the s*id bits */
@@ -960,7 +961,7 @@ static void kill_suid(struct dentry *dentry)
 static __be32
 nfsd_vfs_write(struct svc_rqst *rqstp, struct svc_fh *fhp, struct file *file,
 				loff_t offset, struct kvec *vec, int vlen,
-	   			unsigned long cnt, int *stablep)
+				unsigned long *cnt, int *stablep)
 {
 	struct svc_export	*exp;
 	struct dentry		*dentry;
@@ -974,7 +975,7 @@ nfsd_vfs_write(struct svc_rqst *rqstp, struct svc_fh *fhp, struct file *file,
 	err = nfserr_perm;
 
 	if ((fhp->fh_export->ex_flags & NFSEXP_MSNFS) &&
-		(!lock_may_write(file->f_path.dentry->d_inode, offset, cnt)))
+		(!lock_may_write(file->f_path.dentry->d_inode, offset, *cnt)))
 		goto out;
 #endif
 
@@ -1009,7 +1010,7 @@ nfsd_vfs_write(struct svc_rqst *rqstp, struct svc_fh *fhp, struct file *file,
 	host_err = vfs_writev(file, (struct iovec __user *)vec, vlen, &offset);
 	set_fs(oldfs);
 	if (host_err >= 0) {
-		nfsdstats.io_write += cnt;
+		nfsdstats.io_write += host_err;
 		fsnotify_modify(file->f_path.dentry);
 	}
 
@@ -1054,9 +1055,10 @@ nfsd_vfs_write(struct svc_rqst *rqstp, struct svc_fh *fhp, struct file *file,
 	}
 
 	dprintk("nfsd: write complete host_err=%d\n", host_err);
-	if (host_err >= 0)
+	if (host_err >= 0) {
 		err = 0;
-	else 
+		*cnt = host_err;
+	} else
 		err = nfserrno(host_err);
 out:
 	return err;
@@ -1098,7 +1100,7 @@ out:
  */
 __be32
 nfsd_write(struct svc_rqst *rqstp, struct svc_fh *fhp, struct file *file,
-		loff_t offset, struct kvec *vec, int vlen, unsigned long cnt,
+		loff_t offset, struct kvec *vec, int vlen, unsigned long *cnt,
 		int *stablep)
 {
 	__be32			err = 0;
@@ -1179,6 +1181,21 @@ nfsd_create_setattr(struct svc_rqst *rqstp, struct svc_fh *resfhp,
 	return 0;
 }
 
+/* HPUX client sometimes creates a file in mode 000, and sets size to 0.
+ * setting size to 0 may fail for some specific file systems by the permission
+ * checking which requires WRITE permission but the mode is 000.
+ * we ignore the resizing(to 0) on the just new created file, since the size is
+ * 0 after file created.
+ *
+ * call this only after vfs_create() is called.
+ * */
+static void
+nfsd_check_ignore_resizing(struct iattr *iap)
+{
+	if ((iap->ia_valid & ATTR_SIZE) && (iap->ia_size == 0))
+		iap->ia_valid &= ~ATTR_SIZE;
+}
+
 /*
  * Create a file (regular, directory, device, fifo); UNIX sockets 
  * not yet implemented.
@@ -1274,6 +1291,8 @@ nfsd_create(struct svc_rqst *rqstp, struct svc_fh *fhp,
 	switch (type) {
 	case S_IFREG:
 		host_err = vfs_create(dirp, dchild, iap->ia_mode, NULL);
+		if (!host_err)
+			nfsd_check_ignore_resizing(iap);
 		break;
 	case S_IFDIR:
 		host_err = vfs_mkdir(dirp, dchild, iap->ia_mode);
@@ -1427,6 +1446,8 @@ nfsd_create_v3(struct svc_rqst *rqstp, struct svc_fh *fhp,
 		/* setattr will sync the child (or not) */
 	}
 
+	nfsd_check_ignore_resizing(iap);
+
 	if (createmode == NFS3_CREATE_EXCLUSIVE) {
 		/* Cram the verifier into atime/mtime */
 		iap->ia_valid = ATTR_MTIME|ATTR_ATIME