diff --git a/logd/FlushCommand.cpp b/logd/FlushCommand.cpp index 658e079a5..bd1755553 100644 --- a/logd/FlushCommand.cpp +++ b/logd/FlushCommand.cpp @@ -42,7 +42,7 @@ void FlushCommand::runSocketCommand(SocketClient* client) { LogTimeEntry::wrlock(); LastLogTimes::iterator it = times.begin(); while (it != times.end()) { - entry = (*it); + entry = it->get(); if (entry->mClient == client) { if (!entry->isWatchingMultiple(mLogMask)) { LogTimeEntry::unlock(); @@ -63,31 +63,12 @@ void FlushCommand::runSocketCommand(SocketClient* client) { } } entry->triggerReader_Locked(); - if (entry->runningReader_Locked()) { - LogTimeEntry::unlock(); - return; - } - entry->incRef_Locked(); - break; + LogTimeEntry::unlock(); + return; } it++; } - if (it == times.end()) { - // Create LogTimeEntry in notifyNewLog() ? - if (mTail == (unsigned long)-1) { - LogTimeEntry::unlock(); - return; - } - entry = new LogTimeEntry(mReader, client, mNonBlock, mTail, mLogMask, - mPid, mStart, mTimeout); - times.push_front(entry); - } - - client->incRef(); - - // release client and entry reference counts once done - entry->startReader_Locked(); LogTimeEntry::unlock(); } diff --git a/logd/FlushCommand.h b/logd/FlushCommand.h index 543dfc3d7..ceaf39362 100644 --- a/logd/FlushCommand.h +++ b/logd/FlushCommand.h @@ -27,36 +27,11 @@ class LogReader; class FlushCommand : public SocketClientCommand { LogReader& mReader; - bool mNonBlock; - unsigned long mTail; log_mask_t mLogMask; - pid_t mPid; - log_time mStart; - uint64_t mTimeout; public: - // for opening a reader - explicit FlushCommand(LogReader& reader, bool nonBlock, unsigned long tail, - log_mask_t logMask, pid_t pid, log_time start, - uint64_t timeout) - : mReader(reader), - mNonBlock(nonBlock), - mTail(tail), - mLogMask(logMask), - mPid(pid), - mStart(start), - mTimeout((start != log_time::EPOCH) ? timeout : 0) { - } - - // for notification of an update explicit FlushCommand(LogReader& reader, log_mask_t logMask) - : mReader(reader), - mNonBlock(false), - mTail(-1), - mLogMask(logMask), - mPid(0), - mStart(log_time::EPOCH), - mTimeout(0) { + : mReader(reader), mLogMask(logMask) { } virtual void runSocketCommand(SocketClient* client); diff --git a/logd/LogBuffer.cpp b/logd/LogBuffer.cpp index fd1b8b211..fbdbf79bb 100644 --- a/logd/LogBuffer.cpp +++ b/logd/LogBuffer.cpp @@ -105,10 +105,8 @@ void LogBuffer::init() { LastLogTimes::iterator times = mTimes.begin(); while (times != mTimes.end()) { - LogTimeEntry* entry = (*times); - if (entry->owned_Locked()) { - entry->triggerReader_Locked(); - } + LogTimeEntry* entry = times->get(); + entry->triggerReader_Locked(); times++; } @@ -409,17 +407,15 @@ void LogBuffer::log(LogBufferElement* elem) { LastLogTimes::iterator times = mTimes.begin(); while (times != mTimes.end()) { - LogTimeEntry* entry = (*times); - if (entry->owned_Locked()) { - if (!entry->mNonBlock) { - end_always = true; - break; - } - // it passing mEnd is blocked by the following checks. - if (!end_set || (end <= entry->mEnd)) { - end = entry->mEnd; - end_set = true; - } + LogTimeEntry* entry = times->get(); + if (!entry->mNonBlock) { + end_always = true; + break; + } + // it passing mEnd is blocked by the following checks. + if (!end_set || (end <= entry->mEnd)) { + end = entry->mEnd; + end_set = true; } times++; } @@ -710,8 +706,8 @@ bool LogBuffer::prune(log_id_t id, unsigned long pruneRows, uid_t caller_uid) { // Region locked? LastLogTimes::iterator times = mTimes.begin(); while (times != mTimes.end()) { - LogTimeEntry* entry = (*times); - if (entry->owned_Locked() && entry->isWatching(id) && + LogTimeEntry* entry = times->get(); + if (entry->isWatching(id) && (!oldest || (oldest->mStart > entry->mStart) || ((oldest->mStart == entry->mStart) && (entry->mTimeout.tv_sec || entry->mTimeout.tv_nsec)))) { @@ -1052,9 +1048,9 @@ bool LogBuffer::clear(log_id_t id, uid_t uid) { LogTimeEntry::wrlock(); LastLogTimes::iterator times = mTimes.begin(); while (times != mTimes.end()) { - LogTimeEntry* entry = (*times); + LogTimeEntry* entry = times->get(); // Killer punch - if (entry->owned_Locked() && entry->isWatching(id)) { + if (entry->isWatching(id)) { entry->release_Locked(); } times++; diff --git a/logd/LogReader.cpp b/logd/LogReader.cpp index 2b6556d7b..13c7af3a9 100644 --- a/logd/LogReader.cpp +++ b/logd/LogReader.cpp @@ -41,6 +41,7 @@ void LogReader::notifyNewLog(log_mask_t logMask) { runOnEachSocket(&command); } +// Note returning false will release the SocketClient instance. bool LogReader::onDataAvailable(SocketClient* cli) { static bool name_set; if (!name_set) { @@ -57,6 +58,18 @@ bool LogReader::onDataAvailable(SocketClient* cli) { } buffer[len] = '\0'; + // Clients are only allowed to send one command, disconnect them if they + // send another. + LogTimeEntry::wrlock(); + for (const auto& entry : mLogbuf.mTimes) { + if (entry->mClient == cli) { + entry->release_Locked(); + LogTimeEntry::unlock(); + return false; + } + } + LogTimeEntry::unlock(); + unsigned long tail = 0; static const char _tail[] = " tail="; char* cp = strstr(buffer, _tail); @@ -199,14 +212,25 @@ bool LogReader::onDataAvailable(SocketClient* cli) { cli->getUid(), cli->getGid(), cli->getPid(), nonBlock ? 'n' : 'b', tail, logMask, (int)pid, sequence.nsec(), timeout); - FlushCommand command(*this, nonBlock, tail, logMask, pid, sequence, timeout); + LogTimeEntry::wrlock(); + auto entry = std::make_unique( + *this, cli, nonBlock, tail, logMask, pid, sequence, timeout); + if (!entry->startReader_Locked()) { + LogTimeEntry::unlock(); + return false; + } + + // release client and entry reference counts once done + cli->incRef(); + mLogbuf.mTimes.emplace_front(std::move(entry)); // Set acceptable upper limit to wait for slow reader processing b/27242723 struct timeval t = { LOGD_SNDTIMEO, 0 }; setsockopt(cli->getSocket(), SOL_SOCKET, SO_SNDTIMEO, (const char*)&t, sizeof(t)); - command.runSocketCommand(cli); + LogTimeEntry::unlock(); + return true; } @@ -215,9 +239,8 @@ void LogReader::doSocketDelete(SocketClient* cli) { LogTimeEntry::wrlock(); LastLogTimes::iterator it = times.begin(); while (it != times.end()) { - LogTimeEntry* entry = (*it); + LogTimeEntry* entry = it->get(); if (entry->mClient == cli) { - times.erase(it); entry->release_Locked(); break; } diff --git a/logd/LogTimes.cpp b/logd/LogTimes.cpp index 7a6f84b70..171550194 100644 --- a/logd/LogTimes.cpp +++ b/logd/LogTimes.cpp @@ -30,11 +30,7 @@ pthread_mutex_t LogTimeEntry::timesLock = PTHREAD_MUTEX_INITIALIZER; LogTimeEntry::LogTimeEntry(LogReader& reader, SocketClient* client, bool nonBlock, unsigned long tail, log_mask_t logMask, pid_t pid, log_time start, uint64_t timeout) - : mRefCount(1), - mRelease(false), - mError(false), - threadRunning(false), - leadingDropped(false), + : leadingDropped(false), mReader(reader), mLogMask(logMask), mPid(pid), @@ -52,65 +48,21 @@ LogTimeEntry::LogTimeEntry(LogReader& reader, SocketClient* client, cleanSkip_Locked(); } -void LogTimeEntry::startReader_Locked(void) { +bool LogTimeEntry::startReader_Locked() { pthread_attr_t attr; - threadRunning = true; - if (!pthread_attr_init(&attr)) { if (!pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED)) { if (!pthread_create(&mThread, &attr, LogTimeEntry::threadStart, this)) { pthread_attr_destroy(&attr); - return; + return true; } } pthread_attr_destroy(&attr); } - threadRunning = false; - if (mClient) { - mClient->decRef(); - } - decRef_Locked(); -} -void LogTimeEntry::threadStop(void* obj) { - LogTimeEntry* me = reinterpret_cast(obj); - - wrlock(); - - if (me->mNonBlock) { - me->error_Locked(); - } - - SocketClient* client = me->mClient; - - if (me->isError_Locked()) { - LogReader& reader = me->mReader; - LastLogTimes& times = reader.logbuf().mTimes; - - LastLogTimes::iterator it = times.begin(); - while (it != times.end()) { - if (*it == me) { - times.erase(it); - me->release_nodelete_Locked(); - break; - } - it++; - } - - me->mClient = nullptr; - reader.release(client); - } - - if (client) { - client->decRef(); - } - - me->threadRunning = false; - me->decRef_Locked(); - - unlock(); + return false; } void* LogTimeEntry::threadStart(void* obj) { @@ -118,13 +70,7 @@ void* LogTimeEntry::threadStart(void* obj) { LogTimeEntry* me = reinterpret_cast(obj); - pthread_cleanup_push(threadStop, obj); - SocketClient* client = me->mClient; - if (!client) { - me->error(); - return nullptr; - } LogBuffer& logbuf = me->mReader.logbuf(); @@ -137,14 +83,14 @@ void* LogTimeEntry::threadStart(void* obj) { log_time start = me->mStart; - while (me->threadRunning && !me->isError_Locked()) { + while (!me->mRelease) { if (me->mTimeout.tv_sec || me->mTimeout.tv_nsec) { if (pthread_cond_timedwait(&me->threadTriggeredCondition, ×Lock, &me->mTimeout) == ETIMEDOUT) { me->mTimeout.tv_sec = 0; me->mTimeout.tv_nsec = 0; } - if (!me->threadRunning || me->isError_Locked()) { + if (me->mRelease) { break; } } @@ -162,13 +108,12 @@ void* LogTimeEntry::threadStart(void* obj) { wrlock(); if (start == LogBufferElement::FLUSH_ERROR) { - me->error_Locked(); break; } me->mStart = start + log_time(0, 1); - if (me->mNonBlock || !me->threadRunning || me->isError_Locked()) { + if (me->mNonBlock || me->mRelease) { break; } @@ -179,9 +124,21 @@ void* LogTimeEntry::threadStart(void* obj) { } } - unlock(); + LogReader& reader = me->mReader; + reader.release(client); - pthread_cleanup_pop(true); + client->decRef(); + + LastLogTimes& times = reader.logbuf().mTimes; + auto it = + std::find_if(times.begin(), times.end(), + [&me](const auto& other) { return other.get() == me; }); + + if (it != times.end()) { + times.erase(it); + } + + unlock(); return nullptr; } @@ -247,10 +204,6 @@ int LogTimeEntry::FilterSecondPass(const LogBufferElement* element, void* obj) { goto skip; } - if (me->isError_Locked()) { - goto stop; - } - if (!me->mTail) { goto ok; } diff --git a/logd/LogTimes.h b/logd/LogTimes.h index 76d016c13..f4e165fdc 100644 --- a/logd/LogTimes.h +++ b/logd/LogTimes.h @@ -22,6 +22,7 @@ #include #include +#include #include #include @@ -33,16 +34,12 @@ class LogBufferElement; class LogTimeEntry { static pthread_mutex_t timesLock; - unsigned int mRefCount; - bool mRelease; - bool mError; - bool threadRunning; + bool mRelease = false; bool leadingDropped; pthread_cond_t threadTriggeredCondition; pthread_t mThread; LogReader& mReader; static void* threadStart(void* me); - static void threadStop(void* me); const log_mask_t mLogMask; const pid_t mPid; unsigned int skipAhead[LOG_ID_MAX]; @@ -73,11 +70,8 @@ class LogTimeEntry { pthread_mutex_unlock(×Lock); } - void startReader_Locked(void); + bool startReader_Locked(); - bool runningReader_Locked(void) const { - return threadRunning || mRelease || mError || mNonBlock; - } void triggerReader_Locked(void) { pthread_cond_signal(&threadTriggeredCondition); } @@ -87,54 +81,11 @@ class LogTimeEntry { } void cleanSkip_Locked(void); - // These called after LogTimeEntry removed from list, lock implicitly held - void release_nodelete_Locked(void) { - mRelease = true; - pthread_cond_signal(&threadTriggeredCondition); - // assumes caller code path will call decRef_Locked() - } - void release_Locked(void) { mRelease = true; pthread_cond_signal(&threadTriggeredCondition); - if (mRefCount || threadRunning) { - return; - } - // No one else is holding a reference to this - delete this; } - // Called to mark socket in jeopardy - void error_Locked(void) { - mError = true; - } - void error(void) { - wrlock(); - error_Locked(); - unlock(); - } - - bool isError_Locked(void) const { - return mRelease || mError; - } - - // Mark Used - // Locking implied, grabbed when protection around loop iteration - void incRef_Locked(void) { - ++mRefCount; - } - - bool owned_Locked(void) const { - return mRefCount != 0; - } - - void decRef_Locked(void) { - if ((mRefCount && --mRefCount) || !mRelease || threadRunning) { - return; - } - // No one else is holding a reference to this - delete this; - } bool isWatching(log_id_t id) const { return mLogMask & (1 << id); } @@ -146,6 +97,6 @@ class LogTimeEntry { static int FilterSecondPass(const LogBufferElement* element, void* me); }; -typedef std::list LastLogTimes; +typedef std::list> LastLogTimes; #endif // _LOGD_LOG_TIMES_H__