summary refs log tree commit diff
path: root/net/ceph
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@linux-foundation.org>2014-10-15 06:46:01 +0200
committerLinus Torvalds <torvalds@linux-foundation.org>2014-10-15 06:46:01 +0200
commit6b0490816671b2f4126a99998c9bf3c8c0472de2 (patch)
tree016543455c2bdbe47b422fed6a3b4ffb991c97d6 /net/ceph
parentce9d7f7b45930ed16c512aabcfe651d44f1c8619 (diff)
parent0bc62284ee3f2a228c64902ed818b6ba8e04159b (diff)
downloadlinux-6b0490816671b2f4126a99998c9bf3c8c0472de2.tar.gz
Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client
Pull Ceph updates from Sage Weil:
 "There is the long-awaited discard support for RBD (Guangliang Zhao,
  Josh Durgin), a pile of RBD bug fixes that didn't belong in late -rc's
  (Ilya Dryomov, Li RongQing), a pile of fs/ceph bug fixes and
  performance and debugging improvements (Yan, Zheng, John Spray), and a
  smattering of cleanups (Chao Yu, Fabian Frederick, Joe Perches)"

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (40 commits)
  ceph: fix divide-by-zero in __validate_layout()
  rbd: rbd workqueues need a resque worker
  libceph: ceph-msgr workqueue needs a resque worker
  ceph: fix bool assignments
  libceph: separate multiple ops with commas in debugfs output
  libceph: sync osd op definitions in rados.h
  libceph: remove redundant declaration
  ceph: additional debugfs output
  ceph: export ceph_session_state_name function
  ceph: include the initial ACL in create/mkdir/mknod MDS requests
  ceph: use pagelist to present MDS request data
  libceph: reference counting pagelist
  ceph: fix llistxattr on symlink
  ceph: send client metadata to MDS
  ceph: remove redundant code for max file size verification
  ceph: remove redundant io_iter_advance()
  ceph: move ceph_find_inode() outside the s_mutex
  ceph: request xattrs if xattr_version is zero
  rbd: set the remaining discard properties to enable support
  rbd: use helpers to handle discard for layered images correctly
  ...
Diffstat (limited to 'net/ceph')
-rw-r--r--net/ceph/Kconfig1
-rw-r--r--net/ceph/ceph_common.c15
-rw-r--r--net/ceph/ceph_strings.c75
-rw-r--r--net/ceph/debugfs.c3
-rw-r--r--net/ceph/messenger.c28
-rw-r--r--net/ceph/mon_client.c8
-rw-r--r--net/ceph/osd_client.c192
-rw-r--r--net/ceph/osdmap.c52
-rw-r--r--net/ceph/pagelist.c7
9 files changed, 153 insertions, 228 deletions
diff --git a/net/ceph/Kconfig b/net/ceph/Kconfig
index e50cc69ae8ca..f8cceb99e732 100644
--- a/net/ceph/Kconfig
+++ b/net/ceph/Kconfig
@@ -3,6 +3,7 @@ config CEPH_LIB
 	depends on INET
 	select LIBCRC32C
 	select CRYPTO_AES
+	select CRYPTO_CBC
 	select CRYPTO
 	select KEYS
 	default n
diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c
index 1675021d8c12..58fbfe134f93 100644
--- a/net/ceph/ceph_common.c
+++ b/net/ceph/ceph_common.c
@@ -293,17 +293,20 @@ static int get_secret(struct ceph_crypto_key *dst, const char *name) {
 		key_err = PTR_ERR(ukey);
 		switch (key_err) {
 		case -ENOKEY:
-			pr_warning("ceph: Mount failed due to key not found: %s\n", name);
+			pr_warn("ceph: Mount failed due to key not found: %s\n",
+				name);
 			break;
 		case -EKEYEXPIRED:
-			pr_warning("ceph: Mount failed due to expired key: %s\n", name);
+			pr_warn("ceph: Mount failed due to expired key: %s\n",
+				name);
 			break;
 		case -EKEYREVOKED:
-			pr_warning("ceph: Mount failed due to revoked key: %s\n", name);
+			pr_warn("ceph: Mount failed due to revoked key: %s\n",
+				name);
 			break;
 		default:
-			pr_warning("ceph: Mount failed due to unknown key error"
-			       " %d: %s\n", key_err, name);
+			pr_warn("ceph: Mount failed due to unknown key error %d: %s\n",
+				key_err, name);
 		}
 		err = -EPERM;
 		goto out;
@@ -433,7 +436,7 @@ ceph_parse_options(char *options, const char *dev_name,
 
 			/* misc */
 		case Opt_osdtimeout:
-			pr_warning("ignoring deprecated osdtimeout option\n");
+			pr_warn("ignoring deprecated osdtimeout option\n");
 			break;
 		case Opt_osdkeepalivetimeout:
 			opt->osd_keepalive_timeout = intval;
diff --git a/net/ceph/ceph_strings.c b/net/ceph/ceph_strings.c
index 1348df96fe15..30560202f57b 100644
--- a/net/ceph/ceph_strings.c
+++ b/net/ceph/ceph_strings.c
@@ -19,77 +19,12 @@ const char *ceph_entity_type_name(int type)
 const char *ceph_osd_op_name(int op)
 {
 	switch (op) {
-	case CEPH_OSD_OP_READ: return "read";
-	case CEPH_OSD_OP_STAT: return "stat";
-	case CEPH_OSD_OP_MAPEXT: return "mapext";
-	case CEPH_OSD_OP_SPARSE_READ: return "sparse-read";
-	case CEPH_OSD_OP_NOTIFY: return "notify";
-	case CEPH_OSD_OP_NOTIFY_ACK: return "notify-ack";
-	case CEPH_OSD_OP_ASSERT_VER: return "assert-version";
-
-	case CEPH_OSD_OP_MASKTRUNC: return "masktrunc";
-
-	case CEPH_OSD_OP_CREATE: return "create";
-	case CEPH_OSD_OP_WRITE: return "write";
-	case CEPH_OSD_OP_DELETE: return "delete";
-	case CEPH_OSD_OP_TRUNCATE: return "truncate";
-	case CEPH_OSD_OP_ZERO: return "zero";
-	case CEPH_OSD_OP_WRITEFULL: return "writefull";
-	case CEPH_OSD_OP_ROLLBACK: return "rollback";
-
-	case CEPH_OSD_OP_APPEND: return "append";
-	case CEPH_OSD_OP_STARTSYNC: return "startsync";
-	case CEPH_OSD_OP_SETTRUNC: return "settrunc";
-	case CEPH_OSD_OP_TRIMTRUNC: return "trimtrunc";
-
-	case CEPH_OSD_OP_TMAPUP: return "tmapup";
-	case CEPH_OSD_OP_TMAPGET: return "tmapget";
-	case CEPH_OSD_OP_TMAPPUT: return "tmapput";
-	case CEPH_OSD_OP_WATCH: return "watch";
-
-	case CEPH_OSD_OP_CLONERANGE: return "clonerange";
-	case CEPH_OSD_OP_ASSERT_SRC_VERSION: return "assert-src-version";
-	case CEPH_OSD_OP_SRC_CMPXATTR: return "src-cmpxattr";
-
-	case CEPH_OSD_OP_GETXATTR: return "getxattr";
-	case CEPH_OSD_OP_GETXATTRS: return "getxattrs";
-	case CEPH_OSD_OP_SETXATTR: return "setxattr";
-	case CEPH_OSD_OP_SETXATTRS: return "setxattrs";
-	case CEPH_OSD_OP_RESETXATTRS: return "resetxattrs";
-	case CEPH_OSD_OP_RMXATTR: return "rmxattr";
-	case CEPH_OSD_OP_CMPXATTR: return "cmpxattr";
-
-	case CEPH_OSD_OP_PULL: return "pull";
-	case CEPH_OSD_OP_PUSH: return "push";
-	case CEPH_OSD_OP_BALANCEREADS: return "balance-reads";
-	case CEPH_OSD_OP_UNBALANCEREADS: return "unbalance-reads";
-	case CEPH_OSD_OP_SCRUB: return "scrub";
-	case CEPH_OSD_OP_SCRUB_RESERVE: return "scrub-reserve";
-	case CEPH_OSD_OP_SCRUB_UNRESERVE: return "scrub-unreserve";
-	case CEPH_OSD_OP_SCRUB_STOP: return "scrub-stop";
-	case CEPH_OSD_OP_SCRUB_MAP: return "scrub-map";
-
-	case CEPH_OSD_OP_WRLOCK: return "wrlock";
-	case CEPH_OSD_OP_WRUNLOCK: return "wrunlock";
-	case CEPH_OSD_OP_RDLOCK: return "rdlock";
-	case CEPH_OSD_OP_RDUNLOCK: return "rdunlock";
-	case CEPH_OSD_OP_UPLOCK: return "uplock";
-	case CEPH_OSD_OP_DNLOCK: return "dnlock";
-
-	case CEPH_OSD_OP_CALL: return "call";
-
-	case CEPH_OSD_OP_PGLS: return "pgls";
-	case CEPH_OSD_OP_PGLS_FILTER: return "pgls-filter";
-	case CEPH_OSD_OP_OMAPGETKEYS: return "omap-get-keys";
-	case CEPH_OSD_OP_OMAPGETVALS: return "omap-get-vals";
-	case CEPH_OSD_OP_OMAPGETHEADER: return "omap-get-header";
-	case CEPH_OSD_OP_OMAPGETVALSBYKEYS: return "omap-get-vals-by-keys";
-	case CEPH_OSD_OP_OMAPSETVALS: return "omap-set-vals";
-	case CEPH_OSD_OP_OMAPSETHEADER: return "omap-set-header";
-	case CEPH_OSD_OP_OMAPCLEAR: return "omap-clear";
-	case CEPH_OSD_OP_OMAPRMKEYS: return "omap-rm-keys";
+#define GENERATE_CASE(op, opcode, str)	case CEPH_OSD_OP_##op: return (str);
+__CEPH_FORALL_OSD_OPS(GENERATE_CASE)
+#undef GENERATE_CASE
+	default:
+		return "???";
 	}
-	return "???";
 }
 
 const char *ceph_osd_state_name(int s)
diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c
index d1a62c69a9f4..d2d525529f87 100644
--- a/net/ceph/debugfs.c
+++ b/net/ceph/debugfs.c
@@ -169,7 +169,8 @@ static int osdc_show(struct seq_file *s, void *pp)
 
 		for (i = 0; i < req->r_num_ops; i++) {
 			opcode = req->r_ops[i].op;
-			seq_printf(s, "\t%s", ceph_osd_op_name(opcode));
+			seq_printf(s, "%s%s", (i == 0 ? "\t" : ","),
+				   ceph_osd_op_name(opcode));
 		}
 
 		seq_printf(s, "\n");
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index b2f571dd933d..559c9f619c20 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -292,7 +292,11 @@ int ceph_msgr_init(void)
 	if (ceph_msgr_slab_init())
 		return -ENOMEM;
 
-	ceph_msgr_wq = alloc_workqueue("ceph-msgr", 0, 0);
+	/*
+	 * The number of active work items is limited by the number of
+	 * connections, so leave @max_active at default.
+	 */
+	ceph_msgr_wq = alloc_workqueue("ceph-msgr", WQ_MEM_RECLAIM, 0);
 	if (ceph_msgr_wq)
 		return 0;
 
@@ -1937,11 +1941,11 @@ static int process_banner(struct ceph_connection *con)
 		   sizeof(con->peer_addr)) != 0 &&
 	    !(addr_is_blank(&con->actual_peer_addr.in_addr) &&
 	      con->actual_peer_addr.nonce == con->peer_addr.nonce)) {
-		pr_warning("wrong peer, want %s/%d, got %s/%d\n",
-			   ceph_pr_addr(&con->peer_addr.in_addr),
-			   (int)le32_to_cpu(con->peer_addr.nonce),
-			   ceph_pr_addr(&con->actual_peer_addr.in_addr),
-			   (int)le32_to_cpu(con->actual_peer_addr.nonce));
+		pr_warn("wrong peer, want %s/%d, got %s/%d\n",
+			ceph_pr_addr(&con->peer_addr.in_addr),
+			(int)le32_to_cpu(con->peer_addr.nonce),
+			ceph_pr_addr(&con->actual_peer_addr.in_addr),
+			(int)le32_to_cpu(con->actual_peer_addr.nonce));
 		con->error_msg = "wrong peer at address";
 		return -1;
 	}
