Merge branch 'tipc-Sep17-2011' of git://openlinux.windriver.com/people/paulg/net-next

This commit is contained in:
David S. Miller 2011-09-20 14:39:04 -04:00
commit 46151ae817
16 changed files with 245 additions and 203 deletions

View File

@ -39,6 +39,7 @@
#include "link.h" #include "link.h"
#include "port.h" #include "port.h"
#include "bcast.h" #include "bcast.h"
#include "name_distr.h"
#define MAX_PKT_DEFAULT_MCAST 1500 /* bcast link max packet size (fixed) */ #define MAX_PKT_DEFAULT_MCAST 1500 /* bcast link max packet size (fixed) */
@ -298,14 +299,9 @@ static void bclink_send_nack(struct tipc_node *n_ptr)
msg_set_bcgap_to(msg, n_ptr->bclink.gap_to); msg_set_bcgap_to(msg, n_ptr->bclink.gap_to);
msg_set_bcast_tag(msg, tipc_own_tag); msg_set_bcast_tag(msg, tipc_own_tag);
if (tipc_bearer_send(&bcbearer->bearer, buf, NULL)) { tipc_bearer_send(&bcbearer->bearer, buf, NULL);
bcl->stats.sent_nacks++; bcl->stats.sent_nacks++;
buf_discard(buf); buf_discard(buf);
} else {
tipc_bearer_schedule(bcl->b_ptr, bcl);
bcl->proto_msg_queue = buf;
bcl->stats.bearer_congs++;
}
/* /*
* Ensure we doesn't send another NACK msg to the node * Ensure we doesn't send another NACK msg to the node
@ -426,20 +422,28 @@ int tipc_bclink_send_msg(struct sk_buff *buf)
void tipc_bclink_recv_pkt(struct sk_buff *buf) void tipc_bclink_recv_pkt(struct sk_buff *buf)
{ {
struct tipc_msg *msg = buf_msg(buf); struct tipc_msg *msg = buf_msg(buf);
struct tipc_node *node = tipc_node_find(msg_prevnode(msg)); struct tipc_node *node;
u32 next_in; u32 next_in;
u32 seqno; u32 seqno;
struct sk_buff *deferred; struct sk_buff *deferred;
if (unlikely(!node || !tipc_node_is_up(node) || !node->bclink.supported || /* Screen out unwanted broadcast messages */
(msg_mc_netid(msg) != tipc_net_id))) {
buf_discard(buf); if (msg_mc_netid(msg) != tipc_net_id)
return; goto exit;
}
node = tipc_node_find(msg_prevnode(msg));
if (unlikely(!node))
goto exit;
tipc_node_lock(node);
if (unlikely(!node->bclink.supported))
goto unlock;
if (unlikely(msg_user(msg) == BCAST_PROTOCOL)) { if (unlikely(msg_user(msg) == BCAST_PROTOCOL)) {
if (msg_type(msg) != STATE_MSG)
goto unlock;
if (msg_destnode(msg) == tipc_own_addr) { if (msg_destnode(msg) == tipc_own_addr) {
tipc_node_lock(node);
tipc_bclink_acknowledge(node, msg_bcast_ack(msg)); tipc_bclink_acknowledge(node, msg_bcast_ack(msg));
tipc_node_unlock(node); tipc_node_unlock(node);
spin_lock_bh(&bc_lock); spin_lock_bh(&bc_lock);
@ -449,18 +453,18 @@ void tipc_bclink_recv_pkt(struct sk_buff *buf)
msg_bcgap_to(msg)); msg_bcgap_to(msg));
spin_unlock_bh(&bc_lock); spin_unlock_bh(&bc_lock);
} else { } else {
tipc_node_unlock(node);
tipc_bclink_peek_nack(msg_destnode(msg), tipc_bclink_peek_nack(msg_destnode(msg),
msg_bcast_tag(msg), msg_bcast_tag(msg),
msg_bcgap_after(msg), msg_bcgap_after(msg),
msg_bcgap_to(msg)); msg_bcgap_to(msg));
} }
buf_discard(buf); goto exit;
return;
} }
tipc_node_lock(node); /* Handle in-sequence broadcast message */
receive: receive:
deferred = node->bclink.deferred_head;
next_in = mod(node->bclink.last_in + 1); next_in = mod(node->bclink.last_in + 1);
seqno = msg_seqno(msg); seqno = msg_seqno(msg);
@ -474,7 +478,10 @@ void tipc_bclink_recv_pkt(struct sk_buff *buf)
} }
if (likely(msg_isdata(msg))) { if (likely(msg_isdata(msg))) {
tipc_node_unlock(node); tipc_node_unlock(node);
tipc_port_recv_mcast(buf, NULL); if (likely(msg_mcast(msg)))
tipc_port_recv_mcast(buf, NULL);
else
buf_discard(buf);
} else if (msg_user(msg) == MSG_BUNDLER) { } else if (msg_user(msg) == MSG_BUNDLER) {
bcl->stats.recv_bundles++; bcl->stats.recv_bundles++;
bcl->stats.recv_bundled += msg_msgcnt(msg); bcl->stats.recv_bundled += msg_msgcnt(msg);
@ -487,18 +494,22 @@ void tipc_bclink_recv_pkt(struct sk_buff *buf)
bcl->stats.recv_fragmented++; bcl->stats.recv_fragmented++;
tipc_node_unlock(node); tipc_node_unlock(node);
tipc_net_route_msg(buf); tipc_net_route_msg(buf);
} else if (msg_user(msg) == NAME_DISTRIBUTOR) {
tipc_node_unlock(node);
tipc_named_recv(buf);
} else { } else {
tipc_node_unlock(node); tipc_node_unlock(node);
tipc_net_route_msg(buf); buf_discard(buf);
} }
buf = NULL;
tipc_node_lock(node);
deferred = node->bclink.deferred_head;
if (deferred && (buf_seqno(deferred) == mod(next_in + 1))) { if (deferred && (buf_seqno(deferred) == mod(next_in + 1))) {
tipc_node_lock(node);
buf = deferred; buf = deferred;
msg = buf_msg(buf); msg = buf_msg(buf);
node->bclink.deferred_head = deferred->next; node->bclink.deferred_head = deferred->next;
goto receive; goto receive;
} }
return;
} else if (less(next_in, seqno)) { } else if (less(next_in, seqno)) {
u32 gap_after = node->bclink.gap_after; u32 gap_after = node->bclink.gap_after;
u32 gap_to = node->bclink.gap_to; u32 gap_to = node->bclink.gap_to;
@ -513,6 +524,7 @@ void tipc_bclink_recv_pkt(struct sk_buff *buf)
else if (less(gap_after, seqno) && less(seqno, gap_to)) else if (less(gap_after, seqno) && less(seqno, gap_to))
node->bclink.gap_to = seqno; node->bclink.gap_to = seqno;
} }
buf = NULL;
if (bclink_ack_allowed(node->bclink.nack_sync)) { if (bclink_ack_allowed(node->bclink.nack_sync)) {
if (gap_to != gap_after) if (gap_to != gap_after)
bclink_send_nack(node); bclink_send_nack(node);
@ -520,9 +532,11 @@ void tipc_bclink_recv_pkt(struct sk_buff *buf)
} }
} else { } else {
bcl->stats.duplicates++; bcl->stats.duplicates++;
buf_discard(buf);
} }
unlock:
tipc_node_unlock(node); tipc_node_unlock(node);
exit:
buf_discard(buf);
} }
u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr) u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr)
@ -535,10 +549,11 @@ u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr)
/** /**
* tipc_bcbearer_send - send a packet through the broadcast pseudo-bearer * tipc_bcbearer_send - send a packet through the broadcast pseudo-bearer
* *
* Send through as many bearers as necessary to reach all nodes * Send packet over as many bearers as necessary to reach all nodes
* that support TIPC multicasting. * that have joined the broadcast link.
* *
* Returns 0 if packet sent successfully, non-zero if not * Returns 0 (packet sent successfully) under all circumstances,
* since the broadcast link's pseudo-bearer never blocks
*/ */
static int tipc_bcbearer_send(struct sk_buff *buf, static int tipc_bcbearer_send(struct sk_buff *buf,
@ -547,7 +562,12 @@ static int tipc_bcbearer_send(struct sk_buff *buf,
{ {
int bp_index; int bp_index;
/* Prepare buffer for broadcasting (if first time trying to send it) */ /*
* Prepare broadcast link message for reliable transmission,
* if first time trying to send it;
* preparation is skipped for broadcast link protocol messages
* since they are sent in an unreliable manner and don't need it
*/
if (likely(!msg_non_seq(buf_msg(buf)))) { if (likely(!msg_non_seq(buf_msg(buf)))) {
struct tipc_msg *msg; struct tipc_msg *msg;
@ -596,18 +616,12 @@ static int tipc_bcbearer_send(struct sk_buff *buf,
} }
if (bcbearer->remains_new.count == 0) if (bcbearer->remains_new.count == 0)
return 0; break; /* all targets reached */
bcbearer->remains = bcbearer->remains_new; bcbearer->remains = bcbearer->remains_new;
} }
/* return 0;
* Unable to reach all targets (indicate success, since currently
* there isn't code in place to properly block & unblock the
* pseudo-bearer used by the broadcast link)
*/
return TIPC_OK;
} }
/** /**
@ -667,27 +681,6 @@ void tipc_bcbearer_sort(void)
spin_unlock_bh(&bc_lock); spin_unlock_bh(&bc_lock);
} }
/**
* tipc_bcbearer_push - resolve bearer congestion
*
* Forces bclink to push out any unsent packets, until all packets are gone
* or congestion reoccurs.
* No locks set when function called
*/
void tipc_bcbearer_push(void)
{
struct tipc_bearer *b_ptr;
spin_lock_bh(&bc_lock);
b_ptr = &bcbearer->bearer;
if (b_ptr->blocked) {
b_ptr->blocked = 0;
tipc_bearer_lock_push(b_ptr);
}
spin_unlock_bh(&bc_lock);
}
int tipc_bclink_stats(char *buf, const u32 buf_size) int tipc_bclink_stats(char *buf, const u32 buf_size)
{ {
@ -764,7 +757,7 @@ int tipc_bclink_init(void)
bcbearer = kzalloc(sizeof(*bcbearer), GFP_ATOMIC); bcbearer = kzalloc(sizeof(*bcbearer), GFP_ATOMIC);
bclink = kzalloc(sizeof(*bclink), GFP_ATOMIC); bclink = kzalloc(sizeof(*bclink), GFP_ATOMIC);
if (!bcbearer || !bclink) { if (!bcbearer || !bclink) {
warn("Multicast link creation failed, no memory\n"); warn("Broadcast link creation failed, no memory\n");
kfree(bcbearer); kfree(bcbearer);
bcbearer = NULL; bcbearer = NULL;
kfree(bclink); kfree(bclink);
@ -775,7 +768,7 @@ int tipc_bclink_init(void)
INIT_LIST_HEAD(&bcbearer->bearer.cong_links); INIT_LIST_HEAD(&bcbearer->bearer.cong_links);
bcbearer->bearer.media = &bcbearer->media; bcbearer->bearer.media = &bcbearer->media;
bcbearer->media.send_msg = tipc_bcbearer_send; bcbearer->media.send_msg = tipc_bcbearer_send;
sprintf(bcbearer->media.name, "tipc-multicast"); sprintf(bcbearer->media.name, "tipc-broadcast");
bcl = &bclink->link; bcl = &bclink->link;
INIT_LIST_HEAD(&bcl->waiting_ports); INIT_LIST_HEAD(&bcl->waiting_ports);

View File

@ -101,6 +101,5 @@ int tipc_bclink_stats(char *stats_buf, const u32 buf_size);
int tipc_bclink_reset_stats(void); int tipc_bclink_reset_stats(void);
int tipc_bclink_set_queue_limits(u32 limit); int tipc_bclink_set_queue_limits(u32 limit);
void tipc_bcbearer_sort(void); void tipc_bcbearer_sort(void);
void tipc_bcbearer_push(void);
#endif #endif

View File

@ -385,13 +385,9 @@ static int bearer_push(struct tipc_bearer *b_ptr)
void tipc_bearer_lock_push(struct tipc_bearer *b_ptr) void tipc_bearer_lock_push(struct tipc_bearer *b_ptr)
{ {
int res;
spin_lock_bh(&b_ptr->lock); spin_lock_bh(&b_ptr->lock);
res = bearer_push(b_ptr); bearer_push(b_ptr);
spin_unlock_bh(&b_ptr->lock); spin_unlock_bh(&b_ptr->lock);
if (res)
tipc_bcbearer_push();
} }
@ -608,6 +604,7 @@ int tipc_block_bearer(const char *name)
info("Blocking bearer <%s>\n", name); info("Blocking bearer <%s>\n", name);
spin_lock_bh(&b_ptr->lock); spin_lock_bh(&b_ptr->lock);
b_ptr->blocked = 1; b_ptr->blocked = 1;
list_splice_init(&b_ptr->cong_links, &b_ptr->links);
list_for_each_entry_safe(l_ptr, temp_l_ptr, &b_ptr->links, link_list) { list_for_each_entry_safe(l_ptr, temp_l_ptr, &b_ptr->links, link_list) {
struct tipc_node *n_ptr = l_ptr->owner; struct tipc_node *n_ptr = l_ptr->owner;
@ -635,6 +632,7 @@ static void bearer_disable(struct tipc_bearer *b_ptr)
spin_lock_bh(&b_ptr->lock); spin_lock_bh(&b_ptr->lock);
b_ptr->blocked = 1; b_ptr->blocked = 1;
b_ptr->media->disable_bearer(b_ptr); b_ptr->media->disable_bearer(b_ptr);
list_splice_init(&b_ptr->cong_links, &b_ptr->links);
list_for_each_entry_safe(l_ptr, temp_l_ptr, &b_ptr->links, link_list) { list_for_each_entry_safe(l_ptr, temp_l_ptr, &b_ptr->links, link_list) {
tipc_link_delete(l_ptr); tipc_link_delete(l_ptr);
} }

View File

@ -39,8 +39,8 @@
#include "bcast.h" #include "bcast.h"
#define MAX_BEARERS 8 #define MAX_BEARERS 2
#define MAX_MEDIA 4 #define MAX_MEDIA 2
/* /*
* Identifiers of supported TIPC media types * Identifiers of supported TIPC media types

View File

@ -65,7 +65,6 @@ struct sk_buff *tipc_cfg_do_cmd(u32 orig_node, u16 cmd,
const void *req_tlv_area, int req_tlv_space, const void *req_tlv_area, int req_tlv_space,
int headroom); int headroom);
void tipc_cfg_link_event(u32 addr, char *name, int up);
int tipc_cfg_init(void); int tipc_cfg_init(void);
void tipc_cfg_stop(void); void tipc_cfg_stop(void);

View File

@ -159,12 +159,6 @@ void tipc_disc_recv_msg(struct sk_buff *buf, struct tipc_bearer *b_ptr)
} }
tipc_node_lock(n_ptr); tipc_node_lock(n_ptr);
/* Don't talk to neighbor during cleanup after last session */
if (n_ptr->cleanup_required) {
tipc_node_unlock(n_ptr);
return;
}
link = n_ptr->links[b_ptr->identity]; link = n_ptr->links[b_ptr->identity];
/* Create a link endpoint for this bearer, if necessary */ /* Create a link endpoint for this bearer, if necessary */

View File

@ -2,7 +2,7 @@
* net/tipc/eth_media.c: Ethernet bearer support for TIPC * net/tipc/eth_media.c: Ethernet bearer support for TIPC
* *
* Copyright (c) 2001-2007, Ericsson AB * Copyright (c) 2001-2007, Ericsson AB
* Copyright (c) 2005-2007, Wind River Systems * Copyright (c) 2005-2008, 2011, Wind River Systems
* All rights reserved. * All rights reserved.
* *
* Redistribution and use in source and binary forms, with or without * Redistribution and use in source and binary forms, with or without
@ -37,7 +37,7 @@
#include "core.h" #include "core.h"
#include "bearer.h" #include "bearer.h"
#define MAX_ETH_BEARERS 2 #define MAX_ETH_BEARERS MAX_BEARERS
#define ETH_LINK_PRIORITY TIPC_DEF_LINK_PRI #define ETH_LINK_PRIORITY TIPC_DEF_LINK_PRI
#define ETH_LINK_TOLERANCE TIPC_DEF_LINK_TOL #define ETH_LINK_TOLERANCE TIPC_DEF_LINK_TOL
#define ETH_LINK_WINDOW TIPC_DEF_LINK_WIN #define ETH_LINK_WINDOW TIPC_DEF_LINK_WIN
@ -144,31 +144,27 @@ static int enable_bearer(struct tipc_bearer *tb_ptr)
/* Find device with specified name */ /* Find device with specified name */
read_lock(&dev_base_lock);
for_each_netdev(&init_net, pdev) { for_each_netdev(&init_net, pdev) {
if (!strncmp(pdev->name, driver_name, IFNAMSIZ)) { if (!strncmp(pdev->name, driver_name, IFNAMSIZ)) {
dev = pdev; dev = pdev;
dev_hold(dev);
break; break;
} }
} }
read_unlock(&dev_base_lock);
if (!dev) if (!dev)
return -ENODEV; return -ENODEV;
/* Find Ethernet bearer for device (or create one) */ /* Create Ethernet bearer for device */
while ((eb_ptr != stop) && eb_ptr->dev && (eb_ptr->dev != dev)) eb_ptr->dev = dev;
eb_ptr++; eb_ptr->tipc_packet_type.type = htons(ETH_P_TIPC);
if (eb_ptr == stop) eb_ptr->tipc_packet_type.dev = dev;
return -EDQUOT; eb_ptr->tipc_packet_type.func = recv_msg;
if (!eb_ptr->dev) { eb_ptr->tipc_packet_type.af_packet_priv = eb_ptr;
eb_ptr->dev = dev; INIT_LIST_HEAD(&(eb_ptr->tipc_packet_type.list));
eb_ptr->tipc_packet_type.type = htons(ETH_P_TIPC); dev_add_pack(&eb_ptr->tipc_packet_type);
eb_ptr->tipc_packet_type.dev = dev;
eb_ptr->tipc_packet_type.func = recv_msg;
eb_ptr->tipc_packet_type.af_packet_priv = eb_ptr;
INIT_LIST_HEAD(&(eb_ptr->tipc_packet_type.list));
dev_hold(dev);
dev_add_pack(&eb_ptr->tipc_packet_type);
}
/* Associate TIPC bearer with Ethernet bearer */ /* Associate TIPC bearer with Ethernet bearer */

View File

@ -332,15 +332,16 @@ struct link *tipc_link_create(struct tipc_node *n_ptr,
l_ptr->addr = peer; l_ptr->addr = peer;
if_name = strchr(b_ptr->name, ':') + 1; if_name = strchr(b_ptr->name, ':') + 1;
sprintf(l_ptr->name, "%u.%u.%u:%s-%u.%u.%u:", sprintf(l_ptr->name, "%u.%u.%u:%s-%u.%u.%u:unknown",
tipc_zone(tipc_own_addr), tipc_cluster(tipc_own_addr), tipc_zone(tipc_own_addr), tipc_cluster(tipc_own_addr),
tipc_node(tipc_own_addr), tipc_node(tipc_own_addr),
if_name, if_name,
tipc_zone(peer), tipc_cluster(peer), tipc_node(peer)); tipc_zone(peer), tipc_cluster(peer), tipc_node(peer));
/* note: peer i/f is appended to link name by reset/activate */ /* note: peer i/f name is updated by reset/activate message */
memcpy(&l_ptr->media_addr, media_addr, sizeof(*media_addr)); memcpy(&l_ptr->media_addr, media_addr, sizeof(*media_addr));
l_ptr->owner = n_ptr; l_ptr->owner = n_ptr;
l_ptr->checkpoint = 1; l_ptr->checkpoint = 1;
l_ptr->peer_session = INVALID_SESSION;
l_ptr->b_ptr = b_ptr; l_ptr->b_ptr = b_ptr;
link_set_supervision_props(l_ptr, b_ptr->media->tolerance); link_set_supervision_props(l_ptr, b_ptr->media->tolerance);
l_ptr->state = RESET_UNKNOWN; l_ptr->state = RESET_UNKNOWN;
@ -536,9 +537,6 @@ void tipc_link_stop(struct link *l_ptr)
l_ptr->proto_msg_queue = NULL; l_ptr->proto_msg_queue = NULL;
} }
/* LINK EVENT CODE IS NOT SUPPORTED AT PRESENT */
#define link_send_event(fcn, l_ptr, up) do { } while (0)
void tipc_link_reset(struct link *l_ptr) void tipc_link_reset(struct link *l_ptr)
{ {
struct sk_buff *buf; struct sk_buff *buf;
@ -596,10 +594,6 @@ void tipc_link_reset(struct link *l_ptr)
l_ptr->fsm_msg_cnt = 0; l_ptr->fsm_msg_cnt = 0;
l_ptr->stale_count = 0; l_ptr->stale_count = 0;
link_reset_statistics(l_ptr); link_reset_statistics(l_ptr);
link_send_event(tipc_cfg_link_event, l_ptr, 0);
if (!in_own_cluster(l_ptr->addr))
link_send_event(tipc_disc_link_event, l_ptr, 0);
} }
@ -608,9 +602,6 @@ static void link_activate(struct link *l_ptr)
l_ptr->next_in_no = l_ptr->stats.recv_info = 1; l_ptr->next_in_no = l_ptr->stats.recv_info = 1;
tipc_node_link_up(l_ptr->owner, l_ptr); tipc_node_link_up(l_ptr->owner, l_ptr);
tipc_bearer_add_dest(l_ptr->b_ptr, l_ptr->addr); tipc_bearer_add_dest(l_ptr->b_ptr, l_ptr->addr);
link_send_event(tipc_cfg_link_event, l_ptr, 1);
if (!in_own_cluster(l_ptr->addr))
link_send_event(tipc_disc_link_event, l_ptr, 1);
} }
/** /**
@ -984,6 +975,51 @@ int tipc_link_send(struct sk_buff *buf, u32 dest, u32 selector)
return res; return res;
} }
/*
* tipc_link_send_names - send name table entries to new neighbor
*
* Send routine for bulk delivery of name table messages when contact
* with a new neighbor occurs. No link congestion checking is performed
* because name table messages *must* be delivered. The messages must be
* small enough not to require fragmentation.
* Called without any locks held.
*/
void tipc_link_send_names(struct list_head *message_list, u32 dest)
{
struct tipc_node *n_ptr;
struct link *l_ptr;
struct sk_buff *buf;
struct sk_buff *temp_buf;
if (list_empty(message_list))
return;
read_lock_bh(&tipc_net_lock);
n_ptr = tipc_node_find(dest);
if (n_ptr) {
tipc_node_lock(n_ptr);
l_ptr = n_ptr->active_links[0];
if (l_ptr) {
/* convert circular list to linear list */
((struct sk_buff *)message_list->prev)->next = NULL;
link_add_chain_to_outqueue(l_ptr,
(struct sk_buff *)message_list->next, 0);
tipc_link_push_queue(l_ptr);
INIT_LIST_HEAD(message_list);
}
tipc_node_unlock(n_ptr);
}
read_unlock_bh(&tipc_net_lock);
/* discard the messages if they couldn't be sent */
list_for_each_safe(buf, temp_buf, ((struct sk_buff *)message_list)) {
list_del((struct list_head *)buf);
buf_discard(buf);
}
}
/* /*
* link_send_buf_fast: Entry for data messages where the * link_send_buf_fast: Entry for data messages where the
* destination link is known and the header is complete, * destination link is known and the header is complete,
@ -1031,9 +1067,6 @@ int tipc_send_buf_fast(struct sk_buff *buf, u32 destnode)
u32 selector = msg_origport(buf_msg(buf)) & 1; u32 selector = msg_origport(buf_msg(buf)) & 1;
u32 dummy; u32 dummy;
if (destnode == tipc_own_addr)
return tipc_port_recv_msg(buf);
read_lock_bh(&tipc_net_lock); read_lock_bh(&tipc_net_lock);
n_ptr = tipc_node_find(destnode); n_ptr = tipc_node_find(destnode);
if (likely(n_ptr)) { if (likely(n_ptr)) {
@ -1658,19 +1691,12 @@ void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *b_ptr)
continue; continue;
} }
/* Discard unicast link messages destined for another node */
if (unlikely(!msg_short(msg) && if (unlikely(!msg_short(msg) &&
(msg_destnode(msg) != tipc_own_addr))) (msg_destnode(msg) != tipc_own_addr)))
goto cont; goto cont;
/* Discard non-routeable messages destined for another node */
if (unlikely(!msg_isdata(msg) &&
(msg_destnode(msg) != tipc_own_addr))) {
if ((msg_user(msg) != CONN_MANAGER) &&
(msg_user(msg) != MSG_FRAGMENTER))
goto cont;
}
/* Locate neighboring node that sent message */ /* Locate neighboring node that sent message */
n_ptr = tipc_node_find(msg_prevnode(msg)); n_ptr = tipc_node_find(msg_prevnode(msg));
@ -1678,13 +1704,6 @@ void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *b_ptr)
goto cont; goto cont;
tipc_node_lock(n_ptr); tipc_node_lock(n_ptr);
/* Don't talk to neighbor during cleanup after last session */
if (n_ptr->cleanup_required) {
tipc_node_unlock(n_ptr);
goto cont;
}
/* Locate unicast link endpoint that should handle message */ /* Locate unicast link endpoint that should handle message */
l_ptr = n_ptr->links[b_ptr->identity]; l_ptr = n_ptr->links[b_ptr->identity];
@ -1693,6 +1712,20 @@ void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *b_ptr)
goto cont; goto cont;
} }
/* Verify that communication with node is currently allowed */
if ((n_ptr->block_setup & WAIT_PEER_DOWN) &&
msg_user(msg) == LINK_PROTOCOL &&
(msg_type(msg) == RESET_MSG ||
msg_type(msg) == ACTIVATE_MSG) &&
!msg_redundant_link(msg))
n_ptr->block_setup &= ~WAIT_PEER_DOWN;
if (n_ptr->block_setup) {
tipc_node_unlock(n_ptr);
goto cont;
}
/* Validate message sequence number info */ /* Validate message sequence number info */
seq_no = msg_seqno(msg); seq_no = msg_seqno(msg);
@ -1923,6 +1956,12 @@ void tipc_link_send_proto_msg(struct link *l_ptr, u32 msg_typ, int probe_msg,
if (link_blocked(l_ptr)) if (link_blocked(l_ptr))
return; return;
/* Abort non-RESET send if communication with node is prohibited */
if ((l_ptr->owner->block_setup) && (msg_typ != RESET_MSG))
return;
msg_set_type(msg, msg_typ); msg_set_type(msg, msg_typ);
msg_set_net_plane(msg, l_ptr->b_ptr->net_plane); msg_set_net_plane(msg, l_ptr->b_ptr->net_plane);
msg_set_bcast_ack(msg, mod(l_ptr->owner->bclink.last_in)); msg_set_bcast_ack(msg, mod(l_ptr->owner->bclink.last_in));
@ -2051,9 +2090,19 @@ static void link_recv_proto_msg(struct link *l_ptr, struct sk_buff *buf)
case RESET_MSG: case RESET_MSG:
if (!link_working_unknown(l_ptr) && if (!link_working_unknown(l_ptr) &&
(l_ptr->peer_session != INVALID_SESSION)) { (l_ptr->peer_session != INVALID_SESSION)) {
if (msg_session(msg) == l_ptr->peer_session) if (less_eq(msg_session(msg), l_ptr->peer_session))
break; /* duplicate: ignore */ break; /* duplicate or old reset: ignore */
} }
if (!msg_redundant_link(msg) && (link_working_working(l_ptr) ||
link_working_unknown(l_ptr))) {
/*
* peer has lost contact -- don't allow peer's links
* to reactivate before we recognize loss & clean up
*/
l_ptr->owner->block_setup = WAIT_NODE_DOWN;
}
/* fall thru' */ /* fall thru' */
case ACTIVATE_MSG: case ACTIVATE_MSG:
/* Update link settings according other endpoint's values */ /* Update link settings according other endpoint's values */

View File

@ -223,6 +223,7 @@ struct sk_buff *tipc_link_cmd_show_stats(const void *req_tlv_area, int req_tlv_s
struct sk_buff *tipc_link_cmd_reset_stats(const void *req_tlv_area, int req_tlv_space); struct sk_buff *tipc_link_cmd_reset_stats(const void *req_tlv_area, int req_tlv_space);
void tipc_link_reset(struct link *l_ptr); void tipc_link_reset(struct link *l_ptr);
int tipc_link_send(struct sk_buff *buf, u32 dest, u32 selector); int tipc_link_send(struct sk_buff *buf, u32 dest, u32 selector);
void tipc_link_send_names(struct list_head *message_list, u32 dest);
int tipc_link_send_buf(struct link *l_ptr, struct sk_buff *buf); int tipc_link_send_buf(struct link *l_ptr, struct sk_buff *buf);
u32 tipc_link_get_max_pkt(u32 dest, u32 selector); u32 tipc_link_get_max_pkt(u32 dest, u32 selector);
int tipc_link_send_sections_fast(struct tipc_port *sender, int tipc_link_send_sections_fast(struct tipc_port *sender,

View File

@ -173,18 +173,40 @@ void tipc_named_withdraw(struct publication *publ)
* tipc_named_node_up - tell specified node about all publications by this node * tipc_named_node_up - tell specified node about all publications by this node
*/ */
void tipc_named_node_up(unsigned long node) void tipc_named_node_up(unsigned long nodearg)
{ {
struct tipc_node *n_ptr;
struct link *l_ptr;
struct publication *publ; struct publication *publ;
struct distr_item *item = NULL; struct distr_item *item = NULL;
struct sk_buff *buf = NULL; struct sk_buff *buf = NULL;
struct list_head message_list;
u32 node = (u32)nodearg;
u32 left = 0; u32 left = 0;
u32 rest; u32 rest;
u32 max_item_buf; u32 max_item_buf = 0;
/* compute maximum amount of publication data to send per message */
read_lock_bh(&tipc_net_lock);
n_ptr = tipc_node_find(node);
if (n_ptr) {
tipc_node_lock(n_ptr);
l_ptr = n_ptr->active_links[0];
if (l_ptr)
max_item_buf = ((l_ptr->max_pkt - INT_H_SIZE) /
ITEM_SIZE) * ITEM_SIZE;
tipc_node_unlock(n_ptr);
}
read_unlock_bh(&tipc_net_lock);
if (!max_item_buf)
return;
/* create list of publication messages, then send them as a unit */
INIT_LIST_HEAD(&message_list);
read_lock_bh(&tipc_nametbl_lock); read_lock_bh(&tipc_nametbl_lock);
max_item_buf = TIPC_MAX_USER_MSG_SIZE / ITEM_SIZE;
max_item_buf *= ITEM_SIZE;
rest = publ_cnt * ITEM_SIZE; rest = publ_cnt * ITEM_SIZE;
list_for_each_entry(publ, &publ_root, local_list) { list_for_each_entry(publ, &publ_root, local_list) {
@ -202,13 +224,14 @@ void tipc_named_node_up(unsigned long node)
item++; item++;
left -= ITEM_SIZE; left -= ITEM_SIZE;
if (!left) { if (!left) {
msg_set_link_selector(buf_msg(buf), node); list_add_tail((struct list_head *)buf, &message_list);
tipc_link_send(buf, node, node);
buf = NULL; buf = NULL;
} }
} }
exit: exit:
read_unlock_bh(&tipc_nametbl_lock); read_unlock_bh(&tipc_nametbl_lock);
tipc_link_send_names(&message_list, (u32)node);
} }
/** /**

View File

@ -141,17 +141,6 @@ void tipc_net_route_msg(struct sk_buff *buf)
return; return;
msg = buf_msg(buf); msg = buf_msg(buf);
msg_incr_reroute_cnt(msg);
if (msg_reroute_cnt(msg) > 6) {
if (msg_errcode(msg)) {
buf_discard(buf);
} else {
tipc_reject_msg(buf, msg_destport(msg) ?
TIPC_ERR_NO_PORT : TIPC_ERR_NO_NAME);
}
return;
}
/* Handle message for this node */ /* Handle message for this node */
dnode = msg_short(msg) ? tipc_own_addr : msg_destnode(msg); dnode = msg_short(msg) ? tipc_own_addr : msg_destnode(msg);
if (tipc_in_scope(dnode, tipc_own_addr)) { if (tipc_in_scope(dnode, tipc_own_addr)) {

View File

@ -112,6 +112,7 @@ struct tipc_node *tipc_node_create(u32 addr)
break; break;
} }
list_add_tail(&n_ptr->list, &temp_node->list); list_add_tail(&n_ptr->list, &temp_node->list);
n_ptr->block_setup = WAIT_PEER_DOWN;
tipc_num_nodes++; tipc_num_nodes++;
@ -312,7 +313,7 @@ static void node_established_contact(struct tipc_node *n_ptr)
} }
} }
static void node_cleanup_finished(unsigned long node_addr) static void node_name_purge_complete(unsigned long node_addr)
{ {
struct tipc_node *n_ptr; struct tipc_node *n_ptr;
@ -320,7 +321,7 @@ static void node_cleanup_finished(unsigned long node_addr)
n_ptr = tipc_node_find(node_addr); n_ptr = tipc_node_find(node_addr);
if (n_ptr) { if (n_ptr) {
tipc_node_lock(n_ptr); tipc_node_lock(n_ptr);
n_ptr->cleanup_required = 0; n_ptr->block_setup &= ~WAIT_NAMES_GONE;
tipc_node_unlock(n_ptr); tipc_node_unlock(n_ptr);
} }
read_unlock_bh(&tipc_net_lock); read_unlock_bh(&tipc_net_lock);
@ -331,29 +332,33 @@ static void node_lost_contact(struct tipc_node *n_ptr)
char addr_string[16]; char addr_string[16];
u32 i; u32 i;
/* Clean up broadcast reception remains */
n_ptr->bclink.gap_after = n_ptr->bclink.gap_to = 0;
while (n_ptr->bclink.deferred_head) {
struct sk_buff *buf = n_ptr->bclink.deferred_head;
n_ptr->bclink.deferred_head = buf->next;
buf_discard(buf);
}
if (n_ptr->bclink.defragm) {
buf_discard(n_ptr->bclink.defragm);
n_ptr->bclink.defragm = NULL;
}
if (n_ptr->bclink.supported) {
tipc_bclink_acknowledge(n_ptr,
mod(n_ptr->bclink.acked + 10000));
tipc_nmap_remove(&tipc_bcast_nmap, n_ptr->addr);
if (n_ptr->addr < tipc_own_addr)
tipc_own_tag--;
}
info("Lost contact with %s\n", info("Lost contact with %s\n",
tipc_addr_string_fill(addr_string, n_ptr->addr)); tipc_addr_string_fill(addr_string, n_ptr->addr));
/* Flush broadcast link info associated with lost node */
if (n_ptr->bclink.supported) {
n_ptr->bclink.gap_after = n_ptr->bclink.gap_to = 0;
while (n_ptr->bclink.deferred_head) {
struct sk_buff *buf = n_ptr->bclink.deferred_head;
n_ptr->bclink.deferred_head = buf->next;
buf_discard(buf);
}
if (n_ptr->bclink.defragm) {
buf_discard(n_ptr->bclink.defragm);
n_ptr->bclink.defragm = NULL;
}
tipc_nmap_remove(&tipc_bcast_nmap, n_ptr->addr);
tipc_bclink_acknowledge(n_ptr,
mod(n_ptr->bclink.acked + 10000));
if (n_ptr->addr < tipc_own_addr)
tipc_own_tag--;
n_ptr->bclink.supported = 0;
}
/* Abort link changeover */ /* Abort link changeover */
for (i = 0; i < MAX_BEARERS; i++) { for (i = 0; i < MAX_BEARERS; i++) {
struct link *l_ptr = n_ptr->links[i]; struct link *l_ptr = n_ptr->links[i];
@ -367,10 +372,10 @@ static void node_lost_contact(struct tipc_node *n_ptr)
/* Notify subscribers */ /* Notify subscribers */
tipc_nodesub_notify(n_ptr); tipc_nodesub_notify(n_ptr);
/* Prevent re-contact with node until all cleanup is done */ /* Prevent re-contact with node until cleanup is done */
n_ptr->cleanup_required = 1; n_ptr->block_setup = WAIT_PEER_DOWN | WAIT_NAMES_GONE;
tipc_k_signal((Handler)node_cleanup_finished, n_ptr->addr); tipc_k_signal((Handler)node_name_purge_complete, n_ptr->addr);
} }
struct sk_buff *tipc_node_get_nodes(const void *req_tlv_area, int req_tlv_space) struct sk_buff *tipc_node_get_nodes(const void *req_tlv_area, int req_tlv_space)

View File

@ -42,6 +42,12 @@
#include "net.h" #include "net.h"
#include "bearer.h" #include "bearer.h"
/* Flags used to block (re)establishment of contact with a neighboring node */
#define WAIT_PEER_DOWN 0x0001 /* wait to see that peer's links are down */
#define WAIT_NAMES_GONE 0x0002 /* wait for peer's publications to be purged */
#define WAIT_NODE_DOWN 0x0004 /* wait until peer node is declared down */
/** /**
* struct tipc_node - TIPC node structure * struct tipc_node - TIPC node structure
* @addr: network address of node * @addr: network address of node
@ -52,7 +58,7 @@
* @active_links: pointers to active links to node * @active_links: pointers to active links to node
* @links: pointers to all links to node * @links: pointers to all links to node
* @working_links: number of working links to node (both active and standby) * @working_links: number of working links to node (both active and standby)
* @cleanup_required: non-zero if cleaning up after a prior loss of contact * @block_setup: bit mask of conditions preventing link establishment to node
* @link_cnt: number of links to node * @link_cnt: number of links to node
* @permit_changeover: non-zero if node has redundant links to this system * @permit_changeover: non-zero if node has redundant links to this system
* @bclink: broadcast-related info * @bclink: broadcast-related info
@ -77,7 +83,7 @@ struct tipc_node {
struct link *links[MAX_BEARERS]; struct link *links[MAX_BEARERS];
int link_cnt; int link_cnt;
int working_links; int working_links;
int cleanup_required; int block_setup;
int permit_changeover; int permit_changeover;
struct { struct {
int supported; int supported;

View File

@ -49,7 +49,7 @@ struct tipc_sock {
struct sock sk; struct sock sk;
struct tipc_port *p; struct tipc_port *p;
struct tipc_portid peer_name; struct tipc_portid peer_name;
long conn_timeout; unsigned int conn_timeout;
}; };
#define tipc_sk(sk) ((struct tipc_sock *)(sk)) #define tipc_sk(sk) ((struct tipc_sock *)(sk))
@ -231,7 +231,7 @@ static int tipc_create(struct net *net, struct socket *sock, int protocol,
sock_init_data(sock, sk); sock_init_data(sock, sk);
sk->sk_backlog_rcv = backlog_rcv; sk->sk_backlog_rcv = backlog_rcv;
tipc_sk(sk)->p = tp_ptr; tipc_sk(sk)->p = tp_ptr;
tipc_sk(sk)->conn_timeout = msecs_to_jiffies(CONN_TIMEOUT_DEFAULT); tipc_sk(sk)->conn_timeout = CONN_TIMEOUT_DEFAULT;
spin_unlock_bh(tp_ptr->lock); spin_unlock_bh(tp_ptr->lock);
@ -525,6 +525,7 @@ static int send_msg(struct kiocb *iocb, struct socket *sock,
struct tipc_port *tport = tipc_sk_port(sk); struct tipc_port *tport = tipc_sk_port(sk);
struct sockaddr_tipc *dest = (struct sockaddr_tipc *)m->msg_name; struct sockaddr_tipc *dest = (struct sockaddr_tipc *)m->msg_name;
int needs_conn; int needs_conn;
long timeout_val;
int res = -EINVAL; int res = -EINVAL;
if (unlikely(!dest)) if (unlikely(!dest))
@ -564,6 +565,8 @@ static int send_msg(struct kiocb *iocb, struct socket *sock,
reject_rx_queue(sk); reject_rx_queue(sk);
} }
timeout_val = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
do { do {
if (dest->addrtype == TIPC_ADDR_NAME) { if (dest->addrtype == TIPC_ADDR_NAME) {
res = dest_name_check(dest, m); res = dest_name_check(dest, m);
@ -600,16 +603,14 @@ static int send_msg(struct kiocb *iocb, struct socket *sock,
sock->state = SS_CONNECTING; sock->state = SS_CONNECTING;
break; break;
} }
if (m->msg_flags & MSG_DONTWAIT) { if (timeout_val <= 0L) {
res = -EWOULDBLOCK; res = timeout_val ? timeout_val : -EWOULDBLOCK;
break; break;
} }
release_sock(sk); release_sock(sk);
res = wait_event_interruptible(*sk_sleep(sk), timeout_val = wait_event_interruptible_timeout(*sk_sleep(sk),
!tport->congested); !tport->congested, timeout_val);
lock_sock(sk); lock_sock(sk);
if (res)
break;
} while (1); } while (1);
exit: exit:
@ -636,6 +637,7 @@ static int send_packet(struct kiocb *iocb, struct socket *sock,
struct sock *sk = sock->sk; struct sock *sk = sock->sk;
struct tipc_port *tport = tipc_sk_port(sk); struct tipc_port *tport = tipc_sk_port(sk);
struct sockaddr_tipc *dest = (struct sockaddr_tipc *)m->msg_name; struct sockaddr_tipc *dest = (struct sockaddr_tipc *)m->msg_name;
long timeout_val;
int res; int res;
/* Handle implied connection establishment */ /* Handle implied connection establishment */
@ -650,6 +652,8 @@ static int send_packet(struct kiocb *iocb, struct socket *sock,
if (iocb) if (iocb)
lock_sock(sk); lock_sock(sk);
timeout_val = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
do { do {
if (unlikely(sock->state != SS_CONNECTED)) { if (unlikely(sock->state != SS_CONNECTED)) {
if (sock->state == SS_DISCONNECTING) if (sock->state == SS_DISCONNECTING)
@ -663,16 +667,14 @@ static int send_packet(struct kiocb *iocb, struct socket *sock,
total_len); total_len);
if (likely(res != -ELINKCONG)) if (likely(res != -ELINKCONG))
break; break;
if (m->msg_flags & MSG_DONTWAIT) { if (timeout_val <= 0L) {
res = -EWOULDBLOCK; res = timeout_val ? timeout_val : -EWOULDBLOCK;
break; break;
} }
release_sock(sk); release_sock(sk);
res = wait_event_interruptible(*sk_sleep(sk), timeout_val = wait_event_interruptible_timeout(*sk_sleep(sk),
(!tport->congested || !tport->connected)); (!tport->congested || !tport->connected), timeout_val);
lock_sock(sk); lock_sock(sk);
if (res)
break;
} while (1); } while (1);
if (iocb) if (iocb)
@ -1369,7 +1371,7 @@ static int connect(struct socket *sock, struct sockaddr *dest, int destlen,
struct msghdr m = {NULL,}; struct msghdr m = {NULL,};
struct sk_buff *buf; struct sk_buff *buf;
struct tipc_msg *msg; struct tipc_msg *msg;
long timeout; unsigned int timeout;
int res; int res;
lock_sock(sk); lock_sock(sk);
@ -1434,7 +1436,8 @@ static int connect(struct socket *sock, struct sockaddr *dest, int destlen,
res = wait_event_interruptible_timeout(*sk_sleep(sk), res = wait_event_interruptible_timeout(*sk_sleep(sk),
(!skb_queue_empty(&sk->sk_receive_queue) || (!skb_queue_empty(&sk->sk_receive_queue) ||
(sock->state != SS_CONNECTING)), (sock->state != SS_CONNECTING)),
timeout ? timeout : MAX_SCHEDULE_TIMEOUT); timeout ? (long)msecs_to_jiffies(timeout)
: MAX_SCHEDULE_TIMEOUT);
lock_sock(sk); lock_sock(sk);
if (res > 0) { if (res > 0) {
@ -1480,9 +1483,7 @@ static int listen(struct socket *sock, int len)
lock_sock(sk); lock_sock(sk);
if (sock->state == SS_READY) if (sock->state != SS_UNCONNECTED)
res = -EOPNOTSUPP;
else if (sock->state != SS_UNCONNECTED)
res = -EINVAL; res = -EINVAL;
else { else {
sock->state = SS_LISTENING; sock->state = SS_LISTENING;
@ -1510,10 +1511,6 @@ static int accept(struct socket *sock, struct socket *new_sock, int flags)
lock_sock(sk); lock_sock(sk);
if (sock->state == SS_READY) {
res = -EOPNOTSUPP;
goto exit;
}
if (sock->state != SS_LISTENING) { if (sock->state != SS_LISTENING) {
res = -EINVAL; res = -EINVAL;
goto exit; goto exit;
@ -1696,7 +1693,7 @@ static int setsockopt(struct socket *sock,
res = tipc_set_portunreturnable(tport->ref, value); res = tipc_set_portunreturnable(tport->ref, value);
break; break;
case TIPC_CONN_TIMEOUT: case TIPC_CONN_TIMEOUT:
tipc_sk(sk)->conn_timeout = msecs_to_jiffies(value); tipc_sk(sk)->conn_timeout = value;
/* no need to set "res", since already 0 at this point */ /* no need to set "res", since already 0 at this point */
break; break;
default: default:
@ -1752,7 +1749,7 @@ static int getsockopt(struct socket *sock,
res = tipc_portunreturnable(tport->ref, &value); res = tipc_portunreturnable(tport->ref, &value);
break; break;
case TIPC_CONN_TIMEOUT: case TIPC_CONN_TIMEOUT:
value = jiffies_to_msecs(tipc_sk(sk)->conn_timeout); value = tipc_sk(sk)->conn_timeout;
/* no need to set "res", since already 0 at this point */ /* no need to set "res", since already 0 at this point */
break; break;
case TIPC_NODE_RECVQ_DEPTH: case TIPC_NODE_RECVQ_DEPTH:
@ -1790,11 +1787,11 @@ static const struct proto_ops msg_ops = {
.bind = bind, .bind = bind,
.connect = connect, .connect = connect,
.socketpair = sock_no_socketpair, .socketpair = sock_no_socketpair,
.accept = accept, .accept = sock_no_accept,
.getname = get_name, .getname = get_name,
.poll = poll, .poll = poll,
.ioctl = sock_no_ioctl, .ioctl = sock_no_ioctl,
.listen = listen, .listen = sock_no_listen,
.shutdown = shutdown, .shutdown = shutdown,
.setsockopt = setsockopt, .setsockopt = setsockopt,
.getsockopt = getsockopt, .getsockopt = getsockopt,

View File

@ -151,7 +151,7 @@ void tipc_subscr_report_overlap(struct subscription *sub,
if (!must && !(sub->filter & TIPC_SUB_PORTS)) if (!must && !(sub->filter & TIPC_SUB_PORTS))
return; return;
sub->event_cb(sub, found_lower, found_upper, event, port_ref, node); subscr_send_event(sub, found_lower, found_upper, event, port_ref, node);
} }
/** /**
@ -365,7 +365,6 @@ static struct subscription *subscr_subscribe(struct tipc_subscr *s,
subscr_terminate(subscriber); subscr_terminate(subscriber);
return NULL; return NULL;
} }
sub->event_cb = subscr_send_event;
INIT_LIST_HEAD(&sub->nameseq_list); INIT_LIST_HEAD(&sub->nameseq_list);
list_add(&sub->subscription_list, &subscriber->subscription_list); list_add(&sub->subscription_list, &subscriber->subscription_list);
sub->server_ref = subscriber->port_ref; sub->server_ref = subscriber->port_ref;

View File

@ -39,16 +39,11 @@
struct subscription; struct subscription;
typedef void (*tipc_subscr_event) (struct subscription *sub,
u32 found_lower, u32 found_upper,
u32 event, u32 port_ref, u32 node);
/** /**
* struct subscription - TIPC network topology subscription object * struct subscription - TIPC network topology subscription object
* @seq: name sequence associated with subscription * @seq: name sequence associated with subscription
* @timeout: duration of subscription (in ms) * @timeout: duration of subscription (in ms)
* @filter: event filtering to be done for subscription * @filter: event filtering to be done for subscription
* @event_cb: routine invoked when a subscription event is detected
* @timer: timer governing subscription duration (optional) * @timer: timer governing subscription duration (optional)
* @nameseq_list: adjacent subscriptions in name sequence's subscription list * @nameseq_list: adjacent subscriptions in name sequence's subscription list
* @subscription_list: adjacent subscriptions in subscriber's subscription list * @subscription_list: adjacent subscriptions in subscriber's subscription list
@ -61,7 +56,6 @@ struct subscription {
struct tipc_name_seq seq; struct tipc_name_seq seq;
u32 timeout; u32 timeout;
u32 filter; u32 filter;
tipc_subscr_event event_cb;
struct timer_list timer; struct timer_list timer;
struct list_head nameseq_list; struct list_head nameseq_list;
struct list_head subscription_list; struct list_head subscription_list;