Merge branch 'kcm'

Tom Herbert says:

====================
kcm: Kernel Connection Multiplexor (KCM)

Kernel Connection Multiplexor (KCM) is a facility that provides a
message based interface over TCP for generic application protocols.
The motivation for this is based on the observation that although
TCP is byte stream transport protocol with no concept of message
boundaries, a common use case is to implement a framed application
layer protocol running over TCP. To date, most TCP stacks offer
byte stream API for applications, which places the burden of message
delineation, message I/O operation atomicity, and load balancing
in the application. With KCM an application can efficiently send
and receive application protocol messages over TCP using a
datagram interface.

In order to delineate message in a TCP stream for receive in KCM, the
kernel implements a message parser. For this we chose to employ BPF
which is applied to the TCP stream. BPF code parses application layer
messages and returns a message length. Nearly all binary application
protocols are parsable in this manner, so KCM should be applicable
across a wide range of applications. Other than message length
determination in receive, KCM does not require any other application
specific awareness. KCM does not implement any other application
protocol semantics-- these are are provided in userspace or could be
implemented in a kernel module layered above KCM.

KCM implements an NxM multiplexor in the kernel as diagrammed below:

+------------+   +------------+   +------------+   +------------+
| KCM socket |   | KCM socket |   | KCM socket |   | KCM socket |
+------------+   +------------+   +------------+   +------------+
      |                 |               |                |
      +-----------+     |               |     +----------+
                  |     |               |     |
               +----------------------------------+
               |           Multiplexor            |
               +----------------------------------+
                 |   |           |           |  |
       +---------+   |           |           |  ------------+
       |             |           |           |              |
+----------+  +----------+  +----------+  +----------+ +----------+
|  Psock   |  |  Psock   |  |  Psock   |  |  Psock   | |  Psock   |
+----------+  +----------+  +----------+  +----------+ +----------+
      |              |           |            |             |
+----------+  +----------+  +----------+  +----------+ +----------+
| TCP sock |  | TCP sock |  | TCP sock |  | TCP sock | | TCP sock |
+----------+  +----------+  +----------+  +----------+ +----------+

The KCM sockets provide the datagram interface to applications,
Psocks are the state for each attached TCP connection (i.e. where
message delineation is performed on receive).

A description of the APIs and design can be found in the included
Documentation/networking/kcm.txt.

In this patch set:

  - Add MSG_BATCH flag. This is used in sendmsg msg_hdr flags to
    indicate that more messages will be sent on the socket. The stack
    may batch messages up if it is beneficial for transmission.
  - In sendmmsg, set MSG_BATCH in all sub messages except for the last
    one.
  - In order to allow sendmmsg to contain multiple messages with
    SOCK_SEQPAKET we allow each msg_hdr in the sendmmsg to set MSG_EOR.
  - Add KCM module
    - This supports SOCK_DGRAM and SOCK_SEQPACKET.
  - KCM documentation

v2:
  - Added splice and page operations.
  - Assemble receive messages in place on TCP socket (don't have a
    separate assembly queue.
  - Based on above, enforce maxmimum receive message to be the size
    of the recceive socket buffer.
  - Support message assembly timeout. Use the timeout value in
    sk_rcvtimeo on the TCP socket.
  - Tested some with a couple of other production applications,
    see ~5% improvement in application latency.

Testing:

Dave Watson has integrated KCM into Thrift and we intend to put these
changes into open source. Example of this is in:

https://github.com/djwatson/fbthrift/commit/
dd7e0f9cf4e80912fdb90f6cd394db24e61a14cc

Some initial KCM Thrift benchmark numbers (comment from Dave)

Thrift by default ties a single connection to a single thread.  KCM is
instead able to load balance multiple connections across multiple epoll
loops easily.

A test sending ~5k bytes of data to a kcm thrift server, dropping the
bytes on recv:

QPS     Latency / std dev Latency
  without KCM
    70336     209/123
  with KCM
    70353     191/124

A test sending a small request, then doing work in the epoll thread,
before serving more requests:

QPS     Latency / std dev Latency
without KCM
    14282     559/602
with KCM
    23192     344/234

At the high end, there's definitely some additional kernel overhead:

Cranking the pipelining way up, with lots of small requests

QPS     Latency / std dev Latency
without KCM
   1863429     127/119
with KCM
   1337713     192/241

---

So for a "realistic" workload, KCM performs pretty well (second case).
Under extreme conditions of highest tps we still have some work to do.
In its nature a multiplexor will spread work between CPUs which is
logically good for load balancing but coan conflict with the goal
promoting affinity. Batching messages on both send and receive are
the means to recoup performance.

Future support:

 - Integration with TLS (TLS-in-kernel is a separate initiative).
 - Page operations/splice support
 - Unconnected KCM sockets. Will be able to attach sockets to different
   destinations, AF_KCM addresses with be used in sendmsg and recvmsg
   to indicate destination
 - Explore more utility in performing BPF inline with a TCP data stream
   (setting SO_MARK, rxhash for messages being sent received on
   KCM sockets).
 - Performance work
   - Diagnose performance issues under high message load

FAQ (Questions posted on LWN)

Q: Why do this in the kernel?

A: Because the kernel is good at scheduling threads and steering packets
   to threads. KCM fits well into this model since it allows the unit
   of work for scheduling and steering to be the application layer
   messages themselves. KCM should be thought of as generic application
   protocol acceleration. It to the philosophy that the kernel provides
   generic and extensible interfaces.

Q: How can adding code in the path yield better performance?

A: It is true that for just sending receiving a single message there
   would be some performance loss since the code path is longer (for
   instance comparing netperf to KCM). But for real production
   applications performance takes on many dynamics. Parallelism, context
   switching, affinity, granularity of locking, and load balancing are
   all relevant. The theory of KCM is that by an application-centric
   interface, the kernel can provide better support for these
   performance characteristics.

Q: Why not use an existing message-oriented protocol such as RUDP,
   DCCP, SCTP, RDS, and others?

A: Because that would entail using a completely new transport protocol.
   Deploying a new protocol at scale is either a huge undertaking or
   fundamentally infeasible. This is true in either the Internet and in
   the data center due in a large part to protocol ossification.
   Besides, KCM we want KCM to work existing, well deployed application
   protocols that we couldn't change even if we wanted to (e.g. http/2).

   KCM simply defines a new interface method, it does not redefine any
   aspect of the transport protocol nor application protocol, nor set
   any new requirements on these. Neither does KCM attempt to implement
   any application protocol logic other than message deliniation in the
   stream. These are fundamental requirement of KCM.

Q: How does this affect TCP?

A: It doesn't, not in the slightest. The use of KCM can be one-sided,
   KCM has no effect on the wire.

Q: Why force TCP into doing something it's not designed for?

A: TCP is defined as transport protocol and there is no standard that
   says the API into TCP must be stream based sockets, or for that
   matter sockets at all (or even that TCP needs to be implemented in a
   kernel). KCM is not inconsistent with the design of TCP just because
   to makes an message based interface over TCP, if it were then every
   application protocol sending messages over TCP would also be! :-)

