migration/next for 20150507

-----BEGIN PGP SIGNATURE-----
 Version: GnuPG v1
 
 iQIcBAABCAAGBQJVS5XrAAoJEPSH7xhYctcjzsAP/iNThA9tLhMED6muTqQ6Cn4E
 hqPnTj17pDCYwtqxNzuwjwEZYEnx2hFVgM9q2fMQ94cgn7AEXc69NLhuDButRTig
 f81vFW2r44DI9G6HXVODSh9HJqN3+ifpCR90ABc8i0RfIJWLXpX6eI5m9CQuSy9B
 keSn2lg1pLaTfvl7uY+BDT8Ck2Kwv7sZJ9g3NrhAbD2mXCTtnXKPCK43Q04WGGjZ
 baMst2cvVgtlIzvltLtY3DAAUl5wkDXbnc10C0W4UrnxejaExD9y/6CwKHxryl8Z
 7BAg09iey2cZ+8ImekLlviBkSR+RxqJDxQoVG/cAjvwPhIH/5fPRrwtzZiGFZEQk
 WlqHvSsSJJ7NPDgOv28VqDkKiLxLUKiNQQbpXB7jIMpQJxLIDcowLWnewNXUPcw6
 Nd9YoCf3FEYUTodULwTNn1639L/2XeXnXwl9+jCVN5tOn3uK3do+5qMELmTM6nY/
 DBEtl8CTMBBA3xzkw4DlbV8LhJBbbNwa/UT+bX9ppp232lfSodRC1dh0huQaoLVx
 +aa6PL/OUsNjWl2wQzbKXe39ihD44IvCNBIbLYCak/XmMtYvrSsFOWYTG/WSJ4pi
 iMt8y35JrdPdVfY1ZDojFc8E0yNZyWwUOlwmkSPbmzbfeA3oiXmcoVygmZmD4FWO
 iWv7IPNAIvdCWuulHa05
 =uSLC
 -----END PGP SIGNATURE-----

Merge remote-tracking branch 'remotes/juanquintela/tags/migration/20150507-1' into staging

migration/next for 20150507

# gpg: Signature made Thu May  7 17:42:19 2015 BST using RSA key ID 5872D723
# gpg: Good signature from "Juan Quintela <quintela@redhat.com>"
# gpg:                 aka "Juan Quintela <quintela@trasno.org>"

* remotes/juanquintela/tags/migration/20150507-1:
  migration: Fix migration state update issue
  migration: avoid divide by zero in xbzrle cache miss rate
  migration: Add hmp interface to set and query parameters
  migration: Add qmp commands to set and query parameters
  migration: Use an array instead of 3 parameters
  migration: Add interface to control compression
  migration: Add the core code for decompression
  migration: Make compression co-work with xbzrle
  migration: Add the core code of multi-thread compression
  migration: Split save_zero_page from ram_save_page
  arch_init: Add and free data struct for decompression
  arch_init: Alloc and free data struct for compression
  qemu-file: Add compression functions to QEMUFile
  migration: Add the framework of multi-thread decompression
  migration: Add the framework of multi-thread compression
  docs: Add a doc about multiple thread compression

Signed-off-by: Peter Maydell <peter.maydell@linaro.org>
This commit is contained in:
Peter Maydell 2015-05-07 18:22:03 +01:00
commit 838686357b
12 changed files with 1060 additions and 27 deletions

View File

