tipc: redesign connection-level flow control

There are two flow control mechanisms in TIPC; one at link level that
handles network congestion, burst control, and retransmission, and one
at connection level which' only remaining task is to prevent overflow
in the receiving socket buffer. In TIPC, the latter task has to be
solved end-to-end because messages can not be thrown away once they
have been accepted and delivered upwards from the link layer, i.e, we
can never permit the receive buffer to overflow.

Currently, this algorithm is message based. A counter in the receiving
socket keeps track of number of consumed messages, and sends a dedicated
acknowledge message back to the sender for each 256 consumed message.
A counter at the sending end keeps track of the sent, not yet
acknowledged messages, and blocks the sender if this number ever reaches
512 unacknowledged messages. When the missing acknowledge arrives, the
socket is then woken up for renewed transmission. This works well for
keeping the message flow running, as it almost never happens that a
sender socket is blocked this way.

A problem with the current mechanism is that it potentially is very
memory consuming. Since we don't distinguish between small and large
messages, we have to dimension the socket receive buffer according
to a worst-case of both. I.e., the window size must be chosen large
enough to sustain a reasonable throughput even for the smallest
messages, while we must still consider a scenario where all messages
are of maximum size. Hence, the current fix window size of 512 messages
and a maximum message size of 66k results in a receive buffer of 66 MB
when truesize(66k) = 131k is taken into account. It is possible to do
much better.

This commit introduces an algorithm where we instead use 1024-byte
blocks as base unit. This unit, always rounded upwards from the
actual message size, is used when we advertise windows as well as when
we count and acknowledge transmitted data. The advertised window is
based on the configured receive buffer size in such a way that even
the worst-case truesize/msgsize ratio always is covered. Since the
smallest possible message size (from a flow control viewpoint) now is
1024 bytes, we can safely assume this ratio to be less than four, which
is the value we are now using.

This way, we have been able to reduce the default receive buffer size
from 66 MB to 2 MB with maintained performance.

In order to keep this solution backwards compatible, we introduce a
new capability bit in the discovery protocol, and use this throughout
the message sending/reception path to always select the right unit.

Acked-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
This commit is contained in:
Jon Paul Maloy 2016-05-02 11:58:47 -04:00 committed by David S. Miller
parent 60020e1857
commit 10724cc7bb
5 changed files with 122 additions and 62 deletions

View File

@ -112,11 +112,9 @@ static int __init tipc_init(void)
pr_info("Activated (version " TIPC_MOD_VER ")\n");
sysctl_tipc_rmem[0] = TIPC_CONN_OVERLOAD_LIMIT >> 4 <<
TIPC_LOW_IMPORTANCE;
sysctl_tipc_rmem[1] = TIPC_CONN_OVERLOAD_LIMIT >> 4 <<
TIPC_CRITICAL_IMPORTANCE;
sysctl_tipc_rmem[2] = TIPC_CONN_OVERLOAD_LIMIT;
sysctl_tipc_rmem[0] = RCVBUF_MIN;
sysctl_tipc_rmem[1] = RCVBUF_DEF;
sysctl_tipc_rmem[2] = RCVBUF_MAX;
err = tipc_netlink_start();
if (err)

View File