@@ -2302,7 +2306,7 @@ static int read_partial_message(struct ceph_connection *con)
 
 		BUG_ON(!con->in_msg ^ skip);
 		if (con->in_msg && data_len > con->in_msg->data_length) {
-			pr_warning("%s skipping long message (%u > %zd)\n",
+			pr_warn("%s skipping long message (%u > %zd)\n",
 				__func__, data_len, con->in_msg->data_length);
 			ceph_msg_put(con->in_msg);
 			con->in_msg = NULL;
@@ -2712,7 +2716,7 @@ static bool con_sock_closed(struct ceph_connection *con)
 	CASE(OPEN);
 	CASE(STANDBY);
 	default:
-		pr_warning("%s con %p unrecognized state %lu\n",
+		pr_warn("%s con %p unrecognized state %lu\n",
 			__func__, con, con->state);
 		con->error_msg = "unrecognized con state";
 		BUG();
@@ -2828,8 +2832,8 @@ static void con_work(struct work_struct *work)
  */
 static void con_fault(struct ceph_connection *con)
 {
-	pr_warning("%s%lld %s %s\n", ENTITY_NAME(con->peer_name),
-	       ceph_pr_addr(&con->peer_addr.in_addr), con->error_msg);
+	pr_warn("%s%lld %s %s\n", ENTITY_NAME(con->peer_name),
+		ceph_pr_addr(&con->peer_addr.in_addr), con->error_msg);
 	dout("fault %p state %lu to peer %s\n",
 	     con, con->state, ceph_pr_addr(&con->peer_addr.in_addr));
 
@@ -3071,10 +3075,8 @@ static void ceph_msg_data_destroy(struct ceph_msg_data *data)
 		return;
 
 	WARN_ON(!list_empty(&data->links));
-	if (data->type == CEPH_MSG_DATA_PAGELIST) {
+	if (data->type == CEPH_MSG_DATA_PAGELIST)
 		ceph_pagelist_release(data->pagelist);
-		kfree(data->pagelist);
-	}
 	kmem_cache_free(ceph_msg_data_cache, data);
 }
 
diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c
index 61fcfc304f68..a83062ceeec9 100644
--- a/net/ceph/mon_client.c
+++ b/net/ceph/mon_client.c
@@ -1182,10 +1182,10 @@ static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
 		pr_info("alloc_msg unknown type %d\n", type);
 		*skip = 1;
 	} else if (front_len > m->front_alloc_len) {
-		pr_warning("mon_alloc_msg front %d > prealloc %d (%u#%llu)\n",
-			   front_len, m->front_alloc_len,
-			   (unsigned int)con->peer_name.type,
-			   le64_to_cpu(con->peer_name.num));
+		pr_warn("mon_alloc_msg front %d > prealloc %d (%u#%llu)\n",
+			front_len, m->front_alloc_len,
+			(unsigned int)con->peer_name.type,
+			le64_to_cpu(con->peer_name.num));
 		ceph_msg_put(m);
 		m = ceph_msg_new(type, front_len, GFP_NOFS, false);
 	}
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 30f6faf3584f..f3fc54eac09d 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -30,8 +30,11 @@ static void __send_queued(struct ceph_osd_client *osdc);
 static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd);
 static void __register_request(struct ceph_osd_client *osdc,
 			       struct ceph_osd_request *req);
+static void __unregister_request(struct ceph_osd_client *osdc,
+				 struct ceph_osd_request *req);
 static void __unregister_linger_request(struct ceph_osd_client *osdc,
 					struct ceph_osd_request *req);
+static void __enqueue_request(struct ceph_osd_request *req);
 static void __send_request(struct ceph_osd_client *osdc,
 			   struct ceph_osd_request *req);
 
@@ -428,68 +431,9 @@ EXPORT_SYMBOL(ceph_osdc_alloc_request);
 static bool osd_req_opcode_valid(u16 opcode)
 {
 	switch (opcode) {
-	case CEPH_OSD_OP_READ:
-	case CEPH_OSD_OP_STAT:
-	case CEPH_OSD_OP_MAPEXT:
-	case CEPH_OSD_OP_MASKTRUNC:
-	case CEPH_OSD_OP_SPARSE_READ:
-	case CEPH_OSD_OP_NOTIFY:
-	case CEPH_OSD_OP_NOTIFY_ACK:
-	case CEPH_OSD_OP_ASSERT_VER:
-	case CEPH_OSD_OP_WRITE:
-	case CEPH_OSD_OP_WRITEFULL:
-	case CEPH_OSD_OP_TRUNCATE:
-	case CEPH_OSD_OP_ZERO:
-	case CEPH_OSD_OP_DELETE:
-	case CEPH_OSD_OP_APPEND:
-	case CEPH_OSD_OP_STARTSYNC:
-	case CEPH_OSD_OP_SETTRUNC:
-	case CEPH_OSD_OP_TRIMTRUNC:
-	case CEPH_OSD_OP_TMAPUP:
-	case CEPH_OSD_OP_TMAPPUT:
-	case CEPH_OSD_OP_TMAPGET:
-	case CEPH_OSD_OP_CREATE:
-	case CEPH_OSD_OP_ROLLBACK:
-	case CEPH_OSD_OP_WATCH:
-	case CEPH_OSD_OP_OMAPGETKEYS:
-	case CEPH_OSD_OP_OMAPGETVALS:
-	case CEPH_OSD_OP_OMAPGETHEADER:
-	case CEPH_OSD_OP_OMAPGETVALSBYKEYS:
-	case CEPH_OSD_OP_OMAPSETVALS:
-	case CEPH_OSD_OP_OMAPSETHEADER:
-	case CEPH_OSD_OP_OMAPCLEAR:
-	case CEPH_OSD_OP_OMAPRMKEYS:
-	case CEPH_OSD_OP_OMAP_CMP:
-	case CEPH_OSD_OP_SETALLOCHINT:
-	case CEPH_OSD_OP_CLONERANGE:
-	case CEPH_OSD_OP_ASSERT_SRC_VERSION:
-	case CEPH_OSD_OP_SRC_CMPXATTR:
-	case CEPH_OSD_OP_GETXATTR:
-	case CEPH_OSD_OP_GETXATTRS:
-	case CEPH_OSD_OP_CMPXATTR:
-	case CEPH_OSD_OP_SETXATTR:
-	case CEPH_OSD_OP_SETXATTRS:
-	case CEPH_OSD_OP_RESETXATTRS:
-	case CEPH_OSD_OP_RMXATTR:
-	case CEPH_OSD_OP_PULL:
-	case CEPH_OSD_OP_PUSH:
-	case CEPH_OSD_OP_BALANCEREADS:
-	case CEPH_OSD_OP_UNBALANCEREADS:
-	case CEPH_OSD_OP_SCRUB:
-	case CEPH_OSD_OP_SCRUB_RESERVE:
-	case CEPH_OSD_OP_SCRUB_UNRESERVE:
-	case CEPH_OSD_OP_SCRUB_STOP:
-	case CEPH_OSD_OP_SCRUB_MAP:
-	case CEPH_OSD_OP_WRLOCK:
-	case CEPH_OSD_OP_WRUNLOCK:
-	case CEPH_OSD_OP_RDLOCK:
-	case CEPH_OSD_OP_RDUNLOCK:
-	case CEPH_OSD_OP_UPLOCK:
-	case CEPH_OSD_OP_DNLOCK:
-	case CEPH_OSD_OP_CALL:
-	case CEPH_OSD_OP_PGLS:
-	case CEPH_OSD_OP_PGLS_FILTER:
-		return true;
+#define GENERATE_CASE(op, opcode, str)	case CEPH_OSD_OP_##op: return true;
+__CEPH_FORALL_OSD_OPS(GENERATE_CASE)
+#undef GENERATE_CASE
 	default:
 		return false;
 	}
@@ -892,6 +836,37 @@ __lookup_request_ge(struct ceph_osd_client *osdc,
 	return NULL;
 }
 
+static void __kick_linger_request(struct ceph_osd_request *req)
+{
+	struct ceph_osd_client *osdc = req->r_osdc;
+	struct ceph_osd *osd = req->r_osd;
+
+	/*
+	 * Linger requests need to be resent with a new tid to avoid
+	 * the dup op detection logic on the OSDs.  Achieve this with
+	 * a re-register dance instead of open-coding.
+	 */
+	ceph_osdc_get_request(req);
+	if (!list_empty(&req->r_linger_item))
+		__unregister_linger_request(osdc, req);
+	else
+		__unregister_request(osdc, req);
+	__register_request(osdc, req);
+	ceph_osdc_put_request(req);
+
+	/*
+	 * Unless request has been registered as both normal and
+	 * lingering, __unregister{,_linger}_request clears r_osd.
+	 * However, here we need to preserve r_osd to make sure we
+	 * requeue on the same OSD.
+	 */
+	WARN_ON(req->r_osd || !osd);
+	req->r_osd = osd;
+
+	dout("%s requeueing %p tid %llu\n", __func__, req, req->r_tid);
+	__enqueue_request(req);
+}
+
 /*
  * Resubmit requests pending on the given osd.
  */
@@ -900,12 +875,14 @@ static void __kick_osd_requests(struct ceph_osd_client *osdc,
 {
 	struct ceph_osd_request *req, *nreq;
 	LIST_HEAD(resend);
+	LIST_HEAD(resend_linger);
 	int err;
 
-	dout("__kick_osd_requests osd%d\n", osd->o_osd);
+	dout("%s osd%d\n", __func__, osd->o_osd);
 	err = __reset_osd(osdc, osd);
 	if (err)
 		return;
+
 	/*
 	 * Build up a list of requests to resend by traversing the
 	 * osd's list of requests.  Requests for a given object are
@@ -926,33 +903,32 @@ static void __kick_osd_requests(struct ceph_osd_client *osdc,
 	list_for_each_entry(req, &osd->o_requests, r_osd_item) {
 		if (!req->r_sent)
 			break;
-		list_move_tail(&req->r_req_lru_item, &resend);
-		dout("requeueing %p tid %llu osd%d\n", req, req->r_tid,
-		     osd->o_osd);
-		if (!req->r_linger)
+
+		if (!req->r_linger) {
+			dout("%s requeueing %p tid %llu\n", __func__, req,
+			     req->r_tid);
+			list_move_tail(&req->r_req_lru_item, &resend);
 			req->r_flags |= CEPH_OSD_FLAG_RETRY;
+		} else {
+			list_move_tail(&req->r_req_lru_item, &resend_linger);
+		}
 	}
 	list_splice(&resend, &osdc->req_unsent);
 
 	/*
-	 * Linger requests are re-registered before sending, which
-	 * sets up a new tid for each.  We add them to the unsent
-	 * list at the end to keep things in tid order.
+	 * Both registered and not yet registered linger requests are
+	 * enqueued with a new tid on the same OSD.  We add/move them
+	 * to req_unsent/o_requests at the end to keep things in tid
+	 * order.
 	 */
 	list_for_each_entry_safe(req, nreq, &osd->o_linger_requests,
 				 r_linger_osd_item) {
-		/*
-		 * reregister request prior to unregistering linger so
-		 * that r_osd is preserved.
-		 */
-		BUG_ON(!list_empty(&req->r_req_lru_item));
-		__register_request(osdc, req);
-		list_add_tail(&req->r_req_lru_item, &osdc->req_unsent);
-		list_add_tail(&req->r_osd_item, &req->r_osd->o_requests);
-		__unregister_linger_request(osdc, req);
-		dout("requeued lingering %p tid %llu osd%d\n", req, req->r_tid,
-		     osd->o_osd);
+		WARN_ON(!list_empty(&req->r_req_lru_item));
+		__kick_linger_request(req);
 	}
+
+	list_for_each_entry_safe(req, nreq, &resend_linger, r_req_lru_item)
+		__kick_linger_request(req);
 }
 
 /*
@@ -1346,6 +1322,22 @@ static int __calc_request_pg(struct ceph_osdmap *osdmap,
 				   &req->r_target_oid, pg_out);
 }
 
+static void __enqueue_request(struct ceph_osd_request *req)
+{
+	struct ceph_osd_client *osdc = req->r_osdc;
+
+	dout("%s %p tid %llu to osd%d\n", __func__, req, req->r_tid,
+	     req->r_osd ? req->r_osd->o_osd : -1);
+
+	if (req->r_osd) {
+		__remove_osd_from_lru(req->r_osd);
+		list_add_tail(&req->r_osd_item, &req->r_osd->o_requests);
+		list_move_tail(&req->r_req_lru_item, &osdc->req_unsent);
+	} else {
+		list_move_tail(&req->r_req_lru_item, &osdc->req_notarget);
+	}
+}
+
 /*
  * Pick an osd (the first 'up' osd in the pg), allocate the osd struct
  * (as needed), and set the request r_osd appropriately.  If there is
@@ -1423,13 +1415,7 @@ static int __map_request(struct ceph_osd_client *osdc,
 			      &osdc->osdmap->osd_addr[o]);
 	}
 
-	if (req->r_osd) {
-		__remove_osd_from_lru(req->r_osd);
-		list_add_tail(&req->r_osd_item, &req->r_osd->o_requests);
-		list_move_tail(&req->r_req_lru_item, &osdc->req_unsent);
-	} else {
-		list_move_tail(&req->r_req_lru_item, &osdc->req_notarget);
-	}
+	__enqueue_request(req);
 	err = 1;   /* osd or pg changed */
 
 out:
@@ -1774,8 +1760,8 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
 	}
 	bytes = le32_to_cpu(msg->hdr.data_len);
 	if (payload_len != bytes) {
-		pr_warning("sum of op payload lens %d != data_len %d",
-			   payload_len, bytes);
+		pr_warn("sum of op payload lens %d != data_len %d\n",
+			payload_len, bytes);
 		goto bad_put;
 	}
 
@@ -2313,24 +2299,19 @@ static void handle_watch_notify(struct ceph_osd_client *osdc,
 	if (event) {
 		event_work = kmalloc(sizeof(*event_work), GFP_NOIO);
 		if (!event_work) {
-			dout("ERROR: could not allocate event_work\n");
-			goto done_err;
+			pr_err("couldn't allocate event_work\n");
+			ceph_osdc_put_event(event);
+			return;
 		}
 		INIT_WORK(&event_work->work, do_event_work);
 		event_work->event = event;
 		event_work->ver = ver;
 		event_work->notify_id = notify_id;
 		event_work->opcode = opcode;
-		if (!queue_work(osdc->notify_wq, &event_work->work)) {
-			dout("WARNING: failed to queue notify event work\n");
-			goto done_err;
-		}
-	}
 
-	return;
+		queue_work(osdc->notify_wq, &event_work->work);
+	}
 
-done_err:
-	ceph_osdc_put_event(event);
 	return;
 
 bad:
@@ -2797,10 +2778,10 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
 	ceph_msg_revoke_incoming(req->r_reply);
 
 	if (front_len > req->r_reply->front_alloc_len) {
-		pr_warning("get_reply front %d > preallocated %d (%u#%llu)\n",
-			   front_len, req->r_reply->front_alloc_len,
-			   (unsigned int)con->peer_name.type,
-			   le64_to_cpu(con->peer_name.num));
+		pr_warn("get_reply front %d > preallocated %d (%u#%llu)\n",
+			front_len, req->r_reply->front_alloc_len,
+			(unsigned int)con->peer_name.type,
+			le64_to_cpu(con->peer_name.num));
 		m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front_len, GFP_NOFS,
 				 false);
 		if (!m)
@@ -2823,8 +2804,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
 			if (osd_data->pages &&
 				unlikely(osd_data->length < data_len)) {
 
-				pr_warning("tid %lld reply has %d bytes "
-					"we had only %llu bytes ready\n",
+				pr_warn("tid %lld reply has %d bytes we had only %llu bytes ready\n",
 					tid, data_len, osd_data->length);
 				*skip = 1;
 				ceph_msg_put(m);
diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c
index c547e46084d3..b8c3fde5b04f 100644
--- a/net/ceph/osdmap.c
+++ b/net/ceph/osdmap.c
@@ -521,11 +521,11 @@ static int decode_pool(void **p, void *end, struct ceph_pg_pool_info *pi)
 	ev = ceph_decode_8(p);  /* encoding version */
 	cv = ceph_decode_8(p); /* compat version */
 	if (ev < 5) {
-		pr_warning("got v %d < 5 cv %d of ceph_pg_pool\n", ev, cv);
+		pr_warn("got v %d < 5 cv %d of ceph_pg_pool\n", ev, cv);
 		return -EINVAL;
 	}
 	if (cv > 9) {
-		pr_warning("got v %d cv %d > 9 of ceph_pg_pool\n", ev, cv);
+		pr_warn("got v %d cv %d > 9 of ceph_pg_pool\n", ev, cv);
 		return -EINVAL;
 	}
 	len = ceph_decode_32(p);
@@ -671,26 +671,26 @@ static int osdmap_set_max_osd(struct ceph_osdmap *map, int max)
 	int i;
 
 	state = krealloc(map->osd_state, max*sizeof(*state), GFP_NOFS);
+	if (!state)
+		return -ENOMEM;
+	map->osd_state = state;
+
 	weight = krealloc(map->osd_weight, max*sizeof(*weight), GFP_NOFS);
-	addr = krealloc(map->osd_addr, max*sizeof(*addr), GFP_NOFS);
-	if (!state || !weight || !addr) {
-		kfree(state);
-		kfree(weight);
-		kfree(addr);
+	if (!weight)
+		return -ENOMEM;
+	map->osd_weight = weight;
 
+	addr = krealloc(map->osd_addr, max*sizeof(*addr), GFP_NOFS);
+	if (!addr)
 		return -ENOMEM;
-	}
+	map->osd_addr = addr;
 
 	for (i = map->max_osd; i < max; i++) {
-		state[i] = 0;
-		weight[i] = CEPH_OSD_OUT;
-		memset(addr + i, 0, sizeof(*addr));
+		map->osd_state[i] = 0;
+		map->osd_weight[i] = CEPH_OSD_OUT;
+		memset(map->osd_addr + i, 0, sizeof(*map->osd_addr));
 	}
 
-	map->osd_state = state;
-	map->osd_weight = weight;
-	map->osd_addr = addr;
-
 	if (map->osd_primary_affinity) {
 		u32 *affinity;
 
@@ -698,11 +698,11 @@ static int osdmap_set_max_osd(struct ceph_osdmap *map, int max)
 				    max*sizeof(*affinity), GFP_NOFS);
 		if (!affinity)
 			return -ENOMEM;
+		map->osd_primary_affinity = affinity;
 
 		for (i = map->max_osd; i < max; i++)
-			affinity[i] = CEPH_OSD_DEFAULT_PRIMARY_AFFINITY;
-
-		map->osd_primary_affinity = affinity;
+			map->osd_primary_affinity[i] =
+			    CEPH_OSD_DEFAULT_PRIMARY_AFFINITY;
 	}
 
 	map->max_osd = max;
@@ -729,9 +729,9 @@ static int get_osdmap_client_data_v(void **p, void *end,
 
 		ceph_decode_8_safe(p, end, struct_compat, e_inval);
 		if (struct_compat > OSDMAP_WRAPPER_COMPAT_VER) {
-			pr_warning("got v %d cv %d > %d of %s ceph_osdmap\n",
-				   struct_v, struct_compat,
-				   OSDMAP_WRAPPER_COMPAT_VER, prefix);
+			pr_warn("got v %d cv %d > %d of %s ceph_osdmap\n",
+				struct_v, struct_compat,
+				OSDMAP_WRAPPER_COMPAT_VER, prefix);
 			return -EINVAL;
 		}
 		*p += 4; /* ignore wrapper struct_len */
@@ -739,9 +739,9 @@ static int get_osdmap_client_data_v(void **p, void *end,
 		ceph_decode_8_safe(p, end, struct_v, e_inval);
 		ceph_decode_8_safe(p, end, struct_compat, e_inval);
 		if (struct_compat > OSDMAP_CLIENT_DATA_COMPAT_VER) {
-			pr_warning("got v %d cv %d > %d of %s ceph_osdmap client data\n",
-				   struct_v, struct_compat,
-				   OSDMAP_CLIENT_DATA_COMPAT_VER, prefix);
+			pr_warn("got v %d cv %d > %d of %s ceph_osdmap client data\n",
+				struct_v, struct_compat,
+				OSDMAP_CLIENT_DATA_COMPAT_VER, prefix);
 			return -EINVAL;
 		}
 		*p += 4; /* ignore client data struct_len */
@@ -751,8 +751,8 @@ static int get_osdmap_client_data_v(void **p, void *end,
 		*p -= 1;
 		ceph_decode_16_safe(p, end, version, e_inval);
 		if (version < 6) {
-			pr_warning("got v %d < 6 of %s ceph_osdmap\n", version,
-				   prefix);
+			pr_warn("got v %d < 6 of %s ceph_osdmap\n",
+				version, prefix);
 			return -EINVAL;
 		}
 
diff --git a/net/ceph/pagelist.c b/net/ceph/pagelist.c
index 92866bebb65f..c7c220a736e5 100644
--- a/net/ceph/pagelist.c
+++ b/net/ceph/pagelist.c
@@ -1,5 +1,6 @@
 #include <linux/module.h>
 #include <linux/gfp.h>
+#include <linux/slab.h>
 #include <linux/pagemap.h>
 #include <linux/highmem.h>
 #include <linux/ceph/pagelist.h>
@@ -13,8 +14,10 @@ static void ceph_pagelist_unmap_tail(struct ceph_pagelist *pl)
 	}
 }
 
-int ceph_pagelist_release(struct ceph_pagelist *pl)
+void ceph_pagelist_release(struct ceph_pagelist *pl)
 {
+	if (!atomic_dec_and_test(&pl->refcnt))
+		return;
 	ceph_pagelist_unmap_tail(pl);
 	while (!list_empty(&pl->head)) {
 		struct page *page = list_first_entry(&pl->head, struct page,
@@ -23,7 +26,7 @@ int ceph_pagelist_release(struct ceph_pagelist *pl)
 		__free_page(page);
 	}
 	ceph_pagelist_free_reserve(pl);
-	return 0;
+	kfree(pl);
 }
 EXPORT_SYMBOL(ceph_pagelist_release);