summary refs log tree commit diff
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@linux-foundation.org>2016-10-13 21:04:42 -0700
committerLinus Torvalds <torvalds@linux-foundation.org>2016-10-13 21:04:42 -0700
commit2778556474b1996aa68ae61619386b8802733bd8 (patch)
treecc136dd6d4589073b5aedb28429eb72cdd067106
parent35a891be96f1f8e1227e6ad3ca827b8a08ce47ea (diff)
parent29ae7f9dc21a7dda41d78b27bbda7d427ece8ad4 (diff)
downloadlinux-2778556474b1996aa68ae61619386b8802733bd8.tar.gz
Merge tag 'nfsd-4.9' of git://linux-nfs.org/~bfields/linux
Pull nfsd updates from Bruce Fields:
 "Some RDMA work and some good bugfixes, and two new features that could
  benefit from user testing:

   - Anna Schumacker contributed a simple NFSv4.2 COPY implementation.
     COPY is already supported on the client side, so a call to
     copy_file_range() on a recent client should now result in a
     server-side copy that doesn't require all the data to make a round
     trip to the client and back.

   - Jeff Layton implemented callbacks to notify clients when contended
     locks become available, which should reduce latency on workloads
     with contended locks"

* tag 'nfsd-4.9' of git://linux-nfs.org/~bfields/linux:
  NFSD: Implement the COPY call
  nfsd: handle EUCLEAN
  nfsd: only WARN once on unmapped errors
  exportfs: be careful to only return expected errors.
  nfsd4: setclientid_confirm with unmatched verifier should fail
  nfsd: randomize SETCLIENTID reply to help distinguish servers
  nfsd: set the MAY_NOTIFY_LOCK flag in OPEN replies
  nfs: add a new NFS4_OPEN_RESULT_MAY_NOTIFY_LOCK constant
  nfsd: add a LRU list for blocked locks
  nfsd: have nfsd4_lock use blocking locks for v4.1+ locks
  nfsd: plumb in a CB_NOTIFY_LOCK operation
  NFSD: fix corruption in notifier registration
  svcrdma: support Remote Invalidation
  svcrdma: Server-side support for rpcrdma_connect_private
  rpcrdma: RDMA/CM private message data structure
  svcrdma: Skip put_page() when send_reply() fails
  svcrdma: Tail iovec leaves an orphaned DMA mapping
  nfsd: fix dprintk in nfsd4_encode_getdeviceinfo
  nfsd: eliminate cb_minorversion field
  nfsd: don't set a FL_LAYOUT lease for flexfiles layouts