@ -24,6 +24,7 @@
#include <stdint.h>
#include <stdarg.h>
#include <stdlib.h>
#include <zlib.h>
#ifndef _WIN32
#include <sys/types.h>
#include <sys/mman.h>
@ -127,6 +128,7 @@ static uint64_t bitmap_sync_count;
#define RAM_SAVE_FLAG_CONTINUE 0x20
#define RAM_SAVE_FLAG_XBZRLE 0x40
/* 0x80 is reserved in migration.h start with 0x100 next */
#define RAM_SAVE_FLAG_COMPRESS_PAGE 0x100
static struct defconfig_file {
const char *filename;
@ -316,6 +318,147 @@ static uint64_t migration_dirty_pages;
static uint32_t last_version;
static bool ram_bulk_stage;
/* State shared between the migration thread and one compression worker
 * thread (see do_data_compress()).
 */
struct CompressParam {
/* Set (under @mutex) by start_compression() to hand a page to the
 * worker; cleared by the worker after it has handled the request.
 */
bool start;
/* Set by the worker, under comp_done_lock, once a request has been
 * handled; cleared by start_compression() when new work is queued.
 */
bool done;
/* Dummy QEMUFile (opened with empty_ops) that buffers this worker's
 * output until qemu_put_qemu_file() copies it to the real stream.
 */
QEMUFile *file;
/* @mutex and @cond are used to wait for / signal changes of @start */
QemuMutex mutex;
QemuCond cond;
/* RAM block and offset of the page to compress, filled in by
 * set_compress_params() and read by do_compress_ram_page().
 */
RAMBlock *block;
ram_addr_t offset;
};
typedef struct CompressParam CompressParam;
/* State shared between the loading thread and one decompression worker
 * thread (see do_data_decompress()).
 */
struct DecompressParam {
    /* Set (under @mutex) by start_decompression() to hand work to the
     * worker; cleared by the worker once the request has been handled.
     */
    bool start;
    /* @mutex and @cond are used to wait for / signal changes of @start */
    QemuMutex mutex;
    QemuCond cond;
    /* Destination address the page is decompressed into */
    void *des;
    /* Private copy of the compressed data for this worker; uses the
     * standard uint8_t type, like compressed_data_buf.
     */
    uint8_t *compbuf;
    /* Number of valid bytes in @compbuf */
    int len;
};
typedef struct DecompressParam DecompressParam;
/* Per-worker compression state, one entry per compression thread;
 * allocated in migrate_compress_threads_create().
 */
static CompressParam *comp_param;
/* Thread handles of the compression workers, indexed like comp_param */
static QemuThread *compress_threads;
/* comp_done_cond is used to wake up the migration thread when
 * one of the compression threads has finished the compression.
 * comp_done_lock is the mutex used together with comp_done_cond.
 */
static QemuMutex *comp_done_lock;
static QemuCond *comp_done_cond;
/* The empty QEMUFileOps will be used by file in CompressParam */
static const QEMUFileOps empty_ops = { };
/* Set to true by migrate_compress_threads_create(); checked together
 * with migrate_use_compression() before taking the compression path.
 */
static bool compression_switch;
/* Set to true to make the compression workers leave their loop */
static bool quit_comp_thread;
/* Set to true to make the decompression workers leave their loop */
static bool quit_decomp_thread;
/* Per-worker decompression state, one entry per decompression thread */
static DecompressParam *decomp_param;
/* Thread handles of the decompression workers, indexed like decomp_param */
static QemuThread *decompress_threads;
/* Buffer of compressBound(TARGET_PAGE_SIZE) bytes, allocated in
 * migrate_decompress_threads_create().
 */
static uint8_t *compressed_data_buf;
/* Forward declaration; do_data_compress() calls it before its definition */
static int do_compress_ram_page(CompressParam *param);
/* Thread body of one compression worker.
 *
 * @opaque: the CompressParam owned by this worker
 *
 * Until quit_comp_thread is set, the worker waits on param->cond for
 * param->start, compresses the requested page with
 * do_compress_ram_page(), clears param->start, and then sets
 * param->done and signals comp_done_cond under comp_done_lock.
 *
 * Returns: always NULL.
 */
static void *do_data_compress(void *opaque)
{
CompressParam *param = opaque;
while (!quit_comp_thread) {
qemu_mutex_lock(&param->mutex);
/* Re-check quit_comp_thread here: terminate_compression_threads()
 * may have been called after the outer while(!quit_comp_thread)
 * test but before qemu_mutex_lock(&param->mutex). Checking again
 * makes sure the compression thread terminates as expected.
 */
while (!param->start && !quit_comp_thread) {
qemu_cond_wait(&param->cond, &param->mutex);
}
if (!quit_comp_thread) {
do_compress_ram_page(param);
}
param->start = false;
qemu_mutex_unlock(&param->mutex);
/* Report completion under comp_done_lock so that a thread waiting on
 * comp_done_cond observes the updated done flag.
 */
qemu_mutex_lock(comp_done_lock);
param->done = true;
qemu_cond_signal(comp_done_cond);
qemu_mutex_unlock(comp_done_lock);
}
return NULL;
}
static inline void terminate_compression_threads(void)
{
int idx, thread_count;
thread_count = migrate_compress_threads();
quit_comp_thread = true;
for (idx = 0; idx < thread_count; idx++) {
qemu_mutex_lock(&comp_param[idx].mutex);
qemu_cond_signal(&comp_param[idx].cond);
qemu_mutex_unlock(&comp_param[idx].mutex);
}
}
void migrate_compress_threads_join(void)
{
int i, thread_count;
if (!migrate_use_compression()) {
return;
}
terminate_compression_threads();
thread_count = migrate_compress_threads();
for (i = 0; i < thread_count; i++) {
qemu_thread_join(compress_threads + i);
qemu_fclose(comp_param[i].file);
qemu_mutex_destroy(&comp_param[i].mutex);
qemu_cond_destroy(&comp_param[i].cond);
}
qemu_mutex_destroy(comp_done_lock);
qemu_cond_destroy(comp_done_cond);
g_free(compress_threads);
g_free(comp_param);
g_free(comp_done_cond);
g_free(comp_done_lock);
compress_threads = NULL;
comp_param = NULL;
comp_done_cond = NULL;
comp_done_lock = NULL;
}
void migrate_compress_threads_create(void)
{
int i, thread_count;
if (!migrate_use_compression()) {
return;
}
quit_comp_thread = false;
compression_switch = true;
thread_count = migrate_compress_threads();
compress_threads = g_new0(QemuThread, thread_count);
comp_param = g_new0(CompressParam, thread_count);
comp_done_cond = g_new0(QemuCond, 1);
comp_done_lock = g_new0(QemuMutex, 1);
qemu_cond_init(comp_done_cond);
qemu_mutex_init(comp_done_lock);
for (i = 0; i < thread_count; i++) {
/* com_param[i].file is just used as a dummy buffer to save data, set
* it's ops to empty.
*/
comp_param[i].file = qemu_fopen_ops(NULL, &empty_ops);
comp_param[i].done = true;
qemu_mutex_init(&comp_param[i].mutex);
qemu_cond_init(&comp_param[i].cond);
qemu_thread_create(compress_threads + i, "compress",
do_data_compress, comp_param + i,
QEMU_THREAD_JOINABLE);
}
}
/**
* save_page_header: Write page header to wire
*
@ -520,12 +663,16 @@ static void migration_bitmap_sync_range(ram_addr_t start, ram_addr_t length)
static int64_t start_time;
static int64_t bytes_xfer_prev;
static int64_t num_dirty_pages_period;
static uint64_t xbzrle_cache_miss_prev;
static uint64_t iterations_prev;
/* Reset the file-scope bookkeeping used by migration_bitmap_sync() */
static void migration_bitmap_sync_init(void)
{
    /* Timing and transfer counters */
    start_time = bytes_xfer_prev = num_dirty_pages_period = 0;
    /* Previous-sample values used for the XBZRLE cache miss rate */
    xbzrle_cache_miss_prev = iterations_prev = 0;
}
/* Called with iothread lock held, to protect ram_list.dirty_memory[] */
@ -536,8 +683,6 @@ static void migration_bitmap_sync(void)
MigrationState *s = migrate_get_current();
int64_t end_time;
int64_t bytes_xfer_now;
static uint64_t xbzrle_cache_miss_prev;
static uint64_t iterations_prev;
bitmap_sync_count++;
@ -585,7 +730,7 @@ static void migration_bitmap_sync(void)
mig_throttle_on = false;
}
if (migrate_use_xbzrle()) {
if (iterations_prev != 0) {
if (iterations_prev != acct_info.iterations) {
acct_info.xbzrle_cache_miss_rate =
(double)(acct_info.xbzrle_cache_miss -
xbzrle_cache_miss_prev) /
@ -599,8 +744,36 @@ static void migration_bitmap_sync(void)
s->dirty_bytes_rate = s->dirty_pages_rate * TARGET_PAGE_SIZE;
start_time = end_time;
num_dirty_pages_period = 0;
s->dirty_sync_count = bitmap_sync_count;
}
s->dirty_sync_count = bitmap_sync_count;
}
/**
 * save_zero_page: Send the page to the stream if it is all zeroes
 *
 * Returns: 1 if the page was zero-filled and has been written,
 *          -1 if the page is not zero-filled (nothing is written).
 *
 * @f: QEMUFile where to send the data
 * @block: block that contains the page we want to send
 * @offset: offset inside the block for the page
 * @p: pointer to the page
 * @bytes_transferred: increase it with the number of transferred bytes
 */
static int save_zero_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset,
                          uint8_t *p, uint64_t *bytes_transferred)
{
    if (!is_zero_range(p, TARGET_PAGE_SIZE)) {
        return -1;
    }

    acct_info.dup_pages++;
    /* Page header flagged RAM_SAVE_FLAG_COMPRESS, then one fill byte (0) */
    *bytes_transferred += save_page_header(f, block,
                                           offset | RAM_SAVE_FLAG_COMPRESS);
    qemu_put_byte(f, 0);
    *bytes_transferred += 1;

    return 1;
}
/**
@ -651,25 +824,22 @@ static int ram_save_page(QEMUFile *f, RAMBlock* block, ram_addr_t offset,
acct_info.dup_pages++;
}
}
} else if (is_zero_range(p, TARGET_PAGE_SIZE)) {
acct_info.dup_pages++;
*bytes_transferred += save_page_header(f, block,
offset | RAM_SAVE_FLAG_COMPRESS);
qemu_put_byte(f, 0);
*bytes_transferred += 1;
pages = 1;
/* Must let xbzrle know, otherwise a previous (now 0'd) cached
* page would be stale
*/
xbzrle_cache_zero_page(current_addr);
} else if (!ram_bulk_stage && migrate_use_xbzrle()) {
pages = save_xbzrle_page(f, &p, current_addr, block,
offset, last_stage, bytes_transferred);
if (!last_stage) {
/* Can't send this cached data async, since the cache page
* might get updated before it gets to the wire
} else {
pages = save_zero_page(f, block, offset, p, bytes_transferred);
if (pages > 0) {
/* Must let xbzrle know, otherwise a previous (now 0'd) cached
* page would be stale
*/
send_async = false;
xbzrle_cache_zero_page(current_addr);
} else if (!ram_bulk_stage && migrate_use_xbzrle()) {
pages = save_xbzrle_page(f, &p, current_addr, block,
offset, last_stage, bytes_transferred);
if (!last_stage) {
/* Can't send this cached data async, since the cache page
* might get updated before it gets to the wire
*/
send_async = false;
}
}
}
@ -692,6 +862,178 @@ static int ram_save_page(QEMUFile *f, RAMBlock* block, ram_addr_t offset,
return pages;
}
/* Compress the page described by @param (block + offset) into
 * param->file: a page header flagged RAM_SAVE_FLAG_COMPRESS_PAGE
 * followed by the compressed page data.
 *
 * Returns: the header size plus the value returned by
 * qemu_put_compression_data().
 */
