258 lines
9.5 KiB
C++
258 lines
9.5 KiB
C++
#include "errorcode.h"
|
|
#include "read_cache.h"
|
|
|
|
namespace HybridCache {
|
|
|
|
ReadCache::ReadCache(const ReadCacheConfig& cfg,
|
|
std::shared_ptr<DataAdaptor> dataAdaptor,
|
|
std::shared_ptr<ThreadPool> executor) :
|
|
cfg_(cfg), dataAdaptor_(dataAdaptor), executor_(executor) {
|
|
Init();
|
|
}
|
|
|
|
folly::Future<int> ReadCache::Get(const std::string &key, size_t start,
|
|
size_t len, ByteBuffer &buffer) {
|
|
std::chrono::steady_clock::time_point startTime;
|
|
if (EnableLogging) startTime = std::chrono::steady_clock::now();
|
|
|
|
int res = SUCCESS;
|
|
uint32_t pageSize = cfg_.CacheCfg.PageBodySize;
|
|
size_t index = start / pageSize;
|
|
uint32_t pagePos = start % pageSize;
|
|
size_t readLen = 0;
|
|
size_t realReadLen = 0;
|
|
size_t bufOffset = 0;
|
|
size_t remainLen = len;
|
|
uint64_t readPageCnt = 0;
|
|
std::vector<std::pair<size_t, size_t>> dataBoundary;
|
|
|
|
while (remainLen > 0) {
|
|
readLen = pagePos + remainLen > pageSize ? pageSize - pagePos : remainLen;
|
|
std::string pageKey = std::move(GetPageKey(key, index));
|
|
std::vector<std::pair<size_t, size_t>> stepDataBoundary;
|
|
int tmpRes = pageCache_->Read(pageKey, pagePos, readLen,
|
|
(buffer.data + bufOffset), stepDataBoundary);
|
|
if (SUCCESS == tmpRes) {
|
|
++readPageCnt;
|
|
} else if (PAGE_NOT_FOUND != tmpRes) {
|
|
res = tmpRes;
|
|
break;
|
|
}
|
|
|
|
for (auto& it : stepDataBoundary) {
|
|
dataBoundary.push_back(std::make_pair(it.first + bufOffset, it.second));
|
|
realReadLen += it.second;
|
|
}
|
|
remainLen -= readLen;
|
|
++index;
|
|
bufOffset += readLen;
|
|
pagePos = (pagePos + readLen) % pageSize;
|
|
}
|
|
|
|
remainLen = len - realReadLen;
|
|
if (remainLen > 0 && !dataAdaptor_) {
|
|
res = ADAPTOR_NOT_FOUND;
|
|
}
|
|
|
|
// handle cache misses
|
|
readLen = 0;
|
|
size_t stepStart = 0;
|
|
size_t fileStartOff = 0;
|
|
std::vector<folly::Future<int>> fs;
|
|
auto it = dataBoundary.begin();
|
|
while (remainLen > 0 && SUCCESS == res) {
|
|
ByteBuffer stepBuffer(buffer.data + stepStart);
|
|
fileStartOff = start + stepStart;
|
|
if (it != dataBoundary.end()) {
|
|
readLen = it->first - stepStart;
|
|
if (!readLen) {
|
|
stepStart = it->first + it->second;
|
|
++it;
|
|
continue;
|
|
}
|
|
stepStart = it->first + it->second;
|
|
++it;
|
|
} else {
|
|
readLen = remainLen;
|
|
}
|
|
stepBuffer.len = readLen;
|
|
remainLen -= readLen;
|
|
|
|
auto download = folly::via(executor_.get(), [this, readLen]() {
|
|
// download flow control
|
|
while(!this->tokenBucket_->consume(readLen));
|
|
return SUCCESS;
|
|
}).thenValue([this, key, fileStartOff, readLen, stepBuffer](int i) {
|
|
// LOG(INFO) << "Extra download: " << key << " " << readLen;
|
|
ByteBuffer tmpBuffer(stepBuffer.data, readLen);
|
|
return this->dataAdaptor_->DownLoad(key, fileStartOff, readLen, tmpBuffer).get();
|
|
}).thenValue([this, key, fileStartOff, readLen, stepBuffer](int downRes) {
|
|
if (EnableLogging && SUCCESS != downRes) {
|
|
LOG(ERROR) << "[ReadCache]DownLoad failed, file:" << key
|
|
<< ", start:" << fileStartOff << ", len:" << readLen
|
|
<< ", res:" << downRes;
|
|
return downRes;
|
|
}
|
|
return this->Put(key, fileStartOff, readLen, stepBuffer);
|
|
});
|
|
|
|
fs.emplace_back(std::move(download));
|
|
}
|
|
|
|
if (!fs.empty()) {
|
|
return collectAll(fs).via(executor_.get())
|
|
.thenValue([key, start, len, readPageCnt, startTime](
|
|
std::vector<folly::Try<int>, std::allocator<folly::Try<int>>>&& tups) {
|
|
int finalRes = SUCCESS;
|
|
for (const auto& t : tups) {
|
|
if (SUCCESS != t.value()) finalRes = t.value();
|
|
}
|
|
if (EnableLogging) {
|
|
double totalTime = std::chrono::duration<double, std::milli>(
|
|
std::chrono::steady_clock::now() - startTime).count();
|
|
LOG(INFO) << "[ReadCache]Get, key:" << key << ", start:" << start
|
|
<< ", len:" << len << ", res:" << finalRes
|
|
<< ", readPageCnt:" << readPageCnt
|
|
<< ", time:" << totalTime << "ms";
|
|
}
|
|
return finalRes;
|
|
});
|
|
// auto tups = collectAll(fs).get();
|
|
// int finalRes = SUCCESS;
|
|
// for (const auto& t : tups) {
|
|
// if (SUCCESS != t.value()) finalRes = t.value();
|
|
// }
|
|
// if (EnableLogging) {
|
|
// double totalTime = std::chrono::duration<double, std::milli>(
|
|
// std::chrono::steady_clock::now() - startTime).count();
|
|
// LOG(INFO) << "[ReadCache]Get, key:" << key << ", start:" << start
|
|
// << ", len:" << len << ", res:" << finalRes
|
|
// << ", readPageCnt:" << readPageCnt
|
|
// << ", time:" << totalTime << "ms";
|
|
// }
|
|
// return finalRes;
|
|
}
|
|
|
|
if (EnableLogging) {
|
|
double totalTime = std::chrono::duration<double, std::milli>(
|
|
std::chrono::steady_clock::now() - startTime).count();
|
|
LOG(INFO) << "[ReadCache]Get, key:" << key << ", start:" << start
|
|
<< ", len:" << len << ", res:" << res
|
|
<< ", readPageCnt:" << readPageCnt
|
|
<< ", time:" << totalTime << "ms";
|
|
}
|
|
return folly::makeFuture(res);
|
|
}
|
|
|
|
int ReadCache::Put(const std::string &key, size_t start, size_t len,
|
|
const ByteBuffer &buffer) {
|
|
std::chrono::steady_clock::time_point startTime;
|
|
if (EnableLogging) startTime = std::chrono::steady_clock::now();
|
|
|
|
int res = SUCCESS;
|
|
uint32_t pageSize = cfg_.CacheCfg.PageBodySize;
|
|
uint64_t index = start / pageSize;
|
|
uint64_t pagePos = start % pageSize;
|
|
uint64_t writeLen = 0;
|
|
uint64_t writeOffset = 0;
|
|
uint64_t writePageCnt = 0;
|
|
size_t remainLen = len;
|
|
|
|
while (remainLen > 0) {
|
|
writeLen = pagePos + remainLen > pageSize ? pageSize - pagePos : remainLen;
|
|
std::string pageKey = std::move(GetPageKey(key, index));
|
|
res = pageCache_->Write(pageKey, pagePos, writeLen,
|
|
(buffer.data + writeOffset));
|
|
if (SUCCESS != res) break;
|
|
++writePageCnt;
|
|
remainLen -= writeLen;
|
|
++index;
|
|
writeOffset += writeLen;
|
|
pagePos = (pagePos + writeLen) % pageSize;
|
|
}
|
|
|
|
if (EnableLogging) {
|
|
double totalTime = std::chrono::duration<double, std::milli>(
|
|
std::chrono::steady_clock::now() - startTime).count();
|
|
LOG(INFO) << "[ReadCache]Put, key:" << key << ", start:" << start
|
|
<< ", len:" << len << ", res:" << res
|
|
<< ", writePageCnt:" << writePageCnt
|
|
<< ", time:" << totalTime << "ms";
|
|
}
|
|
return res;
|
|
}
|
|
|
|
int ReadCache::Delete(const std::string &key) {
|
|
std::chrono::steady_clock::time_point startTime;
|
|
if (EnableLogging) startTime = std::chrono::steady_clock::now();
|
|
|
|
int res = SUCCESS;
|
|
size_t delPageNum = 0;
|
|
std::string firstPage = std::move(GetPageKey(key, 0));
|
|
auto pageKey = pageCache_->GetPageList().lower_bound(firstPage);
|
|
while (pageKey != pageCache_->GetPageList().end()) {
|
|
std::vector<std::string> tokens;
|
|
split(*pageKey, PAGE_SEPARATOR, tokens);
|
|
if (key != tokens[0]) break;
|
|
int tmpRes = pageCache_->Delete(*pageKey);
|
|
if (SUCCESS == tmpRes) {
|
|
++delPageNum;
|
|
} else if (PAGE_NOT_FOUND != tmpRes) {
|
|
res = tmpRes;
|
|
break;
|
|
}
|
|
++pageKey;
|
|
}
|
|
|
|
if (EnableLogging) {
|
|
double totalTime = std::chrono::duration<double, std::milli>(
|
|
std::chrono::steady_clock::now() - startTime).count();
|
|
LOG(INFO) << "[ReadCache]Delete, key:" << key << ", res:" << res
|
|
<< ", delPageCnt:" << delPageNum
|
|
<< ", time:" << totalTime << "ms";
|
|
}
|
|
return res;
|
|
}
|
|
|
|
int ReadCache::GetAllKeys(std::set<std::string>& keys) {
|
|
std::chrono::steady_clock::time_point startTime;
|
|
if (EnableLogging) startTime = std::chrono::steady_clock::now();
|
|
|
|
auto pageKey = pageCache_->GetPageList().begin();
|
|
while (pageKey != pageCache_->GetPageList().end()) {
|
|
std::vector<std::string> tokens;
|
|
split(*pageKey, PAGE_SEPARATOR, tokens);
|
|
keys.insert(tokens[0]);
|
|
++pageKey;
|
|
}
|
|
if (EnableLogging) {
|
|
double totalTime = std::chrono::duration<double, std::milli>(
|
|
std::chrono::steady_clock::now() - startTime).count();
|
|
LOG(INFO) << "[ReadCache]Get all keys, keyCnt:" << keys.size()
|
|
<< ", time:" << totalTime << "ms";
|
|
}
|
|
return SUCCESS;
|
|
}
|
|
|
|
void ReadCache::Close() {
|
|
pageCache_->Close();
|
|
LOG(WARNING) << "[ReadCache]Close";
|
|
}
|
|
|
|
int ReadCache::Init() {
|
|
pageCache_ = std::make_shared<PageCacheImpl>(cfg_.CacheCfg);
|
|
tokenBucket_ = std::make_shared<folly::TokenBucket>(
|
|
cfg_.DownloadNormalFlowLimit, cfg_.DownloadBurstFlowLimit);
|
|
int res = pageCache_->Init();
|
|
LOG(WARNING) << "[ReadCache]Init, res:" << res;
|
|
return res;
|
|
}
|
|
|
|
std::string ReadCache::GetPageKey(const std::string &key, size_t pageIndex) {
|
|
std::string pageKey(key);
|
|
pageKey.append(std::string(1, PAGE_SEPARATOR)).append(std::to_string(pageIndex));
|
|
return pageKey;
|
|
}
|
|
|
|
} // namespace HybridCache
|