From 0cb43965d42a21a7af41f88f1021b478dc102425 Mon Sep 17 00:00:00 2001 From: Sowmini Varadhan Date: Mon, 13 Jun 2016 09:44:26 -0700 Subject: [PATCH 01/17] RDS: split out connection specific state from rds_connection to rds_conn_path In preparation for multipath RDS, split the rds_connection structure into a base structure, and a per-path struct rds_conn_path. The base structure tracks information and locks common to all paths. The workqs for send/recv/shutdown etc are tracked per rds_conn_path. Thus the workq callbacks now work with rds_conn_path. This commit allows for one rds_conn_path per rds_connection, and will be extended into multiple conn_paths in subsequent commits. Signed-off-by: Sowmini Varadhan Signed-off-by: David S. Miller --- net/rds/cong.c | 3 +- net/rds/connection.c | 18 ++++-- net/rds/ib.c | 1 + net/rds/ib_cm.c | 1 + net/rds/ib_rdma.c | 1 + net/rds/ib_recv.c | 1 + net/rds/ib_send.c | 1 + net/rds/loop.c | 1 + net/rds/rdma_transport.c | 1 + net/rds/rds.h | 122 +++++++++++++++++++++++++------------- net/rds/rds_single_path.h | 30 ++++++++++ net/rds/recv.c | 1 + net/rds/send.c | 1 + net/rds/tcp.c | 1 + net/rds/tcp_connect.c | 4 +- net/rds/tcp_listen.c | 11 ++-- net/rds/tcp_recv.c | 1 + net/rds/tcp_send.c | 1 + net/rds/threads.c | 92 +++++++++++++++------------- 19 files changed, 199 insertions(+), 93 deletions(-) create mode 100644 net/rds/rds_single_path.h diff --git a/net/rds/cong.c b/net/rds/cong.c index 6641bcf7c185..8398fee7c866 100644 --- a/net/rds/cong.c +++ b/net/rds/cong.c @@ -235,7 +235,8 @@ void rds_cong_queue_updates(struct rds_cong_map *map) * therefore trigger warnings. * Defer the xmit to rds_send_worker() instead. */ - queue_delayed_work(rds_wq, &conn->c_send_w, 0); + queue_delayed_work(rds_wq, + &conn->c_path[0].cp_send_w, 0); } } diff --git a/net/rds/connection.c b/net/rds/connection.c index e3b118cae81d..6fa2074044b9 100644 --- a/net/rds/connection.c +++ b/net/rds/connection.c @@ -36,6 +36,7 @@ #include #include +#include "rds_single_path.h" #include "rds.h" #include "loop.h" @@ -155,6 +156,7 @@ static struct rds_connection *__rds_conn_create(struct net *net, conn->c_faddr = faddr; spin_lock_init(&conn->c_lock); conn->c_next_tx_seq = 1; + conn->c_path[0].cp_conn = conn; rds_conn_net_set(conn, net); init_waitqueue_head(&conn->c_waitq); @@ -197,7 +199,7 @@ static struct rds_connection *__rds_conn_create(struct net *net, atomic_set(&conn->c_state, RDS_CONN_DOWN); conn->c_send_gen = 0; - conn->c_outgoing = (is_outgoing ? 1 : 0); + conn->c_path[0].cp_outgoing = (is_outgoing ? 1 : 0); conn->c_reconnect_jiffies = 0; INIT_DELAYED_WORK(&conn->c_send_w, rds_send_worker); INIT_DELAYED_WORK(&conn->c_recv_w, rds_recv_worker); @@ -320,8 +322,8 @@ void rds_conn_shutdown(struct rds_connection *conn) if (!hlist_unhashed(&conn->c_hash_node)) { rcu_read_unlock(); if (conn->c_trans->t_type != RDS_TRANS_TCP || - conn->c_outgoing == 1) - rds_queue_reconnect(conn); + conn->c_path[0].cp_outgoing == 1) + rds_queue_reconnect(&conn->c_path[0]); } else { rcu_read_unlock(); } @@ -553,10 +555,16 @@ void rds_conn_exit(void) /* * Force a disconnect */ +void rds_conn_path_drop(struct rds_conn_path *cp) +{ + atomic_set(&cp->cp_state, RDS_CONN_ERROR); + queue_work(rds_wq, &cp->cp_down_w); +} +EXPORT_SYMBOL_GPL(rds_conn_path_drop); + void rds_conn_drop(struct rds_connection *conn) { - atomic_set(&conn->c_state, RDS_CONN_ERROR); - queue_work(rds_wq, &conn->c_down_w); + rds_conn_path_drop(&conn->c_path[0]); } EXPORT_SYMBOL_GPL(rds_conn_drop); diff --git a/net/rds/ib.c b/net/rds/ib.c index b5342fddaf98..44946a681a8c 100644 --- a/net/rds/ib.c +++ b/net/rds/ib.c @@ -40,6 +40,7 @@ #include #include +#include "rds_single_path.h" #include "rds.h" #include "ib.h" #include "ib_mr.h" diff --git a/net/rds/ib_cm.c b/net/rds/ib_cm.c index 310cabce2311..4de5a35f5c40 100644 --- a/net/rds/ib_cm.c +++ b/net/rds/ib_cm.c @@ -36,6 +36,7 @@ #include #include +#include "rds_single_path.h" #include "rds.h" #include "ib.h" diff --git a/net/rds/ib_rdma.c b/net/rds/ib_rdma.c index a0f21b65a83c..977f69886c00 100644 --- a/net/rds/ib_rdma.c +++ b/net/rds/ib_rdma.c @@ -35,6 +35,7 @@ #include #include +#include "rds_single_path.h" #include "ib_mr.h" struct workqueue_struct *rds_ib_mr_wq; diff --git a/net/rds/ib_recv.c b/net/rds/ib_recv.c index abc8cc805e8d..4ea8cb17cc7a 100644 --- a/net/rds/ib_recv.c +++ b/net/rds/ib_recv.c @@ -36,6 +36,7 @@ #include #include +#include "rds_single_path.h" #include "rds.h" #include "ib.h" diff --git a/net/rds/ib_send.c b/net/rds/ib_send.c index f27d2c82b036..6e4110aa5135 100644 --- a/net/rds/ib_send.c +++ b/net/rds/ib_send.c @@ -36,6 +36,7 @@ #include #include +#include "rds_single_path.h" #include "rds.h" #include "ib.h" diff --git a/net/rds/loop.c b/net/rds/loop.c index 6b12b68541ae..268f07faaa1a 100644 --- a/net/rds/loop.c +++ b/net/rds/loop.c @@ -34,6 +34,7 @@ #include #include +#include "rds_single_path.h" #include "rds.h" #include "loop.h" diff --git a/net/rds/rdma_transport.c b/net/rds/rdma_transport.c index 7220bebcf558..345f09059e9f 100644 --- a/net/rds/rdma_transport.c +++ b/net/rds/rdma_transport.c @@ -33,6 +33,7 @@ #include #include +#include "rds_single_path.h" #include "rdma_transport.h" #include "ib.h" diff --git a/net/rds/rds.h b/net/rds/rds.h index 387df5f32e49..ca31a07f70f5 100644 --- a/net/rds/rds.h +++ b/net/rds/rds.h @@ -84,56 +84,69 @@ enum { #define RDS_IN_XMIT 2 #define RDS_RECV_REFILL 3 +/* Max number of multipaths per RDS connection. Must be a power of 2 */ +#define RDS_MPATH_WORKERS 1 + +/* Per mpath connection state */ +struct rds_conn_path { + struct rds_connection *cp_conn; + struct rds_message *cp_xmit_rm; + unsigned long cp_xmit_sg; + unsigned int cp_xmit_hdr_off; + unsigned int cp_xmit_data_off; + unsigned int cp_xmit_atomic_sent; + unsigned int cp_xmit_rdma_sent; + unsigned int cp_xmit_data_sent; + + spinlock_t cp_lock; /* protect msg queues */ + u64 cp_next_tx_seq; + struct list_head cp_send_queue; + struct list_head cp_retrans; + + u64 cp_next_rx_seq; + + void *cp_transport_data; + + atomic_t cp_state; + unsigned long cp_send_gen; + unsigned long cp_flags; + unsigned long cp_reconnect_jiffies; + struct delayed_work cp_send_w; + struct delayed_work cp_recv_w; + struct delayed_work cp_conn_w; + struct work_struct cp_down_w; + struct mutex cp_cm_lock; /* protect cp_state & cm */ + wait_queue_head_t cp_waitq; + + unsigned int cp_unacked_packets; + unsigned int cp_unacked_bytes; + unsigned int cp_outgoing:1, + cp_pad_to_32:31; + unsigned int cp_index; +}; + +/* One rds_connection per RDS address pair */ struct rds_connection { struct hlist_node c_hash_node; __be32 c_laddr; __be32 c_faddr; unsigned int c_loopback:1, - c_outgoing:1, - c_pad_to_32:30; + c_pad_to_32:31; + int c_npaths; struct rds_connection *c_passive; + struct rds_transport *c_trans; struct rds_cong_map *c_lcong; struct rds_cong_map *c_fcong; - struct rds_message *c_xmit_rm; - unsigned long c_xmit_sg; - unsigned int c_xmit_hdr_off; - unsigned int c_xmit_data_off; - unsigned int c_xmit_atomic_sent; - unsigned int c_xmit_rdma_sent; - unsigned int c_xmit_data_sent; - - spinlock_t c_lock; /* protect msg queues */ - u64 c_next_tx_seq; - struct list_head c_send_queue; - struct list_head c_retrans; - - u64 c_next_rx_seq; - - struct rds_transport *c_trans; - void *c_transport_data; - - atomic_t c_state; - unsigned long c_send_gen; - unsigned long c_flags; - unsigned long c_reconnect_jiffies; - struct delayed_work c_send_w; - struct delayed_work c_recv_w; - struct delayed_work c_conn_w; - struct work_struct c_down_w; - struct mutex c_cm_lock; /* protect conn state & cm */ - wait_queue_head_t c_waitq; + /* Protocol version */ + unsigned int c_version; + possible_net_t c_net; struct list_head c_map_item; unsigned long c_map_queued; - unsigned int c_unacked_packets; - unsigned int c_unacked_bytes; - - /* Protocol version */ - unsigned int c_version; - possible_net_t c_net; + struct rds_conn_path c_path[RDS_MPATH_WORKERS]; }; static inline @@ -639,6 +652,7 @@ struct rds_connection *rds_conn_create_outgoing(struct net *net, void rds_conn_shutdown(struct rds_connection *conn); void rds_conn_destroy(struct rds_connection *conn); void rds_conn_drop(struct rds_connection *conn); +void rds_conn_path_drop(struct rds_conn_path *cpath); void rds_conn_connect_if_down(struct rds_connection *conn); void rds_for_each_conn_info(struct socket *sock, unsigned int len, struct rds_info_iterator *iter, @@ -650,28 +664,52 @@ void __rds_conn_error(struct rds_connection *conn, const char *, ...); #define rds_conn_error(conn, fmt...) \ __rds_conn_error(conn, KERN_WARNING "RDS: " fmt) +static inline int +rds_conn_path_transition(struct rds_conn_path *cp, int old, int new) +{ + return atomic_cmpxchg(&cp->cp_state, old, new) == old; +} + static inline int rds_conn_transition(struct rds_connection *conn, int old, int new) { - return atomic_cmpxchg(&conn->c_state, old, new) == old; + return rds_conn_path_transition(&conn->c_path[0], old, new); +} + +static inline int +rds_conn_path_state(struct rds_conn_path *cp) +{ + return atomic_read(&cp->cp_state); } static inline int rds_conn_state(struct rds_connection *conn) { - return atomic_read(&conn->c_state); + return rds_conn_path_state(&conn->c_path[0]); +} + +static inline int +rds_conn_path_up(struct rds_conn_path *cp) +{ + return atomic_read(&cp->cp_state) == RDS_CONN_UP; } static inline int rds_conn_up(struct rds_connection *conn) { - return atomic_read(&conn->c_state) == RDS_CONN_UP; + return rds_conn_path_up(&conn->c_path[0]); +} + +static inline int +rds_conn_path_connecting(struct rds_conn_path *cp) +{ + return atomic_read(&cp->cp_state) == RDS_CONN_CONNECTING; } static inline int rds_conn_connecting(struct rds_connection *conn) { - return atomic_read(&conn->c_state) == RDS_CONN_CONNECTING; + return rds_conn_path_connecting(&conn->c_path[0]); } /* message.c */ @@ -809,12 +847,12 @@ extern unsigned int rds_sysctl_trace_level; int rds_threads_init(void); void rds_threads_exit(void); extern struct workqueue_struct *rds_wq; -void rds_queue_reconnect(struct rds_connection *conn); +void rds_queue_reconnect(struct rds_conn_path *cp); void rds_connect_worker(struct work_struct *); void rds_shutdown_worker(struct work_struct *); void rds_send_worker(struct work_struct *); void rds_recv_worker(struct work_struct *); -void rds_connect_path_complete(struct rds_connection *conn, int curr); +void rds_connect_path_complete(struct rds_conn_path *conn, int curr); void rds_connect_complete(struct rds_connection *conn); /* transport.c */ diff --git a/net/rds/rds_single_path.h b/net/rds/rds_single_path.h new file mode 100644 index 000000000000..e1241af7c1ad --- /dev/null +++ b/net/rds/rds_single_path.h @@ -0,0 +1,30 @@ +#ifndef _RDS_RDS_SINGLE_H +#define _RDS_RDS_SINGLE_H + +#define c_xmit_rm c_path[0].cp_xmit_rm +#define c_xmit_sg c_path[0].cp_xmit_sg +#define c_xmit_hdr_off c_path[0].cp_xmit_hdr_off +#define c_xmit_data_off c_path[0].cp_xmit_data_off +#define c_xmit_atomic_sent c_path[0].cp_xmit_atomic_sent +#define c_xmit_rdma_sent c_path[0].cp_xmit_rdma_sent +#define c_xmit_data_sent c_path[0].cp_xmit_data_sent +#define c_lock c_path[0].cp_lock +#define c_next_tx_seq c_path[0].cp_next_tx_seq +#define c_send_queue c_path[0].cp_send_queue +#define c_retrans c_path[0].cp_retrans +#define c_next_rx_seq c_path[0].cp_next_rx_seq +#define c_transport_data c_path[0].cp_transport_data +#define c_state c_path[0].cp_state +#define c_send_gen c_path[0].cp_send_gen +#define c_flags c_path[0].cp_flags +#define c_reconnect_jiffies c_path[0].cp_reconnect_jiffies +#define c_send_w c_path[0].cp_send_w +#define c_recv_w c_path[0].cp_recv_w +#define c_conn_w c_path[0].cp_conn_w +#define c_down_w c_path[0].cp_down_w +#define c_cm_lock c_path[0].cp_cm_lock +#define c_waitq c_path[0].cp_waitq +#define c_unacked_packets c_path[0].cp_unacked_packets +#define c_unacked_bytes c_path[0].cp_unacked_bytes + +#endif /* _RDS_RDS_SINGLE_H */ diff --git a/net/rds/recv.c b/net/rds/recv.c index 8413f6c99e13..78b5c430324d 100644 --- a/net/rds/recv.c +++ b/net/rds/recv.c @@ -38,6 +38,7 @@ #include #include +#include "rds_single_path.h" #include "rds.h" void rds_inc_init(struct rds_incoming *inc, struct rds_connection *conn, diff --git a/net/rds/send.c b/net/rds/send.c index b1962f8e30f7..a3b3b35ad57a 100644 --- a/net/rds/send.c +++ b/net/rds/send.c @@ -40,6 +40,7 @@ #include #include +#include "rds_single_path.h" #include "rds.h" /* When transmitting messages in rds_send_xmit, we need to emerge from diff --git a/net/rds/tcp.c b/net/rds/tcp.c index 74ee126a6fe6..4bc1c153e93a 100644 --- a/net/rds/tcp.c +++ b/net/rds/tcp.c @@ -38,6 +38,7 @@ #include #include +#include "rds_single_path.h" #include "rds.h" #include "tcp.h" diff --git a/net/rds/tcp_connect.c b/net/rds/tcp_connect.c index fba13d0305fb..ba9ec67f4e41 100644 --- a/net/rds/tcp_connect.c +++ b/net/rds/tcp_connect.c @@ -34,6 +34,7 @@ #include #include +#include "rds_single_path.h" #include "rds.h" #include "tcp.h" @@ -60,7 +61,8 @@ void rds_tcp_state_change(struct sock *sk) case TCP_SYN_RECV: break; case TCP_ESTABLISHED: - rds_connect_path_complete(conn, RDS_CONN_CONNECTING); + rds_connect_path_complete(&conn->c_path[0], + RDS_CONN_CONNECTING); break; case TCP_CLOSE_WAIT: case TCP_CLOSE: diff --git a/net/rds/tcp_listen.c b/net/rds/tcp_listen.c index 686b1d03a558..22d9bb15f731 100644 --- a/net/rds/tcp_listen.c +++ b/net/rds/tcp_listen.c @@ -35,6 +35,7 @@ #include #include +#include "rds_single_path.h" #include "rds.h" #include "tcp.h" @@ -132,17 +133,19 @@ int rds_tcp_accept_one(struct socket *sock) * c_transport_data. */ if (ntohl(inet->inet_saddr) < ntohl(inet->inet_daddr) || - !conn->c_outgoing) { + !conn->c_path[0].cp_outgoing) { goto rst_nsk; } else { rds_tcp_reset_callbacks(new_sock, conn); - conn->c_outgoing = 0; + conn->c_path[0].cp_outgoing = 0; /* rds_connect_path_complete() marks RDS_CONN_UP */ - rds_connect_path_complete(conn, RDS_CONN_DISCONNECTING); + rds_connect_path_complete(&conn->c_path[0], + RDS_CONN_DISCONNECTING); } } else { rds_tcp_set_callbacks(new_sock, conn); - rds_connect_path_complete(conn, RDS_CONN_CONNECTING); + rds_connect_path_complete(&conn->c_path[0], + RDS_CONN_CONNECTING); } new_sock = NULL; ret = 0; diff --git a/net/rds/tcp_recv.c b/net/rds/tcp_recv.c index c3196f9d070a..3f8fb38996c7 100644 --- a/net/rds/tcp_recv.c +++ b/net/rds/tcp_recv.c @@ -34,6 +34,7 @@ #include #include +#include "rds_single_path.h" #include "rds.h" #include "tcp.h" diff --git a/net/rds/tcp_send.c b/net/rds/tcp_send.c index 22d0f2020a79..2b3414f3c45c 100644 --- a/net/rds/tcp_send.c +++ b/net/rds/tcp_send.c @@ -34,6 +34,7 @@ #include #include +#include "rds_single_path.h" #include "rds.h" #include "tcp.h" diff --git a/net/rds/threads.c b/net/rds/threads.c index 4a323045719b..6d0979b8dc63 100644 --- a/net/rds/threads.c +++ b/net/rds/threads.c @@ -71,30 +71,30 @@ struct workqueue_struct *rds_wq; EXPORT_SYMBOL_GPL(rds_wq); -void rds_connect_path_complete(struct rds_connection *conn, int curr) +void rds_connect_path_complete(struct rds_conn_path *cp, int curr) { - if (!rds_conn_transition(conn, curr, RDS_CONN_UP)) { + if (!rds_conn_path_transition(cp, curr, RDS_CONN_UP)) { printk(KERN_WARNING "%s: Cannot transition to state UP, " "current state is %d\n", __func__, - atomic_read(&conn->c_state)); - rds_conn_drop(conn); + atomic_read(&cp->cp_state)); + rds_conn_path_drop(cp); return; } rdsdebug("conn %p for %pI4 to %pI4 complete\n", - conn, &conn->c_laddr, &conn->c_faddr); + cp->cp_conn, &cp->cp_conn->c_laddr, &cp->cp_conn->c_faddr); - conn->c_reconnect_jiffies = 0; - set_bit(0, &conn->c_map_queued); - queue_delayed_work(rds_wq, &conn->c_send_w, 0); - queue_delayed_work(rds_wq, &conn->c_recv_w, 0); + cp->cp_reconnect_jiffies = 0; + set_bit(0, &cp->cp_conn->c_map_queued); + queue_delayed_work(rds_wq, &cp->cp_send_w, 0); + queue_delayed_work(rds_wq, &cp->cp_recv_w, 0); } EXPORT_SYMBOL_GPL(rds_connect_path_complete); void rds_connect_complete(struct rds_connection *conn) { - rds_connect_path_complete(conn, RDS_CONN_CONNECTING); + rds_connect_path_complete(&conn->c_path[0], RDS_CONN_CONNECTING); } EXPORT_SYMBOL_GPL(rds_connect_complete); @@ -116,46 +116,52 @@ EXPORT_SYMBOL_GPL(rds_connect_complete); * We should *always* start with a random backoff; otherwise a broken connection * will always take several iterations to be re-established. */ -void rds_queue_reconnect(struct rds_connection *conn) +void rds_queue_reconnect(struct rds_conn_path *cp) { unsigned long rand; + struct rds_connection *conn = cp->cp_conn; rdsdebug("conn %p for %pI4 to %pI4 reconnect jiffies %lu\n", conn, &conn->c_laddr, &conn->c_faddr, - conn->c_reconnect_jiffies); + cp->cp_reconnect_jiffies); - set_bit(RDS_RECONNECT_PENDING, &conn->c_flags); - if (conn->c_reconnect_jiffies == 0) { - conn->c_reconnect_jiffies = rds_sysctl_reconnect_min_jiffies; - queue_delayed_work(rds_wq, &conn->c_conn_w, 0); + set_bit(RDS_RECONNECT_PENDING, &cp->cp_flags); + if (cp->cp_reconnect_jiffies == 0) { + cp->cp_reconnect_jiffies = rds_sysctl_reconnect_min_jiffies; + queue_delayed_work(rds_wq, &cp->cp_conn_w, 0); return; } get_random_bytes(&rand, sizeof(rand)); rdsdebug("%lu delay %lu ceil conn %p for %pI4 -> %pI4\n", - rand % conn->c_reconnect_jiffies, conn->c_reconnect_jiffies, + rand % cp->cp_reconnect_jiffies, cp->cp_reconnect_jiffies, conn, &conn->c_laddr, &conn->c_faddr); - queue_delayed_work(rds_wq, &conn->c_conn_w, - rand % conn->c_reconnect_jiffies); + queue_delayed_work(rds_wq, &cp->cp_conn_w, + rand % cp->cp_reconnect_jiffies); - conn->c_reconnect_jiffies = min(conn->c_reconnect_jiffies * 2, + cp->cp_reconnect_jiffies = min(cp->cp_reconnect_jiffies * 2, rds_sysctl_reconnect_max_jiffies); } void rds_connect_worker(struct work_struct *work) { - struct rds_connection *conn = container_of(work, struct rds_connection, c_conn_w.work); + struct rds_conn_path *cp = container_of(work, + struct rds_conn_path, + cp_conn_w.work); + struct rds_connection *conn = cp->cp_conn; int ret; - clear_bit(RDS_RECONNECT_PENDING, &conn->c_flags); - if (rds_conn_transition(conn, RDS_CONN_DOWN, RDS_CONN_CONNECTING)) { + clear_bit(RDS_RECONNECT_PENDING, &cp->cp_flags); + if (rds_conn_path_transition(cp, RDS_CONN_DOWN, RDS_CONN_CONNECTING)) { ret = conn->c_trans->conn_connect(conn); rdsdebug("conn %p for %pI4 to %pI4 dispatched, ret %d\n", conn, &conn->c_laddr, &conn->c_faddr, ret); if (ret) { - if (rds_conn_transition(conn, RDS_CONN_CONNECTING, RDS_CONN_DOWN)) - rds_queue_reconnect(conn); + if (rds_conn_path_transition(cp, + RDS_CONN_CONNECTING, + RDS_CONN_DOWN)) + rds_queue_reconnect(cp); else rds_conn_error(conn, "RDS: connect failed\n"); } @@ -164,22 +170,24 @@ void rds_connect_worker(struct work_struct *work) void rds_send_worker(struct work_struct *work) { - struct rds_connection *conn = container_of(work, struct rds_connection, c_send_w.work); + struct rds_conn_path *cp = container_of(work, + struct rds_conn_path, + cp_send_w.work); int ret; - if (rds_conn_state(conn) == RDS_CONN_UP) { - clear_bit(RDS_LL_SEND_FULL, &conn->c_flags); - ret = rds_send_xmit(conn); + if (rds_conn_path_state(cp) == RDS_CONN_UP) { + clear_bit(RDS_LL_SEND_FULL, &cp->cp_flags); + ret = rds_send_xmit(cp->cp_conn); cond_resched(); - rdsdebug("conn %p ret %d\n", conn, ret); + rdsdebug("conn %p ret %d\n", cp->cp_conn, ret); switch (ret) { case -EAGAIN: rds_stats_inc(s_send_immediate_retry); - queue_delayed_work(rds_wq, &conn->c_send_w, 0); + queue_delayed_work(rds_wq, &cp->cp_send_w, 0); break; case -ENOMEM: rds_stats_inc(s_send_delayed_retry); - queue_delayed_work(rds_wq, &conn->c_send_w, 2); + queue_delayed_work(rds_wq, &cp->cp_send_w, 2); default: break; } @@ -188,20 +196,22 @@ void rds_send_worker(struct work_struct *work) void rds_recv_worker(struct work_struct *work) { - struct rds_connection *conn = container_of(work, struct rds_connection, c_recv_w.work); + struct rds_conn_path *cp = container_of(work, + struct rds_conn_path, + cp_recv_w.work); int ret; - if (rds_conn_state(conn) == RDS_CONN_UP) { - ret = conn->c_trans->recv(conn); - rdsdebug("conn %p ret %d\n", conn, ret); + if (rds_conn_path_state(cp) == RDS_CONN_UP) { + ret = cp->cp_conn->c_trans->recv(cp->cp_conn); + rdsdebug("conn %p ret %d\n", cp->cp_conn, ret); switch (ret) { case -EAGAIN: rds_stats_inc(s_recv_immediate_retry); - queue_delayed_work(rds_wq, &conn->c_recv_w, 0); + queue_delayed_work(rds_wq, &cp->cp_recv_w, 0); break; case -ENOMEM: rds_stats_inc(s_recv_delayed_retry); - queue_delayed_work(rds_wq, &conn->c_recv_w, 2); + queue_delayed_work(rds_wq, &cp->cp_recv_w, 2); default: break; } @@ -210,9 +220,11 @@ void rds_recv_worker(struct work_struct *work) void rds_shutdown_worker(struct work_struct *work) { - struct rds_connection *conn = container_of(work, struct rds_connection, c_down_w); + struct rds_conn_path *cp = container_of(work, + struct rds_conn_path, + cp_down_w); - rds_conn_shutdown(conn); + rds_conn_shutdown(cp->cp_conn); } void rds_threads_exit(void) From 7e8f4413d7861efcb332ebce8d9b000a17eaa0e5 Mon Sep 17 00:00:00 2001 From: Sowmini Varadhan Date: Mon, 13 Jun 2016 09:44:27 -0700 Subject: [PATCH 02/17] RDS: add t_mp_capable bit to be set by MP capable transports The t_mp_capable bit will be used in the core rds module to support multipathing logic when the transport supports it. Signed-off-by: Sowmini Varadhan Signed-off-by: David S. Miller --- net/rds/rds.h | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/net/rds/rds.h b/net/rds/rds.h index ca31a07f70f5..28f001cbc893 100644 --- a/net/rds/rds.h +++ b/net/rds/rds.h @@ -446,7 +446,8 @@ struct rds_transport { char t_name[TRANSNAMSIZ]; struct list_head t_item; struct module *t_owner; - unsigned int t_prefer_loopback:1; + unsigned int t_prefer_loopback:1, + t_mp_capable:1; unsigned int t_type; int (*laddr_check)(struct net *net, __be32 addr); @@ -673,6 +674,7 @@ rds_conn_path_transition(struct rds_conn_path *cp, int old, int new) static inline int rds_conn_transition(struct rds_connection *conn, int old, int new) { + WARN_ON(conn->c_trans->t_mp_capable); return rds_conn_path_transition(&conn->c_path[0], old, new); } @@ -685,6 +687,7 @@ rds_conn_path_state(struct rds_conn_path *cp) static inline int rds_conn_state(struct rds_connection *conn) { + WARN_ON(conn->c_trans->t_mp_capable); return rds_conn_path_state(&conn->c_path[0]); } @@ -697,6 +700,7 @@ rds_conn_path_up(struct rds_conn_path *cp) static inline int rds_conn_up(struct rds_connection *conn) { + WARN_ON(conn->c_trans->t_mp_capable); return rds_conn_path_up(&conn->c_path[0]); } @@ -709,6 +713,7 @@ rds_conn_path_connecting(struct rds_conn_path *cp) static inline int rds_conn_connecting(struct rds_connection *conn) { + WARN_ON(conn->c_trans->t_mp_capable); return rds_conn_path_connecting(&conn->c_path[0]); } From ef9e62c2e5087cb9bc713e3d9776336e1bb40df1 Mon Sep 17 00:00:00 2001 From: Sowmini Varadhan Date: Mon, 13 Jun 2016 09:44:28 -0700 Subject: [PATCH 03/17] RDS: recv path gets the conn_path from rds_incoming for MP capable transports Transports that are t_mp_capable should set the rds_conn_path on which the datagram was recived in the ->i_conn_path field of struct rds_incoming. Signed-off-by: Sowmini Varadhan Signed-off-by: David S. Miller --- net/rds/rds.h | 1 + net/rds/recv.c | 12 ++++++++---- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/net/rds/rds.h b/net/rds/rds.h index 28f001cbc893..7c85b2d792b6 100644 --- a/net/rds/rds.h +++ b/net/rds/rds.h @@ -231,6 +231,7 @@ struct rds_incoming { atomic_t i_refcount; struct list_head i_item; struct rds_connection *i_conn; + struct rds_conn_path *i_conn_path; struct rds_header i_hdr; unsigned long i_rx_jiffies; __be32 i_saddr; diff --git a/net/rds/recv.c b/net/rds/recv.c index 78b5c430324d..e36652cfbd35 100644 --- a/net/rds/recv.c +++ b/net/rds/recv.c @@ -38,7 +38,6 @@ #include #include -#include "rds_single_path.h" #include "rds.h" void rds_inc_init(struct rds_incoming *inc, struct rds_connection *conn, @@ -165,13 +164,18 @@ void rds_recv_incoming(struct rds_connection *conn, __be32 saddr, __be32 daddr, struct rds_sock *rs = NULL; struct sock *sk; unsigned long flags; + struct rds_conn_path *cp; inc->i_conn = conn; inc->i_rx_jiffies = jiffies; + if (conn->c_trans->t_mp_capable) + cp = inc->i_conn_path; + else + cp = &conn->c_path[0]; rdsdebug("conn %p next %llu inc %p seq %llu len %u sport %u dport %u " "flags 0x%x rx_jiffies %lu\n", conn, - (unsigned long long)conn->c_next_rx_seq, + (unsigned long long)cp->cp_next_rx_seq, inc, (unsigned long long)be64_to_cpu(inc->i_hdr.h_sequence), be32_to_cpu(inc->i_hdr.h_len), @@ -200,12 +204,12 @@ void rds_recv_incoming(struct rds_connection *conn, __be32 saddr, __be32 daddr, * XXX we could spend more on the wire to get more robust failure * detection, arguably worth it to avoid data corruption. */ - if (be64_to_cpu(inc->i_hdr.h_sequence) < conn->c_next_rx_seq && + if (be64_to_cpu(inc->i_hdr.h_sequence) < cp->cp_next_rx_seq && (inc->i_hdr.h_flags & RDS_FLAG_RETRANSMITTED)) { rds_stats_inc(s_recv_drop_old_seq); goto out; } - conn->c_next_rx_seq = be64_to_cpu(inc->i_hdr.h_sequence) + 1; + cp->cp_next_rx_seq = be64_to_cpu(inc->i_hdr.h_sequence) + 1; if (rds_sysctl_ping_enable && inc->i_hdr.h_dport == 0) { rds_stats_inc(s_recv_ping); From 5e833e025d9dc3f61c04e74936a14419efb6a032 Mon Sep 17 00:00:00 2001 From: Sowmini Varadhan Date: Mon, 13 Jun 2016 09:44:29 -0700 Subject: [PATCH 04/17] RDS: rds_inc_path_init() helper function for MP capable transports t_mp_capable transports can use rds_inc_path_init to initialize all fields in struct rds_incoming, including the i_conn_path. Signed-off-by: Sowmini Varadhan Signed-off-by: David S. Miller --- net/rds/rds.h | 2 ++ net/rds/recv.c | 14 ++++++++++++++ 2 files changed, 16 insertions(+) diff --git a/net/rds/rds.h b/net/rds/rds.h index 7c85b2d792b6..c3b14ccd7037 100644 --- a/net/rds/rds.h +++ b/net/rds/rds.h @@ -764,6 +764,8 @@ void rds_page_exit(void); /* recv.c */ void rds_inc_init(struct rds_incoming *inc, struct rds_connection *conn, __be32 saddr); +void rds_inc_path_init(struct rds_incoming *inc, struct rds_conn_path *conn, + __be32 saddr); void rds_inc_put(struct rds_incoming *inc); void rds_recv_incoming(struct rds_connection *conn, __be32 saddr, __be32 daddr, struct rds_incoming *inc, gfp_t gfp); diff --git a/net/rds/recv.c b/net/rds/recv.c index e36652cfbd35..6d7bd63121fc 100644 --- a/net/rds/recv.c +++ b/net/rds/recv.c @@ -53,6 +53,20 @@ void rds_inc_init(struct rds_incoming *inc, struct rds_connection *conn, } EXPORT_SYMBOL_GPL(rds_inc_init); +void rds_inc_path_init(struct rds_incoming *inc, struct rds_conn_path *cp, + __be32 saddr) +{ + atomic_set(&inc->i_refcount, 1); + INIT_LIST_HEAD(&inc->i_item); + inc->i_conn = cp->cp_conn; + inc->i_conn_path = cp; + inc->i_saddr = saddr; + inc->i_rdma_cookie = 0; + inc->i_rx_tstamp.tv_sec = 0; + inc->i_rx_tstamp.tv_usec = 0; +} +EXPORT_SYMBOL_GPL(rds_inc_path_init); + static void rds_inc_addref(struct rds_incoming *inc) { rdsdebug("addref inc %p ref %d\n", inc, atomic_read(&inc->i_refcount)); From 4e9b551c14560399776c05f4234650c6d3729458 Mon Sep 17 00:00:00 2001 From: Sowmini Varadhan Date: Mon, 13 Jun 2016 09:44:30 -0700 Subject: [PATCH 05/17] RDS: Add rds_send_path_reset() rds_send_path_reset() is the path specific version of rds_send_reset() intended for MP capable callers. Signed-off-by: Sowmini Varadhan Signed-off-by: David S. Miller --- net/rds/send.c | 39 ++++++++++++++++++++++----------------- 1 file changed, 22 insertions(+), 17 deletions(-) diff --git a/net/rds/send.c b/net/rds/send.c index a3b3b35ad57a..bfb3e0530213 100644 --- a/net/rds/send.c +++ b/net/rds/send.c @@ -63,14 +63,14 @@ static void rds_send_remove_from_sock(struct list_head *messages, int status); * Reset the send state. Callers must ensure that this doesn't race with * rds_send_xmit(). */ -void rds_send_reset(struct rds_connection *conn) +static void rds_send_path_reset(struct rds_conn_path *cp) { struct rds_message *rm, *tmp; unsigned long flags; - if (conn->c_xmit_rm) { - rm = conn->c_xmit_rm; - conn->c_xmit_rm = NULL; + if (cp->cp_xmit_rm) { + rm = cp->cp_xmit_rm; + cp->cp_xmit_rm = NULL; /* Tell the user the RDMA op is no longer mapped by the * transport. This isn't entirely true (it's flushed out * independently) but as the connection is down, there's @@ -79,26 +79,31 @@ void rds_send_reset(struct rds_connection *conn) rds_message_put(rm); } - conn->c_xmit_sg = 0; - conn->c_xmit_hdr_off = 0; - conn->c_xmit_data_off = 0; - conn->c_xmit_atomic_sent = 0; - conn->c_xmit_rdma_sent = 0; - conn->c_xmit_data_sent = 0; + cp->cp_xmit_sg = 0; + cp->cp_xmit_hdr_off = 0; + cp->cp_xmit_data_off = 0; + cp->cp_xmit_atomic_sent = 0; + cp->cp_xmit_rdma_sent = 0; + cp->cp_xmit_data_sent = 0; - conn->c_map_queued = 0; + cp->cp_conn->c_map_queued = 0; - conn->c_unacked_packets = rds_sysctl_max_unacked_packets; - conn->c_unacked_bytes = rds_sysctl_max_unacked_bytes; + cp->cp_unacked_packets = rds_sysctl_max_unacked_packets; + cp->cp_unacked_bytes = rds_sysctl_max_unacked_bytes; /* Mark messages as retransmissions, and move them to the send q */ - spin_lock_irqsave(&conn->c_lock, flags); - list_for_each_entry_safe(rm, tmp, &conn->c_retrans, m_conn_item) { + spin_lock_irqsave(&cp->cp_lock, flags); + list_for_each_entry_safe(rm, tmp, &cp->cp_retrans, m_conn_item) { set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags); set_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags); } - list_splice_init(&conn->c_retrans, &conn->c_send_queue); - spin_unlock_irqrestore(&conn->c_lock, flags); + list_splice_init(&cp->cp_retrans, &cp->cp_send_queue); + spin_unlock_irqrestore(&cp->cp_lock, flags); +} + +void rds_send_reset(struct rds_connection *conn) +{ + rds_send_path_reset(&conn->c_path[0]); } EXPORT_SYMBOL_GPL(rds_send_reset); From 5c3d274c75fbcee8e1c919acf25c7feb19a31492 Mon Sep 17 00:00:00 2001 From: Sowmini Varadhan Date: Mon, 13 Jun 2016 09:44:31 -0700 Subject: [PATCH 06/17] RDS: Add rds_send_path_drop_acked() rds_send_path_drop_acked() is the path-specific version of rds_send_drop_acked() to be invoked by MP capable callers. Signed-off-by: Sowmini Varadhan Signed-off-by: David S. Miller --- net/rds/rds.h | 2 ++ net/rds/send.c | 18 +++++++++++++----- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/net/rds/rds.h b/net/rds/rds.h index c3b14ccd7037..d94aa36cab93 100644 --- a/net/rds/rds.h +++ b/net/rds/rds.h @@ -786,6 +786,8 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest); typedef int (*is_acked_func)(struct rds_message *rm, uint64_t ack); void rds_send_drop_acked(struct rds_connection *conn, u64 ack, is_acked_func is_acked); +void rds_send_path_drop_acked(struct rds_conn_path *cp, u64 ack, + is_acked_func is_acked); int rds_send_pong(struct rds_connection *conn, __be16 dport); struct rds_message *rds_send_get_message(struct rds_connection *, struct rm_rdma_op *); diff --git a/net/rds/send.c b/net/rds/send.c index bfb3e0530213..3f6a96cb3b94 100644 --- a/net/rds/send.c +++ b/net/rds/send.c @@ -691,16 +691,16 @@ static void rds_send_remove_from_sock(struct list_head *messages, int status) * assigned the m_ack_seq yet - but that's fine as long as tcp_is_acked * checks the RDS_MSG_HAS_ACK_SEQ bit. */ -void rds_send_drop_acked(struct rds_connection *conn, u64 ack, - is_acked_func is_acked) +void rds_send_path_drop_acked(struct rds_conn_path *cp, u64 ack, + is_acked_func is_acked) { struct rds_message *rm, *tmp; unsigned long flags; LIST_HEAD(list); - spin_lock_irqsave(&conn->c_lock, flags); + spin_lock_irqsave(&cp->cp_lock, flags); - list_for_each_entry_safe(rm, tmp, &conn->c_retrans, m_conn_item) { + list_for_each_entry_safe(rm, tmp, &cp->cp_retrans, m_conn_item) { if (!rds_send_is_acked(rm, ack, is_acked)) break; @@ -712,11 +712,19 @@ void rds_send_drop_acked(struct rds_connection *conn, u64 ack, if (!list_empty(&list)) smp_mb__after_atomic(); - spin_unlock_irqrestore(&conn->c_lock, flags); + spin_unlock_irqrestore(&cp->cp_lock, flags); /* now remove the messages from the sock list as needed */ rds_send_remove_from_sock(&list, RDS_RDMA_SUCCESS); } +EXPORT_SYMBOL_GPL(rds_send_path_drop_acked); + +void rds_send_drop_acked(struct rds_connection *conn, u64 ack, + is_acked_func is_acked) +{ + WARN_ON(conn->c_trans->t_mp_capable); + rds_send_path_drop_acked(&conn->c_path[0], ack, is_acked); +} EXPORT_SYMBOL_GPL(rds_send_drop_acked); void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest) From 7d885d0fc69abe22382fae5dddd84684333ab29b Mon Sep 17 00:00:00 2001 From: Sowmini Varadhan Date: Mon, 13 Jun 2016 09:44:32 -0700 Subject: [PATCH 07/17] RDS: Remove stale function rds_send_get_message() The only caller of rds_send_get_message() was rds_iw_send_cq_comp_handler() which was removed as part of commit dcdede0406d3 ("RDS: Drop stale iWARP RDMA transport"), so remove rds_send_get_message() for the same reason. Signed-off-by: Sowmini Varadhan Signed-off-by: David S. Miller --- net/rds/rds.h | 2 -- net/rds/send.c | 36 ------------------------------------ 2 files changed, 38 deletions(-) diff --git a/net/rds/rds.h b/net/rds/rds.h index d94aa36cab93..2cffd37a550f 100644 --- a/net/rds/rds.h +++ b/net/rds/rds.h @@ -789,8 +789,6 @@ void rds_send_drop_acked(struct rds_connection *conn, u64 ack, void rds_send_path_drop_acked(struct rds_conn_path *cp, u64 ack, is_acked_func is_acked); int rds_send_pong(struct rds_connection *conn, __be16 dport); -struct rds_message *rds_send_get_message(struct rds_connection *, - struct rm_rdma_op *); /* rdma.c */ void rds_rdma_unuse(struct rds_sock *rs, u32 r_key, int force); diff --git a/net/rds/send.c b/net/rds/send.c index 3f6a96cb3b94..3fb280b75160 100644 --- a/net/rds/send.c +++ b/net/rds/send.c @@ -565,42 +565,6 @@ __rds_send_complete(struct rds_sock *rs, struct rds_message *rm, int status) /* No need to wake the app - caller does this */ } -/* - * This is called from the IB send completion when we detect - * a RDMA operation that failed with remote access error. - * So speed is not an issue here. - */ -struct rds_message *rds_send_get_message(struct rds_connection *conn, - struct rm_rdma_op *op) -{ - struct rds_message *rm, *tmp, *found = NULL; - unsigned long flags; - - spin_lock_irqsave(&conn->c_lock, flags); - - list_for_each_entry_safe(rm, tmp, &conn->c_retrans, m_conn_item) { - if (&rm->rdma == op) { - atomic_inc(&rm->m_refcount); - found = rm; - goto out; - } - } - - list_for_each_entry_safe(rm, tmp, &conn->c_send_queue, m_conn_item) { - if (&rm->rdma == op) { - atomic_inc(&rm->m_refcount); - found = rm; - break; - } - } - -out: - spin_unlock_irqrestore(&conn->c_lock, flags); - - return found; -} -EXPORT_SYMBOL_GPL(rds_send_get_message); - /* * This removes messages from the socket's list if they're on it. The list * argument must be private to the caller, we must be able to modify it From 780a6d9e16d1827eb97c2497d7814fe34d280c15 Mon Sep 17 00:00:00 2001 From: Sowmini Varadhan Date: Mon, 13 Jun 2016 09:44:33 -0700 Subject: [PATCH 08/17] RDS: Make rds_send_queue_rm() rds_conn_path aware Pass the rds_conn_path to rds_send_queue_rm, and use it to initialize the i_conn_path field in struct rds_incoming. This commit also makes rds_send_queue_rm() MP capable, because it now takes locks specific to the rds_conn_path passed in, instead of defaulting to the c_path[0] based defines from rds_single_path.h Signed-off-by: Sowmini Varadhan Signed-off-by: David S. Miller --- net/rds/send.c | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/net/rds/send.c b/net/rds/send.c index 3fb280b75160..076ee413d21c 100644 --- a/net/rds/send.c +++ b/net/rds/send.c @@ -787,6 +787,7 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest) * message from the flow with RDS_CANCEL_SENT_TO. */ static int rds_send_queue_rm(struct rds_sock *rs, struct rds_connection *conn, + struct rds_conn_path *cp, struct rds_message *rm, __be16 sport, __be16 dport, int *queued) { @@ -830,13 +831,14 @@ static int rds_send_queue_rm(struct rds_sock *rs, struct rds_connection *conn, trying to minimize the time we hold c_lock */ rds_message_populate_header(&rm->m_inc.i_hdr, sport, dport, 0); rm->m_inc.i_conn = conn; + rm->m_inc.i_conn_path = cp; rds_message_addref(rm); - spin_lock(&conn->c_lock); - rm->m_inc.i_hdr.h_sequence = cpu_to_be64(conn->c_next_tx_seq++); - list_add_tail(&rm->m_conn_item, &conn->c_send_queue); + spin_lock(&cp->cp_lock); + rm->m_inc.i_hdr.h_sequence = cpu_to_be64(cp->cp_next_tx_seq++); + list_add_tail(&rm->m_conn_item, &cp->cp_send_queue); set_bit(RDS_MSG_ON_CONN, &rm->m_flags); - spin_unlock(&conn->c_lock); + spin_unlock(&cp->cp_lock); rdsdebug("queued msg %p len %d, rs %p bytes %d seq %llu\n", rm, len, rs, rs->rs_snd_bytes, @@ -968,6 +970,7 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len) int queued = 0, allocated_mr = 0; int nonblock = msg->msg_flags & MSG_DONTWAIT; long timeo = sock_sndtimeo(sk, nonblock); + struct rds_conn_path *cpath; /* Mirror Linux UDP mirror of BSD error message compatibility */ /* XXX: Perhaps MSG_MORE someday */ @@ -1074,7 +1077,9 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len) goto out; } - while (!rds_send_queue_rm(rs, conn, rm, rs->rs_bound_port, + cpath = &conn->c_path[0]; + + while (!rds_send_queue_rm(rs, conn, cpath, rm, rs->rs_bound_port, dport, &queued)) { rds_stats_inc(s_send_queue_full); @@ -1084,7 +1089,7 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len) } timeo = wait_event_interruptible_timeout(*sk_sleep(sk), - rds_send_queue_rm(rs, conn, rm, + rds_send_queue_rm(rs, conn, cpath, rm, rs->rs_bound_port, dport, &queued), From 1f9ecd7eacfd9ee52a114b87292bfe885aafdb1f Mon Sep 17 00:00:00 2001 From: Sowmini Varadhan Date: Mon, 13 Jun 2016 09:44:34 -0700 Subject: [PATCH 09/17] RDS: Pass rds_conn_path to rds_send_xmit() Pass a struct rds_conn_path to rds_send_xmit so that MP capable transports can transmit packets on something other than c_path[0]. The eventual goal for MP capable transports is to hash the rds socket to a path based on the bound local address/port, and use this path as the argument to rds_send_xmit() Signed-off-by: Sowmini Varadhan Signed-off-by: David S. Miller --- net/rds/ib_cm.c | 2 +- net/rds/rds.h | 4 +- net/rds/send.c | 149 +++++++++++++++++++++++++--------------------- net/rds/threads.c | 2 +- 4 files changed, 87 insertions(+), 70 deletions(-) diff --git a/net/rds/ib_cm.c b/net/rds/ib_cm.c index 4de5a35f5c40..334287602b78 100644 --- a/net/rds/ib_cm.c +++ b/net/rds/ib_cm.c @@ -274,7 +274,7 @@ static void rds_ib_tasklet_fn_send(unsigned long data) if (rds_conn_up(conn) && (!test_bit(RDS_LL_SEND_FULL, &conn->c_flags) || test_bit(0, &conn->c_map_queued))) - rds_send_xmit(ic->conn); + rds_send_xmit(&ic->conn->c_path[0]); } static void poll_rcq(struct rds_ib_connection *ic, struct ib_cq *cq, diff --git a/net/rds/rds.h b/net/rds/rds.h index 2cffd37a550f..b6072eb05fb6 100644 --- a/net/rds/rds.h +++ b/net/rds/rds.h @@ -457,7 +457,9 @@ struct rds_transport { int (*conn_connect)(struct rds_connection *conn); void (*conn_shutdown)(struct rds_connection *conn); void (*xmit_prepare)(struct rds_connection *conn); + void (*xmit_path_prepare)(struct rds_conn_path *cp); void (*xmit_complete)(struct rds_connection *conn); + void (*xmit_path_complete)(struct rds_conn_path *cp); int (*xmit)(struct rds_connection *conn, struct rds_message *rm, unsigned int hdr_off, unsigned int sg, unsigned int off); int (*xmit_rdma)(struct rds_connection *conn, struct rm_rdma_op *op); @@ -780,7 +782,7 @@ void rds_inc_info_copy(struct rds_incoming *inc, /* send.c */ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len); void rds_send_reset(struct rds_connection *conn); -int rds_send_xmit(struct rds_connection *conn); +int rds_send_xmit(struct rds_conn_path *cp); struct sockaddr_in; void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest); typedef int (*is_acked_func)(struct rds_message *rm, uint64_t ack); diff --git a/net/rds/send.c b/net/rds/send.c index 076ee413d21c..966311d135af 100644 --- a/net/rds/send.c +++ b/net/rds/send.c @@ -107,14 +107,14 @@ void rds_send_reset(struct rds_connection *conn) } EXPORT_SYMBOL_GPL(rds_send_reset); -static int acquire_in_xmit(struct rds_connection *conn) +static int acquire_in_xmit(struct rds_conn_path *cp) { - return test_and_set_bit(RDS_IN_XMIT, &conn->c_flags) == 0; + return test_and_set_bit(RDS_IN_XMIT, &cp->cp_flags) == 0; } -static void release_in_xmit(struct rds_connection *conn) +static void release_in_xmit(struct rds_conn_path *cp) { - clear_bit(RDS_IN_XMIT, &conn->c_flags); + clear_bit(RDS_IN_XMIT, &cp->cp_flags); smp_mb__after_atomic(); /* * We don't use wait_on_bit()/wake_up_bit() because our waking is in a @@ -122,8 +122,8 @@ static void release_in_xmit(struct rds_connection *conn) * the system-wide hashed waitqueue buckets in the fast path only to * almost never find waiters. */ - if (waitqueue_active(&conn->c_waitq)) - wake_up_all(&conn->c_waitq); + if (waitqueue_active(&cp->cp_waitq)) + wake_up_all(&cp->cp_waitq); } /* @@ -140,8 +140,9 @@ static void release_in_xmit(struct rds_connection *conn) * - small message latency is higher behind queued large messages * - large message latency isn't starved by intervening small sends */ -int rds_send_xmit(struct rds_connection *conn) +int rds_send_xmit(struct rds_conn_path *cp) { + struct rds_connection *conn = cp->cp_conn; struct rds_message *rm; unsigned long flags; unsigned int tmp; @@ -161,7 +162,7 @@ int rds_send_xmit(struct rds_connection *conn) * avoids blocking the caller and trading per-connection data between * caches per message. */ - if (!acquire_in_xmit(conn)) { + if (!acquire_in_xmit(cp)) { rds_stats_inc(s_send_lock_contention); ret = -ENOMEM; goto out; @@ -175,21 +176,25 @@ int rds_send_xmit(struct rds_connection *conn) * The acquire_in_xmit() check above ensures that only one * caller can increment c_send_gen at any time. */ - conn->c_send_gen++; - send_gen = conn->c_send_gen; + cp->cp_send_gen++; + send_gen = cp->cp_send_gen; /* * rds_conn_shutdown() sets the conn state and then tests RDS_IN_XMIT, * we do the opposite to avoid races. */ - if (!rds_conn_up(conn)) { - release_in_xmit(conn); + if (!rds_conn_path_up(cp)) { + release_in_xmit(cp); ret = 0; goto out; } - if (conn->c_trans->xmit_prepare) + if (conn->c_trans->t_mp_capable) { + if (conn->c_trans->xmit_path_prepare) + conn->c_trans->xmit_path_prepare(cp); + } else if (conn->c_trans->xmit_prepare) { conn->c_trans->xmit_prepare(conn); + } /* * spin trying to push headers and data down the connection until @@ -197,7 +202,7 @@ int rds_send_xmit(struct rds_connection *conn) */ while (1) { - rm = conn->c_xmit_rm; + rm = cp->cp_xmit_rm; /* * If between sending messages, we can send a pending congestion @@ -210,14 +215,16 @@ int rds_send_xmit(struct rds_connection *conn) break; } rm->data.op_active = 1; + rm->m_inc.i_conn_path = cp; + rm->m_inc.i_conn = cp->cp_conn; - conn->c_xmit_rm = rm; + cp->cp_xmit_rm = rm; } /* * If not already working on one, grab the next message. * - * c_xmit_rm holds a ref while we're sending this message down + * cp_xmit_rm holds a ref while we're sending this message down * the connction. We can use this ref while holding the * send_sem.. rds_send_reset() is serialized with it. */ @@ -234,10 +241,10 @@ int rds_send_xmit(struct rds_connection *conn) if (batch_count >= send_batch_count) goto over_batch; - spin_lock_irqsave(&conn->c_lock, flags); + spin_lock_irqsave(&cp->cp_lock, flags); - if (!list_empty(&conn->c_send_queue)) { - rm = list_entry(conn->c_send_queue.next, + if (!list_empty(&cp->cp_send_queue)) { + rm = list_entry(cp->cp_send_queue.next, struct rds_message, m_conn_item); rds_message_addref(rm); @@ -246,10 +253,11 @@ int rds_send_xmit(struct rds_connection *conn) * Move the message from the send queue to the retransmit * list right away. */ - list_move_tail(&rm->m_conn_item, &conn->c_retrans); + list_move_tail(&rm->m_conn_item, + &cp->cp_retrans); } - spin_unlock_irqrestore(&conn->c_lock, flags); + spin_unlock_irqrestore(&cp->cp_lock, flags); if (!rm) break; @@ -263,32 +271,34 @@ int rds_send_xmit(struct rds_connection *conn) */ if (rm->rdma.op_active && test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags)) { - spin_lock_irqsave(&conn->c_lock, flags); + spin_lock_irqsave(&cp->cp_lock, flags); if (test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags)) list_move(&rm->m_conn_item, &to_be_dropped); - spin_unlock_irqrestore(&conn->c_lock, flags); + spin_unlock_irqrestore(&cp->cp_lock, flags); continue; } /* Require an ACK every once in a while */ len = ntohl(rm->m_inc.i_hdr.h_len); - if (conn->c_unacked_packets == 0 || - conn->c_unacked_bytes < len) { + if (cp->cp_unacked_packets == 0 || + cp->cp_unacked_bytes < len) { __set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags); - conn->c_unacked_packets = rds_sysctl_max_unacked_packets; - conn->c_unacked_bytes = rds_sysctl_max_unacked_bytes; + cp->cp_unacked_packets = + rds_sysctl_max_unacked_packets; + cp->cp_unacked_bytes = + rds_sysctl_max_unacked_bytes; rds_stats_inc(s_send_ack_required); } else { - conn->c_unacked_bytes -= len; - conn->c_unacked_packets--; + cp->cp_unacked_bytes -= len; + cp->cp_unacked_packets--; } - conn->c_xmit_rm = rm; + cp->cp_xmit_rm = rm; } /* The transport either sends the whole rdma or none of it */ - if (rm->rdma.op_active && !conn->c_xmit_rdma_sent) { + if (rm->rdma.op_active && !cp->cp_xmit_rdma_sent) { rm->m_final_op = &rm->rdma; /* The transport owns the mapped memory for now. * You can't unmap it while it's on the send queue @@ -300,11 +310,11 @@ int rds_send_xmit(struct rds_connection *conn) wake_up_interruptible(&rm->m_flush_wait); break; } - conn->c_xmit_rdma_sent = 1; + cp->cp_xmit_rdma_sent = 1; } - if (rm->atomic.op_active && !conn->c_xmit_atomic_sent) { + if (rm->atomic.op_active && !cp->cp_xmit_atomic_sent) { rm->m_final_op = &rm->atomic; /* The transport owns the mapped memory for now. * You can't unmap it while it's on the send queue @@ -316,7 +326,7 @@ int rds_send_xmit(struct rds_connection *conn) wake_up_interruptible(&rm->m_flush_wait); break; } - conn->c_xmit_atomic_sent = 1; + cp->cp_xmit_atomic_sent = 1; } @@ -342,41 +352,42 @@ int rds_send_xmit(struct rds_connection *conn) rm->data.op_active = 0; } - if (rm->data.op_active && !conn->c_xmit_data_sent) { + if (rm->data.op_active && !cp->cp_xmit_data_sent) { rm->m_final_op = &rm->data; + ret = conn->c_trans->xmit(conn, rm, - conn->c_xmit_hdr_off, - conn->c_xmit_sg, - conn->c_xmit_data_off); + cp->cp_xmit_hdr_off, + cp->cp_xmit_sg, + cp->cp_xmit_data_off); if (ret <= 0) break; - if (conn->c_xmit_hdr_off < sizeof(struct rds_header)) { + if (cp->cp_xmit_hdr_off < sizeof(struct rds_header)) { tmp = min_t(int, ret, sizeof(struct rds_header) - - conn->c_xmit_hdr_off); - conn->c_xmit_hdr_off += tmp; + cp->cp_xmit_hdr_off); + cp->cp_xmit_hdr_off += tmp; ret -= tmp; } - sg = &rm->data.op_sg[conn->c_xmit_sg]; + sg = &rm->data.op_sg[cp->cp_xmit_sg]; while (ret) { tmp = min_t(int, ret, sg->length - - conn->c_xmit_data_off); - conn->c_xmit_data_off += tmp; + cp->cp_xmit_data_off); + cp->cp_xmit_data_off += tmp; ret -= tmp; - if (conn->c_xmit_data_off == sg->length) { - conn->c_xmit_data_off = 0; + if (cp->cp_xmit_data_off == sg->length) { + cp->cp_xmit_data_off = 0; sg++; - conn->c_xmit_sg++; - BUG_ON(ret != 0 && - conn->c_xmit_sg == rm->data.op_nents); + cp->cp_xmit_sg++; + BUG_ON(ret != 0 && cp->cp_xmit_sg == + rm->data.op_nents); } } - if (conn->c_xmit_hdr_off == sizeof(struct rds_header) && - (conn->c_xmit_sg == rm->data.op_nents)) - conn->c_xmit_data_sent = 1; + if (cp->cp_xmit_hdr_off == sizeof(struct rds_header) && + (cp->cp_xmit_sg == rm->data.op_nents)) + cp->cp_xmit_data_sent = 1; } /* @@ -384,23 +395,27 @@ int rds_send_xmit(struct rds_connection *conn) * if there is a data op. Thus, if the data is sent (or there was * none), then we're done with the rm. */ - if (!rm->data.op_active || conn->c_xmit_data_sent) { - conn->c_xmit_rm = NULL; - conn->c_xmit_sg = 0; - conn->c_xmit_hdr_off = 0; - conn->c_xmit_data_off = 0; - conn->c_xmit_rdma_sent = 0; - conn->c_xmit_atomic_sent = 0; - conn->c_xmit_data_sent = 0; + if (!rm->data.op_active || cp->cp_xmit_data_sent) { + cp->cp_xmit_rm = NULL; + cp->cp_xmit_sg = 0; + cp->cp_xmit_hdr_off = 0; + cp->cp_xmit_data_off = 0; + cp->cp_xmit_rdma_sent = 0; + cp->cp_xmit_atomic_sent = 0; + cp->cp_xmit_data_sent = 0; rds_message_put(rm); } } over_batch: - if (conn->c_trans->xmit_complete) + if (conn->c_trans->t_mp_capable) { + if (conn->c_trans->xmit_path_complete) + conn->c_trans->xmit_path_complete(cp); + } else if (conn->c_trans->xmit_complete) { conn->c_trans->xmit_complete(conn); - release_in_xmit(conn); + } + release_in_xmit(cp); /* Nuke any messages we decided not to retransmit. */ if (!list_empty(&to_be_dropped)) { @@ -428,12 +443,12 @@ int rds_send_xmit(struct rds_connection *conn) if (ret == 0) { smp_mb(); if ((test_bit(0, &conn->c_map_queued) || - !list_empty(&conn->c_send_queue)) && - send_gen == conn->c_send_gen) { + !list_empty(&cp->cp_send_queue)) && + send_gen == cp->cp_send_gen) { rds_stats_inc(s_send_lock_queue_raced); if (batch_count < send_batch_count) goto restart; - queue_delayed_work(rds_wq, &conn->c_send_w, 1); + queue_delayed_work(rds_wq, &cp->cp_send_w, 1); } } out: @@ -1110,9 +1125,9 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len) */ rds_stats_inc(s_send_queued); - ret = rds_send_xmit(conn); + ret = rds_send_xmit(cpath); if (ret == -ENOMEM || ret == -EAGAIN) - queue_delayed_work(rds_wq, &conn->c_send_w, 1); + queue_delayed_work(rds_wq, &cpath->cp_send_w, 1); rds_message_put(rm); return payload_len; diff --git a/net/rds/threads.c b/net/rds/threads.c index 6d0979b8dc63..50d26576dee7 100644 --- a/net/rds/threads.c +++ b/net/rds/threads.c @@ -177,7 +177,7 @@ void rds_send_worker(struct work_struct *work) if (rds_conn_path_state(cp) == RDS_CONN_UP) { clear_bit(RDS_LL_SEND_FULL, &cp->cp_flags); - ret = rds_send_xmit(cp->cp_conn); + ret = rds_send_xmit(cp); cond_resched(); rdsdebug("conn %p ret %d\n", cp->cp_conn, ret); switch (ret) { From 01ff34ed44a48ed0ae875291b4b6b7dc9ebeea69 Mon Sep 17 00:00:00 2001 From: Sowmini Varadhan Date: Mon, 13 Jun 2016 09:44:35 -0700 Subject: [PATCH 10/17] RDS: Extract rds_conn_path from i_conn_path in rds_send_drop_to() for MP-capable transports Explicitly set up rds_conn_path, either from i_conn_path (for MP capable transpots) or as c_path[0], and use this in rds_send_drop_to() Signed-off-by: Sowmini Varadhan Signed-off-by: David S. Miller --- net/rds/send.c | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/net/rds/send.c b/net/rds/send.c index 966311d135af..9c34fd204639 100644 --- a/net/rds/send.c +++ b/net/rds/send.c @@ -710,6 +710,7 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest) { struct rds_message *rm, *tmp; struct rds_connection *conn; + struct rds_conn_path *cp; unsigned long flags; LIST_HEAD(list); @@ -738,22 +739,26 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest) list_for_each_entry(rm, &list, m_sock_item) { conn = rm->m_inc.i_conn; + if (conn->c_trans->t_mp_capable) + cp = rm->m_inc.i_conn_path; + else + cp = &conn->c_path[0]; - spin_lock_irqsave(&conn->c_lock, flags); + spin_lock_irqsave(&cp->cp_lock, flags); /* * Maybe someone else beat us to removing rm from the conn. * If we race with their flag update we'll get the lock and * then really see that the flag has been cleared. */ if (!test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags)) { - spin_unlock_irqrestore(&conn->c_lock, flags); + spin_unlock_irqrestore(&cp->cp_lock, flags); spin_lock_irqsave(&rm->m_rs_lock, flags); rm->m_rs = NULL; spin_unlock_irqrestore(&rm->m_rs_lock, flags); continue; } list_del_init(&rm->m_conn_item); - spin_unlock_irqrestore(&conn->c_lock, flags); + spin_unlock_irqrestore(&cp->cp_lock, flags); /* * Couldn't grab m_rs_lock in top loop (lock ordering), From 45997e9e2e01d76607d70461414f66f51487bfe5 Mon Sep 17 00:00:00 2001 From: Sowmini Varadhan Date: Mon, 13 Jun 2016 09:44:36 -0700 Subject: [PATCH 11/17] RDS: Make rds_send_pong() take a rds_conn_path argument This commit allows rds_send_pong() callers to send back the rds pong message on some path other than c_path[0] by passing in a struct rds_conn_path * argument. It also removes the last dependency on the #defines in rds_single.h from send.c Signed-off-by: Sowmini Varadhan Signed-off-by: David S. Miller --- net/rds/rds.h | 2 +- net/rds/recv.c | 2 +- net/rds/send.c | 24 ++++++++++++------------ 3 files changed, 14 insertions(+), 14 deletions(-) diff --git a/net/rds/rds.h b/net/rds/rds.h index b6072eb05fb6..e31515195526 100644 --- a/net/rds/rds.h +++ b/net/rds/rds.h @@ -790,7 +790,7 @@ void rds_send_drop_acked(struct rds_connection *conn, u64 ack, is_acked_func is_acked); void rds_send_path_drop_acked(struct rds_conn_path *cp, u64 ack, is_acked_func is_acked); -int rds_send_pong(struct rds_connection *conn, __be16 dport); +int rds_send_pong(struct rds_conn_path *cp, __be16 dport); /* rdma.c */ void rds_rdma_unuse(struct rds_sock *rs, u32 r_key, int force); diff --git a/net/rds/recv.c b/net/rds/recv.c index 6d7bd63121fc..b58f50571782 100644 --- a/net/rds/recv.c +++ b/net/rds/recv.c @@ -227,7 +227,7 @@ void rds_recv_incoming(struct rds_connection *conn, __be32 saddr, __be32 daddr, if (rds_sysctl_ping_enable && inc->i_hdr.h_dport == 0) { rds_stats_inc(s_recv_ping); - rds_send_pong(conn, inc->i_hdr.h_sport); + rds_send_pong(cp, inc->i_hdr.h_sport); goto out; } diff --git a/net/rds/send.c b/net/rds/send.c index 9c34fd204639..e614513150fe 100644 --- a/net/rds/send.c +++ b/net/rds/send.c @@ -40,7 +40,6 @@ #include #include -#include "rds_single_path.h" #include "rds.h" /* When transmitting messages in rds_send_xmit, we need to emerge from @@ -1153,7 +1152,7 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len) * Reply to a ping packet. */ int -rds_send_pong(struct rds_connection *conn, __be16 dport) +rds_send_pong(struct rds_conn_path *cp, __be16 dport) { struct rds_message *rm; unsigned long flags; @@ -1165,31 +1164,32 @@ rds_send_pong(struct rds_connection *conn, __be16 dport) goto out; } - rm->m_daddr = conn->c_faddr; + rm->m_daddr = cp->cp_conn->c_faddr; rm->data.op_active = 1; - rds_conn_connect_if_down(conn); + rds_conn_connect_if_down(cp->cp_conn); - ret = rds_cong_wait(conn->c_fcong, dport, 1, NULL); + ret = rds_cong_wait(cp->cp_conn->c_fcong, dport, 1, NULL); if (ret) goto out; - spin_lock_irqsave(&conn->c_lock, flags); - list_add_tail(&rm->m_conn_item, &conn->c_send_queue); + spin_lock_irqsave(&cp->cp_lock, flags); + list_add_tail(&rm->m_conn_item, &cp->cp_send_queue); set_bit(RDS_MSG_ON_CONN, &rm->m_flags); rds_message_addref(rm); - rm->m_inc.i_conn = conn; + rm->m_inc.i_conn = cp->cp_conn; + rm->m_inc.i_conn_path = cp; rds_message_populate_header(&rm->m_inc.i_hdr, 0, dport, - conn->c_next_tx_seq); - conn->c_next_tx_seq++; - spin_unlock_irqrestore(&conn->c_lock, flags); + cp->cp_next_tx_seq); + cp->cp_next_tx_seq++; + spin_unlock_irqrestore(&cp->cp_lock, flags); rds_stats_inc(s_send_queued); rds_stats_inc(s_send_pong); /* schedule the send work on rds_wq */ - queue_delayed_work(rds_wq, &conn->c_send_w, 1); + queue_delayed_work(rds_wq, &cp->cp_send_w, 1); rds_message_put(rm); return 0; From 3c0a59001a416ec2a1c46576917732fe5b99336b Mon Sep 17 00:00:00 2001 From: Sowmini Varadhan Date: Mon, 13 Jun 2016 09:44:37 -0700 Subject: [PATCH 12/17] RDS: Add rds_conn_path_connect_if_down() for MP-aware callers rds_conn_path_connect_if_down() works on the rds_conn_path that it is passed. Callers who are not t_m_capable may continue calling rds_conn_connect_if_down, which will invoke rds_conn_path_connect_if_down() with the default c_path[0]. Signed-off-by: Sowmini Varadhan Signed-off-by: David S. Miller --- net/rds/connection.c | 12 +++++++++--- net/rds/rds.h | 1 + net/rds/send.c | 9 ++++----- 3 files changed, 14 insertions(+), 8 deletions(-) diff --git a/net/rds/connection.c b/net/rds/connection.c index 6fa2074044b9..953a426b25ab 100644 --- a/net/rds/connection.c +++ b/net/rds/connection.c @@ -572,11 +572,17 @@ EXPORT_SYMBOL_GPL(rds_conn_drop); * If the connection is down, trigger a connect. We may have scheduled a * delayed reconnect however - in this case we should not interfere. */ +void rds_conn_path_connect_if_down(struct rds_conn_path *cp) +{ + if (rds_conn_path_state(cp) == RDS_CONN_DOWN && + !test_and_set_bit(RDS_RECONNECT_PENDING, &cp->cp_flags)) + queue_delayed_work(rds_wq, &cp->cp_conn_w, 0); +} + void rds_conn_connect_if_down(struct rds_connection *conn) { - if (rds_conn_state(conn) == RDS_CONN_DOWN && - !test_and_set_bit(RDS_RECONNECT_PENDING, &conn->c_flags)) - queue_delayed_work(rds_wq, &conn->c_conn_w, 0); + WARN_ON(conn->c_trans->t_mp_capable); + rds_conn_path_connect_if_down(&conn->c_path[0]); } EXPORT_SYMBOL_GPL(rds_conn_connect_if_down); diff --git a/net/rds/rds.h b/net/rds/rds.h index e31515195526..74fcf5a28723 100644 --- a/net/rds/rds.h +++ b/net/rds/rds.h @@ -658,6 +658,7 @@ void rds_conn_destroy(struct rds_connection *conn); void rds_conn_drop(struct rds_connection *conn); void rds_conn_path_drop(struct rds_conn_path *cpath); void rds_conn_connect_if_down(struct rds_connection *conn); +void rds_conn_path_connect_if_down(struct rds_conn_path *cp); void rds_for_each_conn_info(struct socket *sock, unsigned int len, struct rds_info_iterator *iter, struct rds_info_lengths *lens, diff --git a/net/rds/send.c b/net/rds/send.c index e614513150fe..369bd6690218 100644 --- a/net/rds/send.c +++ b/net/rds/send.c @@ -1088,16 +1088,15 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len) goto out; } - rds_conn_connect_if_down(conn); + cpath = &conn->c_path[0]; + + rds_conn_path_connect_if_down(cpath); ret = rds_cong_wait(conn->c_fcong, dport, nonblock, rs); if (ret) { rs->rs_seen_congestion = 1; goto out; } - - cpath = &conn->c_path[0]; - while (!rds_send_queue_rm(rs, conn, cpath, rm, rs->rs_bound_port, dport, &queued)) { rds_stats_inc(s_send_queue_full); @@ -1167,7 +1166,7 @@ rds_send_pong(struct rds_conn_path *cp, __be16 dport) rm->m_daddr = cp->cp_conn->c_faddr; rm->data.op_active = 1; - rds_conn_connect_if_down(cp->cp_conn); + rds_conn_path_connect_if_down(cp); ret = rds_cong_wait(cp->cp_conn->c_fcong, dport, 1, NULL); if (ret) From 992c9ec5fe1d0c7859e158ee22f293cbee95c6a3 Mon Sep 17 00:00:00 2001 From: Sowmini Varadhan Date: Mon, 13 Jun 2016 09:44:38 -0700 Subject: [PATCH 13/17] RDS: update rds-info related functions to traverse multiple conn_paths This commit updates the callbacks related to the rds-info command so that they walk through all the rds_conn_path structures and report the requested info. Signed-off-by: Sowmini Varadhan Signed-off-by: David S. Miller --- net/rds/connection.c | 105 ++++++++++++++++++++++++++++++++----------- 1 file changed, 80 insertions(+), 25 deletions(-) diff --git a/net/rds/connection.c b/net/rds/connection.c index 953a426b25ab..9e0b489aea41 100644 --- a/net/rds/connection.c +++ b/net/rds/connection.c @@ -400,6 +400,7 @@ static void rds_conn_message_info(struct socket *sock, unsigned int len, unsigned int total = 0; unsigned long flags; size_t i; + int j; len /= sizeof(struct rds_info_message); @@ -408,23 +409,32 @@ static void rds_conn_message_info(struct socket *sock, unsigned int len, for (i = 0, head = rds_conn_hash; i < ARRAY_SIZE(rds_conn_hash); i++, head++) { hlist_for_each_entry_rcu(conn, head, c_hash_node) { - if (want_send) - list = &conn->c_send_queue; - else - list = &conn->c_retrans; + struct rds_conn_path *cp; - spin_lock_irqsave(&conn->c_lock, flags); + for (j = 0; j < RDS_MPATH_WORKERS; j++) { + cp = &conn->c_path[j]; + if (want_send) + list = &cp->cp_send_queue; + else + list = &cp->cp_retrans; - /* XXX too lazy to maintain counts.. */ - list_for_each_entry(rm, list, m_conn_item) { - total++; - if (total <= len) - rds_inc_info_copy(&rm->m_inc, iter, - conn->c_laddr, - conn->c_faddr, 0); + spin_lock_irqsave(&cp->cp_lock, flags); + + /* XXX too lazy to maintain counts.. */ + list_for_each_entry(rm, list, m_conn_item) { + total++; + if (total <= len) + rds_inc_info_copy(&rm->m_inc, + iter, + conn->c_laddr, + conn->c_faddr, + 0); + } + + spin_unlock_irqrestore(&cp->cp_lock, flags); + if (!conn->c_trans->t_mp_capable) + break; } - - spin_unlock_irqrestore(&conn->c_lock, flags); } } rcu_read_unlock(); @@ -486,27 +496,72 @@ void rds_for_each_conn_info(struct socket *sock, unsigned int len, } EXPORT_SYMBOL_GPL(rds_for_each_conn_info); -static int rds_conn_info_visitor(struct rds_connection *conn, - void *buffer) +void rds_walk_conn_path_info(struct socket *sock, unsigned int len, + struct rds_info_iterator *iter, + struct rds_info_lengths *lens, + int (*visitor)(struct rds_conn_path *, void *), + size_t item_len) +{ + u64 buffer[(item_len + 7) / 8]; + struct hlist_head *head; + struct rds_connection *conn; + size_t i; + int j; + + rcu_read_lock(); + + lens->nr = 0; + lens->each = item_len; + + for (i = 0, head = rds_conn_hash; i < ARRAY_SIZE(rds_conn_hash); + i++, head++) { + hlist_for_each_entry_rcu(conn, head, c_hash_node) { + struct rds_conn_path *cp; + + for (j = 0; j < RDS_MPATH_WORKERS; j++) { + cp = &conn->c_path[j]; + + /* XXX no cp_lock usage.. */ + if (!visitor(cp, buffer)) + continue; + if (!conn->c_trans->t_mp_capable) + break; + } + + /* We copy as much as we can fit in the buffer, + * but we count all items so that the caller + * can resize the buffer. + */ + if (len >= item_len) { + rds_info_copy(iter, buffer, item_len); + len -= item_len; + } + lens->nr++; + } + } + rcu_read_unlock(); +} + +static int rds_conn_info_visitor(struct rds_conn_path *cp, void *buffer) { struct rds_info_connection *cinfo = buffer; - cinfo->next_tx_seq = conn->c_next_tx_seq; - cinfo->next_rx_seq = conn->c_next_rx_seq; - cinfo->laddr = conn->c_laddr; - cinfo->faddr = conn->c_faddr; - strncpy(cinfo->transport, conn->c_trans->t_name, + cinfo->next_tx_seq = cp->cp_next_tx_seq; + cinfo->next_rx_seq = cp->cp_next_rx_seq; + cinfo->laddr = cp->cp_conn->c_laddr; + cinfo->faddr = cp->cp_conn->c_faddr; + strncpy(cinfo->transport, cp->cp_conn->c_trans->t_name, sizeof(cinfo->transport)); cinfo->flags = 0; - rds_conn_info_set(cinfo->flags, test_bit(RDS_IN_XMIT, &conn->c_flags), + rds_conn_info_set(cinfo->flags, test_bit(RDS_IN_XMIT, &cp->cp_flags), SENDING); /* XXX Future: return the state rather than these funky bits */ rds_conn_info_set(cinfo->flags, - atomic_read(&conn->c_state) == RDS_CONN_CONNECTING, + atomic_read(&cp->cp_state) == RDS_CONN_CONNECTING, CONNECTING); rds_conn_info_set(cinfo->flags, - atomic_read(&conn->c_state) == RDS_CONN_UP, + atomic_read(&cp->cp_state) == RDS_CONN_UP, CONNECTED); return 1; } @@ -515,7 +570,7 @@ static void rds_conn_info(struct socket *sock, unsigned int len, struct rds_info_iterator *iter, struct rds_info_lengths *lens) { - rds_for_each_conn_info(sock, len, iter, lens, + rds_walk_conn_path_info(sock, len, iter, lens, rds_conn_info_visitor, sizeof(struct rds_info_connection)); } From fb1b3dc43dabd4bf7b57b3d63fd2875a499c81f0 Mon Sep 17 00:00:00 2001 From: Sowmini Varadhan Date: Mon, 13 Jun 2016 09:44:39 -0700 Subject: [PATCH 14/17] RDS: Add rds_conn_path_error() rds_conn_path_error() is the MP-aware analog of rds_conn_error, to be used by multipath-capable callers. Signed-off-by: Sowmini Varadhan Signed-off-by: David S. Miller --- net/rds/connection.c | 12 ++++++++++++ net/rds/rds.h | 4 ++++ net/rds/threads.c | 3 ++- 3 files changed, 18 insertions(+), 1 deletion(-) diff --git a/net/rds/connection.c b/net/rds/connection.c index 9e0b489aea41..57556e615ce2 100644 --- a/net/rds/connection.c +++ b/net/rds/connection.c @@ -655,3 +655,15 @@ __rds_conn_error(struct rds_connection *conn, const char *fmt, ...) rds_conn_drop(conn); } + +void +__rds_conn_path_error(struct rds_conn_path *cp, const char *fmt, ...) +{ + va_list ap; + + va_start(ap, fmt); + vprintk(fmt, ap); + va_end(ap); + + rds_conn_path_drop(cp); +} diff --git a/net/rds/rds.h b/net/rds/rds.h index 74fcf5a28723..85f98bd88c1c 100644 --- a/net/rds/rds.h +++ b/net/rds/rds.h @@ -669,6 +669,10 @@ void __rds_conn_error(struct rds_connection *conn, const char *, ...); #define rds_conn_error(conn, fmt...) \ __rds_conn_error(conn, KERN_WARNING "RDS: " fmt) +void __rds_conn_path_error(struct rds_conn_path *cp, const char *, ...); +#define rds_conn_path_error(cp, fmt...) \ + __rds_conn_path_error(cp, KERN_WARNING "RDS: " fmt) + static inline int rds_conn_path_transition(struct rds_conn_path *cp, int old, int new) { diff --git a/net/rds/threads.c b/net/rds/threads.c index 50d26576dee7..94cca66ba5d6 100644 --- a/net/rds/threads.c +++ b/net/rds/threads.c @@ -163,7 +163,8 @@ void rds_connect_worker(struct work_struct *work) RDS_CONN_DOWN)) rds_queue_reconnect(cp); else - rds_conn_error(conn, "RDS: connect failed\n"); + rds_conn_path_error(cp, + "RDS: connect failed\n"); } } } From 1c5113cf796bb730abc1798a3649b61e9e022be6 Mon Sep 17 00:00:00 2001 From: Sowmini Varadhan Date: Mon, 13 Jun 2016 09:44:40 -0700 Subject: [PATCH 15/17] RDS: Initialize all RDS_MPATH_WORKERS in __rds_conn_create Add a for() loop in __rds_conn_create to initialize all the conn_paths, in preparate for MP capable transports. Signed-off-by: Sowmini Varadhan Signed-off-by: David S. Miller --- net/rds/connection.c | 65 ++++++++++++++++++++++++++++++-------------- 1 file changed, 45 insertions(+), 20 deletions(-) diff --git a/net/rds/connection.c b/net/rds/connection.c index 57556e615ce2..a99ac69f77ac 100644 --- a/net/rds/connection.c +++ b/net/rds/connection.c @@ -111,6 +111,32 @@ static void rds_conn_reset(struct rds_connection *conn) * reliability guarantees of RDS. */ } +static void __rds_conn_path_init(struct rds_connection *conn, + struct rds_conn_path *cp, bool is_outgoing) +{ + spin_lock_init(&cp->cp_lock); + cp->cp_next_tx_seq = 1; + init_waitqueue_head(&cp->cp_waitq); + INIT_LIST_HEAD(&cp->cp_send_queue); + INIT_LIST_HEAD(&cp->cp_retrans); + + cp->cp_conn = conn; + atomic_set(&cp->cp_state, RDS_CONN_DOWN); + cp->cp_send_gen = 0; + /* cp_outgoing is per-path. So we can only set it here + * for the single-path transports. + */ + if (!conn->c_trans->t_mp_capable) + cp->cp_outgoing = (is_outgoing ? 1 : 0); + cp->cp_reconnect_jiffies = 0; + INIT_DELAYED_WORK(&cp->cp_send_w, rds_send_worker); + INIT_DELAYED_WORK(&cp->cp_recv_w, rds_recv_worker); + INIT_DELAYED_WORK(&cp->cp_conn_w, rds_connect_worker); + INIT_WORK(&cp->cp_down_w, rds_shutdown_worker); + mutex_init(&cp->cp_cm_lock); + cp->cp_flags = 0; +} + /* * There is only every one 'conn' for a given pair of addresses in the * system at a time. They contain messages to be retransmitted and so @@ -154,14 +180,8 @@ static struct rds_connection *__rds_conn_create(struct net *net, INIT_HLIST_NODE(&conn->c_hash_node); conn->c_laddr = laddr; conn->c_faddr = faddr; - spin_lock_init(&conn->c_lock); - conn->c_next_tx_seq = 1; - conn->c_path[0].cp_conn = conn; - rds_conn_net_set(conn, net); - init_waitqueue_head(&conn->c_waitq); - INIT_LIST_HEAD(&conn->c_send_queue); - INIT_LIST_HEAD(&conn->c_retrans); + rds_conn_net_set(conn, net); ret = rds_cong_get_maps(conn); if (ret) { @@ -197,17 +217,6 @@ static struct rds_connection *__rds_conn_create(struct net *net, goto out; } - atomic_set(&conn->c_state, RDS_CONN_DOWN); - conn->c_send_gen = 0; - conn->c_path[0].cp_outgoing = (is_outgoing ? 1 : 0); - conn->c_reconnect_jiffies = 0; - INIT_DELAYED_WORK(&conn->c_send_w, rds_send_worker); - INIT_DELAYED_WORK(&conn->c_recv_w, rds_recv_worker); - INIT_DELAYED_WORK(&conn->c_conn_w, rds_connect_worker); - INIT_WORK(&conn->c_down_w, rds_shutdown_worker); - mutex_init(&conn->c_cm_lock); - conn->c_flags = 0; - rdsdebug("allocated conn %p for %pI4 -> %pI4 over %s %s\n", conn, &laddr, &faddr, trans->t_name ? trans->t_name : "[unknown]", @@ -224,7 +233,7 @@ static struct rds_connection *__rds_conn_create(struct net *net, if (parent) { /* Creating passive conn */ if (parent->c_passive) { - trans->conn_free(conn->c_transport_data); + trans->conn_free(conn->c_path[0].cp_transport_data); kmem_cache_free(rds_conn_slab, conn); conn = parent->c_passive; } else { @@ -238,10 +247,26 @@ static struct rds_connection *__rds_conn_create(struct net *net, found = rds_conn_lookup(net, head, laddr, faddr, trans); if (found) { - trans->conn_free(conn->c_transport_data); + struct rds_conn_path *cp; + int i; + + for (i = 0; i < RDS_MPATH_WORKERS; i++) { + cp = &conn->c_path[i]; + trans->conn_free(cp->cp_transport_data); + if (!trans->t_mp_capable) + break; + } kmem_cache_free(rds_conn_slab, conn); conn = found; } else { + int i; + + for (i = 0; i < RDS_MPATH_WORKERS; i++) { + __rds_conn_path_init(conn, &conn->c_path[i], + is_outgoing); + conn->c_path[i].cp_index = i; + } + hlist_add_head_rcu(&conn->c_hash_node, head); rds_cong_add_conn(conn); rds_conn_count++; From d769ef81d5b5932520fbefb02614a4380c132495 Mon Sep 17 00:00:00 2001 From: Sowmini Varadhan Date: Mon, 13 Jun 2016 09:44:41 -0700 Subject: [PATCH 16/17] RDS: Update rds_conn_shutdown to work with rds_conn_path This commit changes rds_conn_shutdown to take a rds_conn_path * argument, allowing it to shutdown paths other than c_path[0] for MP-capable transports. Signed-off-by: Sowmini Varadhan Signed-off-by: David S. Miller --- net/rds/connection.c | 64 +++++++++++++++++++++++++------------------- net/rds/rds.h | 5 ++-- net/rds/send.c | 9 ++----- net/rds/tcp.c | 2 +- net/rds/threads.c | 2 +- 5 files changed, 44 insertions(+), 38 deletions(-) diff --git a/net/rds/connection.c b/net/rds/connection.c index a99ac69f77ac..a88d26fd8223 100644 --- a/net/rds/connection.c +++ b/net/rds/connection.c @@ -96,14 +96,16 @@ static struct rds_connection *rds_conn_lookup(struct net *net, * and receiving over this connection again in the future. It is up to * the transport to have serialized this call with its send and recv. */ -static void rds_conn_reset(struct rds_connection *conn) +static void rds_conn_path_reset(struct rds_conn_path *cp) { + struct rds_connection *conn = cp->cp_conn; + rdsdebug("connection %pI4 to %pI4 reset\n", &conn->c_laddr, &conn->c_faddr); rds_stats_inc(s_conn_reset); - rds_send_reset(conn); - conn->c_flags = 0; + rds_send_path_reset(cp); + cp->cp_flags = 0; /* Do not clear next_rx_seq here, else we cannot distinguish * retransmitted packets from new packets, and will hand all @@ -294,10 +296,12 @@ struct rds_connection *rds_conn_create_outgoing(struct net *net, } EXPORT_SYMBOL_GPL(rds_conn_create_outgoing); -void rds_conn_shutdown(struct rds_connection *conn) +void rds_conn_shutdown(struct rds_conn_path *cp) { + struct rds_connection *conn = cp->cp_conn; + /* shut it down unless it's down already */ - if (!rds_conn_transition(conn, RDS_CONN_DOWN, RDS_CONN_DOWN)) { + if (!rds_conn_path_transition(cp, RDS_CONN_DOWN, RDS_CONN_DOWN)) { /* * Quiesce the connection mgmt handlers before we start tearing * things down. We don't hold the mutex for the entire @@ -305,35 +309,41 @@ void rds_conn_shutdown(struct rds_connection *conn) * deadlocking with the CM handler. Instead, the CM event * handler is supposed to check for state DISCONNECTING */ - mutex_lock(&conn->c_cm_lock); - if (!rds_conn_transition(conn, RDS_CONN_UP, RDS_CONN_DISCONNECTING) - && !rds_conn_transition(conn, RDS_CONN_ERROR, RDS_CONN_DISCONNECTING)) { - rds_conn_error(conn, "shutdown called in state %d\n", - atomic_read(&conn->c_state)); - mutex_unlock(&conn->c_cm_lock); + mutex_lock(&cp->cp_cm_lock); + if (!rds_conn_path_transition(cp, RDS_CONN_UP, + RDS_CONN_DISCONNECTING) && + !rds_conn_path_transition(cp, RDS_CONN_ERROR, + RDS_CONN_DISCONNECTING)) { + rds_conn_path_error(cp, + "shutdown called in state %d\n", + atomic_read(&cp->cp_state)); + mutex_unlock(&cp->cp_cm_lock); return; } - mutex_unlock(&conn->c_cm_lock); + mutex_unlock(&cp->cp_cm_lock); - wait_event(conn->c_waitq, - !test_bit(RDS_IN_XMIT, &conn->c_flags)); - wait_event(conn->c_waitq, - !test_bit(RDS_RECV_REFILL, &conn->c_flags)); + wait_event(cp->cp_waitq, + !test_bit(RDS_IN_XMIT, &cp->cp_flags)); + wait_event(cp->cp_waitq, + !test_bit(RDS_RECV_REFILL, &cp->cp_flags)); - conn->c_trans->conn_shutdown(conn); - rds_conn_reset(conn); + if (!conn->c_trans->t_mp_capable) + conn->c_trans->conn_shutdown(conn); + else + conn->c_trans->conn_path_shutdown(cp); + rds_conn_path_reset(cp); - if (!rds_conn_transition(conn, RDS_CONN_DISCONNECTING, RDS_CONN_DOWN)) { + if (!rds_conn_path_transition(cp, RDS_CONN_DISCONNECTING, + RDS_CONN_DOWN)) { /* This can happen - eg when we're in the middle of tearing * down the connection, and someone unloads the rds module. * Quite reproduceable with loopback connections. * Mostly harmless. */ - rds_conn_error(conn, - "%s: failed to transition to state DOWN, " - "current state is %d\n", - __func__, - atomic_read(&conn->c_state)); + rds_conn_path_error(cp, "%s: failed to transition " + "to state DOWN, current state " + "is %d\n", __func__, + atomic_read(&cp->cp_state)); return; } } @@ -342,13 +352,13 @@ void rds_conn_shutdown(struct rds_connection *conn) * The passive side of an IB loopback connection is never added * to the conn hash, so we never trigger a reconnect on this * conn - the reconnect is always triggered by the active peer. */ - cancel_delayed_work_sync(&conn->c_conn_w); + cancel_delayed_work_sync(&cp->cp_conn_w); rcu_read_lock(); if (!hlist_unhashed(&conn->c_hash_node)) { rcu_read_unlock(); if (conn->c_trans->t_type != RDS_TRANS_TCP || - conn->c_path[0].cp_outgoing == 1) - rds_queue_reconnect(&conn->c_path[0]); + cp->cp_outgoing == 1) + rds_queue_reconnect(cp); } else { rcu_read_unlock(); } diff --git a/net/rds/rds.h b/net/rds/rds.h index 85f98bd88c1c..2e35b738176f 100644 --- a/net/rds/rds.h +++ b/net/rds/rds.h @@ -456,6 +456,7 @@ struct rds_transport { void (*conn_free)(void *data); int (*conn_connect)(struct rds_connection *conn); void (*conn_shutdown)(struct rds_connection *conn); + void (*conn_path_shutdown)(struct rds_conn_path *conn); void (*xmit_prepare)(struct rds_connection *conn); void (*xmit_path_prepare)(struct rds_conn_path *cp); void (*xmit_complete)(struct rds_connection *conn); @@ -653,7 +654,7 @@ struct rds_connection *rds_conn_create(struct net *net, struct rds_connection *rds_conn_create_outgoing(struct net *net, __be32 laddr, __be32 faddr, struct rds_transport *trans, gfp_t gfp); -void rds_conn_shutdown(struct rds_connection *conn); +void rds_conn_shutdown(struct rds_conn_path *cpath); void rds_conn_destroy(struct rds_connection *conn); void rds_conn_drop(struct rds_connection *conn); void rds_conn_path_drop(struct rds_conn_path *cpath); @@ -786,7 +787,7 @@ void rds_inc_info_copy(struct rds_incoming *inc, /* send.c */ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len); -void rds_send_reset(struct rds_connection *conn); +void rds_send_path_reset(struct rds_conn_path *conn); int rds_send_xmit(struct rds_conn_path *cp); struct sockaddr_in; void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest); diff --git a/net/rds/send.c b/net/rds/send.c index 369bd6690218..ee43d6b2ea8f 100644 --- a/net/rds/send.c +++ b/net/rds/send.c @@ -62,7 +62,7 @@ static void rds_send_remove_from_sock(struct list_head *messages, int status); * Reset the send state. Callers must ensure that this doesn't race with * rds_send_xmit(). */ -static void rds_send_path_reset(struct rds_conn_path *cp) +void rds_send_path_reset(struct rds_conn_path *cp) { struct rds_message *rm, *tmp; unsigned long flags; @@ -99,12 +99,7 @@ static void rds_send_path_reset(struct rds_conn_path *cp) list_splice_init(&cp->cp_retrans, &cp->cp_send_queue); spin_unlock_irqrestore(&cp->cp_lock, flags); } - -void rds_send_reset(struct rds_connection *conn) -{ - rds_send_path_reset(&conn->c_path[0]); -} -EXPORT_SYMBOL_GPL(rds_send_reset); +EXPORT_SYMBOL_GPL(rds_send_path_reset); static int acquire_in_xmit(struct rds_conn_path *cp) { diff --git a/net/rds/tcp.c b/net/rds/tcp.c index 4bc1c153e93a..0e757a0d7421 100644 --- a/net/rds/tcp.c +++ b/net/rds/tcp.c @@ -186,7 +186,7 @@ void rds_tcp_reset_callbacks(struct socket *sock, release_sock(osock->sk); sock_release(osock); newsock: - rds_send_reset(conn); + rds_send_path_reset(&conn->c_path[0]); lock_sock(sock->sk); write_lock_bh(&sock->sk->sk_callback_lock); tc->t_sock = sock; diff --git a/net/rds/threads.c b/net/rds/threads.c index 94cca66ba5d6..9fbe95bb14a9 100644 --- a/net/rds/threads.c +++ b/net/rds/threads.c @@ -225,7 +225,7 @@ void rds_shutdown_worker(struct work_struct *work) struct rds_conn_path, cp_down_w); - rds_conn_shutdown(cp->cp_conn); + rds_conn_shutdown(cp); } void rds_threads_exit(void) From 3ecc5693c02bb154bc8609c640eb862804c4aabb Mon Sep 17 00:00:00 2001 From: Sowmini Varadhan Date: Mon, 13 Jun 2016 09:44:42 -0700 Subject: [PATCH 17/17] RDS: Update rds_conn_destroy to be MP capable Refactor rds_conn_destroy() so that the per-path dismantling is done in rds_conn_path_destroy, and then iterate as needed over rds_conn_path_destroy(). Signed-off-by: Sowmini Varadhan Signed-off-by: David S. Miller --- net/rds/connection.c | 59 +++++++++++++++++++++++++++++--------------- 1 file changed, 39 insertions(+), 20 deletions(-) diff --git a/net/rds/connection.c b/net/rds/connection.c index a88d26fd8223..a4b07c899d89 100644 --- a/net/rds/connection.c +++ b/net/rds/connection.c @@ -36,7 +36,6 @@ #include #include -#include "rds_single_path.h" #include "rds.h" #include "loop.h" @@ -364,6 +363,34 @@ void rds_conn_shutdown(struct rds_conn_path *cp) } } +/* destroy a single rds_conn_path. rds_conn_destroy() iterates over + * all paths using rds_conn_path_destroy() + */ +static void rds_conn_path_destroy(struct rds_conn_path *cp) +{ + struct rds_message *rm, *rtmp; + + rds_conn_path_drop(cp); + flush_work(&cp->cp_down_w); + + /* make sure lingering queued work won't try to ref the conn */ + cancel_delayed_work_sync(&cp->cp_send_w); + cancel_delayed_work_sync(&cp->cp_recv_w); + + /* tear down queued messages */ + list_for_each_entry_safe(rm, rtmp, + &cp->cp_send_queue, + m_conn_item) { + list_del_init(&rm->m_conn_item); + BUG_ON(!list_empty(&rm->m_sock_item)); + rds_message_put(rm); + } + if (cp->cp_xmit_rm) + rds_message_put(cp->cp_xmit_rm); + + cp->cp_conn->c_trans->conn_free(cp->cp_transport_data); +} + /* * Stop and free a connection. * @@ -373,7 +400,6 @@ void rds_conn_shutdown(struct rds_conn_path *cp) */ void rds_conn_destroy(struct rds_connection *conn) { - struct rds_message *rm, *rtmp; unsigned long flags; rdsdebug("freeing conn %p for %pI4 -> " @@ -387,25 +413,19 @@ void rds_conn_destroy(struct rds_connection *conn) synchronize_rcu(); /* shut the connection down */ - rds_conn_drop(conn); - flush_work(&conn->c_down_w); + if (!conn->c_trans->t_mp_capable) { + rds_conn_path_destroy(&conn->c_path[0]); + BUG_ON(!list_empty(&conn->c_path[0].cp_retrans)); + } else { + int i; + struct rds_conn_path *cp; - /* make sure lingering queued work won't try to ref the conn */ - cancel_delayed_work_sync(&conn->c_send_w); - cancel_delayed_work_sync(&conn->c_recv_w); - - /* tear down queued messages */ - list_for_each_entry_safe(rm, rtmp, - &conn->c_send_queue, - m_conn_item) { - list_del_init(&rm->m_conn_item); - BUG_ON(!list_empty(&rm->m_sock_item)); - rds_message_put(rm); + for (i = 0; i < RDS_MPATH_WORKERS; i++) { + cp = &conn->c_path[i]; + rds_conn_path_destroy(cp); + BUG_ON(!list_empty(&cp->cp_retrans)); + } } - if (conn->c_xmit_rm) - rds_message_put(conn->c_xmit_rm); - - conn->c_trans->conn_free(conn->c_transport_data); /* * The congestion maps aren't freed up here. They're @@ -414,7 +434,6 @@ void rds_conn_destroy(struct rds_connection *conn) */ rds_cong_remove_conn(conn); - BUG_ON(!list_empty(&conn->c_retrans)); kmem_cache_free(rds_conn_slab, conn); spin_lock_irqsave(&rds_conn_lock, flags);