Merge branch 'mptcp-update-mptcp-ack-sequence-outside-of-recv-path'

Florian Westphal says:

====================
mptcp: update mptcp ack sequence outside of recv path

This series moves mptcp-level ack sequence update outside of the recvmsg path.
Current approach has two problems:

1. There is delay between arrival of new data and the time we can ack
   this data.
2. If userspace doesn't call recv for some time, mptcp ack_seq is not
   updated at all, even if this data is queued in the subflow socket
   receive queue.

Move skbs from the subflow socket receive queue to the mptcp-level
receive queue, updating the mptcp-level ack sequence and have recv
take skbs from the mptcp-level receive queue.

The first place where we will attempt to update the mptcp level acks
is from the subflows' data_ready callback, even before we make userspace
aware of new data.

Because of possible deadlock (we need to take the mptcp socket lock
while already holding the subflow sockets lock), we may still need to
defer the mptcp-level ack update.  In such case, this work will be either
done from work queue or recv path, depending on which runs sooner.

In order to avoid pointless scheduling of the work queue, work
will be queued from the mptcp sockets lock release callback.
This allows to detect when the socket owner did drain the subflow
socket receive queue.

Please see individual patches for more information.
====================

Signed-off-by: David S. Miller <davem@davemloft.net>
This commit is contained in:
David S. Miller 2020-02-26 20:46:26 -08:00
commit 621135a0f9
3 changed files with 290 additions and 117 deletions

View File