Q: What about the problem of a connections with very slow rate of
   incoming data? As a result your application can get storms of very
   short reads. And it actually happens a lot with connection from
   mobile devices and it is a problem for servers handling a lot of
   connections.

A: The storm of short reads will occur regardless of whether KCM is used
   or not. KCM does have one advantage in this scenario though, it will
   only wake up the application when a full message has been received,
   not for each packet that makes up part of a bigger messages. If a
   bunch of small messages are received, the application can receive
   messages in batches using recvmmsg.

Q: Why not just use DPDK, or at least provide KCM like functionality in
   DPDK?

A: DPDK, or more generally OS bypass presumably with a TCP stack in
   userland, presents a different model of load balancing than that of
   KCM (and the kernel). KCM implements load balancing of messages
   across the threads of an application, whereas DPDK load balances
   based on queues which are more static and coarse-grained since
   multiple connections are bound to queues. DPDK works best when
   processing of packets is silo'ed in a thread on the CPU processing
   a queue, and packet processing (for both the stack and application)
   is fairly uniform. KCM works well for applications where the amount
   of work to process messages varies an application work is commonly
   delegated to worker threads often on different CPUs.

   The message based interface over TCP is something that could be
   provide by a DPDK or OS bypass library.

Q: I'm not quite seeing this for HTTP. Maybe for HTTP/2, I guess, or web
   sockets?

A: Yes. KCM is most appropriate for message based protocols over TCP
   where is easy to deduce the message length (e.g. a length field)
   and the protocol implements its own message ordering semantics.
   Fortunately this encompasses many modern protocols.

Q: How is memory limited and controlled?

A: In v2 all data for messages is now kept in socket buffers, either
   those for TCP or KCM, so socket buffer limits are applicable.
   This includes receive messages assembly which is now done ont teh
   TCP socket buffer instead of a separate queue-- this has the
   consequence that the TCP socket buffer limit provides an
   enforceable maxmimum message size.

   Additionally, a timeout may be set for messages assembly. The
   value used for this is taken from sk_rcvtimeo of the TCP socket.
====================

Signed-off-by: David S. Miller <davem@davemloft.net>
This commit is contained in:
David S. Miller 2016-03-09 16:36:16 -05:00
commit 9531ab65f4
16 changed files with 3483 additions and 43 deletions

View File

