Merge branch 'packet_rollover'

Willem de Bruijn says:

====================
refine packet socket rollover:

1. mitigate a case of lock contention
2. avoid exporting resource exhaustion to other sockets,
   by migrating only to a victim socket that has ample room
3. avoid reordering of most flows on the socket,
   by migrating first the flow responsible for load imbalance
4. help processes detect load imbalance,
   by exporting rollover counters

Context: rollover implements flow migration in packet socket fanout
groups in case of extreme load imbalance. It is a specific
implementation of migration that minimizes reordering by selecting
the same victim socket when possible (and by selecting subsequent
victims in a round robin fashion, from which its name derives).

Changes:
  v2 -> v3:
    - statistics: replace unsigned long with __aligned_u64
  v1 -> v2:
    - huge flow detection: run lockless
    - huge flow detection: replace stored index with random
    - contention avoidance: test in packet_poll while lock held
    - contention avoidance: clear pressure sooner

          packet_poll and packet_recvmsg would clear only if the sock
          is empty to avoid taking the necessary lock. But,
          * packet_poll already holds this lock, so a lockless variant
            __packet_rcv_has_room is cheap.
          * packet_recvmsg is usually called only for non-ring sockets,
            which also runs lockless.

    - preparation: drop "single return" patch

          packet_rcv_has_room is now a locked wrapper around
          __packet_rcv_has_room, achieving the same (single footer).

The benchmark mentioned in the patches is at
https://github.com/wdebruij/kerneltools/blob/master/tests/bench_rollover.c
====================

Signed-off-by: David S. Miller <davem@davemloft.net>
This commit is contained in:
David S. Miller 2015-05-13 15:43:01 -04:00
commit 9f0a74d7b6
3 changed files with 163 additions and 28 deletions

View File

