ceph: fix mds sync() race with completing requests

The wait_unsafe_requests() helper dropped the mdsc mutex to wait
for each request to complete, and then examined r_node to get the
next request after retaking the lock.  But the request completion
removes the request from the tree, so r_node was always undefined
at this point.  Since it's a small race, it usually led to a
valid request, but not always.  The result was an occasional
crash in rb_next() while dereferencing node->rb_left.

Fix this by clearing the rb_node when removing the request from
the request tree, and not walking off into the weeds when we
are done waiting for a request.  Since the request we waited on
will _always_ be out of the request tree, take a ref on the next
request, in the hopes that it won't be.  But if it is, it's ok:
we can start over from the beginning (and traverse over older read
requests again).

Signed-off-by: Sage Weil <sage@newdream.net>
This commit is contained in:
Sage Weil 2010-03-16 15:28:54 -07:00
parent 916623da10
commit 80fc7314a7
1 changed file with 20 additions and 7 deletions

View File

@@ -529,6 +529,7 @@ static void __unregister_request(struct ceph_mds_client *mdsc,
 {
 	dout("__unregister_request %p tid %lld\n", req, req->r_tid);
 	rb_erase(&req->r_node, &mdsc->request_tree);
+	RB_CLEAR_NODE(&req->r_node);
 	ceph_mdsc_put_request(req);
 
 	if (req->r_unsafe_dir) {
@@ -2682,29 +2683,41 @@ void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
  */
 static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid)
 {
-	struct ceph_mds_request *req = NULL;
+	struct ceph_mds_request *req = NULL, *nextreq;
 	struct rb_node *n;
 
 	mutex_lock(&mdsc->mutex);
 	dout("wait_unsafe_requests want %lld\n", want_tid);
+restart:
 	req = __get_oldest_req(mdsc);
 	while (req && req->r_tid <= want_tid) {
+		/* find next request */
+		n = rb_next(&req->r_node);
+		if (n)
+			nextreq = rb_entry(n, struct ceph_mds_request, r_node);
+		else
+			nextreq = NULL;
 		if ((req->r_op & CEPH_MDS_OP_WRITE)) {
 			/* write op */
 			ceph_mdsc_get_request(req);
+			if (nextreq)
+				ceph_mdsc_get_request(nextreq);
 			mutex_unlock(&mdsc->mutex);
 			dout("wait_unsafe_requests  wait on %llu (want %llu)\n",
 			     req->r_tid, want_tid);
			wait_for_completion(&req->r_safe_completion);
 			mutex_lock(&mdsc->mutex);
-			n = rb_next(&req->r_node);
 			ceph_mdsc_put_request(req);
-		} else {
-			n = rb_next(&req->r_node);
+			if (!nextreq)
+				break;  /* next dne before, so we're done! */
+			if (RB_EMPTY_NODE(&nextreq->r_node)) {
+				/* next request was removed from tree */
+				ceph_mdsc_put_request(nextreq);
+				goto restart;
+			}
+			ceph_mdsc_put_request(nextreq);  /* won't go away */
 		}
-		if (!n)
-			break;
-		req = rb_entry(n, struct ceph_mds_request, r_node);
+		req = nextreq;
 	}
 	mutex_unlock(&mdsc->mutex);
 	dout("wait_unsafe_requests done\n");