diff --git a/arch_init.c b/arch_init.c index 4c8fceed95..23d3feba44 100644 --- a/arch_init.c +++ b/arch_init.c @@ -24,6 +24,7 @@ #include #include #include +#include #ifndef _WIN32 #include #include @@ -127,6 +128,7 @@ static uint64_t bitmap_sync_count; #define RAM_SAVE_FLAG_CONTINUE 0x20 #define RAM_SAVE_FLAG_XBZRLE 0x40 /* 0x80 is reserved in migration.h start with 0x100 next */ +#define RAM_SAVE_FLAG_COMPRESS_PAGE 0x100 static struct defconfig_file { const char *filename; @@ -316,6 +318,147 @@ static uint64_t migration_dirty_pages; static uint32_t last_version; static bool ram_bulk_stage; +struct CompressParam { + bool start; + bool done; + QEMUFile *file; + QemuMutex mutex; + QemuCond cond; + RAMBlock *block; + ram_addr_t offset; +}; +typedef struct CompressParam CompressParam; + +struct DecompressParam { + bool start; + QemuMutex mutex; + QemuCond cond; + void *des; + uint8 *compbuf; + int len; +}; +typedef struct DecompressParam DecompressParam; + +static CompressParam *comp_param; +static QemuThread *compress_threads; +/* comp_done_cond is used to wake up the migration thread when + * one of the compression threads has finished the compression. + * comp_done_lock is used to co-work with comp_done_cond. 
+ */ +static QemuMutex *comp_done_lock; +static QemuCond *comp_done_cond; +/* The empty QEMUFileOps will be used by file in CompressParam */ +static const QEMUFileOps empty_ops = { }; + +static bool compression_switch; +static bool quit_comp_thread; +static bool quit_decomp_thread; +static DecompressParam *decomp_param; +static QemuThread *decompress_threads; +static uint8_t *compressed_data_buf; + +static int do_compress_ram_page(CompressParam *param); + +static void *do_data_compress(void *opaque) +{ + CompressParam *param = opaque; + + while (!quit_comp_thread) { + qemu_mutex_lock(&param->mutex); + /* Re-check the quit_comp_thread in case of + * terminate_compression_threads is called just before + * qemu_mutex_lock(&param->mutex) and after + * while(!quit_comp_thread), re-check it here can make + * sure the compression thread terminate as expected. + */ + while (!param->start && !quit_comp_thread) { + qemu_cond_wait(&param->cond, &param->mutex); + } + if (!quit_comp_thread) { + do_compress_ram_page(param); + } + param->start = false; + qemu_mutex_unlock(&param->mutex); + + qemu_mutex_lock(comp_done_lock); + param->done = true; + qemu_cond_signal(comp_done_cond); + qemu_mutex_unlock(comp_done_lock); + } + + return NULL; +} + +static inline void terminate_compression_threads(void) +{ + int idx, thread_count; + + thread_count = migrate_compress_threads(); + quit_comp_thread = true; + for (idx = 0; idx < thread_count; idx++) { + qemu_mutex_lock(&comp_param[idx].mutex); + qemu_cond_signal(&comp_param[idx].cond); + qemu_mutex_unlock(&comp_param[idx].mutex); + } +} + +void migrate_compress_threads_join(void) +{ + int i, thread_count; + + if (!migrate_use_compression()) { + return; + } + terminate_compression_threads(); + thread_count = migrate_compress_threads(); + for (i = 0; i < thread_count; i++) { + qemu_thread_join(compress_threads + i); + qemu_fclose(comp_param[i].file); + qemu_mutex_destroy(&comp_param[i].mutex); + qemu_cond_destroy(&comp_param[i].cond); + } + 
qemu_mutex_destroy(comp_done_lock); + qemu_cond_destroy(comp_done_cond); + g_free(compress_threads); + g_free(comp_param); + g_free(comp_done_cond); + g_free(comp_done_lock); + compress_threads = NULL; + comp_param = NULL; + comp_done_cond = NULL; + comp_done_lock = NULL; +} + +void migrate_compress_threads_create(void) +{ + int i, thread_count; + + if (!migrate_use_compression()) { + return; + } + quit_comp_thread = false; + compression_switch = true; + thread_count = migrate_compress_threads(); + compress_threads = g_new0(QemuThread, thread_count); + comp_param = g_new0(CompressParam, thread_count); + comp_done_cond = g_new0(QemuCond, 1); + comp_done_lock = g_new0(QemuMutex, 1); + qemu_cond_init(comp_done_cond); + qemu_mutex_init(comp_done_lock); + for (i = 0; i < thread_count; i++) { + /* com_param[i].file is just used as a dummy buffer to save data, set + * it's ops to empty. + */ + comp_param[i].file = qemu_fopen_ops(NULL, &empty_ops); + comp_param[i].done = true; + qemu_mutex_init(&comp_param[i].mutex); + qemu_cond_init(&comp_param[i].cond); + qemu_thread_create(compress_threads + i, "compress", + do_data_compress, comp_param + i, + QEMU_THREAD_JOINABLE); + } +} + /** * save_page_header: Write page header to wire * @@ -520,12 +663,16 @@ static void migration_bitmap_sync_range(ram_addr_t start, ram_addr_t length) static int64_t start_time; static int64_t bytes_xfer_prev; static int64_t num_dirty_pages_period; +static uint64_t xbzrle_cache_miss_prev; +static uint64_t iterations_prev; static void migration_bitmap_sync_init(void) { start_time = 0; bytes_xfer_prev = 0; num_dirty_pages_period = 0; + xbzrle_cache_miss_prev = 0; + iterations_prev = 0; } /* Called with iothread lock held, to protect ram_list.dirty_memory[] */ @@ -536,8 +683,6 @@ static void migration_bitmap_sync(void) MigrationState *s = migrate_get_current(); int64_t end_time; int64_t bytes_xfer_now; - static uint64_t xbzrle_cache_miss_prev; - static uint64_t iterations_prev; bitmap_sync_count++; @@ 
-585,7 +730,7 @@ static void migration_bitmap_sync(void) mig_throttle_on = false; } if (migrate_use_xbzrle()) { - if (iterations_prev != 0) { + if (iterations_prev != acct_info.iterations) { acct_info.xbzrle_cache_miss_rate = (double)(acct_info.xbzrle_cache_miss - xbzrle_cache_miss_prev) / @@ -599,8 +744,36 @@ static void migration_bitmap_sync(void) s->dirty_bytes_rate = s->dirty_pages_rate * TARGET_PAGE_SIZE; start_time = end_time; num_dirty_pages_period = 0; - s->dirty_sync_count = bitmap_sync_count; } + s->dirty_sync_count = bitmap_sync_count; +} + +/** + * save_zero_page: Send the zero page to the stream + * + * Returns: Number of pages written. + * + * @f: QEMUFile where to send the data + * @block: block that contains the page we want to send + * @offset: offset inside the block for the page + * @p: pointer to the page + * @bytes_transferred: increase it with the number of transferred bytes + */ +static int save_zero_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset, + uint8_t *p, uint64_t *bytes_transferred) +{ + int pages = -1; + + if (is_zero_range(p, TARGET_PAGE_SIZE)) { + acct_info.dup_pages++; + *bytes_transferred += save_page_header(f, block, + offset | RAM_SAVE_FLAG_COMPRESS); + qemu_put_byte(f, 0); + *bytes_transferred += 1; + pages = 1; + } + + return pages; } /** @@ -651,25 +824,22 @@ static int ram_save_page(QEMUFile *f, RAMBlock* block, ram_addr_t offset, acct_info.dup_pages++; } } - } else if (is_zero_range(p, TARGET_PAGE_SIZE)) { - acct_info.dup_pages++; - *bytes_transferred += save_page_header(f, block, - offset | RAM_SAVE_FLAG_COMPRESS); - qemu_put_byte(f, 0); - *bytes_transferred += 1; - pages = 1; - /* Must let xbzrle know, otherwise a previous (now 0'd) cached - * page would be stale - */ - xbzrle_cache_zero_page(current_addr); - } else if (!ram_bulk_stage && migrate_use_xbzrle()) { - pages = save_xbzrle_page(f, &p, current_addr, block, - offset, last_stage, bytes_transferred); - if (!last_stage) { - /* Can't send this cached data 
async, since the cache page - * might get updated before it gets to the wire + } else { + pages = save_zero_page(f, block, offset, p, bytes_transferred); + if (pages > 0) { + /* Must let xbzrle know, otherwise a previous (now 0'd) cached + * page would be stale */ - send_async = false; + xbzrle_cache_zero_page(current_addr); + } else if (!ram_bulk_stage && migrate_use_xbzrle()) { + pages = save_xbzrle_page(f, &p, current_addr, block, + offset, last_stage, bytes_transferred); + if (!last_stage) { + /* Can't send this cached data async, since the cache page + * might get updated before it gets to the wire + */ + send_async = false; + } } } @@ -692,6 +862,178 @@ static int ram_save_page(QEMUFile *f, RAMBlock* block, ram_addr_t offset, return pages; } +static int do_compress_ram_page(CompressParam *param) +{ + int bytes_sent, blen; + uint8_t *p; + RAMBlock *block = param->block; + ram_addr_t offset = param->offset; + + p = memory_region_get_ram_ptr(block->mr) + (offset & TARGET_PAGE_MASK); + + bytes_sent = save_page_header(param->file, block, offset | + RAM_SAVE_FLAG_COMPRESS_PAGE); + blen = qemu_put_compression_data(param->file, p, TARGET_PAGE_SIZE, + migrate_compress_level()); + bytes_sent += blen; + + return bytes_sent; +} + +static inline void start_compression(CompressParam *param) +{ + param->done = false; + qemu_mutex_lock(&param->mutex); + param->start = true; + qemu_cond_signal(&param->cond); + qemu_mutex_unlock(&param->mutex); +} + +static inline void start_decompression(DecompressParam *param) +{ + qemu_mutex_lock(&param->mutex); + param->start = true; + qemu_cond_signal(&param->cond); + qemu_mutex_unlock(&param->mutex); +} + +static uint64_t bytes_transferred; + +static void flush_compressed_data(QEMUFile *f) +{ + int idx, len, thread_count; + + if (!migrate_use_compression()) { + return; + } + thread_count = migrate_compress_threads(); + for (idx = 0; idx < thread_count; idx++) { + if (!comp_param[idx].done) { + qemu_mutex_lock(comp_done_lock); + while (!comp_param[idx].done && 
!quit_comp_thread) { + qemu_cond_wait(comp_done_cond, comp_done_lock); + } + qemu_mutex_unlock(comp_done_lock); + } + if (!quit_comp_thread) { + len = qemu_put_qemu_file(f, comp_param[idx].file); + bytes_transferred += len; + } + } +} + +static inline void set_compress_params(CompressParam *param, RAMBlock *block, + ram_addr_t offset) +{ + param->block = block; + param->offset = offset; +} + +static int compress_page_with_multi_thread(QEMUFile *f, RAMBlock *block, + ram_addr_t offset, + uint64_t *bytes_transferred) +{ + int idx, thread_count, bytes_xmit = -1, pages = -1; + + thread_count = migrate_compress_threads(); + qemu_mutex_lock(comp_done_lock); + while (true) { + for (idx = 0; idx < thread_count; idx++) { + if (comp_param[idx].done) { + bytes_xmit = qemu_put_qemu_file(f, comp_param[idx].file); + set_compress_params(&comp_param[idx], block, offset); + start_compression(&comp_param[idx]); + pages = 1; + acct_info.norm_pages++; + *bytes_transferred += bytes_xmit; + break; + } + } + if (pages > 0) { + break; + } else { + qemu_cond_wait(comp_done_cond, comp_done_lock); + } + } + qemu_mutex_unlock(comp_done_lock); + + return pages; +} + +/** + * ram_save_compressed_page: compress the given page and send it to the stream + * + * Returns: Number of pages written. 
+ * + * @f: QEMUFile where to send the data + * @block: block that contains the page we want to send + * @offset: offset inside the block for the page + * @last_stage: if we are at the completion stage + * @bytes_transferred: increase it with the number of transferred bytes + */ +static int ram_save_compressed_page(QEMUFile *f, RAMBlock *block, + ram_addr_t offset, bool last_stage, + uint64_t *bytes_transferred) +{ + int pages = -1; + uint64_t bytes_xmit; + MemoryRegion *mr = block->mr; + uint8_t *p; + int ret; + + p = memory_region_get_ram_ptr(mr) + offset; + + bytes_xmit = 0; + ret = ram_control_save_page(f, block->offset, + offset, TARGET_PAGE_SIZE, &bytes_xmit); + if (bytes_xmit) { + *bytes_transferred += bytes_xmit; + pages = 1; + } + if (block == last_sent_block) { + offset |= RAM_SAVE_FLAG_CONTINUE; + } + if (ret != RAM_SAVE_CONTROL_NOT_SUPP) { + if (ret != RAM_SAVE_CONTROL_DELAYED) { + if (bytes_xmit > 0) { + acct_info.norm_pages++; + } else if (bytes_xmit == 0) { + acct_info.dup_pages++; + } + } + } else { + /* When starting the process of a new block, the first page of + * the block should be sent out before other pages in the same + * block, and all the pages in last block should have been sent + * out, keeping this order is important, because the 'cont' flag + * is used to avoid resending the block name. 
+ */ + if (block != last_sent_block) { + flush_compressed_data(f); + pages = save_zero_page(f, block, offset, p, bytes_transferred); + if (pages == -1) { + set_compress_params(&comp_param[0], block, offset); + /* Use the qemu thread to compress the data to make sure the + * first page is sent out before other pages + */ + bytes_xmit = do_compress_ram_page(&comp_param[0]); + acct_info.norm_pages++; + qemu_put_qemu_file(f, comp_param[0].file); + *bytes_transferred += bytes_xmit; + pages = 1; + } + } else { + pages = save_zero_page(f, block, offset, p, bytes_transferred); + if (pages == -1) { + pages = compress_page_with_multi_thread(f, block, offset, + bytes_transferred); + } + } + } + + return pages; +} + /** * ram_find_and_save_block: Finds a dirty page and sends it to f * @@ -731,10 +1073,22 @@ static int ram_find_and_save_block(QEMUFile *f, bool last_stage, block = QLIST_FIRST_RCU(&ram_list.blocks); complete_round = true; ram_bulk_stage = false; + if (migrate_use_xbzrle()) { + /* If xbzrle is on, stop using the data compression at this + * point. In theory, xbzrle can do better than compression. 
+ */ + flush_compressed_data(f); + compression_switch = false; + } } } else { - pages = ram_save_page(f, block, offset, last_stage, - bytes_transferred); + if (compression_switch && migrate_use_compression()) { + pages = ram_save_compressed_page(f, block, offset, last_stage, + bytes_transferred); + } else { + pages = ram_save_page(f, block, offset, last_stage, + bytes_transferred); + } /* if page is unmodified, continue to the next */ if (pages > 0) { @@ -750,8 +1104,6 @@ static int ram_find_and_save_block(QEMUFile *f, bool last_stage, return pages; } -static uint64_t bytes_transferred; - void acct_update_position(QEMUFile *f, size_t size, bool zero) { uint64_t pages = size / TARGET_PAGE_SIZE; @@ -965,6 +1317,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque) } i++; } + flush_compressed_data(f); rcu_read_unlock(); /* @@ -1006,6 +1359,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque) } } + flush_compressed_data(f); ram_control_after_iterate(f, RAM_CONTROL_FINISH); migration_end(); @@ -1113,10 +1467,104 @@ void ram_handle_compressed(void *host, uint8_t ch, uint64_t size) } } +static void *do_data_decompress(void *opaque) +{ + DecompressParam *param = opaque; + unsigned long pagesize; + + while (!quit_decomp_thread) { + qemu_mutex_lock(&param->mutex); + while (!param->start && !quit_decomp_thread) { + qemu_cond_wait(&param->cond, &param->mutex); + pagesize = TARGET_PAGE_SIZE; + if (!quit_decomp_thread) { + /* uncompress() will return failed in some case, especially + * when the page is dirtied when doing the compression, it's + * not a problem because the dirty page will be retransferred + * and uncompress() won't break the data in other pages. 
+ */ + uncompress((Bytef *)param->des, &pagesize, + (const Bytef *)param->compbuf, param->len); + } + param->start = false; + } + qemu_mutex_unlock(&param->mutex); + } + + return NULL; +} + +void migrate_decompress_threads_create(void) +{ + int i, thread_count; + + thread_count = migrate_decompress_threads(); + decompress_threads = g_new0(QemuThread, thread_count); + decomp_param = g_new0(DecompressParam, thread_count); + compressed_data_buf = g_malloc0(compressBound(TARGET_PAGE_SIZE)); + quit_decomp_thread = false; + for (i = 0; i < thread_count; i++) { + qemu_mutex_init(&decomp_param[i].mutex); + qemu_cond_init(&decomp_param[i].cond); + decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE)); + qemu_thread_create(decompress_threads + i, "decompress", + do_data_decompress, decomp_param + i, + QEMU_THREAD_JOINABLE); + } +} + +void migrate_decompress_threads_join(void) +{ + int i, thread_count; + + quit_decomp_thread = true; + thread_count = migrate_decompress_threads(); + for (i = 0; i < thread_count; i++) { + qemu_mutex_lock(&decomp_param[i].mutex); + qemu_cond_signal(&decomp_param[i].cond); + qemu_mutex_unlock(&decomp_param[i].mutex); + } + for (i = 0; i < thread_count; i++) { + qemu_thread_join(decompress_threads + i); + qemu_mutex_destroy(&decomp_param[i].mutex); + qemu_cond_destroy(&decomp_param[i].cond); + g_free(decomp_param[i].compbuf); + } + g_free(decompress_threads); + g_free(decomp_param); + g_free(compressed_data_buf); + decompress_threads = NULL; + decomp_param = NULL; + compressed_data_buf = NULL; +} + +static void decompress_data_with_multi_threads(uint8_t *compbuf, + void *host, int len) +{ + int idx, thread_count; + + thread_count = migrate_decompress_threads(); + while (true) { + for (idx = 0; idx < thread_count; idx++) { + if (!decomp_param[idx].start) { + memcpy(decomp_param[idx].compbuf, compbuf, len); + decomp_param[idx].des = host; + decomp_param[idx].len = len; + start_decompression(&decomp_param[idx]); + break; + } + } + if (idx < 
thread_count) { + break; + } + } +} + static int ram_load(QEMUFile *f, void *opaque, int version_id) { int flags = 0, ret = 0; static uint64_t seq_iter; + int len = 0; seq_iter++; @@ -1196,6 +1644,23 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id) } qemu_get_buffer(f, host, TARGET_PAGE_SIZE); break; + case RAM_SAVE_FLAG_COMPRESS_PAGE: + host = host_from_stream_offset(f, addr, flags); + if (!host) { + error_report("Invalid RAM offset " RAM_ADDR_FMT, addr); + ret = -EINVAL; + break; + } + + len = qemu_get_be32(f); + if (len < 0 || len > compressBound(TARGET_PAGE_SIZE)) { + error_report("Invalid compressed data length: %d", len); + ret = -EINVAL; + break; + } + qemu_get_buffer(f, compressed_data_buf, len); + decompress_data_with_multi_threads(compressed_data_buf, host, len); + break; case RAM_SAVE_FLAG_XBZRLE: host = host_from_stream_offset(f, addr, flags); if (!host) { diff --git a/docs/multi-thread-compression.txt b/docs/multi-thread-compression.txt new file mode 100644 index 0000000000..3d477c3bd2 --- /dev/null +++ b/docs/multi-thread-compression.txt @@ -0,0 +1,149 @@ +Use multiple thread (de)compression in live migration +===================================================== +Copyright (C) 2015 Intel Corporation +Author: Liang Li + +This work is licensed under the terms of the GNU GPLv2 or later. See +the COPYING file in the top-level directory. + +Contents: +========= +* Introduction +* When to use +* Performance +* Usage +* TODO + +Introduction +============ +Instead of sending the guest memory directly, this solution will +compress the RAM page before sending; after receiving, the data will +be decompressed. Using compression in live migration can help +to reduce the data transferred about 60%, this is very useful when the +bandwidth is limited, and the total migration time can also be reduced +about 70% in a typical case. In addition to this, the VM downtime can be +reduced about 50%. The benefit depends on data's compressibility in VM. 
+ +The process of compression will consume additional CPU cycles, and the +extra CPU cycles will increase the migration time. On the other hand, +the amount of data transferred will decrease; this factor can reduce +the total migration time. If the process of the compression is quick +enough, then the total migration time can be reduced, and multiple +thread compression can be used to accelerate the compression process. + +The decompression speed of Zlib is at least 4 times as quick as +compression. If the source and destination CPU have equal speed, +keeping the compression thread count 4 times the decompression +thread count can avoid resource waste. + +Compression level can be used to control the compression speed and the +compression ratio. High compression ratio will take more time, level 0 +stands for no compression, level 1 stands for the best compression +speed, and level 9 stands for the best compression ratio. Users can +select a level number between 0 and 9. + + +When to use the multiple thread compression in live migration +============================================================= +Compression of data will consume extra CPU cycles; so in a system with +high overhead of CPU, avoid using this feature. When the network +bandwidth is very limited and the CPU resource is adequate, use of +multiple thread compression will be very helpful. If both the CPU and +the network bandwidth are adequate, use of multiple thread compression +can still help to reduce the migration time. + +Performance +=========== +Test environment: + +CPU: Intel(R) Xeon(R) CPU E5-2680 0 @ 2.70GHz +Socket Count: 2 +RAM: 128G +NIC: Intel I350 (10/100/1000Mbps) +Host OS: CentOS 7 64-bit +Guest OS: RHEL 6.5 64-bit +Parameter: qemu-system-x86_64 -enable-kvm -smp 4 -m 4096 + /share/ia32e_rhel6u5.qcow -monitor stdio + +There is no additional application running on the guest when doing +the test. 
+ + +Speed limit: 1000Gb/s +--------------------------------------------------------------- + | original | compress thread: 8 + | way | decompress thread: 2 + | | compression level: 1 +--------------------------------------------------------------- +total time(msec): | 3333 | 1833 +--------------------------------------------------------------- +downtime(msec): | 100 | 27 +--------------------------------------------------------------- +transferred ram(kB):| 363536 | 107819 +--------------------------------------------------------------- +throughput(mbps): | 893.73 | 482.22 +--------------------------------------------------------------- +total ram(kB): | 4211524 | 4211524 +--------------------------------------------------------------- + +There is an application running on the guest which write random numbers +to RAM block areas periodically. + +Speed limit: 1000Gb/s +--------------------------------------------------------------- + | original | compress thread: 8 + | way | decompress thread: 2 + | | compression level: 1 +--------------------------------------------------------------- +total time(msec): | 37369 | 15989 +--------------------------------------------------------------- +downtime(msec): | 337 | 173 +--------------------------------------------------------------- +transferred ram(kB):| 4274143 | 1699824 +--------------------------------------------------------------- +throughput(mbps): | 936.99 | 870.95 +--------------------------------------------------------------- +total ram(kB): | 4211524 | 4211524 +--------------------------------------------------------------- + +Usage +===== +1. Verify both the source and destination QEMU are able +to support the multiple thread compression migration: + {qemu} info_migrate_capabilities + {qemu} ... compress: off ... + +2. Activate compression on the source: + {qemu} migrate_set_capability compress on + +3. Set the compression thread count on source: + {qemu} migrate_set_parameter compress_threads 12 + +4. 
Set the compression level on the source: + {qemu} migrate_set_parameter compress_level 1 + +5. Set the decompression thread count on destination: + {qemu} migrate_set_parameter decompress_threads 3 + +6. Start outgoing migration: + {qemu} migrate -d tcp:destination.host:4444 + {qemu} info migrate + Capabilities: ... compress: on + ... + +The following are the default settings: + compress: off + compress_threads: 8 + decompress_threads: 2 + compress_level: 1 (which means best speed) + +So, only the first two steps are required to use the multiple +thread compression in migration. You can do more if the default +settings are not appropriate. + +TODO +==== +Some faster (de)compression method such as LZ4 and Quicklz can help +to reduce the CPU consumption when doing (de)compression. If using +these faster (de)compression method, less (de)compression threads +are needed when doing the migration. diff --git a/hmp-commands.hx b/hmp-commands.hx index a6de819f69..e864a6ca81 100644 --- a/hmp-commands.hx +++ b/hmp-commands.hx @@ -990,6 +990,21 @@ STEXI @item migrate_set_capability @var{capability} @var{state} @findex migrate_set_capability Enable/Disable the usage of a capability @var{capability} for migration. +ETEXI + + { + .name = "migrate_set_parameter", + .args_type = "parameter:s,value:i", + .params = "parameter value", + .help = "Set the parameter for migration", + .mhandler.cmd = hmp_migrate_set_parameter, + .command_completion = migrate_set_parameter_completion, + }, + +STEXI +@item migrate_set_parameter @var{parameter} @var{value} +@findex migrate_set_parameter +Set the parameter @var{parameter} for migration. 
ETEXI { @@ -1761,6 +1776,8 @@ show user network stack connection states show migration status @item info migrate_capabilities show current migration capabilities +@item info migrate_parameters +show current migration parameters @item info migrate_cache_size show current migration XBZRLE cache size @item info balloon diff --git a/hmp.c b/hmp.c index 3010d04c92..e17852d1f9 100644 --- a/hmp.c +++ b/hmp.c @@ -252,6 +252,29 @@ void hmp_info_migrate_capabilities(Monitor *mon, const QDict *qdict) qapi_free_MigrationCapabilityStatusList(caps); } +void hmp_info_migrate_parameters(Monitor *mon, const QDict *qdict) +{ + MigrationParameters *params; + + params = qmp_query_migrate_parameters(NULL); + + if (params) { + monitor_printf(mon, "parameters:"); + monitor_printf(mon, " %s: %" PRId64, + MigrationParameter_lookup[MIGRATION_PARAMETER_COMPRESS_LEVEL], + params->compress_level); + monitor_printf(mon, " %s: %" PRId64, + MigrationParameter_lookup[MIGRATION_PARAMETER_COMPRESS_THREADS], + params->compress_threads); + monitor_printf(mon, " %s: %" PRId64, + MigrationParameter_lookup[MIGRATION_PARAMETER_DECOMPRESS_THREADS], + params->decompress_threads); + monitor_printf(mon, "\n"); + } + + qapi_free_MigrationParameters(params); +} + void hmp_info_migrate_cache_size(Monitor *mon, const QDict *qdict) { monitor_printf(mon, "xbzrel cache size: %" PRId64 " kbytes\n", @@ -1185,6 +1208,48 @@ void hmp_migrate_set_capability(Monitor *mon, const QDict *qdict) } } +void hmp_migrate_set_parameter(Monitor *mon, const QDict *qdict) +{ + const char *param = qdict_get_str(qdict, "parameter"); + int value = qdict_get_int(qdict, "value"); + Error *err = NULL; + bool has_compress_level = false; + bool has_compress_threads = false; + bool has_decompress_threads = false; + int i; + + for (i = 0; i < MIGRATION_PARAMETER_MAX; i++) { + if (strcmp(param, MigrationParameter_lookup[i]) == 0) { + switch (i) { + case MIGRATION_PARAMETER_COMPRESS_LEVEL: + has_compress_level = true; + break; + case 
MIGRATION_PARAMETER_COMPRESS_THREADS: + has_compress_threads = true; + break; + case MIGRATION_PARAMETER_DECOMPRESS_THREADS: + has_decompress_threads = true; + break; + } + qmp_migrate_set_parameters(has_compress_level, value, + has_compress_threads, value, + has_decompress_threads, value, + &err); + break; + } + } + + if (i == MIGRATION_PARAMETER_MAX) { + error_set(&err, QERR_INVALID_PARAMETER, param); + } + + if (err) { + monitor_printf(mon, "migrate_set_parameter: %s\n", + error_get_pretty(err)); + error_free(err); + } +} + void hmp_set_password(Monitor *mon, const QDict *qdict) { const char *protocol = qdict_get_str(qdict, "protocol"); diff --git a/hmp.h b/hmp.h index 12acb6d414..a158e3fda1 100644 --- a/hmp.h +++ b/hmp.h @@ -28,6 +28,7 @@ void hmp_info_chardev(Monitor *mon, const QDict *qdict); void hmp_info_mice(Monitor *mon, const QDict *qdict); void hmp_info_migrate(Monitor *mon, const QDict *qdict); void hmp_info_migrate_capabilities(Monitor *mon, const QDict *qdict); +void hmp_info_migrate_parameters(Monitor *mon, const QDict *qdict); void hmp_info_migrate_cache_size(Monitor *mon, const QDict *qdict); void hmp_info_cpus(Monitor *mon, const QDict *qdict); void hmp_info_block(Monitor *mon, const QDict *qdict); @@ -64,6 +65,7 @@ void hmp_migrate_incoming(Monitor *mon, const QDict *qdict); void hmp_migrate_set_downtime(Monitor *mon, const QDict *qdict); void hmp_migrate_set_speed(Monitor *mon, const QDict *qdict); void hmp_migrate_set_capability(Monitor *mon, const QDict *qdict); +void hmp_migrate_set_parameter(Monitor *mon, const QDict *qdict); void hmp_migrate_set_cache_size(Monitor *mon, const QDict *qdict); void hmp_set_password(Monitor *mon, const QDict *qdict); void hmp_expire_password(Monitor *mon, const QDict *qdict); @@ -113,6 +115,8 @@ void watchdog_action_completion(ReadLineState *rs, int nb_args, const char *str); void migrate_set_capability_completion(ReadLineState *rs, int nb_args, const char *str); +void 
migrate_set_parameter_completion(ReadLineState *rs, int nb_args, + const char *str); void host_net_add_completion(ReadLineState *rs, int nb_args, const char *str); void host_net_remove_completion(ReadLineState *rs, int nb_args, const char *str); diff --git a/include/migration/migration.h b/include/migration/migration.h index bf09968d76..a6e025a248 100644 --- a/include/migration/migration.h +++ b/include/migration/migration.h @@ -50,6 +50,7 @@ struct MigrationState QemuThread thread; QEMUBH *cleanup_bh; QEMUFile *file; + int parameters[MIGRATION_PARAMETER_MAX]; int state; MigrationParams params; @@ -104,6 +105,10 @@ bool migration_has_finished(MigrationState *); bool migration_has_failed(MigrationState *); MigrationState *migrate_get_current(void); +void migrate_compress_threads_create(void); +void migrate_compress_threads_join(void); +void migrate_decompress_threads_create(void); +void migrate_decompress_threads_join(void); uint64_t ram_bytes_remaining(void); uint64_t ram_bytes_transferred(void); uint64_t ram_bytes_total(void); @@ -152,6 +157,11 @@ int64_t migrate_xbzrle_cache_size(void); int64_t xbzrle_cache_resize(int64_t new_size); +bool migrate_use_compression(void); +int migrate_compress_level(void); +int migrate_compress_threads(void); +int migrate_decompress_threads(void); + void ram_control_before_iterate(QEMUFile *f, uint64_t flags); void ram_control_after_iterate(QEMUFile *f, uint64_t flags); void ram_control_load_hook(QEMUFile *f, uint64_t flags); diff --git a/include/migration/qemu-file.h b/include/migration/qemu-file.h index 745a850e51..a01c5b817e 100644 --- a/include/migration/qemu-file.h +++ b/include/migration/qemu-file.h @@ -159,6 +159,9 @@ void qemu_put_be32(QEMUFile *f, unsigned int v); void qemu_put_be64(QEMUFile *f, uint64_t v); int qemu_peek_buffer(QEMUFile *f, uint8_t *buf, int size, size_t offset); int qemu_get_buffer(QEMUFile *f, uint8_t *buf, int size); +ssize_t qemu_put_compression_data(QEMUFile *f, const uint8_t *p, size_t size, + int 
level); +int qemu_put_qemu_file(QEMUFile *f_des, QEMUFile *f_src); /* * Note that you can only peek continuous bytes from where the current pointer * is; you aren't guaranteed to be able to peak to +n bytes unless you've diff --git a/migration/migration.c b/migration/migration.c index bc424907f3..732d229708 100644 --- a/migration/migration.c +++ b/migration/migration.c @@ -33,6 +33,14 @@ #define BUFFER_DELAY 100 #define XFER_LIMIT_RATIO (1000 / BUFFER_DELAY) +/* Default compression thread count */ +#define DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT 8 +/* Default decompression thread count, usually decompression is at + * least 4 times as fast as compression.*/ +#define DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT 2 +/*0: means nocompress, 1: best speed, ... 9: best compress ratio */ +#define DEFAULT_MIGRATE_COMPRESS_LEVEL 1 + /* Migration XBZRLE default cache size */ #define DEFAULT_MIGRATE_CACHE_SIZE (64 * 1024 * 1024) @@ -52,6 +60,12 @@ MigrationState *migrate_get_current(void) .bandwidth_limit = MAX_THROTTLE, .xbzrle_cache_size = DEFAULT_MIGRATE_CACHE_SIZE, .mbps = -1, + .parameters[MIGRATION_PARAMETER_COMPRESS_LEVEL] = + DEFAULT_MIGRATE_COMPRESS_LEVEL, + .parameters[MIGRATION_PARAMETER_COMPRESS_THREADS] = + DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT, + .parameters[MIGRATION_PARAMETER_DECOMPRESS_THREADS] = + DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT, }; return &current_migration; @@ -106,6 +120,7 @@ static void process_incoming_migration_co(void *opaque) free_xbzrle_decoded_buf(); if (ret < 0) { error_report("load of migration failed: %s", strerror(-ret)); + migrate_decompress_threads_join(); exit(EXIT_FAILURE); } qemu_announce_self(); @@ -114,6 +129,7 @@ static void process_incoming_migration_co(void *opaque) bdrv_invalidate_cache_all(&local_err); if (local_err) { error_report_err(local_err); + migrate_decompress_threads_join(); exit(EXIT_FAILURE); } @@ -122,6 +138,7 @@ static void process_incoming_migration_co(void *opaque) } else { runstate_set(RUN_STATE_PAUSED); } + 
migrate_decompress_threads_join(); } void process_incoming_migration(QEMUFile *f) @@ -130,6 +147,7 @@ void process_incoming_migration(QEMUFile *f) int fd = qemu_get_fd(f); assert(fd != -1); + migrate_decompress_threads_create(); qemu_set_nonblock(fd); qemu_coroutine_enter(co, f); } @@ -170,6 +188,21 @@ MigrationCapabilityStatusList *qmp_query_migrate_capabilities(Error **errp) return head; } +MigrationParameters *qmp_query_migrate_parameters(Error **errp) +{ + MigrationParameters *params; + MigrationState *s = migrate_get_current(); + + params = g_malloc0(sizeof(*params)); + params->compress_level = s->parameters[MIGRATION_PARAMETER_COMPRESS_LEVEL]; + params->compress_threads = + s->parameters[MIGRATION_PARAMETER_COMPRESS_THREADS]; + params->decompress_threads = + s->parameters[MIGRATION_PARAMETER_DECOMPRESS_THREADS]; + + return params; +} + static void get_xbzrle_cache_stats(MigrationInfo *info) { if (migrate_use_xbzrle()) { @@ -283,6 +316,47 @@ void qmp_migrate_set_capabilities(MigrationCapabilityStatusList *params, } } +void qmp_migrate_set_parameters(bool has_compress_level, + int64_t compress_level, + bool has_compress_threads, + int64_t compress_threads, + bool has_decompress_threads, + int64_t decompress_threads, Error **errp) +{ + MigrationState *s = migrate_get_current(); + + if (has_compress_level && (compress_level < 0 || compress_level > 9)) { + error_set(errp, QERR_INVALID_PARAMETER_VALUE, "compress_level", + "is invalid, it should be in the range of 0 to 9"); + return; + } + if (has_compress_threads && + (compress_threads < 1 || compress_threads > 255)) { + error_set(errp, QERR_INVALID_PARAMETER_VALUE, + "compress_threads", + "is invalid, it should be in the range of 1 to 255"); + return; + } + if (has_decompress_threads && + (decompress_threads < 1 || decompress_threads > 255)) { + error_set(errp, QERR_INVALID_PARAMETER_VALUE, + "decompress_threads", + "is invalid, it should be in the range of 1 to 255"); + return; + } + + if (has_compress_level) { + 
s->parameters[MIGRATION_PARAMETER_COMPRESS_LEVEL] = compress_level; + } + if (has_compress_threads) { + s->parameters[MIGRATION_PARAMETER_COMPRESS_THREADS] = compress_threads; + } + if (has_decompress_threads) { + s->parameters[MIGRATION_PARAMETER_DECOMPRESS_THREADS] = + decompress_threads; + } +} + /* shared migration helpers */ static void migrate_set_state(MigrationState *s, int old_state, int new_state) @@ -305,6 +379,7 @@ static void migrate_fd_cleanup(void *opaque) qemu_thread_join(&s->thread); qemu_mutex_lock_iothread(); + migrate_compress_threads_join(); qemu_fclose(s->file); s->file = NULL; } @@ -390,6 +465,11 @@ static MigrationState *migrate_init(const MigrationParams *params) int64_t bandwidth_limit = s->bandwidth_limit; bool enabled_capabilities[MIGRATION_CAPABILITY_MAX]; int64_t xbzrle_cache_size = s->xbzrle_cache_size; + int compress_level = s->parameters[MIGRATION_PARAMETER_COMPRESS_LEVEL]; + int compress_thread_count = + s->parameters[MIGRATION_PARAMETER_COMPRESS_THREADS]; + int decompress_thread_count = + s->parameters[MIGRATION_PARAMETER_DECOMPRESS_THREADS]; memcpy(enabled_capabilities, s->enabled_capabilities, sizeof(enabled_capabilities)); @@ -400,6 +480,11 @@ static MigrationState *migrate_init(const MigrationParams *params) sizeof(enabled_capabilities)); s->xbzrle_cache_size = xbzrle_cache_size; + s->parameters[MIGRATION_PARAMETER_COMPRESS_LEVEL] = compress_level; + s->parameters[MIGRATION_PARAMETER_COMPRESS_THREADS] = + compress_thread_count; + s->parameters[MIGRATION_PARAMETER_DECOMPRESS_THREADS] = + decompress_thread_count; s->bandwidth_limit = bandwidth_limit; s->state = MIGRATION_STATUS_SETUP; trace_migrate_set_state(MIGRATION_STATUS_SETUP); @@ -587,6 +672,42 @@ bool migrate_zero_blocks(void) return s->enabled_capabilities[MIGRATION_CAPABILITY_ZERO_BLOCKS]; } +bool migrate_use_compression(void) +{ + MigrationState *s; + + s = migrate_get_current(); + + return s->enabled_capabilities[MIGRATION_CAPABILITY_COMPRESS]; +} + +int 
migrate_compress_level(void) +{ + MigrationState *s; + + s = migrate_get_current(); + + return s->parameters[MIGRATION_PARAMETER_COMPRESS_LEVEL]; +} + +int migrate_compress_threads(void) +{ + MigrationState *s; + + s = migrate_get_current(); + + return s->parameters[MIGRATION_PARAMETER_COMPRESS_THREADS]; +} + +int migrate_decompress_threads(void) +{ + MigrationState *s; + + s = migrate_get_current(); + + return s->parameters[MIGRATION_PARAMETER_DECOMPRESS_THREADS]; +} + int migrate_use_xbzrle(void) { MigrationState *s; @@ -730,6 +851,7 @@ void migrate_fd_connect(MigrationState *s) /* Notify before starting migration thread */ notifier_list_notify(&migration_state_notifiers, s); + migrate_compress_threads_create(); qemu_thread_create(&s->thread, "migration", migration_thread, s, QEMU_THREAD_JOINABLE); } diff --git a/migration/qemu-file.c b/migration/qemu-file.c index 1a4f9868ed..2750365a7e 100644 --- a/migration/qemu-file.c +++ b/migration/qemu-file.c @@ -21,6 +21,7 @@ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN * THE SOFTWARE. */ +#include #include "qemu-common.h" #include "qemu/iov.h" #include "qemu/sockets.h" @@ -546,3 +547,41 @@ uint64_t qemu_get_be64(QEMUFile *f) v |= qemu_get_be32(f); return v; } + +/* compress size bytes of data start at p with specific compression + * level and store the compressed data to the buffer of f. + */ + +ssize_t qemu_put_compression_data(QEMUFile *f, const uint8_t *p, size_t size, + int level) +{ + ssize_t blen = IO_BUF_SIZE - f->buf_index - sizeof(int32_t); + + if (blen < compressBound(size)) { + return 0; + } + if (compress2(f->buf + f->buf_index + sizeof(int32_t), (uLongf *)&blen, + (Bytef *)p, size, level) != Z_OK) { + error_report("Compress Failed!"); + return 0; + } + qemu_put_be32(f, blen); + f->buf_index += blen; + return blen + sizeof(int32_t); +} + +/* Put the data in the buffer of f_src to the buffer of f_des, and + * then reset the buf_index of f_src to 0. 
+ */ + +int qemu_put_qemu_file(QEMUFile *f_des, QEMUFile *f_src) +{ + int len = 0; + + if (f_src->buf_index > 0) { + len = f_src->buf_index; + qemu_put_buffer(f_des, f_src->buf, f_src->buf_index); + f_src->buf_index = 0; + } + return len; +} diff --git a/monitor.c b/monitor.c index d831d984d8..3952d646cd 100644 --- a/monitor.c +++ b/monitor.c @@ -2858,6 +2858,13 @@ static mon_cmd_t info_cmds[] = { .help = "show current migration capabilities", .mhandler.cmd = hmp_info_migrate_capabilities, }, + { + .name = "migrate_parameters", + .args_type = "", + .params = "", + .help = "show current migration parameters", + .mhandler.cmd = hmp_info_migrate_parameters, + }, { .name = "migrate_cache_size", .args_type = "", @@ -4540,6 +4547,24 @@ void migrate_set_capability_completion(ReadLineState *rs, int nb_args, } } +void migrate_set_parameter_completion(ReadLineState *rs, int nb_args, + const char *str) +{ + size_t len; + + len = strlen(str); + readline_set_completion_index(rs, len); + if (nb_args == 2) { + int i; + for (i = 0; i < MIGRATION_PARAMETER_MAX; i++) { + const char *name = MigrationParameter_lookup[i]; + if (!strncmp(str, name, len)) { + readline_add_completion(rs, name); + } + } + } +} + void host_net_add_completion(ReadLineState *rs, int nb_args, const char *str) { int i; diff --git a/qapi-schema.json b/qapi-schema.json index 27ec9882db..9c92482898 100644 --- a/qapi-schema.json +++ b/qapi-schema.json @@ -515,13 +515,22 @@ # to enable the capability on the source VM. The feature is disabled by # default. (since 1.6) # +# @compress: Use multiple compression threads to accelerate live migration. +# This feature can help to reduce the migration traffic, by sending +# compressed pages. Please note that if compress and xbzrle are both +# on, compress only takes effect in the ram bulk stage, after that, +# it will be disabled and only xbzrle takes effect, this can help to +# minimize migration traffic. The feature is disabled by default. 
+# (since 2.4) +# # @auto-converge: If enabled, QEMU will automatically throttle down the guest # to speed up convergence of RAM migration. (since 1.6) # # Since: 1.2 ## { 'enum': 'MigrationCapability', - 'data': ['xbzrle', 'rdma-pin-all', 'auto-converge', 'zero-blocks'] } + 'data': ['xbzrle', 'rdma-pin-all', 'auto-converge', 'zero-blocks', + 'compress'] } ## # @MigrationCapabilityStatus @@ -560,6 +569,74 @@ ## { 'command': 'query-migrate-capabilities', 'returns': ['MigrationCapabilityStatus']} +# @MigrationParameter +# +# Migration parameters enumeration +# +# @compress-level: Set the compression level to be used in live migration, +# the compression level is an integer between 0 and 9, where 0 means +# no compression, 1 means the best compression speed, and 9 means best +# compression ratio which will consume more CPU. +# +# @compress-threads: Set compression thread count to be used in live migration, +# the compression thread count is an integer between 1 and 255. +# +# @decompress-threads: Set decompression thread count to be used in live +# migration, the decompression thread count is an integer between 1 +# and 255. Usually, decompression is at least 4 times as fast as +# compression, so setting decompress-threads to about 1/4 of +# compress-threads is adequate.
+# +# Since: 2.4 +## +{ 'enum': 'MigrationParameter', + 'data': ['compress-level', 'compress-threads', 'decompress-threads'] } + +## +# @migrate-set-parameters +# +# Set the following migration parameters +# +# @compress-level: compression level +# +# @compress-threads: compression thread count +# +# @decompress-threads: decompression thread count +# +# Since: 2.4 +## +{ 'command': 'migrate-set-parameters', + 'data': { '*compress-level': 'int', + '*compress-threads': 'int', + '*decompress-threads': 'int'} } + +## +# @MigrationParameters +# +# @compress-level: compression level +# +# @compress-threads: compression thread count +# +# @decompress-threads: decompression thread count +# +# Since: 2.4 +## +{ 'struct': 'MigrationParameters', + 'data': { 'compress-level': 'int', + 'compress-threads': 'int', + 'decompress-threads': 'int'} } +## +# @query-migrate-parameters +# +# Returns information about the current migration parameters +# +# Returns: @MigrationParameters +# +# Since: 2.4 +## +{ 'command': 'query-migrate-parameters', + 'returns': 'MigrationParameters' } + ## # @MouseInfo: # diff --git a/qmp-commands.hx b/qmp-commands.hx index d4a837c89b..7506774afb 100644 --- a/qmp-commands.hx +++ b/qmp-commands.hx @@ -3442,6 +3442,63 @@ EQMP .mhandler.cmd_new = qmp_marshal_input_query_migrate_capabilities, }, +SQMP +migrate-set-parameters +---------------------- + +Set migration parameters + +- "compress-level": set compression level during migration (json-int) +- "compress-threads": set compression thread count for migration (json-int) +- "decompress-threads": set decompression thread count for migration (json-int) + +Arguments: + +Example: + +-> { "execute": "migrate-set-parameters", "arguments": + { "compress-level": 1 } } + +EQMP + + { + .name = "migrate-set-parameters", + .args_type = + "compress-level:i?,compress-threads:i?,decompress-threads:i?", + .mhandler.cmd_new = qmp_marshal_input_migrate_set_parameters, + }, +SQMP +query-migrate-parameters
+------------------------ + +Query current migration parameters + +- "parameters": migration parameters value + - "compress-level" : compression level value (json-int) + - "compress-threads" : compression thread count value (json-int) + - "decompress-threads" : decompression thread count value (json-int) + +Arguments: + +Example: + +-> { "execute": "query-migrate-parameters" } +<- { + "return": { + "decompress-threads": 2, + "compress-threads": 8, + "compress-level": 1 + } + } + +EQMP + + { + .name = "query-migrate-parameters", + .args_type = "", + .mhandler.cmd_new = qmp_marshal_input_query_migrate_parameters, + }, + SQMP query-balloon -------------