Threaded IO: read side WIP 2.

This commit is contained in:
antirez 2019-03-31 15:58:54 +02:00
parent dd5b105c73
commit a2245f8ff1
1 changed files with 23 additions and 7 deletions

View File

@ -2496,13 +2496,16 @@ int processEventsWhileBlocked(void) {
int tio_debug = 0;
#define SERVER_MAX_IO_THREADS 32
#define IO_THREADS_MAX_NUM 128
#define IO_THREADS_OP_READ 0
#define IO_THREADS_OP_WRITE 1
pthread_t io_threads[SERVER_MAX_IO_THREADS];
pthread_mutex_t io_threads_mutex[SERVER_MAX_IO_THREADS];
_Atomic unsigned long io_threads_pending[SERVER_MAX_IO_THREADS];
int io_threads_active;
list *io_threads_list[SERVER_MAX_IO_THREADS];
pthread_t io_threads[IO_THREADS_MAX_NUM];
pthread_mutex_t io_threads_mutex[IO_THREADS_MAX_NUM];
_Atomic unsigned long io_threads_pending[IO_THREADS_MAX_NUM];
int io_threads_active; /* Are the threads currently spinning waiting I/O? */
int io_threads_op; /* IO_THREADS_OP_WRITE or IO_THREADS_OP_READ. */
list *io_threads_list[IO_THREADS_MAX_NUM];
void *IOThreadMain(void *myid) {
/* The ID is the thread number (from 0 to server.iothreads_num-1), and is
@ -2533,7 +2536,11 @@ void *IOThreadMain(void *myid) {
listRewind(io_threads_list[id],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
writeToClient(c->fd,c,0);
if (io_threads_op == IO_THREADS_OP_WRITE) {
writeToClient(c->fd,c,0);
} else {
readQueryFromClient(NULL,c->fd,c,0);
}
}
listEmpty(io_threads_list[id]);
io_threads_pending[id] = 0;
@ -2550,6 +2557,12 @@ void initThreadedIO(void) {
* we'll handle I/O directly from the main thread. */
if (server.io_threads_num == 1) return;
if (server.io_threads_num > IO_THREADS_MAX_NUM) {
serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
"The maximum number is %d.", IO_THREADS_MAX_NUM);
exit(1);
}
/* Spawn the I/O threads. */
for (int i = 0; i < server.io_threads_num; i++) {
pthread_t tid;
@ -2684,3 +2697,6 @@ int postponeClientRead(client *c) {
return 0;
}
}
int handleClientsWithPendingReadsUsingThreads(void) {
}