fs: dlm: listen socket out of connection hash

This patch introduces a dedicated connection structure for the listen
socket instead of handling the listen socket as a normal connection
structure in the connection hash. We can remove some nodeid == 0
validation checks, because this nodeid should no longer exist inside
the node hash. This patch also removes the sock mutex in the
accept_from_sock() function, because this function cannot run in another
parallel context when it is scheduled on only one workqueue.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
Signed-off-by: David Teigland <teigland@redhat.com>
This commit is contained in:
Alexander Aring 2020-11-02 20:04:25 -05:00 committed by David Teigland
parent 13004e8afe
commit d11ccd451b
1 changed files with 74 additions and 93 deletions

View File

@ -81,7 +81,6 @@ struct connection {
#define CF_CONNECTED 10 #define CF_CONNECTED 10
struct list_head writequeue; /* List of outgoing writequeue_entries */ struct list_head writequeue; /* List of outgoing writequeue_entries */
spinlock_t writequeue_lock; spinlock_t writequeue_lock;
int (*rx_action) (struct connection *); /* What to do when active */
void (*connect_action) (struct connection *); /* What to do to connect */ void (*connect_action) (struct connection *); /* What to do to connect */
void (*shutdown_action)(struct connection *con); /* What to do to shutdown */ void (*shutdown_action)(struct connection *con); /* What to do to shutdown */
int retries; int retries;
@ -98,6 +97,11 @@ struct connection {
}; };
#define sock2con(x) ((struct connection *)(x)->sk_user_data) #define sock2con(x) ((struct connection *)(x)->sk_user_data)
/* State for the listening socket, kept separately from struct connection. */
struct listen_connection {
/* The listening socket; assigned in add_listen_sock(). */
struct socket *sock;
/* Work item queued on recv_workqueue by lowcomms_listen_data_ready(). */
struct work_struct rwork;
};
/* An entry waiting to be sent */ /* An entry waiting to be sent */
struct writequeue_entry { struct writequeue_entry {
struct list_head list; struct list_head list;
@ -127,6 +131,7 @@ static struct listen_sock_callbacks {
static LIST_HEAD(dlm_node_addrs); static LIST_HEAD(dlm_node_addrs);
static DEFINE_SPINLOCK(dlm_node_addrs_spin); static DEFINE_SPINLOCK(dlm_node_addrs_spin);
static struct listen_connection listen_con;
static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT]; static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT];
static int dlm_local_count; static int dlm_local_count;
static int dlm_allow_conn; static int dlm_allow_conn;
@ -421,6 +426,11 @@ static void lowcomms_data_ready(struct sock *sk)
read_unlock_bh(&sk->sk_callback_lock); read_unlock_bh(&sk->sk_callback_lock);
} }
/*
 * data_ready callback that add_listen_sock() installs on the listen socket.
 * It only queues listen_con.rwork on recv_workqueue; the sk argument is
 * not used.
 */
static void lowcomms_listen_data_ready(struct sock *sk)
{
queue_work(recv_workqueue, &listen_con.rwork);
}
static void lowcomms_write_space(struct sock *sk) static void lowcomms_write_space(struct sock *sk)
{ {
struct connection *con; struct connection *con;
@ -556,6 +566,21 @@ static void restore_callbacks(struct socket *sock)
write_unlock_bh(&sk->sk_callback_lock); write_unlock_bh(&sk->sk_callback_lock);
} }
/*
 * Attach @sock to the listen connection @con.
 *
 * Under the socket's sk_callback_lock (write side, BHs disabled) this
 * calls save_listen_callbacks(), records @sock in @con, points
 * sk_user_data at @con, sets the allocation mode to GFP_NOFS and
 * replaces sk_data_ready with lowcomms_listen_data_ready().
 */
static void add_listen_sock(struct socket *sock, struct listen_connection *con)
{
struct sock *sk = sock->sk;
write_lock_bh(&sk->sk_callback_lock);
/* Called before sk_data_ready is overwritten below. */
save_listen_callbacks(sock);
con->sock = sock;
sk->sk_user_data = con;
sk->sk_allocation = GFP_NOFS;
/* Install a data_ready callback */
sk->sk_data_ready = lowcomms_listen_data_ready;
write_unlock_bh(&sk->sk_callback_lock);
}
/* Make a socket active */ /* Make a socket active */
static void add_sock(struct socket *sock, struct connection *con) static void add_sock(struct socket *sock, struct connection *con)
{ {
@ -593,6 +618,15 @@ static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len); memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len);
} }
/*
 * Close the socket that @sock points at and clear the caller's pointer.
 * Does nothing when *@sock is already NULL, so it may be called repeatedly.
 */
