xfs: factor callbacks out of xlog_state_do_callback()

Simplify the code flow by lifting the iclog callback work out of
the main iclog iteration loop. This isolates the log juggling and
callbacks from the iclog state change logic in the loop.

Note that the loopdidcallbacks variable is not actually tracking
whether callbacks are actually run - it is tracking whether the
icloglock was dropped during the loop and so determines if we
completed the entire iclog scan loop atomically. Hence we know for
certain there are either no more ordered completions to run or
that the next completion will run the remaining ordered iclog
completions. Hence rename that variable appropriately for it's
function.

Signed-off-by: Dave Chinner <dchinner@redhat.com>
Reviewed-by: Darrick J. Wong <darrick.wong@oracle.com>
Signed-off-by: Darrick J. Wong <darrick.wong@oracle.com>
This commit is contained in:
Dave Chinner 2019-09-05 17:32:50 -07:00 committed by Darrick J. Wong
parent 6769aa2a4f
commit 6546818c85
1 changed files with 48 additions and 28 deletions

View File

@ -2630,6 +2630,42 @@ xlog_get_lowest_lsn(
return lowest_lsn; return lowest_lsn;
} }
/*
* Keep processing entries in the iclog callback list until we come around and
* it is empty. We need to atomically see that the list is empty and change the
* state to DIRTY so that we don't miss any more callbacks being added.
*
* This function is called with the icloglock held and returns with it held. We
* drop it while running callbacks, however, as holding it over thousands of
* callbacks is unnecessary and causes excessive contention if we do.
*/
static void
xlog_state_do_iclog_callbacks(
struct xlog *log,
struct xlog_in_core *iclog,
bool aborted)
{
spin_unlock(&log->l_icloglock);
spin_lock(&iclog->ic_callback_lock);
while (!list_empty(&iclog->ic_callbacks)) {
LIST_HEAD(tmp);
list_splice_init(&iclog->ic_callbacks, &tmp);
spin_unlock(&iclog->ic_callback_lock);
xlog_cil_process_committed(&tmp, aborted);
spin_lock(&iclog->ic_callback_lock);
}
/*
* Pick up the icloglock while still holding the callback lock so we
* serialise against anyone trying to add more callbacks to this iclog
* now we've finished processing.
*/
spin_lock(&log->l_icloglock);
spin_unlock(&iclog->ic_callback_lock);
}
#ifdef DEBUG #ifdef DEBUG
/* /*
* Make one last gasp attempt to see if iclogs are being left in limbo. If the * Make one last gasp attempt to see if iclogs are being left in limbo. If the
@ -2684,15 +2720,15 @@ xlog_state_do_callback(
int flushcnt = 0; int flushcnt = 0;
xfs_lsn_t lowest_lsn; xfs_lsn_t lowest_lsn;
int ioerrors; /* counter: iclogs with errors */ int ioerrors; /* counter: iclogs with errors */
int loopdidcallbacks; /* flag: inner loop did callbacks*/ bool cycled_icloglock;
int funcdidcallbacks; /* flag: function did callbacks */ bool did_callbacks;
int repeats; /* for issuing console warnings if int repeats; /* for issuing console warnings if
* looping too many times */ * looping too many times */
spin_lock(&log->l_icloglock); spin_lock(&log->l_icloglock);
first_iclog = iclog = log->l_iclog; first_iclog = iclog = log->l_iclog;
ioerrors = 0; ioerrors = 0;
funcdidcallbacks = 0; did_callbacks = 0;
repeats = 0; repeats = 0;
do { do {
@ -2706,7 +2742,7 @@ xlog_state_do_callback(
*/ */
first_iclog = log->l_iclog; first_iclog = log->l_iclog;
iclog = log->l_iclog; iclog = log->l_iclog;
loopdidcallbacks = 0; cycled_icloglock = false;
repeats++; repeats++;
do { do {
@ -2797,31 +2833,13 @@ xlog_state_do_callback(
} else } else
ioerrors++; ioerrors++;
spin_unlock(&log->l_icloglock);
/* /*
* Keep processing entries in the callback list until * Running callbacks will drop the icloglock which means
* we come around and it is empty. We need to * we'll have to run at least one more complete loop.
* atomically see that the list is empty and change the
* state to DIRTY so that we don't miss any more
* callbacks being added.
*/ */
spin_lock(&iclog->ic_callback_lock); cycled_icloglock = true;
while (!list_empty(&iclog->ic_callbacks)) { xlog_state_do_iclog_callbacks(log, iclog, aborted);
LIST_HEAD(tmp);
list_splice_init(&iclog->ic_callbacks, &tmp);
spin_unlock(&iclog->ic_callback_lock);
xlog_cil_process_committed(&tmp, aborted);
spin_lock(&iclog->ic_callback_lock);
}
loopdidcallbacks++;
funcdidcallbacks++;
spin_lock(&log->l_icloglock);
spin_unlock(&iclog->ic_callback_lock);
if (!(iclog->ic_state & XLOG_STATE_IOERROR)) if (!(iclog->ic_state & XLOG_STATE_IOERROR))
iclog->ic_state = XLOG_STATE_DIRTY; iclog->ic_state = XLOG_STATE_DIRTY;
@ -2837,6 +2855,8 @@ xlog_state_do_callback(
iclog = iclog->ic_next; iclog = iclog->ic_next;
} while (first_iclog != iclog); } while (first_iclog != iclog);
did_callbacks |= cycled_icloglock;
if (repeats > 5000) { if (repeats > 5000) {
flushcnt += repeats; flushcnt += repeats;
repeats = 0; repeats = 0;
@ -2844,9 +2864,9 @@ xlog_state_do_callback(
"%s: possible infinite loop (%d iterations)", "%s: possible infinite loop (%d iterations)",
__func__, flushcnt); __func__, flushcnt);
} }
} while (!ioerrors && loopdidcallbacks); } while (!ioerrors && cycled_icloglock);
if (funcdidcallbacks) if (did_callbacks)
xlog_state_callback_check_state(log); xlog_state_callback_check_state(log);
if (log->l_iclog->ic_state & (XLOG_STATE_ACTIVE|XLOG_STATE_IOERROR)) if (log->l_iclog->ic_state & (XLOG_STATE_ACTIVE|XLOG_STATE_IOERROR))