#include "GlobalDataAdaptor.h"
|
|
#include "ReadCacheClient.h"
|
|
#include "ReplicationWriteCacheClient.h"
|
|
#include "ErasureCodingWriteCacheClient.h"
|
|
|
|
using HybridCache::ByteBuffer;
|
|
|
|
#define CONFIG_GC_ON_EXCEEDING_DISKSPACE
|
|
|
|
DEFINE_uint32(bg_execution_period, 10, "Background execution period in seconds");
|
|
|
|
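
// Constructs the adaptor: wires up the read cache and both write-cache
// implementations, opens `rpc_connections` RPC channels to every server in
// `server_list`, queues a reconnect task for each server that fails to
// register, and starts the background worker thread.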
GlobalDataAdaptor::GlobalDataAdaptor(std::shared_ptr<DataAdaptor> base_adaptor,
                                     const std::vector<std::string> &server_list,
                                     std::shared_ptr<EtcdClient> etcd_client,
                                     std::shared_ptr<folly::CPUThreadPoolExecutor> executor)
        : base_adaptor_(base_adaptor),
          executor_(executor),
          server_list_(server_list),
          etcd_client_(etcd_client),
          meta_cache_(GetGlobalConfig().meta_cache_max_size, GetGlobalConfig().meta_cache_clear_size) {
    if (!executor_) {
        executor_ = std::make_shared<folly::CPUThreadPoolExecutor>(GetGlobalConfig().folly_threads);
    }

    read_cache_ = std::make_shared<ReadCacheClient>(this);
    write_caches_[WC_TYPE_REPLICATION] = std::make_shared<ReplicationWriteCacheClient>(this);
    write_caches_[WC_TYPE_REEDSOLOMON] = std::make_shared<ErasureCodingWriteCacheClient>(this);

    for (int conn_id = 0; conn_id < GetGlobalConfig().rpc_connections; conn_id++) {
        auto client = std::make_shared<GlobalCacheClient>(std::to_string(conn_id));
        int server_id = 0;
        for (auto &entry : server_list_) {
            if (client->RegisterServer(server_id, entry.c_str())) {
                // TODO: periodically retry the connection
                LOG(WARNING) << "Failed to connect with server id: " << server_id
                             << ", address: " << entry;
                std::lock_guard<std::mutex> lock(bg_mutex_);
                bg_tasks_.push_back([client, server_id, entry]() -> int {
                    return client->RegisterServer(server_id, entry.c_str());
                });
            }
            server_id++;
        }
        rpc_client_.push_back(client);
    }
    srand48(time(nullptr));
    bg_running_ = true;
    bg_thread_ = std::thread(std::bind(&GlobalDataAdaptor::BackgroundWorker, this));
}

GlobalDataAdaptor::~GlobalDataAdaptor() {
    bg_running_ = false;
    bg_cv_.notify_all();
    bg_thread_.join();
}
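
// Background thread loop: every FLAGS_bg_execution_period seconds, re-run all
// queued tasks (currently failed server registrations) and keep only those
// that still return a non-zero (failure) code.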
void GlobalDataAdaptor::BackgroundWorker() {
    while (bg_running_) {
        std::unique_lock<std::mutex> lock(bg_mutex_);
        std::vector<std::function<int()>> bg_tasks_next;
        for (auto &entry : bg_tasks_) {
            if (entry()) {
                bg_tasks_next.push_back(entry);
            }
        }
        bg_tasks_ = bg_tasks_next;
        bg_cv_.wait_for(lock, std::chrono::seconds(FLAGS_bg_execution_period));
    }
}

struct DownloadArgs {
    DownloadArgs(const std::string &key, size_t start, size_t size, ByteBuffer &buffer)
            : key(key), start(start), size(size), buffer(buffer) {}

    std::string key;
    size_t start;
    size_t size;
    ByteBuffer &buffer;
};
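
// Public download entry point: try the global cache first and fall back to
// the base data adaptor (e.g. S3) when the RPC layer reports a failure.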
folly::Future<int> GlobalDataAdaptor::DownLoad(const std::string &key,
                                               size_t start,
                                               size_t size,
                                               ByteBuffer &buffer) {
    return DownLoadFromGlobalCache(key, start, size, buffer).then(
            [this, key, start, size, &buffer](folly::Try<int> &&output) -> folly::Future<int> {
                if (output.value_or(FOLLY_ERROR) == RPC_FAILED) {
                    return base_adaptor_->DownLoad(key, start, size, buffer);
                }
                return output.value_or(FOLLY_ERROR);
            });
}
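
// Serves a read from the global cache. Consults the local meta cache first;
// write-cached objects are fetched through the replication or Reed-Solomon
// client chosen by the JSON metadata, everything else goes through the read
// cache, with metadata resolved from etcd (or a HEAD request) on a miss.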
folly::Future<int> GlobalDataAdaptor::DownLoadFromGlobalCache(const std::string &key,
                                                              size_t start,
                                                              size_t size,
                                                              ByteBuffer &buffer) {
    auto &policy = GetCachePolicy(key);
    auto meta_cache_entry = GetMetaCacheEntry(key);
    if (meta_cache_entry->present) {
        if (!meta_cache_entry->existed) {
            LOG(ERROR) << "Request for potentially deleted file: " << key;
            return folly::makeFuture(NOT_FOUND);
        }
        if (start + size > meta_cache_entry->size) {
            LOG(ERROR) << "Request out of file range, key: " << key
                       << ", start: " << start
                       << ", size: " << size
                       << ", file length: " << meta_cache_entry->size;
            return folly::makeFuture(END_OF_FILE);
        }
    }

    if (policy.write_cache_type != NOCACHE) {
        auto args = std::make_shared<DownloadArgs>(key, start, size, buffer);
        if (meta_cache_entry->present) {
            if (meta_cache_entry->write_cached) {
                auto &root = meta_cache_entry->root;
                if (root["type"] == "replication") {
                    return write_caches_[WC_TYPE_REPLICATION]->Get(args->key, args->start, args->size, args->buffer, root);
                } else if (root["type"] == "reed-solomon") {
                    return write_caches_[WC_TYPE_REEDSOLOMON]->Get(args->key, args->start, args->size, args->buffer, root);
                }
                LOG(ERROR) << "Failed to download data, reason: unsupported type, key: " << args->key
                           << ", start: " << args->start
                           << ", size: " << args->size
                           << ", type: " << root["type"];
                return folly::makeFuture(UNSUPPORTED_TYPE);
            } else {
                return read_cache_->Get(key, start, size, buffer);
            }
        } else {
            return etcd_client_->GetJson(key).then(
                    [this, args, meta_cache_entry](folly::Try<EtcdClient::GetResult> &&output) -> folly::Future<int> {
                        if (!output.hasValue()) { // reached when GetJson throws an exception
                            LOG(ERROR) << "Failed to download data, reason: internal error, key: " << args->key
                                       << ", start: " << args->start
                                       << ", size: " << args->size;
                            return folly::makeFuture(FOLLY_ERROR);
                        }

                        auto &status = output.value().status;
                        if (status == NOT_FOUND) {
                            if (GetGlobalConfig().use_meta_cache) {
                                return base_adaptor_->Head(args->key, meta_cache_entry->size, meta_cache_entry->headers).then(
                                        [this, meta_cache_entry, args](folly::Try<int> &&output) -> folly::Future<int> {
                                            int res = output.value_or(FOLLY_ERROR);
                                            if (res == OK || res == NOT_FOUND) {
                                                meta_cache_entry->present = true;
                                                meta_cache_entry->existed = (res == OK);
                                                meta_cache_entry->write_cached = false;
                                            }
                                            if (res == OK) {
                                                return read_cache_->Get(args->key, args->start, args->size, args->buffer);
                                            }
                                            return res;
                                        });
                            } else {
                                return read_cache_->Get(args->key, args->start, args->size, args->buffer);
                            }
                        } else if (status != OK) {
                            return folly::makeFuture(status);
                        }

                        auto &root = output.value().root;
                        if (GetGlobalConfig().use_meta_cache) {
                            meta_cache_entry->present = true;
                            meta_cache_entry->existed = true;
                            meta_cache_entry->write_cached = true;
                            meta_cache_entry->size = root["size"].asInt64();
                            for (auto iter = root["headers"].begin(); iter != root["headers"].end(); iter++) {
                                meta_cache_entry->headers[iter.key().asString()] = (*iter).asString();
                            }
                            meta_cache_entry->root = root;
                        }

                        if (root["type"] == "replication") {
                            return write_caches_[WC_TYPE_REPLICATION]->Get(args->key, args->start, args->size, args->buffer, root);
                        } else if (root["type"] == "reed-solomon") {
                            return write_caches_[WC_TYPE_REEDSOLOMON]->Get(args->key, args->start, args->size, args->buffer, root);
                        }

                        LOG(ERROR) << "Failed to download data, reason: unsupported type, key: " << args->key
                                   << ", start: " << args->start
                                   << ", size: " << args->size
                                   << ", type: " << root["type"];

                        return folly::makeFuture(UNSUPPORTED_TYPE);
                    });
        }
    } else {
        return read_cache_->Get(key, start, size, buffer);
    }
}
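
// Public upload entry point. With CONFIG_GC_ON_EXCEEDING_DISKSPACE defined,
// a NO_ENOUGH_DISKSPACE result triggers one garbage-collection pass and a
// single retry before the error is returned.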
folly::Future<int> GlobalDataAdaptor::UpLoad(const std::string &key,
                                             size_t size,
                                             const ByteBuffer &buffer,
                                             const std::map<std::string, std::string> &headers) {
#ifdef CONFIG_GC_ON_EXCEEDING_DISKSPACE
    return DoUpLoad(key, size, buffer, headers).thenValue([this, key, size, &buffer, &headers](int &&res) -> int {
        if (res != NO_ENOUGH_DISKSPACE) {
            return res;
        }
        LOG(INFO) << "Disk limit exceeded - perform GC immediately";
        res = PerformGarbageCollection();
        if (res) {
            LOG(WARNING) << "GC failed";
            return res;
        }
        LOG(INFO) << "Disk limit exceeded - GC completed, now retry";
        return DoUpLoad(key, size, buffer, headers).get();
    });
#else
    return DoUpLoad(key, size, buffer, headers);
#endif
}
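
// Performs the actual upload: invalidate the read cache, write the data via
// the configured write-cache client, persist the resulting JSON metadata to
// etcd, and refresh the local meta cache. NOCACHE writes go straight to the
// base adaptor.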
folly::Future<int> GlobalDataAdaptor::DoUpLoad(const std::string &key,
                                               size_t size,
                                               const ByteBuffer &buffer,
                                               const std::map<std::string, std::string> &headers) {
    auto t = std::make_shared<butil::Timer>();  // shared_ptr so the timer is freed on every path
    t->start();
    auto &policy = GetCachePolicy(key);
    auto meta_cache_entry = GetMetaCacheEntry(key);
    meta_cache_entry->present = false;
    meta_cache_entry->existed = true;
    meta_cache_entry->size = size;
    meta_cache_entry->headers = headers;
    auto pre_op = read_cache_->Invalidate(key, size);
    if (policy.write_cache_type == REPLICATION || policy.write_cache_type == REED_SOLOMON) {
        auto write_cache = policy.write_cache_type == REPLICATION
                           ? write_caches_[WC_TYPE_REPLICATION]
                           : write_caches_[WC_TYPE_REEDSOLOMON];
        return std::move(pre_op)
                .then(std::bind(&WriteCacheClient::Put, write_cache.get(), key, size, buffer, headers, 0))
                .then([this, key, meta_cache_entry, t](folly::Try<WriteCacheClient::PutResult> output) -> folly::Future<int> {
                    int status = output.hasValue() ? output.value().status : FOLLY_ERROR;
                    if (status == OK) {
                        status = etcd_client_->PutJson(key, output.value().root).get();
                        if (status == OK && GetGlobalConfig().use_meta_cache) {
                            meta_cache_entry->root = output.value().root;
                            meta_cache_entry->write_cached = true;
                            meta_cache_entry->present = true;
                        }
                        t->stop();
                        LOG(INFO) << "JSON: " << t->u_elapsed();
                    }
                    return folly::makeFuture(status);
                });
    } else if (policy.write_cache_type == NOCACHE) {
        return std::move(pre_op)
                .then(std::bind(&DataAdaptor::UpLoad, base_adaptor_.get(), key, size, buffer, headers))
                .thenValue([meta_cache_entry](int &&res) -> int {
                    if (res == OK && GetGlobalConfig().use_meta_cache) {
                        meta_cache_entry->write_cached = false;
                        meta_cache_entry->present = true;
                    }
                    return res;
                });
    } else {
        LOG(ERROR) << "Failed to upload data, reason: unsupported type, key: " << key
                   << ", size: " << size
                   << ", type: " << policy.write_cache_type;
        return folly::makeFuture(UNSUPPORTED_TYPE);
    }
}
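
// Removes an object from base storage, the read cache, and (for write-cached
// objects) the JSON metadata in etcd. The object size needed for invalidation
// is taken from the meta cache, etcd, or a HEAD request, in that order.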
folly::Future<int> GlobalDataAdaptor::Delete(const std::string &key) {
    auto &policy = GetCachePolicy(key);
    if (policy.write_cache_type == NOCACHE) {
        InvalidateMetaCacheEntry(key);
        return base_adaptor_->Delete(key);
    } else {
        auto meta_cache_entry = GetMetaCacheEntry(key);
        auto size = meta_cache_entry->size;
        bool present = meta_cache_entry->present;
        // A cached entry marked write_cached still has JSON metadata in etcd
        // that must be removed below.
        bool has_write_cache = present && meta_cache_entry->write_cached;

        if (!present) {
            auto result = etcd_client_->GetJson(key).get();
            if (result.status == OK) {
                size = result.root["size"].asInt64();
                has_write_cache = true;
            } else if (result.status == NOT_FOUND) { // stored only in S3
                std::map<std::string, std::string> headers;
                int ret = base_adaptor_->Head(key, size, headers).get();
                if (ret) return ret;
            } else {
                return folly::makeFuture(result.status);
            }
        }

        InvalidateMetaCacheEntry(key);

        if (has_write_cache) {
            return base_adaptor_->Delete(key)
                    .then(std::bind(&ReadCacheClient::Invalidate, read_cache_.get(), key, size))
                    .then(std::bind(&EtcdClient::DeleteJson, etcd_client_.get(), key));
        } else {
            return base_adaptor_->Delete(key)
                    .then(std::bind(&ReadCacheClient::Invalidate, read_cache_.get(), key, size));
        }
    }
}

struct DeepFlushArgs {
    DeepFlushArgs(const std::string &key) : key(key) {}
    ~DeepFlushArgs() { if (buffer.data) delete[] buffer.data; }

    std::string key;
    std::map<std::string, std::string> headers;
    ByteBuffer buffer;
};
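
// Flushes a write-cached object back to base storage: read its metadata from
// etcd, download the full object, upload it through the base adaptor, then
// drop the etcd metadata and the local meta-cache entry.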
folly::Future<int> GlobalDataAdaptor::DeepFlush(const std::string &key) {
    auto t = std::make_shared<butil::Timer>();  // shared_ptr: the original raw pointer was never freed
    t->start();
    auto &policy = GetCachePolicy(key);
    if (policy.write_cache_type == REPLICATION || policy.write_cache_type == REED_SOLOMON) {
        auto args = std::make_shared<DeepFlushArgs>(key);
        return etcd_client_->GetJson(key).then([this, t, args](folly::Try<EtcdClient::GetResult> &&output) -> folly::Future<int> {
            if (!output.hasValue()) {
                return folly::makeFuture(FOLLY_ERROR);
            }
            if (output.value().status != OK) {
                return folly::makeFuture(output.value().status);
            }
            auto &root = output.value().root;
            args->buffer.len = root["size"].asInt64();
            args->buffer.data = new char[args->buffer.len];
            for (auto iter = root["headers"].begin(); iter != root["headers"].end(); iter++) {
                args->headers[iter.key().asString()] = (*iter).asString();
            }
            t->stop();
            LOG(INFO) << "DeepFlush phase 1: " << t->u_elapsed();

            return DownLoad(args->key, 0, args->buffer.len, args->buffer);
        }).then([this, t, args](folly::Try<int> &&output) -> folly::Future<int> {
            int res = output.value_or(FOLLY_ERROR);
            t->stop();
            LOG(INFO) << "DeepFlush phase 2: " << t->u_elapsed();
            if (res != OK) {
                return folly::makeFuture(res);
            } else {
                return base_adaptor_->UpLoad(args->key, args->buffer.len, args->buffer, args->headers);
            }
        }).then([this, t, key, args](folly::Try<int> &&output) -> folly::Future<int> {
            t->stop();
            LOG(INFO) << "DeepFlush phase 3: " << t->u_elapsed();
            int res = output.value_or(FOLLY_ERROR);
            if (res != OK) {
                return folly::makeFuture(res);
            } else {
                InvalidateMetaCacheEntry(key);
                return etcd_client_->DeleteJson(key);
            }
        });
    } else {
        t->stop();
        LOG(INFO) << "DeepFlush phase 4: " << t->u_elapsed();
        return folly::makeFuture(OK);
    }
}

struct HeadArgs {
    HeadArgs(const std::string &key, size_t &size, std::map<std::string, std::string> &headers)
            : key(key), size(size), headers(headers) {}

    std::string key;
    size_t &size;
    std::map<std::string, std::string> &headers;
};
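
// Resolves an object's size and headers: from the meta cache when present,
// else from the etcd JSON metadata (write-cached objects), falling back to a
// HEAD request against the base adaptor.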
folly::Future<int> GlobalDataAdaptor::Head(const std::string &key,
                                           size_t &size,
                                           std::map<std::string, std::string> &headers) {
    auto &policy = GetCachePolicy(key);
    auto meta_cache_entry = GetMetaCacheEntry(key);
    if (meta_cache_entry->present) {
        if (!meta_cache_entry->existed) {
            LOG(ERROR) << "Request for potentially deleted file: " << key;
            return folly::makeFuture(NOT_FOUND);
        }
        size = meta_cache_entry->size;
        headers = meta_cache_entry->headers;
        return folly::makeFuture(OK);
    }

    if (policy.write_cache_type == REPLICATION || policy.write_cache_type == REED_SOLOMON) {
        auto args = std::make_shared<HeadArgs>(key, size, headers);
        return etcd_client_->GetJson(key).then([this, args, meta_cache_entry](folly::Try<EtcdClient::GetResult> &&output) -> folly::Future<int> {
            if (!output.hasValue()) {
                return folly::makeFuture(FOLLY_ERROR);
            }
            if (output.value().status != OK) {
                return folly::makeFuture(output.value().status);
            }
            auto &root = output.value().root;
            args->size = root["size"].asInt64();
            for (auto iter = root["headers"].begin(); iter != root["headers"].end(); iter++) {
                args->headers[iter.key().asString()] = (*iter).asString();
            }

            if (GetGlobalConfig().use_meta_cache) {
                meta_cache_entry->present = true;
                meta_cache_entry->existed = true;
                meta_cache_entry->write_cached = true;
                meta_cache_entry->size = args->size;
                meta_cache_entry->headers = args->headers;
                meta_cache_entry->root = output.value().root;
            }

            return folly::makeFuture(OK);
        }).then([this, args, meta_cache_entry](folly::Try<int> &&output) -> folly::Future<int> {
            int res = output.value_or(FOLLY_ERROR);
            if (res != NOT_FOUND) {
                return folly::makeFuture(res);
            } else {
                return base_adaptor_->Head(args->key, args->size, args->headers).thenValue([args, meta_cache_entry](int &&res) -> int {
                    if (GetGlobalConfig().use_meta_cache && (res == OK || res == NOT_FOUND)) {
                        meta_cache_entry->present = true;
                        meta_cache_entry->existed = (res == OK);
                        meta_cache_entry->write_cached = false;
                        meta_cache_entry->size = args->size;
                        meta_cache_entry->headers = args->headers;
                    }
                    return res;
                });
            }
        });
    } else {
        return base_adaptor_->Head(key, size, headers).thenValue([meta_cache_entry, &size, &headers](int &&res) -> int {
            if (GetGlobalConfig().use_meta_cache && (res == OK || res == NOT_FOUND)) {
                meta_cache_entry->present = true;
                meta_cache_entry->existed = (res == OK);
                meta_cache_entry->write_cached = false;
                meta_cache_entry->size = size;
                meta_cache_entry->headers = headers;
            }
            return res;
        });
    }
}

void GlobalDataAdaptor::InvalidateMetaCache() {
    std::lock_guard<std::mutex> lock(meta_cache_mutex_);
    meta_cache_.clear();
}

void GlobalDataAdaptor::InvalidateMetaCacheEntry(const std::string &key) {
    std::lock_guard<std::mutex> lock(meta_cache_mutex_);
    meta_cache_.erase(key);
}

std::shared_ptr<GlobalDataAdaptor::MetaCacheEntry> GlobalDataAdaptor::GetMetaCacheEntry(const std::string &key) {
    std::lock_guard<std::mutex> lock(meta_cache_mutex_);
    auto iter = meta_cache_.find(key);
    if (iter == meta_cache_.end()) {
        auto entry = std::make_shared<MetaCacheEntry>(key);
        meta_cache_.insert(key, entry);
        return entry;
    } else {
        return iter->second;
    }
}

void GlobalDataAdaptor::SetCachePolicy(const std::string &key, CachePolicy &policy) {
    // ...
}

const CachePolicy &GlobalDataAdaptor::GetCachePolicy(const std::string &key) const {
    return GetGlobalConfig().default_policy;
}

std::shared_ptr<GlobalCacheClient> GlobalDataAdaptor::GetRpcClient() const {
    return rpc_client_[lrand48() % rpc_client_.size()];
}
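
// Garbage collection: snapshot each server's write-cache timestamp, deep
// flush every key under `prefix` to base storage, then ask each reachable
// server to delete write-cache entries covered by its snapshot (as suggested
// by the QueryTs/DeleteEntry pairing), sparing chunks still referenced by
// metadata that survived the flush.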
int GlobalDataAdaptor::PerformGarbageCollection(const std::string &prefix) {
    LOG(INFO) << "==================GC START===================";
    butil::Timer t;
    t.start();

    std::vector<uint64_t> write_cache_ts;
    std::set<int> skipped_server_id_list;
    for (int server_id = 0; server_id < (int) server_list_.size(); ++server_id) {
        auto res = GetRpcClient()->QueryTsFromWriteCache(server_id).get();
        if (res.status != OK) {
            std::cerr << RED << "Skip recycling write cache data in server " << server_id << WHITE << std::endl;
            skipped_server_id_list.insert(server_id);
        }
        write_cache_ts.push_back(res.timestamp);
        LOG(INFO) << "TS for server " << server_id << ": " << res.timestamp;
    }

    t.stop();
    LOG(INFO) << "Flush stage 1: " << t.u_elapsed();

    if (server_list_.size() == skipped_server_id_list.size()) {
        std::cerr << RED << "All servers are not available." << WHITE << std::endl;
        return RPC_FAILED;
    }

    std::vector<std::string> key_list;
    int rc = etcd_client_->ListJson(prefix, key_list).get();
    if (rc) {
        std::cerr << RED << "Failed to list metadata in write cache. "
                  << "Check the availability of etcd server." << WHITE << std::endl;
        return rc;
    }

    for (auto &key : key_list) {
        LOG(INFO) << "Found entry: " << key;
    }

    t.stop();
    LOG(INFO) << "Flush stage 2: " << t.u_elapsed();

    std::vector<folly::Future<int>> future_list;
    for (auto &key : key_list) {
        future_list.emplace_back(DeepFlush(key));
    }

    auto output = folly::collectAll(future_list).get();
    for (auto &entry : output) {
        if (entry.value_or(FOLLY_ERROR) != OK) {
            LOG(ERROR) << "Cannot flush data to S3 storage";
        }
    }

    t.stop();
    LOG(INFO) << "Flush stage 3: " << t.u_elapsed();

    // Recheck the JSON metadata from the etcd server
    rc = etcd_client_->ListJson(prefix, key_list).get();
    if (rc != 0 && rc != NOT_FOUND) {
        return rc;
    }

    t.stop();
    LOG(INFO) << "Flush stage 4: " << t.u_elapsed();

    std::unordered_map<int, std::vector<std::string>> preserve_chunk_keys_map;
    for (auto &key : key_list) {
        auto resp = etcd_client_->GetJson(key).get();
        if (resp.status) {
            continue;
        }

        std::vector<int> replicas;
        for (auto &entry : resp.root["replica"]) {
            replicas.push_back(entry.asInt());
        }

        std::vector<std::string> internal_keys;
        for (auto &entry : resp.root["path"]) {
            internal_keys.push_back(entry.asString());
        }

        assert(!replicas.empty() && !internal_keys.empty());
        for (size_t i = 0; i < internal_keys.size(); ++i) {
            preserve_chunk_keys_map[replicas[i % replicas.size()]].push_back(internal_keys[i]);
        }
    }

    for (int server_id = 0; server_id < (int) server_list_.size(); ++server_id) {
        if (skipped_server_id_list.count(server_id)) {
            continue;
        }

        std::vector<std::string> except_keys;
        if (preserve_chunk_keys_map.count(server_id)) {
            except_keys = preserve_chunk_keys_map[server_id];
        }

        int ret = GetRpcClient()->DeleteEntryFromWriteCache(server_id,
                                                            prefix,
                                                            write_cache_ts[server_id],
                                                            except_keys).get();
        if (ret) {
            LOG(WARNING) << "Cannot delete unused entries from write cache. Server id: " << server_id;
        }
    }

    t.stop();
    LOG(INFO) << "Flush stage 5: " << t.u_elapsed();

    LOG(INFO) << "==================GC END===================";
    return 0;
}

folly::Future<int> GlobalDataAdaptor::UpLoadPart(const std::string &key,
                                                 size_t off,
                                                 size_t size,
                                                 const ByteBuffer &buffer,
                                                 const std::map<std::string, std::string> &headers,
                                                 Json::Value &root) {
#ifdef CONFIG_GC_ON_EXCEEDING_DISKSPACE
    return DoUpLoadPart(key, off, size, buffer, headers, root)
            .thenValue([this, key, off, size, &buffer, &headers, &root](int &&res) -> int {
                if (res != NO_ENOUGH_DISKSPACE) {
                    return res;
                }
                LOG(INFO) << "Disk limit exceeded - perform GC immediately";
                res = PerformGarbageCollection();
                if (res) {
                    LOG(WARNING) << "GC failed";
                    return res;
                }
                LOG(INFO) << "Disk limit exceeded - GC completed, now retry";
                return DoUpLoadPart(key, off, size, buffer, headers, root).get();
            });
#else
    return DoUpLoadPart(key, off, size, buffer, headers, root);
#endif
}
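
// Writes a single part of a multipart upload through the write cache at
// offset `off`; the part's JSON metadata is returned via `root` and merged
// later by Completed().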
folly::Future<int> GlobalDataAdaptor::DoUpLoadPart(const std::string &key,
                                                   size_t off,
                                                   size_t size,
                                                   const ByteBuffer &buffer,
                                                   const std::map<std::string, std::string> &headers,
                                                   Json::Value &root) {
    auto t = std::make_shared<butil::Timer>();  // shared_ptr so the timer is freed on every path
    t->start();
    auto &policy = GetCachePolicy(key);
    auto pre_op = read_cache_->Invalidate(key, off + size);
    if (policy.write_cache_type == REPLICATION || policy.write_cache_type == REED_SOLOMON) {
        auto write_cache = policy.write_cache_type == REPLICATION
                           ? write_caches_[WC_TYPE_REPLICATION]
                           : write_caches_[WC_TYPE_REEDSOLOMON];
        return std::move(pre_op)
                .then(std::bind(&WriteCacheClient::Put, write_cache.get(), key, size, buffer, headers, off))
                .then([this, t, &root](folly::Try<WriteCacheClient::PutResult> output) -> folly::Future<int> {
                    int status = output.hasValue() ? output.value().status : FOLLY_ERROR;
                    if (status == OK) {
                        root = std::move(output.value().root);
                        t->stop();
                    }
                    return folly::makeFuture(status);
                });
    } else {
        LOG(ERROR) << "Failed to upload data, reason: unsupported type, key: " << key
                   << ", size: " << size
                   << ", type: " << policy.write_cache_type;
        return folly::makeFuture(UNSUPPORTED_TYPE);
    }
}
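
// Completes a multipart upload: concatenate the per-part "path" arrays into
// one JSON document, record the total size, and persist it to etcd.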
folly::Future<int> GlobalDataAdaptor::Completed(const std::string &key,
                                                const std::vector<Json::Value> &roots,
                                                size_t size) {
    if (!roots.empty()) {
        auto meta_cache_entry = GetMetaCacheEntry(key);
        meta_cache_entry->present = false;

        Json::Value json_path(Json::arrayValue);
        for (size_t i = 0; i < roots.size(); ++i) {
            for (auto &partial_key : roots[i]["path"])
                json_path.append(partial_key.asString());
        }
        Json::Value new_root = roots[0];
        new_root["path"] = json_path;
        new_root["size"] = (Json::UInt64) size;  // explicit width: assigning size_t is ambiguous for jsoncpp on some platforms

        return etcd_client_->PutJson(key, new_root);
    }
    return folly::makeFuture(OK);
}
|