static int do_compress_ram_page(CompressParam *param)
{
    RAMBlock *blk = param->block;
    ram_addr_t off = param->offset;
    uint8_t *page;
    int header_len, data_len;

    page = memory_region_get_ram_ptr(blk->mr) + (off & TARGET_PAGE_MASK);

    header_len = save_page_header(param->file, blk,
                                  off | RAM_SAVE_FLAG_COMPRESS_PAGE);
    data_len = qemu_put_compression_data(param->file, page, TARGET_PAGE_SIZE,
                                         migrate_compress_level());

    return header_len + data_len;
}
/* Hand the request stored in @param to its compression worker: clear
 * the done flag, then set start and signal the worker under its mutex.
 */
static inline void start_compression(CompressParam *param)
{
    QemuMutex *lock = &param->mutex;

    param->done = false;

    qemu_mutex_lock(lock);
    param->start = true;
    qemu_cond_signal(&param->cond);
    qemu_mutex_unlock(lock);
}
/* Hand the request stored in @param to its decompression worker: set
 * start and signal the worker, both under the worker's mutex.
 */
static inline void start_decompression(DecompressParam *param)
{
    QemuMutex *lock = &param->mutex;

    qemu_mutex_lock(lock);
    param->start = true;
    qemu_cond_signal(&param->cond);
    qemu_mutex_unlock(lock);
}
static uint64_t bytes_transferred;
/* Wait for every compression worker to finish its current request and
 * copy each worker's buffered output into @f, adding the copied length
 * to the file-scope bytes_transferred counter. Does nothing when
 * compression is not in use; skips the copy once quit_comp_thread is set.
 */
static void flush_compressed_data(QEMUFile *f)
{
    int i, nthreads;

    if (!migrate_use_compression()) {
        return;
    }

    nthreads = migrate_compress_threads();
    for (i = 0; i < nthreads; i++) {
        CompressParam *cp = &comp_param[i];

        if (!cp->done) {
            qemu_mutex_lock(comp_done_lock);
            while (!(cp->done || quit_comp_thread)) {
                qemu_cond_wait(comp_done_cond, comp_done_lock);
            }
            qemu_mutex_unlock(comp_done_lock);
        }

        if (quit_comp_thread) {
            continue;
        }
        bytes_transferred += qemu_put_qemu_file(f, cp->file);
    }
}
/* Record in @param which page (@block, @offset) the worker should
 * compress next.
 */
static inline void set_compress_params(CompressParam *param, RAMBlock *block,
                                       ram_addr_t offset)
{
    param->offset = offset;
    param->block = block;
}
/* Queue the page (@block, @offset) on the first compression worker
 * whose done flag is set, waiting on comp_done_cond until one is
 * available.
 *
 * Before re-using a worker, the data buffered in its file from the
 * previous request is copied to @f and its length is added to
 * *@bytes_transferred.
 *
 * Returns: the number of pages queued; the loop only exits once a page
 * has been queued, so this is 1.
 */
static int compress_page_with_multi_thread(QEMUFile *f, RAMBlock *block,
ram_addr_t offset,
uint64_t *bytes_transferred)
{
int idx, thread_count, bytes_xmit = -1, pages = -1;
thread_count = migrate_compress_threads();
/* comp_done_lock is held while scanning the done flags and is released
 * only inside qemu_cond_wait() or after a worker has been started.
 */
qemu_mutex_lock(comp_done_lock);
while (true) {
for (idx = 0; idx < thread_count; idx++) {
if (comp_param[idx].done) {
/* Drain the worker's previous output before giving it new work */
bytes_xmit = qemu_put_qemu_file(f, comp_param[idx].file);
set_compress_params(&comp_param[idx], block, offset);
start_compression(&comp_param[idx]);
pages = 1;
acct_info.norm_pages++;
*bytes_transferred += bytes_xmit;
break;
}
}
if (pages > 0) {
break;
} else {
/* No idle worker: sleep until one signals completion */
qemu_cond_wait(comp_done_cond, comp_done_lock);
}
}
qemu_mutex_unlock(comp_done_lock);
return pages;
}
/**
 * ram_save_compressed_page: compress the given page and send it to the stream
 *
 * Returns: Number of pages written; -1 when nothing was accounted as
 *          written on the ram_control_save_page() path.
 *
 * @f: QEMUFile where to send the data
 * @block: block that contains the page we want to send
 * @offset: offset inside the block for the page
 * @last_stage: if we are at the completion stage (not used in the body)
 * @bytes_transferred: increase it with the number of transferred bytes
 */
