summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--fs/ceph/file.c52
-rw-r--r--include/linux/ceph/osd_client.h4
-rw-r--r--net/ceph/osd_client.c12
3 files changed, 40 insertions, 28 deletions
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index ae23e31a8f38..a65acf355384 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -446,19 +446,35 @@ done:
 }
 
 /*
- * Write commit callback, called if we requested both an ACK and
- * ONDISK commit reply from the OSD.
+ * Write commit request unsafe callback, called to tell us when a
+ * request is unsafe (that is, in flight--has been handed to the
+ * messenger to send to its target osd).  It is called again when
+ * we've received a response message indicating the request is
+ * "safe" (its CEPH_OSD_FLAG_ONDISK flag is set), or when a request
+ * is completed early (and unsuccessfully) due to a timeout or
+ * interrupt.
+ *
+ * This is used if we requested both an ACK and ONDISK commit reply
+ * from the OSD.
  */
-static void sync_write_commit(struct ceph_osd_request *req,
-			      struct ceph_msg *msg)
+static void ceph_sync_write_unsafe(struct ceph_osd_request *req, bool unsafe)
 {
 	struct ceph_inode_info *ci = ceph_inode(req->r_inode);
 
-	dout("sync_write_commit %p tid %llu\n", req, req->r_tid);
-	spin_lock(&ci->i_unsafe_lock);
-	list_del_init(&req->r_unsafe_item);
-	spin_unlock(&ci->i_unsafe_lock);
-	ceph_put_cap_refs(ci, CEPH_CAP_FILE_WR);
+	dout("%s %p tid %llu %ssafe\n", __func__, req, req->r_tid,
+		unsafe ? "un" : "");
+	if (unsafe) {
+		ceph_get_cap_refs(ci, CEPH_CAP_FILE_WR);
+		spin_lock(&ci->i_unsafe_lock);
+		list_add_tail(&req->r_unsafe_item,
+			      &ci->i_unsafe_writes);
+		spin_unlock(&ci->i_unsafe_lock);
+	} else {
+		spin_lock(&ci->i_unsafe_lock);
+		list_del_init(&req->r_unsafe_item);
+		spin_unlock(&ci->i_unsafe_lock);
+		ceph_put_cap_refs(ci, CEPH_CAP_FILE_WR);
+	}
 }
 
 /*
@@ -570,7 +586,8 @@ more:
 
 		if ((file->f_flags & O_SYNC) == 0) {
 			/* get a second commit callback */
-			req->r_safe_callback = sync_write_commit;
+			req->r_unsafe_callback = ceph_sync_write_unsafe;
+			req->r_inode = inode;
 			own_pages = true;
 		}
 	}
@@ -581,21 +598,8 @@ more:
 	ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime);
 
 	ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
-	if (!ret) {
-		if (req->r_safe_callback) {
-			/*
-			 * Add to inode unsafe list only after we
-			 * start_request so that a tid has been assigned.
-			 */
-			spin_lock(&ci->i_unsafe_lock);
-			list_add_tail(&req->r_unsafe_item,
-				      &ci->i_unsafe_writes);
-			spin_unlock(&ci->i_unsafe_lock);
-			ceph_get_cap_refs(ci, CEPH_CAP_FILE_WR);
-		}
-		
+	if (!ret)
 		ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
-	}
 
 	if (file->f_flags & O_DIRECT)
 		ceph_put_page_vector(pages, num_pages, false);
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 2a68a7465c18..0d3358ef5285 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -29,6 +29,7 @@ struct ceph_authorizer;
  */
 typedef void (*ceph_osdc_callback_t)(struct ceph_osd_request *,
 				     struct ceph_msg *);
+typedef void (*ceph_osdc_unsafe_callback_t)(struct ceph_osd_request *, bool);
 
 /* a given osd we're communicating with */
 struct ceph_osd {
@@ -149,7 +150,8 @@ struct ceph_osd_request {
 	struct kref       r_kref;
 	bool              r_mempool;
 	struct completion r_completion, r_safe_completion;
-	ceph_osdc_callback_t r_callback, r_safe_callback;
+	ceph_osdc_callback_t r_callback;
+	ceph_osdc_unsafe_callback_t r_unsafe_callback;
 	struct ceph_eversion r_reassert_version;
 	struct list_head  r_unsafe_item;
 
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 939be67199ca..0c5bf2fb5075 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -1314,8 +1314,14 @@ static void __send_request(struct ceph_osd_client *osdc,
 	list_move_tail(&req->r_req_lru_item, &osdc->req_lru);
 
 	ceph_msg_get(req->r_request); /* send consumes a ref */
-	ceph_con_send(&req->r_osd->o_con, req->r_request);
+
+	/* Mark the request unsafe if this is the first timet's being sent. */
+
+	if (!req->r_sent && req->r_unsafe_callback)
+		req->r_unsafe_callback(req, true);
 	req->r_sent = req->r_osd->o_incarnation;
+
+	ceph_con_send(&req->r_osd->o_con, req->r_request);
 }
 
 /*
@@ -1403,8 +1409,8 @@ static void handle_osds_timeout(struct work_struct *work)
 
 static void complete_request(struct ceph_osd_request *req)
 {
-	if (req->r_safe_callback)
-		req->r_safe_callback(req, NULL);
+	if (req->r_unsafe_callback)
+		req->r_unsafe_callback(req, false);
 	complete_all(&req->r_safe_completion);  /* fsync waiter */
 }