static void dlm_close_sock(struct socket **sock)
{
	struct socket *s = *sock;

	/* Nothing to tear down. */
	if (!s)
		return;

	/* Put the saved callbacks back before the socket goes away. */
	restore_callbacks(s);
	sock_release(s);
	*sock = NULL;
}
/* Close a remote connection and tidy up */ /* Close a remote connection and tidy up */
static void close_connection(struct connection *con, bool and_other, static void close_connection(struct connection *con, bool and_other,
bool tx, bool rx) bool tx, bool rx)
@ -609,11 +643,8 @@ static void close_connection(struct connection *con, bool and_other,
} }
mutex_lock(&con->sock_mutex); mutex_lock(&con->sock_mutex);
if (con->sock) { dlm_close_sock(&con->sock);
restore_callbacks(con->sock);
sock_release(con->sock);
con->sock = NULL;
}
if (con->othercon && and_other) { if (con->othercon && and_other) {
/* Will only re-enter once. */ /* Will only re-enter once. */
close_connection(con->othercon, false, true, true); close_connection(con->othercon, false, true, true);
@ -709,11 +740,6 @@ static int receive_from_sock(struct connection *con)
goto out_close; goto out_close;
} }
if (con->nodeid == 0) {
ret = -EINVAL;
goto out_close;
}
/* realloc if we get new buffer size to read out */ /* realloc if we get new buffer size to read out */
buflen = dlm_config.ci_buffer_size; buflen = dlm_config.ci_buffer_size;
if (con->rx_buflen != buflen && con->rx_leftover <= buflen) { if (con->rx_buflen != buflen && con->rx_leftover <= buflen) {
@ -785,7 +811,7 @@ static int receive_from_sock(struct connection *con)
} }
/* Listening socket is busy, accept a connection */ /* Listening socket is busy, accept a connection */
static int accept_from_sock(struct connection *con) static int accept_from_sock(struct listen_connection *con)
{ {
int result; int result;
struct sockaddr_storage peeraddr; struct sockaddr_storage peeraddr;
@ -800,12 +826,8 @@ static int accept_from_sock(struct connection *con)
return -1; return -1;
} }
mutex_lock_nested(&con->sock_mutex, 0); if (!con->sock)
if (!con->sock) {
mutex_unlock(&con->sock_mutex);
return -ENOTCONN; return -ENOTCONN;
}
result = kernel_accept(con->sock, &newsock, O_NONBLOCK); result = kernel_accept(con->sock, &newsock, O_NONBLOCK);
if (result < 0) if (result < 0)
@ -827,7 +849,6 @@ static int accept_from_sock(struct connection *con)
print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE, print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE,
b, sizeof(struct sockaddr_storage)); b, sizeof(struct sockaddr_storage));
sock_release(newsock); sock_release(newsock);
mutex_unlock(&con->sock_mutex);
return -1; return -1;
} }
@ -846,7 +867,8 @@ static int accept_from_sock(struct connection *con)
result = -ENOMEM; result = -ENOMEM;
goto accept_err; goto accept_err;
} }
mutex_lock_nested(&newcon->sock_mutex, 1);
mutex_lock(&newcon->sock_mutex);
if (newcon->sock) { if (newcon->sock) {
struct connection *othercon = newcon->othercon; struct connection *othercon = newcon->othercon;
@ -865,20 +887,18 @@ static int accept_from_sock(struct connection *con)
goto accept_err; goto accept_err;
} }
set_bit(CF_IS_OTHERCON, &othercon->flags);
newcon->othercon = othercon; newcon->othercon = othercon;
} else { } else {
/* close other sock con if we have something new */ /* close other sock con if we have something new */
close_connection(othercon, false, true, false); close_connection(othercon, false, true, false);
} }
mutex_lock_nested(&othercon->sock_mutex, 2); mutex_lock_nested(&othercon->sock_mutex, 1);
add_sock(newsock, othercon); add_sock(newsock, othercon);
addcon = othercon; addcon = othercon;
mutex_unlock(&othercon->sock_mutex); mutex_unlock(&othercon->sock_mutex);
} }
else { else {
newcon->rx_action = receive_from_sock;
/* accept copies the sk after we've saved the callbacks, so we /* accept copies the sk after we've saved the callbacks, so we
don't want to save them a second time or comm errors will don't want to save them a second time or comm errors will
result in calling sk_error_report recursively. */ result in calling sk_error_report recursively. */
@ -895,12 +915,10 @@ static int accept_from_sock(struct connection *con)
*/ */
if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags)) if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags))
queue_work(recv_workqueue, &addcon->rwork); queue_work(recv_workqueue, &addcon->rwork);
mutex_unlock(&con->sock_mutex);
return 0; return 0;
accept_err: accept_err:
mutex_unlock(&con->sock_mutex);
if (newsock) if (newsock)
sock_release(newsock); sock_release(newsock);
@ -973,11 +991,6 @@ static void sctp_connect_to_sock(struct connection *con)
struct socket *sock; struct socket *sock;
unsigned int mark; unsigned int mark;
if (con->nodeid == 0) {
log_print("attempt to connect sock 0 foiled");
return;
}
dlm_comm_mark(con->nodeid, &mark); dlm_comm_mark(con->nodeid, &mark);
mutex_lock(&con->sock_mutex); mutex_lock(&con->sock_mutex);
@ -1006,7 +1019,6 @@ static void sctp_connect_to_sock(struct connection *con)
sock_set_mark(sock->sk, mark); sock_set_mark(sock->sk, mark);
con->rx_action = receive_from_sock;
add_sock(sock, con); add_sock(sock, con);
/* Bind to all addresses. */ /* Bind to all addresses. */
@ -1073,11 +1085,6 @@ static void tcp_connect_to_sock(struct connection *con)
unsigned int mark; unsigned int mark;
int result; int result;
if (con->nodeid == 0) {
log_print("attempt to connect sock 0 foiled");
return;
}
dlm_comm_mark(con->nodeid, &mark); dlm_comm_mark(con->nodeid, &mark);
mutex_lock(&con->sock_mutex); mutex_lock(&con->sock_mutex);
@ -1103,7 +1110,6 @@ static void tcp_connect_to_sock(struct connection *con)
goto out_err; goto out_err;
} }
con->rx_action = receive_from_sock;
add_sock(sock, con); add_sock(sock, con);
/* Bind to our cluster-known address connecting to avoid /* Bind to our cluster-known address connecting to avoid
@ -1159,8 +1165,11 @@ static void tcp_connect_to_sock(struct connection *con)
return; return;
} }
static struct socket *tcp_create_listen_sock(struct connection *con, /* On error caller must run dlm_close_sock() for the
struct sockaddr_storage *saddr) * listen connection socket.
*/
static int tcp_create_listen_sock(struct listen_connection *con,
struct sockaddr_storage *saddr)
{ {
struct socket *sock = NULL; struct socket *sock = NULL;
int result = 0; int result = 0;
@ -1186,20 +1195,13 @@ static struct socket *tcp_create_listen_sock(struct connection *con,
sock_set_reuseaddr(sock->sk); sock_set_reuseaddr(sock->sk);
write_lock_bh(&sock->sk->sk_callback_lock); add_listen_sock(sock, con);
sock->sk->sk_user_data = con;
save_listen_callbacks(sock);
con->rx_action = accept_from_sock;
write_unlock_bh(&sock->sk->sk_callback_lock);
/* Bind to our port */ /* Bind to our port */
make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len); make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len);
result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len); result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len);
if (result < 0) { if (result < 0) {
log_print("Can't bind to port %d", dlm_config.ci_tcp_port); log_print("Can't bind to port %d", dlm_config.ci_tcp_port);
sock_release(sock);
sock = NULL;
con->sock = NULL;
goto create_out; goto create_out;
} }
sock_set_keepalive(sock->sk); sock_set_keepalive(sock->sk);
@ -1207,13 +1209,13 @@ static struct socket *tcp_create_listen_sock(struct connection *con,
result = sock->ops->listen(sock, 5); result = sock->ops->listen(sock, 5);
if (result < 0) { if (result < 0) {
log_print("Can't listen on port %d", dlm_config.ci_tcp_port); log_print("Can't listen on port %d", dlm_config.ci_tcp_port);
sock_release(sock);
sock = NULL;
goto create_out; goto create_out;
} }
return 0;
create_out: create_out:
return sock; return result;
} }
/* Get local addresses */ /* Get local addresses */
@ -1242,15 +1244,14 @@ static void deinit_local(void)
kfree(dlm_local_addr[i]); kfree(dlm_local_addr[i]);
} }
/* Initialise SCTP socket and bind to all interfaces */ /* Initialise SCTP socket and bind to all interfaces
static int sctp_listen_for_all(void) * On error caller must run dlm_close_sock() for the
* listen connection socket.
*/
static int sctp_listen_for_all(struct listen_connection *con)
{ {
struct socket *sock = NULL; struct socket *sock = NULL;
int result = -EINVAL; int result = -EINVAL;
struct connection *con = nodeid2con(0, GFP_NOFS);
if (!con)
return -ENOMEM;
log_print("Using SCTP for communications"); log_print("Using SCTP for communications");
@ -1265,44 +1266,27 @@ static int sctp_listen_for_all(void)
sock_set_mark(sock->sk, dlm_config.ci_mark); sock_set_mark(sock->sk, dlm_config.ci_mark);
sctp_sock_set_nodelay(sock->sk); sctp_sock_set_nodelay(sock->sk);
write_lock_bh(&sock->sk->sk_callback_lock); add_listen_sock(sock, con);
/* Init con struct */
sock->sk->sk_user_data = con;
save_listen_callbacks(sock);
con->sock = sock;
con->sock->sk->sk_data_ready = lowcomms_data_ready;
con->rx_action = accept_from_sock;
write_unlock_bh(&sock->sk->sk_callback_lock);
/* Bind to all addresses. */ /* Bind to all addresses. */
if (sctp_bind_addrs(con->sock, dlm_config.ci_tcp_port)) result = sctp_bind_addrs(con->sock, dlm_config.ci_tcp_port);
goto create_delsock; if (result < 0)
goto out;
result = sock->ops->listen(sock, 5); result = sock->ops->listen(sock, 5);
if (result < 0) { if (result < 0) {
log_print("Can't set socket listening"); log_print("Can't set socket listening");
goto create_delsock; goto out;
} }
return 0; return 0;
create_delsock:
sock_release(sock);
con->sock = NULL;
out: out:
return result; return result;
} }
static int tcp_listen_for_all(void) static int tcp_listen_for_all(void)
{ {
struct socket *sock = NULL;
struct connection *con = nodeid2con(0, GFP_NOFS);
int result = -EINVAL;
if (!con)
return -ENOMEM;
/* We don't support multi-homed hosts */ /* We don't support multi-homed hosts */
if (dlm_local_addr[1] != NULL) { if (dlm_local_addr[1] != NULL) {
log_print("TCP protocol can't handle multi-homed hosts, " log_print("TCP protocol can't handle multi-homed hosts, "
@ -1312,16 +1296,7 @@ static int tcp_listen_for_all(void)
log_print("Using TCP for communications"); log_print("Using TCP for communications");
sock = tcp_create_listen_sock(con, dlm_local_addr[0]); return tcp_create_listen_sock(&listen_con, dlm_local_addr[0]);
if (sock) {
add_sock(sock, con);
result = 0;
}
else {
result = -EADDRINUSE;
}
return result;
} }
@ -1541,10 +1516,15 @@ static void process_recv_sockets(struct work_struct *work)
clear_bit(CF_READ_PENDING, &con->flags); clear_bit(CF_READ_PENDING, &con->flags);
do { do {
err = con->rx_action(con); err = receive_from_sock(con);
} while (!err); } while (!err);
} }
/*
 * Work function for listen_con.rwork (set up with INIT_WORK in
 * dlm_lowcomms_start()). Calls accept_from_sock() once on the listen
 * connection; its return value is ignored and @work is not used.
 */
static void process_listen_recv_socket(struct work_struct *work)
{
accept_from_sock(&listen_con);
}
/* Send workqueue function */ /* Send workqueue function */
static void process_send_sockets(struct work_struct *work) static void process_send_sockets(struct work_struct *work)
{ {
@ -1678,6 +1658,8 @@ void dlm_lowcomms_stop(void)
if (send_workqueue) if (send_workqueue)
flush_workqueue(send_workqueue); flush_workqueue(send_workqueue);
dlm_close_sock(&listen_con.sock);
foreach_conn(shutdown_conn); foreach_conn(shutdown_conn);
work_flush(); work_flush();
foreach_conn(free_conn); foreach_conn(free_conn);
@ -1688,7 +1670,6 @@ void dlm_lowcomms_stop(void)
int dlm_lowcomms_start(void) int dlm_lowcomms_start(void)
{ {
int error = -EINVAL; int error = -EINVAL;
struct connection *con;
int i; int i;
for (i = 0; i < CONN_HASH_SIZE; i++) for (i = 0; i < CONN_HASH_SIZE; i++)
@ -1701,6 +1682,8 @@ int dlm_lowcomms_start(void)
goto fail; goto fail;
} }
INIT_WORK(&listen_con.rwork, process_listen_recv_socket);
error = work_start(); error = work_start();
if (error) if (error)
goto fail; goto fail;
@ -1711,7 +1694,7 @@ int dlm_lowcomms_start(void)
if (dlm_config.ci_protocol == 0) if (dlm_config.ci_protocol == 0)
error = tcp_listen_for_all(); error = tcp_listen_for_all();
else else
error = sctp_listen_for_all(); error = sctp_listen_for_all(&listen_con);
if (error) if (error)
goto fail_unlisten; goto fail_unlisten;
@ -1719,9 +1702,7 @@ int dlm_lowcomms_start(void)
fail_unlisten: fail_unlisten:
dlm_allow_conn = 0; dlm_allow_conn = 0;
con = nodeid2con(0,0); dlm_close_sock(&listen_con.sock);
if (con)
free_conn(con);
fail: fail:
return error; return error;
} }