static int ram_save_compressed_page(QEMUFile *f, RAMBlock *block,
ram_addr_t offset, bool last_stage,
uint64_t *bytes_transferred)
{
int pages = -1;
uint64_t bytes_xmit;
MemoryRegion *mr = block->mr;
uint8_t *p;
int ret;
/* Compute the page pointer before any flag bits are OR-ed into offset */
p = memory_region_get_ram_ptr(mr) + offset;
bytes_xmit = 0;
/* Offer the page to ram_control_save_page() first; the compression
 * path below is taken only if it answers RAM_SAVE_CONTROL_NOT_SUPP.
 */
ret = ram_control_save_page(f, block->offset,
offset, TARGET_PAGE_SIZE, &bytes_xmit);
if (bytes_xmit) {
*bytes_transferred += bytes_xmit;
pages = 1;
}
if (block == last_sent_block) {
offset |= RAM_SAVE_FLAG_CONTINUE;
}
if (ret != RAM_SAVE_CONTROL_NOT_SUPP) {
if (ret != RAM_SAVE_CONTROL_DELAYED) {
/* Account the page as normal or duplicate depending on whether
 * any bytes were reported as transmitted.
 */
if (bytes_xmit > 0) {
acct_info.norm_pages++;
} else if (bytes_xmit == 0) {
acct_info.dup_pages++;
}
}
} else {
/* When starting the process of a new block, the first page of
 * the block should be sent out before other pages in the same
 * block, and all the pages in last block should have been sent
 * out, keeping this order is important, because the 'cont' flag
 * is used to avoid resending the block name.
 */
if (block != last_sent_block) {
flush_compressed_data(f);
pages = save_zero_page(f, block, offset, p, bytes_transferred);
if (pages == -1) {
set_compress_params(&comp_param[0], block, offset);
/* Compress in the calling thread (using comp_param[0] as scratch
 * state) to make sure the first page is sent out before other
 * pages of the block.
 */
bytes_xmit = do_compress_ram_page(&comp_param[0]);
acct_info.norm_pages++;
qemu_put_qemu_file(f, comp_param[0].file);
*bytes_transferred += bytes_xmit;
pages = 1;
}
} else {
/* Same block as the previous page: zero pages are sent inline,
 * everything else is queued on a compression worker.
 */
pages = save_zero_page(f, block, offset, p, bytes_transferred);
if (pages == -1) {
pages = compress_page_with_multi_thread(f, block, offset,
bytes_transferred);
}
}
}
return pages;
}
/**
* ram_find_and_save_block: Finds a dirty page and sends it to f
*
@ -731,10 +1073,22 @@ static int ram_find_and_save_block(QEMUFile *f, bool last_stage,
block = QLIST_FIRST_RCU(&ram_list.blocks);
complete_round = true;
ram_bulk_stage = false;
if (migrate_use_xbzrle()) {
/* If xbzrle is on, stop using the data compression at this
* point. In theory, xbzrle can do better than compression.
*/
flush_compressed_data(f);
compression_switch = false;
}
}
} else {
pages = ram_save_page(f, block, offset, last_stage,
bytes_transferred);
if (compression_switch && migrate_use_compression()) {
pages = ram_save_compressed_page(f, block, offset, last_stage,
bytes_transferred);
} else {
pages = ram_save_page(f, block, offset, last_stage,
bytes_transferred);
}
/* if page is unmodified, continue to the next */
if (pages > 0) {
@ -750,8 +1104,6 @@ static int ram_find_and_save_block(QEMUFile *f, bool last_stage,
return pages;
}
static uint64_t bytes_transferred;
void acct_update_position(QEMUFile *f, size_t size, bool zero)
{
uint64_t pages = size / TARGET_PAGE_SIZE;
@ -965,6 +1317,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
}
i++;
}
flush_compressed_data(f);
rcu_read_unlock();
/*
@ -1006,6 +1359,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
}
}
flush_compressed_data(f);
ram_control_after_iterate(f, RAM_CONTROL_FINISH);
migration_end();
@ -1113,10 +1467,104 @@ void ram_handle_compressed(void *host, uint8_t ch, uint64_t size)
}
}
/* Thread body of one decompression worker.
 *
 * @opaque: the DecompressParam owned by this worker
 *
 * Until quit_decomp_thread is set, the worker waits on param->cond for
 * param->start, inflates param->compbuf (param->len bytes) into
 * param->des, and clears param->start.
 *
 * The decompression is performed after the predicate loop, not inside
 * it, so that:
 *  - a request posted before this thread reached qemu_cond_wait()
 *    (start already true when the mutex is taken) is still processed
 *    instead of being skipped forever, and
 *  - a wakeup that arrives while start is still false does not
 *    decompress stale data.
 *
 * Returns: always NULL.
 */
static void *do_data_decompress(void *opaque)
{
    DecompressParam *param = opaque;
    unsigned long pagesize;

    while (!quit_decomp_thread) {
        qemu_mutex_lock(&param->mutex);
        while (!param->start && !quit_decomp_thread) {
            qemu_cond_wait(&param->cond, &param->mutex);
        }
        if (!quit_decomp_thread) {
            pagesize = TARGET_PAGE_SIZE;
            /* uncompress() will return failed in some case, especially
             * when the page is dirtied while it is being compressed; it's
             * not a problem because the dirty page will be retransferred
             * and uncompress() won't break the data in other pages.
             */
            uncompress((Bytef *)param->des, &pagesize,
                       (const Bytef *)param->compbuf, param->len);
        }
        param->start = false;
        qemu_mutex_unlock(&param->mutex);
    }

    return NULL;
}
void migrate_decompress_threads_create(void)
{
int i, thread_count;
thread_count = migrate_decompress_threads();
decompress_threads = g_new0(QemuThread, thread_count);
decomp_param = g_new0(DecompressParam, thread_count);
compressed_data_buf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
quit_decomp_thread = false;
for (i = 0; i < thread_count; i++) {
qemu_mutex_init(&decomp_param[i].mutex);
qemu_cond_init(&decomp_param[i].cond);
decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
qemu_thread_create(decompress_threads + i, "decompress",
do_data_decompress, decomp_param + i,
QEMU_THREAD_JOINABLE);
}
}
void migrate_decompress_threads_join(void)
{
int i, thread_count;
quit_decomp_thread = true;
thread_count = migrate_decompress_threads();
for (i = 0; i < thread_count; i++) {
qemu_mutex_lock(&decomp_param[i].mutex);
qemu_cond_signal(&decomp_param[i].cond);
qemu_mutex_unlock(&decomp_param[i].mutex);
}
for (i = 0; i < thread_count; i++) {
qemu_thread_join(decompress_threads + i);
qemu_mutex_destroy(&decomp_param[i].mutex);
qemu_cond_destroy(&decomp_param[i].cond);
g_free(decomp_param[i].compbuf);
}
g_free(decompress_threads);
g_free(decomp_param);
g_free(compressed_data_buf);
decompress_threads = NULL;
decomp_param = NULL;
compressed_data_buf = NULL;
}
/* Hand one compressed page to a decompression worker.
 *
 * @compbuf: buffer holding the compressed data
 * @host: destination address for the decompressed page
 * @len: number of valid bytes in @compbuf
 *
 * The data is copied into the chosen worker's private compbuf, so
 * @compbuf may be reused by the caller as soon as this returns.
 */
static void decompress_data_with_multi_threads(uint8_t *compbuf,
void *host, int len)
{
int idx, thread_count;
thread_count = migrate_decompress_threads();
/* Keep rescanning the workers until one with a clear start flag is
 * found; there is no blocking wait here, the loop simply spins.
 * NOTE(review): start is read without taking the worker's mutex.
 */
while (true) {
for (idx = 0; idx < thread_count; idx++) {
if (!decomp_param[idx].start) {
memcpy(decomp_param[idx].compbuf, compbuf, len);
decomp_param[idx].des = host;
decomp_param[idx].len = len;
start_decompression(&decomp_param[idx]);
break;
}
}
/* idx < thread_count means the inner loop broke out after queuing */
if (idx < thread_count) {
break;
}
}
}
static int ram_load(QEMUFile *f, void *opaque, int version_id)
{
int flags = 0, ret = 0;
static uint64_t seq_iter;
int len = 0;
seq_iter++;
@ -1196,6 +1644,23 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
}
qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
break;
case RAM_SAVE_FLAG_COMPRESS_PAGE:
host = host_from_stream_offset(f, addr, flags);
if (!host) {
error_report("Invalid RAM offset " RAM_ADDR_FMT, addr);
ret = -EINVAL;
break;
}
len = qemu_get_be32(f);
if (len < 0 || len > compressBound(TARGET_PAGE_SIZE)) {
error_report("Invalid compressed data length: %d", len);
ret = -EINVAL;
break;
}
qemu_get_buffer(f, compressed_data_buf, len);
decompress_data_with_multi_threads(compressed_data_buf, host, len);
break;
case RAM_SAVE_FLAG_XBZRLE:
host = host_from_stream_offset(f, addr, flags);
if (!host) {

View File

@ -0,0 +1,149 @@
Use multiple thread (de)compression in live migration
=====================================================
Copyright (C) 2015 Intel Corporation
Author: Liang Li <liang.z.li@intel.com>
This work is licensed under the terms of the GNU GPLv2 or later. See
the COPYING file in the top-level directory.
Contents:
=========
* Introduction
* When to use
* Performance
* Usage
* TODO
Introduction
============
Instead of sending the guest memory directly, this solution compresses
each RAM page before sending it; after it is received, the data is
decompressed. Using compression in live migration can reduce the
amount of data transferred by about 60%, which is very useful when the
bandwidth is limited, and the total migration time can also be reduced
by about 70% in a typical case. In addition, the VM downtime can be
reduced by about 50%. The benefit depends on how compressible the data
in the VM is.
The process of compression will consume additional CPU cycles, and the
extra CPU cycles will increase the migration time. On the other hand,
the amount of data transferred will decrease; this factor can reduce
the total migration time. If the process of the compression is quick
enough, then the total migration time can be reduced, and multiple
thread compression can be used to accelerate the compression process.
The decompression speed of Zlib is at least 4 times as fast as its
compression speed. If the source and destination CPUs have equal speed,
keeping the compression thread count at 4 times the decompression
thread count avoids wasting resources.
Compression level can be used to control the compression speed and the
compression ratio. High compression ratio will take more time, level 0
stands for no compression, level 1 stands for the best compression
speed, and level 9 stands for the best compression ratio. Users can
select a level number between 0 and 9.
When to use the multiple thread compression in live migration
=============================================================
Compression of data will consume extra CPU cycles; so in a system with
high overhead of CPU, avoid using this feature. When the network
bandwidth is very limited and the CPU resource is adequate, use of
multiple thread compression will be very helpful. If both the CPU and
the network bandwidth are adequate, use of multiple thread compression
can still help to reduce the migration time.
Performance
===========
Test environment:
CPU: Intel(R) Xeon(R) CPU E5-2680 0 @ 2.70GHz
Socket Count: 2
RAM: 128G
NIC: Intel I350 (10/100/1000Mbps)
Host OS: CentOS 7 64-bit
Guest OS: RHEL 6.5 64-bit
Parameter: qemu-system-x86_64 -enable-kvm -smp 4 -m 4096
/share/ia32e_rhel6u5.qcow -monitor stdio
No additional application is running on the guest during
the test.
Speed limit: 1000Gb/s
---------------------------------------------------------------
| original | compress thread: 8
| way | decompress thread: 2
| | compression level: 1
---------------------------------------------------------------
total time(msec): | 3333 | 1833
---------------------------------------------------------------
downtime(msec): | 100 | 27
---------------------------------------------------------------
transferred ram(kB):| 363536 | 107819
---------------------------------------------------------------
throughput(mbps): | 893.73 | 482.22
---------------------------------------------------------------
total ram(kB): | 4211524 | 4211524
---------------------------------------------------------------
There is an application running on the guest which writes random numbers
to RAM block areas periodically.
Speed limit: 1000Gb/s
---------------------------------------------------------------
| original | compress thread: 8
| way | decompress thread: 2
| | compression level: 1
---------------------------------------------------------------
total time(msec): | 37369 | 15989
---------------------------------------------------------------
downtime(msec): | 337 | 173
---------------------------------------------------------------
transferred ram(kB):| 4274143 | 1699824
---------------------------------------------------------------
throughput(mbps): | 936.99 | 870.95
---------------------------------------------------------------
total ram(kB): | 4211524 | 4211524
---------------------------------------------------------------
Usage
=====
1. Verify both the source and destination QEMU are able
to support the multiple thread compression migration:
{qemu} info migrate_capabilities
{qemu} ... compress: off ...
2. Activate compression on the source:
{qemu} migrate_set_capability compress on
3. Set the compression thread count on source:
{qemu} migrate_set_parameter compress_threads 12
4. Set the compression level on the source:
{qemu} migrate_set_parameter compress_level 1
5. Set the decompression thread count on destination:
{qemu} migrate_set_parameter decompress_threads 3
6. Start outgoing migration:
{qemu} migrate -d tcp:destination.host:4444
{qemu} info migrate
Capabilities: ... compress: on
...
The following are the default settings:
compress: off
compress_threads: 8
decompress_threads: 2
compress_level: 1 (which means best speed)
So, only the first two steps are required to use the multiple
thread compression in migration. You can do more if the default
settings are not appropriate.
TODO
====
Some faster (de)compression methods such as LZ4 and QuickLZ can help
to reduce the CPU consumption when doing (de)compression. If
these faster (de)compression methods are used, fewer (de)compression
threads are needed when doing the migration.

View File

@ -990,6 +990,21 @@ STEXI
@item migrate_set_capability @var{capability} @var{state}
@findex migrate_set_capability
Enable/Disable the usage of a capability @var{capability} for migration.
ETEXI
{
.name = "migrate_set_parameter",
.args_type = "parameter:s,value:i",
.params = "parameter value",
.help = "Set the parameter for migration",
.mhandler.cmd = hmp_migrate_set_parameter,
.command_completion = migrate_set_parameter_completion,
},
STEXI
@item migrate_set_parameter @var{parameter} @var{value}
@findex migrate_set_parameter
Set the parameter @var{parameter} for migration.
ETEXI
{
@ -1761,6 +1776,8 @@ show user network stack connection states
show migration status
@item info migrate_capabilities
show current migration capabilities
@item info migrate_parameters
show current migration parameters
@item info migrate_cache_size
show current migration XBZRLE cache size
@item info balloon

65
hmp.c
View File

@ -252,6 +252,29 @@ void hmp_info_migrate_capabilities(Monitor *mon, const QDict *qdict)
qapi_free_MigrationCapabilityStatusList(caps);
}
/* HMP "info migrate_parameters": print the current migration
 * parameters on one line as "parameters: name: value ...".
 */
void hmp_info_migrate_parameters(Monitor *mon, const QDict *qdict)
{
    MigrationParameters *params = qmp_query_migrate_parameters(NULL);

    if (params != NULL) {
        /* Parameter index paired with its current value, in print order */
        const struct {
            int which;
            int64_t value;
        } rows[] = {
            { MIGRATION_PARAMETER_COMPRESS_LEVEL, params->compress_level },
            { MIGRATION_PARAMETER_COMPRESS_THREADS, params->compress_threads },
            { MIGRATION_PARAMETER_DECOMPRESS_THREADS,
              params->decompress_threads },
        };
        unsigned int n;

        monitor_printf(mon, "parameters:");
        for (n = 0; n < sizeof(rows) / sizeof(rows[0]); n++) {
            monitor_printf(mon, " %s: %" PRId64,
                           MigrationParameter_lookup[rows[n].which],
                           rows[n].value);
        }
        monitor_printf(mon, "\n");
    }

    qapi_free_MigrationParameters(params);
}
void hmp_info_migrate_cache_size(Monitor *mon, const QDict *qdict)
{
monitor_printf(mon, "xbzrel cache size: %" PRId64 " kbytes\n",
@ -1185,6 +1208,48 @@ void hmp_migrate_set_capability(Monitor *mon, const QDict *qdict)
}
}
/* HMP "migrate_set_parameter <parameter> <value>".
 *
 * Looks up @parameter in MigrationParameter_lookup and forwards @value
 * to qmp_migrate_set_parameters() with only the matching has_* flag set.
 * An unknown parameter name, or an error reported by the QMP handler,
 * is printed on the monitor.
 *
 * The value is kept as int64_t, the type qmp_migrate_set_parameters()
 * takes, so that an out-of-range number is rejected by the range checks
 * there instead of being truncated to int first.
 */
void hmp_migrate_set_parameter(Monitor *mon, const QDict *qdict)
{
    const char *param = qdict_get_str(qdict, "parameter");
    int64_t value = qdict_get_int(qdict, "value");
    Error *err = NULL;
    bool has_compress_level = false;
    bool has_compress_threads = false;
    bool has_decompress_threads = false;
    int i;

    for (i = 0; i < MIGRATION_PARAMETER_MAX; i++) {
        if (strcmp(param, MigrationParameter_lookup[i]) == 0) {
            switch (i) {
            case MIGRATION_PARAMETER_COMPRESS_LEVEL:
                has_compress_level = true;
                break;
            case MIGRATION_PARAMETER_COMPRESS_THREADS:
                has_compress_threads = true;
                break;
            case MIGRATION_PARAMETER_DECOMPRESS_THREADS:
                has_decompress_threads = true;
                break;
            default:
                break;
            }
            /* The same value is passed in every slot; only the slot whose
             * has_* flag is set is looked at by the callee.
             */
            qmp_migrate_set_parameters(has_compress_level, value,
                                       has_compress_threads, value,
                                       has_decompress_threads, value,
                                       &err);
            break;
        }
    }

    /* The loop ran to completion: no parameter name matched */
    if (i == MIGRATION_PARAMETER_MAX) {
        error_set(&err, QERR_INVALID_PARAMETER, param);
    }

    if (err) {
        monitor_printf(mon, "migrate_set_parameter: %s\n",
                       error_get_pretty(err));
        error_free(err);
    }
}
void hmp_set_password(Monitor *mon, const QDict *qdict)
{
const char *protocol = qdict_get_str(qdict, "protocol");

4
hmp.h
View File

@ -28,6 +28,7 @@ void hmp_info_chardev(Monitor *mon, const QDict *qdict);
void hmp_info_mice(Monitor *mon, const QDict *qdict);
void hmp_info_migrate(Monitor *mon, const QDict *qdict);
void hmp_info_migrate_capabilities(Monitor *mon, const QDict *qdict);
void hmp_info_migrate_parameters(Monitor *mon, const QDict *qdict);
void hmp_info_migrate_cache_size(Monitor *mon, const QDict *qdict);
void hmp_info_cpus(Monitor *mon, const QDict *qdict);
void hmp_info_block(Monitor *mon, const QDict *qdict);
@ -64,6 +65,7 @@ void hmp_migrate_incoming(Monitor *mon, const QDict *qdict);
void hmp_migrate_set_downtime(Monitor *mon, const QDict *qdict);
void hmp_migrate_set_speed(Monitor *mon, const QDict *qdict);
void hmp_migrate_set_capability(Monitor *mon, const QDict *qdict);
void hmp_migrate_set_parameter(Monitor *mon, const QDict *qdict);
void hmp_migrate_set_cache_size(Monitor *mon, const QDict *qdict);
void hmp_set_password(Monitor *mon, const QDict *qdict);
void hmp_expire_password(Monitor *mon, const QDict *qdict);
@ -113,6 +115,8 @@ void watchdog_action_completion(ReadLineState *rs, int nb_args,
const char *str);
void migrate_set_capability_completion(ReadLineState *rs, int nb_args,
const char *str);
void migrate_set_parameter_completion(ReadLineState *rs, int nb_args,
const char *str);
void host_net_add_completion(ReadLineState *rs, int nb_args, const char *str);
void host_net_remove_completion(ReadLineState *rs, int nb_args,
const char *str);

View File

@ -50,6 +50,7 @@ struct MigrationState
QemuThread thread;
QEMUBH *cleanup_bh;
QEMUFile *file;
int parameters[MIGRATION_PARAMETER_MAX];
int state;
MigrationParams params;
@ -104,6 +105,10 @@ bool migration_has_finished(MigrationState *);
bool migration_has_failed(MigrationState *);
MigrationState *migrate_get_current(void);
void migrate_compress_threads_create(void);
void migrate_compress_threads_join(void);
void migrate_decompress_threads_create(void);
void migrate_decompress_threads_join(void);
uint64_t ram_bytes_remaining(void);
uint64_t ram_bytes_transferred(void);
uint64_t ram_bytes_total(void);
@ -152,6 +157,11 @@ int64_t migrate_xbzrle_cache_size(void);
int64_t xbzrle_cache_resize(int64_t new_size);
bool migrate_use_compression(void);
int migrate_compress_level(void);
int migrate_compress_threads(void);
int migrate_decompress_threads(void);
void ram_control_before_iterate(QEMUFile *f, uint64_t flags);
void ram_control_after_iterate(QEMUFile *f, uint64_t flags);
void ram_control_load_hook(QEMUFile *f, uint64_t flags);

View File

@ -159,6 +159,9 @@ void qemu_put_be32(QEMUFile *f, unsigned int v);
void qemu_put_be64(QEMUFile *f, uint64_t v);
int qemu_peek_buffer(QEMUFile *f, uint8_t *buf, int size, size_t offset);
int qemu_get_buffer(QEMUFile *f, uint8_t *buf, int size);
ssize_t qemu_put_compression_data(QEMUFile *f, const uint8_t *p, size_t size,
int level);
int qemu_put_qemu_file(QEMUFile *f_des, QEMUFile *f_src);
/*
* Note that you can only peek continuous bytes from where the current pointer
* is; you aren't guaranteed to be able to peak to +n bytes unless you've

View File

@ -33,6 +33,14 @@
#define BUFFER_DELAY 100
#define XFER_LIMIT_RATIO (1000 / BUFFER_DELAY)
/* Default compression thread count */
#define DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT 8
/* Default decompression thread count, usually decompression is at
* least 4 times as fast as compression.*/
#define DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT 2
/* 0: no compression, 1: best speed, ... 9: best compression ratio */
#define DEFAULT_MIGRATE_COMPRESS_LEVEL 1
/* Migration XBZRLE default cache size */
#define DEFAULT_MIGRATE_CACHE_SIZE (64 * 1024 * 1024)
@ -52,6 +60,12 @@ MigrationState *migrate_get_current(void)
.bandwidth_limit = MAX_THROTTLE,
.xbzrle_cache_size = DEFAULT_MIGRATE_CACHE_SIZE,
.mbps = -1,
.parameters[MIGRATION_PARAMETER_COMPRESS_LEVEL] =
DEFAULT_MIGRATE_COMPRESS_LEVEL,
.parameters[MIGRATION_PARAMETER_COMPRESS_THREADS] =
DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT,
.parameters[MIGRATION_PARAMETER_DECOMPRESS_THREADS] =
DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT,
};
return &current_migration;
@ -106,6 +120,7 @@ static void process_incoming_migration_co(void *opaque)
free_xbzrle_decoded_buf();
if (ret < 0) {
error_report("load of migration failed: %s", strerror(-ret));
migrate_decompress_threads_join();
exit(EXIT_FAILURE);
}
qemu_announce_self();
@ -114,6 +129,7 @@ static void process_incoming_migration_co(void *opaque)
bdrv_invalidate_cache_all(&local_err);
if (local_err) {
error_report_err(local_err);
migrate_decompress_threads_join();
exit(EXIT_FAILURE);
}
@ -122,6 +138,7 @@ static void process_incoming_migration_co(void *opaque)
} else {
runstate_set(RUN_STATE_PAUSED);
}
migrate_decompress_threads_join();
}
void process_incoming_migration(QEMUFile *f)
@ -130,6 +147,7 @@ void process_incoming_migration(QEMUFile *f)
int fd = qemu_get_fd(f);
assert(fd != -1);
migrate_decompress_threads_create();
qemu_set_nonblock(fd);
qemu_coroutine_enter(co, f);
}
@ -170,6 +188,21 @@ MigrationCapabilityStatusList *qmp_query_migrate_capabilities(Error **errp)
return head;
}
/*
 * QMP handler for query-migrate-parameters.
 *
 * Builds a newly allocated, zero-initialised MigrationParameters object and
 * fills it with the compression level and the compression/decompression
 * thread counts currently stored in the global migration state.
 * @errp is never written by this function.
 */
