libsysutils: Add iovec/runOnEachSocket
SocketClient: * Replace sendDataLocked with sendDataLockedv which takes an iovec. * Add a version of sendData, sendDatav, which takes an iovec. * do not preserve iovec content through sendDatav SocketListener: * Add runOnEachSocket, which allows to to specify a SocketClientCommand to run individually on each socket. This allows you to do broadcast-like actions customized for each individual socket. * Client safe list reference counting for sendBroadcast & runOnEach Socket Signed-off-by: Nick Kralevich <nnk@google.com> Signed-off-by: Mark Salyzyn <salyzyn@google.com> Change-Id: I716f89c01b4cb7af900045c7e41fac1492defb06
This commit is contained in:
parent
349f894e52
commit
a6e965578e
|
@ -6,22 +6,23 @@
|
|||
#include <pthread.h>
|
||||
#include <cutils/atomic.h>
|
||||
#include <sys/types.h>
|
||||
#include <sys/uio.h>
|
||||
|
||||
class SocketClient {
|
||||
int mSocket;
|
||||
bool mSocketOwned;
|
||||
pthread_mutex_t mWriteMutex;
|
||||
|
||||
/* Peer process ID */
|
||||
// Peer process ID
|
||||
pid_t mPid;
|
||||
|
||||
/* Peer user ID */
|
||||
// Peer user ID
|
||||
uid_t mUid;
|
||||
|
||||
/* Peer group ID */
|
||||
// Peer group ID
|
||||
gid_t mGid;
|
||||
|
||||
/* Reference count (starts at 1) */
|
||||
// Reference count (starts at 1)
|
||||
pthread_mutex_t mRefCountMutex;
|
||||
int mRefCount;
|
||||
|
||||
|
@ -38,12 +39,15 @@ public:
|
|||
pid_t getPid() const { return mPid; }
|
||||
uid_t getUid() const { return mUid; }
|
||||
gid_t getGid() const { return mGid; }
|
||||
void setCmdNum(int cmdNum) { android_atomic_release_store(cmdNum, &mCmdNum); }
|
||||
void setCmdNum(int cmdNum) {
|
||||
android_atomic_release_store(cmdNum, &mCmdNum);
|
||||
}
|
||||
int getCmdNum() { return mCmdNum; }
|
||||
|
||||
// Send null-terminated C strings:
|
||||
int sendMsg(int code, const char *msg, bool addErrno);
|
||||
int sendMsg(int code, const char *msg, bool addErrno, bool useCmdNum);
|
||||
int sendMsg(const char *msg);
|
||||
|
||||
// Provides a mechanism to send a response code to the client.
|
||||
// Sends the code and a null character.
|
||||
|
@ -56,6 +60,8 @@ public:
|
|||
|
||||
// Sending binary data:
|
||||
int sendData(const void *data, int len);
|
||||
// iovec contents not preserved through call
|
||||
int sendDatav(struct iovec *iov, int iovcnt);
|
||||
|
||||
// Optional reference counting. Reference count starts at 1. If
|
||||
// it's decremented to 0, it deletes itself.
|
||||
|
@ -64,19 +70,18 @@ public:
|
|||
void incRef();
|
||||
bool decRef(); // returns true at 0 (but note: SocketClient already deleted)
|
||||
|
||||
// return a new string in quotes with '\\' and '\"' escaped for "my arg" transmissions
|
||||
// return a new string in quotes with '\\' and '\"' escaped for "my arg"
|
||||
// transmissions
|
||||
static char *quoteArg(const char *arg);
|
||||
|
||||
private:
|
||||
// Send null-terminated C strings
|
||||
int sendMsg(const char *msg);
|
||||
void init(int socket, bool owned, bool useCmdNum);
|
||||
|
||||
// Sending binary data. The caller should use make sure this is protected
|
||||
// Sending binary data. The caller should make sure this is protected
|
||||
// from multiple threads entering simultaneously.
|
||||
// returns 0 if successful, -1 if there is a 0 byte write and -2 if any other
|
||||
// error occurred (use errno to get the error)
|
||||
int sendDataLocked(const void *data, int len);
|
||||
// returns 0 if successful, -1 if there is a 0 byte write or if any
|
||||
// other error occurred (use errno to get the error)
|
||||
int sendDataLockedv(struct iovec *iov, int iovcnt);
|
||||
};
|
||||
|
||||
typedef android::sysutils::List<SocketClient *> SocketClientCollection;
|
||||
|
|
|
@ -0,0 +1,27 @@
|
|||
/*
|
||||
* Copyright (C) 2012 The Android Open Source Project
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
#ifndef _SOCKETCLIENTCOMMAND_H
|
||||
#define _SOCKETCLIENTCOMMAND_H
|
||||
|
||||
#include <sysutils/SocketClient.h>
|
||||
|
||||
class SocketClientCommand {
|
||||
public:
|
||||
virtual ~SocketClientCommand() { }
|
||||
virtual void runSocketCommand(SocketClient *client) = 0;
|
||||
};
|
||||
|
||||
#endif
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright (C) 2008 The Android Open Source Project
|
||||
* Copyright (C) 2008-2014 The Android Open Source Project
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
@ -19,6 +19,7 @@
|
|||
#include <pthread.h>
|
||||
|
||||
#include <sysutils/SocketClient.h>
|
||||
#include "SocketClientCommand.h"
|
||||
|
||||
class SocketListener {
|
||||
bool mListen;
|
||||
|
@ -41,6 +42,8 @@ public:
|
|||
|
||||
void sendBroadcast(int code, const char *msg, bool addErrno);
|
||||
|
||||
void runOnEachSocket(SocketClientCommand *command);
|
||||
|
||||
protected:
|
||||
virtual bool onDataAvailable(SocketClient *c) = 0;
|
||||
|
||||
|
|
|
@ -71,7 +71,7 @@ int SocketClient::sendMsg(int code, const char *msg, bool addErrno, bool useCmdN
|
|||
ret = asprintf(&buf, "%d %s", code, msg);
|
||||
}
|
||||
}
|
||||
/* Send the zero-terminated message */
|
||||
// Send the zero-terminated message
|
||||
if (ret != -1) {
|
||||
ret = sendMsg(buf);
|
||||
free(buf);
|
||||
|
@ -79,22 +79,25 @@ int SocketClient::sendMsg(int code, const char *msg, bool addErrno, bool useCmdN
|
|||
return ret;
|
||||
}
|
||||
|
||||
/** send 3-digit code, null, binary-length, binary data */
|
||||
// send 3-digit code, null, binary-length, binary data
|
||||
int SocketClient::sendBinaryMsg(int code, const void *data, int len) {
|
||||
|
||||
/* 4 bytes for the code & null + 4 bytes for the len */
|
||||
// 4 bytes for the code & null + 4 bytes for the len
|
||||
char buf[8];
|
||||
/* Write the code */
|
||||
// Write the code
|
||||
snprintf(buf, 4, "%.3d", code);
|
||||
/* Write the len */
|
||||
// Write the len
|
||||
uint32_t tmp = htonl(len);
|
||||
memcpy(buf + 4, &tmp, sizeof(uint32_t));
|
||||
|
||||
struct iovec vec[2];
|
||||
vec[0].iov_base = (void *) buf;
|
||||
vec[0].iov_len = sizeof(buf);
|
||||
vec[1].iov_base = (void *) data;
|
||||
vec[1].iov_len = len;
|
||||
|
||||
pthread_mutex_lock(&mWriteMutex);
|
||||
int result = sendDataLocked(buf, sizeof(buf));
|
||||
if (result == 0 && len > 0) {
|
||||
result = sendDataLocked(data, len);
|
||||
}
|
||||
int result = sendDataLockedv(vec, (len > 0) ? 2 : 1);
|
||||
pthread_mutex_unlock(&mWriteMutex);
|
||||
|
||||
return result;
|
||||
|
@ -147,33 +150,51 @@ int SocketClient::sendMsg(const char *msg) {
|
|||
}
|
||||
|
||||
int SocketClient::sendData(const void *data, int len) {
|
||||
struct iovec vec[1];
|
||||
vec[0].iov_base = (void *) data;
|
||||
vec[0].iov_len = len;
|
||||
|
||||
pthread_mutex_lock(&mWriteMutex);
|
||||
int rc = sendDataLocked(data, len);
|
||||
int rc = sendDataLockedv(vec, 1);
|
||||
pthread_mutex_unlock(&mWriteMutex);
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
int SocketClient::sendDataLocked(const void *data, int len) {
|
||||
int rc = 0;
|
||||
const char *p = (const char*) data;
|
||||
int brtw = len;
|
||||
int SocketClient::sendDatav(struct iovec *iov, int iovcnt) {
|
||||
pthread_mutex_lock(&mWriteMutex);
|
||||
int rc = sendDataLockedv(iov, iovcnt);
|
||||
pthread_mutex_unlock(&mWriteMutex);
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
int SocketClient::sendDataLockedv(struct iovec *iov, int iovcnt) {
|
||||
|
||||
if (mSocket < 0) {
|
||||
errno = EHOSTUNREACH;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (len == 0) {
|
||||
if (iovcnt <= 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
while (brtw > 0) {
|
||||
rc = send(mSocket, p, brtw, MSG_NOSIGNAL);
|
||||
int current = 0;
|
||||
|
||||
for (;;) {
|
||||
ssize_t rc = writev(mSocket, iov + current, iovcnt - current);
|
||||
if (rc > 0) {
|
||||
p += rc;
|
||||
brtw -= rc;
|
||||
size_t written = rc;
|
||||
while ((current < iovcnt) && (written >= iov[current].iov_len)) {
|
||||
written -= iov[current].iov_len;
|
||||
current++;
|
||||
}
|
||||
if (current == iovcnt) {
|
||||
break;
|
||||
}
|
||||
iov[current].iov_base = (char *)iov[current].iov_base + written;
|
||||
iov[current].iov_len -= written;
|
||||
continue;
|
||||
}
|
||||
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright (C) 2008 The Android Open Source Project
|
||||
* Copyright (C) 2008-2014 The Android Open Source Project
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
@ -164,6 +164,7 @@ void SocketListener::runListener() {
|
|||
|
||||
pthread_mutex_lock(&mClientsLock);
|
||||
for (it = mClients->begin(); it != mClients->end(); ++it) {
|
||||
// NB: calling out to an other object with mClientsLock held (safe)
|
||||
int fd = (*it)->getSocket();
|
||||
FD_SET(fd, &read_fds);
|
||||
if (fd > max)
|
||||
|
@ -206,9 +207,12 @@ void SocketListener::runListener() {
|
|||
pendingList->clear();
|
||||
pthread_mutex_lock(&mClientsLock);
|
||||
for (it = mClients->begin(); it != mClients->end(); ++it) {
|
||||
int fd = (*it)->getSocket();
|
||||
SocketClient* c = *it;
|
||||
// NB: calling out to an other object with mClientsLock held (safe)
|
||||
int fd = c->getSocket();
|
||||
if (FD_ISSET(fd, &read_fds)) {
|
||||
pendingList->push_back(*it);
|
||||
pendingList->push_back(c);
|
||||
c->incRef();
|
||||
}
|
||||
}
|
||||
pthread_mutex_unlock(&mClientsLock);
|
||||
|
@ -236,20 +240,61 @@ void SocketListener::runListener() {
|
|||
/* Remove our reference to the client */
|
||||
c->decRef();
|
||||
}
|
||||
c->decRef();
|
||||
}
|
||||
}
|
||||
delete pendingList;
|
||||
}
|
||||
|
||||
void SocketListener::sendBroadcast(int code, const char *msg, bool addErrno) {
|
||||
SocketClientCollection safeList;
|
||||
|
||||
/* Add all active clients to the safe list first */
|
||||
safeList.clear();
|
||||
pthread_mutex_lock(&mClientsLock);
|
||||
SocketClientCollection::iterator i;
|
||||
|
||||
for (i = mClients->begin(); i != mClients->end(); ++i) {
|
||||
// broadcasts are unsolicited and should not include a cmd number
|
||||
if ((*i)->sendMsg(code, msg, addErrno, false)) {
|
||||
SLOGW("Error sending broadcast (%s)", strerror(errno));
|
||||
}
|
||||
SocketClient* c = *i;
|
||||
c->incRef();
|
||||
safeList.push_back(c);
|
||||
}
|
||||
pthread_mutex_unlock(&mClientsLock);
|
||||
|
||||
while (!safeList.empty()) {
|
||||
/* Pop the first item from the list */
|
||||
i = safeList.begin();
|
||||
SocketClient* c = *i;
|
||||
safeList.erase(i);
|
||||
// broadcasts are unsolicited and should not include a cmd number
|
||||
if (c->sendMsg(code, msg, addErrno, false)) {
|
||||
SLOGW("Error sending broadcast (%s)", strerror(errno));
|
||||
}
|
||||
c->decRef();
|
||||
}
|
||||
}
|
||||
|
||||
void SocketListener::runOnEachSocket(SocketClientCommand *command) {
|
||||
SocketClientCollection safeList;
|
||||
|
||||
/* Add all active clients to the safe list first */
|
||||
safeList.clear();
|
||||
pthread_mutex_lock(&mClientsLock);
|
||||
SocketClientCollection::iterator i;
|
||||
|
||||
for (i = mClients->begin(); i != mClients->end(); ++i) {
|
||||
SocketClient* c = *i;
|
||||
c->incRef();
|
||||
safeList.push_back(c);
|
||||
}
|
||||
pthread_mutex_unlock(&mClientsLock);
|
||||
|
||||
while (!safeList.empty()) {
|
||||
/* Pop the first item from the list */
|
||||
i = safeList.begin();
|
||||
SocketClient* c = *i;
|
||||
safeList.erase(i);
|
||||
command->runSocketCommand(c);
|
||||
c->decRef();
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue