From be3b0400edbf68556cd390125e2c868988616391 Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Tue, 30 Nov 2021 14:47:17 -0500 Subject: [PATCH] fs: dlm: remove wq_alloc mutex This patch cleanups the code for allocating a new buffer in the dlm writequeue mechanism. There was a possible tuneup to allow scheduling while a new writequeue entry needs to be allocated because either no sending page is available or are full. To avoid multiple concurrent users checking at the same time if an entry is available or full alloc_wq was introduce that those are waiting if there is currently a new writequeue entry in process to be queued so possible further users will check on the new allocated writequeue entry if it's full. To simplify the code we just remove this mutex and switch that the already introduced spin lock will be held during writequeue check, allocation and queueing. So other users can never check on available writequeues while there is a new one in process but not queued yet. Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/lowcomms.c | 48 +++++++++++------------------------------------ 1 file changed, 11 insertions(+), 37 deletions(-) diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c index 6d500ebc6145..4919faf79709 100644 --- a/fs/dlm/lowcomms.c +++ b/fs/dlm/lowcomms.c @@ -86,7 +86,6 @@ struct connection { struct list_head writequeue; /* List of outgoing writequeue_entries */ spinlock_t writequeue_lock; atomic_t writequeue_cnt; - struct mutex wq_alloc; int retries; #define MAX_CONNECT_RETRIES 3 struct hlist_node list; @@ -270,8 +269,6 @@ static struct connection *nodeid2con(int nodeid, gfp_t alloc) return NULL; } - mutex_init(&con->wq_alloc); - spin_lock(&connections_lock); /* Because multiple workqueues/threads calls this function it can * race on multiple cpu's. Instead of locking hot path __find_con() @@ -1176,16 +1173,15 @@ static void deinit_local(void) kfree(dlm_local_addr[i]); } -static struct writequeue_entry *new_writequeue_entry(struct connection *con, - gfp_t allocation) +static struct writequeue_entry *new_writequeue_entry(struct connection *con) { struct writequeue_entry *entry; - entry = kzalloc(sizeof(*entry), allocation); + entry = kzalloc(sizeof(*entry), GFP_ATOMIC); if (!entry) return NULL; - entry->page = alloc_page(allocation | __GFP_ZERO); + entry->page = alloc_page(GFP_ATOMIC | __GFP_ZERO); if (!entry->page) { kfree(entry); return NULL; @@ -1200,8 +1196,8 @@ static struct writequeue_entry *new_writequeue_entry(struct connection *con, } static struct writequeue_entry *new_wq_entry(struct connection *con, int len, - gfp_t allocation, char **ppc, - void (*cb)(void *data), void *data) + char **ppc, void (*cb)(void *data), + void *data) { struct writequeue_entry *e; @@ -1217,29 +1213,25 @@ static struct writequeue_entry *new_wq_entry(struct connection *con, int len, e->end += len; e->users++; - spin_unlock(&con->writequeue_lock); - - return e; + goto out; } } - spin_unlock(&con->writequeue_lock); - e = new_writequeue_entry(con, allocation); + e = new_writequeue_entry(con); if (!e) - return NULL; + goto out; kref_get(&e->ref); *ppc = page_address(e->page); e->end += len; atomic_inc(&con->writequeue_cnt); - - spin_lock(&con->writequeue_lock); if (cb) cb(data); list_add_tail(&e->list, &con->writequeue); - spin_unlock(&con->writequeue_lock); +out: + spin_unlock(&con->writequeue_lock); return e; }; @@ -1250,37 +1242,19 @@ static struct dlm_msg *dlm_lowcomms_new_msg_con(struct connection *con, int len, { struct writequeue_entry *e; struct dlm_msg *msg; - bool sleepable; msg = kzalloc(sizeof(*msg), allocation); if (!msg) return NULL; - /* this mutex is being used as a wait to avoid multiple "fast" - * new writequeue page list entry allocs in new_wq_entry in - * normal operation which is sleepable context. Without it - * we could end in multiple writequeue entries with one - * dlm message because multiple callers were waiting at - * the writequeue_lock in new_wq_entry(). - */ - sleepable = gfpflags_normal_context(allocation); - if (sleepable) - mutex_lock(&con->wq_alloc); - kref_init(&msg->ref); - e = new_wq_entry(con, len, allocation, ppc, cb, data); + e = new_wq_entry(con, len, ppc, cb, data); if (!e) { - if (sleepable) - mutex_unlock(&con->wq_alloc); - kfree(msg); return NULL; } - if (sleepable) - mutex_unlock(&con->wq_alloc); - msg->ppc = *ppc; msg->len = len; msg->entry = e;