MigrationParameters *qmp_query_migrate_parameters(Error **errp)
{
    MigrationState *ms = migrate_get_current();
    MigrationParameters *result = g_malloc0(sizeof(*result));

    result->decompress_threads =
        ms->parameters[MIGRATION_PARAMETER_DECOMPRESS_THREADS];
    result->compress_threads =
        ms->parameters[MIGRATION_PARAMETER_COMPRESS_THREADS];
    result->compress_level =
        ms->parameters[MIGRATION_PARAMETER_COMPRESS_LEVEL];

    return result;
}
static void get_xbzrle_cache_stats(MigrationInfo *info)
{
if (migrate_use_xbzrle()) {
@ -283,6 +316,47 @@ void qmp_migrate_set_capabilities(MigrationCapabilityStatusList *params,
}
}
/*
 * QMP handler for migrate-set-parameters.
 *
 * Every parameter is optional; the matching has_* flag says whether a value
 * was supplied.  All supplied values are range-checked first (compress_level
 * in 0..9, both thread counts in 1..255), and only if every check passes are
 * the values stored in the global migration state.  On the first range
 * violation an error is set through @errp and nothing is modified.
 */
void qmp_migrate_set_parameters(bool has_compress_level,
                                int64_t compress_level,
                                bool has_compress_threads,
                                int64_t compress_threads,
                                bool has_decompress_threads,
                                int64_t decompress_threads, Error **errp)
{
    MigrationState *ms = migrate_get_current();

    /* Validation pass: reject the whole request on the first bad value. */
    if (has_compress_level) {
        if (!(compress_level >= 0 && compress_level <= 9)) {
            error_set(errp, QERR_INVALID_PARAMETER_VALUE, "compress_level",
                      "is invalid, it should be in the range of 0 to 9");
            return;
        }
    }
    if (has_compress_threads) {
        if (!(compress_threads >= 1 && compress_threads <= 255)) {
            error_set(errp, QERR_INVALID_PARAMETER_VALUE,
                      "compress_threads",
                      "is invalid, it should be in the range of 1 to 255");
            return;
        }
    }
    if (has_decompress_threads) {
        if (!(decompress_threads >= 1 && decompress_threads <= 255)) {
            error_set(errp, QERR_INVALID_PARAMETER_VALUE,
                      "decompress_threads",
                      "is invalid, it should be in the range of 1 to 255");
            return;
        }
    }

    /* Commit pass: everything supplied is known to be in range. */
    if (has_compress_level) {
        ms->parameters[MIGRATION_PARAMETER_COMPRESS_LEVEL] = compress_level;
    }
    if (has_compress_threads) {
        ms->parameters[MIGRATION_PARAMETER_COMPRESS_THREADS] =
            compress_threads;
    }
    if (has_decompress_threads) {
        ms->parameters[MIGRATION_PARAMETER_DECOMPRESS_THREADS] =
            decompress_threads;
    }
}
/* shared migration helpers */
static void migrate_set_state(MigrationState *s, int old_state, int new_state)
@ -305,6 +379,7 @@ static void migrate_fd_cleanup(void *opaque)
qemu_thread_join(&s->thread);
qemu_mutex_lock_iothread();
migrate_compress_threads_join();
qemu_fclose(s->file);
s->file = NULL;
}
@ -390,6 +465,11 @@ static MigrationState *migrate_init(const MigrationParams *params)
int64_t bandwidth_limit = s->bandwidth_limit;
bool enabled_capabilities[MIGRATION_CAPABILITY_MAX];
int64_t xbzrle_cache_size = s->xbzrle_cache_size;
int compress_level = s->parameters[MIGRATION_PARAMETER_COMPRESS_LEVEL];
int compress_thread_count =
s->parameters[MIGRATION_PARAMETER_COMPRESS_THREADS];
int decompress_thread_count =
s->parameters[MIGRATION_PARAMETER_DECOMPRESS_THREADS];
memcpy(enabled_capabilities, s->enabled_capabilities,
sizeof(enabled_capabilities));
@ -400,6 +480,11 @@ static MigrationState *migrate_init(const MigrationParams *params)
sizeof(enabled_capabilities));
s->xbzrle_cache_size = xbzrle_cache_size;
s->parameters[MIGRATION_PARAMETER_COMPRESS_LEVEL] = compress_level;
s->parameters[MIGRATION_PARAMETER_COMPRESS_THREADS] =
compress_thread_count;
s->parameters[MIGRATION_PARAMETER_DECOMPRESS_THREADS] =
decompress_thread_count;
s->bandwidth_limit = bandwidth_limit;
s->state = MIGRATION_STATUS_SETUP;
trace_migrate_set_state(MIGRATION_STATUS_SETUP);
@ -587,6 +672,42 @@ bool migrate_zero_blocks(void)
return s->enabled_capabilities[MIGRATION_CAPABILITY_ZERO_BLOCKS];
}
bool migrate_use_compression(void)
{
MigrationState *s;
s = migrate_get_current();
return s->enabled_capabilities[MIGRATION_CAPABILITY_COMPRESS];
}
int migrate_compress_level(void)
{
MigrationState *s;
s = migrate_get_current();
return s->parameters[MIGRATION_PARAMETER_COMPRESS_LEVEL];
}
int migrate_compress_threads(void)
{
MigrationState *s;
s = migrate_get_current();
return s->parameters[MIGRATION_PARAMETER_COMPRESS_THREADS];
}
int migrate_decompress_threads(void)
{
MigrationState *s;
s = migrate_get_current();
return s->parameters[MIGRATION_PARAMETER_DECOMPRESS_THREADS];
}
int migrate_use_xbzrle(void)
{
MigrationState *s;
@ -730,6 +851,7 @@ void migrate_fd_connect(MigrationState *s)
/* Notify before starting migration thread */
notifier_list_notify(&migration_state_notifiers, s);
migrate_compress_threads_create();
qemu_thread_create(&s->thread, "migration", migration_thread, s,
QEMU_THREAD_JOINABLE);
}