@ -0,0 +1,285 @@
Kernel Connection Mulitplexor
-----------------------------
Kernel Connection Multiplexor (KCM) is a mechanism that provides a message based
interface over TCP for generic application protocols. With KCM an application
can efficiently send and receive application protocol messages over TCP using
datagram sockets.
KCM implements an NxM multiplexor in the kernel as diagrammed below:
+------------+ +------------+ +------------+ +------------+
| KCM socket | | KCM socket | | KCM socket | | KCM socket |
+------------+ +------------+ +------------+ +------------+
| | | |
+-----------+ | | +----------+
| | | |
+----------------------------------+
| Multiplexor |
+----------------------------------+
| | | | |
+---------+ | | | ------------+
| | | | |
+----------+ +----------+ +----------+ +----------+ +----------+
| Psock | | Psock | | Psock | | Psock | | Psock |
+----------+ +----------+ +----------+ +----------+ +----------+
| | | | |
+----------+ +----------+ +----------+ +----------+ +----------+
| TCP sock | | TCP sock | | TCP sock | | TCP sock | | TCP sock |
+----------+ +----------+ +----------+ +----------+ +----------+
KCM sockets
-----------
The KCM sockets provide the user interface to the muliplexor. All the KCM sockets
bound to a multiplexor are considered to have equivalent function, and I/O
operations in different sockets may be done in parallel without the need for
synchronization between threads in userspace.
Multiplexor
-----------
The multiplexor provides the message steering. In the transmit path, messages
written on a KCM socket are sent atomically on an appropriate TCP socket.
Similarly, in the receive path, messages are constructed on each TCP socket
(Psock) and complete messages are steered to a KCM socket.
TCP sockets & Psocks
--------------------
TCP sockets may be bound to a KCM multiplexor. A Psock structure is allocated
for each bound TCP socket, this structure holds the state for constructing
messages on receive as well as other connection specific information for KCM.
Connected mode semantics
------------------------
Each multiplexor assumes that all attached TCP connections are to the same
destination and can use the different connections for load balancing when
transmitting. The normal send and recv calls (include sendmmsg and recvmmsg)
can be used to send and receive messages from the KCM socket.
Socket types
------------
KCM supports SOCK_DGRAM and SOCK_SEQPACKET socket types.
Message delineation
-------------------
Messages are sent over a TCP stream with some application protocol message
format that typically includes a header which frames the messages. The length
of a received message can be deduced from the application protocol header
(often just a simple length field).
A TCP stream must be parsed to determine message boundaries. Berkeley Packet
Filter (BPF) is used for this. When attaching a TCP socket to a multiplexor a
BPF program must be specified. The program is called at the start of receiving
a new message and is given an skbuff that contains the bytes received so far.
It parses the message header and returns the length of the message. Given this
information, KCM will construct the message of the stated length and deliver it
to a KCM socket.
TCP socket management
---------------------
When a TCP socket is attached to a KCM multiplexor data ready (POLLIN) and
write space available (POLLOUT) events are handled by the multiplexor. If there
is a state change (disconnection) or other error on a TCP socket, an error is
posted on the TCP socket so that a POLLERR event happens and KCM discontinues
using the socket. When the application gets the error notification for a
TCP socket, it should unattach the socket from KCM and then handle the error
condition (the typical response is to close the socket and create a new
connection if necessary).
KCM limits the maximum receive message size to be the size of the receive
socket buffer on the attached TCP socket (the socket buffer size can be set by
SO_RCVBUF). If the length of a new message reported by the BPF program is
greater than this limit a corresponding error (EMSGSIZE) is posted on the TCP
socket. The BPF program may also enforce a maximum messages size and report an
error when it is exceeded.
A timeout may be set for assembling messages on a receive socket. The timeout
value is taken from the receive timeout of the attached TCP socket (this is set
by SO_RCVTIMEO). If the timer expires before assembly is complete an error
(ETIMEDOUT) is posted on the socket.
User interface
==============
Creating a multiplexor
----------------------
A new multiplexor and initial KCM socket is created by a socket call:
socket(AF_KCM, type, protocol)
- type is either SOCK_DGRAM or SOCK_SEQPACKET
- protocol is KCMPROTO_CONNECTED
Cloning KCM sockets
-------------------
After the first KCM socket is created using the socket call as described
above, additional sockets for the multiplexor can be created by cloning
a KCM socket. This is accomplished by an ioctl on a KCM socket:
/* From linux/kcm.h */
struct kcm_clone {
int fd;
};
struct kcm_clone info;
memset(&info, 0, sizeof(info));
err = ioctl(kcmfd, SIOCKCMCLONE, &info);
if (!err)
newkcmfd = info.fd;
Attach transport sockets
------------------------
Attaching of transport sockets to a multiplexor is performed by calling an
ioctl on a KCM socket for the multiplexor. e.g.:
/* From linux/kcm.h */
struct kcm_attach {
int fd;
int bpf_fd;
};
struct kcm_attach info;
memset(&info, 0, sizeof(info));
info.fd = tcpfd;
info.bpf_fd = bpf_prog_fd;
ioctl(kcmfd, SIOCKCMATTACH, &info);
The kcm_attach structure contains:
fd: file descriptor for TCP socket being attached
bpf_prog_fd: file descriptor for compiled BPF program downloaded
Unattach transport sockets
--------------------------
Unattaching a transport socket from a multiplexor is straightforward. An
"unattach" ioctl is done with the kcm_unattach structure as the argument:
/* From linux/kcm.h */
struct kcm_unattach {
int fd;
};
struct kcm_unattach info;
memset(&info, 0, sizeof(info));
info.fd = cfd;
ioctl(fd, SIOCKCMUNATTACH, &info);
Disabling receive on KCM socket
-------------------------------
A setsockopt is used to disable or enable receiving on a KCM socket.
When receive is disabled, any pending messages in the socket's
receive buffer are moved to other sockets. This feature is useful
if an application thread knows that it will be doing a lot of
work on a request and won't be able to service new messages for a
while. Example use:
int val = 1;
setsockopt(kcmfd, SOL_KCM, KCM_RECV_DISABLE, &val, sizeof(val))
BFP programs for message delineation
------------------------------------
BPF programs can be compiled using the BPF LLVM backend. For exmple,
the BPF program for parsing Thrift is:
#include "bpf.h" /* for __sk_buff */
#include "bpf_helpers.h" /* for load_word intrinsic */
SEC("socket_kcm")
int bpf_prog1(struct __sk_buff *skb)
{
return load_word(skb, 0) + 4;
}
char _license[] SEC("license") = "GPL";
Use in applications
===================
KCM accelerates application layer protocols. Specifically, it allows
applications to use a message based interface for sending and receiving
messages. The kernel provides necessary assurances that messages are sent
and received atomically. This relieves much of the burden applications have
in mapping a message based protocol onto the TCP stream. KCM also make
application layer messages a unit of work in the kernel for the purposes of
steerng and scheduling, which in turn allows a simpler networking model in
multithreaded applications.
Configurations
--------------
In an Nx1 configuration, KCM logically provides multiple socket handles
to the same TCP connection. This allows parallelism between in I/O
operations on the TCP socket (for instance copyin and copyout of data is
parallelized). In an application, a KCM socket can be opened for each
processing thread and inserted into the epoll (similar to how SO_REUSEPORT
is used to allow multiple listener sockets on the same port).
In a MxN configuration, multiple connections are established to the
same destination. These are used for simple load balancing.
Message batching
----------------
The primary purpose of KCM is load balancing between KCM sockets and hence
threads in a nominal use case. Perfect load balancing, that is steering
each received message to a different KCM socket or steering each sent
message to a different TCP socket, can negatively impact performance
since this doesn't allow for affinities to be established. Balancing
based on groups, or batches of messages, can be beneficial for performance.
On transmit, there are three ways an application can batch (pipeline)
messages on a KCM socket.
1) Send multiple messages in a single sendmmsg.
2) Send a group of messages each with a sendmsg call, where all messages
except the last have MSG_BATCH in the flags of sendmsg call.
3) Create "super message" composed of multiple messages and send this
with a single sendmsg.
On receive, the KCM module attempts to queue messages received on the
same KCM socket during each TCP ready callback. The targeted KCM socket
changes at each receive ready callback on the KCM socket. The application
does not need to configure this.
Error handling
--------------
An application should include a thread to monitor errors raised on
the TCP connection. Normally, this will be done by placing each
TCP socket attached to a KCM multiplexor in epoll set for POLLERR
event. If an error occurs on an attached TCP socket, KCM sets an EPIPE
on the socket thus waking up the application thread. When the application
sees the error (which may just be a disconnect) it should unattach the
socket from KCM and then close it. It is assumed that once an error is
posted on the TCP socket the data stream is unrecoverable (i.e. an error
may have occurred in in the middle of receiving a messssge).
TCP connection monitoring
-------------------------
In KCM there is no means to correlate a message to the TCP socket that
was used to send or receive the message (except in the case there is
only one attached TCP socket). However, the application does retain
an open file descriptor to the socket so it will be able to get statistics
from the socket which can be used in detecting issues (such as high
retransmissions on the socket).

View File

