migration: Add the core code for decompression

Implement the core logic of multiple thread decompression,
the decompression can work now.

Signed-off-by: Liang Li <liang.z.li@intel.com>
Signed-off-by: Yang Zhang <yang.z.zhang@intel.com>
Signed-off-by: Juan Quintela <quintela@redhat.com>
This commit is contained in:
Liang Li 2015-03-23 16:32:25 +08:00 committed by Juan Quintela
parent 98f1138902
commit 68ae113646
1 changed files with 48 additions and 2 deletions

View File

@ -887,6 +887,13 @@ static inline void start_compression(CompressParam *param)
qemu_mutex_unlock(&param->mutex); qemu_mutex_unlock(&param->mutex);
} }
static inline void start_decompression(DecompressParam *param)
{
qemu_mutex_lock(&param->mutex);
param->start = true;
qemu_cond_signal(&param->cond);
qemu_mutex_unlock(&param->mutex);
}
static uint64_t bytes_transferred; static uint64_t bytes_transferred;
@ -1460,8 +1467,26 @@ void ram_handle_compressed(void *host, uint8_t ch, uint64_t size)
static void *do_data_decompress(void *opaque) static void *do_data_decompress(void *opaque)
{ {
DecompressParam *param = opaque;
unsigned long pagesize;
while (!quit_decomp_thread) { while (!quit_decomp_thread) {
/* To be done */ qemu_mutex_lock(&param->mutex);
while (!param->start && !quit_decomp_thread) {
qemu_cond_wait(&param->cond, &param->mutex);
pagesize = TARGET_PAGE_SIZE;
if (!quit_decomp_thread) {
/* uncompress() will return failed in some case, especially
* when the page is dirted when doing the compression, it's
* not a problem because the dirty page will be retransferred
* and uncompress() won't break the data in other pages.
*/
uncompress((Bytef *)param->des, &pagesize,
(const Bytef *)param->compbuf, param->len);
}
param->start = false;
}
qemu_mutex_unlock(&param->mutex);
} }
return NULL; return NULL;
@ -1492,6 +1517,11 @@ void migrate_decompress_threads_join(void)
quit_decomp_thread = true; quit_decomp_thread = true;
thread_count = migrate_decompress_threads(); thread_count = migrate_decompress_threads();
for (i = 0; i < thread_count; i++) {
qemu_mutex_lock(&decomp_param[i].mutex);
qemu_cond_signal(&decomp_param[i].cond);
qemu_mutex_unlock(&decomp_param[i].mutex);
}
for (i = 0; i < thread_count; i++) { for (i = 0; i < thread_count; i++) {
qemu_thread_join(decompress_threads + i); qemu_thread_join(decompress_threads + i);
qemu_mutex_destroy(&decomp_param[i].mutex); qemu_mutex_destroy(&decomp_param[i].mutex);
@ -1509,7 +1539,23 @@ void migrate_decompress_threads_join(void)
static void decompress_data_with_multi_threads(uint8_t *compbuf, static void decompress_data_with_multi_threads(uint8_t *compbuf,
void *host, int len) void *host, int len)
{ {
/* To be done */ int idx, thread_count;
thread_count = migrate_decompress_threads();
while (true) {
for (idx = 0; idx < thread_count; idx++) {
if (!decomp_param[idx].start) {
memcpy(decomp_param[idx].compbuf, compbuf, len);
decomp_param[idx].des = host;
decomp_param[idx].len = len;
start_decompression(&decomp_param[idx]);
break;
}
}
if (idx < thread_count) {
break;
}
}
} }
static int ram_load(QEMUFile *f, void *opaque, int version_id) static int ram_load(QEMUFile *f, void *opaque, int version_id)