libsnapshot:snapuserd: read-ahead COW copy ops

Introduce read-ahead mechanism for COW copy ops.

1: Read-ahead thread will read from base device
   and store the data in scratch space along with the metadata.
2: Worker threads during merge will retrieve the data
   from read-ahead cache
3: Fixed set of blocks are read during each cycle by the read-ahead
   thread.
4: When the last block in the region is merged, read-ahead thread
   makes forward progress.

Scratch space is set to 2MB and is only used from COW copy operations.
We can extend this to Replace Ops based on performance evaluation.

Performance:

As mentioned in bug 181883791, Incremental OTA of size 55M with
235K copy operations where every block is moved by 4k:

Without read-ahead: 40 Minutes for merge completion
With read-ahead:  21 Minutes for merge completion

Bug: 183863613

Test: 1: Full OTA - no regression observed.
2: Incremental OTA - with older COW format. Daemon will just skip
   the read-ahead feature for older COW format.
3: Incremental OTA - with new COW format.
4: Reboot and crash kernel when multiple times when incremental OTA is in-flight.
   Verify post reboot, read-ahead thread re-constructs the data from scratch
   space.
5: No regression observed in RSS-Anon memory usage when merge in-flight.

Signed-off-by: Akilesh Kailash <akailash@google.com>
Change-Id: Ic565bfbee3e9fcfc94af694596dbf44c0877639f
This commit is contained in:
Akilesh Kailash 2021-03-29 22:22:45 +00:00
parent d967d01f56
commit 580312bc95
8 changed files with 948 additions and 30 deletions

View File