@ -215,6 +215,7 @@ int __sock_create(struct net *net, int family, int type, int proto,
int sock_create(int family, int type, int proto, struct socket **res);
int sock_create_kern(struct net *net, int family, int type, int proto, struct socket **res);
int sock_create_lite(int family, int type, int proto, struct socket **res);
struct socket *sock_alloc(void);
void sock_release(struct socket *sock);
int sock_sendmsg(struct socket *sock, struct msghdr *msg);
int sock_recvmsg(struct socket *sock, struct msghdr *msg, size_t size,

View File

@ -318,6 +318,27 @@ static inline void list_splice_tail_init_rcu(struct list_head *list,
likely(__ptr != __next) ? list_entry_rcu(__next, type, member) : NULL; \
})
/**
* list_next_or_null_rcu - get the first element from a list
* @head: the head for the list.
* @ptr: the list head to take the next element from.
* @type: the type of the struct this is embedded in.
* @member: the name of the list_head within the struct.
*
* Note that if the ptr is at the end of the list, NULL is returned.
*
* This primitive may safely run concurrently with the _rcu list-mutation
* primitives such as list_add_rcu() as long as it's guarded by rcu_read_lock().
*/
#define list_next_or_null_rcu(head, ptr, type, member) \
({ \
struct list_head *__head = (head); \
struct list_head *__ptr = (ptr); \
struct list_head *__next = READ_ONCE(__ptr->next); \
likely(__next != __head) ? list_entry_rcu(__next, type, \
member) : NULL; \
})
/**
* list_for_each_entry_rcu - iterate over rcu list of given type
* @pos: the type * to use as a loop cursor.

View File

@ -200,7 +200,9 @@ struct ucred {
#define AF_ALG 38 /* Algorithm sockets */
#define AF_NFC 39 /* NFC sockets */
#define AF_VSOCK 40 /* vSockets */
#define AF_MAX 41 /* For now.. */
#define AF_KCM 41 /* Kernel Connection Multiplexor*/
#define AF_MAX 42 /* For now.. */
/* Protocol families, same as address families. */
#define PF_UNSPEC AF_UNSPEC
@ -246,6 +248,7 @@ struct ucred {
#define PF_ALG AF_ALG
#define PF_NFC AF_NFC
#define PF_VSOCK AF_VSOCK
#define PF_KCM AF_KCM
#define PF_MAX AF_MAX
/* Maximum queue length specifiable by listen. */
@ -274,6 +277,7 @@ struct ucred {
#define MSG_MORE 0x8000 /* Sender will send more */
#define MSG_WAITFORONE 0x10000 /* recvmmsg(): block until 1+ packets avail */
#define MSG_SENDPAGE_NOTLAST 0x20000 /* sendpage() internal : not the last page */
#define MSG_BATCH 0x40000 /* sendmmsg(): more messages coming */
#define MSG_EOF MSG_FIN
#define MSG_FASTOPEN 0x20000000 /* Send data in TCP SYN */
@ -322,6 +326,7 @@ struct ucred {
#define SOL_CAIF 278
#define SOL_ALG 279
#define SOL_NFC 280
#define SOL_KCM 281
/* IPX options */
#define IPX_TYPE 1

226
include/net/kcm.h Normal file
View File

@ -0,0 +1,226 @@
/*
* Kernel Connection Multiplexor
*
* Copyright (c) 2016 Tom Herbert <tom@herbertland.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2
* as published by the Free Software Foundation.
*/
#ifndef __NET_KCM_H_
#define __NET_KCM_H_
#include <linux/skbuff.h>
#include <net/sock.h>
#include <uapi/linux/kcm.h>
extern unsigned int kcm_net_id;
#define KCM_STATS_ADD(stat, count) ((stat) += (count))
#define KCM_STATS_INCR(stat) ((stat)++)
struct kcm_psock_stats {
unsigned long long rx_msgs;
unsigned long long rx_bytes;
unsigned long long tx_msgs;
unsigned long long tx_bytes;
unsigned int rx_aborts;
unsigned int rx_mem_fail;
unsigned int rx_need_more_hdr;
unsigned int rx_msg_too_big;
unsigned int rx_msg_timeouts;
unsigned int rx_bad_hdr_len;
unsigned long long reserved;
unsigned long long unreserved;
unsigned int tx_aborts;
};
struct kcm_mux_stats {
unsigned long long rx_msgs;
unsigned long long rx_bytes;
unsigned long long tx_msgs;
unsigned long long tx_bytes;
unsigned int rx_ready_drops;
unsigned int tx_retries;
unsigned int psock_attach;
unsigned int psock_unattach_rsvd;
unsigned int psock_unattach;
};
struct kcm_stats {
unsigned long long rx_msgs;
unsigned long long rx_bytes;
unsigned long long tx_msgs;
unsigned long long tx_bytes;
};
struct kcm_tx_msg {
unsigned int sent;
unsigned int fragidx;
unsigned int frag_offset;
unsigned int msg_flags;
struct sk_buff *frag_skb;
struct sk_buff *last_skb;
};
struct kcm_rx_msg {
int full_len;
int accum_len;
int offset;
int early_eaten;
};
/* Socket structure for KCM client sockets */
struct kcm_sock {
struct sock sk;
struct kcm_mux *mux;
struct list_head kcm_sock_list;
int index;
u32 done : 1;
struct work_struct done_work;
struct kcm_stats stats;
/* Transmit */
struct kcm_psock *tx_psock;
struct work_struct tx_work;
struct list_head wait_psock_list;
struct sk_buff *seq_skb;
/* Don't use bit fields here, these are set under different locks */
bool tx_wait;
bool tx_wait_more;
/* Receive */
struct kcm_psock *rx_psock;
struct list_head wait_rx_list; /* KCMs waiting for receiving */
bool rx_wait;
u32 rx_disabled : 1;
};
struct bpf_prog;
/* Structure for an attached lower socket */
struct kcm_psock {
struct sock *sk;
struct kcm_mux *mux;
int index;
u32 tx_stopped : 1;
u32 rx_stopped : 1;
u32 done : 1;
u32 unattaching : 1;
void (*save_state_change)(struct sock *sk);
void (*save_data_ready)(struct sock *sk);
void (*save_write_space)(struct sock *sk);
struct list_head psock_list;
struct kcm_psock_stats stats;
/* Receive */
struct sk_buff *rx_skb_head;
struct sk_buff **rx_skb_nextp;
struct sk_buff *ready_rx_msg;
struct list_head psock_ready_list;
struct work_struct rx_work;
struct delayed_work rx_delayed_work;
struct bpf_prog *bpf_prog;
struct kcm_sock *rx_kcm;
unsigned long long saved_rx_bytes;
unsigned long long saved_rx_msgs;
struct timer_list rx_msg_timer;
unsigned int rx_need_bytes;
/* Transmit */
struct kcm_sock *tx_kcm;
struct list_head psock_avail_list;
unsigned long long saved_tx_bytes;
unsigned long long saved_tx_msgs;
};
/* Per net MUX list */
struct kcm_net {
struct mutex mutex;
struct kcm_psock_stats aggregate_psock_stats;
struct kcm_mux_stats aggregate_mux_stats;
struct list_head mux_list;
int count;
};
/* Structure for a MUX */
struct kcm_mux {
struct list_head kcm_mux_list;
struct rcu_head rcu;
struct kcm_net *knet;
struct list_head kcm_socks; /* All KCM sockets on MUX */
int kcm_socks_cnt; /* Total KCM socket count for MUX */
struct list_head psocks; /* List of all psocks on MUX */
int psocks_cnt; /* Total attached sockets */
struct kcm_mux_stats stats;
struct kcm_psock_stats aggregate_psock_stats;
/* Receive */
spinlock_t rx_lock ____cacheline_aligned_in_smp;
struct list_head kcm_rx_waiters; /* KCMs waiting for receiving */
struct list_head psocks_ready; /* List of psocks with a msg ready */
struct sk_buff_head rx_hold_queue;
/* Transmit */
spinlock_t lock ____cacheline_aligned_in_smp; /* TX and mux locking */
struct list_head psocks_avail; /* List of available psocks */
struct list_head kcm_tx_waiters; /* KCMs waiting for a TX psock */
};
#ifdef CONFIG_PROC_FS
int kcm_proc_init(void);
void kcm_proc_exit(void);
#else
static int kcm_proc_init(void) { return 0; }
static void kcm_proc_exit(void) { }
#endif
static inline void aggregate_psock_stats(struct kcm_psock_stats *stats,
struct kcm_psock_stats *agg_stats)
{
/* Save psock statistics in the mux when psock is being unattached. */
#define SAVE_PSOCK_STATS(_stat) (agg_stats->_stat += stats->_stat)
SAVE_PSOCK_STATS(rx_msgs);
SAVE_PSOCK_STATS(rx_bytes);
SAVE_PSOCK_STATS(rx_aborts);
SAVE_PSOCK_STATS(rx_mem_fail);
SAVE_PSOCK_STATS(rx_need_more_hdr);
SAVE_PSOCK_STATS(rx_msg_too_big);
SAVE_PSOCK_STATS(rx_msg_timeouts);
SAVE_PSOCK_STATS(rx_bad_hdr_len);
SAVE_PSOCK_STATS(tx_msgs);
SAVE_PSOCK_STATS(tx_bytes);
SAVE_PSOCK_STATS(reserved);
SAVE_PSOCK_STATS(unreserved);
SAVE_PSOCK_STATS(tx_aborts);
#undef SAVE_PSOCK_STATS
}
static inline void aggregate_mux_stats(struct kcm_mux_stats *stats,
struct kcm_mux_stats *agg_stats)
{
/* Save psock statistics in the mux when psock is being unattached. */
#define SAVE_MUX_STATS(_stat) (agg_stats->_stat += stats->_stat)
SAVE_MUX_STATS(rx_msgs);
SAVE_MUX_STATS(rx_bytes);
SAVE_MUX_STATS(tx_msgs);
SAVE_MUX_STATS(tx_bytes);
SAVE_MUX_STATS(rx_ready_drops);
SAVE_MUX_STATS(psock_attach);
SAVE_MUX_STATS(psock_unattach_rsvd);
SAVE_MUX_STATS(psock_unattach);
#undef SAVE_MUX_STATS
}
#endif /* __NET_KCM_H_ */

View File

@ -1816,4 +1816,28 @@ static inline void skb_set_tcp_pure_ack(struct sk_buff *skb)
skb->truesize = 2;
}
static inline int tcp_inq(struct sock *sk)
{
struct tcp_sock *tp = tcp_sk(sk);
int answ;
if ((1 << sk->sk_state) & (TCPF_SYN_SENT | TCPF_SYN_RECV)) {
answ = 0;
} else if (sock_flag(sk, SOCK_URGINLINE) ||
!tp->urg_data ||
before(tp->urg_seq, tp->copied_seq) ||
!before(tp->urg_seq, tp->rcv_nxt)) {
answ = tp->rcv_nxt - tp->copied_seq;
/* Subtract 1, if FIN was received */
if (answ && sock_flag(sk, SOCK_DONE))
answ--;
} else {
answ = tp->urg_seq - tp->copied_seq;
}
return answ;
}
#endif /* _TCP_H */

40
include/uapi/linux/kcm.h Normal file
View File

@ -0,0 +1,40 @@
/*
* Kernel Connection Multiplexor
*
* Copyright (c) 2016 Tom Herbert <tom@herbertland.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2
* as published by the Free Software Foundation.
*
* User API to clone KCM sockets and attach transport socket to a KCM
* multiplexor.
*/
#ifndef KCM_KERNEL_H
#define KCM_KERNEL_H
struct kcm_attach {
int fd;
int bpf_fd;
};
struct kcm_unattach {
int fd;
};
struct kcm_clone {
int fd;
};
#define SIOCKCMATTACH (SIOCPROTOPRIVATE + 0)
#define SIOCKCMUNATTACH (SIOCPROTOPRIVATE + 1)
#define SIOCKCMCLONE (SIOCPROTOPRIVATE + 2)
#define KCMPROTO_CONNECTED 0
/* Socket options */
#define KCM_RECV_DISABLE 1
#endif

View File

@ -360,6 +360,7 @@ source "net/can/Kconfig"
source "net/irda/Kconfig"
source "net/bluetooth/Kconfig"
source "net/rxrpc/Kconfig"
source "net/kcm/Kconfig"
config FIB_RULES
bool

View File

@ -34,6 +34,7 @@ obj-$(CONFIG_IRDA) += irda/
obj-$(CONFIG_BT) += bluetooth/
obj-$(CONFIG_SUNRPC) += sunrpc/
obj-$(CONFIG_AF_RXRPC) += rxrpc/
obj-$(CONFIG_AF_KCM) += kcm/
obj-$(CONFIG_ATM) += atm/
obj-$(CONFIG_L2TP) += l2tp/
obj-$(CONFIG_DECNET) += decnet/

View File

@ -1918,6 +1918,7 @@ static bool __skb_splice_bits(struct sk_buff *skb, struct pipe_inode_info *pipe,
struct splice_pipe_desc *spd, struct sock *sk)
{
int seg;
struct sk_buff *iter;
/* map the linear part :
* If skb->head_frag is set, this 'linear' part is backed by a
@ -1944,6 +1945,19 @@ static bool __skb_splice_bits(struct sk_buff *skb, struct pipe_inode_info *pipe,
return true;
}
skb_walk_frags(skb, iter) {
if (*offset >= iter->len) {
*offset -= iter->len;
continue;
}
/* __skb_splice_bits() only fails if the output has no room
* left, so no point in going over the frag_list for the error
* case.
*/
if (__skb_splice_bits(iter, pipe, offset, len, spd, sk))
return true;
}
return false;
}
@ -1970,9 +1984,7 @@ ssize_t skb_socket_splice(struct sock *sk,
/*
* Map data from the skb to a pipe. Should handle both the linear part,
* the fragments, and the frag list. It does NOT handle frag lists within
* the frag list, if such a thing exists. We'd probably need to recurse to
* handle that cleanly.
* the fragments, and the frag list.
*/
int skb_splice_bits(struct sk_buff *skb, struct sock *sk, unsigned int offset,
struct pipe_inode_info *pipe, unsigned int tlen,
@ -1991,29 +2003,10 @@ int skb_splice_bits(struct sk_buff *skb, struct sock *sk, unsigned int offset,
.ops = &nosteal_pipe_buf_ops,
.spd_release = sock_spd_release,
};
struct sk_buff *frag_iter;
int ret = 0;
/*
* __skb_splice_bits() only fails if the output has no room left,
* so no point in going over the frag_list for the error case.
*/
if (__skb_splice_bits(skb, pipe, &offset, &tlen, &spd, sk))
goto done;
else if (!tlen)
goto done;
__skb_splice_bits(skb, pipe, &offset, &tlen, &spd, sk);
/*
* now see if we have a frag_list to map
*/
skb_walk_frags(skb, frag_iter) {
if (!tlen)
break;
if (__skb_splice_bits(frag_iter, pipe, &offset, &tlen, &spd, sk))
break;
}
done:
if (spd.nr_pages)
ret = splice_cb(sk, pipe, &spd);

View File

@ -556,20 +556,7 @@ int tcp_ioctl(struct sock *sk, int cmd, unsigned long arg)
return -EINVAL;
slow = lock_sock_fast(sk);
if ((1 << sk->sk_state) & (TCPF_SYN_SENT | TCPF_SYN_RECV))
answ = 0;
else if (sock_flag(sk, SOCK_URGINLINE) ||
!tp->urg_data ||
before(tp->urg_seq, tp->copied_seq) ||
!before(tp->urg_seq, tp->rcv_nxt)) {
answ = tp->rcv_nxt - tp->copied_seq;
/* Subtract 1, if FIN was received */
if (answ && sock_flag(sk, SOCK_DONE))
answ--;
} else
answ = tp->urg_seq - tp->copied_seq;
answ = tcp_inq(sk);
unlock_sock_fast(sk, slow);
break;
case SIOCATMARK:

10
net/kcm/Kconfig Normal file
View File

@ -0,0 +1,10 @@
config AF_KCM
tristate "KCM sockets"
depends on INET
select BPF_SYSCALL
---help---
KCM (Kernel Connection Multiplexor) sockets provide a method
for multiplexing messages of a message based application
protocol over kernel connectons (e.g. TCP connections).

3
net/kcm/Makefile Normal file
View File

@ -0,0 +1,3 @@
obj-$(CONFIG_AF_KCM) += kcm.o
kcm-y := kcmsock.o kcmproc.o

426
net/kcm/kcmproc.c Normal file
View File

@ -0,0 +1,426 @@
#include <linux/in.h>
#include <linux/inet.h>
#include <linux/list.h>
#include <linux/module.h>
#include <linux/net.h>
#include <linux/proc_fs.h>
#include <linux/rculist.h>
#include <linux/seq_file.h>
#include <linux/socket.h>
#include <net/inet_sock.h>
#include <net/kcm.h>
#include <net/net_namespace.h>
#include <net/netns/generic.h>
#include <net/tcp.h>
#ifdef CONFIG_PROC_FS
struct kcm_seq_muxinfo {
char *name;
const struct file_operations *seq_fops;
const struct seq_operations seq_ops;
};
static struct kcm_mux *kcm_get_first(struct seq_file *seq)
{
struct net *net = seq_file_net(seq);
struct kcm_net *knet = net_generic(net, kcm_net_id);
return list_first_or_null_rcu(&knet->mux_list,
struct kcm_mux, kcm_mux_list);
}
static struct kcm_mux *kcm_get_next(struct kcm_mux *mux)
{
struct kcm_net *knet = mux->knet;
return list_next_or_null_rcu(&knet->mux_list, &mux->kcm_mux_list,
struct kcm_mux, kcm_mux_list);
}
static struct kcm_mux *kcm_get_idx(struct seq_file *seq, loff_t pos)
{
struct net *net = seq_file_net(seq);
struct kcm_net *knet = net_generic(net, kcm_net_id);
struct kcm_mux *m;
list_for_each_entry_rcu(m, &knet->mux_list, kcm_mux_list) {
if (!pos)
return m;
--pos;
}
return NULL;
}
static void *kcm_seq_next(struct seq_file *seq, void *v, loff_t *pos)
{
void *p;
if (v == SEQ_START_TOKEN)
p = kcm_get_first(seq);
else
p = kcm_get_next(v);
++*pos;
return p;
}
static void *kcm_seq_start(struct seq_file *seq, loff_t *pos)
__acquires(rcu)
{
rcu_read_lock();
if (!*pos)
return SEQ_START_TOKEN;
else
return kcm_get_idx(seq, *pos - 1);
}
static void kcm_seq_stop(struct seq_file *seq, void *v)
__releases(rcu)
{
rcu_read_unlock();
}
struct kcm_proc_mux_state {
struct seq_net_private p;
int idx;
};
static int kcm_seq_open(struct inode *inode, struct file *file)
{
struct kcm_seq_muxinfo *muxinfo = PDE_DATA(inode);
int err;
err = seq_open_net(inode, file, &muxinfo->seq_ops,
sizeof(struct kcm_proc_mux_state));
if (err < 0)
return err;
return err;
}
static void kcm_format_mux_header(struct seq_file *seq)
{
struct net *net = seq_file_net(seq);
struct kcm_net *knet = net_generic(net, kcm_net_id);
seq_printf(seq,
"*** KCM statistics (%d MUX) ****\n",
knet->count);
seq_printf(seq,
"%-14s %-10s %-16s %-10s %-16s %-8s %-8s %-8s %-8s %s",
"Object",
"RX-Msgs",
"RX-Bytes",
"TX-Msgs",
"TX-Bytes",
"Recv-Q",
"Rmem",
"Send-Q",
"Smem",
"Status");
/* XXX: pdsts header stuff here */
seq_puts(seq, "\n");
}
static void kcm_format_sock(struct kcm_sock *kcm, struct seq_file *seq,
int i, int *len)
{
seq_printf(seq,
" kcm-%-7u %-10llu %-16llu %-10llu %-16llu %-8d %-8d %-8d %-8s ",
kcm->index,
kcm->stats.rx_msgs,
kcm->stats.rx_bytes,
kcm->stats.tx_msgs,
kcm->stats.tx_bytes,
kcm->sk.sk_receive_queue.qlen,
sk_rmem_alloc_get(&kcm->sk),
kcm->sk.sk_write_queue.qlen,
"-");
if (kcm->tx_psock)
seq_printf(seq, "Psck-%u ", kcm->tx_psock->index);
if (kcm->tx_wait)
seq_puts(seq, "TxWait ");
if (kcm->tx_wait_more)
seq_puts(seq, "WMore ");
if (kcm->rx_wait)
seq_puts(seq, "RxWait ");
seq_puts(seq, "\n");
}
static void kcm_format_psock(struct kcm_psock *psock, struct seq_file *seq,
int i, int *len)
{
seq_printf(seq,
" psock-%-5u %-10llu %-16llu %-10llu %-16llu %-8d %-8d %-8d %-8d ",
psock->index,
psock->stats.rx_msgs,
psock->stats.rx_bytes,
psock->stats.tx_msgs,
psock->stats.tx_bytes,
psock->sk->sk_receive_queue.qlen,
atomic_read(&psock->sk->sk_rmem_alloc),
psock->sk->sk_write_queue.qlen,
atomic_read(&psock->sk->sk_wmem_alloc));
if (psock->done)
seq_puts(seq, "Done ");
if (psock->tx_stopped)
seq_puts(seq, "TxStop ");
if (psock->rx_stopped)
seq_puts(seq, "RxStop ");
if (psock->tx_kcm)
seq_printf(seq, "Rsvd-%d ", psock->tx_kcm->index);
if (psock->ready_rx_msg)
seq_puts(seq, "RdyRx ");
seq_puts(seq, "\n");
}
static void
kcm_format_mux(struct kcm_mux *mux, loff_t idx, struct seq_file *seq)
{
int i, len;
struct kcm_sock *kcm;
struct kcm_psock *psock;
/* mux information */
seq_printf(seq,
"%-6s%-8s %-10llu %-16llu %-10llu %-16llu %-8s %-8s %-8s %-8s ",
"mux", "",
mux->stats.rx_msgs,
mux->stats.rx_bytes,
mux->stats.tx_msgs,
mux->stats.tx_bytes,
"-", "-", "-", "-");
seq_printf(seq, "KCMs: %d, Psocks %d\n",
mux->kcm_socks_cnt, mux->psocks_cnt);
/* kcm sock information */
i = 0;
spin_lock_bh(&mux->lock);
list_for_each_entry(kcm, &mux->kcm_socks, kcm_sock_list) {
kcm_format_sock(kcm, seq, i, &len);
i++;
}
i = 0;
list_for_each_entry(psock, &mux->psocks, psock_list) {
kcm_format_psock(psock, seq, i, &len);
i++;
}
spin_unlock_bh(&mux->lock);
}
static int kcm_seq_show(struct seq_file *seq, void *v)
{
struct kcm_proc_mux_state *mux_state;
mux_state = seq->private;
if (v == SEQ_START_TOKEN) {
mux_state->idx = 0;
kcm_format_mux_header(seq);
} else {
kcm_format_mux(v, mux_state->idx, seq);
mux_state->idx++;
}
return 0;
}
static const struct file_operations kcm_seq_fops = {
.owner = THIS_MODULE,
.open = kcm_seq_open,
.read = seq_read,
.llseek = seq_lseek,
};
static struct kcm_seq_muxinfo kcm_seq_muxinfo = {
.name = "kcm",
.seq_fops = &kcm_seq_fops,
.seq_ops = {
.show = kcm_seq_show,
.start = kcm_seq_start,
.next = kcm_seq_next,
.stop = kcm_seq_stop,
}
};
static int kcm_proc_register(struct net *net, struct kcm_seq_muxinfo *muxinfo)
{
struct proc_dir_entry *p;
int rc = 0;
p = proc_create_data(muxinfo->name, S_IRUGO, net->proc_net,
muxinfo->seq_fops, muxinfo);
if (!p)
rc = -ENOMEM;
return rc;
}
EXPORT_SYMBOL(kcm_proc_register);
static void kcm_proc_unregister(struct net *net,
struct kcm_seq_muxinfo *muxinfo)
{
remove_proc_entry(muxinfo->name, net->proc_net);
}
EXPORT_SYMBOL(kcm_proc_unregister);
static int kcm_stats_seq_show(struct seq_file *seq, void *v)
{
struct kcm_psock_stats psock_stats;
struct kcm_mux_stats mux_stats;
struct kcm_mux *mux;
struct kcm_psock *psock;
struct net *net = seq->private;
struct kcm_net *knet = net_generic(net, kcm_net_id);
memset(&mux_stats, 0, sizeof(mux_stats));
memset(&psock_stats, 0, sizeof(psock_stats));
mutex_lock(&knet->mutex);
aggregate_mux_stats(&knet->aggregate_mux_stats, &mux_stats);
aggregate_psock_stats(&knet->aggregate_psock_stats,
&psock_stats);
list_for_each_entry_rcu(mux, &knet->mux_list, kcm_mux_list) {
spin_lock_bh(&mux->lock);
aggregate_mux_stats(&mux->stats, &mux_stats);
aggregate_psock_stats(&mux->aggregate_psock_stats,
&psock_stats);
list_for_each_entry(psock, &mux->psocks, psock_list)
aggregate_psock_stats(&psock->stats, &psock_stats);
spin_unlock_bh(&mux->lock);
}
mutex_unlock(&knet->mutex);
seq_printf(seq,
"%-8s %-10s %-16s %-10s %-16s %-10s %-10s %-10s %-10s %-10s\n",
"MUX",
"RX-Msgs",
"RX-Bytes",
"TX-Msgs",
"TX-Bytes",
"TX-Retries",
"Attach",
"Unattach",
"UnattchRsvd",
"RX-RdyDrops");
seq_printf(seq,
"%-8s %-10llu %-16llu %-10llu %-16llu %-10u %-10u %-10u %-10u %-10u\n",
"",
mux_stats.rx_msgs,
mux_stats.rx_bytes,
mux_stats.tx_msgs,
mux_stats.tx_bytes,
mux_stats.tx_retries,
mux_stats.psock_attach,
mux_stats.psock_unattach_rsvd,
mux_stats.psock_unattach,
mux_stats.rx_ready_drops);
seq_printf(seq,
"%-8s %-10s %-16s %-10s %-16s %-10s %-10s %-10s %-10s %-10s %-10s %-10s %-10s %-10s\n",
"Psock",
"RX-Msgs",
"RX-Bytes",
"TX-Msgs",
"TX-Bytes",
"Reserved",
"Unreserved",
"RX-Aborts",
"RX-MemFail",
"RX-NeedMor",
"RX-BadLen",
"RX-TooBig",
"RX-Timeout",
"TX-Aborts");
seq_printf(seq,
"%-8s %-10llu %-16llu %-10llu %-16llu %-10llu %-10llu %-10u %-10u %-10u %-10u %-10u %-10u %-10u\n",
"",
psock_stats.rx_msgs,
psock_stats.rx_bytes,
psock_stats.tx_msgs,
psock_stats.tx_bytes,
psock_stats.reserved,
psock_stats.unreserved,
psock_stats.rx_aborts,
psock_stats.rx_mem_fail,
psock_stats.rx_need_more_hdr,
psock_stats.rx_bad_hdr_len,
psock_stats.rx_msg_too_big,
psock_stats.rx_msg_timeouts,
psock_stats.tx_aborts);
return 0;
}
static int kcm_stats_seq_open(struct inode *inode, struct file *file)
{
return single_open_net(inode, file, kcm_stats_seq_show);
}
static const struct file_operations kcm_stats_seq_fops = {
.owner = THIS_MODULE,
.open = kcm_stats_seq_open,
.read = seq_read,
.llseek = seq_lseek,
.release = single_release_net,
};
static int kcm_proc_init_net(struct net *net)
{
int err;
if (!proc_create("kcm_stats", S_IRUGO, net->proc_net,
&kcm_stats_seq_fops)) {
err = -ENOMEM;
goto out_kcm_stats;
}
err = kcm_proc_register(net, &kcm_seq_muxinfo);
if (err)
goto out_kcm;
return 0;
out_kcm:
remove_proc_entry("kcm_stats", net->proc_net);
out_kcm_stats:
return err;
}
static void kcm_proc_exit_net(struct net *net)
{
kcm_proc_unregister(net, &kcm_seq_muxinfo);
remove_proc_entry("kcm_stats", net->proc_net);
}
static struct pernet_operations kcm_net_ops = {
.init = kcm_proc_init_net,
.exit = kcm_proc_exit_net,
};
int __init kcm_proc_init(void)
{
return register_pernet_subsys(&kcm_net_ops);
}
void __exit kcm_proc_exit(void)
{
unregister_pernet_subsys(&kcm_net_ops);
}
#endif /* CONFIG_PROC_FS */

2409
net/kcm/kcmsock.c Normal file

File diff suppressed because it is too large Load Diff

View File

@ -533,7 +533,7 @@ static const struct inode_operations sockfs_inode_ops = {
* NULL is returned.
*/
static struct socket *sock_alloc(void)
struct socket *sock_alloc(void)
{
struct inode *inode;
struct socket *sock;
@ -554,6 +554,7 @@ static struct socket *sock_alloc(void)
this_cpu_add(sockets_in_use, 1);
return sock;
}
EXPORT_SYMBOL(sock_alloc);
/**
* sock_release - close a socket
@ -1874,7 +1875,8 @@ static int copy_msghdr_from_user(struct msghdr *kmsg,
static int ___sys_sendmsg(struct socket *sock, struct user_msghdr __user *msg,
struct msghdr *msg_sys, unsigned int flags,
struct used_address *used_address)
struct used_address *used_address,
unsigned int allowed_msghdr_flags)
{
struct compat_msghdr __user *msg_compat =
(struct compat_msghdr __user *)msg;
@ -1900,6 +1902,7 @@ static int ___sys_sendmsg(struct socket *sock, struct user_msghdr __user *msg,
if (msg_sys->msg_controllen > INT_MAX)
goto out_freeiov;
flags |= (msg_sys->msg_flags & allowed_msghdr_flags);
ctl_len = msg_sys->msg_controllen;
if ((MSG_CMSG_COMPAT & flags) && ctl_len) {
err =
@ -1978,7 +1981,7 @@ long __sys_sendmsg(int fd, struct user_msghdr __user *msg, unsigned flags)
if (!sock)
goto out;
err = ___sys_sendmsg(sock, msg, &msg_sys, flags, NULL);
err = ___sys_sendmsg(sock, msg, &msg_sys, flags, NULL, 0);
fput_light(sock->file, fput_needed);
out:
@ -2005,6 +2008,7 @@ int __sys_sendmmsg(int fd, struct mmsghdr __user *mmsg, unsigned int vlen,
struct compat_mmsghdr __user *compat_entry;
struct msghdr msg_sys;
struct used_address used_address;
unsigned int oflags = flags;
if (vlen > UIO_MAXIOV)
vlen = UIO_MAXIOV;
@ -2019,11 +2023,15 @@ int __sys_sendmmsg(int fd, struct mmsghdr __user *mmsg, unsigned int vlen,
entry = mmsg;
compat_entry = (struct compat_mmsghdr __user *)mmsg;
err = 0;
flags |= MSG_BATCH;
while (datagrams < vlen) {
if (datagrams == vlen - 1)
flags = oflags;
if (MSG_CMSG_COMPAT & flags) {
err = ___sys_sendmsg(sock, (struct user_msghdr __user *)compat_entry,
&msg_sys, flags, &used_address);
&msg_sys, flags, &used_address, MSG_EOR);
if (err < 0)
break;
err = __put_user(err, &compat_entry->msg_len);
@ -2031,7 +2039,7 @@ int __sys_sendmmsg(int fd, struct mmsghdr __user *mmsg, unsigned int vlen,
} else {
err = ___sys_sendmsg(sock,
(struct user_msghdr __user *)entry,
&msg_sys, flags, &used_address);
&msg_sys, flags, &used_address, MSG_EOR);
if (err < 0)
break;
err = put_user(err, &entry->msg_len);