ceph: ignore wbc->range_{start,end} when write back snapshot data

writepages() needs to write dirty pages to OSD in strict order of
snapshot context. It must first write dirty pages associated with
the oldest snapshot context. In the write range case, dirty pages
in the specified range can be associated with newer snapc. They
are not writeable until we write all dirty pages associated with
the oldest snapc.

Signed-off-by: "Yan, Zheng" <zyan@redhat.com>
Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
This commit is contained in:
Yan, Zheng 2017-09-01 16:53:58 +08:00 committed by Ilya Dryomov
parent 590e9d9861
commit 2a2d927e35
1 changed files with 46 additions and 34 deletions

View File

@ -469,6 +469,7 @@ struct ceph_writeback_ctl
u64 truncate_size;
u32 truncate_seq;
bool size_stable;
bool head_snapc;
};
/*
@ -504,6 +505,7 @@ get_oldest_context(struct inode *inode, struct ceph_writeback_ctl *ctl,
}
ctl->truncate_size = capsnap->truncate_size;
ctl->truncate_seq = capsnap->truncate_seq;
ctl->head_snapc = false;
}
if (snapc)
@ -524,6 +526,7 @@ get_oldest_context(struct inode *inode, struct ceph_writeback_ctl *ctl,
ctl->truncate_size = ci->i_truncate_size;
ctl->truncate_seq = ci->i_truncate_seq;
ctl->size_stable = false;
ctl->head_snapc = true;
}
}
spin_unlock(&ci->i_ceph_lock);
@ -781,7 +784,7 @@ static int ceph_writepages_start(struct address_space *mapping,
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
struct ceph_vino vino = ceph_vino(inode);
pgoff_t index, start_index, end;
pgoff_t index, start_index, end = -1;
struct ceph_snap_context *snapc = NULL, *last_snapc = NULL, *pgsnapc;
struct pagevec pvec;
int rc = 0;
@ -810,25 +813,10 @@ static int ceph_writepages_start(struct address_space *mapping,
pagevec_init(&pvec, 0);
start_index = wbc->range_cyclic ? mapping->writeback_index : 0;
/* where to start/end? */
if (wbc->range_cyclic) {
index = start_index;
end = -1;
should_loop = (index > 0);
dout(" cyclic, start at %lu\n", index);
} else {
index = wbc->range_start >> PAGE_SHIFT;
end = wbc->range_end >> PAGE_SHIFT;
if (wbc->range_start == 0 && wbc->range_end == LLONG_MAX)
range_whole = true;
should_loop = false;
dout(" not cyclic, %lu to %lu\n", index, end);
}
index = start_index;
retry:
/* find oldest snap context with dirty data */
ceph_put_snap_context(snapc);
snapc = get_oldest_context(inode, &ceph_wbc, NULL);
if (!snapc) {
/* hmm, why does writepages get called when there
@ -839,13 +827,33 @@ static int ceph_writepages_start(struct address_space *mapping,
dout(" oldest snapc is %p seq %lld (%d snaps)\n",
snapc, snapc->seq, snapc->num_snaps);
if (last_snapc && snapc != last_snapc) {
/* if we switched to a newer snapc, restart our scan at the
* start of the original file range. */
dout(" snapc differs from last pass, restarting at %lu\n",
index);
index = start;
should_loop = false;
if (ceph_wbc.head_snapc && snapc != last_snapc) {
/* where to start/end? */
if (wbc->range_cyclic) {
index = start_index;
end = -1;
if (index > 0)
should_loop = true;
dout(" cyclic, start at %lu\n", index);
} else {
index = wbc->range_start >> PAGE_SHIFT;
end = wbc->range_end >> PAGE_SHIFT;
if (wbc->range_start == 0 && wbc->range_end == LLONG_MAX)
range_whole = true;
dout(" not cyclic, %lu to %lu\n", index, end);
}
} else if (!ceph_wbc.head_snapc) {
/* Do not respect wbc->range_{start,end}. Dirty pages
* in that range can be associated with newer snapc.
* They are not writeable until we write all dirty pages
* associated with 'snapc' get written */
if (index > 0 || wbc->sync_mode != WB_SYNC_NONE)
should_loop = true;
dout(" non-head snapc, range whole\n");
}
ceph_put_snap_context(last_snapc);
last_snapc = snapc;
stop = false;
@ -891,7 +899,9 @@ static int ceph_writepages_start(struct address_space *mapping,
dout("end of range %p\n", page);
/* can't be range_cyclic (1st pass) because
* end == -1 in that case. */
stop = done = true;
stop = true;
if (ceph_wbc.head_snapc)
done = true;
unlock_page(page);
break;
}
@ -1136,24 +1146,26 @@ static int ceph_writepages_start(struct address_space *mapping,
if (pages)
goto new_request;
if (wbc->nr_to_write <= 0)
stop = done = true;
/*
* We stop writing back only if we are not doing
* integrity sync. In case of integrity sync we have to
* keep going until we have written all the pages
* we tagged for writeback prior to entering this loop.
*/
if (wbc->nr_to_write <= 0 && wbc->sync_mode == WB_SYNC_NONE)
done = stop = true;
release_pvec_pages:
dout("pagevec_release on %d pages (%p)\n", (int)pvec.nr,
pvec.nr ? pvec.pages[0] : NULL);
pagevec_release(&pvec);
if (locked_pages && !done)
goto retry;
}
if (should_loop && !done) {
/* more to do; loop back to beginning of file */
dout("writepages looping back to beginning of file\n");
should_loop = false;
end = start_index - 1;
end = start_index - 1; /* OK even when start_index == 0 */
start_index = 0;
index = 0;
goto retry;
}
@ -1163,8 +1175,8 @@ static int ceph_writepages_start(struct address_space *mapping,
out:
ceph_osdc_put_request(req);
ceph_put_snap_context(snapc);
dout("writepages done, rc = %d\n", rc);
ceph_put_snap_context(last_snapc);
dout("writepages dend - startone, rc = %d\n", rc);
return rc;
}