diff --git a/net/xdp/xsk_queue.h b/net/xdp/xsk_queue.h
index 610c0bdc0c2b..88b9ae24658d 100644
--- a/net/xdp/xsk_queue.h
+++ b/net/xdp/xsk_queue.h
@@ -43,6 +43,48 @@ struct xsk_queue {
 	u64 invalid_descs;
 };
 
+/* The structure of the shared state of the rings is the same as the
+ * ring buffer in kernel/events/ring_buffer.c. For the Rx and completion
+ * rings, the kernel is the producer and user space is the consumer. For
+ * the Tx and fill rings, the kernel is the consumer and user space is
+ * the producer.
+ *
+ * producer                         consumer
+ *
+ * if (LOAD ->consumer) {           LOAD ->producer
+ *                    (A)           smp_rmb()       (C)
+ *    STORE $data                   LOAD $data
+ *    smp_wmb()       (B)           smp_mb()        (D)
+ *    STORE ->producer              STORE ->consumer
+ * }
+ *
+ * (A) pairs with (D), and (B) pairs with (C).
+ *
+ * Starting with (B), it protects the data from being written after
+ * the producer pointer. If this barrier were missing, the consumer
+ * could observe the producer pointer being set and thus load the data
+ * before the producer has written the new data. The consumer would in
+ * this case load the old data.
+ *
+ * (C) protects the consumer from speculatively loading the data before
+ * the producer pointer has actually been read. If we do not have this
+ * barrier, some architectures could load old data, since speculative
+ * loads are not discarded: the CPU does not know there is a dependency
+ * between ->producer and the data.
+ *
+ * (A) is a control dependency that separates the load of ->consumer
+ * from the stores of $data. In case ->consumer indicates there is no
+ * room in the buffer, we do not store $data, so no barrier is needed.
+ *
+ * (D) protects the load of the data from being reordered after the
+ * store of the consumer pointer. If we did not have this memory
+ * barrier, the producer could observe the consumer pointer being set
+ * and overwrite the data with a new value before the consumer got the
+ * chance to read the old value. The consumer would thus miss reading
+ * the old entry and very likely read the new entry twice, once right
+ * now and again after circling through the ring.
+ */
+
 
 /* Common functions operating for both RXTX and umem queues */
 
 static inline u64 xskq_nb_invalid_descs(struct xsk_queue *q)
@@ -106,6 +148,7 @@ static inline u64 *xskq_validate_addr(struct xsk_queue *q, u64 *addr)
 static inline u64 *xskq_peek_addr(struct xsk_queue *q, u64 *addr)
 {
 	if (q->cons_tail == q->cons_head) {
+		smp_mb(); /* D, matches A */
 		WRITE_ONCE(q->ring->consumer, q->cons_tail);
 		q->cons_head = q->cons_tail + xskq_nb_avail(q, RX_BATCH_SIZE);
 
@@ -128,10 +171,11 @@ static inline int xskq_produce_addr(struct xsk_queue *q, u64 addr)
 	if (xskq_nb_free(q, q->prod_tail, 1) == 0)
 		return -ENOSPC;
 
+	/* A, matches D */
 	ring->desc[q->prod_tail++ & q->ring_mask] = addr;
 
 	/* Order producer and data */
-	smp_wmb();
+	smp_wmb(); /* B, matches C */
 
 	WRITE_ONCE(q->ring->producer, q->prod_tail);
 	return 0;
@@ -144,6 +188,7 @@ static inline int xskq_produce_addr_lazy(struct xsk_queue *q, u64 addr)
 	if (xskq_nb_free(q, q->prod_head, LAZY_UPDATE_THRESHOLD) == 0)
 		return -ENOSPC;
 
+	/* A, matches D */
 	ring->desc[q->prod_head++ & q->ring_mask] = addr;
 	return 0;
 }
@@ -152,7 +197,7 @@ static inline void xskq_produce_flush_addr_n(struct xsk_queue *q,
 					     u32 nb_entries)
 {
 	/* Order producer and data */
-	smp_wmb();
+	smp_wmb(); /* B, matches C */
 
 	q->prod_tail += nb_entries;
 	WRITE_ONCE(q->ring->producer, q->prod_tail);
@@ -163,6 +208,7 @@ static inline int xskq_reserve_addr(struct xsk_queue *q)
 	if (xskq_nb_free(q, q->prod_head, 1) == 0)
 		return -ENOSPC;
 
+	/* A, matches D */
 	q->prod_head++;
 	return 0;
 }
@@ -204,11 +250,12 @@ static inline struct xdp_desc *xskq_peek_desc(struct xsk_queue *q,
 					      struct xdp_desc *desc)
 {
 	if (q->cons_tail == q->cons_head) {
+		smp_mb(); /* D, matches A */
 		WRITE_ONCE(q->ring->consumer, q->cons_tail);
 		q->cons_head = q->cons_tail + xskq_nb_avail(q, RX_BATCH_SIZE);
 
 		/* Order consumer and data */
-		smp_rmb();
+		smp_rmb(); /* C, matches B */
 	}
 
 	return xskq_validate_desc(q, desc);
@@ -228,6 +275,7 @@ static inline int xskq_produce_batch_desc(struct xsk_queue *q,
 	if (xskq_nb_free(q, q->prod_head, 1) == 0)
 		return -ENOSPC;
 
+	/* A, matches D */
 	idx = (q->prod_head++) & q->ring_mask;
 	ring->desc[idx].addr = addr;
 	ring->desc[idx].len = len;
@@ -238,7 +286,7 @@ static inline int xskq_produce_batch_desc(struct xsk_queue *q,
 static inline void xskq_produce_flush_desc(struct xsk_queue *q)
 {
 	/* Order producer and data */
-	smp_wmb();
+	smp_wmb(); /* B, matches C */
 
 	q->prod_tail = q->prod_head,
 	WRITE_ONCE(q->ring->producer, q->prod_tail);
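
For readers who want to experiment with the ordering scheme documented in the comment above outside the kernel, here is a minimal user-space sketch of the same single-producer/single-consumer pattern using C11 atomics. It is illustrative only: the demo_ring structure, the demo_produce()/demo_consume() helpers and RING_SIZE are invented for this example and are not part of xsk_queue.h or the AF_XDP UAPI. Relaxed atomic loads/stores stand in for READ_ONCE()/WRITE_ONCE(), atomic_thread_fence(memory_order_release) plays the role of smp_wmb() at (B), memory_order_acquire that of smp_rmb() at (C), and memory_order_seq_cst that of smp_mb() at (D); (A) is kept as a plain control dependency, as in the comment.

#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

#define RING_SIZE 64u			/* power of two, like the xsk rings */
#define RING_MASK (RING_SIZE - 1)

struct demo_ring {
	_Atomic uint32_t producer;	/* written by producer, read by consumer */
	_Atomic uint32_t consumer;	/* written by consumer, read by producer */
	uint64_t desc[RING_SIZE];	/* the $data of the diagram */
};

/* Producer side: (A) is the control dependency, (B) mirrors smp_wmb(). */
static bool demo_produce(struct demo_ring *r, uint64_t val)
{
	uint32_t prod = atomic_load_explicit(&r->producer, memory_order_relaxed);
	uint32_t cons = atomic_load_explicit(&r->consumer, memory_order_relaxed);

	if (prod - cons == RING_SIZE)		/* (A): no room, so no store */
		return false;

	r->desc[prod & RING_MASK] = val;	/* STORE $data */

	/* (B): make the data visible before publishing the producer pointer */
	atomic_thread_fence(memory_order_release);
	atomic_store_explicit(&r->producer, prod + 1, memory_order_relaxed);
	return true;
}

/* Consumer side: (C) mirrors smp_rmb(), (D) mirrors smp_mb(). */
static bool demo_consume(struct demo_ring *r, uint64_t *val)
{
	uint32_t cons = atomic_load_explicit(&r->consumer, memory_order_relaxed);
	uint32_t prod = atomic_load_explicit(&r->producer, memory_order_relaxed);

	if (cons == prod)			/* ring empty */
		return false;

	/* (C): do not load $data before ->producer has been observed */
	atomic_thread_fence(memory_order_acquire);
	*val = r->desc[cons & RING_MASK];	/* LOAD $data */

	/* (D): finish loading $data before handing the slot back */
	atomic_thread_fence(memory_order_seq_cst);
	atomic_store_explicit(&r->consumer, cons + 1, memory_order_relaxed);
	return true;
}

One producer thread calling demo_produce() and one consumer thread calling demo_consume() can drive this ring concurrently. A strictly portable C11 version would also turn the ->consumer load in demo_produce() into an acquire load; the kernel comment instead relies on the control dependency at (A), which is why the patch adds no barrier at that point.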