View File

@ -21,6 +21,7 @@
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
#include <zlib.h>
#include "qemu-common.h"
#include "qemu/iov.h"
#include "qemu/sockets.h"
@ -546,3 +547,41 @@ uint64_t qemu_get_be64(QEMUFile *f)
v |= qemu_get_be32(f);
return v;
}
/* Compress @size bytes of data starting at @p with the given zlib
 * compression @level and append the result to the buffer of @f.
 *
 * The compressed payload is preceded by a 32-bit length field written with
 * qemu_put_be32().
 *
 * Returns the number of bytes added to the buffer (length field plus
 * compressed payload), or 0 if the free space left in the buffer is smaller
 * than compressBound(@size) or if compress2() fails.
 */
ssize_t qemu_put_compression_data(QEMUFile *f, const uint8_t *p, size_t size,
                                  int level)
{
    /* Do the subtraction in a signed type: mixing in the unsigned sizeof
     * result would wrap around when the buffer is (nearly) full.
     */
    ssize_t avail = (ssize_t)IO_BUF_SIZE - (ssize_t)f->buf_index -
                    (ssize_t)sizeof(int32_t);
    /* compress2() takes a uLongf in/out length; use a real uLongf object
     * rather than casting the address of an ssize_t, whose width may differ.
     */
    uLongf blen;

    /* Test the sign explicitly: comparing a negative value directly with the
     * unsigned compressBound() result would convert it to a huge positive
     * number and let compress2() write past the end of the buffer.
     */
    if (avail < 0 || (uLongf)avail < compressBound(size)) {
        return 0;
    }

    blen = (uLongf)avail;
    if (compress2(f->buf + f->buf_index + sizeof(int32_t), &blen,
                  (const Bytef *)p, size, level) != Z_OK) {
        error_report("Compress Failed!");
        return 0;
    }

    /* The payload was compressed in place just after the slot reserved for
     * the length field; write the length, then skip over the payload.
     */
    qemu_put_be32(f, blen);
    f->buf_index += blen;
    return (ssize_t)(blen + sizeof(int32_t));
}
/* Append whatever is currently held in the buffer of @f_src to @f_des via
 * qemu_put_buffer(), then rewind @f_src's buf_index to 0.
 *
 * Returns the number of bytes handed over, or 0 when @f_src's buffer was
 * empty (in which case neither file is touched).
 */
