diff --git a/fs_mgr/libsnapshot/Android.bp b/fs_mgr/libsnapshot/Android.bp
index 3cb412378..c97dca0ad 100644
--- a/fs_mgr/libsnapshot/Android.bp
+++ b/fs_mgr/libsnapshot/Android.bp
@@ -420,7 +420,8 @@ cc_defaults {
         "snapuserd_server.cpp",
         "snapuserd.cpp",
         "snapuserd_daemon.cpp",
-        "snapuserd_worker.cpp",
+        "snapuserd_worker.cpp",
+        "snapuserd_readahead.cpp",
     ],
     cflags: [
diff --git a/fs_mgr/libsnapshot/cow_reader.cpp b/fs_mgr/libsnapshot/cow_reader.cpp
index c2e4f89df..35a02e654 100644
--- a/fs_mgr/libsnapshot/cow_reader.cpp
+++ b/fs_mgr/libsnapshot/cow_reader.cpp
@@ -369,13 +369,7 @@ void CowReader::InitializeMerge() {
         // Replace-op-4, Zero-op-9, Replace-op-5
     }
     //==============================================================
-    for (uint64_t i = 0; i < ops_->size(); i++) {
-        auto& current_op = ops_->data()[i];
-        if (current_op.type != kCowCopyOp) {
-            break;
-        }
-        num_copy_ops += 1;
-    }
+    num_copy_ops = FindNumCopyops();
 
     std::sort(ops_.get()->begin() + num_copy_ops, ops_.get()->end(),
               [](CowOperation& op1, CowOperation& op2) -> bool {
@@ -386,6 +380,23 @@ void CowReader::InitializeMerge() {
         CHECK(ops_->size() >= header_.num_merge_ops);
         ops_->erase(ops_.get()->begin(), ops_.get()->begin() + header_.num_merge_ops);
     }
+
+    num_copy_ops = FindNumCopyops();
+    set_copy_ops(num_copy_ops);
+}
+
+uint64_t CowReader::FindNumCopyops() {
+    uint64_t num_copy_ops = 0;
+
+    for (uint64_t i = 0; i < ops_->size(); i++) {
+        auto& current_op = ops_->data()[i];
+        if (current_op.type != kCowCopyOp) {
+            break;
+        }
+        num_copy_ops += 1;
+    }
+
+    return num_copy_ops;
 }
 
 bool CowReader::GetHeader(CowHeader* header) {
diff --git a/fs_mgr/libsnapshot/include/libsnapshot/cow_format.h b/fs_mgr/libsnapshot/include/libsnapshot/cow_format.h
index 060d363ee..c05b7efed 100644
--- a/fs_mgr/libsnapshot/include/libsnapshot/cow_format.h
+++ b/fs_mgr/libsnapshot/include/libsnapshot/cow_format.h
@@ -35,6 +35,8 @@ static constexpr uint32_t BLOCK_SHIFT = (__builtin_ffs(BLOCK_SZ) - 1);
 // +-----------------------+
 // |     Header (fixed)    |
 // +-----------------------+
+// |     Scratch space     |
+// +-----------------------+
 // | Operation (variable)  |
 // | Data (variable)       |
 // +-----------------------+
@@ -152,11 +154,28 @@ static constexpr uint8_t kCowCompressNone = 0;
 static constexpr uint8_t kCowCompressGz = 1;
 static constexpr uint8_t kCowCompressBrotli = 2;
 
+static constexpr uint8_t kCowReadAheadNotStarted = 0;
+static constexpr uint8_t kCowReadAheadInProgress = 1;
+static constexpr uint8_t kCowReadAheadDone = 2;
+
 struct CowFooter {
     CowFooterOperation op;
     CowFooterData data;
 } __attribute__((packed));
 
+struct ScratchMetadata {
+    // Block of data in the image that the operation modifies;
+    // the read-ahead thread stores the modified data
+    // in the scratch space
+    uint64_t new_block;
+    // Offset within the file to read the data
+    uint64_t file_offset;
+} __attribute__((packed));
+
+struct BufferState {
+    uint8_t read_ahead_state;
+} __attribute__((packed));
+
+// 2MB Scratch space used for read-ahead
+static constexpr uint64_t BUFFER_REGION_DEFAULT_SIZE = (1ULL << 21);
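
The packed structs above pin down the on-disk scratch layout, so their sizes are worth sanity-checking. A self-contained sketch (local mirrors of the declarations above, not new API) encoding the arithmetic the daemon relies on:

```cpp
#include <cstdint>

// Local mirrors of the declarations above, for a standalone size check.
struct ScratchMetadata {
    uint64_t new_block;
    uint64_t file_offset;
} __attribute__((packed));

struct BufferState {
    uint8_t read_ahead_state;
} __attribute__((packed));

static constexpr uint64_t kBufferRegion = 1ULL << 21;  // BUFFER_REGION_DEFAULT_SIZE (2MB)
static constexpr uint64_t kBlockSz = 4096;             // BLOCK_SZ

// 16 bytes of metadata per 4k data block: a 2MB region carries
// 8k (two pages) of metadata, matching the comments in snapuserd.cpp.
static_assert(sizeof(ScratchMetadata) == 16);
static_assert(sizeof(BufferState) == 1);
static_assert((kBufferRegion * sizeof(ScratchMetadata)) / kBlockSz == 8192);
```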
diff --git a/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h b/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h
index 552fd96d1..9ebcfd983 100644
--- a/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h
+++ b/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h
@@ -141,18 +141,21 @@ class CowReader : public ICowReader {
     bool GetRawBytes(uint64_t offset, void* buffer, size_t len, size_t* read);
 
-    void UpdateMergeProgress(uint64_t merge_ops) { header_.num_merge_ops += merge_ops; }
-
     void InitializeMerge();
 
     void set_total_data_ops(uint64_t size) { total_data_ops_ = size; }
 
     uint64_t total_data_ops() { return total_data_ops_; }
 
+    void set_copy_ops(uint64_t size) { copy_ops_ = size; }
+
+    uint64_t total_copy_ops() { return copy_ops_; }
+
     void CloseCowFd() { owned_fd_ = {}; }
 
   private:
     bool ParseOps(std::optional<uint64_t> label);
+    uint64_t FindNumCopyops();
 
     android::base::unique_fd owned_fd_;
     android::base::borrowed_fd fd_;
@@ -162,6 +165,7 @@ class CowReader : public ICowReader {
     std::optional<uint64_t> last_label_;
     std::shared_ptr<std::vector<CowOperation>> ops_;
     uint64_t total_data_ops_;
+    uint64_t copy_ops_;
 };
 
 }  // namespace snapshot
diff --git a/fs_mgr/libsnapshot/snapuserd.cpp b/fs_mgr/libsnapshot/snapuserd.cpp
index d104bd8d3..e6af7776a 100644
--- a/fs_mgr/libsnapshot/snapuserd.cpp
+++ b/fs_mgr/libsnapshot/snapuserd.cpp
@@ -47,6 +47,9 @@ bool Snapuserd::InitializeWorkers() {
 
         worker_threads_.push_back(std::move(wt));
     }
+
+    read_ahead_thread_ = std::make_unique<ReadAheadThread>(cow_device_, backing_store_device_,
+                                                           misc_name_, GetSharedPtr());
     return true;
 }
 
@@ -54,7 +57,11 @@ bool Snapuserd::CommitMerge(int num_merge_ops) {
     struct CowHeader* ch = reinterpret_cast<struct CowHeader*>(mapped_addr_);
     ch->num_merge_ops += num_merge_ops;
 
-    // Sync the first 4k block
+    if (read_ahead_feature_ && read_ahead_ops_.size() > 0) {
+        struct BufferState* ra_state = GetBufferState();
+        ra_state->read_ahead_state = kCowReadAheadInProgress;
+    }
+
     int ret = msync(mapped_addr_, BLOCK_SZ, MS_SYNC);
     if (ret < 0) {
         PLOG(ERROR) << "msync header failed: " << ret;
@@ -66,6 +73,174 @@ bool Snapuserd::CommitMerge(int num_merge_ops) {
     return true;
 }
 
+void Snapuserd::PrepareReadAhead() {
+    if (!read_ahead_feature_) {
+        return;
+    }
+
+    struct BufferState* ra_state = GetBufferState();
+    // Check if the data has to be re-constructed from the COW device
+    if (ra_state->read_ahead_state == kCowReadAheadDone) {
+        populate_data_from_cow_ = true;
+    } else {
+        populate_data_from_cow_ = false;
+    }
+
+    StartReadAhead();
+}
+
+bool Snapuserd::GetRABuffer(std::unique_lock<std::mutex>* lock, uint64_t block, void* buffer) {
+    CHECK(lock->owns_lock());
+    std::unordered_map<uint64_t, void*>::iterator it = read_ahead_buffer_map_.find(block);
+
+    // This will be true only for IOs generated as part of reading a root
+    // filesystem. IOs related to merge should always be in the read-ahead cache.
+    if (it == read_ahead_buffer_map_.end()) {
+        return false;
+    }
+
+    // Theoretically, we can send the data back from the read-ahead buffer
+    // all the way to the kernel without memcpy. However, if the IO is
+    // un-aligned, the wrapper function will need to touch the read-ahead
+    // buffers and the transitions will be a bit more complicated.
+    memcpy(buffer, it->second, BLOCK_SZ);
+    return true;
+}
+
+// ========== State transition functions for read-ahead operations ===========
+
+bool Snapuserd::GetReadAheadPopulatedBuffer(uint64_t block, void* buffer) {
+    if (!read_ahead_feature_) {
+        return false;
+    }
+
+    {
+        std::unique_lock<std::mutex> lock(lock_);
+        if (io_state_ == READ_AHEAD_IO_TRANSITION::READ_AHEAD_FAILURE) {
+            return false;
+        }
+
+        if (io_state_ == READ_AHEAD_IO_TRANSITION::IO_IN_PROGRESS) {
+            return GetRABuffer(&lock, block, buffer);
+        }
+    }
+
+    {
+        // Read-ahead thread IO is in-progress. Wait for it to complete.
+        std::unique_lock<std::mutex> lock(lock_);
+        while (!(io_state_ == READ_AHEAD_IO_TRANSITION::READ_AHEAD_FAILURE ||
+                 io_state_ == READ_AHEAD_IO_TRANSITION::IO_IN_PROGRESS)) {
+            cv.wait(lock);
+        }
+
+        return GetRABuffer(&lock, block, buffer);
+    }
+}
+
+// This is invoked by the read-ahead thread waiting for merge IOs
+// to complete
+bool Snapuserd::WaitForMergeToComplete() {
+    {
+        std::unique_lock<std::mutex> lock(lock_);
+        while (!(io_state_ == READ_AHEAD_IO_TRANSITION::READ_AHEAD_BEGIN ||
+                 io_state_ == READ_AHEAD_IO_TRANSITION::IO_TERMINATED)) {
+            cv.wait(lock);
+        }
+
+        if (io_state_ == READ_AHEAD_IO_TRANSITION::IO_TERMINATED) {
+            return false;
+        }
+
+        io_state_ = READ_AHEAD_IO_TRANSITION::READ_AHEAD_IN_PROGRESS;
+        return true;
+    }
+}
+
+// This is invoked during the launch of worker threads. We wait
+// for the read-ahead thread to be fully up before worker threads
+// are launched; else we will have a race between worker threads
+// and the read-ahead thread, specifically during re-construction.
+bool Snapuserd::WaitForReadAheadToStart() {
+    {
+        std::unique_lock<std::mutex> lock(lock_);
+        while (!(io_state_ == READ_AHEAD_IO_TRANSITION::IO_IN_PROGRESS ||
+                 io_state_ == READ_AHEAD_IO_TRANSITION::READ_AHEAD_FAILURE)) {
+            cv.wait(lock);
+        }
+
+        if (io_state_ == READ_AHEAD_IO_TRANSITION::READ_AHEAD_FAILURE) {
+            return false;
+        }
+
+        return true;
+    }
+}
+
+// Invoked by worker threads when a sequence of merge operations
+// is complete, notifying the read-ahead thread to make forward
+// progress.
+void Snapuserd::StartReadAhead() {
+    {
+        std::lock_guard<std::mutex> lock(lock_);
+        io_state_ = READ_AHEAD_IO_TRANSITION::READ_AHEAD_BEGIN;
+    }
+
+    cv.notify_one();
+}
+
+void Snapuserd::MergeCompleted() {
+    {
+        std::lock_guard<std::mutex> lock(lock_);
+        io_state_ = READ_AHEAD_IO_TRANSITION::IO_TERMINATED;
+    }
+
+    cv.notify_one();
+}
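
All of the wait/notify methods above share one pattern: publish a new READ_AHEAD_IO_TRANSITION value under lock_, then signal cv. A minimal standalone sketch of that handshake (illustrative names, not the daemon's API), shown from both sides:

```cpp
#include <condition_variable>
#include <mutex>

enum class State { READ_AHEAD_BEGIN, READ_AHEAD_IN_PROGRESS, IO_IN_PROGRESS, IO_TERMINATED };

std::mutex m;
std::condition_variable cv;
State state = State::IO_IN_PROGRESS;

// Worker side: hand the next bounded region to the read-ahead thread.
void SignalReadAhead() {
    {
        std::lock_guard<std::mutex> lock(m);
        state = State::READ_AHEAD_BEGIN;
    }
    cv.notify_one();
}

// Read-ahead side: sleep until the workers hand over a region or terminate.
bool AwaitNextRegion() {
    std::unique_lock<std::mutex> lock(m);
    cv.wait(lock, [] {
        return state == State::READ_AHEAD_BEGIN || state == State::IO_TERMINATED;
    });
    if (state == State::IO_TERMINATED) {
        return false;
    }
    state = State::READ_AHEAD_IN_PROGRESS;
    return true;
}
```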
+
+bool Snapuserd::ReadAheadIOCompleted() {
+    // Flush the entire buffer region
+    int ret = msync(mapped_addr_, total_mapped_addr_length_, MS_SYNC);
+    if (ret < 0) {
+        PLOG(ERROR) << "msync failed after ReadAheadIOCompleted: " << ret;
+        return false;
+    }
+
+    // Metadata and data are synced. Now, update the state.
+    // We need to update the state after flushing data; if there is a crash
+    // when read-ahead IO is in progress, the state of data in the COW file
+    // is unknown. kCowReadAheadDone acts as a checkpoint wherein the data
+    // in the scratch space is good and during the next reboot, the read-ahead
+    // thread can safely re-construct the data.
+    struct BufferState* ra_state = GetBufferState();
+    ra_state->read_ahead_state = kCowReadAheadDone;
+
+    ret = msync(mapped_addr_, BLOCK_SZ, MS_SYNC);
+    if (ret < 0) {
+        PLOG(ERROR) << "msync failed to flush read-ahead completion state...";
+        return false;
+    }
+
+    // Notify the worker threads
+    {
+        std::lock_guard<std::mutex> lock(lock_);
+        io_state_ = READ_AHEAD_IO_TRANSITION::IO_IN_PROGRESS;
+    }
+
+    cv.notify_all();
+    return true;
+}
+
+void Snapuserd::ReadAheadIOFailed() {
+    {
+        std::lock_guard<std::mutex> lock(lock_);
+        io_state_ = READ_AHEAD_IO_TRANSITION::READ_AHEAD_FAILURE;
+    }
+
+    cv.notify_all();
+}
+
+//========== End of state transition functions ====================
+
 bool Snapuserd::IsChunkIdMetadata(chunk_t chunk) {
     uint32_t stride = exceptions_per_area_ + 1;
     lldiv_t divresult = lldiv(chunk, stride);
@@ -257,13 +432,16 @@ bool Snapuserd::ReadMetadata() {
             data_chunk_id = GetNextAllocatableChunkId(data_chunk_id);
         }
 
+        int num_ra_ops_per_iter = ((GetBufferDataSize()) / BLOCK_SZ);
         std::optional<chunk_t> prev_id = {};
         std::map<uint64_t, const CowOperation*> map;
         std::set<uint64_t> dest_blocks;
         size_t pending_copy_ops = exceptions_per_area_ - num_ops;
-        SNAP_LOG(INFO) << " Processing copy-ops at Area: " << vec_.size()
-                       << " Number of replace/zero ops completed in this area: " << num_ops
-                       << " Pending copy ops for this area: " << pending_copy_ops;
+        uint64_t total_copy_ops = reader_->total_copy_ops();
+
+        SNAP_LOG(DEBUG) << " Processing copy-ops at Area: " << vec_.size()
+                        << " Number of replace/zero ops completed in this area: " << num_ops
+                        << " Pending copy ops for this area: " << pending_copy_ops;
 
         while (!cowop_riter_->Done()) {
             do {
                 const CowOperation* cow_op = &cowop_riter_->Get();
@@ -425,6 +603,9 @@ bool Snapuserd::ReadMetadata() {
                 offset += sizeof(struct disk_exception);
                 num_ops += 1;
                 copy_ops++;
+                if (read_ahead_feature_) {
+                    read_ahead_ops_.push_back(it->second);
+                }
 
                 SNAP_LOG(DEBUG) << num_ops << ":"
                                 << " Copy-op: "
@@ -452,6 +633,15 @@ bool Snapuserd::ReadMetadata() {
             }
 
             data_chunk_id = GetNextAllocatableChunkId(data_chunk_id);
+            total_copy_ops -= 1;
+            /*
+             * Split the number of ops based on the size of the read-ahead buffer
+             * region. We need to ensure that the kernel doesn't issue IO on blocks
+             * which are not read by the read-ahead thread.
+             */
+            if (read_ahead_feature_ && (total_copy_ops % num_ra_ops_per_iter == 0)) {
+                data_chunk_id = GetNextAllocatableChunkId(data_chunk_id);
+            }
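
The modular check above is easier to see with the default numbers plugged in: the data part of the 2MB scratch region holds (2MB - 8k) / 4k = 510 blocks, so one extra chunk ID is burned after every 510th copy op. A hedged sketch of the arithmetic (constants mirror the patch, not new API):

```cpp
#include <cstdint>

constexpr uint64_t kBlockSz = 4096;
constexpr uint64_t kBufferRegion = 1ULL << 21;                        // 2MB scratch space
constexpr uint64_t kMetadataBytes = (kBufferRegion * 16) / kBlockSz;  // 8192
constexpr uint64_t kRaOpsPerIter = (kBufferRegion - kMetadataBytes) / kBlockSz;  // 510

// Number of bounded read-ahead regions (and therefore extra chunk gaps) for
// a given count of copy ops; e.g. 1024 copy ops split into 3 regions.
constexpr uint64_t NumRegions(uint64_t copy_ops) {
    return (copy_ops + kRaOpsPerIter - 1) / kRaOpsPerIter;
}

static_assert(kRaOpsPerIter == 510);
static_assert(NumRegions(1024) == 3);
```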
         }
         map.clear();
         dest_blocks.clear();
@@ -469,6 +659,7 @@ bool Snapuserd::ReadMetadata() {
 
     chunk_vec_.shrink_to_fit();
     vec_.shrink_to_fit();
+    read_ahead_ops_.shrink_to_fit();
 
     // Sort the vector based on sectors as we need this during un-aligned access
     std::sort(chunk_vec_.begin(), chunk_vec_.end(), compare);
@@ -483,6 +674,8 @@ bool Snapuserd::ReadMetadata() {
     // Total number of sectors required for creating dm-user device
     num_sectors_ = ChunkToSector(data_chunk_id);
     merge_initiated_ = false;
+    PrepareReadAhead();
+
     return true;
 }
 
@@ -490,8 +683,15 @@ bool Snapuserd::MmapMetadata() {
     CowHeader header;
     reader_->GetHeader(&header);
 
-    // mmap the first 4k page
-    total_mapped_addr_length_ = BLOCK_SZ;
+    if (header.major_version >= 2 && header.buffer_size > 0) {
+        total_mapped_addr_length_ = header.header_size + BUFFER_REGION_DEFAULT_SIZE;
+        read_ahead_feature_ = true;
+    } else {
+        // mmap the first 4k page - older COW format
+        total_mapped_addr_length_ = BLOCK_SZ;
+        read_ahead_feature_ = false;
+    }
+
     mapped_addr_ = mmap(NULL, total_mapped_addr_length_, PROT_READ | PROT_WRITE, MAP_SHARED,
                         cow_fd_.get(), 0);
     if (mapped_addr_ == MAP_FAILED) {
@@ -529,11 +729,26 @@ bool Snapuserd::InitCowDevice() {
 }
 
 /*
- * Entry point to launch worker threads
+ * Entry point to launch threads
 */
 bool Snapuserd::Start() {
     std::vector<std::future<bool>> threads;
+    std::future<bool> ra_thread;
+    bool rathread = (read_ahead_feature_ && (read_ahead_ops_.size() > 0));
 
+    // Start the read-ahead thread and wait for it to come up,
+    // as the data may have to be re-constructed
+    // from the COW device.
+    if (rathread) {
+        ra_thread = std::async(std::launch::async, &ReadAheadThread::RunThread,
+                               read_ahead_thread_.get());
+        if (!WaitForReadAheadToStart()) {
+            SNAP_LOG(ERROR) << "Failed to start Read-ahead thread...";
+            return false;
+        }
+    }
+
+    // Launch worker threads
     for (int i = 0; i < worker_threads_.size(); i++) {
         threads.emplace_back(
                 std::async(std::launch::async, &WorkerThread::RunThread, worker_threads_[i].get()));
     }
@@ -544,8 +759,69 @@
         ret = t.get() && ret;
     }
 
+    if (rathread) {
+        // Notify the read-ahead thread that all worker threads
+        // are done. We need this explicit notification when
+        // there is an IO failure or there was a switch
+        // of the dm-user table, thus forcing the read-ahead
+        // thread to wake up.
+        MergeCompleted();
+        ret = ret && ra_thread.get();
+    }
+
     return ret;
 }
 
+uint64_t Snapuserd::GetBufferMetadataOffset() {
+    CowHeader header;
+    reader_->GetHeader(&header);
+
+    size_t size = header.header_size + sizeof(BufferState);
+    return size;
+}
+
+/*
+ * Metadata for read-ahead is 16 bytes. For a 2MB region, we will
+ * end up with 8k (2 pages) worth of metadata. Thus, a 2MB buffer
+ * region is split into:
+ *
+ * 1: 8k of metadata
+ * 2: (2MB - 8k) of data
+ */
+size_t Snapuserd::GetBufferMetadataSize() {
+    CowHeader header;
+    reader_->GetHeader(&header);
+
+    size_t metadata_bytes = (header.buffer_size * sizeof(struct ScratchMetadata)) / BLOCK_SZ;
+    return metadata_bytes;
+}
+
+size_t Snapuserd::GetBufferDataOffset() {
+    CowHeader header;
+    reader_->GetHeader(&header);
+
+    return (header.header_size + GetBufferMetadataSize());
+}
+
+/*
+ * (2MB - 8k = 2088960 bytes) will be the buffer region to hold the data.
+ */
+size_t Snapuserd::GetBufferDataSize() {
+    CowHeader header;
+    reader_->GetHeader(&header);
+
+    size_t size = header.buffer_size - GetBufferMetadataSize();
+    return size;
+}
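
Taken together, the four accessors above describe a fixed layout relative to the COW header; only header_size and buffer_size come from disk. A sketch mirroring the same arithmetic in one place (illustrative, not part of the patch):

```cpp
#include <cstdint>

struct ScratchLayout {
    uint64_t metadata_offset;  // GetBufferMetadataOffset()
    uint64_t metadata_size;    // GetBufferMetadataSize()
    uint64_t data_offset;      // GetBufferDataOffset()
    uint64_t data_size;        // GetBufferDataSize()
};

// For buffer_size = 2MB: metadata_size = 8192, data_size = 2088960 bytes,
// i.e. 510 4k blocks of read-ahead data per iteration.
ScratchLayout ComputeLayout(uint64_t header_size, uint64_t buffer_size) {
    ScratchLayout l;
    l.metadata_offset = header_size + 1;  // BufferState is a single packed byte
    l.metadata_size = (buffer_size * 16) / 4096;
    l.data_offset = header_size + l.metadata_size;
    l.data_size = buffer_size - l.metadata_size;
    return l;
}
```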
+
+struct BufferState* Snapuserd::GetBufferState() {
+    CowHeader header;
+    reader_->GetHeader(&header);
+
+    struct BufferState* ra_state =
+            reinterpret_cast<struct BufferState*>((char*)mapped_addr_ + header.header_size);
+    return ra_state;
+}
+
 }  // namespace snapshot
 }  // namespace android
diff --git a/fs_mgr/libsnapshot/snapuserd.h b/fs_mgr/libsnapshot/snapuserd.h
index 65af553c4..cd9d82a49 100644
--- a/fs_mgr/libsnapshot/snapuserd.h
+++ b/fs_mgr/libsnapshot/snapuserd.h
@@ -20,6 +20,7 @@
 #include
 #include
+#include <condition_variable>
 #include
 #include
 #include
@@ -57,6 +58,35 @@ static_assert(PAYLOAD_SIZE >= BLOCK_SZ);
  */
 static constexpr int NUM_THREADS_PER_PARTITION = 4;
 
+/*
+ * State transitions between worker threads and the read-ahead
+ * thread.
+ *
+ * READ_AHEAD_BEGIN: Worker threads initiate the read-ahead
+ *                   thread to begin reading the copy operations
+ *                   for each bounded region.
+ *
+ * READ_AHEAD_IN_PROGRESS: The read-ahead thread is in-flight,
+ *                         reading the copy operations.
+ *
+ * IO_IN_PROGRESS: Merge operation is in-progress by worker threads.
+ *
+ * IO_TERMINATED: When all the worker threads are done, request the
+ *                read-ahead thread to terminate.
+ *
+ * READ_AHEAD_FAILURE: Set on any IO failure while the read-ahead
+ *                     thread is reading from the COW device.
+ *
+ * The transitions between these states are described in snapuserd_readahead.cpp
+ */
+enum class READ_AHEAD_IO_TRANSITION {
+    READ_AHEAD_BEGIN,
+    READ_AHEAD_IN_PROGRESS,
+    IO_IN_PROGRESS,
+    IO_TERMINATED,
+    READ_AHEAD_FAILURE,
+};
+
 class BufferSink : public IByteSink {
   public:
     void Initialize(size_t size);
@@ -77,6 +107,42 @@ class BufferSink : public IByteSink {
 
 class Snapuserd;
 
+class ReadAheadThread {
+  public:
+    ReadAheadThread(const std::string& cow_device, const std::string& backing_device,
+                    const std::string& misc_name, std::shared_ptr<Snapuserd> snapuserd);
+    bool RunThread();
+
+  private:
+    void InitializeIter();
+    bool IterDone();
+    void IterNext();
+    const CowOperation* GetIterOp();
+    void InitializeBuffer();
+
+    bool InitializeFds();
+    void CloseFds() {
+        cow_fd_ = {};
+        backing_store_fd_ = {};
+    }
+
+    bool ReadAheadIOStart();
+    void PrepareReadAhead(uint64_t* source_block, int* pending_ops, std::vector<uint64_t>& blocks);
+    bool ReconstructDataFromCow();
+
+    void* read_ahead_buffer_;
+    void* metadata_buffer_;
+
+    std::vector<const CowOperation*>::reverse_iterator read_ahead_iter_;
+    std::string cow_device_;
+    std::string backing_store_device_;
+    std::string misc_name_;
+
+    unique_fd cow_fd_;
+    unique_fd backing_store_fd_;
+
+    std::shared_ptr<Snapuserd> snapuserd_;
+};
+
 class WorkerThread {
   public:
     WorkerThread(const std::string& cow_device, const std::string& backing_device,
@@ -117,12 +183,16 @@ class WorkerThread {
     bool ProcessCopyOp(const CowOperation* cow_op);
     bool ProcessZeroOp();
 
+    bool ReadFromBaseDevice(const CowOperation* cow_op);
+    bool GetReadAheadPopulatedBuffer(const CowOperation* cow_op);
+
     // Merge related functions
     bool ProcessMergeComplete(chunk_t chunk, void* buffer);
     loff_t GetMergeStartOffset(void* merged_buffer, void* unmerged_buffer,
                                int* unmerged_exceptions);
+
     int GetNumberOfMergedOps(void* merged_buffer, void* unmerged_buffer, loff_t offset,
-                             int unmerged_exceptions);
+                             int unmerged_exceptions, bool* copy_op, bool* commit);
 
     sector_t ChunkToSector(chunk_t chunk) { return chunk << CHUNK_SHIFT; }
     chunk_t SectorToChunk(sector_t sector) { return sector >> CHUNK_SHIFT; }
@@ -159,7 +229,10 @@ class Snapuserd : public std::enable_shared_from_this<Snapuserd> {
     bool CommitMerge(int num_merge_ops);
 
     void CloseFds() { cow_fd_ = {}; }
-    void FreeResources() { worker_threads_.clear(); }
+    void FreeResources() {
+        worker_threads_.clear();
+        read_ahead_thread_ = nullptr;
+    }
     size_t GetMetadataAreaSize() { return vec_.size(); }
     void* GetExceptionBuffer(size_t i) { return vec_[i].get(); }
@@ -177,14 +250,44 @@ class Snapuserd : public std::enable_shared_from_this<Snapuserd> {
     void UnmapBufferRegion();
     bool MmapMetadata();
 
+    // Read-ahead related functions
+    std::vector<const CowOperation*>& GetReadAheadOpsVec() { return read_ahead_ops_; }
+    std::unordered_map<uint64_t, void*>& GetReadAheadMap() { return read_ahead_buffer_map_; }
+    void* GetMappedAddr() { return mapped_addr_; }
+    bool IsReadAheadFeaturePresent() { return read_ahead_feature_; }
+    void PrepareReadAhead();
+    void StartReadAhead();
+    void MergeCompleted();
+    bool ReadAheadIOCompleted();
+    void ReadAheadIOFailed();
+    bool WaitForMergeToComplete();
+    bool GetReadAheadPopulatedBuffer(uint64_t block, void* buffer);
+    bool ReconstructDataFromCow() { return populate_data_from_cow_; }
+    void ReconstructDataFromCowFinish() { populate_data_from_cow_ = false; }
+    bool WaitForReadAheadToStart();
+
+    uint64_t GetBufferMetadataOffset();
+    size_t GetBufferMetadataSize();
+    size_t GetBufferDataOffset();
+    size_t GetBufferDataSize();
+
+    // Final block to be merged in a given read-ahead buffer region
+    void SetFinalBlockMerged(uint64_t x) { final_block_merged_ = x; }
+    uint64_t GetFinalBlockMerged() { return final_block_merged_; }
+    // Total number of blocks to be merged in a given read-ahead buffer region
+    void SetTotalRaBlocksMerged(int x) { total_ra_blocks_merged_ = x; }
+    int GetTotalRaBlocksMerged() { return total_ra_blocks_merged_; }
+
   private:
     bool IsChunkIdMetadata(chunk_t chunk);
     chunk_t GetNextAllocatableChunkId(chunk_t chunk_id);
 
+    bool GetRABuffer(std::unique_lock<std::mutex>* lock, uint64_t block, void* buffer);
     bool ReadMetadata();
     sector_t ChunkToSector(chunk_t chunk) { return chunk << CHUNK_SHIFT; }
     chunk_t SectorToChunk(sector_t sector) { return sector >> CHUNK_SHIFT; }
     bool IsBlockAligned(int read_size) { return ((read_size & (BLOCK_SZ - 1)) == 0); }
+    struct BufferState* GetBufferState();
 
     std::string cow_device_;
     std::string backing_store_device_;
@@ -209,11 +312,22 @@ class Snapuserd : public std::enable_shared_from_this<Snapuserd> {
     std::vector<std::pair<sector_t, const CowOperation*>> chunk_vec_;
 
     std::mutex lock_;
+    std::condition_variable cv;
 
     void* mapped_addr_;
     size_t total_mapped_addr_length_;
 
     std::vector<std::unique_ptr<WorkerThread>> worker_threads_;
+    // Read-ahead related
+    std::unordered_map<uint64_t, void*> read_ahead_buffer_map_;
+    std::vector<const CowOperation*> read_ahead_ops_;
+    bool populate_data_from_cow_ = false;
+    bool read_ahead_feature_;
+    uint64_t final_block_merged_;
+    int total_ra_blocks_merged_ = 0;
+    READ_AHEAD_IO_TRANSITION io_state_;
+    std::unique_ptr<ReadAheadThread> read_ahead_thread_;
+
     bool merge_initiated_ = false;
     bool attached_ = false;
 };
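
Worth noting while reviewing Snapuserd::Start(): the read-ahead future is created before the workers and joined after them, and MergeCompleted() is what lets it exit. A stripped-down sketch of that ordering (FakeThread stands in for both thread classes; illustrative only, not the daemon's API):

```cpp
#include <future>
#include <vector>

// Stand-in for both ReadAheadThread and WorkerThread, to keep the
// ordering sketch self-contained.
struct FakeThread {
    bool RunThread() { return true; }
};

bool StartAll(FakeThread* ra, std::vector<FakeThread*>& workers) {
    // 1. Read-ahead thread first; it may have to re-construct data from the
    //    scratch space before any merge IO can be served.
    std::future<bool> ra_future =
            std::async(std::launch::async, &FakeThread::RunThread, ra);
    // ... the real code blocks in WaitForReadAheadToStart() here ...

    // 2. Worker threads next.
    std::vector<std::future<bool>> worker_futures;
    for (FakeThread* w : workers) {
        worker_futures.emplace_back(
                std::async(std::launch::async, &FakeThread::RunThread, w));
    }

    // 3. Join workers first, then the read-ahead thread; the real code calls
    //    MergeCompleted() in between so the read-ahead thread wakes and exits.
    bool ok = true;
    for (auto& f : worker_futures) {
        ok = f.get() && ok;
    }
    return ok && ra_future.get();
}
```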
diff --git a/fs_mgr/libsnapshot/snapuserd_readahead.cpp b/fs_mgr/libsnapshot/snapuserd_readahead.cpp
new file mode 100644
index 000000000..d60a35306
--- /dev/null
+++ b/fs_mgr/libsnapshot/snapuserd_readahead.cpp
@@ -0,0 +1,439 @@
+/*
+ * Copyright (C) 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "snapuserd.h"
+
+#include
+#include
+#include
+
+#include
+
+namespace android {
+namespace snapshot {
+
+using namespace android;
+using namespace android::dm;
+using android::base::unique_fd;
+
+#define SNAP_LOG(level) LOG(level) << misc_name_ << ": "
+#define SNAP_PLOG(level) PLOG(level) << misc_name_ << ": "
+
+/*
+ * Merging a copy operation involves the following flow:
+ *
+ * 1: The dm-snapshot layer requests a merge for a 4k block. dm-user sends
+ *    the request to the daemon
+ * 2: daemon reads the source block
+ * 3: daemon copies the source data
+ * 4: IO completion sent back to dm-user (a switch from user space to kernel)
+ * 5: dm-snapshot merges the data to the base device
+ * 6: dm-snapshot sends the merge-completion IO to dm-user
+ * 7: dm-user re-directs the merge completion IO to the daemon (one more switch)
+ * 8: daemon updates the COW file about the completed merge request (a write
+ *    syscall) followed by an fsync
+ * 9: Send the IO completion back to dm-user
+ *
+ * The above sequence is a significant overhead, especially when merging one 4k
+ * block at a time.
+ *
+ * The read-ahead layer will optimize the above path by reading the data from
+ * the base device in the background so that the merging thread can retrieve
+ * the data from the read-ahead cache. Additionally, syncing of merged data is
+ * deferred to the read-ahead thread, thereby the IO path is not bottlenecked.
+ *
+ * We create a scratch space of 2MB to store the read-ahead data in the COW
+ * device.
+ *
+ *      +-----------------------+
+ *      |     Header (fixed)    |
+ *      +-----------------------+
+ *      |     Scratch space     |  <-- 2MB
+ *      +-----------------------+
+ *
+ *      Scratch space is as follows:
+ *
+ *      +-----------------------+
+ *      |       Metadata        | <- 4k page
+ *      +-----------------------+
+ *      |       Metadata        | <- 4k page
+ *      +-----------------------+
+ *      |                       |
+ *      |    Read-ahead data    |
+ *      |                       |
+ *      +-----------------------+
+ *
+ * State transitions and communication between the read-ahead thread and
+ * worker threads during merge:
+ * =====================================================================
+ *
+ *   Worker Threads                                Read-Ahead thread
+ *   ------------------------------------------------------------------
+ *
+ *      |
+ *      |
+ *  --> -----------------READ_AHEAD_BEGIN------------->|
+ *  |   |                                              | READ_AHEAD_IN_PROGRESS
+ *  |  WAIT                                            |
+ *  |   |                                              |
+ *  |   |<-----------------IO_IN_PROGRESS---------------
+ *  |   |                                              |
+ *  |   | IO_IN_PROGRESS                              WAIT
+ *  |   |                                              |
+ *  |<--|                                              |
+ *      |                                              |
+ *      ------------------IO_TERMINATED--------------->|
+ *                                                    END
+ *
+ *
+ * ===================================================================
+ *
+ * Example:
+ *
+ * We have 6 copy operations to be executed in the OTA and there is an overlap.
+ * Update-engine will write to the COW file as follows:
+ *
+ * Op-1: 20 -> 23
+ * Op-2: 19 -> 22
+ * Op-3: 18 -> 21
+ * Op-4: 17 -> 20
+ * Op-5: 16 -> 19
+ * Op-6: 15 -> 18
+ *
+ * The read-ahead thread will read all 6 source blocks and store the data in
+ * the scratch space. Metadata will contain the destination block numbers.
+ * Thus, the scratch space will look something like this:
+ *
+ * +--------------+
+ * |  Block   23  |
+ * |  offset - 1  |
+ * +--------------+
+ * |  Block   22  |
+ * |  offset - 2  |
+ * +--------------+
+ * |  Block   21  |
+ * |  offset - 3  |
+ * +--------------+
+ *    ...
+ *    ...
+ * +--------------+
+ * | Data-Block 20| <-- offset - 1
+ * +--------------+
+ * | Data-Block 19| <-- offset - 2
+ * +--------------+
+ * | Data-Block 18| <-- offset - 3
+ * +--------------+
+ *     ...
+ *     ...
+ *
+ * ====================================================================
+ * IO Path:
+ *
+ * Read-ahead will serve the data to worker threads during merge only
+ * after metadata and data are persisted to the scratch space. Worker
+ * threads during merge will always retrieve the data from the cache; if
+ * the cache is not populated, they will wait for the read-ahead thread to
+ * finish. Furthermore, the number of operations merged will be synced to
+ * the header only when all the blocks in the read-ahead cache are merged.
+ * In the above case, when all 6 operations are merged, the COW header is
+ * updated with num_merge_ops = 6.
+ *
+ * Merge resume after crash:
+ *
+ * Let's say we have a crash after 5 operations are merged, i.e. after
+ * Op-5: 16 -> 19 is completed but before Op-6 is merged. Thus, the COW
+ * header num_merge_ops will be 0 as not all the ops were merged yet. During
+ * the next reboot, the read-ahead thread will re-construct the data
+ * in-memory from the scratch space; when merge resumes, Op-1 will be
+ * re-executed. However, data will be served from the read-ahead cache
+ * safely even though block 20 was overwritten by Op-4.
+ */
+
+ReadAheadThread::ReadAheadThread(const std::string& cow_device, const std::string& backing_device,
+                                 const std::string& misc_name,
+                                 std::shared_ptr<Snapuserd> snapuserd) {
+    cow_device_ = cow_device;
+    backing_store_device_ = backing_device;
+    misc_name_ = misc_name;
+    snapuserd_ = snapuserd;
+}
+
+void ReadAheadThread::PrepareReadAhead(uint64_t* source_block, int* pending_ops,
+                                       std::vector<uint64_t>& blocks) {
+    int num_ops = *pending_ops;
+    int nr_consecutive = 0;
+
+    if (!IterDone() && num_ops) {
+        // Get the first block
+        const CowOperation* cow_op = GetIterOp();
+        *source_block = cow_op->source;
+        IterNext();
+        num_ops -= 1;
+        nr_consecutive = 1;
+        blocks.push_back(cow_op->new_block);
+
+        /*
+         * Find the number of consecutive blocks, working backwards.
+         */
+        while (!IterDone() && num_ops) {
+            const CowOperation* op = GetIterOp();
+            if (op->source != (*source_block - nr_consecutive)) {
+                break;
+            }
+            nr_consecutive += 1;
+            num_ops -= 1;
+            blocks.push_back(op->new_block);
+            IterNext();
+        }
+    }
+}
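
On the Op-1..Op-6 example from the header comment, PrepareReadAhead() walks the ops in the order the reverse iterator yields them (sources 20, 19, ..., 15) and folds the whole run into one batch. A standalone sketch of the same run detection, including the first-block recovery step (illustrative types, not the daemon's API):

```cpp
#include <cstdint>
#include <utility>
#include <vector>

struct Op {
    uint64_t source;
    uint64_t new_block;
};

// Mirrors PrepareReadAhead(): take the first op's source block, then extend
// the run while each following op's source descends by exactly one.
// Assumes `ops` is non-empty and ordered the way the reverse iterator over
// read_ahead_ops_ yields it.
std::pair<uint64_t, int> BatchRun(const std::vector<Op>& ops) {
    uint64_t source_block = ops[0].source;
    int nr_consecutive = 1;
    for (size_t i = 1; i < ops.size(); i++) {
        if (ops[i].source != source_block - nr_consecutive) {
            break;
        }
        nr_consecutive += 1;
    }
    // First block of the run, mirroring `source_block + 1 - linear_blocks`.
    return {source_block + 1 - nr_consecutive, nr_consecutive};
}

// For the header example (sources 20, 19, 18, 17, 16, 15) this yields
// {15, 6}: one 24k read instead of six 4k reads.
```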
+
+bool ReadAheadThread::ReconstructDataFromCow() {
+    std::unordered_map<uint64_t, void*>& read_ahead_buffer_map = snapuserd_->GetReadAheadMap();
+    read_ahead_buffer_map.clear();
+    loff_t metadata_offset = 0;
+    loff_t start_data_offset = snapuserd_->GetBufferDataOffset();
+    int num_ops = 0;
+    int total_blocks_merged = 0;
+
+    while (true) {
+        struct ScratchMetadata* bm = reinterpret_cast<struct ScratchMetadata*>(
+                (char*)metadata_buffer_ + metadata_offset);
+
+        // Done reading metadata
+        if (bm->new_block == 0 && bm->file_offset == 0) {
+            break;
+        }
+
+        loff_t buffer_offset = bm->file_offset - start_data_offset;
+        void* bufptr = static_cast<void*>((char*)read_ahead_buffer_ + buffer_offset);
+        read_ahead_buffer_map[bm->new_block] = bufptr;
+        num_ops += 1;
+        total_blocks_merged += 1;
+
+        metadata_offset += sizeof(struct ScratchMetadata);
+    }
+
+    // We are done re-constructing the mapping; however, we need to make sure
+    // all the COW operations to-be-merged are present in the re-constructed
+    // mapping.
+    while (!IterDone()) {
+        const CowOperation* op = GetIterOp();
+        if (read_ahead_buffer_map.find(op->new_block) != read_ahead_buffer_map.end()) {
+            num_ops -= 1;
+            snapuserd_->SetFinalBlockMerged(op->new_block);
+            IterNext();
+        } else {
+            // Verify that we have covered all the ops which were re-constructed
+            // from the COW device - these are the ops which are being
+            // re-constructed after crash.
+            CHECK(num_ops == 0);
+            break;
+        }
+    }
+
+    snapuserd_->SetTotalRaBlocksMerged(total_blocks_merged);
+
+    snapuserd_->ReconstructDataFromCowFinish();
+
+    if (!snapuserd_->ReadAheadIOCompleted()) {
+        SNAP_LOG(ERROR) << "ReadAheadIOCompleted failed...";
+        snapuserd_->ReadAheadIOFailed();
+        return false;
+    }
+
+    SNAP_LOG(INFO) << "ReconstructDataFromCow success";
+    return true;
+}
+
+bool ReadAheadThread::ReadAheadIOStart() {
+    // Check if the data has to be constructed from the COW file.
+    // This will be true only once during boot up after a crash
+    // during merge.
+    if (snapuserd_->ReconstructDataFromCow()) {
+        return ReconstructDataFromCow();
+    }
+
+    std::unordered_map<uint64_t, void*>& read_ahead_buffer_map = snapuserd_->GetReadAheadMap();
+    read_ahead_buffer_map.clear();
+
+    int num_ops = (snapuserd_->GetBufferDataSize()) / BLOCK_SZ;
+    loff_t metadata_offset = 0;
+
+    struct ScratchMetadata* bm =
+            reinterpret_cast<struct ScratchMetadata*>((char*)metadata_buffer_ + metadata_offset);
+
+    bm->new_block = 0;
+    bm->file_offset = 0;
+
+    std::vector<uint64_t> blocks;
+
+    loff_t buffer_offset = 0;
+    loff_t offset = 0;
+    loff_t file_offset = snapuserd_->GetBufferDataOffset();
+    int total_blocks_merged = 0;
+
+    while (true) {
+        uint64_t source_block;
+        int linear_blocks;
+
+        PrepareReadAhead(&source_block, &num_ops, blocks);
+        linear_blocks = blocks.size();
+        if (linear_blocks == 0) {
+            // No more blocks to read
+            SNAP_LOG(DEBUG) << " Read-ahead completed....";
+            break;
+        }
+
+        // Get the first block in the consecutive set of blocks
+        source_block = source_block + 1 - linear_blocks;
+        size_t io_size = (linear_blocks * BLOCK_SZ);
+        num_ops -= linear_blocks;
+        total_blocks_merged += linear_blocks;
+
+        // Mark this block number as the one which will
+        // be the final block to be merged in this entire region.
+        // The read-ahead thread will get
+        // notified when this block is merged so it can make
+        // forward progress
+        snapuserd_->SetFinalBlockMerged(blocks.back());
+
+        while (linear_blocks) {
+            uint64_t new_block = blocks.back();
+            blocks.pop_back();
+            // Assign the mapping
+            void* bufptr = static_cast<void*>((char*)read_ahead_buffer_ + offset);
+            read_ahead_buffer_map[new_block] = bufptr;
+            offset += BLOCK_SZ;
+
+            bm = reinterpret_cast<struct ScratchMetadata*>((char*)metadata_buffer_ +
+                                                           metadata_offset);
+            bm->new_block = new_block;
+            bm->file_offset = file_offset;
+
+            metadata_offset += sizeof(struct ScratchMetadata);
+            file_offset += BLOCK_SZ;
+
+            linear_blocks -= 1;
+        }
+
+        // Read the consecutive set of blocks from the base device in one shot
+        if (!android::base::ReadFullyAtOffset(backing_store_fd_,
+                                              (char*)read_ahead_buffer_ + buffer_offset, io_size,
+                                              source_block * BLOCK_SZ)) {
+            SNAP_PLOG(ERROR) << "Copy-op failed. Read from backing store: " << backing_store_device_
+                             << " at block: " << source_block << " buffer_offset: " << buffer_offset
+                             << " io_size: " << io_size << " buf-addr: " << read_ahead_buffer_;
+
+            snapuserd_->ReadAheadIOFailed();
+            return false;
+        }
+
+        // This is important - explicitly set the contents to zero. This is used
+        // when re-constructing the data after a crash, to indicate the end of
+        // valid metadata contents.
+        bm = reinterpret_cast<struct ScratchMetadata*>((char*)metadata_buffer_ + metadata_offset);
+        bm->new_block = 0;
+        bm->file_offset = 0;
+
+        buffer_offset += io_size;
+        CHECK(offset == buffer_offset);
+        CHECK((file_offset - snapuserd_->GetBufferDataOffset()) == offset);
+    }
+
+    snapuserd_->SetTotalRaBlocksMerged(total_blocks_merged);
+
+    if (!snapuserd_->ReadAheadIOCompleted()) {
+        SNAP_LOG(ERROR) << "ReadAheadIOCompleted failed...";
+        snapuserd_->ReadAheadIOFailed();
+        return false;
+    }
+
+    return true;
+}
+
+bool ReadAheadThread::RunThread() {
+    if (!InitializeFds()) {
+        return false;
+    }
+
+    InitializeIter();
+    InitializeBuffer();
+
+    while (!IterDone()) {
+        if (!ReadAheadIOStart()) {
+            return false;
+        }
+
+        bool status = snapuserd_->WaitForMergeToComplete();
+
+        if (status && !snapuserd_->CommitMerge(snapuserd_->GetTotalRaBlocksMerged())) {
+            return false;
+        }
+
+        if (!status) break;
+    }
+
+    CloseFds();
+    SNAP_LOG(INFO) << " ReadAhead thread terminating....";
+    return true;
+}
+
+// Initialization
+bool ReadAheadThread::InitializeFds() {
+    backing_store_fd_.reset(open(backing_store_device_.c_str(), O_RDONLY));
+    if (backing_store_fd_ < 0) {
+        SNAP_PLOG(ERROR) << "Open Failed: " << backing_store_device_;
+        return false;
+    }
+
+    cow_fd_.reset(open(cow_device_.c_str(), O_RDWR));
+    if (cow_fd_ < 0) {
+        SNAP_PLOG(ERROR) << "Open Failed: " << cow_device_;
+        return false;
+    }
+
+    return true;
+}
+
+void ReadAheadThread::InitializeIter() {
+    std::vector<const CowOperation*>& read_ahead_ops = snapuserd_->GetReadAheadOpsVec();
+    read_ahead_iter_ = read_ahead_ops.rbegin();
+}
+
+bool ReadAheadThread::IterDone() {
+    std::vector<const CowOperation*>& read_ahead_ops = snapuserd_->GetReadAheadOpsVec();
+    return read_ahead_iter_ == read_ahead_ops.rend();
+}
+
+void ReadAheadThread::IterNext() {
+    read_ahead_iter_++;
+}
+
+const CowOperation* ReadAheadThread::GetIterOp() {
+    return *read_ahead_iter_;
+}
+
+void ReadAheadThread::InitializeBuffer() {
+    void* mapped_addr = snapuserd_->GetMappedAddr();
+    // Map the scratch space region into memory
+    metadata_buffer_ =
+            static_cast<void*>((char*)mapped_addr + snapuserd_->GetBufferMetadataOffset());
+    read_ahead_buffer_ = static_cast<void*>((char*)mapped_addr + snapuserd_->GetBufferDataOffset());
+}
+
+}  // namespace snapshot
+}  // namespace android
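
One detail that is easy to miss in ReadAheadIOStart() / ReconstructDataFromCow(): the {0, 0} ScratchMetadata entry written after every batch is the terminator that post-crash reconstruction scans for. A small sketch of that scan under the same convention (standalone types, illustrative only):

```cpp
#include <cstdint>
#include <cstring>

struct ScratchMetadata {
    uint64_t new_block;
    uint64_t file_offset;
} __attribute__((packed));

// Scans persisted scratch metadata the way ReconstructDataFromCow() does:
// a {0, 0} entry is the sentinel ReadAheadIOStart() writes after each batch,
// marking the end of the valid mappings.
int CountPersistedEntries(const char* metadata_buffer) {
    int n = 0;
    for (const char* p = metadata_buffer;; p += sizeof(ScratchMetadata)) {
        ScratchMetadata bm;
        memcpy(&bm, p, sizeof(bm));  // copy out to avoid unaligned access
        if (bm.new_block == 0 && bm.file_offset == 0) {
            break;
        }
        n += 1;
    }
    return n;
}
```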
diff --git a/fs_mgr/libsnapshot/snapuserd_worker.cpp b/fs_mgr/libsnapshot/snapuserd_worker.cpp
index 4b2c5a083..9f42ab8ec 100644
--- a/fs_mgr/libsnapshot/snapuserd_worker.cpp
+++ b/fs_mgr/libsnapshot/snapuserd_worker.cpp
@@ -135,14 +135,11 @@ bool WorkerThread::ProcessReplaceOp(const CowOperation* cow_op) {
     return true;
 }
 
-// Start the copy operation. This will read the backing
-// block device which is represented by cow_op->source.
-bool WorkerThread::ProcessCopyOp(const CowOperation* cow_op) {
+bool WorkerThread::ReadFromBaseDevice(const CowOperation* cow_op) {
     void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
     CHECK(buffer != nullptr);
-
-    // Issue a single 4K IO. However, this can be optimized
-    // if the successive blocks are contiguous.
+    SNAP_LOG(DEBUG) << " ReadFromBaseDevice...: new-block: " << cow_op->new_block
+                    << " Source: " << cow_op->source;
     if (!android::base::ReadFullyAtOffset(backing_store_fd_, buffer, BLOCK_SZ,
                                           cow_op->source * BLOCK_SZ)) {
         SNAP_PLOG(ERROR) << "Copy-op failed. Read from backing store: " << backing_store_device_
@@ -153,6 +150,31 @@ bool WorkerThread::ProcessCopyOp(const CowOperation* cow_op) {
     return true;
 }
 
+bool WorkerThread::GetReadAheadPopulatedBuffer(const CowOperation* cow_op) {
+    void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
+    CHECK(buffer != nullptr);
+
+    if (!snapuserd_->GetReadAheadPopulatedBuffer(cow_op->new_block, buffer)) {
+        return false;
+    }
+
+    return true;
+}
+
+// Start the copy operation. This will read the backing
+// block device which is represented by cow_op->source.
+bool WorkerThread::ProcessCopyOp(const CowOperation* cow_op) {
+    if (!GetReadAheadPopulatedBuffer(cow_op)) {
+        SNAP_LOG(DEBUG) << " GetReadAheadPopulatedBuffer failed..."
+                        << " new_block: " << cow_op->new_block;
+        if (!ReadFromBaseDevice(cow_op)) {
+            return false;
+        }
+    }
+
+    return true;
+}
+
 bool WorkerThread::ProcessZeroOp() {
     // Zero out the entire block
     void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
@@ -386,8 +408,10 @@ loff_t WorkerThread::GetMergeStartOffset(void* merged_buffer, void* unmerged_buf
 }
 
 int WorkerThread::GetNumberOfMergedOps(void* merged_buffer, void* unmerged_buffer, loff_t offset,
-                                       int unmerged_exceptions) {
+                                       int unmerged_exceptions, bool* copy_op, bool* commit) {
     int merged_ops_cur_iter = 0;
+    std::unordered_map<uint64_t, void*>& read_ahead_buffer_map = snapuserd_->GetReadAheadMap();
+    *copy_op = false;
     std::vector<std::pair<sector_t, const CowOperation*>>& chunk_vec = snapuserd_->GetChunkVec();
 
     // Find the operations which are merged in this cycle.
@@ -411,6 +435,23 @@ int WorkerThread::GetNumberOfMergedOps(void* merged_buffer, void* unmerged_buffe
             const CowOperation* cow_op = it->second;
             CHECK(cow_op != nullptr);
 
+            if (snapuserd_->IsReadAheadFeaturePresent() && cow_op->type == kCowCopyOp) {
+                *copy_op = true;
+                // Every single copy operation has to come from the read-ahead
+                // cache.
+                if (read_ahead_buffer_map.find(cow_op->new_block) == read_ahead_buffer_map.end()) {
+                    SNAP_LOG(ERROR)
+                            << " Block: " << cow_op->new_block << " not found in read-ahead cache"
+                            << " Source: " << cow_op->source;
+                    return -1;
+                }
+                // If this is the final block merged in the read-ahead buffer
+                // region, notify the read-ahead thread to make forward
+                // progress
+                if (cow_op->new_block == snapuserd_->GetFinalBlockMerged()) {
+                    *commit = true;
+                }
+            }
+
             CHECK(cow_op->new_block == cow_de->old_chunk);
             // zero out to indicate that the operation is merged.
@@ -442,6 +483,8 @@
 bool WorkerThread::ProcessMergeComplete(chunk_t chunk, void* buffer) {
     uint32_t stride = exceptions_per_area_ + 1;
     const std::vector<std::unique_ptr<uint8_t[]>>& vec = snapuserd_->GetMetadataVec();
+    bool copy_op = false;
+    bool commit = false;
 
     // ChunkID to vector index
     lldiv_t divresult = lldiv(chunk, stride);
@@ -452,13 +495,24 @@ bool WorkerThread::ProcessMergeComplete(chunk_t chunk, void* buffer) {
     int unmerged_exceptions = 0;
     loff_t offset = GetMergeStartOffset(buffer, vec[divresult.quot].get(), &unmerged_exceptions);
 
-    int merged_ops_cur_iter =
-            GetNumberOfMergedOps(buffer, vec[divresult.quot].get(), offset, unmerged_exceptions);
+    int merged_ops_cur_iter = GetNumberOfMergedOps(buffer, vec[divresult.quot].get(), offset,
+                                                   unmerged_exceptions, &copy_op, &commit);
 
     // There should be at least one operation merged in this cycle
     CHECK(merged_ops_cur_iter > 0);
-    if (!snapuserd_->CommitMerge(merged_ops_cur_iter)) {
-        return false;
+
+    if (copy_op) {
+        if (commit) {
+            // Push the flushing logic to the read-ahead thread so that the merge
+            // thread can make forward progress. Sync will happen in the background.
+            snapuserd_->StartReadAhead();
+        }
+    } else {
+        // Non-copy ops and all ops in the older COW format
+        if (!snapuserd_->CommitMerge(merged_ops_cur_iter)) {
+            SNAP_LOG(ERROR) << "CommitMerge failed...";
+            return false;
+        }
     }
 
     SNAP_LOG(DEBUG) << "Merge success: " << merged_ops_cur_iter << " chunk: " << chunk;
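
Net effect of the worker-side changes: the commit policy now depends on the op type. A condensed sketch of the decision in ProcessMergeComplete() (function pointers stand in for the real members; illustrative only):

```cpp
// Condensed view of the new commit policy: copy ops defer the COW-header
// sync to the read-ahead thread, while replace/zero ops (and everything on
// the older COW format) still commit inline.
bool FinishMergeCycle(bool copy_op, bool commit, int merged_ops,
                      bool (*commit_merge)(int), void (*start_read_ahead)()) {
    if (copy_op) {
        if (commit) {
            // Final block of the region merged: wake the read-ahead thread,
            // which will msync and CommitMerge() in the background.
            start_read_ahead();
        }
        return true;
    }
    return commit_merge(merged_ops);
}
```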