diff --git a/logd/ChattyLogBufferTest.cpp b/logd/ChattyLogBufferTest.cpp index 2e0c9470a..8754b885a 100644 --- a/logd/ChattyLogBufferTest.cpp +++ b/logd/ChattyLogBufferTest.cpp @@ -61,7 +61,8 @@ TEST_P(ChattyLogBufferTest, deduplication_simple) { std::vector read_log_messages; std::unique_ptr test_writer(new TestWriter(&read_log_messages, nullptr)); - log_buffer_->FlushTo(test_writer.get(), 1, nullptr, nullptr); + std::unique_ptr flush_to_state = log_buffer_->CreateFlushToState(1, kLogMaskAll); + EXPECT_TRUE(log_buffer_->FlushTo(test_writer.get(), *flush_to_state, nullptr)); std::vector expected_log_messages = { make_message(0, "test_tag", "duplicate"), @@ -72,12 +73,12 @@ TEST_P(ChattyLogBufferTest, deduplication_simple) { make_message(5, "test_tag", "not_same"), // 3 duplicate logs together print the first, a 1 count chatty message, then the last. make_message(6, "test_tag", "duplicate"), - make_message(7, "chatty", "uid=0\\([^\\)]+\\) [^ ]+ expire 1 line", true), + make_message(7, "chatty", "uid=0\\([^\\)]+\\) [^ ]+ identical 1 line", true), make_message(8, "test_tag", "duplicate"), make_message(9, "test_tag", "not_same"), // 6 duplicate logs together print the first, a 4 count chatty message, then the last. make_message(10, "test_tag", "duplicate"), - make_message(14, "chatty", "uid=0\\([^\\)]+\\) [^ ]+ expire 4 lines", true), + make_message(14, "chatty", "uid=0\\([^\\)]+\\) [^ ]+ identical 4 lines", true), make_message(15, "test_tag", "duplicate"), make_message(16, "test_tag", "not_same"), // duplicate logs > 1 minute apart are not deduplicated. @@ -117,15 +118,16 @@ TEST_P(ChattyLogBufferTest, deduplication_overflow) { std::vector read_log_messages; std::unique_ptr test_writer(new TestWriter(&read_log_messages, nullptr)); - log_buffer_->FlushTo(test_writer.get(), 1, nullptr, nullptr); + std::unique_ptr flush_to_state = log_buffer_->CreateFlushToState(1, kLogMaskAll); + EXPECT_TRUE(log_buffer_->FlushTo(test_writer.get(), *flush_to_state, nullptr)); std::vector expected_log_messages = { make_message(0, "test_tag", "normal"), make_message(1, "test_tag", "duplicate"), make_message(expired_per_chatty_message + 1, "chatty", - "uid=0\\([^\\)]+\\) [^ ]+ expire 65535 lines", true), + "uid=0\\([^\\)]+\\) [^ ]+ identical 65535 lines", true), make_message(expired_per_chatty_message + 2, "chatty", - "uid=0\\([^\\)]+\\) [^ ]+ expire 1 line", true), + "uid=0\\([^\\)]+\\) [^ ]+ identical 1 line", true), make_message(expired_per_chatty_message + 3, "test_tag", "duplicate"), make_message(expired_per_chatty_message + 4, "test_tag", "normal"), }; @@ -172,7 +174,8 @@ TEST_P(ChattyLogBufferTest, deduplication_liblog) { std::vector read_log_messages; std::unique_ptr test_writer(new TestWriter(&read_log_messages, nullptr)); - log_buffer_->FlushTo(test_writer.get(), 1, nullptr, nullptr); + std::unique_ptr flush_to_state = log_buffer_->CreateFlushToState(1, kLogMaskAll); + EXPECT_TRUE(log_buffer_->FlushTo(test_writer.get(), *flush_to_state, nullptr)); std::vector expected_log_messages = { make_message(0, 1234, 1), @@ -199,4 +202,4 @@ TEST_P(ChattyLogBufferTest, deduplication_liblog) { CompareLogMessages(expected_log_messages, read_log_messages); }; -INSTANTIATE_TEST_CASE_P(ChattyLogBufferTests, ChattyLogBufferTest, testing::Values("chatty")); \ No newline at end of file +INSTANTIATE_TEST_CASE_P(ChattyLogBufferTests, ChattyLogBufferTest, testing::Values("chatty")); diff --git a/logd/LogBuffer.h b/logd/LogBuffer.h index 859d7400c..a3ac683f8 100644 --- a/logd/LogBuffer.h +++ b/logd/LogBuffer.h @@ -25,6 +25,27 @@ #include "LogWriter.h" +// A mask to represent which log buffers a reader is watching, values are (1 << LOG_ID_MAIN), etc. +using LogMask = uint32_t; +constexpr uint32_t kLogMaskAll = 0xFFFFFFFF; + +// State that a LogBuffer may want to persist across calls to FlushTo(). +class FlushToState { + public: + FlushToState(uint64_t start, LogMask log_mask) : start_(start), log_mask_(log_mask) {} + virtual ~FlushToState() {} + + uint64_t start() const { return start_; } + void set_start(uint64_t start) { start_ = start; } + + LogMask log_mask() const { return log_mask_; } + + private: + uint64_t start_; + LogMask log_mask_; +}; + +// Enum for the return values of the `filter` function passed to FlushTo(). enum class FilterResult { kSkip, kStop, @@ -39,19 +60,16 @@ class LogBuffer { virtual int Log(log_id_t log_id, log_time realtime, uid_t uid, pid_t pid, pid_t tid, const char* msg, uint16_t len) = 0; - // lastTid is an optional context to help detect if the last previous - // valid message was from the same source so we can differentiate chatty - // filter types (identical or expired) - static const uint64_t FLUSH_ERROR = 0; - virtual uint64_t FlushTo(LogWriter* writer, uint64_t start, - pid_t* last_tid, // nullable - const std::function& filter) = 0; + + virtual std::unique_ptr CreateFlushToState(uint64_t start, LogMask log_mask) = 0; + virtual bool FlushTo(LogWriter* writer, FlushToState& state, + const std::function& filter) = 0; virtual bool Clear(log_id_t id, uid_t uid) = 0; virtual unsigned long GetSize(log_id_t id) = 0; virtual int SetSize(log_id_t id, unsigned long size) = 0; virtual uint64_t sequence() const = 0; -}; \ No newline at end of file +}; diff --git a/logd/LogBufferTest.cpp b/logd/LogBufferTest.cpp index 457d2fb29..bc01c805e 100644 --- a/logd/LogBufferTest.cpp +++ b/logd/LogBufferTest.cpp @@ -208,8 +208,9 @@ TEST_P(LogBufferTest, smoke) { std::vector read_log_messages; std::unique_ptr test_writer(new TestWriter(&read_log_messages, nullptr)); - uint64_t flush_result = log_buffer_->FlushTo(test_writer.get(), 1, nullptr, nullptr); - EXPECT_EQ(1ULL, flush_result); + std::unique_ptr flush_to_state = log_buffer_->CreateFlushToState(1, kLogMaskAll); + EXPECT_TRUE(log_buffer_->FlushTo(test_writer.get(), *flush_to_state, nullptr)); + EXPECT_EQ(2ULL, flush_to_state->start()); CompareLogMessages(log_messages, read_log_messages); } diff --git a/logd/LogReader.cpp b/logd/LogReader.cpp index 35c46aac7..fc461fd1e 100644 --- a/logd/LogReader.cpp +++ b/logd/LogReader.cpp @@ -171,16 +171,12 @@ bool LogReader::onDataAvailable(SocketClient* cli) { if (start != log_time::EPOCH) { bool start_time_set = false; uint64_t last = sequence; - auto log_find_start = [pid, logMask, start, &sequence, &start_time_set, &last]( - log_id_t element_log_id, pid_t element_pid, - uint64_t element_sequence, log_time element_realtime, - uint16_t) -> FilterResult { + auto log_find_start = [pid, start, &sequence, &start_time_set, &last]( + log_id_t, pid_t element_pid, uint64_t element_sequence, + log_time element_realtime, uint16_t) -> FilterResult { if (pid && pid != element_pid) { return FilterResult::kSkip; } - if ((logMask & (1 << element_log_id)) == 0) { - return FilterResult::kSkip; - } if (start == element_realtime) { sequence = element_sequence; start_time_set = true; @@ -195,8 +191,8 @@ bool LogReader::onDataAvailable(SocketClient* cli) { } return FilterResult::kSkip; }; - - log_buffer_->FlushTo(socket_log_writer.get(), sequence, nullptr, log_find_start); + auto flush_to_state = log_buffer_->CreateFlushToState(sequence, logMask); + log_buffer_->FlushTo(socket_log_writer.get(), *flush_to_state, log_find_start); if (!start_time_set) { if (nonBlock) { diff --git a/logd/LogReaderList.cpp b/logd/LogReaderList.cpp index 220027b28..32ba2910d 100644 --- a/logd/LogReaderList.cpp +++ b/logd/LogReaderList.cpp @@ -18,7 +18,7 @@ // When we are notified a new log entry is available, inform // listening sockets who are watching this entry's log id. -void LogReaderList::NotifyNewLog(unsigned int log_mask) const { +void LogReaderList::NotifyNewLog(LogMask log_mask) const { auto lock = std::lock_guard{reader_threads_lock_}; for (const auto& entry : reader_threads_) { diff --git a/logd/LogReaderList.h b/logd/LogReaderList.h index 0d84aba9a..594716a5b 100644 --- a/logd/LogReaderList.h +++ b/logd/LogReaderList.h @@ -20,11 +20,12 @@ #include #include +#include "LogBuffer.h" #include "LogReaderThread.h" class LogReaderList { public: - void NotifyNewLog(unsigned int log_mask) const; + void NotifyNewLog(LogMask log_mask) const; std::list>& reader_threads() { return reader_threads_; } std::mutex& reader_threads_lock() { return reader_threads_lock_; } diff --git a/logd/LogReaderThread.cpp b/logd/LogReaderThread.cpp index 3a83f3f5f..c6e60feff 100644 --- a/logd/LogReaderThread.cpp +++ b/logd/LogReaderThread.cpp @@ -29,24 +29,22 @@ using namespace std::placeholders; LogReaderThread::LogReaderThread(LogBuffer* log_buffer, LogReaderList* reader_list, std::unique_ptr writer, bool non_block, - unsigned long tail, unsigned int log_mask, pid_t pid, + unsigned long tail, LogMask log_mask, pid_t pid, log_time start_time, uint64_t start, std::chrono::steady_clock::time_point deadline) : log_buffer_(log_buffer), reader_list_(reader_list), writer_(std::move(writer)), leading_dropped_(false), - log_mask_(log_mask), pid_(pid), tail_(tail), count_(0), index_(0), start_time_(start_time), - start_(start), deadline_(deadline), non_block_(non_block) { - memset(last_tid_, 0, sizeof(last_tid_)); cleanSkip_Locked(); + flush_to_state_ = log_buffer_->CreateFlushToState(start, log_mask); auto thread = std::thread{&LogReaderThread::ThreadFunction, this}; thread.detach(); } @@ -58,8 +56,6 @@ void LogReaderThread::ThreadFunction() { auto lock = std::unique_lock{reader_list_->reader_threads_lock()}; - uint64_t start = start_; - while (!release_) { if (deadline_.time_since_epoch().count() != 0) { if (thread_triggered_condition_.wait_until(lock, deadline_) == @@ -74,7 +70,9 @@ void LogReaderThread::ThreadFunction() { lock.unlock(); if (tail_) { - log_buffer_->FlushTo(writer_.get(), start, nullptr, + auto first_pass_state = log_buffer_->CreateFlushToState(flush_to_state_->start(), + flush_to_state_->log_mask()); + log_buffer_->FlushTo(writer_.get(), *first_pass_state, [this](log_id_t log_id, pid_t pid, uint64_t sequence, log_time realtime, uint16_t dropped_count) { return FilterFirstPass(log_id, pid, sequence, realtime, @@ -84,12 +82,12 @@ void LogReaderThread::ThreadFunction() { true; // TODO: Likely a bug, if leading_dropped_ was not true before calling // flushTo(), then it should not be reset to true after. } - start = log_buffer_->FlushTo(writer_.get(), start, last_tid_, - [this](log_id_t log_id, pid_t pid, uint64_t sequence, - log_time realtime, uint16_t dropped_count) { - return FilterSecondPass(log_id, pid, sequence, realtime, - dropped_count); - }); + bool flush_success = log_buffer_->FlushTo( + writer_.get(), *flush_to_state_, + [this](log_id_t log_id, pid_t pid, uint64_t sequence, log_time realtime, + uint16_t dropped_count) { + return FilterSecondPass(log_id, pid, sequence, realtime, dropped_count); + }); // We only ignore entries before the original start time for the first flushTo(), if we // get entries after this first flush before the original start time, then the client @@ -102,12 +100,10 @@ void LogReaderThread::ThreadFunction() { lock.lock(); - if (start == LogBuffer::FLUSH_ERROR) { + if (!flush_success) { break; } - start_ = start + 1; - if (non_block_ || release_) { break; } @@ -131,8 +127,8 @@ void LogReaderThread::ThreadFunction() { } // A first pass to count the number of elements -FilterResult LogReaderThread::FilterFirstPass(log_id_t log_id, pid_t pid, uint64_t sequence, - log_time realtime, uint16_t dropped_count) { +FilterResult LogReaderThread::FilterFirstPass(log_id_t, pid_t pid, uint64_t, log_time realtime, + uint16_t dropped_count) { auto lock = std::lock_guard{reader_list_->reader_threads_lock()}; if (leading_dropped_) { @@ -142,12 +138,7 @@ FilterResult LogReaderThread::FilterFirstPass(log_id_t log_id, pid_t pid, uint64 leading_dropped_ = false; } - if (count_ == 0) { - start_ = sequence; - } - - if ((!pid_ || pid_ == pid) && IsWatching(log_id) && - (start_time_ == log_time::EPOCH || start_time_ <= realtime)) { + if ((!pid_ || pid_ == pid) && (start_time_ == log_time::EPOCH || start_time_ <= realtime)) { ++count_; } @@ -155,12 +146,10 @@ FilterResult LogReaderThread::FilterFirstPass(log_id_t log_id, pid_t pid, uint64 } // A second pass to send the selected elements -FilterResult LogReaderThread::FilterSecondPass(log_id_t log_id, pid_t pid, uint64_t sequence, +FilterResult LogReaderThread::FilterSecondPass(log_id_t log_id, pid_t pid, uint64_t, log_time realtime, uint16_t dropped_count) { auto lock = std::lock_guard{reader_list_->reader_threads_lock()}; - start_ = sequence; - if (skip_ahead_[log_id]) { skip_ahead_[log_id]--; return FilterResult::kSkip; @@ -178,10 +167,6 @@ FilterResult LogReaderThread::FilterSecondPass(log_id_t log_id, pid_t pid, uint6 return FilterResult::kStop; } - if (!IsWatching(log_id)) { - return FilterResult::kSkip; - } - if (pid_ && pid_ != pid) { return FilterResult::kSkip; } diff --git a/logd/LogReaderThread.h b/logd/LogReaderThread.h index ba810634b..f288d68cc 100644 --- a/logd/LogReaderThread.h +++ b/logd/LogReaderThread.h @@ -38,7 +38,7 @@ class LogReaderThread { public: LogReaderThread(LogBuffer* log_buffer, LogReaderList* reader_list, std::unique_ptr writer, bool non_block, unsigned long tail, - unsigned int log_mask, pid_t pid, log_time start_time, uint64_t sequence, + LogMask log_mask, pid_t pid, log_time start_time, uint64_t sequence, std::chrono::steady_clock::time_point deadline); void triggerReader_Locked() { thread_triggered_condition_.notify_all(); } @@ -52,11 +52,13 @@ class LogReaderThread { thread_triggered_condition_.notify_all(); } - bool IsWatching(log_id_t id) const { return log_mask_ & (1 << id); } - bool IsWatchingMultiple(unsigned int log_mask) const { return log_mask_ & log_mask; } + bool IsWatching(log_id_t id) const { return flush_to_state_->log_mask() & (1 << id); } + bool IsWatchingMultiple(LogMask log_mask) const { + return flush_to_state_->log_mask() & log_mask; + } std::string name() const { return writer_->name(); } - uint64_t start() const { return start_; } + uint64_t start() const { return flush_to_state_->start(); } std::chrono::steady_clock::time_point deadline() const { return deadline_; } private: @@ -78,16 +80,14 @@ class LogReaderThread { // messages should be ignored. bool leading_dropped_; - // A mask of the logs buffers that are read by this reader. - const unsigned int log_mask_; // If set to non-zero, only pids equal to this are read by the reader. const pid_t pid_; // When a reader is referencing (via start_) old elements in the log buffer, and the log // buffer's size grows past its memory limit, the log buffer may request the reader to skip // ahead a specified number of logs. unsigned int skip_ahead_[LOG_ID_MAX]; - // Used for distinguishing 'dropped' messages for duplicate logs vs chatty drops - pid_t last_tid_[LOG_ID_MAX]; + // LogBuffer::FlushTo() needs to store state across subsequent calls. + std::unique_ptr flush_to_state_; // These next three variables are used for reading only the most recent lines aka `adb logcat // -t` / `adb logcat -T`. @@ -103,8 +103,6 @@ class LogReaderThread { // When a reader requests logs starting from a given timestamp, its stored here for the first // pass, such that logs before this time stamp that are accumulated in the buffer are ignored. log_time start_time_; - // The point from which the reader will read logs once awoken. - uint64_t start_; // CLOCK_MONOTONIC based deadline used for log wrapping. If this deadline expires before logs // wrap, then wake up and send the logs to the reader anyway. std::chrono::steady_clock::time_point deadline_; diff --git a/logd/SimpleLogBuffer.cpp b/logd/SimpleLogBuffer.cpp index ceecc6d10..561b9682e 100644 --- a/logd/SimpleLogBuffer.cpp +++ b/logd/SimpleLogBuffer.cpp @@ -110,14 +110,34 @@ void SimpleLogBuffer::LogInternal(LogBufferElement&& elem) { reader_list_->NotifyNewLog(1 << log_id); } -uint64_t SimpleLogBuffer::FlushTo( - LogWriter* writer, uint64_t start, pid_t* last_tid, +// These extra parameters are only required for chatty, but since they're a no-op for +// SimpleLogBuffer, it's easier to include them here, then to duplicate FlushTo() for +// ChattyLogBuffer. +class ChattyFlushToState : public FlushToState { + public: + ChattyFlushToState(uint64_t start, LogMask log_mask) : FlushToState(start, log_mask) {} + + pid_t* last_tid() { return last_tid_; } + + private: + pid_t last_tid_[LOG_ID_MAX] = {}; +}; + +std::unique_ptr SimpleLogBuffer::CreateFlushToState(uint64_t start, + LogMask log_mask) { + return std::make_unique(start, log_mask); +} + +bool SimpleLogBuffer::FlushTo( + LogWriter* writer, FlushToState& abstract_state, const std::function& filter) { auto shared_lock = SharedLock{lock_}; + auto& state = reinterpret_cast(abstract_state); + std::list::iterator it; - if (start <= 1) { + if (state.start() <= 1) { // client wants to start from the beginning it = logs_.begin(); } else { @@ -126,20 +146,20 @@ uint64_t SimpleLogBuffer::FlushTo( for (it = logs_.end(); it != logs_.begin(); /* do nothing */) { --it; - if (it->getSequence() == start) { + if (it->getSequence() == state.start()) { break; - } else if (it->getSequence() < start) { + } else if (it->getSequence() < state.start()) { it++; break; } } } - uint64_t curr = start; - for (; it != logs_.end(); ++it) { LogBufferElement& element = *it; + state.set_start(element.getSequence()); + if (!writer->privileged() && element.getUid() != writer->uid()) { continue; } @@ -148,6 +168,10 @@ uint64_t SimpleLogBuffer::FlushTo( continue; } + if (((1 << element.getLogId()) & state.log_mask()) == 0) { + continue; + } + if (filter) { FilterResult ret = filter(element.getLogId(), element.getPid(), element.getSequence(), element.getRealTime(), element.getDropped()); @@ -159,31 +183,24 @@ uint64_t SimpleLogBuffer::FlushTo( } } - bool same_tid = false; - if (last_tid) { - same_tid = last_tid[element.getLogId()] == element.getTid(); - // Dropped (chatty) immediately following a valid log from the - // same source in the same log buffer indicates we have a - // multiple identical squash. chatty that differs source - // is due to spam filter. chatty to chatty of different - // source is also due to spam filter. - last_tid[element.getLogId()] = - (element.getDropped() && !same_tid) ? 0 : element.getTid(); - } + bool same_tid = state.last_tid()[element.getLogId()] == element.getTid(); + // Dropped (chatty) immediately following a valid log from the same source in the same log + // buffer indicates we have a multiple identical squash. chatty that differs source is due + // to spam filter. chatty to chatty of different source is also due to spam filter. + state.last_tid()[element.getLogId()] = + (element.getDropped() && !same_tid) ? 0 : element.getTid(); shared_lock.unlock(); - // We never prune logs equal to or newer than any LogReaderThreads' `start` value, so the // `element` pointer is safe here without the lock - curr = element.getSequence(); if (!element.FlushTo(writer, stats_, same_tid)) { - return FLUSH_ERROR; + return false; } - shared_lock.lock_shared(); } - return curr; + state.set_start(state.start() + 1); + return true; } // clear all rows of type "id" from the buffer. diff --git a/logd/SimpleLogBuffer.h b/logd/SimpleLogBuffer.h index 72d26b060..a2ab881ee 100644 --- a/logd/SimpleLogBuffer.h +++ b/logd/SimpleLogBuffer.h @@ -35,10 +35,11 @@ class SimpleLogBuffer : public LogBuffer { int Log(log_id_t log_id, log_time realtime, uid_t uid, pid_t pid, pid_t tid, const char* msg, uint16_t len) override; - uint64_t FlushTo(LogWriter* writer, uint64_t start, pid_t* lastTid, - const std::function& - filter) override; + std::unique_ptr CreateFlushToState(uint64_t start, LogMask log_mask) override; + bool FlushTo(LogWriter* writer, FlushToState& state, + const std::function& + filter) override; bool Clear(log_id_t id, uid_t uid) override; unsigned long GetSize(log_id_t id) override;