@ -420,7 +420,8 @@ cc_defaults {
"snapuserd_server.cpp",
"snapuserd.cpp",
"snapuserd_daemon.cpp",
"snapuserd_worker.cpp",
"snapuserd_worker.cpp",
"snapuserd_readahead.cpp",
],
cflags: [

View File

@ -369,13 +369,7 @@ void CowReader::InitializeMerge() {
// Replace-op-4, Zero-op-9, Replace-op-5 }
//==============================================================
for (uint64_t i = 0; i < ops_->size(); i++) {
auto& current_op = ops_->data()[i];
if (current_op.type != kCowCopyOp) {
break;
}
num_copy_ops += 1;
}
num_copy_ops = FindNumCopyops();
std::sort(ops_.get()->begin() + num_copy_ops, ops_.get()->end(),
[](CowOperation& op1, CowOperation& op2) -> bool {
@ -386,6 +380,23 @@ void CowReader::InitializeMerge() {
CHECK(ops_->size() >= header_.num_merge_ops);
ops_->erase(ops_.get()->begin(), ops_.get()->begin() + header_.num_merge_ops);
}
num_copy_ops = FindNumCopyops();
set_copy_ops(num_copy_ops);
}
uint64_t CowReader::FindNumCopyops() {
uint64_t num_copy_ops = 0;
for (uint64_t i = 0; i < ops_->size(); i++) {
auto& current_op = ops_->data()[i];
if (current_op.type != kCowCopyOp) {
break;
}
num_copy_ops += 1;
}
return num_copy_ops;
}
bool CowReader::GetHeader(CowHeader* header) {

View File

@ -35,6 +35,8 @@ static constexpr uint32_t BLOCK_SHIFT = (__builtin_ffs(BLOCK_SZ) - 1);
// +-----------------------+
// | Header (fixed) |
// +-----------------------+
// | Scratch space |
// +-----------------------+
// | Operation (variable) |
// | Data (variable) |
// +-----------------------+
@ -152,11 +154,28 @@ static constexpr uint8_t kCowCompressNone = 0;
static constexpr uint8_t kCowCompressGz = 1;
static constexpr uint8_t kCowCompressBrotli = 2;
static constexpr uint8_t kCowReadAheadNotStarted = 0;
static constexpr uint8_t kCowReadAheadInProgress = 1;
static constexpr uint8_t kCowReadAheadDone = 2;
struct CowFooter {
CowFooterOperation op;
CowFooterData data;
} __attribute__((packed));
struct ScratchMetadata {
// Block of data in the image that operation modifies
// and read-ahead thread stores the modified data
// in the scratch space
uint64_t new_block;
// Offset within the file to read the data
uint64_t file_offset;
} __attribute__((packed));
struct BufferState {
uint8_t read_ahead_state;
} __attribute__((packed));
// 2MB Scratch space used for read-ahead
static constexpr uint64_t BUFFER_REGION_DEFAULT_SIZE = (1ULL << 21);

View File

@ -141,18 +141,21 @@ class CowReader : public ICowReader {
bool GetRawBytes(uint64_t offset, void* buffer, size_t len, size_t* read);
void UpdateMergeProgress(uint64_t merge_ops) { header_.num_merge_ops += merge_ops; }
void InitializeMerge();
void set_total_data_ops(uint64_t size) { total_data_ops_ = size; }
uint64_t total_data_ops() { return total_data_ops_; }
void set_copy_ops(uint64_t size) { copy_ops_ = size; }
uint64_t total_copy_ops() { return copy_ops_; }
void CloseCowFd() { owned_fd_ = {}; }
private:
bool ParseOps(std::optional<uint64_t> label);
uint64_t FindNumCopyops();
android::base::unique_fd owned_fd_;
android::base::borrowed_fd fd_;
@ -162,6 +165,7 @@ class CowReader : public ICowReader {
std::optional<uint64_t> last_label_;
std::shared_ptr<std::vector<CowOperation>> ops_;
uint64_t total_data_ops_;
uint64_t copy_ops_;
};
} // namespace snapshot

View File

@ -47,6 +47,9 @@ bool Snapuserd::InitializeWorkers() {
worker_threads_.push_back(std::move(wt));
}
read_ahead_thread_ = std::make_unique<ReadAheadThread>(cow_device_, backing_store_device_,
misc_name_, GetSharedPtr());
return true;
}
@ -54,7 +57,11 @@ bool Snapuserd::CommitMerge(int num_merge_ops) {
struct CowHeader* ch = reinterpret_cast<struct CowHeader*>(mapped_addr_);
ch->num_merge_ops += num_merge_ops;
// Sync the first 4k block
if (read_ahead_feature_ && read_ahead_ops_.size() > 0) {
struct BufferState* ra_state = GetBufferState();
ra_state->read_ahead_state = kCowReadAheadInProgress;
}
int ret = msync(mapped_addr_, BLOCK_SZ, MS_SYNC);
if (ret < 0) {
PLOG(ERROR) << "msync header failed: " << ret;
@ -66,6 +73,174 @@ bool Snapuserd::CommitMerge(int num_merge_ops) {
return true;
}
void Snapuserd::PrepareReadAhead() {
if (!read_ahead_feature_) {
return;
}
struct BufferState* ra_state = GetBufferState();
// Check if the data has to be re-constructed from COW device
if (ra_state->read_ahead_state == kCowReadAheadDone) {
populate_data_from_cow_ = true;
} else {
populate_data_from_cow_ = false;
}
StartReadAhead();
}
bool Snapuserd::GetRABuffer(std::unique_lock<std::mutex>* lock, uint64_t block, void* buffer) {
CHECK(lock->owns_lock());
std::unordered_map<uint64_t, void*>::iterator it = read_ahead_buffer_map_.find(block);
// This will be true only for IO's generated as part of reading a root
// filesystem. IO's related to merge should always be in read-ahead cache.
if (it == read_ahead_buffer_map_.end()) {
return false;
}
// Theoretically, we can send the data back from the read-ahead buffer
// all the way to the kernel without memcpy. However, if the IO is
// un-aligned, the wrapper function will need to touch the read-ahead
// buffers and transitions will be bit more complicated.
memcpy(buffer, it->second, BLOCK_SZ);
return true;
}
// ========== State transition functions for read-ahead operations ===========
bool Snapuserd::GetReadAheadPopulatedBuffer(uint64_t block, void* buffer) {
if (!read_ahead_feature_) {
return false;
}
{
std::unique_lock<std::mutex> lock(lock_);
if (io_state_ == READ_AHEAD_IO_TRANSITION::READ_AHEAD_FAILURE) {
return false;
}
if (io_state_ == READ_AHEAD_IO_TRANSITION::IO_IN_PROGRESS) {
return GetRABuffer(&lock, block, buffer);
}
}
{
// Read-ahead thread IO is in-progress. Wait for it to complete
std::unique_lock<std::mutex> lock(lock_);
while (!(io_state_ == READ_AHEAD_IO_TRANSITION::READ_AHEAD_FAILURE ||
io_state_ == READ_AHEAD_IO_TRANSITION::IO_IN_PROGRESS)) {
cv.wait(lock);
}
return GetRABuffer(&lock, block, buffer);
}
}
// This is invoked by read-ahead thread waiting for merge IO's
// to complete
bool Snapuserd::WaitForMergeToComplete() {
{
std::unique_lock<std::mutex> lock(lock_);
while (!(io_state_ == READ_AHEAD_IO_TRANSITION::READ_AHEAD_BEGIN ||
io_state_ == READ_AHEAD_IO_TRANSITION::IO_TERMINATED)) {
cv.wait(lock);
}
if (io_state_ == READ_AHEAD_IO_TRANSITION::IO_TERMINATED) {
return false;
}
io_state_ = READ_AHEAD_IO_TRANSITION::READ_AHEAD_IN_PROGRESS;
return true;
}
}
// This is invoked during the launch of worker threads. We wait
// for read-ahead thread to by fully up before worker threads
// are launched; else we will have a race between worker threads
// and read-ahead thread specifically during re-construction.
bool Snapuserd::WaitForReadAheadToStart() {
{
std::unique_lock<std::mutex> lock(lock_);
while (!(io_state_ == READ_AHEAD_IO_TRANSITION::IO_IN_PROGRESS ||
io_state_ == READ_AHEAD_IO_TRANSITION::READ_AHEAD_FAILURE)) {
cv.wait(lock);
}
if (io_state_ == READ_AHEAD_IO_TRANSITION::READ_AHEAD_FAILURE) {
return false;
}
return true;
}
}
// Invoked by worker threads when a sequence of merge operation
// is complete notifying read-ahead thread to make forward
// progress.
void Snapuserd::StartReadAhead() {
{
std::lock_guard<std::mutex> lock(lock_);
io_state_ = READ_AHEAD_IO_TRANSITION::READ_AHEAD_BEGIN;
}
cv.notify_one();
}
void Snapuserd::MergeCompleted() {
{
std::lock_guard<std::mutex> lock(lock_);
io_state_ = READ_AHEAD_IO_TRANSITION::IO_TERMINATED;
}
cv.notify_one();
}
bool Snapuserd::ReadAheadIOCompleted() {
// Flush the entire buffer region
int ret = msync(mapped_addr_, total_mapped_addr_length_, MS_SYNC);
if (ret < 0) {
PLOG(ERROR) << "msync failed after ReadAheadIOCompleted: " << ret;
return false;
}
// Metadata and data are synced. Now, update the state.
// We need to update the state after flushing data; if there is a crash
// when read-ahead IO is in progress, the state of data in the COW file
// is unknown. kCowReadAheadDone acts as a checkpoint wherein the data
// in the scratch space is good and during next reboot, read-ahead thread
// can safely re-construct the data.
struct BufferState* ra_state = GetBufferState();
ra_state->read_ahead_state = kCowReadAheadDone;
ret = msync(mapped_addr_, BLOCK_SZ, MS_SYNC);
if (ret < 0) {
PLOG(ERROR) << "msync failed to flush Readahead completion state...";
return false;
}
// Notify the worker threads
{
std::lock_guard<std::mutex> lock(lock_);
io_state_ = READ_AHEAD_IO_TRANSITION::IO_IN_PROGRESS;
}
cv.notify_all();
return true;
}
void Snapuserd::ReadAheadIOFailed() {
{
std::lock_guard<std::mutex> lock(lock_);
io_state_ = READ_AHEAD_IO_TRANSITION::READ_AHEAD_FAILURE;
}
cv.notify_all();
}
//========== End of state transition functions ====================
bool Snapuserd::IsChunkIdMetadata(chunk_t chunk) {
uint32_t stride = exceptions_per_area_ + 1;
lldiv_t divresult = lldiv(chunk, stride);
@ -257,13 +432,16 @@ bool Snapuserd::ReadMetadata() {
data_chunk_id = GetNextAllocatableChunkId(data_chunk_id);
}
int num_ra_ops_per_iter = ((GetBufferDataSize()) / BLOCK_SZ);
std::optional<chunk_t> prev_id = {};
std::map<uint64_t, const CowOperation*> map;
std::set<uint64_t> dest_blocks;
size_t pending_copy_ops = exceptions_per_area_ - num_ops;
SNAP_LOG(INFO) << " Processing copy-ops at Area: " << vec_.size()
<< " Number of replace/zero ops completed in this area: " << num_ops
<< " Pending copy ops for this area: " << pending_copy_ops;
uint64_t total_copy_ops = reader_->total_copy_ops();
SNAP_LOG(DEBUG) << " Processing copy-ops at Area: " << vec_.size()
<< " Number of replace/zero ops completed in this area: " << num_ops
<< " Pending copy ops for this area: " << pending_copy_ops;
while (!cowop_riter_->Done()) {
do {
const CowOperation* cow_op = &cowop_riter_->Get();
@ -425,6 +603,9 @@ bool Snapuserd::ReadMetadata() {
offset += sizeof(struct disk_exception);
num_ops += 1;
copy_ops++;
if (read_ahead_feature_) {
read_ahead_ops_.push_back(it->second);
}
SNAP_LOG(DEBUG) << num_ops << ":"
<< " Copy-op: "
@ -452,6 +633,15 @@ bool Snapuserd::ReadMetadata() {
}
data_chunk_id = GetNextAllocatableChunkId(data_chunk_id);
total_copy_ops -= 1;
/*
* Split the number of ops based on the size of read-ahead buffer
* region. We need to ensure that kernel doesn't issue IO on blocks
* which are not read by the read-ahead thread.
*/
if (read_ahead_feature_ && (total_copy_ops % num_ra_ops_per_iter == 0)) {
data_chunk_id = GetNextAllocatableChunkId(data_chunk_id);
}
}
map.clear();
dest_blocks.clear();
@ -469,6 +659,7 @@ bool Snapuserd::ReadMetadata() {
chunk_vec_.shrink_to_fit();
vec_.shrink_to_fit();
read_ahead_ops_.shrink_to_fit();
// Sort the vector based on sectors as we need this during un-aligned access
std::sort(chunk_vec_.begin(), chunk_vec_.end(), compare);
@ -483,6 +674,8 @@ bool Snapuserd::ReadMetadata() {
// Total number of sectors required for creating dm-user device
num_sectors_ = ChunkToSector(data_chunk_id);
merge_initiated_ = false;
PrepareReadAhead();
return true;
}
@ -490,8 +683,15 @@ bool Snapuserd::MmapMetadata() {
CowHeader header;
reader_->GetHeader(&header);
// mmap the first 4k page
total_mapped_addr_length_ = BLOCK_SZ;
if (header.major_version >= 2 && header.buffer_size > 0) {
total_mapped_addr_length_ = header.header_size + BUFFER_REGION_DEFAULT_SIZE;
read_ahead_feature_ = true;
} else {
// mmap the first 4k page - older COW format
total_mapped_addr_length_ = BLOCK_SZ;
read_ahead_feature_ = false;
}
mapped_addr_ = mmap(NULL, total_mapped_addr_length_, PROT_READ | PROT_WRITE, MAP_SHARED,
cow_fd_.get(), 0);
if (mapped_addr_ == MAP_FAILED) {
@ -529,11 +729,26 @@ bool Snapuserd::InitCowDevice() {
}
/*
* Entry point to launch worker threads
* Entry point to launch threads
*/
bool Snapuserd::Start() {
std::vector<std::future<bool>> threads;
std::future<bool> ra_thread;
bool rathread = (read_ahead_feature_ && (read_ahead_ops_.size() > 0));
// Start the read-ahead thread and wait
// for it as the data has to be re-constructed
// from COW device.
if (rathread) {
ra_thread = std::async(std::launch::async, &ReadAheadThread::RunThread,
read_ahead_thread_.get());
if (!WaitForReadAheadToStart()) {
SNAP_LOG(ERROR) << "Failed to start Read-ahead thread...";
return false;
}
}
// Launch worker threads
for (int i = 0; i < worker_threads_.size(); i++) {
threads.emplace_back(
std::async(std::launch::async, &WorkerThread::RunThread, worker_threads_[i].get()));
@ -544,8 +759,69 @@ bool Snapuserd::Start() {
ret = t.get() && ret;
}
if (rathread) {
// Notify the read-ahead thread that all worker threads
// are done. We need this explicit notification when
// there is an IO failure or there was a switch
// of dm-user table; thus, forcing the read-ahead
// thread to wake up.
MergeCompleted();
ret = ret && ra_thread.get();
}
return ret;
}
uint64_t Snapuserd::GetBufferMetadataOffset() {
CowHeader header;
reader_->GetHeader(&header);
size_t size = header.header_size + sizeof(BufferState);
return size;
}
/*
* Metadata for read-ahead is 16 bytes. For a 2 MB region, we will
* end up with 8k (2 PAGE) worth of metadata. Thus, a 2MB buffer
* region is split into:
*
* 1: 8k metadata
*
*/
size_t Snapuserd::GetBufferMetadataSize() {
CowHeader header;
reader_->GetHeader(&header);
size_t metadata_bytes = (header.buffer_size * sizeof(struct ScratchMetadata)) / BLOCK_SZ;
return metadata_bytes;
}
size_t Snapuserd::GetBufferDataOffset() {
CowHeader header;
reader_->GetHeader(&header);
return (header.header_size + GetBufferMetadataSize());
}
/*
* (2MB - 8K = 2088960 bytes) will be the buffer region to hold the data.
*/
size_t Snapuserd::GetBufferDataSize() {
CowHeader header;
reader_->GetHeader(&header);
size_t size = header.buffer_size - GetBufferMetadataSize();
return size;
}
struct BufferState* Snapuserd::GetBufferState() {
CowHeader header;
reader_->GetHeader(&header);
struct BufferState* ra_state =
reinterpret_cast<struct BufferState*>((char*)mapped_addr_ + header.header_size);
return ra_state;
}
} // namespace snapshot
} // namespace android

View File

@ -20,6 +20,7 @@
#include <sys/mman.h>
#include <bitset>
#include <condition_variable>
#include <csignal>
#include <cstring>
#include <future>
@ -57,6 +58,35 @@ static_assert(PAYLOAD_SIZE >= BLOCK_SZ);
*/
static constexpr int NUM_THREADS_PER_PARTITION = 4;
/*
* State transitions between worker threads and read-ahead
* threads.
*
* READ_AHEAD_BEGIN: Worker threads initiates the read-ahead
* thread to begin reading the copy operations
* for each bounded region.
*
* READ_AHEAD_IN_PROGRESS: When read ahead thread is in-flight
* and reading the copy operations.
*
* IO_IN_PROGRESS: Merge operation is in-progress by worker threads.
*
* IO_TERMINATED: When all the worker threads are done, request the
* read-ahead thread to terminate
*
* READ_AHEAD_FAILURE: If there are any IO failures when read-ahead
* thread is reading from COW device.
*
* The transition of each states is described in snapuserd_readahead.cpp
*/
enum class READ_AHEAD_IO_TRANSITION {
READ_AHEAD_BEGIN,
READ_AHEAD_IN_PROGRESS,
IO_IN_PROGRESS,
IO_TERMINATED,
READ_AHEAD_FAILURE,
};
class BufferSink : public IByteSink {
public:
void Initialize(size_t size);
@ -77,6 +107,42 @@ class BufferSink : public IByteSink {
class Snapuserd;
class ReadAheadThread {
public:
ReadAheadThread(const std::string& cow_device, const std::string& backing_device,
const std::string& misc_name, std::shared_ptr<Snapuserd> snapuserd);
bool RunThread();
private:
void InitializeIter();
bool IterDone();
void IterNext();
const CowOperation* GetIterOp();
void InitializeBuffer();
bool InitializeFds();
void CloseFds() {
cow_fd_ = {};
backing_store_fd_ = {};
}
bool ReadAheadIOStart();
void PrepareReadAhead(uint64_t* source_block, int* pending_ops, std::vector<uint64_t>& blocks);
bool ReconstructDataFromCow();
void* read_ahead_buffer_;
void* metadata_buffer_;
std::vector<const CowOperation*>::reverse_iterator read_ahead_iter_;
std::string cow_device_;
std::string backing_store_device_;
std::string misc_name_;
unique_fd cow_fd_;
unique_fd backing_store_fd_;
std::shared_ptr<Snapuserd> snapuserd_;
};
class WorkerThread {
public:
WorkerThread(const std::string& cow_device, const std::string& backing_device,
@ -117,12 +183,16 @@ class WorkerThread {
bool ProcessCopyOp(const CowOperation* cow_op);
bool ProcessZeroOp();
bool ReadFromBaseDevice(const CowOperation* cow_op);
bool GetReadAheadPopulatedBuffer(const CowOperation* cow_op);
// Merge related functions
bool ProcessMergeComplete(chunk_t chunk, void* buffer);
loff_t GetMergeStartOffset(void* merged_buffer, void* unmerged_buffer,
int* unmerged_exceptions);
int GetNumberOfMergedOps(void* merged_buffer, void* unmerged_buffer, loff_t offset,
int unmerged_exceptions);
int unmerged_exceptions, bool* copy_op, bool* commit);
sector_t ChunkToSector(chunk_t chunk) { return chunk << CHUNK_SHIFT; }
chunk_t SectorToChunk(sector_t sector) { return sector >> CHUNK_SHIFT; }
@ -159,7 +229,10 @@ class Snapuserd : public std::enable_shared_from_this<Snapuserd> {
bool CommitMerge(int num_merge_ops);
void CloseFds() { cow_fd_ = {}; }
void FreeResources() { worker_threads_.clear(); }
void FreeResources() {
worker_threads_.clear();
read_ahead_thread_ = nullptr;
}
size_t GetMetadataAreaSize() { return vec_.size(); }
void* GetExceptionBuffer(size_t i) { return vec_[i].get(); }
@ -177,14 +250,44 @@ class Snapuserd : public std::enable_shared_from_this<Snapuserd> {
void UnmapBufferRegion();
bool MmapMetadata();
// Read-ahead related functions
std::vector<const CowOperation*>& GetReadAheadOpsVec() { return read_ahead_ops_; }
std::unordered_map<uint64_t, void*>& GetReadAheadMap() { return read_ahead_buffer_map_; }
void* GetMappedAddr() { return mapped_addr_; }
bool IsReadAheadFeaturePresent() { return read_ahead_feature_; }
void PrepareReadAhead();
void StartReadAhead();
void MergeCompleted();
bool ReadAheadIOCompleted();
void ReadAheadIOFailed();
bool WaitForMergeToComplete();
bool GetReadAheadPopulatedBuffer(uint64_t block, void* buffer);
bool ReconstructDataFromCow() { return populate_data_from_cow_; }
void ReconstructDataFromCowFinish() { populate_data_from_cow_ = false; }
bool WaitForReadAheadToStart();
uint64_t GetBufferMetadataOffset();
size_t GetBufferMetadataSize();
size_t GetBufferDataOffset();
size_t GetBufferDataSize();
// Final block to be merged in a given read-ahead buffer region
void SetFinalBlockMerged(uint64_t x) { final_block_merged_ = x; }
uint64_t GetFinalBlockMerged() { return final_block_merged_; }
// Total number of blocks to be merged in a given read-ahead buffer region
void SetTotalRaBlocksMerged(int x) { total_ra_blocks_merged_ = x; }
int GetTotalRaBlocksMerged() { return total_ra_blocks_merged_; }
private:
bool IsChunkIdMetadata(chunk_t chunk);
chunk_t GetNextAllocatableChunkId(chunk_t chunk_id);
bool GetRABuffer(std::unique_lock<std::mutex>* lock, uint64_t block, void* buffer);
bool ReadMetadata();
sector_t ChunkToSector(chunk_t chunk) { return chunk << CHUNK_SHIFT; }
chunk_t SectorToChunk(sector_t sector) { return sector >> CHUNK_SHIFT; }
bool IsBlockAligned(int read_size) { return ((read_size & (BLOCK_SZ - 1)) == 0); }
struct BufferState* GetBufferState();
std::string cow_device_;
std::string backing_store_device_;
@ -209,11 +312,22 @@ class Snapuserd : public std::enable_shared_from_this<Snapuserd> {
std::vector<std::pair<sector_t, const CowOperation*>> chunk_vec_;
std::mutex lock_;
std::condition_variable cv;
void* mapped_addr_;
size_t total_mapped_addr_length_;
std::vector<std::unique_ptr<WorkerThread>> worker_threads_;
// Read-ahead related
std::unordered_map<uint64_t, void*> read_ahead_buffer_map_;
std::vector<const CowOperation*> read_ahead_ops_;
bool populate_data_from_cow_ = false;
bool read_ahead_feature_;
uint64_t final_block_merged_;
int total_ra_blocks_merged_ = 0;
READ_AHEAD_IO_TRANSITION io_state_;
std::unique_ptr<ReadAheadThread> read_ahead_thread_;
bool merge_initiated_ = false;
bool attached_ = false;
};

View File

@ -0,0 +1,439 @@
/*
* Copyright (C) 2021 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "snapuserd.h"
#include <csignal>
#include <optional>
#include <set>
#include <libsnapshot/snapuserd_client.h>
namespace android {
namespace snapshot {
using namespace android;
using namespace android::dm;
using android::base::unique_fd;
#define SNAP_LOG(level) LOG(level) << misc_name_ << ": "
#define SNAP_PLOG(level) PLOG(level) << misc_name_ << ": "
/*
* Merging a copy operation involves the following flow:
*
* 1: dm-snapshot layer requests merge for a 4k block. dm-user sends the request
* to the daemon
* 2: daemon reads the source block
* 3: daemon copies the source data
* 4: IO completion sent back to dm-user (a switch from user space to kernel)
* 5: dm-snapshot merges the data to base device
* 6: dm-snapshot sends the merge-completion IO to dm-user
* 7: dm-user re-directs the merge completion IO to daemon (one more switch)
* 8: daemon updates the COW file about the completed merge request (a write syscall) and followed
* by a fysnc. 9: Send the IO completion back to dm-user
*
* The above sequence is a significant overhead especially when merging one 4k
* block at a time.
*
* Read-ahead layer will optimize the above path by reading the data from base
* device in the background so that merging thread can retrieve the data from
* the read-ahead cache. Additionally, syncing of merged data is deferred to
* read-ahead thread threadby the IO path is not bottlenecked.
*
* We create a scratch space of 2MB to store the read-ahead data in the COW
* device.
*
* +-----------------------+
* | Header (fixed) |
* +-----------------------+
* | Scratch space | <-- 2MB
* +-----------------------+
*
* Scratch space is as follows:
*
* +-----------------------+
* | Metadata | <- 4k page
* +-----------------------+
* | Metadata | <- 4k page
* +-----------------------+
* | |
* | Read-ahead data |
* | |
* +-----------------------+
*
* State transitions and communication between read-ahead thread and worker
* thread during merge:
* =====================================================================
*
* Worker Threads Read-Ahead thread
* ------------------------------------------------------------------
*
* |
* |
* --> -----------------READ_AHEAD_BEGIN------------->|
* | | | READ_AHEAD_IN_PROGRESS
* | WAIT |
* | | |
* | |<-----------------IO_IN_PROGRESS---------------
* | | |
* | | IO_IN_PRGRESS WAIT
* | | |
* |<--| |
* | |
* ------------------IO_TERMINATED--------------->|
* END
*
*
* ===================================================================
*
* Example:
*
* We have 6 copy operations to be executed in OTA and there is a overlap. Update-engine
* will write to COW file as follows:
*
* Op-1: 20 -> 23
* Op-2: 19 -> 22
* Op-3: 18 -> 21
* Op-4: 17 -> 20
* Op-5: 16 -> 19
* Op-6: 15 -> 18
*
* Read-ahead thread will read all the 6 source blocks and store the data in the
* scratch space. Metadata will contain the destination block numbers. Thus,
* scratch space will look something like this:
*
* +--------------+
* | Block 23 |
* | offset - 1 |
* +--------------+
* | Block 22 |
* | offset - 2 |
* +--------------+
* | Block 21 |
* | offset - 3 |
* +--------------+
* ...
* ...
* +--------------+
* | Data-Block 20| <-- offset - 1
* +--------------+
* | Data-Block 19| <-- offset - 2
* +--------------+
* | Data-Block 18| <-- offset - 3
* +--------------+
* ...
* ...
*
* ====================================================================
* IO Path:
*
* Read-ahead will serve the data to worker threads during merge only
* after metadata and data are persisted to the scratch space. Worker
* threads during merge will always retrieve the data from cache; if the
* cache is not populated, it will wait for the read-ahead thread to finish.
* Furthermore, the number of operations merged will by synced to the header
* only when all the blocks in the read-ahead cache are merged. In the above
* case, when all 6 operations are merged, COW Header is updated with
* num_merge_ops = 6.
*
* Merge resume after crash:
*
* Let's say we have a crash after 5 operations are merged. i.e. after
* Op-5: 16->19 is completed but before the Op-6 is merged. Thus, COW Header
* num_merge_ops will be 0 as the all the ops were not merged yet. During next
* reboot, read-ahead thread will re-construct the data in-memory from the
* scratch space; when merge resumes, Op-1 will be re-exectued. However,
* data will be served from read-ahead cache safely even though, block 20
* was over-written by Op-4.
*
*/
ReadAheadThread::ReadAheadThread(const std::string& cow_device, const std::string& backing_device,
const std::string& misc_name,
std::shared_ptr<Snapuserd> snapuserd) {
cow_device_ = cow_device;
backing_store_device_ = backing_device;
misc_name_ = misc_name;
snapuserd_ = snapuserd;
}
void ReadAheadThread::PrepareReadAhead(uint64_t* source_block, int* pending_ops,
std::vector<uint64_t>& blocks) {
int num_ops = *pending_ops;
int nr_consecutive = 0;
if (!IterDone() && num_ops) {
// Get the first block
const CowOperation* cow_op = GetIterOp();
*source_block = cow_op->source;
IterNext();
num_ops -= 1;
nr_consecutive = 1;
blocks.push_back(cow_op->new_block);
/*
* Find number of consecutive blocks working backwards.
*/
while (!IterDone() && num_ops) {
const CowOperation* op = GetIterOp();
if (op->source != (*source_block - nr_consecutive)) {
break;
}
nr_consecutive += 1;
num_ops -= 1;
blocks.push_back(op->new_block);
IterNext();
}
}
}
bool ReadAheadThread::ReconstructDataFromCow() {
std::unordered_map<uint64_t, void*>& read_ahead_buffer_map = snapuserd_->GetReadAheadMap();
read_ahead_buffer_map.clear();
loff_t metadata_offset = 0;
loff_t start_data_offset = snapuserd_->GetBufferDataOffset();
int num_ops = 0;
int total_blocks_merged = 0;
while (true) {
struct ScratchMetadata* bm = reinterpret_cast<struct ScratchMetadata*>(
(char*)metadata_buffer_ + metadata_offset);
// Done reading metadata
if (bm->new_block == 0 && bm->file_offset == 0) {
break;
}
loff_t buffer_offset = bm->file_offset - start_data_offset;
void* bufptr = static_cast<void*>((char*)read_ahead_buffer_ + buffer_offset);
read_ahead_buffer_map[bm->new_block] = bufptr;
num_ops += 1;
total_blocks_merged += 1;
metadata_offset += sizeof(struct ScratchMetadata);
}
// We are done re-constructing the mapping; however, we need to make sure
// all the COW operations to-be merged are present in the re-constructed
// mapping.
while (!IterDone()) {
const CowOperation* op = GetIterOp();
if (read_ahead_buffer_map.find(op->new_block) != read_ahead_buffer_map.end()) {
num_ops -= 1;
snapuserd_->SetFinalBlockMerged(op->new_block);
IterNext();
} else {
// Verify that we have covered all the ops which were re-constructed
// from COW device - These are the ops which are being
// re-constructed after crash.
CHECK(num_ops == 0);
break;
}
}
snapuserd_->SetTotalRaBlocksMerged(total_blocks_merged);
snapuserd_->ReconstructDataFromCowFinish();
if (!snapuserd_->ReadAheadIOCompleted()) {
SNAP_LOG(ERROR) << "ReadAheadIOCompleted failed...";
snapuserd_->ReadAheadIOFailed();
return false;
}
SNAP_LOG(INFO) << "ReconstructDataFromCow success";
return true;
}
bool ReadAheadThread::ReadAheadIOStart() {
// Check if the data has to be constructed from the COW file.
// This will be true only once during boot up after a crash
// during merge.
if (snapuserd_->ReconstructDataFromCow()) {
return ReconstructDataFromCow();
}
std::unordered_map<uint64_t, void*>& read_ahead_buffer_map = snapuserd_->GetReadAheadMap();
read_ahead_buffer_map.clear();
int num_ops = (snapuserd_->GetBufferDataSize()) / BLOCK_SZ;
loff_t metadata_offset = 0;
struct ScratchMetadata* bm =
reinterpret_cast<struct ScratchMetadata*>((char*)metadata_buffer_ + metadata_offset);
bm->new_block = 0;
bm->file_offset = 0;
std::vector<uint64_t> blocks;
loff_t buffer_offset = 0;
loff_t offset = 0;
loff_t file_offset = snapuserd_->GetBufferDataOffset();
int total_blocks_merged = 0;
while (true) {
uint64_t source_block;
int linear_blocks;
PrepareReadAhead(&source_block, &num_ops, blocks);
linear_blocks = blocks.size();
if (linear_blocks == 0) {
// No more blocks to read
SNAP_LOG(DEBUG) << " Read-ahead completed....";
break;
}
// Get the first block in the consecutive set of blocks
source_block = source_block + 1 - linear_blocks;
size_t io_size = (linear_blocks * BLOCK_SZ);
num_ops -= linear_blocks;
total_blocks_merged += linear_blocks;
// Mark the block number as the one which will
// be the final block to be merged in this entire region.
// Read-ahead thread will get
// notified when this block is merged to make
// forward progress
snapuserd_->SetFinalBlockMerged(blocks.back());
while (linear_blocks) {
uint64_t new_block = blocks.back();
blocks.pop_back();
// Assign the mapping
void* bufptr = static_cast<void*>((char*)read_ahead_buffer_ + offset);
read_ahead_buffer_map[new_block] = bufptr;
offset += BLOCK_SZ;
bm = reinterpret_cast<struct ScratchMetadata*>((char*)metadata_buffer_ +
metadata_offset);
bm->new_block = new_block;
bm->file_offset = file_offset;
metadata_offset += sizeof(struct ScratchMetadata);
file_offset += BLOCK_SZ;
linear_blocks -= 1;
}
// Read from the base device consecutive set of blocks in one shot
if (!android::base::ReadFullyAtOffset(backing_store_fd_,
(char*)read_ahead_buffer_ + buffer_offset, io_size,
source_block * BLOCK_SZ)) {
SNAP_PLOG(ERROR) << "Copy-op failed. Read from backing store: " << backing_store_device_
<< "at block :" << source_block << " buffer_offset : " << buffer_offset
<< " io_size : " << io_size << " buf-addr : " << read_ahead_buffer_;
snapuserd_->ReadAheadIOFailed();
return false;
}
// This is important - explicitly set the contents to zero. This is used
// when re-constructing the data after crash. This indicates end of
// reading metadata contents when re-constructing the data
bm = reinterpret_cast<struct ScratchMetadata*>((char*)metadata_buffer_ + metadata_offset);
bm->new_block = 0;
bm->file_offset = 0;
buffer_offset += io_size;
CHECK(offset == buffer_offset);
CHECK((file_offset - snapuserd_->GetBufferDataOffset()) == offset);
}
snapuserd_->SetTotalRaBlocksMerged(total_blocks_merged);
if (!snapuserd_->ReadAheadIOCompleted()) {
SNAP_LOG(ERROR) << "ReadAheadIOCompleted failed...";
snapuserd_->ReadAheadIOFailed();
return false;
}
return true;
}
bool ReadAheadThread::RunThread() {
if (!InitializeFds()) {
return false;
}
InitializeIter();
InitializeBuffer();
while (!IterDone()) {
if (!ReadAheadIOStart()) {
return false;
}
bool status = snapuserd_->WaitForMergeToComplete();
if (status && !snapuserd_->CommitMerge(snapuserd_->GetTotalRaBlocksMerged())) {
return false;
}
if (!status) break;
}
CloseFds();
SNAP_LOG(INFO) << " ReadAhead thread terminating....";
return true;
}
// Initialization
bool ReadAheadThread::InitializeFds() {
backing_store_fd_.reset(open(backing_store_device_.c_str(), O_RDONLY));
if (backing_store_fd_ < 0) {
SNAP_PLOG(ERROR) << "Open Failed: " << backing_store_device_;
return false;
}
cow_fd_.reset(open(cow_device_.c_str(), O_RDWR));
if (cow_fd_ < 0) {
SNAP_PLOG(ERROR) << "Open Failed: " << cow_device_;
return false;
}
return true;
}
void ReadAheadThread::InitializeIter() {
std::vector<const CowOperation*>& read_ahead_ops = snapuserd_->GetReadAheadOpsVec();
read_ahead_iter_ = read_ahead_ops.rbegin();
}
bool ReadAheadThread::IterDone() {
std::vector<const CowOperation*>& read_ahead_ops = snapuserd_->GetReadAheadOpsVec();
return read_ahead_iter_ == read_ahead_ops.rend();
}
void ReadAheadThread::IterNext() {
read_ahead_iter_++;
}
const CowOperation* ReadAheadThread::GetIterOp() {
return *read_ahead_iter_;
}
void ReadAheadThread::InitializeBuffer() {
void* mapped_addr = snapuserd_->GetMappedAddr();
// Map the scratch space region into memory
metadata_buffer_ =
static_cast<void*>((char*)mapped_addr + snapuserd_->GetBufferMetadataOffset());
read_ahead_buffer_ = static_cast<void*>((char*)mapped_addr + snapuserd_->GetBufferDataOffset());
}
} // namespace snapshot
} // namespace android

View File

@ -135,14 +135,11 @@ bool WorkerThread::ProcessReplaceOp(const CowOperation* cow_op) {
return true;
}
// Start the copy operation. This will read the backing
// block device which is represented by cow_op->source.
bool WorkerThread::ProcessCopyOp(const CowOperation* cow_op) {
bool WorkerThread::ReadFromBaseDevice(const CowOperation* cow_op) {
void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
CHECK(buffer != nullptr);
// Issue a single 4K IO. However, this can be optimized
// if the successive blocks are contiguous.
SNAP_LOG(DEBUG) << " ReadFromBaseDevice...: new-block: " << cow_op->new_block
<< " Source: " << cow_op->source;
if (!android::base::ReadFullyAtOffset(backing_store_fd_, buffer, BLOCK_SZ,
cow_op->source * BLOCK_SZ)) {
SNAP_PLOG(ERROR) << "Copy-op failed. Read from backing store: " << backing_store_device_
@ -153,6 +150,31 @@ bool WorkerThread::ProcessCopyOp(const CowOperation* cow_op) {
return true;
}
bool WorkerThread::GetReadAheadPopulatedBuffer(const CowOperation* cow_op) {
void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
CHECK(buffer != nullptr);
if (!snapuserd_->GetReadAheadPopulatedBuffer(cow_op->new_block, buffer)) {
return false;
}
return true;
}
// Start the copy operation. This will read the backing
// block device which is represented by cow_op->source.
bool WorkerThread::ProcessCopyOp(const CowOperation* cow_op) {
if (!GetReadAheadPopulatedBuffer(cow_op)) {
SNAP_LOG(DEBUG) << " GetReadAheadPopulatedBuffer failed..."
<< " new_block: " << cow_op->new_block;
if (!ReadFromBaseDevice(cow_op)) {
return false;
}
}
return true;
}
bool WorkerThread::ProcessZeroOp() {
// Zero out the entire block
void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
@ -386,8 +408,10 @@ loff_t WorkerThread::GetMergeStartOffset(void* merged_buffer, void* unmerged_buf
}
int WorkerThread::GetNumberOfMergedOps(void* merged_buffer, void* unmerged_buffer, loff_t offset,
int unmerged_exceptions) {
int unmerged_exceptions, bool* copy_op, bool* commit) {
int merged_ops_cur_iter = 0;
std::unordered_map<uint64_t, void*>& read_ahead_buffer_map = snapuserd_->GetReadAheadMap();
*copy_op = false;
std::vector<std::pair<sector_t, const CowOperation*>>& chunk_vec = snapuserd_->GetChunkVec();
// Find the operations which are merged in this cycle.
@ -411,6 +435,23 @@ int WorkerThread::GetNumberOfMergedOps(void* merged_buffer, void* unmerged_buffe
const CowOperation* cow_op = it->second;
CHECK(cow_op != nullptr);
if (snapuserd_->IsReadAheadFeaturePresent() && cow_op->type == kCowCopyOp) {
*copy_op = true;
// Every single copy operation has to come from read-ahead
// cache.
if (read_ahead_buffer_map.find(cow_op->new_block) == read_ahead_buffer_map.end()) {
SNAP_LOG(ERROR)
<< " Block: " << cow_op->new_block << " not found in read-ahead cache"
<< " Source: " << cow_op->source;
return -1;
}
// If this is a final block merged in the read-ahead buffer
// region, notify the read-ahead thread to make forward
// progress
if (cow_op->new_block == snapuserd_->GetFinalBlockMerged()) {
*commit = true;
}
}
CHECK(cow_op->new_block == cow_de->old_chunk);
// zero out to indicate that operation is merged.
@ -442,6 +483,8 @@ int WorkerThread::GetNumberOfMergedOps(void* merged_buffer, void* unmerged_buffe
bool WorkerThread::ProcessMergeComplete(chunk_t chunk, void* buffer) {
uint32_t stride = exceptions_per_area_ + 1;
const std::vector<std::unique_ptr<uint8_t[]>>& vec = snapuserd_->GetMetadataVec();
bool copy_op = false;
bool commit = false;
// ChunkID to vector index
lldiv_t divresult = lldiv(chunk, stride);
@ -452,13 +495,24 @@ bool WorkerThread::ProcessMergeComplete(chunk_t chunk, void* buffer) {
int unmerged_exceptions = 0;
loff_t offset = GetMergeStartOffset(buffer, vec[divresult.quot].get(), &unmerged_exceptions);
int merged_ops_cur_iter =
GetNumberOfMergedOps(buffer, vec[divresult.quot].get(), offset, unmerged_exceptions);
int merged_ops_cur_iter = GetNumberOfMergedOps(buffer, vec[divresult.quot].get(), offset,
unmerged_exceptions, &copy_op, &commit);
// There should be at least one operation merged in this cycle
CHECK(merged_ops_cur_iter > 0);
if (!snapuserd_->CommitMerge(merged_ops_cur_iter)) {
return false;
if (copy_op) {
if (commit) {
// Push the flushing logic to read-ahead thread so that merge thread
// can make forward progress. Sync will happen in the background
snapuserd_->StartReadAhead();
}
} else {
// Non-copy ops and all ops in older COW format
if (!snapuserd_->CommitMerge(merged_ops_cur_iter)) {
SNAP_LOG(ERROR) << "CommitMerge failed...";
return false;
}
}
SNAP_LOG(DEBUG) << "Merge success: " << merged_ops_cur_iter << "chunk: " << chunk;