remote: make ssh-helper massively faster

It was reported that the performance of tunnelled migration and
volume upload/download regressed in 6.9.0, when the virt-ssh-helper
is used for remote SSH tunnelling instead of netcat.

When seeing data available to read from stdin, or the socket,
the current code will allocate at most 1k of extra space in
the buffer it has.

After writing data to the socket, or stdout, if more than 1k
of extra space is in the buffer, it will reallocate to free
up that space.

This results in a huge number of mallocs when doing I/O, as
well as a huge number of syscalls since at most 1k of data
will be read/written at a time.

Also if writing blocks for some reason, it will continue to
read data with no memory bound which is bad.

This changes the code to use a 1 MB fixed size buffer in each
direction. If that buffer becomes full, it will update the
watches to stop reading more data. It will never reallocate
the buffer at runtime.

This increases the performance by orders of magnitude.

Reviewed-by: Daniel Henrique Barboza <danielhb413@gmail.com>
Tested-by: Christian Ehrhardt <christian.ehrhardt@canonical.com>
Signed-off-by: Daniel P. Berrangé <berrange@redhat.com>
This commit is contained in:
Daniel P. Berrangé 2020-11-25 17:22:51 +00:00
parent 259b43673f
commit 829142699e
1 changed files with 68 additions and 45 deletions

View File

