fs: dlm: remove wq_alloc mutex

This patch cleanups the code for allocating a new buffer in the dlm
writequeue mechanism. There was a possible tuneup to allow scheduling
while a new writequeue entry needs to be allocated because either no
sending page is available or are full. To avoid multiple concurrent
users checking at the same time if an entry is available or full
alloc_wq was introduce that those are waiting if there is currently a
new writequeue entry in process to be queued so possible further users
will check on the new allocated writequeue entry if it's full.

To simplify the code we just remove this mutex and switch that the
already introduced spin lock will be held during writequeue check,
allocation and queueing. So other users can never check on available
writequeues while there is a new one in process but not queued yet.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
Signed-off-by: David Teigland <teigland@redhat.com>
This commit is contained in:
Alexander Aring 2021-11-30 14:47:17 -05:00 committed by David Teigland
parent 21d9ac1a53
commit be3b0400ed
1 changed files with 11 additions and 37 deletions

View File

@ -86,7 +86,6 @@ struct connection {
struct list_head writequeue; /* List of outgoing writequeue_entries */ struct list_head writequeue; /* List of outgoing writequeue_entries */
spinlock_t writequeue_lock; spinlock_t writequeue_lock;
atomic_t writequeue_cnt; atomic_t writequeue_cnt;
struct mutex wq_alloc;
int retries; int retries;
#define MAX_CONNECT_RETRIES 3 #define MAX_CONNECT_RETRIES 3
struct hlist_node list; struct hlist_node list;
@ -270,8 +269,6 @@ static struct connection *nodeid2con(int nodeid, gfp_t alloc)
return NULL; return NULL;
} }
mutex_init(&con->wq_alloc);
spin_lock(&connections_lock); spin_lock(&connections_lock);
/* Because multiple workqueues/threads calls this function it can /* Because multiple workqueues/threads calls this function it can
* race on multiple cpu's. Instead of locking hot path __find_con() * race on multiple cpu's. Instead of locking hot path __find_con()
@ -1176,16 +1173,15 @@ static void deinit_local(void)
kfree(dlm_local_addr[i]); kfree(dlm_local_addr[i]);
} }
static struct writequeue_entry *new_writequeue_entry(struct connection *con, static struct writequeue_entry *new_writequeue_entry(struct connection *con)
gfp_t allocation)
{ {
struct writequeue_entry *entry; struct writequeue_entry *entry;
entry = kzalloc(sizeof(*entry), allocation); entry = kzalloc(sizeof(*entry), GFP_ATOMIC);
if (!entry) if (!entry)
return NULL; return NULL;
entry->page = alloc_page(allocation | __GFP_ZERO); entry->page = alloc_page(GFP_ATOMIC | __GFP_ZERO);
if (!entry->page) { if (!entry->page) {
kfree(entry); kfree(entry);
return NULL; return NULL;
@ -1200,8 +1196,8 @@ static struct writequeue_entry *new_writequeue_entry(struct connection *con,
} }
static struct writequeue_entry *new_wq_entry(struct connection *con, int len, static struct writequeue_entry *new_wq_entry(struct connection *con, int len,
gfp_t allocation, char **ppc, char **ppc, void (*cb)(void *data),
void (*cb)(void *data), void *data) void *data)
{ {
struct writequeue_entry *e; struct writequeue_entry *e;
@ -1217,29 +1213,25 @@ static struct writequeue_entry *new_wq_entry(struct connection *con, int len,
e->end += len; e->end += len;
e->users++; e->users++;
spin_unlock(&con->writequeue_lock); goto out;
return e;
} }
} }
spin_unlock(&con->writequeue_lock);
e = new_writequeue_entry(con, allocation); e = new_writequeue_entry(con);
if (!e) if (!e)
return NULL; goto out;
kref_get(&e->ref); kref_get(&e->ref);
*ppc = page_address(e->page); *ppc = page_address(e->page);
e->end += len; e->end += len;
atomic_inc(&con->writequeue_cnt); atomic_inc(&con->writequeue_cnt);
spin_lock(&con->writequeue_lock);
if (cb) if (cb)
cb(data); cb(data);
list_add_tail(&e->list, &con->writequeue); list_add_tail(&e->list, &con->writequeue);
spin_unlock(&con->writequeue_lock);
out:
spin_unlock(&con->writequeue_lock);
return e; return e;
}; };
@ -1250,37 +1242,19 @@ static struct dlm_msg *dlm_lowcomms_new_msg_con(struct connection *con, int len,
{ {
struct writequeue_entry *e; struct writequeue_entry *e;
struct dlm_msg *msg; struct dlm_msg *msg;
bool sleepable;
msg = kzalloc(sizeof(*msg), allocation); msg = kzalloc(sizeof(*msg), allocation);
if (!msg) if (!msg)
return NULL; return NULL;
/* this mutex is being used as a wait to avoid multiple "fast"
* new writequeue page list entry allocs in new_wq_entry in
* normal operation which is sleepable context. Without it
* we could end in multiple writequeue entries with one
* dlm message because multiple callers were waiting at
* the writequeue_lock in new_wq_entry().
*/
sleepable = gfpflags_normal_context(allocation);
if (sleepable)
mutex_lock(&con->wq_alloc);
kref_init(&msg->ref); kref_init(&msg->ref);
e = new_wq_entry(con, len, allocation, ppc, cb, data); e = new_wq_entry(con, len, ppc, cb, data);
if (!e) { if (!e) {
if (sleepable)
mutex_unlock(&con->wq_alloc);
kfree(msg); kfree(msg);
return NULL; return NULL;
} }
if (sleepable)
mutex_unlock(&con->wq_alloc);
msg->ppc = *ppc; msg->ppc = *ppc;
msg->len = len; msg->len = len;
msg->entry = e; msg->entry = e;