-rw-r--r--fs/exportfs/expfs.c10
-rw-r--r--fs/nfsd/flexfilelayout.c1
-rw-r--r--fs/nfsd/netns.h1
-rw-r--r--fs/nfsd/nfs4callback.c64
-rw-r--r--fs/nfsd/nfs4layouts.c6
-rw-r--r--fs/nfsd/nfs4proc.c90
-rw-r--r--fs/nfsd/nfs4state.c237
-rw-r--r--fs/nfsd/nfs4xdr.c65
-rw-r--r--fs/nfsd/nfsctl.c2
-rw-r--r--fs/nfsd/nfsproc.c3
-rw-r--r--fs/nfsd/nfssvc.c18
-rw-r--r--fs/nfsd/pnfs.h1
-rw-r--r--fs/nfsd/state.h22
-rw-r--r--fs/nfsd/vfs.c16
-rw-r--r--fs/nfsd/vfs.h2
-rw-r--r--fs/nfsd/xdr4.h23
-rw-r--r--fs/nfsd/xdr4cb.h9
-rw-r--r--include/linux/exportfs.h13
-rw-r--r--include/linux/sunrpc/rpc_rdma.h35
-rw-r--r--include/linux/sunrpc/svc_rdma.h10
-rw-r--r--include/uapi/linux/nfs4.h5
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma_backchannel.c2
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma_recvfrom.c2
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma_sendto.c82
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma_transport.c60
25 files changed, 683 insertions, 96 deletions
diff --git a/fs/exportfs/expfs.c b/fs/exportfs/expfs.c
index 207ba8d627ca..a4b531be9168 100644
--- a/fs/exportfs/expfs.c
+++ b/fs/exportfs/expfs.c
@@ -428,10 +428,10 @@ struct dentry *exportfs_decode_fh(struct vfsmount *mnt, struct fid *fid,
 	if (!nop || !nop->fh_to_dentry)
 		return ERR_PTR(-ESTALE);
 	result = nop->fh_to_dentry(mnt->mnt_sb, fid, fh_len, fileid_type);
-	if (!result)
-		result = ERR_PTR(-ESTALE);
-	if (IS_ERR(result))
-		return result;
+	if (PTR_ERR(result) == -ENOMEM)
+		return ERR_CAST(result);
+	if (IS_ERR_OR_NULL(result))
+		return ERR_PTR(-ESTALE);
 
 	if (d_is_dir(result)) {
 		/*
@@ -541,6 +541,8 @@ struct dentry *exportfs_decode_fh(struct vfsmount *mnt, struct fid *fid,
 
  err_result:
 	dput(result);
+	if (err != -ENOMEM)
+		err = -ESTALE;
 	return ERR_PTR(err);
 }
 EXPORT_SYMBOL_GPL(exportfs_decode_fh);
diff --git a/fs/nfsd/flexfilelayout.c b/fs/nfsd/flexfilelayout.c
index df880e9fa71f..b67287383010 100644
--- a/fs/nfsd/flexfilelayout.c
+++ b/fs/nfsd/flexfilelayout.c
@@ -126,6 +126,7 @@ nfsd4_ff_proc_getdeviceinfo(struct super_block *sb, struct svc_rqst *rqstp,
 const struct nfsd4_layout_ops ff_layout_ops = {
 	.notify_types		=
 			NOTIFY_DEVICEID4_DELETE | NOTIFY_DEVICEID4_CHANGE,
+	.disable_recalls	= true,
 	.proc_getdeviceinfo	= nfsd4_ff_proc_getdeviceinfo,
 	.encode_getdeviceinfo	= nfsd4_ff_encode_getdeviceinfo,
 	.proc_layoutget		= nfsd4_ff_proc_layoutget,
diff --git a/fs/nfsd/netns.h b/fs/nfsd/netns.h
index 5fbf3bbd00d0..b10d557f9c9e 100644
--- a/fs/nfsd/netns.h
+++ b/fs/nfsd/netns.h
@@ -84,6 +84,7 @@ struct nfsd_net {
 	struct list_head client_lru;
 	struct list_head close_lru;
 	struct list_head del_recall_lru;
+	struct list_head blocked_locks_lru;
 
 	struct delayed_work laundromat_work;
 
diff --git a/fs/nfsd/nfs4callback.c b/fs/nfsd/nfs4callback.c
index 04c68d900324..211dc2aed8e1 100644
--- a/fs/nfsd/nfs4callback.c
+++ b/fs/nfsd/nfs4callback.c
@@ -448,7 +448,7 @@ static int decode_cb_sequence4res(struct xdr_stream *xdr,
 {
 	int status;
 
-	if (cb->cb_minorversion == 0)
+	if (cb->cb_clp->cl_minorversion == 0)
 		return 0;
 
 	status = decode_cb_op_status(xdr, OP_CB_SEQUENCE, &cb->cb_seq_status);
@@ -485,7 +485,7 @@ static void nfs4_xdr_enc_cb_recall(struct rpc_rqst *req, struct xdr_stream *xdr,
 	const struct nfs4_delegation *dp = cb_to_delegation(cb);
 	struct nfs4_cb_compound_hdr hdr = {
 		.ident = cb->cb_clp->cl_cb_ident,
-		.minorversion = cb->cb_minorversion,
+		.minorversion = cb->cb_clp->cl_minorversion,
 	};
 
 	encode_cb_compound4args(xdr, &hdr);
@@ -594,7 +594,7 @@ static void nfs4_xdr_enc_cb_layout(struct rpc_rqst *req,
 		container_of(cb, struct nfs4_layout_stateid, ls_recall);
 	struct nfs4_cb_compound_hdr hdr = {
 		.ident = 0,
-		.minorversion = cb->cb_minorversion,
+		.minorversion = cb->cb_clp->cl_minorversion,
 	};
 
 	encode_cb_compound4args(xdr, &hdr);
@@ -623,6 +623,62 @@ static int nfs4_xdr_dec_cb_layout(struct rpc_rqst *rqstp,
 }
 #endif /* CONFIG_NFSD_PNFS */
 
+static void encode_stateowner(struct xdr_stream *xdr, struct nfs4_stateowner *so)
+{
+	__be32	*p;
+
+	p = xdr_reserve_space(xdr, 8 + 4 + so->so_owner.len);
+	p = xdr_encode_opaque_fixed(p, &so->so_client->cl_clientid, 8);
+	xdr_encode_opaque(p, so->so_owner.data, so->so_owner.len);
+}
+
+static void nfs4_xdr_enc_cb_notify_lock(struct rpc_rqst *req,
+					struct xdr_stream *xdr,
+					const struct nfsd4_callback *cb)
+{
+	const struct nfsd4_blocked_lock *nbl =
+		container_of(cb, struct nfsd4_blocked_lock, nbl_cb);
+	struct nfs4_lockowner *lo = (struct nfs4_lockowner *)nbl->nbl_lock.fl_owner;
+	struct nfs4_cb_compound_hdr hdr = {
+		.ident = 0,
+		.minorversion = cb->cb_clp->cl_minorversion,
+	};
+
+	__be32 *p;
+
+	BUG_ON(hdr.minorversion == 0);
+
+	encode_cb_compound4args(xdr, &hdr);
+	encode_cb_sequence4args(xdr, cb, &hdr);
+
+	p = xdr_reserve_space(xdr, 4);
+	*p = cpu_to_be32(OP_CB_NOTIFY_LOCK);
+	encode_nfs_fh4(xdr, &nbl->nbl_fh);
+	encode_stateowner(xdr, &lo->lo_owner);
+	hdr.nops++;
+
+	encode_cb_nops(&hdr);
+}
+
+static int nfs4_xdr_dec_cb_notify_lock(struct rpc_rqst *rqstp,
+					struct xdr_stream *xdr,
+					struct nfsd4_callback *cb)
+{
+	struct nfs4_cb_compound_hdr hdr;
+	int status;
+
+	status = decode_cb_compound4res(xdr, &hdr);
+	if (unlikely(status))
+		return status;
+
+	if (cb) {
+		status = decode_cb_sequence4res(xdr, cb);
+		if (unlikely(status || cb->cb_seq_status))
+			return status;
+	}
+	return decode_cb_op_status(xdr, OP_CB_NOTIFY_LOCK, &cb->cb_status);
+}
+
 /*
  * RPC procedure tables
  */
@@ -643,6 +699,7 @@ static struct rpc_procinfo nfs4_cb_procedures[] = {
 #ifdef CONFIG_NFSD_PNFS
 	PROC(CB_LAYOUT,	COMPOUND,	cb_layout,	cb_layout),
 #endif
+	PROC(CB_NOTIFY_LOCK,	COMPOUND,	cb_notify_lock,	cb_notify_lock),
 };
 
 static struct rpc_version nfs_cb_version4 = {
@@ -862,7 +919,6 @@ static void nfsd4_cb_prepare(struct rpc_task *task, void *calldata)
 	struct nfs4_client *clp = cb->cb_clp;
 	u32 minorversion = clp->cl_minorversion;
 
-	cb->cb_minorversion = minorversion;
 	/*
 	 * cb_seq_status is only set in decode_cb_sequence4res,
 	 * and so will remain 1 if an rpc level failure occurs.
diff --git a/fs/nfsd/nfs4layouts.c b/fs/nfsd/nfs4layouts.c
index 2be9602b0221..42aace4fc4c8 100644
--- a/fs/nfsd/nfs4layouts.c
+++ b/fs/nfsd/nfs4layouts.c
@@ -174,7 +174,8 @@ nfsd4_free_layout_stateid(struct nfs4_stid *stid)
 	list_del_init(&ls->ls_perfile);
 	spin_unlock(&fp->fi_lock);
 
-	vfs_setlease(ls->ls_file, F_UNLCK, NULL, (void **)&ls);
+	if (!nfsd4_layout_ops[ls->ls_layout_type]->disable_recalls)
+		vfs_setlease(ls->ls_file, F_UNLCK, NULL, (void **)&ls);
 	fput(ls->ls_file);
 
 	if (ls->ls_recalled)
@@ -189,6 +190,9 @@ nfsd4_layout_setlease(struct nfs4_layout_stateid *ls)
 	struct file_lock *fl;
 	int status;
 
+	if (nfsd4_layout_ops[ls->ls_layout_type]->disable_recalls)
+		return 0;
+
 	fl = locks_alloc_lock();
 	if (!fl)
 		return -ENOMEM;
diff --git a/fs/nfsd/nfs4proc.c b/fs/nfsd/nfs4proc.c
index 1fb222752b2b..abb09b580389 100644
--- a/fs/nfsd/nfs4proc.c
+++ b/fs/nfsd/nfs4proc.c
@@ -1010,47 +1010,97 @@ nfsd4_write(struct svc_rqst *rqstp, struct nfsd4_compound_state *cstate,
 }
 
 static __be32
-nfsd4_clone(struct svc_rqst *rqstp, struct nfsd4_compound_state *cstate,
-		struct nfsd4_clone *clone)
+nfsd4_verify_copy(struct svc_rqst *rqstp, struct nfsd4_compound_state *cstate,
+		  stateid_t *src_stateid, struct file **src,
+		  stateid_t *dst_stateid, struct file **dst)
 {
-	struct file *src, *dst;
 	__be32 status;
 
 	status = nfs4_preprocess_stateid_op(rqstp, cstate, &cstate->save_fh,
-					    &clone->cl_src_stateid, RD_STATE,
-					    &src, NULL);
+					    src_stateid, RD_STATE, src, NULL);
 	if (status) {
 		dprintk("NFSD: %s: couldn't process src stateid!\n", __func__);
 		goto out;
 	}
 
 	status = nfs4_preprocess_stateid_op(rqstp, cstate, &cstate->current_fh,
-					    &clone->cl_dst_stateid, WR_STATE,
-					    &dst, NULL);
+					    dst_stateid, WR_STATE, dst, NULL);
 	if (status) {
 		dprintk("NFSD: %s: couldn't process dst stateid!\n", __func__);
 		goto out_put_src;
 	}
 
 	/* fix up for NFS-specific error code */
-	if (!S_ISREG(file_inode(src)->i_mode) ||
-	    !S_ISREG(file_inode(dst)->i_mode)) {
+	if (!S_ISREG(file_inode(*src)->i_mode) ||
+	    !S_ISREG(file_inode(*dst)->i_mode)) {
 		status = nfserr_wrong_type;
 		goto out_put_dst;
 	}
 
+out:
+	return status;
+out_put_dst:
+	fput(*dst);
+out_put_src:
+	fput(*src);
+	goto out;
+}
+
+static __be32
+nfsd4_clone(struct svc_rqst *rqstp, struct nfsd4_compound_state *cstate,
+		struct nfsd4_clone *clone)
+{
+	struct file *src, *dst;
+	__be32 status;
+
+	status = nfsd4_verify_copy(rqstp, cstate, &clone->cl_src_stateid, &src,
+				   &clone->cl_dst_stateid, &dst);
+	if (status)
+		goto out;
+
 	status = nfsd4_clone_file_range(src, clone->cl_src_pos,
 			dst, clone->cl_dst_pos, clone->cl_count);
 
-out_put_dst:
 	fput(dst);
-out_put_src:
 	fput(src);
 out:
 	return status;
 }
 
 static __be32
+nfsd4_copy(struct svc_rqst *rqstp, struct nfsd4_compound_state *cstate,
+		struct nfsd4_copy *copy)
+{
+	struct file *src, *dst;
+	__be32 status;
+	ssize_t bytes;
+
+	status = nfsd4_verify_copy(rqstp, cstate, &copy->cp_src_stateid, &src,
+				   &copy->cp_dst_stateid, &dst);
+	if (status)
+		goto out;
+
+	bytes = nfsd_copy_file_range(src, copy->cp_src_pos,
+			dst, copy->cp_dst_pos, copy->cp_count);
+
+	if (bytes < 0)
+		status = nfserrno(bytes);
+	else {
+		copy->cp_res.wr_bytes_written = bytes;
+		copy->cp_res.wr_stable_how = NFS_UNSTABLE;
+		copy->cp_consecutive = 1;
+		copy->cp_synchronous = 1;
+		gen_boot_verifier(&copy->cp_res.wr_verifier, SVC_NET(rqstp));
+		status = nfs_ok;
+	}
+
+	fput(src);
+	fput(dst);
+out:
+	return status;
+}
+
+static __be32
 nfsd4_fallocate(struct svc_rqst *rqstp, struct nfsd4_compound_state *cstate,
 		struct nfsd4_fallocate *fallocate, int flags)
 {
@@ -1966,6 +2016,18 @@ static inline u32 nfsd4_create_session_rsize(struct svc_rqst *rqstp, struct nfsd
 		op_encode_channel_attrs_maxsz) * sizeof(__be32);
 }
 
+static inline u32 nfsd4_copy_rsize(struct svc_rqst *rqstp, struct nfsd4_op *op)
+{
+	return (op_encode_hdr_size +
+		1 /* wr_callback */ +
+		op_encode_stateid_maxsz /* wr_callback */ +
+		2 /* wr_count */ +
+		1 /* wr_committed */ +
+		op_encode_verifier_maxsz +
+		1 /* cr_consecutive */ +
+		1 /* cr_synchronous */) * sizeof(__be32);
+}
+
 #ifdef CONFIG_NFSD_PNFS
 /*
  * At this stage we don't really know what layout driver will handle the request,
@@ -2328,6 +2390,12 @@ static struct nfsd4_operation nfsd4_ops[] = {
 		.op_name = "OP_CLONE",
 		.op_rsize_bop = (nfsd4op_rsize)nfsd4_only_status_rsize,
 	},
+	[OP_COPY] = {
+		.op_func = (nfsd4op_func)nfsd4_copy,
+		.op_flags = OP_MODIFIES_SOMETHING | OP_CACHEME,
+		.op_name = "OP_COPY",
+		.op_rsize_bop = (nfsd4op_rsize)nfsd4_copy_rsize,
+	},
 	[OP_SEEK] = {
 		.op_func = (nfsd4op_func)nfsd4_seek,
 		.op_name = "OP_SEEK",
diff --git a/fs/nfsd/nfs4state.c b/fs/nfsd/nfs4state.c
index 39bfaba9c99c..9752beb78659 100644
--- a/fs/nfsd/nfs4state.c
+++ b/fs/nfsd/nfs4state.c
@@ -99,6 +99,7 @@ static struct kmem_cache *odstate_slab;
 static void free_session(struct nfsd4_session *);
 
 static const struct nfsd4_callback_ops nfsd4_cb_recall_ops;
+static const struct nfsd4_callback_ops nfsd4_cb_notify_lock_ops;
 
 static bool is_session_dead(struct nfsd4_session *ses)
 {
@@ -210,6 +211,85 @@ static void nfsd4_put_session(struct nfsd4_session *ses)
 	spin_unlock(&nn->client_lock);
 }
 
+static struct nfsd4_blocked_lock *
+find_blocked_lock(struct nfs4_lockowner *lo, struct knfsd_fh *fh,
+			struct nfsd_net *nn)
+{
+	struct nfsd4_blocked_lock *cur, *found = NULL;
+
+	spin_lock(&nn->client_lock);
+	list_for_each_entry(cur, &lo->lo_blocked, nbl_list) {
+		if (fh_match(fh, &cur->nbl_fh)) {
+			list_del_init(&cur->nbl_list);
+			list_del_init(&cur->nbl_lru);
+			found = cur;
+			break;
+		}
+	}
+	spin_unlock(&nn->client_lock);
+	if (found)
+		posix_unblock_lock(&found->nbl_lock);
+	return found;
+}
+
+static struct nfsd4_blocked_lock *
+find_or_allocate_block(struct nfs4_lockowner *lo, struct knfsd_fh *fh,
+			struct nfsd_net *nn)
+{
+	struct nfsd4_blocked_lock *nbl;
+
+	nbl = find_blocked_lock(lo, fh, nn);
+	if (!nbl) {
+		nbl= kmalloc(sizeof(*nbl), GFP_KERNEL);
+		if (nbl) {
+			fh_copy_shallow(&nbl->nbl_fh, fh);
+			locks_init_lock(&nbl->nbl_lock);
+			nfsd4_init_cb(&nbl->nbl_cb, lo->lo_owner.so_client,
+					&nfsd4_cb_notify_lock_ops,
+					NFSPROC4_CLNT_CB_NOTIFY_LOCK);
+		}
+	}
+	return nbl;
+}
+
+static void
+free_blocked_lock(struct nfsd4_blocked_lock *nbl)
+{
+	locks_release_private(&nbl->nbl_lock);
+	kfree(nbl);
+}
+
+static int
+nfsd4_cb_notify_lock_done(struct nfsd4_callback *cb, struct rpc_task *task)
+{
+	/*
+	 * Since this is just an optimization, we don't try very hard if it
+	 * turns out not to succeed. We'll requeue it on NFS4ERR_DELAY, and
+	 * just quit trying on anything else.
+	 */
+	switch (task->tk_status) {
+	case -NFS4ERR_DELAY:
+		rpc_delay(task, 1 * HZ);
+		return 0;
+	default:
+		return 1;
+	}
+}
+
+static void
+nfsd4_cb_notify_lock_release(struct nfsd4_callback *cb)
+{
+	struct nfsd4_blocked_lock	*nbl = container_of(cb,
+						struct nfsd4_blocked_lock, nbl_cb);
+
+	free_blocked_lock(nbl);
+}
+
+static const struct nfsd4_callback_ops nfsd4_cb_notify_lock_ops = {
+	.done		= nfsd4_cb_notify_lock_done,
+	.release	= nfsd4_cb_notify_lock_release,
+};
+
 static inline struct nfs4_stateowner *
 nfs4_get_stateowner(struct nfs4_stateowner *sop)
 {
@@ -3224,9 +3304,10 @@ nfsd4_setclientid_confirm(struct svc_rqst *rqstp,
 		goto out;
 	/* cases below refer to rfc 3530 section 14.2.34: */
 	if (!unconf || !same_verf(&confirm, &unconf->cl_confirm)) {
-		if (conf && !unconf) /* case 2: probable retransmit */
+		if (conf && same_verf(&confirm, &conf->cl_confirm)) {
+			/* case 2: probable retransmit */
 			status = nfs_ok;
-		else /* case 4: client hasn't noticed we rebooted yet? */
+		} else /* case 4: client hasn't noticed we rebooted yet? */
 			status = nfserr_stale_clientid;
 		goto out;
 	}
@@ -4410,9 +4491,11 @@ out:
 	* To finish the open response, we just need to set the rflags.
 	*/
 	open->op_rflags = NFS4_OPEN_RESULT_LOCKTYPE_POSIX;
-	if (!(open->op_openowner->oo_flags & NFS4_OO_CONFIRMED) &&
-	    !nfsd4_has_session(&resp->cstate))
+	if (nfsd4_has_session(&resp->cstate))
+		open->op_rflags |= NFS4_OPEN_RESULT_MAY_NOTIFY_LOCK;
+	else if (!(open->op_openowner->oo_flags & NFS4_OO_CONFIRMED))
 		open->op_rflags |= NFS4_OPEN_RESULT_CONFIRM;
+
 	if (dp)
 		nfs4_put_stid(&dp->dl_stid);
 	if (stp)
@@ -4501,6 +4584,7 @@ nfs4_laundromat(struct nfsd_net *nn)
 	struct nfs4_openowner *oo;
 	struct nfs4_delegation *dp;
 	struct nfs4_ol_stateid *stp;
+	struct nfsd4_blocked_lock *nbl;
 	struct list_head *pos, *next, reaplist;
 	time_t cutoff = get_seconds() - nn->nfsd4_lease;
 	time_t t, new_timeo = nn->nfsd4_lease;
@@ -4569,6 +4653,41 @@ nfs4_laundromat(struct nfsd_net *nn)
 	}
 	spin_unlock(&nn->client_lock);
 
+	/*
+	 * It's possible for a client to try and acquire an already held lock
+	 * that is being held for a long time, and then lose interest in it.
+	 * So, we clean out any un-revisited request after a lease period
+	 * under the assumption that the client is no longer interested.
+	 *
+	 * RFC5661, sec. 9.6 states that the client must not rely on getting
+	 * notifications and must continue to poll for locks, even when the
+	 * server supports them. Thus this shouldn't lead to clients blocking
+	 * indefinitely once the lock does become free.
+	 */
+	BUG_ON(!list_empty(&reaplist));
+	spin_lock(&nn->client_lock);
+	while (!list_empty(&nn->blocked_locks_lru)) {
+		nbl = list_first_entry(&nn->blocked_locks_lru,
+					struct nfsd4_blocked_lock, nbl_lru);
+		if (time_after((unsigned long)nbl->nbl_time,
+			       (unsigned long)cutoff)) {
+			t = nbl->nbl_time - cutoff;
+			new_timeo = min(new_timeo, t);
+			break;
+		}
+		list_move(&nbl->nbl_lru, &reaplist);
+		list_del_init(&nbl->nbl_list);
+	}
+	spin_unlock(&nn->client_lock);
+
+	while (!list_empty(&reaplist)) {
+		nbl = list_first_entry(&nn->blocked_locks_lru,
+					struct nfsd4_blocked_lock, nbl_lru);
+		list_del_init(&nbl->nbl_lru);
+		posix_unblock_lock(&nbl->nbl_lock);
+		free_blocked_lock(nbl);
+	}
+
 	new_timeo = max_t(time_t, new_timeo, NFSD_LAUNDROMAT_MINTIMEOUT);
 	return new_timeo;
 }
@@ -5309,7 +5428,31 @@ nfsd4_fl_put_owner(fl_owner_t owner)
 		nfs4_put_stateowner(&lo->lo_owner);
 }
 
+static void
+nfsd4_lm_notify(struct file_lock *fl)
+{
+	struct nfs4_lockowner		*lo = (struct nfs4_lockowner *)fl->fl_owner;
+	struct net			*net = lo->lo_owner.so_client->net;
+	struct nfsd_net			*nn = net_generic(net, nfsd_net_id);
+	struct nfsd4_blocked_lock	*nbl = container_of(fl,
+						struct nfsd4_blocked_lock, nbl_lock);
+	bool queue = false;
+
+	/* An empty list means that something else is going to be using it */
+	spin_lock(&nn->client_lock);
+	if (!list_empty(&nbl->nbl_list)) {
+		list_del_init(&nbl->nbl_list);
+		list_del_init(&nbl->nbl_lru);
+		queue = true;
+	}
+	spin_unlock(&nn->client_lock);
+
+	if (queue)
+		nfsd4_run_cb(&nbl->nbl_cb);
+}
+
 static const struct lock_manager_operations nfsd_posix_mng_ops  = {
+	.lm_notify = nfsd4_lm_notify,
 	.lm_get_owner = nfsd4_fl_get_owner,
 	.lm_put_owner = nfsd4_fl_put_owner,
 };
@@ -5407,6 +5550,7 @@ alloc_init_lock_stateowner(unsigned int strhashval, struct nfs4_client *clp,
 	lo = alloc_stateowner(lockowner_slab, &lock->lk_new_owner, clp);
 	if (!lo)
 		return NULL;
+	INIT_LIST_HEAD(&lo->lo_blocked);
 	INIT_LIST_HEAD(&lo->lo_owner.so_stateids);
 	lo->lo_owner.so_is_open_owner = 0;
 	lo->lo_owner.so_seqid = lock->lk_new_lock_seqid;
@@ -5588,12 +5732,15 @@ nfsd4_lock(struct svc_rqst *rqstp, struct nfsd4_compound_state *cstate,
 	struct nfs4_ol_stateid *open_stp = NULL;
 	struct nfs4_file *fp;
 	struct file *filp = NULL;
+	struct nfsd4_blocked_lock *nbl = NULL;
 	struct file_lock *file_lock = NULL;
 	struct file_lock *conflock = NULL;
 	__be32 status = 0;
 	int lkflg;
 	int err;
 	bool new = false;
+	unsigned char fl_type;
+	unsigned int fl_flags = FL_POSIX;
 	struct net *net = SVC_NET(rqstp);
 	struct nfsd_net *nn = net_generic(net, nfsd_net_id);
 
@@ -5658,46 +5805,55 @@ nfsd4_lock(struct svc_rqst *rqstp, struct nfsd4_compound_state *cstate,
 	if (!locks_in_grace(net) && lock->lk_reclaim)
 		goto out;
 
-	file_lock = locks_alloc_lock();
-	if (!file_lock) {
-		dprintk("NFSD: %s: unable to allocate lock!\n", __func__);
-		status = nfserr_jukebox;
-		goto out;
-	}
-
 	fp = lock_stp->st_stid.sc_file;
 	switch (lock->lk_type) {
-		case NFS4_READ_LT:
 		case NFS4_READW_LT:
+			if (nfsd4_has_session(cstate))
+				fl_flags |= FL_SLEEP;
+			/* Fallthrough */
+		case NFS4_READ_LT:
 			spin_lock(&fp->fi_lock);
 			filp = find_readable_file_locked(fp);
 			if (filp)
 				get_lock_access(lock_stp, NFS4_SHARE_ACCESS_READ);
 			spin_unlock(&fp->fi_lock);
-			file_lock->fl_type = F_RDLCK;
+			fl_type = F_RDLCK;
 			break;
-		case NFS4_WRITE_LT:
 		case NFS4_WRITEW_LT:
+			if (nfsd4_has_session(cstate))
+				fl_flags |= FL_SLEEP;
+			/* Fallthrough */
+		case NFS4_WRITE_LT:
 			spin_lock(&fp->fi_lock);
 			filp = find_writeable_file_locked(fp);
 			if (filp)
 				get_lock_access(lock_stp, NFS4_SHARE_ACCESS_WRITE);
 			spin_unlock(&fp->fi_lock);
-			file_lock->fl_type = F_WRLCK;
+			fl_type = F_WRLCK;
 			break;
 		default:
 			status = nfserr_inval;
 		goto out;
 	}
+
 	if (!filp) {
 		status = nfserr_openmode;
 		goto out;
 	}
 
+	nbl = find_or_allocate_block(lock_sop, &fp->fi_fhandle, nn);
+	if (!nbl) {
+		dprintk("NFSD: %s: unable to allocate block!\n", __func__);
+		status = nfserr_jukebox;
+		goto out;
+	}
+
+	file_lock = &nbl->nbl_lock;
+	file_lock->fl_type = fl_type;
 	file_lock->fl_owner = (fl_owner_t)lockowner(nfs4_get_stateowner(&lock_sop->lo_owner));
 	file_lock->fl_pid = current->tgid;
 	file_lock->fl_file = filp;
-	file_lock->fl_flags = FL_POSIX;
+	file_lock->fl_flags = fl_flags;
 	file_lock->fl_lmops = &nfsd_posix_mng_ops;
 	file_lock->fl_start = lock->lk_offset;
 	file_lock->fl_end = last_byte_offset(lock->lk_offset, lock->lk_length);
@@ -5710,18 +5866,29 @@ nfsd4_lock(struct svc_rqst *rqstp, struct nfsd4_compound_state *cstate,
 		goto out;
 	}
 
+	if (fl_flags & FL_SLEEP) {
+		nbl->nbl_time = jiffies;
+		spin_lock(&nn->client_lock);
+		list_add_tail(&nbl->nbl_list, &lock_sop->lo_blocked);
+		list_add_tail(&nbl->nbl_lru, &nn->blocked_locks_lru);
+		spin_unlock(&nn->client_lock);
+	}
+
 	err = vfs_lock_file(filp, F_SETLK, file_lock, conflock);
-	switch (-err) {
+	switch (err) {
 	case 0: /* success! */
 		nfs4_inc_and_copy_stateid(&lock->lk_resp_stateid, &lock_stp->st_stid);
 		status = 0;
 		break;
-	case (EAGAIN):		/* conflock holds conflicting lock */
+	case FILE_LOCK_DEFERRED:
+		nbl = NULL;
+		/* Fallthrough */
+	case -EAGAIN:		/* conflock holds conflicting lock */
 		status = nfserr_denied;
 		dprintk("NFSD: nfsd4_lock: conflicting lock found!\n");
 		nfs4_set_lock_denied(conflock, &lock->lk_denied);
 		break;
-	case (EDEADLK):
+	case -EDEADLK:
 		status = nfserr_deadlock;
 		break;
 	default:
@@ -5730,6 +5897,16 @@ nfsd4_lock(struct svc_rqst *rqstp, struct nfsd4_compound_state *cstate,
 		break;
 	}
 out:
+	if (nbl) {
+		/* dequeue it if we queued it before */
+		if (fl_flags & FL_SLEEP) {
+			spin_lock(&nn->client_lock);
+			list_del_init(&nbl->nbl_list);
+			list_del_init(&nbl->nbl_lru);
+			spin_unlock(&nn->client_lock);
+		}
+		free_blocked_lock(nbl);
+	}
 	if (filp)
 		fput(filp);
 	if (lock_stp) {
@@ -5753,8 +5930,6 @@ out:
 	if (open_stp)
 		nfs4_put_stid(&open_stp->st_stid);
 	nfsd4_bump_seqid(cstate, status);
-	if (file_lock)
-		locks_free_lock(file_lock);
 	if (conflock)
 		locks_free_lock(conflock);
 	return status;
@@ -6768,6 +6943,7 @@ static int nfs4_state_create_net(struct net *net)
 	INIT_LIST_HEAD(&nn->client_lru);
 	INIT_LIST_HEAD(&nn->close_lru);
 	INIT_LIST_HEAD(&nn->del_recall_lru);
+	INIT_LIST_HEAD(&nn->blocked_locks_lru);
 	spin_lock_init(&nn->client_lock);
 
 	INIT_DELAYED_WORK(&nn->laundromat_work, laundromat_main);
@@ -6865,6 +7041,7 @@ nfs4_state_shutdown_net(struct net *net)
 	struct nfs4_delegation *dp = NULL;
 	struct list_head *pos, *next, reaplist;
 	struct nfsd_net *nn = net_generic(net, nfsd_net_id);
+	struct nfsd4_blocked_lock *nbl;
 
 	cancel_delayed_work_sync(&nn->laundromat_work);
 	locks_end_grace(&nn->nfsd4_manager);
@@ -6885,6 +7062,24 @@ nfs4_state_shutdown_net(struct net *net)
 		nfs4_put_stid(&dp->dl_stid);
 	}
 
+	BUG_ON(!list_empty(&reaplist));
+	spin_lock(&nn->client_lock);
+	while (!list_empty(&nn->blocked_locks_lru)) {
+		nbl = list_first_entry(&nn->blocked_locks_lru,
+					struct nfsd4_blocked_lock, nbl_lru);
+		list_move(&nbl->nbl_lru, &reaplist);
+		list_del_init(&nbl->nbl_list);
+	}
+	spin_unlock(&nn->client_lock);
+
+	while (!list_empty(&reaplist)) {
+		nbl = list_first_entry(&nn->blocked_locks_lru,
+					struct nfsd4_blocked_lock, nbl_lru);
+		list_del_init(&nbl->nbl_lru);
+		posix_unblock_lock(&nbl->nbl_lock);
+		free_blocked_lock(nbl);
+	}
+
 	nfsd4_client_tracking_exit(net);
 	nfs4_state_destroy_net(net);
 }
diff --git a/fs/nfsd/nfs4xdr.c b/fs/nfsd/nfs4xdr.c
index 0aa0236a1429..c2d2895a1ec1 100644
--- a/fs/nfsd/nfs4xdr.c
+++ b/fs/nfsd/nfs4xdr.c
@@ -1694,6 +1694,30 @@ nfsd4_decode_clone(struct nfsd4_compoundargs *argp, struct nfsd4_clone *clone)
 }
 
 static __be32
+nfsd4_decode_copy(struct nfsd4_compoundargs *argp, struct nfsd4_copy *copy)
+{
+	DECODE_HEAD;
+	unsigned int tmp;
+
+	status = nfsd4_decode_stateid(argp, &copy->cp_src_stateid);
+	if (status)
+		return status;
+	status = nfsd4_decode_stateid(argp, &copy->cp_dst_stateid);
+	if (status)
+		return status;
+
+	READ_BUF(8 + 8 + 8 + 4 + 4 + 4);
+	p = xdr_decode_hyper(p, &copy->cp_src_pos);
+	p = xdr_decode_hyper(p, &copy->cp_dst_pos);
+	p = xdr_decode_hyper(p, &copy->cp_count);
+	copy->cp_consecutive = be32_to_cpup(p++);
+	copy->cp_synchronous = be32_to_cpup(p++);
+	tmp = be32_to_cpup(p); /* Source server list not supported */
+
+	DECODE_TAIL;
+}
+
+static __be32
 nfsd4_decode_seek(struct nfsd4_compoundargs *argp, struct nfsd4_seek *seek)
 {
 	DECODE_HEAD;
@@ -1793,7 +1817,7 @@ static nfsd4_dec nfsd4_dec_ops[] = {
 
 	/* new operations for NFSv4.2 */
 	[OP_ALLOCATE]		= (nfsd4_dec)nfsd4_decode_fallocate,
-	[OP_COPY]		= (nfsd4_dec)nfsd4_decode_notsupp,
+	[OP_COPY]		= (nfsd4_dec)nfsd4_decode_copy,
 	[OP_COPY_NOTIFY]	= (nfsd4_dec)nfsd4_decode_notsupp,
 	[OP_DEALLOCATE]		= (nfsd4_dec)nfsd4_decode_fallocate,
 	[OP_IO_ADVISE]		= (nfsd4_dec)nfsd4_decode_notsupp,
@@ -4062,7 +4086,7 @@ nfsd4_encode_getdeviceinfo(struct nfsd4_compoundres *resp, __be32 nfserr,
 	u32 starting_len = xdr->buf->len, needed_len;
 	__be32 *p;
 
-	dprintk("%s: err %d\n", __func__, nfserr);
+	dprintk("%s: err %d\n", __func__, be32_to_cpu(nfserr));
 	if (nfserr)
 		goto out;
 
@@ -4202,6 +4226,41 @@ nfsd4_encode_layoutreturn(struct nfsd4_compoundres *resp, __be32 nfserr,
 #endif /* CONFIG_NFSD_PNFS */
 
 static __be32
+nfsd42_encode_write_res(struct nfsd4_compoundres *resp, struct nfsd42_write_res *write)
+{
+	__be32 *p;
+
+	p = xdr_reserve_space(&resp->xdr, 4 + 8 + 4 + NFS4_VERIFIER_SIZE);
+	if (!p)
+		return nfserr_resource;
+
+	*p++ = cpu_to_be32(0);
+	p = xdr_encode_hyper(p, write->wr_bytes_written);
+	*p++ = cpu_to_be32(write->wr_stable_how);
+	p = xdr_encode_opaque_fixed(p, write->wr_verifier.data,
+				    NFS4_VERIFIER_SIZE);
+	return nfs_ok;
+}
+
+static __be32
+nfsd4_encode_copy(struct nfsd4_compoundres *resp, __be32 nfserr,
+		  struct nfsd4_copy *copy)
+{
+	__be32 *p;
+
+	if (!nfserr) {
+		nfserr = nfsd42_encode_write_res(resp, &copy->cp_res);
+		if (nfserr)
+			return nfserr;
+
+		p = xdr_reserve_space(&resp->xdr, 4 + 4);
+		*p++ = cpu_to_be32(copy->cp_consecutive);
+		*p++ = cpu_to_be32(copy->cp_synchronous);
+	}
+	return nfserr;
+}
+
+static __be32
 nfsd4_encode_seek(struct nfsd4_compoundres *resp, __be32 nfserr,
 		  struct nfsd4_seek *seek)
 {
@@ -4300,7 +4359,7 @@ static nfsd4_enc nfsd4_enc_ops[] = {
 
 	/* NFSv4.2 operations */
 	[OP_ALLOCATE]		= (nfsd4_enc)nfsd4_encode_noop,
-	[OP_COPY]		= (nfsd4_enc)nfsd4_encode_noop,
+	[OP_COPY]		= (nfsd4_enc)nfsd4_encode_copy,
 	[OP_COPY_NOTIFY]	= (nfsd4_enc)nfsd4_encode_noop,
 	[OP_DEALLOCATE]		= (nfsd4_enc)nfsd4_encode_noop,
 	[OP_IO_ADVISE]		= (nfsd4_enc)nfsd4_encode_noop,
diff --git a/fs/nfsd/nfsctl.c b/fs/nfsd/nfsctl.c
index 65ad0165a94f..36b2af931e06 100644
--- a/fs/nfsd/nfsctl.c
+++ b/fs/nfsd/nfsctl.c
@@ -1216,6 +1216,8 @@ static __net_init int nfsd_init_net(struct net *net)
 		goto out_idmap_error;
 	nn->nfsd4_lease = 90;	/* default lease time */
 	nn->nfsd4_grace = 90;
+	nn->clverifier_counter = prandom_u32();
+	nn->clientid_counter = prandom_u32();
 	return 0;
 
 out_idmap_error:
diff --git a/fs/nfsd/nfsproc.c b/fs/nfsd/nfsproc.c
index 08188743db53..010aff5c5a79 100644
--- a/fs/nfsd/nfsproc.c
+++ b/fs/nfsd/nfsproc.c
@@ -789,6 +789,7 @@ nfserrno (int errno)
 		{ nfserr_toosmall, -ETOOSMALL },
 		{ nfserr_serverfault, -ESERVERFAULT },
 		{ nfserr_serverfault, -ENFILE },
+		{ nfserr_io, -EUCLEAN },
 	};
 	int	i;
 
@@ -796,7 +797,7 @@ nfserrno (int errno)
 		if (nfs_errtbl[i].syserr == errno)
 			return nfs_errtbl[i].nfserr;
 	}
-	WARN(1, "nfsd: non-standard errno: %d\n", errno);
+	WARN_ONCE(1, "nfsd: non-standard errno: %d\n", errno);
 	return nfserr_io;
 }
 
diff --git a/fs/nfsd/nfssvc.c b/fs/nfsd/nfssvc.c
index 45007acaf364..a2b65fc56dd6 100644
--- a/fs/nfsd/nfssvc.c
+++ b/fs/nfsd/nfssvc.c
@@ -366,14 +366,21 @@ static struct notifier_block nfsd_inet6addr_notifier = {
 };
 #endif
 
+/* Only used under nfsd_mutex, so this atomic may be overkill: */
+static atomic_t nfsd_notifier_refcount = ATOMIC_INIT(0);
+
 static void nfsd_last_thread(struct svc_serv *serv, struct net *net)
 {
 	struct nfsd_net *nn = net_generic(net, nfsd_net_id);
 
-	unregister_inetaddr_notifier(&nfsd_inetaddr_notifier);
+	/* check if the notifier still has clients */
+	if (atomic_dec_return(&nfsd_notifier_refcount) == 0) {
+		unregister_inetaddr_notifier(&nfsd_inetaddr_notifier);
 #if IS_ENABLED(CONFIG_IPV6)
-	unregister_inet6addr_notifier(&nfsd_inet6addr_notifier);
+		unregister_inet6addr_notifier(&nfsd_inet6addr_notifier);
 #endif
+	}
+
 	/*
 	 * write_ports can create the server without actually starting
 	 * any threads--if we get shut down before any threads are
@@ -488,10 +495,13 @@ int nfsd_create_serv(struct net *net)
 	}
 
 	set_max_drc();
-	register_inetaddr_notifier(&nfsd_inetaddr_notifier);
+	/* check if the notifier is already set */
+	if (atomic_inc_return(&nfsd_notifier_refcount) == 1) {
+		register_inetaddr_notifier(&nfsd_inetaddr_notifier);
 #if IS_ENABLED(CONFIG_IPV6)
-	register_inet6addr_notifier(&nfsd_inet6addr_notifier);
+		register_inet6addr_notifier(&nfsd_inet6addr_notifier);
 #endif
+	}
 	do_gettimeofday(&nn->nfssvc_boot);		/* record boot time */
 	return 0;
 }
diff --git a/fs/nfsd/pnfs.h b/fs/nfsd/pnfs.h
index 0c2a716e8741..d27a5aa60022 100644
--- a/fs/nfsd/pnfs.h
+++ b/fs/nfsd/pnfs.h
@@ -19,6 +19,7 @@ struct nfsd4_deviceid_map {
 
 struct nfsd4_layout_ops {
 	u32		notify_types;
+	bool		disable_recalls;
 
 	__be32 (*proc_getdeviceinfo)(struct super_block *sb,
 			struct svc_rqst *rqstp,
diff --git a/fs/nfsd/state.h b/fs/nfsd/state.h
index b95adf9a1595..c9399366f9df 100644
--- a/fs/nfsd/state.h
+++ b/fs/nfsd/state.h
@@ -63,7 +63,6 @@ typedef struct {
 
 struct nfsd4_callback {
 	struct nfs4_client *cb_clp;
-	u32 cb_minorversion;
 	struct rpc_message cb_msg;
 	const struct nfsd4_callback_ops *cb_ops;
 	struct work_struct cb_work;
@@ -441,11 +440,11 @@ struct nfs4_openowner {
 /*
  * Represents a generic "lockowner". Similar to an openowner. References to it
  * are held by the lock stateids that are created on its behalf. This object is
- * a superset of the nfs4_stateowner struct (or would be if it needed any extra
- * fields).
+ * a superset of the nfs4_stateowner struct.
  */
 struct nfs4_lockowner {
-	struct nfs4_stateowner	lo_owner; /* must be first element */
+	struct nfs4_stateowner	lo_owner;	/* must be first element */
+	struct list_head	lo_blocked;	/* blocked file_locks */
 };
 
 static inline struct nfs4_openowner * openowner(struct nfs4_stateowner *so)
@@ -572,6 +571,7 @@ enum nfsd4_cb_op {
 	NFSPROC4_CLNT_CB_RECALL,
 	NFSPROC4_CLNT_CB_LAYOUT,
 	NFSPROC4_CLNT_CB_SEQUENCE,
+	NFSPROC4_CLNT_CB_NOTIFY_LOCK,
 };
 
 /* Returns true iff a is later than b: */
@@ -580,6 +580,20 @@ static inline bool nfsd4_stateid_generation_after(stateid_t *a, stateid_t *b)
 	return (s32)(a->si_generation - b->si_generation) > 0;
 }
 
+/*
+ * When a client tries to get a lock on a file, we set one of these objects
+ * on the blocking lock. When the lock becomes free, we can then issue a
+ * CB_NOTIFY_LOCK to the server.
+ */
+struct nfsd4_blocked_lock {
+	struct list_head	nbl_list;
+	struct list_head	nbl_lru;
+	unsigned long		nbl_time;
+	struct file_lock	nbl_lock;
+	struct knfsd_fh		nbl_fh;
+	struct nfsd4_callback	nbl_cb;
+};
+
 struct nfsd4_compound_state;
 struct nfsd_net;
 
diff --git a/fs/nfsd/vfs.c b/fs/nfsd/vfs.c
index ff476e654b8f..8ca642fe9b21 100644
--- a/fs/nfsd/vfs.c
+++ b/fs/nfsd/vfs.c
@@ -513,6 +513,22 @@ __be32 nfsd4_clone_file_range(struct file *src, u64 src_pos, struct file *dst,
 			count));
 }
 
+ssize_t nfsd_copy_file_range(struct file *src, u64 src_pos, struct file *dst,
+			     u64 dst_pos, u64 count)
+{
+
+	/*
+	 * Limit copy to 4MB to prevent indefinitely blocking an nfsd
+	 * thread and client rpc slot.  The choice of 4MB is somewhat
+	 * arbitrary.  We might instead base this on r/wsize, or make it
+	 * tunable, or use a time instead of a byte limit, or implement
+	 * asynchronous copy.  In theory a client could also recognize a
+	 * limit like this and pipeline multiple COPY requests.
+	 */
+	count = min_t(u64, count, 1 << 22);
+	return vfs_copy_file_range(src, src_pos, dst, dst_pos, count, 0);
+}
+
 __be32 nfsd4_vfs_fallocate(struct svc_rqst *rqstp, struct svc_fh *fhp,
 			   struct file *file, loff_t offset, loff_t len,
 			   int flags)
diff --git a/fs/nfsd/vfs.h b/fs/nfsd/vfs.h
index 3cbb1b33777b..0bf9e7bf5800 100644
--- a/fs/nfsd/vfs.h
+++ b/fs/nfsd/vfs.h
@@ -96,6 +96,8 @@ __be32		nfsd_symlink(struct svc_rqst *, struct svc_fh *,
 				struct svc_fh *res);
 __be32		nfsd_link(struct svc_rqst *, struct svc_fh *,
 				char *, int, struct svc_fh *);
+ssize_t		nfsd_copy_file_range(struct file *, u64,
+				     struct file *, u64, u64);
 __be32		nfsd_rename(struct svc_rqst *,
 				struct svc_fh *, char *, int,
 				struct svc_fh *, char *, int);
diff --git a/fs/nfsd/xdr4.h b/fs/nfsd/xdr4.h
index beea0c5edc51..8fda4abdf3b1 100644
--- a/fs/nfsd/xdr4.h
+++ b/fs/nfsd/xdr4.h
@@ -503,6 +503,28 @@ struct nfsd4_clone {
 	u64		cl_count;
 };
 
+struct nfsd42_write_res {
+	u64			wr_bytes_written;
+	u32			wr_stable_how;
+	nfs4_verifier		wr_verifier;
+};
+
+struct nfsd4_copy {
+	/* request */
+	stateid_t	cp_src_stateid;
+	stateid_t	cp_dst_stateid;
+	u64		cp_src_pos;
+	u64		cp_dst_pos;
+	u64		cp_count;
+
+	/* both */
+	bool		cp_consecutive;
+	bool		cp_synchronous;
+
+	/* response */
+	struct nfsd42_write_res	cp_res;
+};
+
 struct nfsd4_seek {
 	/* request */
 	stateid_t	seek_stateid;
@@ -568,6 +590,7 @@ struct nfsd4_op {
 		struct nfsd4_fallocate		allocate;
 		struct nfsd4_fallocate		deallocate;
 		struct nfsd4_clone		clone;
+		struct nfsd4_copy		copy;
 		struct nfsd4_seek		seek;
 	} u;
 	struct nfs4_replay *			replay;
diff --git a/fs/nfsd/xdr4cb.h b/fs/nfsd/xdr4cb.h
index c47f6fdb111a..49b719dfef95 100644
--- a/fs/nfsd/xdr4cb.h
+++ b/fs/nfsd/xdr4cb.h
@@ -28,3 +28,12 @@
 #define NFS4_dec_cb_layout_sz		(cb_compound_dec_hdr_sz  +      \
 					cb_sequence_dec_sz +            \
 					op_dec_sz)
+
+#define NFS4_enc_cb_notify_lock_sz	(cb_compound_enc_hdr_sz +        \
+					cb_sequence_enc_sz +             \
+					2 + 1 +				 \
+					XDR_QUADLEN(NFS4_OPAQUE_LIMIT) + \
+					enc_nfs4_fh_sz)
+#define NFS4_dec_cb_notify_lock_sz	(cb_compound_dec_hdr_sz  +      \
+					cb_sequence_dec_sz +            \
+					op_dec_sz)
diff --git a/include/linux/exportfs.h b/include/linux/exportfs.h
index b03c0625fa6e..5ab958cdc50b 100644
--- a/include/linux/exportfs.h
+++ b/include/linux/exportfs.h
@@ -157,12 +157,13 @@ struct fid {
  *    @fh_to_dentry is given a &struct super_block (@sb) and a file handle
  *    fragment (@fh, @fh_len). It should return a &struct dentry which refers
  *    to the same file that the file handle fragment refers to.  If it cannot,
- *    it should return a %NULL pointer if the file was found but no acceptable
- *    &dentries were available, or an %ERR_PTR error code indicating why it
- *    couldn't be found (e.g. %ENOENT or %ENOMEM).  Any suitable dentry can be
- *    returned including, if necessary, a new dentry created with d_alloc_root.
- *    The caller can then find any other extant dentries by following the
- *    d_alias links.
+ *    it should return a %NULL pointer if the file cannot be found, or an
+ *    %ERR_PTR error code of %ENOMEM if a memory allocation failure occurred.
+ *    Any other error code is treated like %NULL, and will cause an %ESTALE error
+ *    for callers of exportfs_decode_fh().
+ *    Any suitable dentry can be returned including, if necessary, a new dentry
+ *    created with d_alloc_root.  The caller can then find any other extant
+ *    dentries by following the d_alias links.
  *
  * fh_to_parent:
  *    Same as @fh_to_dentry, except that it returns a pointer to the parent
diff --git a/include/linux/sunrpc/rpc_rdma.h b/include/linux/sunrpc/rpc_rdma.h
index 3b1ff38f0c37..a7da6bf56610 100644
--- a/include/linux/sunrpc/rpc_rdma.h
+++ b/include/linux/sunrpc/rpc_rdma.h
@@ -41,6 +41,7 @@
 #define _LINUX_SUNRPC_RPC_RDMA_H
 
 #include <linux/types.h>
+#include <linux/bitops.h>
 
 #define RPCRDMA_VERSION		1
 #define rpcrdma_version		cpu_to_be32(RPCRDMA_VERSION)
@@ -129,4 +130,38 @@ enum rpcrdma_proc {
 #define rdma_done	cpu_to_be32(RDMA_DONE)
 #define rdma_error	cpu_to_be32(RDMA_ERROR)
 
+/*
+ * Private extension to RPC-over-RDMA Version One.
+ * Message passed during RDMA-CM connection set-up.
+ *
+ * Add new fields at the end, and don't permute existing
+ * fields.
+ */
+struct rpcrdma_connect_private {
+	__be32			cp_magic;
+	u8			cp_version;
+	u8			cp_flags;
+	u8			cp_send_size;
+	u8			cp_recv_size;
+} __packed;
+
+#define rpcrdma_cmp_magic	__cpu_to_be32(0xf6ab0e18)
+
+enum {
+	RPCRDMA_CMP_VERSION		= 1,
+	RPCRDMA_CMP_F_SND_W_INV_OK	= BIT(0),
+};
+
+static inline u8
+rpcrdma_encode_buffer_size(unsigned int size)
+{
+	return (size >> 10) - 1;
+}
+
+static inline unsigned int
+rpcrdma_decode_buffer_size(u8 val)
+{
+	return ((unsigned int)val + 1) << 10;
+}
+
 #endif				/* _LINUX_SUNRPC_RPC_RDMA_H */
diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index d6917b896d3a..cc3ae16eac68 100644
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -86,6 +86,7 @@ struct svc_rdma_op_ctxt {
 	unsigned long flags;
 	enum dma_data_direction direction;
 	int count;
+	unsigned int mapped_sges;
 	struct ib_sge sge[RPCSVC_MAXPAGES];
 	struct page *pages[RPCSVC_MAXPAGES];
 };
@@ -136,6 +137,7 @@ struct svcxprt_rdma {
 	int		     sc_ord;		/* RDMA read limit */
 	int                  sc_max_sge;
 	int                  sc_max_sge_rd;	/* max sge for read target */
+	bool		     sc_snd_w_inv;	/* OK to use Send With Invalidate */
 
 	atomic_t             sc_sq_count;	/* Number of SQ WR on queue */
 	unsigned int	     sc_sq_depth;	/* Depth of SQ */
@@ -193,6 +195,14 @@ struct svcxprt_rdma {
 
 #define RPCSVC_MAXPAYLOAD_RDMA	RPCSVC_MAXPAYLOAD
 
+/* Track DMA maps for this transport and context */
+static inline void svc_rdma_count_mappings(struct svcxprt_rdma *rdma,
+					   struct svc_rdma_op_ctxt *ctxt)
+{
+	ctxt->mapped_sges++;
+	atomic_inc(&rdma->sc_dma_used);
+}
+
 /* svc_rdma_backchannel.c */
 extern int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt,
 				    struct rpcrdma_msg *rmsgp,
diff --git a/include/uapi/linux/nfs4.h b/include/uapi/linux/nfs4.h
index 2b871e0858d9..4ae62796bfde 100644
--- a/include/uapi/linux/nfs4.h
+++ b/include/uapi/linux/nfs4.h
@@ -39,8 +39,9 @@
 #define NFS4_FH_VOL_MIGRATION		0x0004
 #define NFS4_FH_VOL_RENAME		0x0008
 
-#define NFS4_OPEN_RESULT_CONFIRM 0x0002
-#define NFS4_OPEN_RESULT_LOCKTYPE_POSIX 0x0004
+#define NFS4_OPEN_RESULT_CONFIRM		0x0002
+#define NFS4_OPEN_RESULT_LOCKTYPE_POSIX		0x0004
+#define NFS4_OPEN_RESULT_MAY_NOTIFY_LOCK	0x0020
 
 #define NFS4_SHARE_ACCESS_MASK	0x000F
 #define NFS4_SHARE_ACCESS_READ	0x0001
diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
index a2a7519b0f23..cd0c5581498c 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
@@ -129,7 +129,7 @@ static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
 		ret = -EIO;
 		goto out_unmap;
 	}
-	atomic_inc(&rdma->sc_dma_used);
+	svc_rdma_count_mappings(rdma, ctxt);
 
 	memset(&send_wr, 0, sizeof(send_wr));
 	ctxt->cqe.done = svc_rdma_wc_send;
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index 2c25606f2561..ad1df979b3f0 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -159,7 +159,7 @@ int rdma_read_chunk_lcl(struct svcxprt_rdma *xprt,
 					   ctxt->sge[pno].addr);
 		if (ret)
 			goto err;
-		atomic_inc(&xprt->sc_dma_used);
+		svc_rdma_count_mappings(xprt, ctxt);
 
 		ctxt->sge[pno].lkey = xprt->sc_pd->local_dma_lkey;
 		ctxt->sge[pno].length = len;
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index 54d533300620..f5a91edcd233 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -225,6 +225,48 @@ svc_rdma_get_reply_array(struct rpcrdma_msg *rmsgp,
 	return rp_ary;
 }
 
+/* RPC-over-RDMA Version One private extension: Remote Invalidation.
+ * Responder's choice: requester signals it can handle Send With
+ * Invalidate, and responder chooses one rkey to invalidate.
+ *
+ * Find a candidate rkey to invalidate when sending a reply.  Picks the
+ * first rkey it finds in the chunks lists.
+ *
+ * Returns zero if RPC's chunk lists are empty.
+ */
+static u32 svc_rdma_get_inv_rkey(struct rpcrdma_msg *rdma_argp,
+				 struct rpcrdma_write_array *wr_ary,
+				 struct rpcrdma_write_array *rp_ary)
+{
+	struct rpcrdma_read_chunk *rd_ary;
+	struct rpcrdma_segment *arg_ch;
+	u32 inv_rkey;
+
+	inv_rkey = 0;
+
+	rd_ary = svc_rdma_get_read_chunk(rdma_argp);
+	if (rd_ary) {
+		inv_rkey = be32_to_cpu(rd_ary->rc_target.rs_handle);
+		goto out;
+	}
+
+	if (wr_ary && be32_to_cpu(wr_ary->wc_nchunks)) {
+		arg_ch = &wr_ary->wc_array[0].wc_target;
+		inv_rkey = be32_to_cpu(arg_ch->rs_handle);
+		goto out;
+	}
+
+	if (rp_ary && be32_to_cpu(rp_ary->wc_nchunks)) {
+		arg_ch = &rp_ary->wc_array[0].wc_target;
+		inv_rkey = be32_to_cpu(arg_ch->rs_handle);
+		goto out;
+	}
+
+out:
+	dprintk("svcrdma: Send With Invalidate rkey=%08x\n", inv_rkey);
+	return inv_rkey;
+}
+
 /* Assumptions:
  * - The specified write_len can be represented in sc_max_sge * PAGE_SIZE
  */
@@ -280,7 +322,7 @@ static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,
 		if (ib_dma_mapping_error(xprt->sc_cm_id->device,
 					 sge[sge_no].addr))
 			goto err;
-		atomic_inc(&xprt->sc_dma_used);
+		svc_rdma_count_mappings(xprt, ctxt);
 		sge[sge_no].lkey = xprt->sc_pd->local_dma_lkey;
 		ctxt->count++;
 		sge_off = 0;
@@ -464,7 +506,8 @@ static int send_reply(struct svcxprt_rdma *rdma,
 		      struct page *page,
 		      struct rpcrdma_msg *rdma_resp,
 		      struct svc_rdma_req_map *vec,
-		      int byte_count)
+		      int byte_count,
+		      u32 inv_rkey)
 {
 	struct svc_rdma_op_ctxt *ctxt;
 	struct ib_send_wr send_wr;
@@ -489,7 +532,7 @@ static int send_reply(struct svcxprt_rdma *rdma,
 			    ctxt->sge[0].length, DMA_TO_DEVICE);
 	if (ib_dma_mapping_error(rdma->sc_cm_id->device, ctxt->sge[0].addr))
 		goto err;
-	atomic_inc(&rdma->sc_dma_used);
+	svc_rdma_count_mappings(rdma, ctxt);
 
 	ctxt->direction = DMA_TO_DEVICE;
 
@@ -505,7 +548,7 @@ static int send_reply(struct svcxprt_rdma *rdma,
 		if (ib_dma_mapping_error(rdma->sc_cm_id->device,
 					 ctxt->sge[sge_no].addr))
 			goto err;
-		atomic_inc(&rdma->sc_dma_used);
+		svc_rdma_count_mappings(rdma, ctxt);
 		ctxt->sge[sge_no].lkey = rdma->sc_pd->local_dma_lkey;
 		ctxt->sge[sge_no].length = sge_bytes;
 	}
@@ -523,23 +566,9 @@ static int send_reply(struct svcxprt_rdma *rdma,
 		ctxt->pages[page_no+1] = rqstp->rq_respages[page_no];
 		ctxt->count++;
 		rqstp->rq_respages[page_no] = NULL;
-		/*
-		 * If there are more pages than SGE, terminate SGE
-		 * list so that svc_rdma_unmap_dma doesn't attempt to
-		 * unmap garbage.
-		 */
-		if (page_no+1 >= sge_no)
-			ctxt->sge[page_no+1].length = 0;
 	}
 	rqstp->rq_next_page = rqstp->rq_respages + 1;
 
-	/* The loop above bumps sc_dma_used for each sge. The
-	 * xdr_buf.tail gets a separate sge, but resides in the
-	 * same page as xdr_buf.head. Don't count it twice.
-	 */
-	if (sge_no > ctxt->count)
-		atomic_dec(&rdma->sc_dma_used);
-
 	if (sge_no > rdma->sc_max_sge) {
 		pr_err("svcrdma: Too many sges (%d)\n", sge_no);
 		goto err;
@@ -549,7 +578,11 @@ static int send_reply(struct svcxprt_rdma *rdma,
 	send_wr.wr_cqe = &ctxt->cqe;
 	send_wr.sg_list = ctxt->sge;
 	send_wr.num_sge = sge_no;
-	send_wr.opcode = IB_WR_SEND;
+	if (inv_rkey) {
+		send_wr.opcode = IB_WR_SEND_WITH_INV;
+		send_wr.ex.invalidate_rkey = inv_rkey;
+	} else
+		send_wr.opcode = IB_WR_SEND;
 	send_wr.send_flags =  IB_SEND_SIGNALED;
 
 	ret = svc_rdma_send(rdma, &send_wr);
@@ -581,6 +614,7 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
 	int inline_bytes;
 	struct page *res_page;
 	struct svc_rdma_req_map *vec;
+	u32 inv_rkey;
 
 	dprintk("svcrdma: sending response for rqstp=%p\n", rqstp);
 
@@ -591,6 +625,10 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
 	wr_ary = svc_rdma_get_write_array(rdma_argp);
 	rp_ary = svc_rdma_get_reply_array(rdma_argp, wr_ary);
 
+	inv_rkey = 0;
+	if (rdma->sc_snd_w_inv)
+		inv_rkey = svc_rdma_get_inv_rkey(rdma_argp, wr_ary, rp_ary);
+
 	/* Build an req vec for the XDR */
 	vec = svc_rdma_get_req_map(rdma);
 	ret = svc_rdma_map_xdr(rdma, &rqstp->rq_res, vec, wr_ary != NULL);
@@ -633,9 +671,9 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
 		goto err1;
 
 	ret = send_reply(rdma, rqstp, res_page, rdma_resp, vec,
-			 inline_bytes);
+			 inline_bytes, inv_rkey);
 	if (ret < 0)
-		goto err1;
+		goto err0;
 
 	svc_rdma_put_req_map(rdma, vec);
 	dprintk("svcrdma: send_reply returns %d\n", ret);
@@ -692,7 +730,7 @@ void svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp,
 		svc_rdma_put_context(ctxt, 1);
 		return;
 	}
-	atomic_inc(&xprt->sc_dma_used);
+	svc_rdma_count_mappings(xprt, ctxt);
 
 	/* Prepare SEND WR */
 	memset(&err_wr, 0, sizeof(err_wr));
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index eb2857f52b05..6864fb967038 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -198,6 +198,7 @@ struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
 
 out:
 	ctxt->count = 0;
+	ctxt->mapped_sges = 0;
 	ctxt->frmr = NULL;
 	return ctxt;
 
@@ -221,22 +222,27 @@ out_empty:
 void svc_rdma_unmap_dma(struct svc_rdma_op_ctxt *ctxt)
 {
 	struct svcxprt_rdma *xprt = ctxt->xprt;
-	int i;
-	for (i = 0; i < ctxt->count && ctxt->sge[i].length; i++) {
+	struct ib_device *device = xprt->sc_cm_id->device;
+	u32 lkey = xprt->sc_pd->local_dma_lkey;
+	unsigned int i, count;
+
+	for (count = 0, i = 0; i < ctxt->mapped_sges; i++) {
 		/*
 		 * Unmap the DMA addr in the SGE if the lkey matches
 		 * the local_dma_lkey, otherwise, ignore it since it is
 		 * an FRMR lkey and will be unmapped later when the
 		 * last WR that uses it completes.
 		 */
-		if (ctxt->sge[i].lkey == xprt->sc_pd->local_dma_lkey) {
-			atomic_dec(&xprt->sc_dma_used);
-			ib_dma_unmap_page(xprt->sc_cm_id->device,
+		if (ctxt->sge[i].lkey == lkey) {
+			count++;
+			ib_dma_unmap_page(device,
 					    ctxt->sge[i].addr,
 					    ctxt->sge[i].length,
 					    ctxt->direction);
 		}
 	}
+	ctxt->mapped_sges = 0;
+	atomic_sub(count, &xprt->sc_dma_used);
 }
 
 void svc_rdma_put_context(struct svc_rdma_op_ctxt *ctxt, int free_pages)
@@ -600,7 +606,7 @@ int svc_rdma_post_recv(struct svcxprt_rdma *xprt, gfp_t flags)
 				     DMA_FROM_DEVICE);
 		if (ib_dma_mapping_error(xprt->sc_cm_id->device, pa))
 			goto err_put_ctxt;
-		atomic_inc(&xprt->sc_dma_used);
+		svc_rdma_count_mappings(xprt, ctxt);
 		ctxt->sge[sge_no].addr = pa;
 		ctxt->sge[sge_no].length = PAGE_SIZE;
 		ctxt->sge[sge_no].lkey = xprt->sc_pd->local_dma_lkey;
@@ -642,6 +648,26 @@ int svc_rdma_repost_recv(struct svcxprt_rdma *xprt, gfp_t flags)
 	return ret;
 }
 
+static void
+svc_rdma_parse_connect_private(struct svcxprt_rdma *newxprt,
+			       struct rdma_conn_param *param)
+{
+	const struct rpcrdma_connect_private *pmsg = param->private_data;
+
+	if (pmsg &&
+	    pmsg->cp_magic == rpcrdma_cmp_magic &&
+	    pmsg->cp_version == RPCRDMA_CMP_VERSION) {
+		newxprt->sc_snd_w_inv = pmsg->cp_flags &
+					RPCRDMA_CMP_F_SND_W_INV_OK;
+
+		dprintk("svcrdma: client send_size %u, recv_size %u "
+			"remote inv %ssupported\n",
+			rpcrdma_decode_buffer_size(pmsg->cp_send_size),
+			rpcrdma_decode_buffer_size(pmsg->cp_recv_size),
+			newxprt->sc_snd_w_inv ? "" : "un");
+	}
+}
+
 /*
  * This function handles the CONNECT_REQUEST event on a listening
  * endpoint. It is passed the cma_id for the _new_ connection. The context in
@@ -653,7 +679,8 @@ int svc_rdma_repost_recv(struct svcxprt_rdma *xprt, gfp_t flags)
  * will call the recvfrom method on the listen xprt which will accept the new
  * connection.
  */
-static void handle_connect_req(struct rdma_cm_id *new_cma_id, size_t client_ird)
+static void handle_connect_req(struct rdma_cm_id *new_cma_id,
+			       struct rdma_conn_param *param)
 {
 	struct svcxprt_rdma *listen_xprt = new_cma_id->context;
 	struct svcxprt_rdma *newxprt;
@@ -669,9 +696,10 @@ static void handle_connect_req(struct rdma_cm_id *new_cma_id, size_t client_ird)
 	new_cma_id->context = newxprt;
 	dprintk("svcrdma: Creating newxprt=%p, cm_id=%p, listenxprt=%p\n",
 		newxprt, newxprt->sc_cm_id, listen_xprt);
+	svc_rdma_parse_connect_private(newxprt, param);
 
 	/* Save client advertised inbound read limit for use later in accept. */
-	newxprt->sc_ord = client_ird;
+	newxprt->sc_ord = param->initiator_depth;
 
 	/* Set the local and remote addresses in the transport */
 	sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.dst_addr;
@@ -706,8 +734,7 @@ static int rdma_listen_handler(struct rdma_cm_id *cma_id,
 		dprintk("svcrdma: Connect request on cma_id=%p, xprt = %p, "
 			"event = %s (%d)\n", cma_id, cma_id->context,
 			rdma_event_msg(event->event), event->event);
-		handle_connect_req(cma_id,
-				   event->param.conn.initiator_depth);
+		handle_connect_req(cma_id, &event->param.conn);
 		break;
 
 	case RDMA_CM_EVENT_ESTABLISHED:
@@ -941,6 +968,7 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
 	struct svcxprt_rdma *listen_rdma;
 	struct svcxprt_rdma *newxprt = NULL;
 	struct rdma_conn_param conn_param;
+	struct rpcrdma_connect_private pmsg;
 	struct ib_qp_init_attr qp_attr;
 	struct ib_device *dev;
 	unsigned int i;
@@ -1070,7 +1098,8 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
 			dev->attrs.max_fast_reg_page_list_len;
 		newxprt->sc_dev_caps |= SVCRDMA_DEVCAP_FAST_REG;
 		newxprt->sc_reader = rdma_read_chunk_frmr;
-	}
+	} else
+		newxprt->sc_snd_w_inv = false;
 
 	/*
 	 * Determine if a DMA MR is required and if so, what privs are required
@@ -1094,11 +1123,20 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
 	/* Swap out the handler */
 	newxprt->sc_cm_id->event_handler = rdma_cma_handler;
 
+	/* Construct RDMA-CM private message */
+	pmsg.cp_magic = rpcrdma_cmp_magic;
+	pmsg.cp_version = RPCRDMA_CMP_VERSION;
+	pmsg.cp_flags = 0;
+	pmsg.cp_send_size = pmsg.cp_recv_size =
+		rpcrdma_encode_buffer_size(newxprt->sc_max_req_size);
+
 	/* Accept Connection */
 	set_bit(RDMAXPRT_CONN_PENDING, &newxprt->sc_flags);
 	memset(&conn_param, 0, sizeof conn_param);
 	conn_param.responder_resources = 0;
 	conn_param.initiator_depth = newxprt->sc_ord;
+	conn_param.private_data = &pmsg;
+	conn_param.private_data_len = sizeof(pmsg);
 	ret = rdma_accept(newxprt->sc_cm_id, &conn_param);
 	if (ret) {
 		dprintk("svcrdma: failed to accept new connection, ret=%d\n",