staging/lustre/clio: generalize cl_sync_io

Make the cl_sync_io interfaces a generic synchronization mechanism rather
than one that only waits for pages.

Also remove cl_io_cancel(), which is no longer used.
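
With the end-io callback in place, an anchor can count down arbitrary
completion events, not just page transfers. A minimal sketch of the
intended calling pattern (submit_one_op() is a hypothetical submitter;
env, nr and timeout come from the caller; the cl_sync_io_* calls and
cl_sync_io_end are the interfaces touched by this patch):

	struct cl_sync_io anchor;
	int i, rc;

	/* expect `nr` completions, finished with the stock end_io callback */
	cl_sync_io_init(&anchor, nr, &cl_sync_io_end);

	for (i = 0; i < nr; i++)
		submit_one_op(env, &anchor);	/* hypothetical async submitter */

	/*
	 * Each completion path must call cl_sync_io_note(env, &anchor, ioret);
	 * the last call invokes csi_end_io and wakes the waiter below.
	 */
	rc = cl_sync_io_wait(env, &anchor, timeout);	/* timeout == 0: wait forever */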

Signed-off-by: Jinshan Xiong <jinshan.xiong@intel.com>
Reviewed-on: http://review.whamcloud.com/8656
Intel-bug-id: https://jira.hpdd.intel.com/browse/LU-4198
Reviewed-by: Bobi Jam <bobijam@gmail.com>
Reviewed-by: Lai Siyao <lai.siyao@intel.com>
Signed-off-by: Oleg Drokin <green@linuxhacker.ru>
Signed-off-by: Greg Kroah-Hartman <gregkh@linuxfoundation.org>
Jinshan Xiong 2016-03-30 19:48:39 -04:00 committed by Greg Kroah-Hartman
parent bb41292b4c
commit e5c4e635c3
3 changed files with 44 additions and 45 deletions


@@ -3125,13 +3125,18 @@ struct cl_sync_io {
 	atomic_t		csi_barrier;
 	/** completion to be signaled when transfer is complete. */
 	wait_queue_head_t	csi_waitq;
+	/** callback to invoke when this IO is finished */
+	void			(*csi_end_io)(const struct lu_env *,
+					      struct cl_sync_io *);
 };
 
-void cl_sync_io_init(struct cl_sync_io *anchor, int nrpages);
-int cl_sync_io_wait(const struct lu_env *env, struct cl_io *io,
-		    struct cl_page_list *queue, struct cl_sync_io *anchor,
+void cl_sync_io_init(struct cl_sync_io *anchor, int nr,
+		     void (*end)(const struct lu_env *, struct cl_sync_io *));
+int cl_sync_io_wait(const struct lu_env *env, struct cl_sync_io *anchor,
 		    long timeout);
-void cl_sync_io_note(struct cl_sync_io *anchor, int ioret);
+void cl_sync_io_note(const struct lu_env *env, struct cl_sync_io *anchor,
+		     int ioret);
+void cl_sync_io_end(const struct lu_env *env, struct cl_sync_io *anchor);
 
 /** @} cl_sync_io */
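
The exported cl_sync_io_end() also lets a caller keep the default wake-up
semantics while embedding the anchor in its own structure. Something along
these lines becomes possible (struct my_batch and my_batch_end_io are
illustrative names, not part of this patch):

	struct my_batch {
		struct cl_sync_io	mb_anchor;
		int			mb_rc;
	};

	static void my_batch_end_io(const struct lu_env *env,
				    struct cl_sync_io *anchor)
	{
		struct my_batch *mb = container_of(anchor, struct my_batch,
						   mb_anchor);

		/* caller-specific completion work, e.g. record the final result */
		mb->mb_rc = anchor->csi_sync_rc;

		/* then fall back to the stock behaviour: wake the waiter and
		 * clear csi_barrier so the anchor may be reused or freed
		 */
		cl_sync_io_end(env, anchor);
	}

Passing my_batch_end_io to cl_sync_io_init() then routes the final
cl_sync_io_note() through the caller's hook before the waiter is released.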


@@ -800,6 +800,9 @@ int cl_io_submit_rw(const struct lu_env *env, struct cl_io *io,
 }
 EXPORT_SYMBOL(cl_io_submit_rw);
 
+static void cl_page_list_assume(const struct lu_env *env,
+				struct cl_io *io, struct cl_page_list *plist);
+
 /**
  * Submit a sync_io and wait for the IO to be finished, or error happens.
  * If \a timeout is zero, it means to wait for the IO unconditionally.
@@ -817,7 +820,7 @@ int cl_io_submit_sync(const struct lu_env *env, struct cl_io *io,
 		pg->cp_sync_io = anchor;
 	}
 
-	cl_sync_io_init(anchor, queue->c2_qin.pl_nr);
+	cl_sync_io_init(anchor, queue->c2_qin.pl_nr, &cl_sync_io_end);
 	rc = cl_io_submit_rw(env, io, iot, queue);
 	if (rc == 0) {
 		/*
@@ -828,12 +831,12 @@ int cl_io_submit_sync(const struct lu_env *env, struct cl_io *io,
 		 */
 		cl_page_list_for_each(pg, &queue->c2_qin) {
 			pg->cp_sync_io = NULL;
-			cl_sync_io_note(anchor, 1);
+			cl_sync_io_note(env, anchor, 1);
 		}
 
 		/* wait for the IO to be finished. */
-		rc = cl_sync_io_wait(env, io, &queue->c2_qout,
-				     anchor, timeout);
+		rc = cl_sync_io_wait(env, anchor, timeout);
+		cl_page_list_assume(env, io, &queue->c2_qout);
 	} else {
 		LASSERT(list_empty(&queue->c2_qout.pl_pages));
 		cl_page_list_for_each(pg, &queue->c2_qin)
@@ -843,25 +846,6 @@ int cl_io_submit_sync(const struct lu_env *env, struct cl_io *io,
 }
 EXPORT_SYMBOL(cl_io_submit_sync);
 
-/**
- * Cancel an IO which has been submitted by cl_io_submit_rw.
- */
-static int cl_io_cancel(const struct lu_env *env, struct cl_io *io,
-			struct cl_page_list *queue)
-{
-	struct cl_page *page;
-	int result = 0;
-
-	CERROR("Canceling ongoing page transmission\n");
-	cl_page_list_for_each(page, queue) {
-		int rc;
-
-		rc = cl_page_cancel(env, page);
-		result = result ?: rc;
-	}
-	return result;
-}
-
 /**
  * Main io loop.
  *
@@ -1433,25 +1417,38 @@ void cl_req_attr_set(const struct lu_env *env, struct cl_req *req,
 }
 EXPORT_SYMBOL(cl_req_attr_set);
 
+/* cl_sync_io_callback assumes the caller must call cl_sync_io_wait() to
+ * wait for the IO to finish.
+ */
+void cl_sync_io_end(const struct lu_env *env, struct cl_sync_io *anchor)
+{
+	wake_up_all(&anchor->csi_waitq);
+
+	/* it's safe to nuke or reuse anchor now */
+	atomic_set(&anchor->csi_barrier, 0);
+}
+EXPORT_SYMBOL(cl_sync_io_end);
+
 /**
- * Initialize synchronous io wait anchor, for transfer of \a nrpages pages.
+ * Initialize synchronous io wait anchor
  */
-void cl_sync_io_init(struct cl_sync_io *anchor, int nrpages)
+void cl_sync_io_init(struct cl_sync_io *anchor, int nr,
+		     void (*end)(const struct lu_env *, struct cl_sync_io *))
 {
 	init_waitqueue_head(&anchor->csi_waitq);
-	atomic_set(&anchor->csi_sync_nr, nrpages);
-	atomic_set(&anchor->csi_barrier, nrpages > 0);
+	atomic_set(&anchor->csi_sync_nr, nr);
+	atomic_set(&anchor->csi_barrier, nr > 0);
 	anchor->csi_sync_rc = 0;
+	anchor->csi_end_io = end;
+	LASSERT(end);
 }
 EXPORT_SYMBOL(cl_sync_io_init);
 
 /**
- * Wait until all transfer completes. Transfer completion routine has to call
- * cl_sync_io_note() for every page.
+ * Wait until all IO completes. Transfer completion routine has to call
+ * cl_sync_io_note() for every entity.
  */
-int cl_sync_io_wait(const struct lu_env *env, struct cl_io *io,
-		    struct cl_page_list *queue, struct cl_sync_io *anchor,
+int cl_sync_io_wait(const struct lu_env *env, struct cl_sync_io *anchor,
 		    long timeout)
 {
 	struct l_wait_info lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(timeout),
@@ -1464,11 +1461,9 @@ int cl_sync_io_wait(const struct lu_env *env, struct cl_io *io,
 			  atomic_read(&anchor->csi_sync_nr) == 0,
 			  &lwi);
 	if (rc < 0) {
-		CERROR("SYNC IO failed with error: %d, try to cancel %d remaining pages\n",
+		CERROR("IO failed: %d, still wait for %d remaining entries\n",
 		       rc, atomic_read(&anchor->csi_sync_nr));
-
-		(void)cl_io_cancel(env, io, queue);
 
 		lwi = (struct l_wait_info) { 0 };
 		(void)l_wait_event(anchor->csi_waitq,
 				   atomic_read(&anchor->csi_sync_nr) == 0,
@@ -1477,14 +1472,12 @@ int cl_sync_io_wait(const struct lu_env *env, struct cl_io *io,
 		rc = anchor->csi_sync_rc;
 	}
 	LASSERT(atomic_read(&anchor->csi_sync_nr) == 0);
-	cl_page_list_assume(env, io, queue);
 
 	/* wait until cl_sync_io_note() has done wakeup */
 	while (unlikely(atomic_read(&anchor->csi_barrier) != 0)) {
 		cpu_relax();
 	}
-
 	POISON(anchor, 0x5a, sizeof(*anchor));
 	return rc;
 }
 EXPORT_SYMBOL(cl_sync_io_wait);
@@ -1492,7 +1485,8 @@ EXPORT_SYMBOL(cl_sync_io_wait);
 /**
  * Indicate that transfer of a single page completed.
  */
-void cl_sync_io_note(struct cl_sync_io *anchor, int ioret)
+void cl_sync_io_note(const struct lu_env *env, struct cl_sync_io *anchor,
+		     int ioret)
 {
 	if (anchor->csi_sync_rc == 0 && ioret < 0)
 		anchor->csi_sync_rc = ioret;
@@ -1503,9 +1497,9 @@ void cl_sync_io_note(struct cl_sync_io *anchor, int ioret)
 	 */
 	LASSERT(atomic_read(&anchor->csi_sync_nr) > 0);
 	if (atomic_dec_and_test(&anchor->csi_sync_nr)) {
-		wake_up_all(&anchor->csi_waitq);
-		/* it's safe to nuke or reuse anchor now */
-		atomic_set(&anchor->csi_barrier, 0);
+		LASSERT(anchor->csi_end_io);
+		anchor->csi_end_io(env, anchor);
+		/* Can't access anchor any more */
 	}
 }
 EXPORT_SYMBOL(cl_sync_io_note);
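
One consequence spelled out by the new comment above: once cl_sync_io_note()
drops the last reference, csi_end_io() may let the waiter return and poison
or free the anchor. A sketch of a completion path (my_transfer_done() is a
hypothetical caller, not part of this patch):

	static void my_transfer_done(const struct lu_env *env,
				     struct cl_sync_io *anchor, int ioret)
	{
		cl_sync_io_note(env, anchor, ioret);
		/*
		 * If that was the last outstanding entry, csi_end_io() has
		 * already cleared csi_barrier and cl_sync_io_wait() may have
		 * returned and POISONed the (possibly on-stack) anchor;
		 * do not dereference `anchor` beyond this point.
		 */
	}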


@@ -887,7 +887,7 @@ void cl_page_completion(const struct lu_env *env,
 	cl_page_put(env, pg);
 
 	if (anchor)
-		cl_sync_io_note(anchor, ioret);
+		cl_sync_io_note(env, anchor, ioret);
 }
 EXPORT_SYMBOL(cl_page_completion);