tipc: clean up handling of link congestion

After the recent changes in message importance handling, it becomes
possible to simplify the handling of messages and sockets when we
encounter link congestion.

We merge the function tipc_link_cong() into link_schedule_user(),
and simplify the code of the latter. The code should now be
easier to follow, especially regarding return codes and handling
of the message that caused the situation.

In case the scheduling function is unable to pre-allocate a wakeup
message buffer, it now returns -ENOBUFS, which is a more correct
code than the previously used -EHOSTUNREACH.
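
For illustration only (this sketch is not part of the patch): one way a
blocking sender above the link layer could act on the three outcomes after
this change. The helper example_wait_for_wakeup() is hypothetical and stands
in for the socket layer's own wait primitives.

/* Illustrative sketch, not from this commit:
 *   0           buffer chain accepted and consumed
 *   -ELINKCONG  link congested; chain is kept and a SOCK_WAKEUP pseudo
 *               message will be queued for the sending port
 *   -ENOBUFS    wakeup message could not be allocated; chain purged
 */
static int example_blocking_send(struct net *net, struct tipc_link *link,
				 struct sk_buff_head *list, long *timeout)
{
	int rc;

	do {
		rc = __tipc_link_xmit(net, link, list);
		if (rc != -ELINKCONG)
			return rc;	/* 0, -EMSGSIZE or -ENOBUFS */
		/* Chain was kept; sleep until the wakeup arrives, then retry */
		rc = example_wait_for_wakeup(timeout);	/* hypothetical helper */
	} while (!rc);

	return rc;
}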

Reviewed-by: Ying Xue <ying.xue@windriver.com>
Reviewed-by: Erik Hugne <erik.hugne@ericsson.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
Jon Paul Maloy 2015-03-25 12:07:25 -04:00 committed by David S. Miller
parent 1f66d161ab
commit 3127a0200d
2 changed files with 60 additions and 72 deletions

diff --git a/net/tipc/link.c b/net/tipc/link.c
--- a/net/tipc/link.c
+++ b/net/tipc/link.c

@@ -367,28 +367,43 @@ void tipc_link_delete_list(struct net *net, unsigned int bearer_id,
 }
 
 /**
- * link_schedule_user - schedule user for wakeup after congestion
+ * link_schedule_user - schedule a message sender for wakeup after congestion
  * @link: congested link
- * @oport: sending port
- * @chain_sz: size of buffer chain that was attempted sent
- * @imp: importance of message attempted sent
+ * @list: message that was attempted sent
  * Create pseudo msg to send back to user when congestion abates
+ * Only consumes message if there is an error
  */
-static bool link_schedule_user(struct tipc_link *link, u32 oport,
-			       uint chain_sz, uint imp)
+static int link_schedule_user(struct tipc_link *link, struct sk_buff_head *list)
 {
-	struct sk_buff *buf;
+	struct tipc_msg *msg = buf_msg(skb_peek(list));
+	int imp = msg_importance(msg);
+	u32 oport = msg_origport(msg);
+	u32 addr = link_own_addr(link);
+	struct sk_buff *skb;
 
-	buf = tipc_msg_create(SOCK_WAKEUP, 0, INT_H_SIZE, 0,
-			      link_own_addr(link), link_own_addr(link),
-			      oport, 0, 0);
-	if (!buf)
-		return false;
-	TIPC_SKB_CB(buf)->chain_sz = chain_sz;
-	TIPC_SKB_CB(buf)->chain_imp = imp;
-	skb_queue_tail(&link->wakeupq, buf);
+	/* This really cannot happen... */
+	if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) {
+		pr_warn("%s<%s>, send queue full", link_rst_msg, link->name);
+		tipc_link_reset(link);
+		goto err;
+	}
+	/* Non-blocking sender: */
+	if (TIPC_SKB_CB(skb_peek(list))->wakeup_pending)
+		return -ELINKCONG;
+
+	/* Create and schedule wakeup pseudo message */
+	skb = tipc_msg_create(SOCK_WAKEUP, 0, INT_H_SIZE, 0,
+			      addr, addr, oport, 0, 0);
+	if (!skb)
+		goto err;
+	TIPC_SKB_CB(skb)->chain_sz = skb_queue_len(list);
+	TIPC_SKB_CB(skb)->chain_imp = imp;
+	skb_queue_tail(&link->wakeupq, skb);
 	link->stats.link_congs++;
-	return true;
+	return -ELINKCONG;
+err:
+	__skb_queue_purge(list);
+	return -ENOBUFS;
 }
 
 /**
@@ -708,48 +723,15 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
 	}
 }
 
-/* tipc_link_cong: determine return value and how to treat the
- * sent buffer during link congestion.
- * - For plain, errorless user data messages we keep the buffer and
- *   return -ELINKONG.
- * - For all other messages we discard the buffer and return -EHOSTUNREACH
- * - For TIPC internal messages we also reset the link
- */
-static int tipc_link_cong(struct tipc_link *link, struct sk_buff_head *list)
-{
-	struct sk_buff *skb = skb_peek(list);
-	struct tipc_msg *msg = buf_msg(skb);
-	int imp = msg_importance(msg);
-	u32 oport = msg_tot_origport(msg);
-
-	if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) {
-		pr_warn("%s<%s>, send queue full", link_rst_msg, link->name);
-		tipc_link_reset(link);
-		goto drop;
-	}
-	if (unlikely(msg_errcode(msg)))
-		goto drop;
-	if (unlikely(msg_reroute_cnt(msg)))
-		goto drop;
-	if (TIPC_SKB_CB(skb)->wakeup_pending)
-		return -ELINKCONG;
-	if (link_schedule_user(link, oport, skb_queue_len(list), imp))
-		return -ELINKCONG;
-drop:
-	__skb_queue_purge(list);
-	return -EHOSTUNREACH;
-}
-
 /**
  * __tipc_link_xmit(): same as tipc_link_xmit, but destlink is known & locked
  * @link: link to use
  * @list: chain of buffers containing message
  *
- * Consumes the buffer chain, except when returning -ELINKCONG
- * Returns 0 if success, otherwise errno: -ELINKCONG, -EMSGSIZE (plain socket
- * user data messages) or -EHOSTUNREACH (all other messages/senders)
- * Only the socket functions tipc_send_stream() and tipc_send_packet() need
- * to act on the return value, since they may need to do more send attempts.
+ * Consumes the buffer chain, except when returning -ELINKCONG,
+ * since the caller then may want to make more send attempts.
+ * Returns 0 if success, or errno: -ELINKCONG, -EMSGSIZE or -ENOBUFS
+ * Messages at TIPC_SYSTEM_IMPORTANCE are always accepted
  */
 int __tipc_link_xmit(struct net *net, struct tipc_link *link,
 		     struct sk_buff_head *list)
@@ -768,7 +750,7 @@ int __tipc_link_xmit(struct net *net, struct tipc_link *link,
 
 	/* Match backlog limit against msg importance: */
 	if (unlikely(link->backlog[imp].len >= link->backlog[imp].limit))
-		return tipc_link_cong(link, list);
+		return link_schedule_user(link, list);
 
 	if (unlikely(msg_size(msg) > mtu)) {
 		__skb_queue_purge(list);
@@ -820,13 +802,25 @@ static int __tipc_link_xmit_skb(struct tipc_link *link, struct sk_buff *skb)
 	return __tipc_link_xmit(link->owner->net, link, &head);
 }
 
+/* tipc_link_xmit_skb(): send single buffer to destination
+ * Buffers sent via this functon are generally TIPC_SYSTEM_IMPORTANCE
+ * messages, which will not be rejected
+ * The only exception is datagram messages rerouted after secondary
+ * lookup, which are rare and safe to dispose of anyway.
+ * TODO: Return real return value, and let callers use
+ * tipc_wait_for_sendpkt() where applicable
+ */
 int tipc_link_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,
 		       u32 selector)
 {
 	struct sk_buff_head head;
+	int rc;
 
 	skb2list(skb, &head);
-	return tipc_link_xmit(net, &head, dnode, selector);
+	rc = tipc_link_xmit(net, &head, dnode, selector);
+	if (rc == -ELINKCONG)
+		kfree_skb(skb);
+	return 0;
 }
 
 /**

diff --git a/net/tipc/msg.h b/net/tipc/msg.h
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h

@@ -240,6 +240,15 @@ static inline void msg_set_size(struct tipc_msg *m, u32 sz)
 	m->hdr[0] = htonl((msg_word(m, 0) & ~0x1ffff) | sz);
 }
 
+static inline unchar *msg_data(struct tipc_msg *m)
+{
+	return ((unchar *)m) + msg_hdr_sz(m);
+}
+
+static inline struct tipc_msg *msg_get_wrapped(struct tipc_msg *m)
+{
+	return (struct tipc_msg *)msg_data(m);
+}
+
 /*
  * Word 1
@@ -372,6 +381,8 @@ static inline void msg_set_prevnode(struct tipc_msg *m, u32 a)
 
 static inline u32 msg_origport(struct tipc_msg *m)
 {
+	if (msg_user(m) == MSG_FRAGMENTER)
+		m = msg_get_wrapped(m);
 	return msg_word(m, 4);
 }
@@ -467,16 +478,6 @@ static inline void msg_set_nameupper(struct tipc_msg *m, u32 n)
 	msg_set_word(m, 10, n);
 }
 
-static inline unchar *msg_data(struct tipc_msg *m)
-{
-	return ((unchar *)m) + msg_hdr_sz(m);
-}
-
-static inline struct tipc_msg *msg_get_wrapped(struct tipc_msg *m)
-{
-	return (struct tipc_msg *)msg_data(m);
-}
-
 /*
  * Constants and routines used to read and write TIPC internal message headers
  */
@@ -753,13 +754,6 @@ static inline void msg_set_link_tolerance(struct tipc_msg *m, u32 n)
 	msg_set_bits(m, 9, 0, 0xffff, n);
 }
 
-static inline u32 msg_tot_origport(struct tipc_msg *m)
-{
-	if ((msg_user(m) == MSG_FRAGMENTER) && (msg_type(m) == FIRST_FRAGMENT))
-		return msg_origport(msg_get_wrapped(m));
-	return msg_origport(m);
-}
-
 struct sk_buff *tipc_buf_acquire(u32 size);
 bool tipc_msg_validate(struct sk_buff *skb);
 bool tipc_msg_reverse(u32 own_addr, struct sk_buff *buf, u32 *dnode,
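
A context note, not part of the diff above: msg_origport() now unwraps
MSG_FRAGMENTER headers itself, which is why the FIRST_FRAGMENT-aware helper
msg_tot_origport() becomes redundant and link_schedule_user() can call plain
msg_origport() on the head of the congested chain. A minimal sketch of the
intended equivalence, for a hypothetical caller holding the first fragment of
a user message:

/* Illustration only: for a first fragment, the old and the new way of
 * reading the originating port name the same value.
 */
static inline bool example_origport_unchanged(struct tipc_msg *m)
{
	u32 old_way = msg_origport(msg_get_wrapped(m));	/* what msg_tot_origport() did */
	u32 new_way = msg_origport(m);			/* unwraps internally now */

	return old_way == new_way;
}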