dlm: fix missing dir remove

I don't know exactly how, but in some cases, a dir
record is not removed, or a new one is created when
it shouldn't be.  The result is that the dir node
lookup returns a master node where the rsb does not
exist.  In this case, The master node will repeatedly
return -EBADR for requests, and the lock requests will
be stuck.

Until all possible ways for this to happen can be
eliminated, a simple and effective way to recover from
this situation is for the supposed master node to send
a standard remove message to the dir node when it
receives a request for a resource it has no rsb for.

Signed-off-by: David Teigland <teigland@redhat.com>
This commit is contained in:
David Teigland 2012-06-25 13:48:05 -05:00
parent c503a62103
commit 96006ea6d4
1 changed files with 68 additions and 2 deletions

View File

@ -4000,12 +4000,70 @@ static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
return error; return error;
} }
static void send_repeat_remove(struct dlm_ls *ls, char *ms_name, int len)
{
char name[DLM_RESNAME_MAXLEN + 1];
struct dlm_message *ms;
struct dlm_mhandle *mh;
struct dlm_rsb *r;
uint32_t hash, b;
int rv, dir_nodeid;
memset(name, 0, sizeof(name));
memcpy(name, ms_name, len);
hash = jhash(name, len, 0);
b = hash & (ls->ls_rsbtbl_size - 1);
dir_nodeid = dlm_hash2nodeid(ls, hash);
log_error(ls, "send_repeat_remove dir %d %s", dir_nodeid, name);
spin_lock(&ls->ls_rsbtbl[b].lock);
rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
if (!rv) {
spin_unlock(&ls->ls_rsbtbl[b].lock);
log_error(ls, "repeat_remove on keep %s", name);
return;
}
rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
if (!rv) {
spin_unlock(&ls->ls_rsbtbl[b].lock);
log_error(ls, "repeat_remove on toss %s", name);
return;
}
/* use ls->remove_name2 to avoid conflict with shrink? */
spin_lock(&ls->ls_remove_spin);
ls->ls_remove_len = len;
memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN);
spin_unlock(&ls->ls_remove_spin);
spin_unlock(&ls->ls_rsbtbl[b].lock);
rv = _create_message(ls, sizeof(struct dlm_message) + len,
dir_nodeid, DLM_MSG_REMOVE, &ms, &mh);
if (rv)
return;
memcpy(ms->m_extra, name, len);
ms->m_hash = hash;
send_message(mh, ms);
spin_lock(&ls->ls_remove_spin);
ls->ls_remove_len = 0;
memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN);
spin_unlock(&ls->ls_remove_spin);
}
static int receive_request(struct dlm_ls *ls, struct dlm_message *ms) static int receive_request(struct dlm_ls *ls, struct dlm_message *ms)
{ {
struct dlm_lkb *lkb; struct dlm_lkb *lkb;
struct dlm_rsb *r; struct dlm_rsb *r;
int from_nodeid; int from_nodeid;
int error, namelen; int error, namelen = 0;
from_nodeid = ms->m_header.h_nodeid; from_nodeid = ms->m_header.h_nodeid;
@ -4073,13 +4131,21 @@ static int receive_request(struct dlm_ls *ls, struct dlm_message *ms)
delayed in being sent/arriving/being processed on the dir node. delayed in being sent/arriving/being processed on the dir node.
Another node would repeatedly lookup up the master, and the dir Another node would repeatedly lookup up the master, and the dir
node would continue returning our nodeid until our send_remove node would continue returning our nodeid until our send_remove
took effect. */ took effect.
We send another remove message in case our previous send_remove
was lost/ignored/missed somehow. */
if (error != -ENOTBLK) { if (error != -ENOTBLK) {
log_limit(ls, "receive_request %x from %d %d", log_limit(ls, "receive_request %x from %d %d",
ms->m_lkid, from_nodeid, error); ms->m_lkid, from_nodeid, error);
} }
if (namelen && error == -EBADR) {
send_repeat_remove(ls, ms->m_extra, namelen);
msleep(1000);
}
setup_stub_lkb(ls, ms); setup_stub_lkb(ls, ms);
send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error); send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
return error; return error;