Merge branch 'tipc-link-setup-improvements'

Jon Maloy says:

====================
tipc: improvements to the link setup algorithm

This series addresses some smaller issues regarding the link setup
algorithm. The first commit fixes a rare bug we have discovered during
testing; the second one may have some future impact on cluster
scalabilty, while remaining ones can be regarded as cosmetic in
a wider sense of the word.
====================

Signed-off-by: David S. Miller <davem@davemloft.net>
This commit is contained in:
David S. Miller 2016-04-15 16:09:07 -04:00
commit 0818556cd0
4 changed files with 49 additions and 27 deletions

View File

@ -140,6 +140,7 @@ struct tipc_link {
char if_name[TIPC_MAX_IF_NAME];
u32 priority;
char net_plane;
u16 rst_cnt;
/* Failover/synch */
u16 drop_point;
@ -699,42 +700,37 @@ static void link_profile_stats(struct tipc_link *l)
l->stats.msg_length_profile[6]++;
}
/* tipc_link_timeout - perform periodic task as instructed from node timeout
*/
/* tipc_link_timeout - perform periodic task as instructed from node timeout
*/
int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq)
{
int rc = 0;
int mtyp = STATE_MSG;
bool xmit = false;
bool prb = false;
int mtyp, rc = 0;
bool state = false;
bool probe = false;
bool setup = false;
u16 bc_snt = l->bc_sndlink->snd_nxt - 1;
u16 bc_acked = l->bc_rcvlink->acked;
bool bc_up = link_is_up(l->bc_rcvlink);
link_profile_stats(l);
switch (l->state) {
case LINK_ESTABLISHED:
case LINK_SYNCHING:
if (!l->silent_intv_cnt) {
if (bc_up && (bc_acked != bc_snt))
xmit = true;
} else if (l->silent_intv_cnt <= l->abort_limit) {
xmit = true;
prb = true;
} else {
rc |= tipc_link_fsm_evt(l, LINK_FAILURE_EVT);
}
l->silent_intv_cnt++;
if (l->silent_intv_cnt > l->abort_limit)
return tipc_link_fsm_evt(l, LINK_FAILURE_EVT);
mtyp = STATE_MSG;
state = bc_acked != bc_snt;
probe = l->silent_intv_cnt;
if (probe)
l->silent_intv_cnt++;
break;
case LINK_RESET:
xmit = true;
setup = l->rst_cnt++ <= 4;
setup |= !(l->rst_cnt % 16);
mtyp = RESET_MSG;
break;
case LINK_ESTABLISHING:
xmit = true;
setup = true;
mtyp = ACTIVATE_MSG;
break;
case LINK_PEER_RESET:
@ -745,8 +741,8 @@ int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq)
break;
}
if (xmit)
tipc_link_build_proto_msg(l, mtyp, prb, 0, 0, 0, xmitq);
if (state || probe || setup)
tipc_link_build_proto_msg(l, mtyp, probe, 0, 0, 0, xmitq);
return rc;
}
@ -833,6 +829,7 @@ void tipc_link_reset(struct tipc_link *l)
l->rcv_nxt = 1;
l->acked = 0;
l->silent_intv_cnt = 0;
l->rst_cnt = 0;
l->stats.recv_info = 0;
l->stale_count = 0;
l->bc_peer_is_up = false;
@ -1110,12 +1107,12 @@ static bool tipc_link_release_pkts(struct tipc_link *l, u16 acked)
return released;
}
/* tipc_link_build_ack_msg: prepare link acknowledge message for transmission
/* tipc_link_build_state_msg: prepare link state message for transmission
*
* Note that sending of broadcast ack is coordinated among nodes, to reduce
* risk of ack storms towards the sender
*/
int tipc_link_build_ack_msg(struct tipc_link *l, struct sk_buff_head *xmitq)
int tipc_link_build_state_msg(struct tipc_link *l, struct sk_buff_head *xmitq)
{
if (!l)
return 0;
@ -1140,11 +1137,17 @@ int tipc_link_build_ack_msg(struct tipc_link *l, struct sk_buff_head *xmitq)
void tipc_link_build_reset_msg(struct tipc_link *l, struct sk_buff_head *xmitq)
{
int mtyp = RESET_MSG;
struct sk_buff *skb;
if (l->state == LINK_ESTABLISHING)
mtyp = ACTIVATE_MSG;
tipc_link_build_proto_msg(l, mtyp, 0, 0, 0, 0, xmitq);
/* Inform peer that this endpoint is going down if applicable */
skb = skb_peek_tail(xmitq);
if (skb && (l->state == LINK_RESET))
msg_set_peer_stopping(buf_msg(skb), 1);
}
/* tipc_link_build_nack_msg: prepare link nack message for transmission
@ -1219,7 +1222,7 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
if (!tipc_data_input(l, skb, l->inputq))
rc |= tipc_link_input(l, skb, l->inputq);
if (unlikely(++l->rcv_unacked >= TIPC_MIN_LINK_WIN))
rc |= tipc_link_build_ack_msg(l, xmitq);
rc |= tipc_link_build_state_msg(l, xmitq);
if (unlikely(rc & ~TIPC_LINK_SND_BC_ACK))
break;
} while ((skb = __skb_dequeue(defq)));
@ -1411,7 +1414,9 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
l->priority = peers_prio;
/* ACTIVATE_MSG serves as PEER_RESET if link is already down */
if ((mtyp == RESET_MSG) || !link_is_up(l))
if (msg_peer_stopping(hdr))
rc = tipc_link_fsm_evt(l, LINK_FAILURE_EVT);
else if ((mtyp == RESET_MSG) || !link_is_up(l))
rc = tipc_link_fsm_evt(l, LINK_PEER_RESET_EVT);
/* ACTIVATE_MSG takes up link if it was already locally reset */

View File

@ -123,7 +123,7 @@ int tipc_nl_parse_link_prop(struct nlattr *prop, struct nlattr *props[]);
int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq);
int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
struct sk_buff_head *xmitq);
int tipc_link_build_ack_msg(struct tipc_link *l, struct sk_buff_head *xmitq);
int tipc_link_build_state_msg(struct tipc_link *l, struct sk_buff_head *xmitq);
void tipc_link_add_bc_peer(struct tipc_link *snd_l,
struct tipc_link *uc_l,
struct sk_buff_head *xmitq);

