From d9c9e138f22c48626f719f880920e04c639e0177 Mon Sep 17 00:00:00 2001 From: Ossi Herrala Date: Mon, 20 Jul 2015 12:44:32 +0000 Subject: [PATCH] rpc: Fix slow volume download (virsh vol-download) Use I/O vector (iovec) instead of one huge memory buffer as suggested in https://bugzilla.redhat.com/show_bug.cgi?id=1026137#c7. This avoids doing memmove() to big buffers and performance doesn't degrade if source (virNetClientStreamQueuePacket()) is faster than sink (virNetClientStreamRecvPacket()). Resolves: http://bugzilla.redhat.com/1026137 Signed-off-by: Martin Kletzander --- src/rpc/virnetclientstream.c | 152 +++++++++++++++++++++++------------ 1 file changed, 99 insertions(+), 53 deletions(-) diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c index b428f4b4d5..1cc9002b0a 100644 --- a/src/rpc/virnetclientstream.c +++ b/src/rpc/virnetclientstream.c @@ -49,9 +49,9 @@ struct _virNetClientStream { * time by stopping consuming any incoming data * off the socket.... */ - char *incoming; - size_t incomingOffset; - size_t incomingLength; + struct iovec *incomingVec; /* I/O Vector to hold data */ + size_t writeVec; /* Vectors produced */ + size_t readVec; /* Vectors consumed */ bool incomingEOF; virNetClientStreamEventCallback cb; @@ -86,9 +86,9 @@ virNetClientStreamEventTimerUpdate(virNetClientStreamPtr st) if (!st->cb) return; - VIR_DEBUG("Check timer offset=%zu %d", st->incomingOffset, st->cbEvents); + VIR_DEBUG("Check timer readVec %zu writeVec %zu %d", st->readVec, st->writeVec, st->cbEvents); - if (((st->incomingOffset || st->incomingEOF) && + if ((((st->readVec < st->writeVec) || st->incomingEOF) && (st->cbEvents & VIR_STREAM_EVENT_READABLE)) || (st->cbEvents & VIR_STREAM_EVENT_WRITABLE)) { VIR_DEBUG("Enabling event timer"); @@ -110,13 +110,14 @@ virNetClientStreamEventTimer(int timer ATTRIBUTE_UNUSED, void *opaque) if (st->cb && (st->cbEvents & VIR_STREAM_EVENT_READABLE) && - (st->incomingOffset || st->incomingEOF)) + ((st->readVec < st->writeVec) || st->incomingEOF)) events |= VIR_STREAM_EVENT_READABLE; if (st->cb && (st->cbEvents & VIR_STREAM_EVENT_WRITABLE)) events |= VIR_STREAM_EVENT_WRITABLE; - VIR_DEBUG("Got Timer dispatch %d %d offset=%zu", events, st->cbEvents, st->incomingOffset); + VIR_DEBUG("Got Timer dispatch %d %d readVec %zu writeVec %zu", events, st->cbEvents, + st->readVec, st->writeVec); if (events) { virNetClientStreamEventCallback cb = st->cb; void *cbOpaque = st->cbOpaque; @@ -161,7 +162,7 @@ void virNetClientStreamDispose(void *obj) virNetClientStreamPtr st = obj; virResetError(&st->err); - VIR_FREE(st->incoming); + VIR_FREE(st->incomingVec); virObjectUnref(st->prog); } @@ -265,38 +266,50 @@ int virNetClientStreamQueuePacket(virNetClientStreamPtr st, virNetMessagePtr msg) { int ret = -1; - size_t need; + struct iovec iov; + char *base; + size_t piece, pieces, length, offset = 0, size = 1024*1024; virObjectLock(st); - need = msg->bufferLength - msg->bufferOffset; - if (need) { - size_t avail = st->incomingLength - st->incomingOffset; - if (need > avail) { - size_t extra = need - avail; - if (VIR_REALLOC_N(st->incoming, - st->incomingLength + extra) < 0) { - VIR_DEBUG("Out of memory handling stream data"); - goto cleanup; - } - st->incomingLength += extra; - } - memcpy(st->incoming + st->incomingOffset, - msg->buffer + msg->bufferOffset, - msg->bufferLength - msg->bufferOffset); - st->incomingOffset += (msg->bufferLength - msg->bufferOffset); - } else { + length = msg->bufferLength - msg->bufferOffset; + + if (length == 0) { st->incomingEOF = true; + goto end; } - VIR_DEBUG("Stream incoming data offset %zu length %zu EOF %d", - st->incomingOffset, st->incomingLength, - st->incomingEOF); - virNetClientStreamEventTimerUpdate(st); + pieces = (length + size - 1) / size; + for (piece = 0; piece < pieces; piece++) { + if (size > length - offset) + size = length - offset; + if (VIR_ALLOC_N(base, size)) { + VIR_DEBUG("Allocation failed"); + goto cleanup; + } + + memcpy(base, msg->buffer + msg->bufferOffset + offset, size); + iov.iov_base = base; + iov.iov_len = size; + offset += size; + + if (VIR_APPEND_ELEMENT(st->incomingVec, st->writeVec, iov) < 0) { + VIR_DEBUG("Append failed"); + VIR_FREE(base); + goto cleanup; + } + VIR_DEBUG("Wrote piece of vector. readVec %zu, writeVec %zu size %zu", + st->readVec, st->writeVec, size); + } + + end: + virNetClientStreamEventTimerUpdate(st); ret = 0; cleanup: + VIR_DEBUG("Stream incoming data readVec %zu writeVec %zu EOF %d", + st->readVec, st->writeVec, st->incomingEOF); virObjectUnlock(st); return ret; } @@ -361,17 +374,21 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, size_t nbytes, bool nonblock) { - int rv = -1; + int ret = -1; + size_t partial, offset; + + virObjectLock(st); + VIR_DEBUG("st=%p client=%p data=%p nbytes=%zu nonblock=%d", st, client, data, nbytes, nonblock); - virObjectLock(st); - if (!st->incomingOffset && !st->incomingEOF) { + + if ((st->readVec >= st->writeVec) && !st->incomingEOF) { virNetMessagePtr msg; - int ret; + int rv; if (nonblock) { VIR_DEBUG("Non-blocking mode and no data available"); - rv = -2; + ret = -2; goto cleanup; } @@ -387,37 +404,66 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, VIR_DEBUG("Dummy packet to wait for stream data"); virObjectUnlock(st); - ret = virNetClientSendWithReplyStream(client, msg, st); + rv = virNetClientSendWithReplyStream(client, msg, st); virObjectLock(st); virNetMessageFree(msg); - if (ret < 0) + if (rv < 0) goto cleanup; } - VIR_DEBUG("After IO %zu", st->incomingOffset); - if (st->incomingOffset) { - int want = st->incomingOffset; - if (want > nbytes) - want = nbytes; - memcpy(data, st->incoming, want); - if (want < st->incomingOffset) { - memmove(st->incoming, st->incoming + want, st->incomingOffset - want); - st->incomingOffset -= want; - } else { - VIR_FREE(st->incoming); - st->incomingOffset = st->incomingLength = 0; + offset = 0; + partial = nbytes; + + while (st->incomingVec && (st->readVec < st->writeVec)) { + struct iovec *iov = st->incomingVec + st->readVec; + + if (!iov || !iov->iov_base) { + virReportError(VIR_ERR_INTERNAL_ERROR, + "%s", _("NULL pointer encountered")); + goto cleanup; } - rv = want; - } else { - rv = 0; + + if (partial < iov->iov_len) { + memcpy(data+offset, iov->iov_base, partial); + memmove(iov->iov_base, (char*)iov->iov_base+partial, + iov->iov_len-partial); + iov->iov_len -= partial; + offset += partial; + VIR_DEBUG("Consumed %zu, left %zu", partial, iov->iov_len); + break; + } + + memcpy(data+offset, iov->iov_base, iov->iov_len); + VIR_DEBUG("Consumed %zu. Moving to next piece", iov->iov_len); + partial -= iov->iov_len; + offset += iov->iov_len; + VIR_FREE(iov->iov_base); + iov->iov_len = 0; + st->readVec++; + + VIR_DEBUG("Read piece of vector. read %zu, readVec %zu, writeVec %zu", + offset, st->readVec, st->writeVec); } + /* Shrink the I/O Vector buffer to free up memory. Do the + shrinking only when there is selected amount or more buffers to + free so it doesn't constantly memmove() and realloc() buffers. + */ + if (st->readVec >= 16) { + memmove(st->incomingVec, st->incomingVec + st->readVec, + sizeof(*st->incomingVec)*(st->writeVec - st->readVec)); + VIR_SHRINK_N(st->incomingVec, st->writeVec, st->readVec); + VIR_DEBUG("shrink removed %zu, left %zu", st->readVec, st->writeVec); + st->readVec = 0; + } + + ret = offset; virNetClientStreamEventTimerUpdate(st); cleanup: virObjectUnlock(st); - return rv; + return ret; }