diff --git a/include/linux/skbuff.h b/include/linux/skbuff.h index 0cd92b0f2af5..332e76756f54 100644 --- a/include/linux/skbuff.h +++ b/include/linux/skbuff.h @@ -645,8 +645,15 @@ struct sk_buff { struct rb_node rbnode; /* used in netem & tcp stack */ }; struct sock *sk; - struct net_device *dev; + union { + struct net_device *dev; + /* Some protocols might use this space to store information, + * while device pointer would be NULL. + * UDP receive path is one user. + */ + unsigned long dev_scratch; + }; /* * This is the control buffer. It is free to use for every * layer. Please put your private variables there. If you diff --git a/include/linux/udp.h b/include/linux/udp.h index d1fd8cd39478..c0f530809d1f 100644 --- a/include/linux/udp.h +++ b/include/linux/udp.h @@ -79,6 +79,9 @@ struct udp_sock { int (*gro_complete)(struct sock *sk, struct sk_buff *skb, int nhoff); + + /* This field is dirtied by udp_recvmsg() */ + int forward_deficit; }; static inline struct udp_sock *udp_sk(const struct sock *sk) diff --git a/net/ipv4/udp.c b/net/ipv4/udp.c index f5628ada47b5..9ca279b130d5 100644 --- a/net/ipv4/udp.c +++ b/net/ipv4/udp.c @@ -1177,28 +1177,71 @@ int udp_sendpage(struct sock *sk, struct page *page, int offset, /* fully reclaim rmem/fwd memory allocated for skb */ static void udp_rmem_release(struct sock *sk, int size, int partial) { + struct udp_sock *up = udp_sk(sk); int amt; - atomic_sub(size, &sk->sk_rmem_alloc); + if (likely(partial)) { + up->forward_deficit += size; + size = up->forward_deficit; + if (size < (sk->sk_rcvbuf >> 2) && + !skb_queue_empty(&sk->sk_receive_queue)) + return; + } else { + size += up->forward_deficit; + } + up->forward_deficit = 0; + sk->sk_forward_alloc += size; amt = (sk->sk_forward_alloc - partial) & ~(SK_MEM_QUANTUM - 1); sk->sk_forward_alloc -= amt; if (amt) __sk_mem_reduce_allocated(sk, amt >> SK_MEM_QUANTUM_SHIFT); + + atomic_sub(size, &sk->sk_rmem_alloc); } -/* Note: called with sk_receive_queue.lock held */ +/* Note: called with sk_receive_queue.lock held. + * Instead of using skb->truesize here, find a copy of it in skb->dev_scratch + * This avoids a cache line miss while receive_queue lock is held. + * Look at __udp_enqueue_schedule_skb() to find where this copy is done. + */ void udp_skb_destructor(struct sock *sk, struct sk_buff *skb) { - udp_rmem_release(sk, skb->truesize, 1); + udp_rmem_release(sk, skb->dev_scratch, 1); } EXPORT_SYMBOL(udp_skb_destructor); +/* Idea of busylocks is to let producers grab an extra spinlock + * to relieve pressure on the receive_queue spinlock shared by consumer. + * Under flood, this means that only one producer can be in line + * trying to acquire the receive_queue spinlock. + * These busylock can be allocated on a per cpu manner, instead of a + * per socket one (that would consume a cache line per socket) + */ +static int udp_busylocks_log __read_mostly; +static spinlock_t *udp_busylocks __read_mostly; + +static spinlock_t *busylock_acquire(void *ptr) +{ + spinlock_t *busy; + + busy = udp_busylocks + hash_ptr(ptr, udp_busylocks_log); + spin_lock(busy); + return busy; +} + +static void busylock_release(spinlock_t *busy) +{ + if (busy) + spin_unlock(busy); +} + int __udp_enqueue_schedule_skb(struct sock *sk, struct sk_buff *skb) { struct sk_buff_head *list = &sk->sk_receive_queue; int rmem, delta, amt, err = -ENOMEM; + spinlock_t *busy = NULL; int size; /* try to avoid the costly atomic add/sub pair when the receive @@ -1214,9 +1257,16 @@ int __udp_enqueue_schedule_skb(struct sock *sk, struct sk_buff *skb) * - Less cache line misses at copyout() time * - Less work at consume_skb() (less alien page frag freeing) */ - if (rmem > (sk->sk_rcvbuf >> 1)) + if (rmem > (sk->sk_rcvbuf >> 1)) { skb_condense(skb); + + busy = busylock_acquire(sk); + } size = skb->truesize; + /* Copy skb->truesize into skb->dev_scratch to avoid a cache line miss + * in udp_skb_destructor() + */ + skb->dev_scratch = size; /* we drop only if the receive buf is full and the receive * queue contains some other skb @@ -1243,7 +1293,6 @@ int __udp_enqueue_schedule_skb(struct sock *sk, struct sk_buff *skb) /* no need to setup a destructor, we will explicitly release the * forward allocated memory on dequeue */ - skb->dev = NULL; sock_skb_set_dropcount(sk, skb); __skb_queue_tail(list, skb); @@ -1252,6 +1301,7 @@ int __udp_enqueue_schedule_skb(struct sock *sk, struct sk_buff *skb) if (!sock_flag(sk, SOCK_DEAD)) sk->sk_data_ready(sk); + busylock_release(busy); return 0; uncharge_drop: @@ -1259,6 +1309,7 @@ int __udp_enqueue_schedule_skb(struct sock *sk, struct sk_buff *skb) drop: atomic_inc(&sk->sk_drops); + busylock_release(busy); return err; } EXPORT_SYMBOL_GPL(__udp_enqueue_schedule_skb); @@ -2613,6 +2664,7 @@ EXPORT_SYMBOL(udp_flow_hashrnd); void __init udp_init(void) { unsigned long limit; + unsigned int i; udp_table_init(&udp_table, "UDP"); limit = nr_free_buffer_pages() / 8; @@ -2623,4 +2675,13 @@ void __init udp_init(void) sysctl_udp_rmem_min = SK_MEM_QUANTUM; sysctl_udp_wmem_min = SK_MEM_QUANTUM; + + /* 16 spinlocks per cpu */ + udp_busylocks_log = ilog2(nr_cpu_ids) + 4; + udp_busylocks = kmalloc(sizeof(spinlock_t) << udp_busylocks_log, + GFP_KERNEL); + if (!udp_busylocks) + panic("UDP: failed to alloc udp_busylocks\n"); + for (i = 0; i < (1U << udp_busylocks_log); i++) + spin_lock_init(udp_busylocks + i); }