@ -54,6 +54,7 @@ struct sockaddr_ll {
#define PACKET_FANOUT 18
#define PACKET_TX_HAS_OFF 19
#define PACKET_QDISC_BYPASS 20
#define PACKET_ROLLOVER_STATS 21
#define PACKET_FANOUT_HASH 0
#define PACKET_FANOUT_LB 1
@ -75,6 +76,12 @@ struct tpacket_stats_v3 {
unsigned int tp_freeze_q_cnt;
};
struct tpacket_rollover_stats {
__aligned_u64 tp_all;
__aligned_u64 tp_huge;
__aligned_u64 tp_failed;
};
union tpacket_stats_u {
struct tpacket_stats stats1;
struct tpacket_stats_v3 stats3;

View File

@ -1234,27 +1234,86 @@ static void packet_free_pending(struct packet_sock *po)
free_percpu(po->tx_ring.pending_refcnt);
}
static bool packet_rcv_has_room(struct packet_sock *po, struct sk_buff *skb)
#define ROOM_POW_OFF 2
#define ROOM_NONE 0x0
#define ROOM_LOW 0x1
#define ROOM_NORMAL 0x2
static bool __tpacket_has_room(struct packet_sock *po, int pow_off)
{
int idx, len;
len = po->rx_ring.frame_max + 1;
idx = po->rx_ring.head;
if (pow_off)
idx += len >> pow_off;
if (idx >= len)
idx -= len;
return packet_lookup_frame(po, &po->rx_ring, idx, TP_STATUS_KERNEL);
}
static bool __tpacket_v3_has_room(struct packet_sock *po, int pow_off)
{
int idx, len;
len = po->rx_ring.prb_bdqc.knum_blocks;
idx = po->rx_ring.prb_bdqc.kactive_blk_num;
if (pow_off)
idx += len >> pow_off;
if (idx >= len)
idx -= len;
return prb_lookup_block(po, &po->rx_ring, idx, TP_STATUS_KERNEL);
}
static int __packet_rcv_has_room(struct packet_sock *po, struct sk_buff *skb)
{
struct sock *sk = &po->sk;
int ret = ROOM_NONE;
if (po->prot_hook.func != tpacket_rcv) {
int avail = sk->sk_rcvbuf - atomic_read(&sk->sk_rmem_alloc)
- (skb ? skb->truesize : 0);
if (avail > (sk->sk_rcvbuf >> ROOM_POW_OFF))
return ROOM_NORMAL;
else if (avail > 0)
return ROOM_LOW;
else
return ROOM_NONE;
}
if (po->tp_version == TPACKET_V3) {
if (__tpacket_v3_has_room(po, ROOM_POW_OFF))
ret = ROOM_NORMAL;
else if (__tpacket_v3_has_room(po, 0))
ret = ROOM_LOW;
} else {
if (__tpacket_has_room(po, ROOM_POW_OFF))
ret = ROOM_NORMAL;
else if (__tpacket_has_room(po, 0))
ret = ROOM_LOW;
}
return ret;
}
static int packet_rcv_has_room(struct packet_sock *po, struct sk_buff *skb)
{
int ret;
bool has_room;
if (po->prot_hook.func != tpacket_rcv)
return (atomic_read(&sk->sk_rmem_alloc) + skb->truesize)
<= sk->sk_rcvbuf;
if (po->prot_hook.func == tpacket_rcv) {
spin_lock(&po->sk.sk_receive_queue.lock);
ret = __packet_rcv_has_room(po, skb);
spin_unlock(&po->sk.sk_receive_queue.lock);
} else {
ret = __packet_rcv_has_room(po, skb);
}
spin_lock(&sk->sk_receive_queue.lock);
if (po->tp_version == TPACKET_V3)
has_room = prb_lookup_block(po, &po->rx_ring,
po->rx_ring.prb_bdqc.kactive_blk_num,
TP_STATUS_KERNEL);
else
has_room = packet_lookup_frame(po, &po->rx_ring,
po->rx_ring.head,
TP_STATUS_KERNEL);
spin_unlock(&sk->sk_receive_queue.lock);
has_room = ret == ROOM_NORMAL;
if (po->pressure == has_room)
xchg(&po->pressure, !has_room);
return has_room;
return ret;
}
static void packet_sock_destruct(struct sock *sk)
@ -1282,6 +1341,20 @@ static int fanout_rr_next(struct packet_fanout *f, unsigned int num)
return x;
}
static bool fanout_flow_is_huge(struct packet_sock *po, struct sk_buff *skb)
{
u32 rxhash;
int i, count = 0;
rxhash = skb_get_hash(skb);
for (i = 0; i < ROLLOVER_HLEN; i++)
if (po->rollover->history[i] == rxhash)
count++;
po->rollover->history[prandom_u32() % ROLLOVER_HLEN] = rxhash;
return count > (ROLLOVER_HLEN >> 1);
}
static unsigned int fanout_demux_hash(struct packet_fanout *f,
struct sk_buff *skb,
unsigned int num)
@ -1318,22 +1391,39 @@ static unsigned int fanout_demux_rnd(struct packet_fanout *f,
static unsigned int fanout_demux_rollover(struct packet_fanout *f,
struct sk_buff *skb,
unsigned int idx, unsigned int skip,
unsigned int idx, bool try_self,
unsigned int num)
{
unsigned int i, j;
struct packet_sock *po, *po_next;
unsigned int i, j, room = ROOM_NONE;
i = j = min_t(int, f->next[idx], num - 1);
po = pkt_sk(f->arr[idx]);
if (try_self) {
room = packet_rcv_has_room(po, skb);
if (room == ROOM_NORMAL ||
(room == ROOM_LOW && !fanout_flow_is_huge(po, skb)))
return idx;
}
i = j = min_t(int, po->rollover->sock, num - 1);
do {
if (i != skip && packet_rcv_has_room(pkt_sk(f->arr[i]), skb)) {
po_next = pkt_sk(f->arr[i]);
if (po_next != po && !po_next->pressure &&
packet_rcv_has_room(po_next, skb) == ROOM_NORMAL) {
if (i != j)
f->next[idx] = i;
po->rollover->sock = i;
atomic_long_inc(&po->rollover->num);
if (room == ROOM_LOW)
atomic_long_inc(&po->rollover->num_huge);
return i;
}
if (++i == num)
i = 0;
} while (i != j);
atomic_long_inc(&po->rollover->num_failed);
return idx;
}
@ -1386,17 +1476,14 @@ static int packet_rcv_fanout(struct sk_buff *skb, struct net_device *dev,
idx = fanout_demux_qm(f, skb, num);
break;
case PACKET_FANOUT_ROLLOVER:
idx = fanout_demux_rollover(f, skb, 0, (unsigned int) -1, num);
idx = fanout_demux_rollover(f, skb, 0, false, num);
break;
}
po = pkt_sk(f->arr[idx]);
if (fanout_has_flag(f, PACKET_FANOUT_FLAG_ROLLOVER) &&
unlikely(!packet_rcv_has_room(po, skb))) {
idx = fanout_demux_rollover(f, skb, idx, idx, num);
po = pkt_sk(f->arr[idx]);
}
if (fanout_has_flag(f, PACKET_FANOUT_FLAG_ROLLOVER))
idx = fanout_demux_rollover(f, skb, idx, true, num);
po = pkt_sk(f->arr[idx]);
return po->prot_hook.func(skb, dev, &po->prot_hook, orig_dev);
}
@ -1467,6 +1554,15 @@ static int fanout_add(struct sock *sk, u16 id, u16 type_flags)
if (po->fanout)
return -EALREADY;
if (type_flags & PACKET_FANOUT_FLAG_ROLLOVER) {
po->rollover = kzalloc(sizeof(*po->rollover), GFP_KERNEL);
if (!po->rollover)
return -ENOMEM;
atomic_long_set(&po->rollover->num, 0);
atomic_long_set(&po->rollover->num_huge, 0);
atomic_long_set(&po->rollover->num_failed, 0);
}
mutex_lock(&fanout_mutex);
match = NULL;
list_for_each_entry(f, &fanout_list, list) {
@ -1515,6 +1611,10 @@ static int fanout_add(struct sock *sk, u16 id, u16 type_flags)
}
out:
mutex_unlock(&fanout_mutex);
if (err) {
kfree(po->rollover);
po->rollover = NULL;
}
return err;
}
@ -1536,6 +1636,8 @@ static void fanout_release(struct sock *sk)
kfree(f);
}
mutex_unlock(&fanout_mutex);
kfree(po->rollover);
}
static const struct proto_ops packet_ops;
@ -2865,6 +2967,7 @@ static int packet_create(struct net *net, struct socket *sock, int protocol,
spin_lock_init(&po->bind_lock);
mutex_init(&po->pg_vec_lock);
po->rollover = NULL;
po->prot_hook.func = packet_rcv;
if (sock->type == SOCK_PACKET)
@ -2942,6 +3045,9 @@ static int packet_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
if (skb == NULL)
goto out;
if (pkt_sk(sk)->pressure)
packet_rcv_has_room(pkt_sk(sk), NULL);
if (pkt_sk(sk)->has_vnet_hdr) {
struct virtio_net_hdr vnet_hdr = { 0 };
@ -3485,6 +3591,7 @@ static int packet_getsockopt(struct socket *sock, int level, int optname,
struct packet_sock *po = pkt_sk(sk);
void *data = &val;
union tpacket_stats_u st;
struct tpacket_rollover_stats rstats;
if (level != SOL_PACKET)
return -ENOPROTOOPT;
@ -3560,6 +3667,15 @@ static int packet_getsockopt(struct socket *sock, int level, int optname,
((u32)po->fanout->flags << 24)) :
0);
break;
case PACKET_ROLLOVER_STATS:
if (!po->rollover)
return -EINVAL;
rstats.tp_all = atomic_long_read(&po->rollover->num);
rstats.tp_huge = atomic_long_read(&po->rollover->num_huge);
rstats.tp_failed = atomic_long_read(&po->rollover->num_failed);
data = &rstats;
lv = sizeof(rstats);
break;
case PACKET_TX_HAS_OFF:
val = po->tp_tx_has_off;
break;
@ -3697,6 +3813,8 @@ static unsigned int packet_poll(struct file *file, struct socket *sock,
TP_STATUS_KERNEL))
mask |= POLLIN | POLLRDNORM;
}
if (po->pressure && __packet_rcv_has_room(po, NULL) == ROOM_NORMAL)
xchg(&po->pressure, 0);
spin_unlock_bh(&sk->sk_receive_queue.lock);
spin_lock_bh(&sk->sk_write_queue.lock);
if (po->tx_ring.pg_vec) {

View File

@ -82,12 +82,20 @@ struct packet_fanout {
atomic_t rr_cur;
struct list_head list;
struct sock *arr[PACKET_FANOUT_MAX];
int next[PACKET_FANOUT_MAX];
spinlock_t lock;
atomic_t sk_ref;
struct packet_type prot_hook ____cacheline_aligned_in_smp;
};
struct packet_rollover {
int sock;
atomic_long_t num;
atomic_long_t num_huge;
atomic_long_t num_failed;
#define ROLLOVER_HLEN (L1_CACHE_BYTES / sizeof(u32))
u32 history[ROLLOVER_HLEN] ____cacheline_aligned;
} ____cacheline_aligned_in_smp;
struct packet_sock {
/* struct sock has to be the first member of packet_sock */
struct sock sk;
@ -102,8 +110,10 @@ struct packet_sock {
auxdata:1,
origdev:1,
has_vnet_hdr:1;
int pressure;
int ifindex; /* bound device */
__be16 num;
struct packet_rollover *rollover;
struct packet_mclist *mclist;
atomic_t mapped;
enum tpacket_versions tp_version;