@ -31,6 +31,12 @@ struct mptcp6_sock {
};
#endif
struct mptcp_skb_cb {
u32 offset;
};
#define MPTCP_SKB_CB(__skb) ((struct mptcp_skb_cb *)&((__skb)->cb[0]))
/* If msk has an initial subflow socket, and the MP_CAPABLE handshake has not
* completed yet or has failed, return the subflow socket.
* Otherwise return NULL.
@ -111,6 +117,141 @@ static struct sock *mptcp_subflow_get(const struct mptcp_sock *msk)
return NULL;
}
static void __mptcp_move_skb(struct mptcp_sock *msk, struct sock *ssk,
struct sk_buff *skb,
unsigned int offset, size_t copy_len)
{
struct sock *sk = (struct sock *)msk;
__skb_unlink(skb, &ssk->sk_receive_queue);
skb_set_owner_r(skb, sk);
__skb_queue_tail(&sk->sk_receive_queue, skb);
msk->ack_seq += copy_len;
MPTCP_SKB_CB(skb)->offset = offset;
}
static bool __mptcp_move_skbs_from_subflow(struct mptcp_sock *msk,
struct sock *ssk,
unsigned int *bytes)
{
struct mptcp_subflow_context *subflow = mptcp_subflow_ctx(ssk);
struct sock *sk = (struct sock *)msk;
unsigned int moved = 0;
bool more_data_avail;
struct tcp_sock *tp;
bool done = false;
int rcvbuf;
rcvbuf = max(ssk->sk_rcvbuf, sk->sk_rcvbuf);
if (rcvbuf > sk->sk_rcvbuf)
sk->sk_rcvbuf = rcvbuf;
tp = tcp_sk(ssk);
do {
u32 map_remaining, offset;
u32 seq = tp->copied_seq;
struct sk_buff *skb;
bool fin;
/* try to move as much data as available */
map_remaining = subflow->map_data_len -
mptcp_subflow_get_map_offset(subflow);
skb = skb_peek(&ssk->sk_receive_queue);
if (!skb)
break;
offset = seq - TCP_SKB_CB(skb)->seq;
fin = TCP_SKB_CB(skb)->tcp_flags & TCPHDR_FIN;
if (fin) {
done = true;
seq++;
}
if (offset < skb->len) {
size_t len = skb->len - offset;
if (tp->urg_data)
done = true;
__mptcp_move_skb(msk, ssk, skb, offset, len);
seq += len;
moved += len;
if (WARN_ON_ONCE(map_remaining < len))
break;
} else {
WARN_ON_ONCE(!fin);
sk_eat_skb(ssk, skb);
done = true;
}
WRITE_ONCE(tp->copied_seq, seq);
more_data_avail = mptcp_subflow_data_available(ssk);
if (atomic_read(&sk->sk_rmem_alloc) > READ_ONCE(sk->sk_rcvbuf)) {
done = true;
break;
}
} while (more_data_avail);
*bytes = moved;
return done;
}
/* In most cases we will be able to lock the mptcp socket. If its already
* owned, we need to defer to the work queue to avoid ABBA deadlock.
*/
static bool move_skbs_to_msk(struct mptcp_sock *msk, struct sock *ssk)
{
struct sock *sk = (struct sock *)msk;
unsigned int moved = 0;
if (READ_ONCE(sk->sk_lock.owned))
return false;
if (unlikely(!spin_trylock_bh(&sk->sk_lock.slock)))
return false;
/* must re-check after taking the lock */
if (!READ_ONCE(sk->sk_lock.owned))
__mptcp_move_skbs_from_subflow(msk, ssk, &moved);
spin_unlock_bh(&sk->sk_lock.slock);
return moved > 0;
}
void mptcp_data_ready(struct sock *sk, struct sock *ssk)
{
struct mptcp_sock *msk = mptcp_sk(sk);
set_bit(MPTCP_DATA_READY, &msk->flags);
if (atomic_read(&sk->sk_rmem_alloc) < READ_ONCE(sk->sk_rcvbuf) &&
move_skbs_to_msk(msk, ssk))
goto wake;
/* don't schedule if mptcp sk is (still) over limit */
if (atomic_read(&sk->sk_rmem_alloc) > READ_ONCE(sk->sk_rcvbuf))
goto wake;
/* mptcp socket is owned, release_cb should retry */
if (!test_and_set_bit(TCP_DELACK_TIMER_DEFERRED,
&sk->sk_tsq_flags)) {
sock_hold(sk);
/* need to try again, its possible release_cb() has already
* been called after the test_and_set_bit() above.
*/
move_skbs_to_msk(msk, ssk);
}
wake:
sk->sk_data_ready(sk);
}
static bool mptcp_ext_cache_refill(struct mptcp_sock *msk)
{
if (!msk->cached_ext)
@ -323,33 +464,6 @@ static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
return ret;
}
int mptcp_read_actor(read_descriptor_t *desc, struct sk_buff *skb,
unsigned int offset, size_t len)
{
struct mptcp_read_arg *arg = desc->arg.data;
size_t copy_len;
copy_len = min(desc->count, len);
if (likely(arg->msg)) {
int err;
err = skb_copy_datagram_msg(skb, offset, arg->msg, copy_len);
if (err) {
pr_debug("error path");
desc->error = err;
return err;
}
} else {
pr_debug("Flushing skb payload");
}
desc->count -= copy_len;
pr_debug("consumed %zu bytes, %zu left", copy_len, desc->count);
return copy_len;
}
static void mptcp_wait_data(struct sock *sk, long *timeo)
{
DEFINE_WAIT_FUNC(wait, woken_wake_function);
@ -365,19 +479,68 @@ static void mptcp_wait_data(struct sock *sk, long *timeo)
remove_wait_queue(sk_sleep(sk), &wait);
}
static int __mptcp_recvmsg_mskq(struct mptcp_sock *msk,
struct msghdr *msg,
size_t len)
{
struct sock *sk = (struct sock *)msk;
struct sk_buff *skb;
int copied = 0;
while ((skb = skb_peek(&sk->sk_receive_queue)) != NULL) {
u32 offset = MPTCP_SKB_CB(skb)->offset;
u32 data_len = skb->len - offset;
u32 count = min_t(size_t, len - copied, data_len);
int err;
err = skb_copy_datagram_msg(skb, offset, msg, count);
if (unlikely(err < 0)) {
if (!copied)
return err;
break;
}
copied += count;
if (count < data_len) {
MPTCP_SKB_CB(skb)->offset += count;
break;
}
__skb_unlink(skb, &sk->sk_receive_queue);
__kfree_skb(skb);
if (copied >= len)
break;
}
return copied;
}
static bool __mptcp_move_skbs(struct mptcp_sock *msk)
{
unsigned int moved = 0;
bool done;
do {
struct sock *ssk = mptcp_subflow_recv_lookup(msk);
if (!ssk)
break;
lock_sock(ssk);
done = __mptcp_move_skbs_from_subflow(msk, ssk, &moved);
release_sock(ssk);
} while (!done);
return moved > 0;
}
static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
int nonblock, int flags, int *addr_len)
{
struct mptcp_sock *msk = mptcp_sk(sk);
struct mptcp_subflow_context *subflow;
bool more_data_avail = false;
struct mptcp_read_arg arg;
read_descriptor_t desc;
bool wait_data = false;
struct socket *ssock;
struct tcp_sock *tp;
bool done = false;
struct sock *ssk;
int copied = 0;
int target;
long timeo;
@ -395,65 +558,26 @@ static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
return copied;
}
arg.msg = msg;
desc.arg.data = &arg;
desc.error = 0;
timeo = sock_rcvtimeo(sk, nonblock);
len = min_t(size_t, len, INT_MAX);
target = sock_rcvlowat(sk, flags & MSG_WAITALL, len);
while (!done) {
u32 map_remaining;
while (len > (size_t)copied) {
int bytes_read;
ssk = mptcp_subflow_recv_lookup(msk);
pr_debug("msk=%p ssk=%p", msk, ssk);
if (!ssk)
goto wait_for_data;
bytes_read = __mptcp_recvmsg_mskq(msk, msg, len - copied);
if (unlikely(bytes_read < 0)) {
if (!copied)
copied = bytes_read;
goto out_err;
}
subflow = mptcp_subflow_ctx(ssk);
tp = tcp_sk(ssk);
copied += bytes_read;
lock_sock(ssk);
do {
/* try to read as much data as available */
map_remaining = subflow->map_data_len -
mptcp_subflow_get_map_offset(subflow);
desc.count = min_t(size_t, len - copied, map_remaining);
pr_debug("reading %zu bytes, copied %d", desc.count,
copied);
bytes_read = tcp_read_sock(ssk, &desc,
mptcp_read_actor);
if (bytes_read < 0) {
if (!copied)
copied = bytes_read;
done = true;
goto next;
}
pr_debug("msk ack_seq=%llx -> %llx", msk->ack_seq,
msk->ack_seq + bytes_read);
msk->ack_seq += bytes_read;
copied += bytes_read;
if (copied >= len) {
done = true;
goto next;
}
if (tp->urg_data && tp->urg_seq == tp->copied_seq) {
pr_err("Urgent data present, cannot proceed");
done = true;
goto next;
}
next:
more_data_avail = mptcp_subflow_data_available(ssk);
} while (more_data_avail && !done);
release_sock(ssk);
continue;
wait_for_data:
more_data_avail = false;
if (skb_queue_empty(&sk->sk_receive_queue) &&
__mptcp_move_skbs(msk))
continue;
/* only the master socket status is relevant here. The exit
* conditions mirror closely tcp_recvmsg()
@ -494,26 +618,25 @@ static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
}
pr_debug("block timeout %ld", timeo);
wait_data = true;
mptcp_wait_data(sk, &timeo);
if (unlikely(__mptcp_tcp_fallback(msk)))
goto fallback;
}
if (more_data_avail) {
if (!test_bit(MPTCP_DATA_READY, &msk->flags))
set_bit(MPTCP_DATA_READY, &msk->flags);
} else if (!wait_data) {
if (skb_queue_empty(&sk->sk_receive_queue)) {
/* entire backlog drained, clear DATA_READY. */
clear_bit(MPTCP_DATA_READY, &msk->flags);
/* .. race-breaker: ssk might get new data after last
* data_available() returns false.
/* .. race-breaker: ssk might have gotten new data
* after last __mptcp_move_skbs() returned false.
*/
ssk = mptcp_subflow_recv_lookup(msk);
if (unlikely(ssk))
if (unlikely(__mptcp_move_skbs(msk)))
set_bit(MPTCP_DATA_READY, &msk->flags);
} else if (unlikely(!test_bit(MPTCP_DATA_READY, &msk->flags))) {
/* data to read but mptcp_wait_data() cleared DATA_READY */
set_bit(MPTCP_DATA_READY, &msk->flags);
}
out_err:
release_sock(sk);
return copied;
}
@ -543,12 +666,24 @@ static void __mptcp_close_ssk(struct sock *sk, struct sock *ssk,
}
}
static void mptcp_worker(struct work_struct *work)
{
struct mptcp_sock *msk = container_of(work, struct mptcp_sock, work);
struct sock *sk = &msk->sk.icsk_inet.sk;
lock_sock(sk);
__mptcp_move_skbs(msk);
release_sock(sk);
sock_put(sk);
}
static int __mptcp_init_sock(struct sock *sk)
{
struct mptcp_sock *msk = mptcp_sk(sk);
INIT_LIST_HEAD(&msk->conn_list);
__set_bit(MPTCP_SEND_SPACE, &msk->flags);
INIT_WORK(&msk->work, mptcp_worker);
msk->first = NULL;
@ -563,6 +698,14 @@ static int mptcp_init_sock(struct sock *sk)
return __mptcp_init_sock(sk);
}
static void mptcp_cancel_work(struct sock *sk)
{
struct mptcp_sock *msk = mptcp_sk(sk);
if (cancel_work_sync(&msk->work))
sock_put(sk);
}
static void mptcp_subflow_shutdown(struct sock *ssk, int how)
{
lock_sock(ssk);
@ -608,6 +751,10 @@ static void mptcp_close(struct sock *sk, long timeout)
__mptcp_close_ssk(sk, ssk, subflow, timeout);
}
mptcp_cancel_work(sk);
__skb_queue_purge(&sk->sk_receive_queue);
sk_common_release(sk);
}
@ -801,6 +948,32 @@ static int mptcp_getsockopt(struct sock *sk, int level, int optname,
return -EOPNOTSUPP;
}
#define MPTCP_DEFERRED_ALL TCPF_DELACK_TIMER_DEFERRED
/* this is very alike tcp_release_cb() but we must handle differently a
* different set of events
*/
static void mptcp_release_cb(struct sock *sk)
{
unsigned long flags, nflags;
do {
flags = sk->sk_tsq_flags;
if (!(flags & MPTCP_DEFERRED_ALL))
return;
nflags = flags & ~MPTCP_DEFERRED_ALL;
} while (cmpxchg(&sk->sk_tsq_flags, flags, nflags) != flags);
if (flags & TCPF_DELACK_TIMER_DEFERRED) {
struct mptcp_sock *msk = mptcp_sk(sk);
struct sock *ssk;
ssk = mptcp_subflow_recv_lookup(msk);
if (!ssk || !schedule_work(&msk->work))
__sock_put(sk);
}
}
static int mptcp_get_port(struct sock *sk, unsigned short snum)
{
struct mptcp_sock *msk = mptcp_sk(sk);
@ -876,6 +1049,7 @@ static struct proto mptcp_prot = {
.destroy = mptcp_destroy,
.sendmsg = mptcp_sendmsg,
.recvmsg = mptcp_recvmsg,
.release_cb = mptcp_release_cb,
.hash = inet_hash,
.unhash = inet_unhash,
.get_port = mptcp_get_port,
@ -1174,6 +1348,8 @@ void mptcp_proto_init(void)
panic("Failed to register MPTCP proto.\n");
inet_register_protosw(&mptcp_protosw);
BUILD_BUG_ON(sizeof(struct mptcp_skb_cb) > sizeof_field(struct sk_buff, cb));
}
#if IS_ENABLED(CONFIG_MPTCP_IPV6)

View File

@ -70,6 +70,7 @@ struct mptcp_sock {
u32 token;
unsigned long flags;
bool can_ack;
struct work_struct work;
struct list_head conn_list;
struct skb_ext *cached_ext; /* for the next sendmsg */
struct socket *subflow; /* outgoing connect/listener/!mp_capable */
@ -190,17 +191,11 @@ void mptcp_proto_init(void);
int mptcp_proto_v6_init(void);
#endif
struct mptcp_read_arg {
struct msghdr *msg;
};
int mptcp_read_actor(read_descriptor_t *desc, struct sk_buff *skb,
unsigned int offset, size_t len);
void mptcp_get_options(const struct sk_buff *skb,
struct tcp_options_received *opt_rx);
void mptcp_finish_connect(struct sock *sk);
void mptcp_data_ready(struct sock *sk, struct sock *ssk);
int mptcp_token_new_request(struct request_sock *req);
void mptcp_token_destroy_request(u32 token);

View File

@ -408,6 +408,18 @@ static enum mapping_status get_mapping_status(struct sock *ssk)
return MAPPING_OK;
}
static int subflow_read_actor(read_descriptor_t *desc,
struct sk_buff *skb,
unsigned int offset, size_t len)
{
size_t copy_len = min(desc->count, len);
desc->count -= copy_len;
pr_debug("flushed %zu bytes, %zu left", copy_len, desc->count);
return copy_len;
}
static bool subflow_check_data_avail(struct sock *ssk)
{
struct mptcp_subflow_context *subflow = mptcp_subflow_ctx(ssk);
@ -482,16 +494,12 @@ static bool subflow_check_data_avail(struct sock *ssk)
pr_debug("discarding %zu bytes, current map len=%d", delta,
map_remaining);
if (delta) {
struct mptcp_read_arg arg = {
.msg = NULL,
};
read_descriptor_t desc = {
.count = delta,
.arg.data = &arg,
};
int ret;
ret = tcp_read_sock(ssk, &desc, mptcp_read_actor);
ret = tcp_read_sock(ssk, &desc, subflow_read_actor);
if (ret < 0) {
ssk->sk_err = -ret;
goto fatal;
@ -554,11 +562,8 @@ static void subflow_data_ready(struct sock *sk)
return;
}
if (mptcp_subflow_data_available(sk)) {
set_bit(MPTCP_DATA_READY, &mptcp_sk(parent)->flags);
parent->sk_data_ready(parent);
}
if (mptcp_subflow_data_available(sk))
mptcp_data_ready(parent, sk);
}
static void subflow_write_space(struct sock *sk)
@ -690,11 +695,8 @@ static void subflow_state_change(struct sock *sk)
* a fin packet carrying a DSS can be unnoticed if we don't trigger
* the data available machinery here.
*/
if (parent && subflow->mp_capable && mptcp_subflow_data_available(sk)) {
set_bit(MPTCP_DATA_READY, &mptcp_sk(parent)->flags);
parent->sk_data_ready(parent);
}
if (parent && subflow->mp_capable && mptcp_subflow_data_available(sk))
mptcp_data_ready(parent, sk);
if (parent && !(parent->sk_shutdown & RCV_SHUTDOWN) &&
!subflow->rx_eof && subflow_is_done(sk)) {