logd: single std::mutex for locking log buffers and tracking readers
There are only three places where the log buffer lock is not already held when the reader lock is taken:
1) In LogReader, when a new reader connects
2) In LogReader, when a misbehaving reader disconnects
3) LogReaderThread::ThreadFunction()

1) and 2) happen sufficiently rarely that there's no impact if they additionally held a global lock.

3) is refactored in this CL. Previously, it would do the below in a loop:
1) Lock the reader lock then wait on a condition variable
2) Unlock the reader lock
3) Lock the log buffer lock in LogBuffer::FlushTo()
4) In each iteration of the LogBuffer::FlushTo() loop
   1) Lock then unlock the reader lock in FilterSecondPass()
   2) Unlock the log buffer lock to send the message, then re-lock it
5) Unlock the log buffer lock when leaving LogBuffer::FlushTo()

If these locks are collapsed into a single lock, then this simplifies to:
1) Lock the single lock then wait on a condition variable
2) In each iteration of the LogBuffer::FlushTo() loop
   1) Unlock the single lock to send the message, then re-lock it

Collapsing both of these locks into a single lock simplifies the code and removes the overhead of acquiring the second lock in the majority of use cases, where the first lock is already held.

Secondly, this lock will be a plain std::mutex instead of an RwLock. RwLocks are appropriate when there is a substantial imbalance between readers and writers and high contention; neither is true for logd.

Bug: 169736426
Test: logging unit tests
Change-Id: Ia511506f2d0935a5321c1b2f65569066f91ecb06
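As a rough, hypothetical illustration of the collapsed-lock pattern described above (this is not the logd source; names such as g_lock, g_buffer, SendToReader and ReaderLoop are invented for the sketch), a reader thread can hold one global std::mutex that guards both the buffer and its condition variable, dropping it only around the blocking send of each message:

    #include <condition_variable>
    #include <cstdio>
    #include <deque>
    #include <mutex>
    #include <string>
    #include <thread>
    #include <utility>

    // Hypothetical single global lock, analogous in spirit to logd_lock: it guards
    // the shared buffer *and* backs the reader's condition variable.
    std::mutex g_lock;
    std::condition_variable g_cond;

    std::deque<std::string> g_buffer;  // stands in for the log buffer, guarded by g_lock
    bool g_release = false;            // guarded by g_lock

    // Simulates writing an entry to a reader socket; called WITHOUT g_lock held.
    bool SendToReader(const std::string& msg) {
        std::printf("reader got: %s\n", msg.c_str());
        return true;
    }

    void ReaderLoop() {
        std::unique_lock<std::mutex> lock(g_lock);  // take the single lock once
        while (!g_release) {
            while (!g_buffer.empty()) {
                std::string msg = std::move(g_buffer.front());
                g_buffer.pop_front();
                lock.unlock();                      // drop only to send the message
                bool ok = SendToReader(msg);
                lock.lock();                        // re-take before touching shared state
                if (!ok) return;                    // reader went away
            }
            g_cond.wait(lock);                      // sleep until a writer signals
        }
    }

    void Log(std::string msg) {                     // writer side
        {
            std::lock_guard<std::mutex> lock(g_lock);
            g_buffer.push_back(std::move(msg));
        }
        g_cond.notify_all();
    }

    int main() {
        std::thread reader(ReaderLoop);
        Log("hello");
        Log("world");
        {
            std::lock_guard<std::mutex> lock(g_lock);
            g_release = true;
        }
        g_cond.notify_all();
        reader.join();
    }

The writer appends under the same mutex and then signals the condition variable; collapsing the reader lock and the buffer lock into one logd_lock is what makes this simpler structure possible in the real change.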
Parent: 8401907adc
Commit: c581886eea
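The diff below also enables clang's -Wthread-safety analysis and annotates state with REQUIRES(logd_lock) / GUARDED_BY(logd_lock). As a reminder of how those annotations behave, here is a small, self-contained sketch; the macro definitions are simplified stand-ins for <android-base/thread_annotations.h>, and Mutex, counter, Increment are invented names:

    #include <mutex>

    // Simplified stand-ins for the macros from <android-base/thread_annotations.h>.
    #define CAPABILITY(x) __attribute__((capability(x)))
    #define GUARDED_BY(x) __attribute__((guarded_by(x)))
    #define REQUIRES(...) __attribute__((requires_capability(__VA_ARGS__)))
    #define ACQUIRE(...) __attribute__((acquire_capability(__VA_ARGS__)))
    #define RELEASE(...) __attribute__((release_capability(__VA_ARGS__)))

    // A tiny annotated mutex wrapper, standing in for an annotated std::mutex
    // (libc++'s std::mutex carries these annotations when built with them).
    class CAPABILITY("mutex") Mutex {
      public:
        void lock() ACQUIRE() { m_.lock(); }
        void unlock() RELEASE() { m_.unlock(); }

      private:
        std::mutex m_;
    };

    // Hypothetical global lock, analogous to logd_lock in this change.
    Mutex logd_lock;

    int counter GUARDED_BY(logd_lock) = 0;  // may only be touched with the lock held

    // Callers must already hold logd_lock; -Wthread-safety checks this at compile time.
    void IncrementLocked() REQUIRES(logd_lock) {
        ++counter;
    }

    void Increment() {
        logd_lock.lock();
        IncrementLocked();    // OK: lock is held here
        logd_lock.unlock();
        // IncrementLocked(); // would be a -Wthread-safety warning: lock not held
    }

    int main() {
        Increment();
    }

With -Wthread-safety enabled, calling IncrementLocked() without holding logd_lock becomes a compile-time diagnostic rather than a latent race.
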
@@ -58,12 +58,13 @@ cc_library_static {
srcs: [
"ChattyLogBuffer.cpp",
"CompressionEngine.cpp",
"LogBufferElement.cpp",
"LogReaderList.cpp",
"LogReaderThread.cpp",
"LogSize.cpp",
"LogStatistics.cpp",
"LogTags.cpp",
"LogdLock.cpp",
"PruneList.cpp",
"SerializedFlushToState.cpp",
"SerializedLogBuffer.cpp",

@@ -138,6 +139,7 @@ cc_defaults {
"-fstack-protector-all",
"-g",
"-Wall",
"-Wthread-safety",
"-Wextra",
"-Werror",
"-fno-builtin",

@@ -333,8 +333,6 @@ bool ChattyLogBuffer::Prune(log_id_t id, unsigned long pruneRows, uid_t caller_u
LogReaderThread* oldest = nullptr;
bool clearAll = pruneRows == ULONG_MAX;

auto reader_threads_lock = std::lock_guard{reader_list()->reader_threads_lock()};

// Region locked?
for (const auto& reader_thread : reader_list()->reader_threads()) {
if (!reader_thread->IsWatching(id)) {

@@ -33,19 +33,19 @@
#include "LogStatistics.h"
#include "LogTags.h"
#include "LogWriter.h"
#include "LogdLock.h"
#include "PruneList.h"
#include "SimpleLogBuffer.h"
#include "rwlock.h"

typedef std::list<LogBufferElement> LogBufferElementCollection;

class ChattyLogBuffer : public SimpleLogBuffer {
// watermark of any worst/chatty uid processing
typedef std::unordered_map<uid_t, LogBufferElementCollection::iterator> LogBufferIteratorMap;
LogBufferIteratorMap mLastWorst[LOG_ID_MAX] GUARDED_BY(lock_);
LogBufferIteratorMap mLastWorst[LOG_ID_MAX] GUARDED_BY(logd_lock);
// watermark of any worst/chatty pid of system processing
typedef std::unordered_map<pid_t, LogBufferElementCollection::iterator> LogBufferPidIteratorMap;
LogBufferPidIteratorMap mLastWorstPidOfSystem[LOG_ID_MAX] GUARDED_BY(lock_);
LogBufferPidIteratorMap mLastWorstPidOfSystem[LOG_ID_MAX] GUARDED_BY(logd_lock);

public:
ChattyLogBuffer(LogReaderList* reader_list, LogTags* tags, PruneList* prune,

@@ -53,18 +53,18 @@ class ChattyLogBuffer : public SimpleLogBuffer {
~ChattyLogBuffer();

protected:
bool Prune(log_id_t id, unsigned long pruneRows, uid_t uid) REQUIRES(lock_) override;
void LogInternal(LogBufferElement&& elem) REQUIRES(lock_) override;
bool Prune(log_id_t id, unsigned long pruneRows, uid_t uid) REQUIRES(logd_lock) override;
void LogInternal(LogBufferElement&& elem) REQUIRES(logd_lock) override;

private:
LogBufferElementCollection::iterator Erase(LogBufferElementCollection::iterator it,
bool coalesce = false) REQUIRES(lock_);
bool coalesce = false) REQUIRES(logd_lock);

PruneList* prune_;

// This always contains a copy of the last message logged, for deduplication.
std::optional<LogBufferElement> last_logged_elements_[LOG_ID_MAX] GUARDED_BY(lock_);
std::optional<LogBufferElement> last_logged_elements_[LOG_ID_MAX] GUARDED_BY(logd_lock);
// This contains an element if duplicate messages are seen.
// Its `dropped` count is `duplicates seen - 1`.
std::optional<LogBufferElement> duplicate_elements_[LOG_ID_MAX] GUARDED_BY(lock_);
std::optional<LogBufferElement> duplicate_elements_[LOG_ID_MAX] GUARDED_BY(logd_lock);
};

@@ -60,9 +60,14 @@ TEST_P(ChattyLogBufferTest, deduplication_simple) {
LogMessages(log_messages);

std::vector<LogMessage> read_log_messages;
std::unique_ptr<LogWriter> test_writer(new TestWriter(&read_log_messages, nullptr));
std::unique_ptr<FlushToState> flush_to_state = log_buffer_->CreateFlushToState(1, kLogMaskAll);
EXPECT_TRUE(log_buffer_->FlushTo(test_writer.get(), *flush_to_state, nullptr));
{
auto lock = std::lock_guard{logd_lock};
std::unique_ptr<LogWriter> test_writer(new TestWriter(&read_log_messages, nullptr));

std::unique_ptr<FlushToState> flush_to_state =
log_buffer_->CreateFlushToState(1, kLogMaskAll);
EXPECT_TRUE(log_buffer_->FlushTo(test_writer.get(), *flush_to_state, nullptr));
}

std::vector<LogMessage> expected_log_messages = {
make_message(0, "test_tag", "duplicate"),

@@ -117,9 +122,13 @@ TEST_P(ChattyLogBufferTest, deduplication_overflow) {
LogMessages(log_messages);

std::vector<LogMessage> read_log_messages;
std::unique_ptr<LogWriter> test_writer(new TestWriter(&read_log_messages, nullptr));
std::unique_ptr<FlushToState> flush_to_state = log_buffer_->CreateFlushToState(1, kLogMaskAll);
EXPECT_TRUE(log_buffer_->FlushTo(test_writer.get(), *flush_to_state, nullptr));
{
auto lock = std::lock_guard{logd_lock};
std::unique_ptr<LogWriter> test_writer(new TestWriter(&read_log_messages, nullptr));
std::unique_ptr<FlushToState> flush_to_state =
log_buffer_->CreateFlushToState(1, kLogMaskAll);
EXPECT_TRUE(log_buffer_->FlushTo(test_writer.get(), *flush_to_state, nullptr));
}

std::vector<LogMessage> expected_log_messages = {
make_message(0, "test_tag", "normal"),

@@ -173,9 +182,13 @@ TEST_P(ChattyLogBufferTest, deduplication_liblog) {
LogMessages(log_messages);

std::vector<LogMessage> read_log_messages;
std::unique_ptr<LogWriter> test_writer(new TestWriter(&read_log_messages, nullptr));
std::unique_ptr<FlushToState> flush_to_state = log_buffer_->CreateFlushToState(1, kLogMaskAll);
EXPECT_TRUE(log_buffer_->FlushTo(test_writer.get(), *flush_to_state, nullptr));
{
auto lock = std::lock_guard{logd_lock};
std::unique_ptr<LogWriter> test_writer(new TestWriter(&read_log_messages, nullptr));
std::unique_ptr<FlushToState> flush_to_state =
log_buffer_->CreateFlushToState(1, kLogMaskAll);
EXPECT_TRUE(log_buffer_->FlushTo(test_writer.get(), *flush_to_state, nullptr));
}

std::vector<LogMessage> expected_log_messages = {
make_message(0, 1234, 1),

@@ -257,7 +270,7 @@ TEST_P(ChattyLogBufferTest, no_leading_chatty_simple) {
std::vector<LogMessage> read_log_messages;
bool released = false;
{
auto lock = std::unique_lock{reader_list_.reader_threads_lock()};
auto lock = std::lock_guard{logd_lock};
std::unique_ptr<LogWriter> test_writer(new TestWriter(&read_log_messages, &released));
std::unique_ptr<LogReaderThread> log_reader(
new LogReaderThread(log_buffer_.get(), &reader_list_, std::move(test_writer), true,

@@ -317,7 +330,7 @@ TEST_P(ChattyLogBufferTest, no_leading_chatty_tail) {
std::vector<LogMessage> read_log_messages;
bool released = false;
{
auto lock = std::unique_lock{reader_list_.reader_threads_lock()};
auto lock = std::lock_guard{logd_lock};
std::unique_ptr<LogWriter> test_writer(new TestWriter(&read_log_messages, &released));
std::unique_ptr<LogReaderThread> log_reader(
new LogReaderThread(log_buffer_.get(), &reader_list_, std::move(test_writer), true,

@@ -21,10 +21,12 @@
#include <functional>
#include <memory>

#include <android-base/thread_annotations.h>
#include <log/log.h>
#include <log/log_read.h>

#include "LogWriter.h"
#include "LogdLock.h"

// A mask to represent which log buffers a reader is watching, values are (1 << LOG_ID_MAIN), etc.
using LogMask = uint32_t;

@@ -62,12 +64,12 @@ class LogBuffer {
virtual int Log(log_id_t log_id, log_time realtime, uid_t uid, pid_t pid, pid_t tid,
const char* msg, uint16_t len) = 0;

virtual std::unique_ptr<FlushToState> CreateFlushToState(uint64_t start, LogMask log_mask) = 0;
virtual void DeleteFlushToState(std::unique_ptr<FlushToState>) {}
virtual std::unique_ptr<FlushToState> CreateFlushToState(uint64_t start, LogMask log_mask)
REQUIRES(logd_lock) = 0;
virtual bool FlushTo(
LogWriter* writer, FlushToState& state,
const std::function<FilterResult(log_id_t log_id, pid_t pid, uint64_t sequence,
log_time realtime)>& filter) = 0;
log_time realtime)>& filter) REQUIRES(logd_lock) = 0;

virtual bool Clear(log_id_t id, uid_t uid) = 0;
virtual size_t GetSize(log_id_t id) = 0;

@@ -190,10 +190,14 @@ TEST_P(LogBufferTest, smoke) {
LogMessages(log_messages);

std::vector<LogMessage> read_log_messages;
std::unique_ptr<LogWriter> test_writer(new TestWriter(&read_log_messages, nullptr));
std::unique_ptr<FlushToState> flush_to_state = log_buffer_->CreateFlushToState(1, kLogMaskAll);
EXPECT_TRUE(log_buffer_->FlushTo(test_writer.get(), *flush_to_state, nullptr));
EXPECT_EQ(2ULL, flush_to_state->start());
{
auto lock = std::lock_guard{logd_lock};
std::unique_ptr<LogWriter> test_writer(new TestWriter(&read_log_messages, nullptr));
std::unique_ptr<FlushToState> flush_to_state =
log_buffer_->CreateFlushToState(1, kLogMaskAll);
EXPECT_TRUE(log_buffer_->FlushTo(test_writer.get(), *flush_to_state, nullptr));
EXPECT_EQ(2ULL, flush_to_state->start());
}
CompareLogMessages(log_messages, read_log_messages);
}

@@ -227,7 +231,7 @@ TEST_P(LogBufferTest, smoke_with_reader_thread) {
bool released = false;

{
auto lock = std::unique_lock{reader_list_.reader_threads_lock()};
auto lock = std::lock_guard{logd_lock};
std::unique_ptr<LogWriter> test_writer(new TestWriter(&read_log_messages, &released));
std::unique_ptr<LogReaderThread> log_reader(
new LogReaderThread(log_buffer_.get(), &reader_list_, std::move(test_writer), true,

@@ -239,7 +243,7 @@ TEST_P(LogBufferTest, smoke_with_reader_thread) {
usleep(5000);
}
{
auto lock = std::unique_lock{reader_list_.reader_threads_lock()};
auto lock = std::lock_guard{logd_lock};
EXPECT_EQ(0U, reader_list_.reader_threads().size());
}
CompareLogMessages(log_messages, read_log_messages);

@@ -301,7 +305,7 @@ TEST_P(LogBufferTest, random_messages) {
bool released = false;

{
auto lock = std::unique_lock{reader_list_.reader_threads_lock()};
auto lock = std::lock_guard{logd_lock};
std::unique_ptr<LogWriter> test_writer(new TestWriter(&read_log_messages, &released));
std::unique_ptr<LogReaderThread> log_reader(
new LogReaderThread(log_buffer_.get(), &reader_list_, std::move(test_writer), true,

@@ -313,7 +317,7 @@ TEST_P(LogBufferTest, random_messages) {
usleep(5000);
}
{
auto lock = std::unique_lock{reader_list_.reader_threads_lock()};
auto lock = std::lock_guard{logd_lock};
EXPECT_EQ(0U, reader_list_.reader_threads().size());
}
CompareLogMessages(log_messages, read_log_messages);

@@ -335,7 +339,7 @@ TEST_P(LogBufferTest, read_last_sequence) {
bool released = false;

{
auto lock = std::unique_lock{reader_list_.reader_threads_lock()};
auto lock = std::lock_guard{logd_lock};
std::unique_ptr<LogWriter> test_writer(new TestWriter(&read_log_messages, &released));
std::unique_ptr<LogReaderThread> log_reader(
new LogReaderThread(log_buffer_.get(), &reader_list_, std::move(test_writer), true,

@@ -347,7 +351,7 @@ TEST_P(LogBufferTest, read_last_sequence) {
usleep(5000);
}
{
auto lock = std::unique_lock{reader_list_.reader_threads_lock()};
auto lock = std::lock_guard{logd_lock};
EXPECT_EQ(0U, reader_list_.reader_threads().size());
}
std::vector<LogMessage> expected_log_messages = {log_messages.back()};

@@ -372,7 +376,7 @@ TEST_P(LogBufferTest, clear_logs) {

// Connect a blocking reader.
{
auto lock = std::unique_lock{reader_list_.reader_threads_lock()};
auto lock = std::lock_guard{logd_lock};
std::unique_ptr<LogWriter> test_writer(new TestWriter(&read_log_messages, &released));
std::unique_ptr<LogReaderThread> log_reader(
new LogReaderThread(log_buffer_.get(), &reader_list_, std::move(test_writer), false,

@@ -385,7 +389,7 @@ TEST_P(LogBufferTest, clear_logs) {
int count = 0;
for (; count < kMaxRetryCount; ++count) {
usleep(5000);
auto lock = std::unique_lock{reader_list_.reader_threads_lock()};
auto lock = std::lock_guard{logd_lock};
if (reader_list_.reader_threads().back()->start() == 4) {
break;
}

@@ -410,7 +414,7 @@ TEST_P(LogBufferTest, clear_logs) {
// Wait up to 250ms for the reader to read the 3 additional logs.
for (count = 0; count < kMaxRetryCount; ++count) {
usleep(5000);
auto lock = std::unique_lock{reader_list_.reader_threads_lock()};
auto lock = std::lock_guard{logd_lock};
if (reader_list_.reader_threads().back()->start() == 7) {
break;
}

@@ -419,14 +423,14 @@ TEST_P(LogBufferTest, clear_logs) {

// Release the reader, wait for it to get the signal then check that it has been deleted.
{
auto lock = std::unique_lock{reader_list_.reader_threads_lock()};
reader_list_.reader_threads().back()->release_Locked();
auto lock = std::lock_guard{logd_lock};
reader_list_.reader_threads().back()->Release();
}
while (!released) {
usleep(5000);
}
{
auto lock = std::unique_lock{reader_list_.reader_threads_lock()};
auto lock = std::lock_guard{logd_lock};
EXPECT_EQ(0U, reader_list_.reader_threads().size());
}

@@ -438,10 +442,15 @@ TEST_P(LogBufferTest, clear_logs) {

// Finally, call FlushTo and ensure that only the 3 logs after the clear remain in the buffer.
std::vector<LogMessage> read_log_messages_after_clear;
std::unique_ptr<LogWriter> test_writer(new TestWriter(&read_log_messages_after_clear, nullptr));
std::unique_ptr<FlushToState> flush_to_state = log_buffer_->CreateFlushToState(1, kLogMaskAll);
EXPECT_TRUE(log_buffer_->FlushTo(test_writer.get(), *flush_to_state, nullptr));
EXPECT_EQ(7ULL, flush_to_state->start());
{
auto lock = std::lock_guard{logd_lock};
std::unique_ptr<LogWriter> test_writer(
new TestWriter(&read_log_messages_after_clear, nullptr));
std::unique_ptr<FlushToState> flush_to_state =
log_buffer_->CreateFlushToState(1, kLogMaskAll);
EXPECT_TRUE(log_buffer_->FlushTo(test_writer.get(), *flush_to_state, nullptr));
EXPECT_EQ(7ULL, flush_to_state->start());
}
CompareLogMessages(after_clear_messages, read_log_messages_after_clear);
}

@@ -17,6 +17,7 @@
#include <ctype.h>
#include <inttypes.h>
#include <poll.h>
#include <sched.h>
#include <sys/prctl.h>
#include <sys/socket.h>
#include <sys/types.h>

@@ -152,8 +153,8 @@ bool LogReader::onDataAvailable(SocketClient* cli) {
if (!fastcmp<strncmp>(buffer, "dumpAndClose", 12)) {
// Allow writer to get some cycles, and wait for pending notifications
sched_yield();
reader_list_->reader_threads_lock().lock();
reader_list_->reader_threads_lock().unlock();
logd_lock.lock();
logd_lock.unlock();
sched_yield();
nonBlock = true;
}

@@ -191,6 +192,7 @@ bool LogReader::onDataAvailable(SocketClient* cli) {
}
return FilterResult::kSkip;
};
auto lock = std::lock_guard{logd_lock};
auto flush_to_state = log_buffer_->CreateFlushToState(sequence, logMask);
log_buffer_->FlushTo(socket_log_writer.get(), *flush_to_state, log_find_start);

@@ -212,7 +214,7 @@ bool LogReader::onDataAvailable(SocketClient* cli) {
deadline = {};
}

auto lock = std::lock_guard{reader_list_->reader_threads_lock()};
auto lock = std::lock_guard{logd_lock};
auto entry = std::make_unique<LogReaderThread>(log_buffer_, reader_list_,
std::move(socket_log_writer), nonBlock, tail,
logMask, pid, start, sequence, deadline);

@@ -230,10 +232,10 @@ bool LogReader::onDataAvailable(SocketClient* cli) {

bool LogReader::DoSocketDelete(SocketClient* cli) {
auto cli_name = SocketClientToName(cli);
auto lock = std::lock_guard{reader_list_->reader_threads_lock()};
auto lock = std::lock_guard{logd_lock};
for (const auto& reader : reader_list_->reader_threads()) {
if (reader->name() == cli_name) {
reader->release_Locked();
reader->Release();
return true;
}
}

@@ -19,8 +19,6 @@
// When we are notified a new log entry is available, inform
// listening sockets who are watching this entry's log id.
void LogReaderList::NotifyNewLog(LogMask log_mask) const {
auto lock = std::lock_guard{reader_threads_lock_};

for (const auto& entry : reader_threads_) {
if (!entry->IsWatchingMultiple(log_mask)) {
continue;

@@ -28,6 +26,6 @@ void LogReaderList::NotifyNewLog(LogMask log_mask) const {
if (entry->deadline().time_since_epoch().count() != 0) {
continue;
}
entry->triggerReader_Locked();
entry->TriggerReader();
}
}

@@ -22,15 +22,16 @@

#include "LogBuffer.h"
#include "LogReaderThread.h"
#include "LogdLock.h"

class LogReaderList {
public:
void NotifyNewLog(LogMask log_mask) const;
void NotifyNewLog(LogMask log_mask) const REQUIRES(logd_lock);

std::list<std::unique_ptr<LogReaderThread>>& reader_threads() { return reader_threads_; }
std::mutex& reader_threads_lock() { return reader_threads_lock_; }
std::list<std::unique_ptr<LogReaderThread>>& reader_threads() REQUIRES(logd_lock) {
return reader_threads_;
}

private:
std::list<std::unique_ptr<LogReaderThread>> reader_threads_;
mutable std::mutex reader_threads_lock_;
std::list<std::unique_ptr<LogReaderThread>> reader_threads_ GUARDED_BY(logd_lock);
};

@@ -24,6 +24,7 @@

#include "LogBuffer.h"
#include "LogReaderList.h"
#include "SerializedFlushToState.h"

LogReaderThread::LogReaderThread(LogBuffer* log_buffer, LogReaderList* reader_list,
std::unique_ptr<LogWriter> writer, bool non_block,

@@ -40,7 +41,7 @@ LogReaderThread::LogReaderThread(LogBuffer* log_buffer, LogReaderList* reader_li
start_time_(start_time),
deadline_(deadline),
non_block_(non_block) {
cleanSkip_Locked();
CleanSkip();
flush_to_state_ = log_buffer_->CreateFlushToState(start, log_mask);
auto thread = std::thread{&LogReaderThread::ThreadFunction, this};
thread.detach();

@@ -49,7 +50,8 @@ LogReaderThread::LogReaderThread(LogBuffer* log_buffer, LogReaderList* reader_li
void LogReaderThread::ThreadFunction() {
prctl(PR_SET_NAME, "logd.reader.per");

auto lock = std::unique_lock{reader_list_->reader_threads_lock()};
auto lock = std::unique_lock{logd_lock};
auto lock_assertion = android::base::ScopedLockAssertion{logd_lock};

while (!release_) {
if (deadline_.time_since_epoch().count() != 0) {

@@ -62,23 +64,19 @@ void LogReaderThread::ThreadFunction() {
}
}

lock.unlock();

if (tail_) {
auto first_pass_state = log_buffer_->CreateFlushToState(flush_to_state_->start(),
flush_to_state_->log_mask());
log_buffer_->FlushTo(
writer_.get(), *first_pass_state,
[this](log_id_t log_id, pid_t pid, uint64_t sequence, log_time realtime) {
return FilterFirstPass(log_id, pid, sequence, realtime);
});
log_buffer_->DeleteFlushToState(std::move(first_pass_state));
log_buffer_->FlushTo(writer_.get(), *first_pass_state,
[this](log_id_t log_id, pid_t pid, uint64_t sequence,
log_time realtime) REQUIRES(logd_lock) {
return FilterFirstPass(log_id, pid, sequence, realtime);
});
}
bool flush_success = log_buffer_->FlushTo(
writer_.get(), *flush_to_state_,
[this](log_id_t log_id, pid_t pid, uint64_t sequence, log_time realtime) {
return FilterSecondPass(log_id, pid, sequence, realtime);
});
[this](log_id_t log_id, pid_t pid, uint64_t sequence, log_time realtime) REQUIRES(
logd_lock) { return FilterSecondPass(log_id, pid, sequence, realtime); });

// We only ignore entries before the original start time for the first flushTo(), if we
// get entries after this first flush before the original start time, then the client

@@ -89,8 +87,6 @@ void LogReaderThread::ThreadFunction() {
start_time_.tv_sec = 0;
start_time_.tv_nsec = 0;

lock.lock();

if (!flush_success) {
break;
}

@@ -99,17 +95,13 @@ void LogReaderThread::ThreadFunction() {
break;
}

cleanSkip_Locked();
CleanSkip();

if (deadline_.time_since_epoch().count() == 0) {
thread_triggered_condition_.wait(lock);
}
}

lock.unlock();
log_buffer_->DeleteFlushToState(std::move(flush_to_state_));
lock.lock();

writer_->Release();

auto& log_reader_threads = reader_list_->reader_threads();

@@ -123,8 +115,6 @@ void LogReaderThread::ThreadFunction() {

// A first pass to count the number of elements
FilterResult LogReaderThread::FilterFirstPass(log_id_t, pid_t pid, uint64_t, log_time realtime) {
auto lock = std::lock_guard{reader_list_->reader_threads_lock()};

if ((!pid_ || pid_ == pid) && (start_time_ == log_time::EPOCH || start_time_ <= realtime)) {
++count_;
}

@@ -135,8 +125,6 @@ FilterResult LogReaderThread::FilterFirstPass(log_id_t, pid_t pid, uint64_t, log
// A second pass to send the selected elements
FilterResult LogReaderThread::FilterSecondPass(log_id_t log_id, pid_t pid, uint64_t,
log_time realtime) {
auto lock = std::lock_guard{reader_list_->reader_threads_lock()};

if (skip_ahead_[log_id]) {
skip_ahead_[log_id]--;
return FilterResult::kSkip;

@@ -179,7 +167,3 @@ ok:
}
return FilterResult::kSkip;
}

void LogReaderThread::cleanSkip_Locked(void) {
memset(skip_ahead_, 0, sizeof(skip_ahead_));
}

@@ -26,10 +26,12 @@
#include <list>
#include <memory>

#include <android-base/thread_annotations.h>
#include <log/log.h>

#include "LogBuffer.h"
#include "LogWriter.h"
#include "LogdLock.h"

class LogReaderList;

@@ -39,50 +41,54 @@ class LogReaderThread {
std::unique_ptr<LogWriter> writer, bool non_block, unsigned long tail,
LogMask log_mask, pid_t pid, log_time start_time, uint64_t sequence,
std::chrono::steady_clock::time_point deadline);
void triggerReader_Locked() { thread_triggered_condition_.notify_all(); }
void TriggerReader() REQUIRES(logd_lock) { thread_triggered_condition_.notify_all(); }

void triggerSkip_Locked(log_id_t id, unsigned int skip) { skip_ahead_[id] = skip; }
void cleanSkip_Locked();
void TriggerSkip(log_id_t id, unsigned int skip) REQUIRES(logd_lock) { skip_ahead_[id] = skip; }
void CleanSkip() REQUIRES(logd_lock) { memset(skip_ahead_, 0, sizeof(skip_ahead_)); }

void release_Locked() {
void Release() REQUIRES(logd_lock) {
// gracefully shut down the socket.
writer_->Shutdown();
release_ = true;
thread_triggered_condition_.notify_all();
}

bool IsWatching(log_id_t id) const { return flush_to_state_->log_mask() & (1 << id); }
bool IsWatchingMultiple(LogMask log_mask) const {
return flush_to_state_ && flush_to_state_->log_mask() & log_mask;
bool IsWatching(log_id_t id) const REQUIRES(logd_lock) {
return flush_to_state_->log_mask() & (1 << id);
}
bool IsWatchingMultiple(LogMask log_mask) const REQUIRES(logd_lock) {
return flush_to_state_->log_mask() & log_mask;
}

std::string name() const { return writer_->name(); }
uint64_t start() const { return flush_to_state_->start(); }
std::chrono::steady_clock::time_point deadline() const { return deadline_; }
FlushToState& flush_to_state() { return *flush_to_state_; }
std::string name() const REQUIRES(logd_lock) { return writer_->name(); }
uint64_t start() const REQUIRES(logd_lock) { return flush_to_state_->start(); }
std::chrono::steady_clock::time_point deadline() const REQUIRES(logd_lock) { return deadline_; }
FlushToState& flush_to_state() REQUIRES(logd_lock) { return *flush_to_state_; }

private:
void ThreadFunction();
// flushTo filter callbacks
FilterResult FilterFirstPass(log_id_t log_id, pid_t pid, uint64_t sequence, log_time realtime);
FilterResult FilterSecondPass(log_id_t log_id, pid_t pid, uint64_t sequence, log_time realtime);
FilterResult FilterFirstPass(log_id_t log_id, pid_t pid, uint64_t sequence, log_time realtime)
REQUIRES(logd_lock);
FilterResult FilterSecondPass(log_id_t log_id, pid_t pid, uint64_t sequence, log_time realtime)
REQUIRES(logd_lock);

std::condition_variable thread_triggered_condition_;
LogBuffer* log_buffer_;
LogReaderList* reader_list_;
std::unique_ptr<LogWriter> writer_;
std::unique_ptr<LogWriter> writer_ GUARDED_BY(logd_lock);

// Set to true to cause the thread to end and the LogReaderThread to delete itself.
bool release_ = false;
bool release_ GUARDED_BY(logd_lock) = false;

// If set to non-zero, only pids equal to this are read by the reader.
const pid_t pid_;
// When a reader is referencing (via start_) old elements in the log buffer, and the log
// buffer's size grows past its memory limit, the log buffer may request the reader to skip
// ahead a specified number of logs.
unsigned int skip_ahead_[LOG_ID_MAX];
unsigned int skip_ahead_[LOG_ID_MAX] GUARDED_BY(logd_lock);
// LogBuffer::FlushTo() needs to store state across subsequent calls.
std::unique_ptr<FlushToState> flush_to_state_;
std::unique_ptr<FlushToState> flush_to_state_ GUARDED_BY(logd_lock);

// These next three variables are used for reading only the most recent lines aka `adb logcat
// -t` / `adb logcat -T`.

@@ -100,7 +106,7 @@ class LogReaderThread {
log_time start_time_;
// CLOCK_MONOTONIC based deadline used for log wrapping. If this deadline expires before logs
// wrap, then wake up and send the logs to the reader anyway.
std::chrono::steady_clock::time_point deadline_;
std::chrono::steady_clock::time_point deadline_ GUARDED_BY(logd_lock);
// If this reader is 'dumpAndClose' and will disconnect once it has read its intended logs.
const bool non_block_;
};

@@ -0,0 +1,19 @@
/*
* Copyright (C) 2020 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*      http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "LogdLock.h"

std::mutex logd_lock;

@@ -0,0 +1,21 @@
/*
* Copyright (C) 2020 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*      http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <mutex>

extern std::mutex logd_lock;

@@ -321,6 +321,7 @@ class PrintLogs : public SingleBufferOperation {
}

void End() override {
auto lock = std::lock_guard{logd_lock};
std::unique_ptr<LogWriter> test_writer(new StdoutWriter());
std::unique_ptr<FlushToState> flush_to_state = log_buffer_->CreateFlushToState(1, mask_);
log_buffer_->FlushTo(test_writer.get(), *flush_to_state, nullptr);

@@ -372,7 +373,7 @@ class PrintAllLogs : public SingleBufferOperation {
PrintAllLogs(log_time first_log_timestamp, const char* buffer, const char* buffers)
: SingleBufferOperation(first_log_timestamp, buffer) {
LogMask mask = BuffersToLogMask(buffers);
auto lock = std::unique_lock{reader_list_.reader_threads_lock()};
auto lock = std::unique_lock{logd_lock};
std::unique_ptr<LogWriter> stdout_writer(new StdoutWriter());
std::unique_ptr<LogReaderThread> log_reader(
new LogReaderThread(log_buffer_.get(), &reader_list_, std::move(stdout_writer),

@@ -20,8 +20,9 @@

#include <android-base/logging.h>

SerializedFlushToState::SerializedFlushToState(uint64_t start, LogMask log_mask)
: FlushToState(start, log_mask) {
SerializedFlushToState::SerializedFlushToState(uint64_t start, LogMask log_mask,
std::list<SerializedLogChunk>* logs)
: FlushToState(start, log_mask), logs_(logs) {
log_id_for_each(i) {
if (((1 << i) & log_mask) == 0) {
continue;

@@ -21,6 +21,7 @@
#include <queue>

#include "LogBuffer.h"
#include "LogdLock.h"
#include "SerializedLogChunk.h"
#include "SerializedLogEntry.h"

@@ -44,48 +45,45 @@ class SerializedFlushToState : public FlushToState {
public:
// Initializes this state object. For each log buffer set in log_mask, this sets
// logs_needed_from_next_position_.
SerializedFlushToState(uint64_t start, LogMask log_mask);
SerializedFlushToState(uint64_t start, LogMask log_mask, std::list<SerializedLogChunk>* logs)
REQUIRES(logd_lock);

// Decrease the reference of all referenced logs. This happens when a reader is disconnected.
~SerializedFlushToState() override;

// We can't hold SerializedLogBuffer::lock_ in the constructor, so we must initialize logs here.
void InitializeLogs(std::list<SerializedLogChunk>* logs) {
if (logs_ == nullptr) logs_ = logs;
}

// Updates the state of log_positions_ and logs_needed_from_next_position_ then returns true if
// there are any unread logs, false otherwise.
bool HasUnreadLogs();
bool HasUnreadLogs() REQUIRES(logd_lock);

// Returns the next unread log and sets logs_needed_from_next_position_ to indicate that we're
// waiting for more logs from the associated log buffer.
LogWithId PopNextUnreadLog();
LogWithId PopNextUnreadLog() REQUIRES(logd_lock);

// If the parent log buffer prunes logs, the reference that this class contains may become
// invalid, so this must be called first to drop the reference to buffer_it, if any.
void Prune(log_id_t log_id, const std::list<SerializedLogChunk>::iterator& buffer_it);
void Prune(log_id_t log_id, const std::list<SerializedLogChunk>::iterator& buffer_it)
REQUIRES(logd_lock);

private:
// Set logs_needed_from_next_position_[i] to indicate if log_positions_[i] points to an unread
// log or to the point at which the next log will appear.
void UpdateLogsNeeded(log_id_t log_id);
void UpdateLogsNeeded(log_id_t log_id) REQUIRES(logd_lock);

// Create a LogPosition object for the given log_id by searching through the log chunks for the
// first chunk and then first log entry within that chunk that is greater or equal to start().
void CreateLogPosition(log_id_t log_id);
void CreateLogPosition(log_id_t log_id) REQUIRES(logd_lock);

// Checks to see if any log buffers set in logs_needed_from_next_position_ have new logs and
// calls UpdateLogsNeeded() if so.
void CheckForNewLogs();
void CheckForNewLogs() REQUIRES(logd_lock);

std::list<SerializedLogChunk>* logs_ = nullptr;
std::list<SerializedLogChunk>* logs_ GUARDED_BY(logd_lock) = nullptr;
// An optional structure that contains an iterator to the serialized log buffer and offset into
// it that this logger should handle next.
std::optional<LogPosition> log_positions_[LOG_ID_MAX];
std::optional<LogPosition> log_positions_[LOG_ID_MAX] GUARDED_BY(logd_lock);
// A bit for each log that is set if a given log_id has no logs or if this client has read all
// of its logs. In order words: `logs_[i].empty() || (buffer_it == std::prev(logs_.end) &&
// next_log_position == logs_write_position_)`. These will be re-checked in each
// loop in case new logs came in.
std::bitset<LOG_ID_MAX> logs_needed_from_next_position_ = {};
std::bitset<LOG_ID_MAX> logs_needed_from_next_position_ GUARDED_BY(logd_lock) = {};
};

@@ -36,8 +36,8 @@ class SerializedFlushToStateTest : public testing::Test {
}
void TearDown() override { android::base::SetMinimumLogSeverity(old_log_severity_); }

std::string TestReport(const std::vector<uint64_t>& expected,
const std::vector<uint64_t>& read) {
std::string TestReport(const std::vector<uint64_t>& expected, const std::vector<uint64_t>& read)
REQUIRES(logd_lock) {
auto sequence_to_log_id = [&](uint64_t sequence) -> int {
for (const auto& [log_id, sequences] : sequence_numbers_per_buffer_) {
if (std::find(sequences.begin(), sequences.end(), sequence) != sequences.end()) {

@@ -82,13 +82,12 @@ class SerializedFlushToStateTest : public testing::Test {
// Read sequence numbers in order from SerializedFlushToState for every mask combination and all
// sequence numbers from 0 through the highest logged sequence number + 1.
// This assumes that all of the logs have already been written.
void TestAllReading() {
void TestAllReading() REQUIRES(logd_lock) {
uint64_t max_sequence = sequence_ + 1;
uint32_t max_mask = (1 << LOG_ID_MAX) - 1;
for (uint64_t sequence = 0; sequence < max_sequence; ++sequence) {
for (uint32_t mask = 0; mask < max_mask; ++mask) {
auto state = SerializedFlushToState{sequence, mask};
state.InitializeLogs(log_chunks_);
auto state = SerializedFlushToState{sequence, mask, log_chunks_};
TestReading(sequence, mask, state);
}
}

@@ -98,14 +97,14 @@ class SerializedFlushToStateTest : public testing::Test {
// it calls write_logs() in a loop for sequence/mask combination. It clears log_chunks_ and
// sequence_numbers_per_buffer_ between calls, such that only the sequence numbers written in
// the previous call to write_logs() are expected.
void TestAllReadingWithFutureMessages(const std::function<bool(int)>& write_logs) {
void TestAllReadingWithFutureMessages(const std::function<bool(int)>& write_logs)
REQUIRES(logd_lock) {
uint64_t max_sequence = sequence_ + 1;
uint32_t max_mask = (1 << LOG_ID_MAX) - 1;
for (uint64_t sequence = 1; sequence < max_sequence; ++sequence) {
for (uint32_t mask = 1; mask < max_mask; ++mask) {
log_id_for_each(i) { log_chunks_[i].clear(); }
auto state = SerializedFlushToState{sequence, mask};
state.InitializeLogs(log_chunks_);
auto state = SerializedFlushToState{sequence, mask, log_chunks_};
int loop_count = 0;
while (write_logs(loop_count++)) {
TestReading(sequence, mask, state);

@@ -115,7 +114,8 @@ class SerializedFlushToStateTest : public testing::Test {
}
}

void TestReading(uint64_t start, LogMask log_mask, SerializedFlushToState& state) {
void TestReading(uint64_t start, LogMask log_mask, SerializedFlushToState& state)
REQUIRES(logd_lock) {
std::vector<uint64_t> expected_sequence;
log_id_for_each(i) {
if (((1 << i) & log_mask) == 0) {

@@ -148,7 +148,7 @@ class SerializedFlushToStateTest : public testing::Test {
// Add a chunk with the given messages to the a given log buffer. Keep track of the sequence
// numbers for future validation. Optionally mark the block as having finished writing.
void AddChunkWithMessages(bool finish_writing, int buffer,
const std::vector<std::string>& messages) {
const std::vector<std::string>& messages) REQUIRES(logd_lock) {
auto chunk = SerializedLogChunk{kChunkSize};
for (const auto& message : messages) {
auto sequence = sequence_++;

@@ -175,6 +175,7 @@ class SerializedFlushToStateTest : public testing::Test {
// 4: 1 chunk with 0 logs and finished writing (impossible, but SerializedFlushToState handles it)
// 5-7: 0 chunks
TEST_F(SerializedFlushToStateTest, smoke) {
auto lock = std::lock_guard{logd_lock};
AddChunkWithMessages(true, 0, {"1st", "2nd"});
AddChunkWithMessages(true, 1, {"3rd"});
AddChunkWithMessages(false, 0, {"4th"});

@@ -188,6 +189,7 @@ TEST_F(SerializedFlushToStateTest, smoke) {
}

TEST_F(SerializedFlushToStateTest, random) {
auto lock = std::lock_guard{logd_lock};
srand(1);
for (int count = 0; count < 20; ++count) {
unsigned int num_messages = 1 + rand() % 15;

@@ -204,7 +206,8 @@ TEST_F(SerializedFlushToStateTest, random) {

// Same start as smoke, but we selectively write logs to the buffers and ensure they're read.
TEST_F(SerializedFlushToStateTest, future_writes) {
auto write_logs = [&](int loop_count) {
auto lock = std::lock_guard{logd_lock};
auto write_logs = [&](int loop_count) REQUIRES(logd_lock) {
switch (loop_count) {
case 0:
// Initial writes.

@@ -252,11 +255,11 @@ TEST_F(SerializedFlushToStateTest, future_writes) {
}

TEST_F(SerializedFlushToStateTest, no_dangling_references) {
auto lock = std::lock_guard{logd_lock};
AddChunkWithMessages(true, 0, {"1st", "2nd"});
AddChunkWithMessages(true, 0, {"3rd", "4th"});

auto state = SerializedFlushToState{1, kLogMaskAll};
state.InitializeLogs(log_chunks_);
auto state = SerializedFlushToState{1, kLogMaskAll, log_chunks_};

ASSERT_EQ(log_chunks_[0].size(), 2U);
auto first_chunk = log_chunks_[0].begin();

@@ -290,6 +293,7 @@ TEST_F(SerializedFlushToStateTest, no_dangling_references) {
}

TEST(SerializedFlushToState, Prune) {
auto lock = std::lock_guard{logd_lock};
auto chunk = SerializedLogChunk{kChunkSize};
chunk.Log(1, log_time(), 0, 1, 1, "abc", 3);
chunk.Log(2, log_time(), 0, 1, 1, "abc", 3);

@@ -299,8 +303,7 @@ TEST(SerializedFlushToState, Prune) {
std::list<SerializedLogChunk> log_chunks[LOG_ID_MAX];
log_chunks[LOG_ID_MAIN].emplace_back(std::move(chunk));

auto state = SerializedFlushToState{1, kLogMaskAll};
state.InitializeLogs(log_chunks);
auto state = SerializedFlushToState{1, kLogMaskAll, log_chunks};
ASSERT_TRUE(state.HasUnreadLogs());

state.Prune(LOG_ID_MAIN, log_chunks[LOG_ID_MAIN].begin());

@@ -41,9 +41,9 @@ void SerializedLogBuffer::Init() {
}

// Release any sleeping reader threads to dump their current content.
auto reader_threads_lock = std::lock_guard{reader_list_->reader_threads_lock()};
auto lock = std::lock_guard{logd_lock};
for (const auto& reader_thread : reader_list_->reader_threads()) {
reader_thread->triggerReader_Locked();
reader_thread->TriggerReader();
}
}

@@ -86,7 +86,7 @@ int SerializedLogBuffer::Log(log_id_t log_id, log_time realtime, uid_t uid, pid_

auto sequence = sequence_.fetch_add(1, std::memory_order_relaxed);

auto lock = std::lock_guard{lock_};
auto lock = std::lock_guard{logd_lock};

if (logs_[log_id].empty()) {
logs_[log_id].push_back(SerializedLogChunk(max_size_[log_id] / 4));

@@ -140,8 +140,6 @@ void SerializedLogBuffer::NotifyReadersOfPrune(
}

void SerializedLogBuffer::Prune(log_id_t log_id, size_t bytes_to_free, uid_t uid) {
auto reader_threads_lock = std::lock_guard{reader_list_->reader_threads_lock()};

auto& log_buffer = logs_[log_id];
auto it = log_buffer.begin();
while (it != log_buffer.end()) {

@@ -158,7 +156,7 @@ void SerializedLogBuffer::Prune(log_id_t log_id, size_t bytes_to_free, uid_t uid
// fast enough to not back-up logd. Instead, we can achieve an nearly-as-efficient
// but not error-prune batching effect by waking the reader whenever any chunk is
// about to be pruned.
reader_thread->triggerReader_Locked();
reader_thread->TriggerReader();
}

// Some readers may be still reading from this log chunk, log a warning that they are

@@ -198,22 +196,14 @@ void SerializedLogBuffer::Prune(log_id_t log_id, size_t bytes_to_free, uid_t uid

std::unique_ptr<FlushToState> SerializedLogBuffer::CreateFlushToState(uint64_t start,
LogMask log_mask) {
return std::make_unique<SerializedFlushToState>(start, log_mask);
}

void SerializedLogBuffer::DeleteFlushToState(std::unique_ptr<FlushToState> state) {
auto lock = std::unique_lock{lock_};
state.reset();
return std::make_unique<SerializedFlushToState>(start, log_mask, logs_);
}

bool SerializedLogBuffer::FlushTo(
LogWriter* writer, FlushToState& abstract_state,
const std::function<FilterResult(log_id_t log_id, pid_t pid, uint64_t sequence,
log_time realtime)>& filter) {
auto lock = std::unique_lock{lock_};

auto& state = reinterpret_cast<SerializedFlushToState&>(abstract_state);
state.InitializeLogs(logs_);

while (state.HasUnreadLogs()) {
LogWithId top = state.PopNextUnreadLog();

@@ -245,13 +235,14 @@ bool SerializedLogBuffer::FlushTo(
unsigned char entry_copy[kMaxEntrySize] __attribute__((uninitialized));
CHECK_LT(entry->msg_len(), LOGGER_ENTRY_MAX_PAYLOAD + 1);
memcpy(entry_copy, entry, sizeof(*entry) + entry->msg_len());
lock.unlock();
logd_lock.unlock();

if (!reinterpret_cast<SerializedLogEntry*>(entry_copy)->Flush(writer, log_id)) {
logd_lock.lock();
return false;
}

lock.lock();
logd_lock.lock();
}

state.set_start(state.start() + 1);

@@ -259,7 +250,7 @@ bool SerializedLogBuffer::FlushTo(
}

bool SerializedLogBuffer::Clear(log_id_t id, uid_t uid) {
auto lock = std::lock_guard{lock_};
auto lock = std::lock_guard{logd_lock};
Prune(id, ULONG_MAX, uid);

// Clearing SerializedLogBuffer never waits for readers and therefore is always successful.

@@ -275,7 +266,7 @@ size_t SerializedLogBuffer::GetSizeUsed(log_id_t id) {
}

size_t SerializedLogBuffer::GetSize(log_id_t id) {
auto lock = std::lock_guard{lock_};
auto lock = std::lock_guard{logd_lock};
return max_size_[id];
}

@@ -288,7 +279,7 @@ bool SerializedLogBuffer::SetSize(log_id_t id, size_t size) {
return false;
}

auto lock = std::lock_guard{lock_};
auto lock = std::lock_guard{logd_lock};
max_size_[id] = size;

MaybePrune(id);

@@ -30,9 +30,9 @@
#include "LogReaderList.h"
#include "LogStatistics.h"
#include "LogTags.h"
#include "LogdLock.h"
#include "SerializedLogChunk.h"
#include "SerializedLogEntry.h"
#include "rwlock.h"

class SerializedLogBuffer final : public LogBuffer {
public:

@@ -41,11 +41,12 @@ class SerializedLogBuffer final : public LogBuffer {

int Log(log_id_t log_id, log_time realtime, uid_t uid, pid_t pid, pid_t tid, const char* msg,
uint16_t len) override;
std::unique_ptr<FlushToState> CreateFlushToState(uint64_t start, LogMask log_mask) override;
void DeleteFlushToState(std::unique_ptr<FlushToState> state) override;
std::unique_ptr<FlushToState> CreateFlushToState(uint64_t start, LogMask log_mask)
REQUIRES(logd_lock) override;
bool FlushTo(LogWriter* writer, FlushToState& state,
const std::function<FilterResult(log_id_t log_id, pid_t pid, uint64_t sequence,
log_time realtime)>& filter) override;
log_time realtime)>& filter)
REQUIRES(logd_lock) override;

bool Clear(log_id_t id, uid_t uid) override;
size_t GetSize(log_id_t id) override;

@@ -55,20 +56,19 @@ class SerializedLogBuffer final : public LogBuffer {

private:
bool ShouldLog(log_id_t log_id, const char* msg, uint16_t len);
void MaybePrune(log_id_t log_id) REQUIRES(lock_);
void Prune(log_id_t log_id, size_t bytes_to_free, uid_t uid) REQUIRES(lock_);
void MaybePrune(log_id_t log_id) REQUIRES(logd_lock);
void Prune(log_id_t log_id, size_t bytes_to_free, uid_t uid) REQUIRES(logd_lock);
void NotifyReadersOfPrune(log_id_t log_id, const std::list<SerializedLogChunk>::iterator& chunk)
REQUIRES(reader_list_->reader_threads_lock());
REQUIRES(logd_lock);
void RemoveChunkFromStats(log_id_t log_id, SerializedLogChunk& chunk);
size_t GetSizeUsed(log_id_t id) REQUIRES(lock_);
size_t GetSizeUsed(log_id_t id) REQUIRES(logd_lock);

LogReaderList* reader_list_;
LogTags* tags_;
LogStatistics* stats_;

size_t max_size_[LOG_ID_MAX] GUARDED_BY(lock_) = {};
std::list<SerializedLogChunk> logs_[LOG_ID_MAX] GUARDED_BY(lock_);
RwLock lock_;
size_t max_size_[LOG_ID_MAX] GUARDED_BY(logd_lock) = {};
std::list<SerializedLogChunk> logs_[LOG_ID_MAX] GUARDED_BY(logd_lock);

std::atomic<uint64_t> sequence_ = 1;
};

@@ -36,9 +36,9 @@ void SimpleLogBuffer::Init() {
}

// Release any sleeping reader threads to dump their current content.
auto reader_threads_lock = std::lock_guard{reader_list_->reader_threads_lock()};
auto lock = std::lock_guard{logd_lock};
for (const auto& reader_thread : reader_list_->reader_threads()) {
reader_thread->triggerReader_Locked();
reader_thread->TriggerReader();
}
}

@@ -95,7 +95,7 @@ int SimpleLogBuffer::Log(log_id_t log_id, log_time realtime, uid_t uid, pid_t pi
// exact entry with time specified in ms or us precision.
if ((realtime.tv_nsec % 1000) == 0) ++realtime.tv_nsec;

auto lock = std::lock_guard{lock_};
auto lock = std::lock_guard{logd_lock};
auto sequence = sequence_.fetch_add(1, std::memory_order_relaxed);
LogInternal(LogBufferElement(log_id, realtime, uid, pid, tid, sequence, msg, len));
return len;

@@ -136,8 +136,6 @@ bool SimpleLogBuffer::FlushTo(
LogWriter* writer, FlushToState& abstract_state,
const std::function<FilterResult(log_id_t log_id, pid_t pid, uint64_t sequence,
log_time realtime)>& filter) {
auto shared_lock = SharedLock{lock_};

auto& state = reinterpret_cast<ChattyFlushToState&>(abstract_state);

std::list<LogBufferElement>::iterator it;

@@ -200,13 +198,14 @@ bool SimpleLogBuffer::FlushTo(
state.last_tid()[element.log_id()] =
(element.dropped_count() && !same_tid) ? 0 : element.tid();

shared_lock.unlock();
logd_lock.unlock();
// We never prune logs equal to or newer than any LogReaderThreads' `start` value, so the
// `element` pointer is safe here without the lock
if (!element.FlushTo(writer, stats_, same_tid)) {
logd_lock.lock();
return false;
}
shared_lock.lock_shared();
logd_lock.lock();
}

state.set_start(state.start() + 1);

@@ -217,7 +216,7 @@ bool SimpleLogBuffer::Clear(log_id_t id, uid_t uid) {
// Try three times to clear, then disconnect the readers and try one final time.
for (int retry = 0; retry < 3; ++retry) {
{
auto lock = std::lock_guard{lock_};
auto lock = std::lock_guard{logd_lock};
if (Prune(id, ULONG_MAX, uid)) {
return true;
}

@@ -229,27 +228,27 @@ bool SimpleLogBuffer::Clear(log_id_t id, uid_t uid) {
// _blocked_ reader.
bool busy = false;
{
auto lock = std::lock_guard{lock_};
auto lock = std::lock_guard{logd_lock};
busy = !Prune(id, 1, uid);
}
// It is still busy, disconnect all readers.
if (busy) {
auto reader_threads_lock = std::lock_guard{reader_list_->reader_threads_lock()};
auto lock = std::lock_guard{logd_lock};
for (const auto& reader_thread : reader_list_->reader_threads()) {
if (reader_thread->IsWatching(id)) {
LOG(WARNING) << "Kicking blocked reader, " << reader_thread->name()
<< ", from LogBuffer::clear()";
reader_thread->release_Locked();
reader_thread->Release();
}
}
}
auto lock = std::lock_guard{lock_};
auto lock = std::lock_guard{logd_lock};
return Prune(id, ULONG_MAX, uid);
}

// get the total space allocated to "id"
size_t SimpleLogBuffer::GetSize(log_id_t id) {
auto lock = SharedLock{lock_};
auto lock = std::lock_guard{logd_lock};
size_t retval = max_size_[id];
return retval;
}

@@ -261,7 +260,7 @@ bool SimpleLogBuffer::SetSize(log_id_t id, size_t size) {
return false;
}

auto lock = std::lock_guard{lock_};
auto lock = std::lock_guard{logd_lock};
max_size_[id] = size;
return true;
}

@@ -274,8 +273,6 @@ void SimpleLogBuffer::MaybePrune(log_id_t id) {
}

bool SimpleLogBuffer::Prune(log_id_t id, unsigned long prune_rows, uid_t caller_uid) {
auto reader_threads_lock = std::lock_guard{reader_list_->reader_threads_lock()};

// Don't prune logs that are newer than the point at which any reader threads are reading from.
LogReaderThread* oldest = nullptr;
for (const auto& reader_thread : reader_list_->reader_threads()) {

@@ -347,14 +344,14 @@ void SimpleLogBuffer::KickReader(LogReaderThread* reader, log_id_t id, unsigned
// dropped if we hit too much memory pressure.
LOG(WARNING) << "Kicking blocked reader, " << reader->name()
<< ", from LogBuffer::kickMe()";
reader->release_Locked();
reader->Release();
} else if (reader->deadline().time_since_epoch().count() != 0) {
// Allow a blocked WRAP deadline reader to trigger and start reporting the log data.
reader->triggerReader_Locked();
reader->TriggerReader();
} else {
// tell slow reader to skip entries to catch up
LOG(WARNING) << "Skipping " << prune_rows << " entries from slow reader, " << reader->name()
<< ", from LogBuffer::kickMe()";
reader->triggerSkip_Locked(id, prune_rows);
reader->TriggerSkip(id, prune_rows);
}
}

@@ -25,7 +25,7 @@
#include "LogReaderList.h"
#include "LogStatistics.h"
#include "LogTags.h"
#include "rwlock.h"
#include "LogdLock.h"

class SimpleLogBuffer : public LogBuffer {
public:

@@ -35,10 +35,12 @@ class SimpleLogBuffer : public LogBuffer {

int Log(log_id_t log_id, log_time realtime, uid_t uid, pid_t pid, pid_t tid, const char* msg,
uint16_t len) override;
std::unique_ptr<FlushToState> CreateFlushToState(uint64_t start, LogMask log_mask) override;
std::unique_ptr<FlushToState> CreateFlushToState(uint64_t start, LogMask log_mask)
REQUIRES(logd_lock) override;
bool FlushTo(LogWriter* writer, FlushToState& state,
const std::function<FilterResult(log_id_t log_id, pid_t pid, uint64_t sequence,
log_time realtime)>& filter) override;
log_time realtime)>& filter)
REQUIRES(logd_lock) override;

bool Clear(log_id_t id, uid_t uid) override;
size_t GetSize(log_id_t id) override;

@@ -47,27 +49,25 @@ class SimpleLogBuffer : public LogBuffer {
uint64_t sequence() const override { return sequence_.load(std::memory_order_relaxed); }

protected:
virtual bool Prune(log_id_t id, unsigned long prune_rows, uid_t uid) REQUIRES(lock_);
virtual void LogInternal(LogBufferElement&& elem) REQUIRES(lock_);
virtual bool Prune(log_id_t id, unsigned long prune_rows, uid_t uid) REQUIRES(logd_lock);
virtual void LogInternal(LogBufferElement&& elem) REQUIRES(logd_lock);

// Returns an iterator to the oldest element for a given log type, or logs_.end() if
// there are no logs for the given log type. Requires logs_lock_ to be held.
std::list<LogBufferElement>::iterator GetOldest(log_id_t log_id) REQUIRES(lock_);
// there are no logs for the given log type. Requires logs_logd_lock to be held.
std::list<LogBufferElement>::iterator GetOldest(log_id_t log_id) REQUIRES(logd_lock);
std::list<LogBufferElement>::iterator Erase(std::list<LogBufferElement>::iterator it)
REQUIRES(lock_);
REQUIRES(logd_lock);
void KickReader(LogReaderThread* reader, log_id_t id, unsigned long prune_rows)
REQUIRES_SHARED(lock_);
REQUIRES(logd_lock);

LogStatistics* stats() { return stats_; }
LogReaderList* reader_list() { return reader_list_; }
size_t max_size(log_id_t id) REQUIRES_SHARED(lock_) { return max_size_[id]; }
size_t max_size(log_id_t id) REQUIRES_SHARED(logd_lock) { return max_size_[id]; }
std::list<LogBufferElement>& logs() { return logs_; }

RwLock lock_;

private:
bool ShouldLog(log_id_t log_id, const char* msg, uint16_t len);
void MaybePrune(log_id_t id) REQUIRES(lock_);
void MaybePrune(log_id_t id) REQUIRES(logd_lock);

LogReaderList* reader_list_;
LogTags* tags_;

@@ -75,9 +75,9 @@ class SimpleLogBuffer : public LogBuffer {

std::atomic<uint64_t> sequence_ = 1;

size_t max_size_[LOG_ID_MAX] GUARDED_BY(lock_);
std::list<LogBufferElement> logs_ GUARDED_BY(lock_);
size_t max_size_[LOG_ID_MAX] GUARDED_BY(logd_lock);
std::list<LogBufferElement> logs_ GUARDED_BY(logd_lock);
// Keeps track of the iterator to the oldest log message of a given log type, as an
// optimization when pruning logs. Use GetOldest() to retrieve.
std::optional<std::list<LogBufferElement>::iterator> oldest_[LOG_ID_MAX] GUARDED_BY(lock_);
std::optional<std::list<LogBufferElement>::iterator> oldest_[LOG_ID_MAX] GUARDED_BY(logd_lock);
};

@@ -126,7 +126,7 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {

// Read out all of the logs.
{
auto lock = std::unique_lock{reader_list.reader_threads_lock()};
auto lock = std::unique_lock{logd_lock};
std::unique_ptr<LogWriter> test_writer(new NoopWriter());
std::unique_ptr<LogReaderThread> log_reader(
new LogReaderThread(log_buffer.get(), &reader_list, std::move(test_writer), true, 0,

@@ -137,7 +137,7 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
// Wait until the reader has finished.
while (true) {
usleep(50);
auto lock = std::unique_lock{reader_list.reader_threads_lock()};
auto lock = std::unique_lock{logd_lock};
if (reader_list.reader_threads().size() == 0) {
break;
}

@@ -1,56 +0,0 @@
/*
* Copyright (C) 2020 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*      http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <pthread.h>

#include <android-base/macros.h>
#include <android-base/thread_annotations.h>

// As of the end of May 2020, std::shared_mutex is *not* simply a pthread_rwlock, but rather a
// combination of std::mutex and std::condition variable, which is obviously less efficient. This
// immitates what std::shared_mutex should be doing and is compatible with RAII thread wrappers.

class SHARED_CAPABILITY("mutex") RwLock {
public:
RwLock() {}
~RwLock() {}

void lock() ACQUIRE() { pthread_rwlock_wrlock(&rwlock_); }
void lock_shared() ACQUIRE_SHARED() { pthread_rwlock_rdlock(&rwlock_); }

void unlock() RELEASE() { pthread_rwlock_unlock(&rwlock_); }

private:
pthread_rwlock_t rwlock_ = PTHREAD_RWLOCK_INITIALIZER;
};

// std::shared_lock does not have thread annotations, so we need our own.

class SCOPED_CAPABILITY SharedLock {
public:
explicit SharedLock(RwLock& lock) ACQUIRE_SHARED(lock) : lock_(lock) { lock_.lock_shared(); }
~SharedLock() RELEASE() { lock_.unlock(); }

void lock_shared() ACQUIRE_SHARED() { lock_.lock_shared(); }
void unlock() RELEASE() { lock_.unlock(); }

DISALLOW_IMPLICIT_CONSTRUCTORS(SharedLock);

private:
RwLock& lock_;
};