@ -32,6 +32,8 @@
#define VIR_FROM_THIS VIR_FROM_REMOTE
#define SSH_BUF_SIZE (1024 * 1024)
VIR_LOG_INIT("remote.remote_ssh_helper");
struct virRemoteSSHHelperBuffer {
@ -45,8 +47,11 @@ typedef virRemoteSSHHelper *virRemoteSSHHelperPtr;
struct virRemoteSSHHelper {
bool quit;
virNetSocketPtr sock;
int sockEvents;
int stdinWatch;
int stdinEvents;
int stdoutWatch;
int stdoutEvents;
struct virRemoteSSHHelperBuffer sockToTerminal;
struct virRemoteSSHHelperBuffer terminalToSock;
@ -75,6 +80,40 @@ virRemoteSSHHelperShutdown(virRemoteSSHHelperPtr proxy)
}
static void
virRemoteSSHHelperUpdateEvents(virRemoteSSHHelperPtr proxy)
{
int sockEvents = 0;
int stdinEvents = 0;
int stdoutEvents = 0;
if (proxy->terminalToSock.offset != 0)
sockEvents |= VIR_EVENT_HANDLE_WRITABLE;
if (proxy->terminalToSock.offset < proxy->terminalToSock.length)
stdinEvents |= VIR_EVENT_HANDLE_READABLE;
if (proxy->sockToTerminal.offset != 0)
stdoutEvents |= VIR_EVENT_HANDLE_WRITABLE;
if (proxy->sockToTerminal.offset < proxy->sockToTerminal.length)
sockEvents |= VIR_EVENT_HANDLE_READABLE;
if (sockEvents != proxy->sockEvents) {
VIR_DEBUG("Update sock events %d -> %d", proxy->sockEvents, sockEvents);
virNetSocketUpdateIOCallback(proxy->sock, sockEvents);
proxy->sockEvents = sockEvents;
}
if (stdinEvents != proxy->stdinEvents) {
VIR_DEBUG("Update stdin events %d -> %d", proxy->stdinEvents, stdinEvents);
virEventUpdateHandle(proxy->stdinWatch, stdinEvents);
proxy->stdinEvents = stdinEvents;
}
if (stdoutEvents != proxy->stdoutEvents) {
VIR_DEBUG("Update stdout events %d -> %d", proxy->stdoutEvents, stdoutEvents);
virEventUpdateHandle(proxy->stdoutWatch, stdoutEvents);
proxy->stdoutEvents = stdoutEvents;
}
}
static void
virRemoteSSHHelperEventOnSocket(virNetSocketPtr sock,
int events,
@ -91,14 +130,9 @@ virRemoteSSHHelperEventOnSocket(virNetSocketPtr sock,
proxy->sockToTerminal.offset;
int got;
if (avail < 1024) {
if (VIR_REALLOC_N(proxy->sockToTerminal.data,
proxy->sockToTerminal.length + 1024) < 0) {
virRemoteSSHHelperShutdown(proxy);
return;
}
proxy->sockToTerminal.length += 1024;
avail += 1024;
if (avail == 0) {
VIR_DEBUG("Unexpectedly called with no space in buffer");
goto cleanup;
}
got = virNetSocketRead(sock,
@ -117,15 +151,11 @@ virRemoteSSHHelperEventOnSocket(virNetSocketPtr sock,
return;
}
proxy->sockToTerminal.offset += got;
if (proxy->sockToTerminal.offset)
virEventUpdateHandle(proxy->stdoutWatch,
VIR_EVENT_HANDLE_WRITABLE);
}
if (events & VIR_EVENT_HANDLE_WRITABLE &&
proxy->terminalToSock.offset) {
ssize_t done;
size_t avail;
done = virNetSocketWrite(proxy->sock,
proxy->terminalToSock.data,
proxy->terminalToSock.offset);
@ -135,26 +165,21 @@ virRemoteSSHHelperEventOnSocket(virNetSocketPtr sock,
virRemoteSSHHelperShutdown(proxy);
return;
}
memmove(proxy->terminalToSock.data,
proxy->terminalToSock.data + done,
proxy->terminalToSock.offset - done);
proxy->terminalToSock.offset -= done;
avail = proxy->terminalToSock.length - proxy->terminalToSock.offset;
if (avail > 1024) {
ignore_value(VIR_REALLOC_N(proxy->terminalToSock.data,
proxy->terminalToSock.offset + 1024));
proxy->terminalToSock.length = proxy->terminalToSock.offset + 1024;
}
}
if (!proxy->terminalToSock.offset)
virNetSocketUpdateIOCallback(proxy->sock,
VIR_EVENT_HANDLE_READABLE);
if (events & VIR_EVENT_HANDLE_ERROR ||
events & VIR_EVENT_HANDLE_HANGUP) {
virRemoteSSHHelperShutdown(proxy);
return;
}
cleanup:
virRemoteSSHHelperUpdateEvents(proxy);
}
@ -175,14 +200,9 @@ virRemoteSSHHelperEventOnStdin(int watch G_GNUC_UNUSED,
proxy->terminalToSock.offset;
int got;
if (avail < 1024) {
if (VIR_REALLOC_N(proxy->terminalToSock.data,
proxy->terminalToSock.length + 1024) < 0) {
virRemoteSSHHelperShutdown(proxy);
return;
}
proxy->terminalToSock.length += 1024;
avail += 1024;
if (avail == 0) {
VIR_DEBUG("Unexpectedly called with no space in buffer");
goto cleanup;
}
got = read(fd,
@ -203,10 +223,6 @@ virRemoteSSHHelperEventOnStdin(int watch G_GNUC_UNUSED,
}
proxy->terminalToSock.offset += got;
if (proxy->terminalToSock.offset)
virNetSocketUpdateIOCallback(proxy->sock,
VIR_EVENT_HANDLE_READABLE |
VIR_EVENT_HANDLE_WRITABLE);
}
if (events & VIR_EVENT_HANDLE_ERROR) {
@ -220,6 +236,9 @@ virRemoteSSHHelperEventOnStdin(int watch G_GNUC_UNUSED,
virRemoteSSHHelperShutdown(proxy);
return;
}
cleanup:
virRemoteSSHHelperUpdateEvents(proxy);
}
@ -238,7 +257,6 @@ virRemoteSSHHelperEventOnStdout(int watch G_GNUC_UNUSED,
if (events & VIR_EVENT_HANDLE_WRITABLE &&
proxy->sockToTerminal.offset) {
ssize_t done;
size_t avail;
done = write(fd,
proxy->sockToTerminal.data,
proxy->sockToTerminal.offset);
@ -253,18 +271,8 @@ virRemoteSSHHelperEventOnStdout(int watch G_GNUC_UNUSED,
proxy->sockToTerminal.data + done,
proxy->sockToTerminal.offset - done);
proxy->sockToTerminal.offset -= done;
avail = proxy->sockToTerminal.length - proxy->sockToTerminal.offset;
if (avail > 1024) {
ignore_value(VIR_REALLOC_N(proxy->sockToTerminal.data,
proxy->sockToTerminal.offset + 1024));
proxy->sockToTerminal.length = proxy->sockToTerminal.offset + 1024;
}
}
if (!proxy->sockToTerminal.offset)
virEventUpdateHandle(proxy->stdoutWatch, 0);
if (events & VIR_EVENT_HANDLE_ERROR) {
virReportError(VIR_ERR_INTERNAL_ERROR, "%s", _("IO error stdout"));
virRemoteSSHHelperShutdown(proxy);
@ -276,6 +284,8 @@ virRemoteSSHHelperEventOnStdout(int watch G_GNUC_UNUSED,
virRemoteSSHHelperShutdown(proxy);
return;
}
virRemoteSSHHelperUpdateEvents(proxy);
}
@ -285,8 +295,21 @@ virRemoteSSHHelperRun(virNetSocketPtr sock)
int ret = -1;
virRemoteSSHHelper proxy = {
.sock = sock,
.sockEvents = VIR_EVENT_HANDLE_READABLE,
.stdinWatch = -1,
.stdinEvents = VIR_EVENT_HANDLE_READABLE,
.stdoutWatch = -1,
.stdoutEvents = 0,
.sockToTerminal = {
.offset = 0,
.length = SSH_BUF_SIZE,
.data = g_new0(char, SSH_BUF_SIZE),
},
.terminalToSock = {
.offset = 0,
.length = SSH_BUF_SIZE,
.data = g_new0(char, SSH_BUF_SIZE),
},
};
virEventRegisterDefaultImpl();