int qemu_put_qemu_file(QEMUFile *f_des, QEMUFile *f_src)
{
    int moved;

    if (f_src->buf_index <= 0) {
        return 0;
    }

    moved = f_src->buf_index;
    qemu_put_buffer(f_des, f_src->buf, f_src->buf_index);
    f_src->buf_index = 0;

    return moved;
}

View File

@ -2858,6 +2858,13 @@ static mon_cmd_t info_cmds[] = {
.help = "show current migration capabilities",
.mhandler.cmd = hmp_info_migrate_capabilities,
},
{
.name = "migrate_parameters",
.args_type = "",
.params = "",
.help = "show current migration parameters",
.mhandler.cmd = hmp_info_migrate_parameters,
},
{
.name = "migrate_cache_size",
.args_type = "",
@ -4540,6 +4547,24 @@ void migrate_set_capability_completion(ReadLineState *rs, int nb_args,
}
}
/*
 * Readline completion callback for the migration parameter name argument.
 *
 * The completion index is always set to the length of @str.  Candidates are
 * offered only when @nb_args is 2: every entry of MigrationParameter_lookup
 * that starts with @str is added as a completion.
 */
void migrate_set_parameter_completion(ReadLineState *rs, int nb_args,
                                      const char *str)
{
    size_t prefix_len = strlen(str);
    int idx;

    readline_set_completion_index(rs, prefix_len);

    if (nb_args != 2) {
        return;
    }

    for (idx = 0; idx < MIGRATION_PARAMETER_MAX; idx++) {
        const char *candidate = MigrationParameter_lookup[idx];

        if (strncmp(str, candidate, prefix_len) == 0) {
            readline_add_completion(rs, candidate);
        }
    }
}
void host_net_add_completion(ReadLineState *rs, int nb_args, const char *str)
{
int i;

View File

@ -515,13 +515,22 @@
# to enable the capability on the source VM. The feature is disabled by
# default. (since 1.6)
#
# @compress: Use multiple compression threads to accelerate live migration.
# This feature can help to reduce the migration traffic, by sending
# compressed pages. Please note that if compress and xbzrle are both
# on, compress only takes effect in the ram bulk stage, after that,
# it will be disabled and only xbzrle takes effect, this can help to
# minimize migration traffic. The feature is disabled by default.
# (since 2.4)
#
# @auto-converge: If enabled, QEMU will automatically throttle down the guest
# to speed up convergence of RAM migration. (since 1.6)
#
# Since: 1.2
##
{ 'enum': 'MigrationCapability',
'data': ['xbzrle', 'rdma-pin-all', 'auto-converge', 'zero-blocks'] }
'data': ['xbzrle', 'rdma-pin-all', 'auto-converge', 'zero-blocks',
'compress'] }
##
# @MigrationCapabilityStatus
@ -560,6 +569,74 @@
##
{ 'command': 'query-migrate-capabilities', 'returns': ['MigrationCapabilityStatus']}
##
# @MigrationParameter
#
# Migration parameters enumeration
#
# @compress-level: Set the compression level to be used in live migration,
# the compression level is an integer between 0 and 9, where 0 means
# no compression, 1 means the best compression speed, and 9 means best
# compression ratio which will consume more CPU.
#
# @compress-threads: Set compression thread count to be used in live migration,
# the compression thread count is an integer between 1 and 255.
#
# @decompress-threads: Set decompression thread count to be used in live
# migration, the decompression thread count is an integer between 1
# and 255. Usually, decompression is at least 4 times as fast as
# compression, so set the decompress-threads to the number about 1/4
# of compress-threads is adequate.
#
# Since: 2.4
##
{ 'enum': 'MigrationParameter',
'data': ['compress-level', 'compress-threads', 'decompress-threads'] }
##
# @migrate-set-parameters
#
# Set the following migration parameters
#
# @compress-level: compression level
#
# @compress-threads: compression thread count
#
# @decompress-threads: decompression thread count
#
# Since: 2.4
##
{ 'command': 'migrate-set-parameters',
'data': { '*compress-level': 'int',
'*compress-threads': 'int',
'*decompress-threads': 'int'} }
##
# @MigrationParameters
#
# @compress-level: compression level
#
# @compress-threads: compression thread count
#
# @decompress-threads: decompression thread count
#
# Since: 2.4
##
{ 'struct': 'MigrationParameters',
'data': { 'compress-level': 'int',
'compress-threads': 'int',
'decompress-threads': 'int'} }
##
# @query-migrate-parameters
#
# Returns information about the current migration parameters
#
# Returns: @MigrationParameters
#
# Since: 2.4
##
{ 'command': 'query-migrate-parameters',
'returns': 'MigrationParameters' }
##
# @MouseInfo:
#