View File

@ -715,6 +715,16 @@ static inline void msg_set_redundant_link(struct tipc_msg *m, u32 r)
msg_set_bits(m, 5, 12, 0x1, r);
}
static inline u32 msg_peer_stopping(struct tipc_msg *m)
{
return msg_bits(m, 5, 13, 0x1);
}
static inline void msg_set_peer_stopping(struct tipc_msg *m, u32 s)
{
msg_set_bits(m, 5, 13, 0x1, s);
}
static inline char *msg_media_addr(struct tipc_msg *m)
{
return (char *)&m->hdr[TIPC_MEDIA_INFO_OFFSET];

View File

@ -545,6 +545,9 @@ static void __tipc_node_link_up(struct tipc_node *n, int bearer_id,
pr_debug("Established link <%s> on network plane %c\n",
tipc_link_name(nl), tipc_link_plane(nl));
/* Ensure that a STATE message goes first */
tipc_link_build_state_msg(nl, xmitq);
/* First link? => give it both slots */
if (!ol) {
*slot0 = bearer_id;
@ -581,8 +584,12 @@ static void __tipc_node_link_up(struct tipc_node *n, int bearer_id,
static void tipc_node_link_up(struct tipc_node *n, int bearer_id,
struct sk_buff_head *xmitq)
{
struct tipc_media_addr *maddr;
tipc_node_write_lock(n);
__tipc_node_link_up(n, bearer_id, xmitq);
maddr = &n->links[bearer_id].maddr;
tipc_bearer_xmit(n->net, bearer_id, xmitq, maddr);
tipc_node_write_unlock(n);
}
@ -1279,7 +1286,7 @@ static void tipc_node_bc_rcv(struct net *net, struct sk_buff *skb, int bearer_id
/* Broadcast ACKs are sent on a unicast link */
if (rc & TIPC_LINK_SND_BC_ACK) {
tipc_node_read_lock(n);
tipc_link_build_ack_msg(le->link, &xmitq);
tipc_link_build_state_msg(le->link, &xmitq);
tipc_node_read_unlock(n);
}