Merge branch 'tipc-fix-some-issues'

Tuong Lien says:

====================
tipc: fix some issues

This series consists of some bug-fixes for TIPC.
====================

Signed-off-by: David S. Miller <davem@davemloft.net>
This commit is contained in:
David S. Miller 2019-12-10 17:45:04 -08:00
commit f1ce0a1557
4 changed files with 215 additions and 126 deletions

View File

@ -305,17 +305,17 @@ static int tipc_rcast_xmit(struct net *net, struct sk_buff_head *pkts,
* @skb: socket buffer to copy * @skb: socket buffer to copy
* @method: send method to be used * @method: send method to be used
* @dests: destination nodes for message. * @dests: destination nodes for message.
* @cong_link_cnt: returns number of encountered congested destination links
* Returns 0 if success, otherwise errno * Returns 0 if success, otherwise errno
*/ */
static int tipc_mcast_send_sync(struct net *net, struct sk_buff *skb, static int tipc_mcast_send_sync(struct net *net, struct sk_buff *skb,
struct tipc_mc_method *method, struct tipc_mc_method *method,
struct tipc_nlist *dests, struct tipc_nlist *dests)
u16 *cong_link_cnt)
{ {
struct tipc_msg *hdr, *_hdr; struct tipc_msg *hdr, *_hdr;
struct sk_buff_head tmpq; struct sk_buff_head tmpq;
struct sk_buff *_skb; struct sk_buff *_skb;
u16 cong_link_cnt;
int rc = 0;
/* Is a cluster supporting with new capabilities ? */ /* Is a cluster supporting with new capabilities ? */
if (!(tipc_net(net)->capabilities & TIPC_MCAST_RBCTL)) if (!(tipc_net(net)->capabilities & TIPC_MCAST_RBCTL))
@ -343,18 +343,19 @@ static int tipc_mcast_send_sync(struct net *net, struct sk_buff *skb,
_hdr = buf_msg(_skb); _hdr = buf_msg(_skb);
msg_set_size(_hdr, MCAST_H_SIZE); msg_set_size(_hdr, MCAST_H_SIZE);
msg_set_is_rcast(_hdr, !msg_is_rcast(hdr)); msg_set_is_rcast(_hdr, !msg_is_rcast(hdr));
msg_set_errcode(_hdr, TIPC_ERR_NO_PORT);
__skb_queue_head_init(&tmpq); __skb_queue_head_init(&tmpq);
__skb_queue_tail(&tmpq, _skb); __skb_queue_tail(&tmpq, _skb);
if (method->rcast) if (method->rcast)
tipc_bcast_xmit(net, &tmpq, cong_link_cnt); rc = tipc_bcast_xmit(net, &tmpq, &cong_link_cnt);
else else
tipc_rcast_xmit(net, &tmpq, dests, cong_link_cnt); rc = tipc_rcast_xmit(net, &tmpq, dests, &cong_link_cnt);
/* This queue should normally be empty by now */ /* This queue should normally be empty by now */
__skb_queue_purge(&tmpq); __skb_queue_purge(&tmpq);
return 0; return rc;
} }
/* tipc_mcast_xmit - deliver message to indicated destination nodes /* tipc_mcast_xmit - deliver message to indicated destination nodes
@ -396,9 +397,14 @@ int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts,
msg_set_is_rcast(hdr, method->rcast); msg_set_is_rcast(hdr, method->rcast);
/* Switch method ? */ /* Switch method ? */
if (rcast != method->rcast) if (rcast != method->rcast) {
tipc_mcast_send_sync(net, skb, method, rc = tipc_mcast_send_sync(net, skb, method, dests);
dests, cong_link_cnt); if (unlikely(rc)) {
pr_err("Unable to send SYN: method %d, rc %d\n",
rcast, rc);
goto exit;
}
}
if (method->rcast) if (method->rcast)
rc = tipc_rcast_xmit(net, pkts, dests, cong_link_cnt); rc = tipc_rcast_xmit(net, pkts, dests, cong_link_cnt);

View File

@ -194,6 +194,7 @@ void tipc_disc_rcv(struct net *net, struct sk_buff *skb,
{ {
struct tipc_net *tn = tipc_net(net); struct tipc_net *tn = tipc_net(net);
struct tipc_msg *hdr = buf_msg(skb); struct tipc_msg *hdr = buf_msg(skb);
u32 pnet_hash = msg_peer_net_hash(hdr);
u16 caps = msg_node_capabilities(hdr); u16 caps = msg_node_capabilities(hdr);
bool legacy = tn->legacy_addr_format; bool legacy = tn->legacy_addr_format;
u32 sugg = msg_sugg_node_addr(hdr); u32 sugg = msg_sugg_node_addr(hdr);
@ -242,9 +243,8 @@ void tipc_disc_rcv(struct net *net, struct sk_buff *skb,
return; return;
if (!tipc_in_scope(legacy, b->domain, src)) if (!tipc_in_scope(legacy, b->domain, src))
return; return;
tipc_node_check_dest(net, src, peer_id, b, caps, signature, tipc_node_check_dest(net, src, peer_id, b, caps, signature, pnet_hash,
msg_peer_net_hash(hdr), &maddr, &respond, &maddr, &respond, &dupl_addr);
&dupl_addr);
if (dupl_addr) if (dupl_addr)
disc_dupl_alert(b, src, &maddr); disc_dupl_alert(b, src, &maddr);
if (!respond) if (!respond)

View File

@ -36,6 +36,7 @@
#include <net/sock.h> #include <net/sock.h>
#include <linux/list_sort.h> #include <linux/list_sort.h>
#include <linux/rbtree_augmented.h>
#include "core.h" #include "core.h"
#include "netlink.h" #include "netlink.h"
#include "name_table.h" #include "name_table.h"
@ -51,6 +52,7 @@
* @lower: service range lower bound * @lower: service range lower bound
* @upper: service range upper bound * @upper: service range upper bound
* @tree_node: member of service range RB tree * @tree_node: member of service range RB tree
* @max: largest 'upper' in this node subtree
* @local_publ: list of identical publications made from this node * @local_publ: list of identical publications made from this node
* Used by closest_first lookup and multicast lookup algorithm * Used by closest_first lookup and multicast lookup algorithm
* @all_publ: all publications identical to this one, whatever node and scope * @all_publ: all publications identical to this one, whatever node and scope
@ -60,6 +62,7 @@ struct service_range {
u32 lower; u32 lower;
u32 upper; u32 upper;
struct rb_node tree_node; struct rb_node tree_node;
u32 max;
struct list_head local_publ; struct list_head local_publ;
struct list_head all_publ; struct list_head all_publ;
}; };
@ -84,6 +87,130 @@ struct tipc_service {
struct rcu_head rcu; struct rcu_head rcu;
}; };
#define service_range_upper(sr) ((sr)->upper)
RB_DECLARE_CALLBACKS_MAX(static, sr_callbacks,
struct service_range, tree_node, u32, max,
service_range_upper)
#define service_range_entry(rbtree_node) \
(container_of(rbtree_node, struct service_range, tree_node))
#define service_range_overlap(sr, start, end) \
((sr)->lower <= (end) && (sr)->upper >= (start))
/**
* service_range_foreach_match - iterate over tipc service rbtree for each
* range match
* @sr: the service range pointer as a loop cursor
* @sc: the pointer to tipc service which holds the service range rbtree
* @start, end: the range (end >= start) for matching
*/
#define service_range_foreach_match(sr, sc, start, end) \
for (sr = service_range_match_first((sc)->ranges.rb_node, \
start, \
end); \
sr; \
sr = service_range_match_next(&(sr)->tree_node, \
start, \
end))
/**
* service_range_match_first - find first service range matching a range
* @n: the root node of service range rbtree for searching
* @start, end: the range (end >= start) for matching
*
* Return: the leftmost service range node in the rbtree that overlaps the
* specific range if any. Otherwise, returns NULL.
*/
static struct service_range *service_range_match_first(struct rb_node *n,
u32 start, u32 end)
{
struct service_range *sr;
struct rb_node *l, *r;
/* Non overlaps in tree at all? */
if (!n || service_range_entry(n)->max < start)
return NULL;
while (n) {
l = n->rb_left;
if (l && service_range_entry(l)->max >= start) {
/* A leftmost overlap range node must be one in the left
* subtree. If not, it has lower > end, then nodes on
* the right side cannot satisfy the condition either.
*/
n = l;
continue;
}
/* No one in the left subtree can match, return if this node is
* an overlap i.e. leftmost.
*/
sr = service_range_entry(n);
if (service_range_overlap(sr, start, end))
return sr;
/* Ok, try to lookup on the right side */
r = n->rb_right;
if (sr->lower <= end &&
r && service_range_entry(r)->max >= start) {
n = r;
continue;
}
break;
}
return NULL;
}
/**
* service_range_match_next - find next service range matching a range
* @n: a node in service range rbtree from which the searching starts
* @start, end: the range (end >= start) for matching
*
* Return: the next service range node to the given node in the rbtree that
* overlaps the specific range if any. Otherwise, returns NULL.
*/
static struct service_range *service_range_match_next(struct rb_node *n,
u32 start, u32 end)
{
struct service_range *sr;
struct rb_node *p, *r;
while (n) {
r = n->rb_right;
if (r && service_range_entry(r)->max >= start)
/* A next overlap range node must be one in the right
* subtree. If not, it has lower > end, then any next
* successor (- an ancestor) of this node cannot
* satisfy the condition either.
*/
return service_range_match_first(r, start, end);
/* No one in the right subtree can match, go up to find an
* ancestor of this node which is parent of a left-hand child.
*/
while ((p = rb_parent(n)) && n == p->rb_right)
n = p;
if (!p)
break;
/* Return if this ancestor is an overlap */
sr = service_range_entry(p);
if (service_range_overlap(sr, start, end))
return sr;
/* Ok, try to lookup more from this ancestor */
if (sr->lower <= end) {
n = p;
continue;
}
break;
}
return NULL;
}
static int hash(int x) static int hash(int x)
{ {
return x & (TIPC_NAMETBL_SIZE - 1); return x & (TIPC_NAMETBL_SIZE - 1);
@ -139,84 +266,51 @@ static struct tipc_service *tipc_service_create(u32 type, struct hlist_head *hd)
return service; return service;
} }
/**
* tipc_service_first_range - find first service range in tree matching instance
*
* Very time-critical, so binary search through range rb tree
*/
static struct service_range *tipc_service_first_range(struct tipc_service *sc,
u32 instance)
{
struct rb_node *n = sc->ranges.rb_node;
struct service_range *sr;
while (n) {
sr = container_of(n, struct service_range, tree_node);
if (sr->lower > instance)
n = n->rb_left;
else if (sr->upper < instance)
n = n->rb_right;
else
return sr;
}
return NULL;
}
/* tipc_service_find_range - find service range matching publication parameters /* tipc_service_find_range - find service range matching publication parameters
*/ */
static struct service_range *tipc_service_find_range(struct tipc_service *sc, static struct service_range *tipc_service_find_range(struct tipc_service *sc,
u32 lower, u32 upper) u32 lower, u32 upper)
{ {
struct rb_node *n = sc->ranges.rb_node;
struct service_range *sr; struct service_range *sr;
sr = tipc_service_first_range(sc, lower); service_range_foreach_match(sr, sc, lower, upper) {
if (!sr) /* Look for exact match */
return NULL; if (sr->lower == lower && sr->upper == upper)
return sr;
/* Look for exact match */
for (n = &sr->tree_node; n; n = rb_next(n)) {
sr = container_of(n, struct service_range, tree_node);
if (sr->upper == upper)
break;
} }
if (!n || sr->lower != lower || sr->upper != upper)
return NULL;
return sr; return NULL;
} }
static struct service_range *tipc_service_create_range(struct tipc_service *sc, static struct service_range *tipc_service_create_range(struct tipc_service *sc,
u32 lower, u32 upper) u32 lower, u32 upper)
{ {
struct rb_node **n, *parent = NULL; struct rb_node **n, *parent = NULL;
struct service_range *sr, *tmp; struct service_range *sr;
n = &sc->ranges.rb_node; n = &sc->ranges.rb_node;
while (*n) { while (*n) {
tmp = container_of(*n, struct service_range, tree_node);
parent = *n; parent = *n;
tmp = container_of(parent, struct service_range, tree_node); sr = service_range_entry(parent);
if (lower < tmp->lower) if (lower == sr->lower && upper == sr->upper)
n = &(*n)->rb_left; return sr;
else if (lower > tmp->lower) if (sr->max < upper)
n = &(*n)->rb_right; sr->max = upper;
else if (upper < tmp->upper) if (lower <= sr->lower)
n = &(*n)->rb_left; n = &parent->rb_left;
else if (upper > tmp->upper)
n = &(*n)->rb_right;
else else
return tmp; n = &parent->rb_right;
} }
sr = kzalloc(sizeof(*sr), GFP_ATOMIC); sr = kzalloc(sizeof(*sr), GFP_ATOMIC);
if (!sr) if (!sr)
return NULL; return NULL;
sr->lower = lower; sr->lower = lower;
sr->upper = upper; sr->upper = upper;
sr->max = upper;
INIT_LIST_HEAD(&sr->local_publ); INIT_LIST_HEAD(&sr->local_publ);
INIT_LIST_HEAD(&sr->all_publ); INIT_LIST_HEAD(&sr->all_publ);
rb_link_node(&sr->tree_node, parent, n); rb_link_node(&sr->tree_node, parent, n);
rb_insert_color(&sr->tree_node, &sc->ranges); rb_insert_augmented(&sr->tree_node, &sc->ranges, &sr_callbacks);
return sr; return sr;
} }
@ -310,7 +404,6 @@ static void tipc_service_subscribe(struct tipc_service *service,
struct list_head publ_list; struct list_head publ_list;
struct service_range *sr; struct service_range *sr;
struct tipc_name_seq ns; struct tipc_name_seq ns;
struct rb_node *n;
u32 filter; u32 filter;
ns.type = tipc_sub_read(sb, seq.type); ns.type = tipc_sub_read(sb, seq.type);
@ -325,13 +418,7 @@ static void tipc_service_subscribe(struct tipc_service *service,
return; return;
INIT_LIST_HEAD(&publ_list); INIT_LIST_HEAD(&publ_list);
for (n = rb_first(&service->ranges); n; n = rb_next(n)) { service_range_foreach_match(sr, service, ns.lower, ns.upper) {
sr = container_of(n, struct service_range, tree_node);
if (sr->lower > ns.upper)
break;
if (!tipc_sub_check_overlap(&ns, sr->lower, sr->upper))
continue;
first = NULL; first = NULL;
list_for_each_entry(p, &sr->all_publ, all_publ) { list_for_each_entry(p, &sr->all_publ, all_publ) {
if (filter & TIPC_SUB_PORTS) if (filter & TIPC_SUB_PORTS)
@ -425,7 +512,7 @@ struct publication *tipc_nametbl_remove_publ(struct net *net, u32 type,
/* Remove service range item if this was its last publication */ /* Remove service range item if this was its last publication */
if (list_empty(&sr->all_publ)) { if (list_empty(&sr->all_publ)) {
rb_erase(&sr->tree_node, &sc->ranges); rb_erase_augmented(&sr->tree_node, &sc->ranges, &sr_callbacks);
kfree(sr); kfree(sr);
} }
@ -473,34 +560,39 @@ u32 tipc_nametbl_translate(struct net *net, u32 type, u32 instance, u32 *dnode)
rcu_read_lock(); rcu_read_lock();
sc = tipc_service_find(net, type); sc = tipc_service_find(net, type);
if (unlikely(!sc)) if (unlikely(!sc))
goto not_found; goto exit;
spin_lock_bh(&sc->lock); spin_lock_bh(&sc->lock);
sr = tipc_service_first_range(sc, instance); service_range_foreach_match(sr, sc, instance, instance) {
if (unlikely(!sr)) /* Select lookup algo: local, closest-first or round-robin */
goto no_match; if (*dnode == self) {
list = &sr->local_publ;
/* Select lookup algorithm: local, closest-first or round-robin */ if (list_empty(list))
if (*dnode == self) { continue;
list = &sr->local_publ; p = list_first_entry(list, struct publication,
if (list_empty(list)) local_publ);
goto no_match; list_move_tail(&p->local_publ, &sr->local_publ);
p = list_first_entry(list, struct publication, local_publ); } else if (legacy && !*dnode && !list_empty(&sr->local_publ)) {
list_move_tail(&p->local_publ, &sr->local_publ); list = &sr->local_publ;
} else if (legacy && !*dnode && !list_empty(&sr->local_publ)) { p = list_first_entry(list, struct publication,
list = &sr->local_publ; local_publ);
p = list_first_entry(list, struct publication, local_publ); list_move_tail(&p->local_publ, &sr->local_publ);
list_move_tail(&p->local_publ, &sr->local_publ); } else {
} else { list = &sr->all_publ;
list = &sr->all_publ; p = list_first_entry(list, struct publication,
p = list_first_entry(list, struct publication, all_publ); all_publ);
list_move_tail(&p->all_publ, &sr->all_publ); list_move_tail(&p->all_publ, &sr->all_publ);
}
port = p->port;
node = p->node;
/* Todo: as for legacy, pick the first matching range only, a
* "true" round-robin will be performed as needed.
*/
break;
} }
port = p->port;
node = p->node;
no_match:
spin_unlock_bh(&sc->lock); spin_unlock_bh(&sc->lock);
not_found:
exit:
rcu_read_unlock(); rcu_read_unlock();
*dnode = node; *dnode = node;
return port; return port;
@ -523,7 +615,8 @@ bool tipc_nametbl_lookup(struct net *net, u32 type, u32 instance, u32 scope,
spin_lock_bh(&sc->lock); spin_lock_bh(&sc->lock);
sr = tipc_service_first_range(sc, instance); /* Todo: a full search i.e. service_range_foreach_match() instead? */
sr = service_range_match_first(sc->ranges.rb_node, instance, instance);
if (!sr) if (!sr)
goto no_match; goto no_match;
@ -552,7 +645,6 @@ void tipc_nametbl_mc_lookup(struct net *net, u32 type, u32 lower, u32 upper,
struct service_range *sr; struct service_range *sr;
struct tipc_service *sc; struct tipc_service *sc;
struct publication *p; struct publication *p;
struct rb_node *n;
rcu_read_lock(); rcu_read_lock();
sc = tipc_service_find(net, type); sc = tipc_service_find(net, type);
@ -560,13 +652,7 @@ void tipc_nametbl_mc_lookup(struct net *net, u32 type, u32 lower, u32 upper,
goto exit; goto exit;
spin_lock_bh(&sc->lock); spin_lock_bh(&sc->lock);
service_range_foreach_match(sr, sc, lower, upper) {
for (n = rb_first(&sc->ranges); n; n = rb_next(n)) {
sr = container_of(n, struct service_range, tree_node);
if (sr->upper < lower)
continue;
if (sr->lower > upper)
break;
list_for_each_entry(p, &sr->local_publ, local_publ) { list_for_each_entry(p, &sr->local_publ, local_publ) {
if (p->scope == scope || (!exact && p->scope < scope)) if (p->scope == scope || (!exact && p->scope < scope))
tipc_dest_push(dports, 0, p->port); tipc_dest_push(dports, 0, p->port);
@ -587,7 +673,6 @@ void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 type, u32 lower,
struct service_range *sr; struct service_range *sr;
struct tipc_service *sc; struct tipc_service *sc;
struct publication *p; struct publication *p;
struct rb_node *n;
rcu_read_lock(); rcu_read_lock();
sc = tipc_service_find(net, type); sc = tipc_service_find(net, type);
@ -595,13 +680,7 @@ void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 type, u32 lower,
goto exit; goto exit;
spin_lock_bh(&sc->lock); spin_lock_bh(&sc->lock);
service_range_foreach_match(sr, sc, lower, upper) {
for (n = rb_first(&sc->ranges); n; n = rb_next(n)) {
sr = container_of(n, struct service_range, tree_node);
if (sr->upper < lower)
continue;
if (sr->lower > upper)
break;
list_for_each_entry(p, &sr->all_publ, all_publ) { list_for_each_entry(p, &sr->all_publ, all_publ) {
tipc_nlist_add(nodes, p->node); tipc_nlist_add(nodes, p->node);
} }
@ -799,7 +878,7 @@ static void tipc_service_delete(struct net *net, struct tipc_service *sc)
tipc_service_remove_publ(sr, p->node, p->key); tipc_service_remove_publ(sr, p->node, p->key);
kfree_rcu(p, rcu); kfree_rcu(p, rcu);
} }
rb_erase(&sr->tree_node, &sc->ranges); rb_erase_augmented(&sr->tree_node, &sc->ranges, &sr_callbacks);
kfree(sr); kfree(sr);
} }
hlist_del_init_rcu(&sc->service_list); hlist_del_init_rcu(&sc->service_list);

View File

@ -1364,8 +1364,8 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
struct tipc_msg *hdr = &tsk->phdr; struct tipc_msg *hdr = &tsk->phdr;
struct tipc_name_seq *seq; struct tipc_name_seq *seq;
struct sk_buff_head pkts; struct sk_buff_head pkts;
u32 dport, dnode = 0; u32 dport = 0, dnode = 0;
u32 type, inst; u32 type = 0, inst = 0;
int mtu, rc; int mtu, rc;
if (unlikely(dlen > TIPC_MAX_USER_MSG_SIZE)) if (unlikely(dlen > TIPC_MAX_USER_MSG_SIZE))
@ -1418,23 +1418,11 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
type = dest->addr.name.name.type; type = dest->addr.name.name.type;
inst = dest->addr.name.name.instance; inst = dest->addr.name.name.instance;
dnode = dest->addr.name.domain; dnode = dest->addr.name.domain;
msg_set_type(hdr, TIPC_NAMED_MSG);
msg_set_hdr_sz(hdr, NAMED_H_SIZE);
msg_set_nametype(hdr, type);
msg_set_nameinst(hdr, inst);
msg_set_lookup_scope(hdr, tipc_node2scope(dnode));
dport = tipc_nametbl_translate(net, type, inst, &dnode); dport = tipc_nametbl_translate(net, type, inst, &dnode);
msg_set_destnode(hdr, dnode);
msg_set_destport(hdr, dport);
if (unlikely(!dport && !dnode)) if (unlikely(!dport && !dnode))
return -EHOSTUNREACH; return -EHOSTUNREACH;
} else if (dest->addrtype == TIPC_ADDR_ID) { } else if (dest->addrtype == TIPC_ADDR_ID) {
dnode = dest->addr.id.node; dnode = dest->addr.id.node;
msg_set_type(hdr, TIPC_DIRECT_MSG);
msg_set_lookup_scope(hdr, 0);
msg_set_destnode(hdr, dnode);
msg_set_destport(hdr, dest->addr.id.ref);
msg_set_hdr_sz(hdr, BASIC_H_SIZE);
} else { } else {
return -EINVAL; return -EINVAL;
} }
@ -1445,6 +1433,22 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
if (unlikely(rc)) if (unlikely(rc))
return rc; return rc;
if (dest->addrtype == TIPC_ADDR_NAME) {
msg_set_type(hdr, TIPC_NAMED_MSG);
msg_set_hdr_sz(hdr, NAMED_H_SIZE);
msg_set_nametype(hdr, type);
msg_set_nameinst(hdr, inst);
msg_set_lookup_scope(hdr, tipc_node2scope(dnode));
msg_set_destnode(hdr, dnode);
msg_set_destport(hdr, dport);
} else { /* TIPC_ADDR_ID */
msg_set_type(hdr, TIPC_DIRECT_MSG);
msg_set_lookup_scope(hdr, 0);
msg_set_destnode(hdr, dnode);
msg_set_destport(hdr, dest->addr.id.ref);
msg_set_hdr_sz(hdr, BASIC_H_SIZE);
}
__skb_queue_head_init(&pkts); __skb_queue_head_init(&pkts);
mtu = tipc_node_get_mtu(net, dnode, tsk->portid, false); mtu = tipc_node_get_mtu(net, dnode, tsk->portid, false);
rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts); rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);