View File

@ -3442,6 +3442,63 @@ EQMP
.mhandler.cmd_new = qmp_marshal_input_query_migrate_capabilities,
},
SQMP
migrate-set-parameters
----------------------
Set migration parameters
- "compress-level": set compression level during migration (json-int)
- "compress-threads": set compression thread count for migration (json-int)
- "decompress-threads": set decompression thread count for migration (json-int)
Arguments:
Example:
-> { "execute": "migrate-set-parameters" , "arguments":
{ "compress-level": 1 } }
EQMP
{
.name = "migrate-set-parameters",
.args_type =
"compress-level:i?,compress-threads:i?,decompress-threads:i?",
.mhandler.cmd_new = qmp_marshal_input_migrate_set_parameters,
},
SQMP
query-migrate-parameters
------------------------
Query current migration parameters
- "parameters": migration parameters value
- "compress-level" : compression level value (json-int)
- "compress-threads" : compression thread count value (json-int)
- "decompress-threads" : decompression thread count value (json-int)
Arguments:
Example:
-> { "execute": "query-migrate-parameters" }
<- {
"return": {
"decompress-threads": 2,
"compress-threads": 8,
"compress-level": 1
}
}
EQMP
{
.name = "query-migrate-parameters",
.args_type = "",
.mhandler.cmd_new = qmp_marshal_input_query_migrate_parameters,
},
SQMP
query-balloon
-------------