@ -743,16 +743,26 @@ static inline void msg_set_msgcnt(struct tipc_msg *m, u16 n)
msg_set_bits(m, 9, 16, 0xffff, n);
}
static inline u32 msg_bcast_tag(struct tipc_msg *m)
static inline u32 msg_conn_ack(struct tipc_msg *m)
{
return msg_bits(m, 9, 16, 0xffff);
}
static inline void msg_set_bcast_tag(struct tipc_msg *m, u32 n)
static inline void msg_set_conn_ack(struct tipc_msg *m, u32 n)
{
msg_set_bits(m, 9, 16, 0xffff, n);
}
static inline u32 msg_adv_win(struct tipc_msg *m)
{
return msg_bits(m, 9, 0, 0xffff);
}
static inline void msg_set_adv_win(struct tipc_msg *m, u32 n)
{
msg_set_bits(m, 9, 0, 0xffff, n);
}
static inline u32 msg_max_pkt(struct tipc_msg *m)
{
return msg_bits(m, 9, 16, 0xffff) * 4;

View File

@ -45,10 +45,11 @@
/* Optional capabilities supported by this code version
*/
enum {
TIPC_BCAST_SYNCH = (1 << 1)
TIPC_BCAST_SYNCH = (1 << 1),
TIPC_BLOCK_FLOWCTL = (2 << 1)
};
#define TIPC_NODE_CAPABILITIES TIPC_BCAST_SYNCH
#define TIPC_NODE_CAPABILITIES (TIPC_BCAST_SYNCH | TIPC_BLOCK_FLOWCTL)
#define INVALID_BEARER_ID -1
void tipc_node_stop(struct net *net);

View File

@ -96,9 +96,11 @@ struct tipc_sock {
uint conn_timeout;
atomic_t dupl_rcvcnt;
bool link_cong;
uint sent_unacked;
uint rcv_unacked;
u16 snt_unacked;
u16 snd_win;
u16 peer_caps;
u16 rcv_unacked;
u16 rcv_win;
struct sockaddr_tipc remote;
struct rhash_head node;
struct rcu_head rcu;
@ -228,9 +230,29 @@ static struct tipc_sock *tipc_sk(const struct sock *sk)
return container_of(sk, struct tipc_sock, sk);
}
static int tsk_conn_cong(struct tipc_sock *tsk)
static bool tsk_conn_cong(struct tipc_sock *tsk)
{
return tsk->sent_unacked >= TIPC_FLOWCTRL_WIN;
return tsk->snt_unacked >= tsk->snd_win;
}
/* tsk_blocks(): translate a buffer size in bytes to number of
* advertisable blocks, taking into account the ratio truesize(len)/len
* We can trust that this ratio is always < 4 for len >= FLOWCTL_BLK_SZ
*/
static u16 tsk_adv_blocks(int len)
{
return len / FLOWCTL_BLK_SZ / 4;
}
/* tsk_inc(): increment counter for sent or received data
* - If block based flow control is not supported by peer we
* fall back to message based ditto, incrementing the counter
*/
static u16 tsk_inc(struct tipc_sock *tsk, int msglen)
{
if (likely(tsk->peer_caps & TIPC_BLOCK_FLOWCTL))
return ((msglen / FLOWCTL_BLK_SZ) + 1);
return 1;
}
/**
@ -378,9 +400,12 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
sk->sk_write_space = tipc_write_space;
sk->sk_destruct = tipc_sock_destruct;
tsk->conn_timeout = CONN_TIMEOUT_DEFAULT;
tsk->sent_unacked = 0;
atomic_set(&tsk->dupl_rcvcnt, 0);
/* Start out with safe limits until we receive an advertised window */
tsk->snd_win = tsk_adv_blocks(RCVBUF_MIN);
tsk->rcv_win = tsk->snd_win;
if (sock->state == SS_READY) {
tsk_set_unreturnable(tsk, true);
if (sock->type == SOCK_DGRAM)
@ -776,7 +801,7 @@ static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb)
struct sock *sk = &tsk->sk;
struct tipc_msg *hdr = buf_msg(skb);
int mtyp = msg_type(hdr);
int conn_cong;
bool conn_cong;
/* Ignore if connection cannot be validated: */
if (!tsk_peer_msg(tsk, hdr))
@ -790,7 +815,9 @@ static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb)
return;
} else if (mtyp == CONN_ACK) {
conn_cong = tsk_conn_cong(tsk);
tsk->sent_unacked -= msg_msgcnt(hdr);
tsk->snt_unacked -= msg_conn_ack(hdr);
if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)
tsk->snd_win = msg_adv_win(hdr);
if (conn_cong)
sk->sk_write_space(sk);
} else if (mtyp != CONN_PROBE_REPLY) {
@ -1021,12 +1048,14 @@ static int __tipc_send_stream(struct socket *sock, struct msghdr *m, size_t dsz)
u32 dnode;
uint mtu, send, sent = 0;
struct iov_iter save;
int hlen = MIN_H_SIZE;
/* Handle implied connection establishment */
if (unlikely(dest)) {
rc = __tipc_sendmsg(sock, m, dsz);
hlen = msg_hdr_sz(mhdr);
if (dsz && (dsz == rc))
tsk->sent_unacked = 1;
tsk->snt_unacked = tsk_inc(tsk, dsz + hlen);
return rc;
}
if (dsz > (uint)INT_MAX)
@ -1055,7 +1084,7 @@ static int __tipc_send_stream(struct socket *sock, struct msghdr *m, size_t dsz)
if (likely(!tsk_conn_cong(tsk))) {
rc = tipc_node_xmit(net, &pktchain, dnode, portid);
if (likely(!rc)) {
tsk->sent_unacked++;
tsk->snt_unacked += tsk_inc(tsk, send + hlen);
sent += send;
if (sent == dsz)
return dsz;
@ -1120,6 +1149,12 @@ static void tipc_sk_finish_conn(struct tipc_sock *tsk, u32 peer_port,
tipc_node_add_conn(net, peer_node, tsk->portid, peer_port);
tsk->max_pkt = tipc_node_get_mtu(net, peer_node, tsk->portid);
tsk->peer_caps = tipc_node_get_capabilities(net, peer_node);
if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)
return;
/* Fall back to message based flow control */
tsk->rcv_win = FLOWCTL_MSG_WIN;
tsk->snd_win = FLOWCTL_MSG_WIN;
}
/**
@ -1216,7 +1251,7 @@ static int tipc_sk_anc_data_recv(struct msghdr *m, struct tipc_msg *msg,
return 0;
}
static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack)
static void tipc_sk_send_ack(struct tipc_sock *tsk)
{
struct net *net = sock_net(&tsk->sk);
struct sk_buff *skb = NULL;
@ -1232,7 +1267,14 @@ static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack)
if (!skb)
return;
msg = buf_msg(skb);
msg_set_msgcnt(msg, ack);
msg_set_conn_ack(msg, tsk->rcv_unacked);
tsk->rcv_unacked = 0;
/* Adjust to and advertize the correct window limit */
if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL) {
tsk->rcv_win = tsk_adv_blocks(tsk->sk.sk_rcvbuf);
msg_set_adv_win(msg, tsk->rcv_win);
}
tipc_node_xmit_skb(net, skb, dnode, msg_link_selector(msg));
}
@ -1290,7 +1332,7 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m, size_t buf_len,
long timeo;
unsigned int sz;
u32 err;
int res;
int res, hlen;
/* Catch invalid receive requests */
if (unlikely(!buf_len))
@ -1315,6 +1357,7 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m, size_t buf_len,
buf = skb_peek(&sk->sk_receive_queue);
msg = buf_msg(buf);
sz = msg_data_sz(msg);
hlen = msg_hdr_sz(msg);
err = msg_errcode(msg);
/* Discard an empty non-errored message & try again */
@ -1337,7 +1380,7 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m, size_t buf_len,
sz = buf_len;
m->msg_flags |= MSG_TRUNC;
}
res = skb_copy_datagram_msg(buf, msg_hdr_sz(msg), m, sz);
res = skb_copy_datagram_msg(buf, hlen, m, sz);
if (res)
goto exit;
res = sz;
@ -1349,15 +1392,15 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m, size_t buf_len,
res = -ECONNRESET;
}
/* Consume received message (optional) */
if (likely(!(flags & MSG_PEEK))) {
if ((sock->state != SS_READY) &&
(++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) {
tipc_sk_send_ack(tsk, tsk->rcv_unacked);
tsk->rcv_unacked = 0;
}
tsk_advance_rx_queue(sk);
if (unlikely(flags & MSG_PEEK))
goto exit;
if (likely(sock->state != SS_READY)) {
tsk->rcv_unacked += tsk_inc(tsk, hlen + sz);
if (unlikely(tsk->rcv_unacked >= (tsk->rcv_win / 4)))
tipc_sk_send_ack(tsk);
}
tsk_advance_rx_queue(sk);
exit:
release_sock(sk);
return res;
@ -1386,7 +1429,7 @@ static int tipc_recv_stream(struct socket *sock, struct msghdr *m,
int sz_to_copy, target, needed;
int sz_copied = 0;
u32 err;
int res = 0;
int res = 0, hlen;
/* Catch invalid receive attempts */
if (unlikely(!buf_len))
@ -1412,6 +1455,7 @@ static int tipc_recv_stream(struct socket *sock, struct msghdr *m,
buf = skb_peek(&sk->sk_receive_queue);
msg = buf_msg(buf);
sz = msg_data_sz(msg);
hlen = msg_hdr_sz(msg);
err = msg_errcode(msg);
/* Discard an empty non-errored message & try again */
@ -1436,8 +1480,7 @@ static int tipc_recv_stream(struct socket *sock, struct msghdr *m,
needed = (buf_len - sz_copied);
sz_to_copy = (sz <= needed) ? sz : needed;
res = skb_copy_datagram_msg(buf, msg_hdr_sz(msg) + offset,
m, sz_to_copy);
res = skb_copy_datagram_msg(buf, hlen + offset, m, sz_to_copy);
if (res)
goto exit;
@ -1459,20 +1502,18 @@ static int tipc_recv_stream(struct socket *sock, struct msghdr *m,
res = -ECONNRESET;
}
/* Consume received message (optional) */
if (likely(!(flags & MSG_PEEK))) {
if (unlikely(++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) {
tipc_sk_send_ack(tsk, tsk->rcv_unacked);
tsk->rcv_unacked = 0;
}
tsk_advance_rx_queue(sk);
}
if (unlikely(flags & MSG_PEEK))
goto exit;
tsk->rcv_unacked += tsk_inc(tsk, hlen + sz);
if (unlikely(tsk->rcv_unacked >= (tsk->rcv_win / 4)))
tipc_sk_send_ack(tsk);
tsk_advance_rx_queue(sk);
/* Loop around if more data is required */
if ((sz_copied < buf_len) && /* didn't get all requested data */
(!skb_queue_empty(&sk->sk_receive_queue) ||
(sz_copied < target)) && /* and more is ready or required */
(!(flags & MSG_PEEK)) && /* and aren't just peeking at data */
(!err)) /* and haven't reached a FIN */
goto restart;
@ -1604,30 +1645,33 @@ static bool filter_connect(struct tipc_sock *tsk, struct sk_buff *skb)
/**
* rcvbuf_limit - get proper overload limit of socket receive queue
* @sk: socket
* @buf: message
* @skb: message
*
* For all connection oriented messages, irrespective of importance,
* the default overload value (i.e. 67MB) is set as limit.
* For connection oriented messages, irrespective of importance,
* default queue limit is 2 MB.
*
* For all connectionless messages, by default new queue limits are
* as belows:
* For connectionless messages, queue limits are based on message
* importance as follows:
*
* TIPC_LOW_IMPORTANCE (4 MB)
* TIPC_MEDIUM_IMPORTANCE (8 MB)
* TIPC_HIGH_IMPORTANCE (16 MB)
* TIPC_CRITICAL_IMPORTANCE (32 MB)
* TIPC_LOW_IMPORTANCE (2 MB)
* TIPC_MEDIUM_IMPORTANCE (4 MB)
* TIPC_HIGH_IMPORTANCE (8 MB)
* TIPC_CRITICAL_IMPORTANCE (16 MB)
*
* Returns overload limit according to corresponding message importance
*/
static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf)
static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *skb)
{
struct tipc_msg *msg = buf_msg(buf);
struct tipc_sock *tsk = tipc_sk(sk);
struct tipc_msg *hdr = buf_msg(skb);
if (msg_connected(msg))
return sysctl_tipc_rmem[2];
if (unlikely(!msg_connected(hdr)))
return sk->sk_rcvbuf << msg_importance(hdr);
return sk->sk_rcvbuf >> TIPC_CRITICAL_IMPORTANCE <<
msg_importance(msg);
if (likely(tsk->peer_caps & TIPC_BLOCK_FLOWCTL))
return sk->sk_rcvbuf;
return FLOWCTL_MSG_LIM;
}
/**

View File

@ -1,6 +1,6 @@
/* net/tipc/socket.h: Include file for TIPC socket code
*
* Copyright (c) 2014-2015, Ericsson AB
* Copyright (c) 2014-2016, Ericsson AB
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@ -38,10 +38,17 @@
#include <net/sock.h>
#include <net/genetlink.h>
#define TIPC_CONNACK_INTV 256
#define TIPC_FLOWCTRL_WIN (TIPC_CONNACK_INTV * 2)
#define TIPC_CONN_OVERLOAD_LIMIT ((TIPC_FLOWCTRL_WIN * 2 + 1) * \
SKB_TRUESIZE(TIPC_MAX_USER_MSG_SIZE))
/* Compatibility values for deprecated message based flow control */
#define FLOWCTL_MSG_WIN 512
#define FLOWCTL_MSG_LIM ((FLOWCTL_MSG_WIN * 2 + 1) * SKB_TRUESIZE(MAX_MSG_SIZE))
#define FLOWCTL_BLK_SZ 1024
/* Socket receive buffer sizes */
#define RCVBUF_MIN (FLOWCTL_BLK_SZ * 512)
#define RCVBUF_DEF (FLOWCTL_BLK_SZ * 1024 * 2)
#define RCVBUF_MAX (FLOWCTL_BLK_SZ * 1024 * 16)
int tipc_socket_init(void);
void tipc_socket_stop(void);
void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq);