Add a simple work queue abstraction.
Makes it easy to schedule a bunch of work to happen in parallel. Change-Id: Id9c0e52fc8b6d78d2b9ed4c2ee47abce0a01775c
This commit is contained in:
parent
9a0a76df1e
commit
27e6eaae87
|
@ -73,6 +73,7 @@ extern void androidSetCreateThreadFunc(android_create_thread_fn func);
|
|||
// Get pid for the current thread.
|
||||
extern pid_t androidGetTid();
|
||||
|
||||
#ifdef HAVE_ANDROID_OS
|
||||
// Change the scheduling group of a particular thread. The group
|
||||
// should be one of the ANDROID_TGROUP constants. Returns BAD_VALUE if
|
||||
// grp is out of range, else another non-zero value with errno set if
|
||||
|
@ -95,6 +96,7 @@ extern int androidGetThreadPriority(pid_t tid);
|
|||
// scheduling groups are disabled. Returns INVALID_OPERATION if unexpected error.
|
||||
// Thread ID zero means current thread.
|
||||
extern int androidGetThreadSchedulingGroup(pid_t tid);
|
||||
#endif
|
||||
|
||||
#ifdef __cplusplus
|
||||
} // extern "C"
|
||||
|
|
|
@ -0,0 +1,119 @@
|
|||
/*]
|
||||
* Copyright (C) 2012 The Android Open Source Project
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#ifndef _LIBS_UTILS_WORK_QUEUE_H
|
||||
#define _LIBS_UTILS_WORK_QUEUE_H
|
||||
|
||||
#include <utils/Errors.h>
|
||||
#include <utils/Vector.h>
|
||||
#include <utils/threads.h>
|
||||
|
||||
namespace android {
|
||||
|
||||
/*
|
||||
* A threaded work queue.
|
||||
*
|
||||
* This class is designed to make it easy to run a bunch of isolated work
|
||||
* units in parallel, using up to the specified number of threads.
|
||||
* To use it, write a loop to post work units to the work queue, then synchronize
|
||||
* on the queue at the end.
|
||||
*/
|
||||
class WorkQueue {
|
||||
public:
|
||||
class WorkUnit {
|
||||
public:
|
||||
WorkUnit() { }
|
||||
virtual ~WorkUnit() { }
|
||||
|
||||
/*
|
||||
* Runs the work unit.
|
||||
* If the result is 'true' then the work queue continues scheduling work as usual.
|
||||
* If the result is 'false' then the work queue is canceled.
|
||||
*/
|
||||
virtual bool run() = 0;
|
||||
};
|
||||
|
||||
/* Creates a work queue with the specified maximum number of work threads. */
|
||||
WorkQueue(size_t maxThreads, bool canCallJava = true);
|
||||
|
||||
/* Destroys the work queue.
|
||||
* Cancels pending work and waits for all remaining threads to complete.
|
||||
*/
|
||||
~WorkQueue();
|
||||
|
||||
/* Posts a work unit to run later.
|
||||
* If the work queue has been canceled or is already finished, returns INVALID_OPERATION
|
||||
* and does not take ownership of the work unit (caller must destroy it itself).
|
||||
* Otherwise, returns OK and takes ownership of the work unit (the work queue will
|
||||
* destroy it automatically).
|
||||
*
|
||||
* For flow control, this method blocks when the size of the pending work queue is more
|
||||
* 'backlog' times the number of threads. This condition reduces the rate of entry into
|
||||
* the pending work queue and prevents it from growing much more rapidly than the
|
||||
* work threads can actually handle.
|
||||
*
|
||||
* If 'backlog' is 0, then no throttle is applied.
|
||||
*/
|
||||
status_t schedule(WorkUnit* workUnit, size_t backlog = 2);
|
||||
|
||||
/* Cancels all pending work.
|
||||
* If the work queue is already finished, returns INVALID_OPERATION.
|
||||
* If the work queue is already canceled, returns OK and does nothing else.
|
||||
* Otherwise, returns OK, discards all pending work units and prevents additional
|
||||
* work units from being scheduled.
|
||||
*
|
||||
* Call finish() after cancel() to wait for all remaining work to complete.
|
||||
*/
|
||||
status_t cancel();
|
||||
|
||||
/* Waits for all work to complete.
|
||||
* If the work queue is already finished, returns INVALID_OPERATION.
|
||||
* Otherwise, waits for all work to complete and returns OK.
|
||||
*/
|
||||
status_t finish();
|
||||
|
||||
private:
|
||||
class WorkThread : public Thread {
|
||||
public:
|
||||
WorkThread(WorkQueue* workQueue, bool canCallJava);
|
||||
virtual ~WorkThread();
|
||||
|
||||
private:
|
||||
virtual bool threadLoop();
|
||||
|
||||
WorkQueue* const mWorkQueue;
|
||||
};
|
||||
|
||||
status_t cancelLocked();
|
||||
bool threadLoop(); // called from each work thread
|
||||
|
||||
const size_t mMaxThreads;
|
||||
const bool mCanCallJava;
|
||||
|
||||
Mutex mLock;
|
||||
Condition mWorkChangedCondition;
|
||||
Condition mWorkDequeuedCondition;
|
||||
|
||||
bool mCanceled;
|
||||
bool mFinished;
|
||||
size_t mIdleThreads;
|
||||
Vector<sp<WorkThread> > mWorkThreads;
|
||||
Vector<WorkUnit*> mWorkUnits;
|
||||
};
|
||||
|
||||
}; // namespace android
|
||||
|
||||
#endif // _LIBS_UTILS_WORK_QUEUE_H
|
|
@ -41,6 +41,7 @@ commonSources:= \
|
|||
Tokenizer.cpp \
|
||||
Unicode.cpp \
|
||||
VectorImpl.cpp \
|
||||
WorkQueue.cpp \
|
||||
misc.cpp
|
||||
|
||||
host_commonCflags := -DLIBUTILS_NATIVE=1 $(TOOL_CFLAGS)
|
||||
|
|
|
@ -323,6 +323,7 @@ pid_t androidGetTid()
|
|||
#endif
|
||||
}
|
||||
|
||||
#ifdef HAVE_ANDROID_OS
|
||||
int androidSetThreadSchedulingGroup(pid_t tid, int grp)
|
||||
{
|
||||
if (grp > ANDROID_TGROUP_MAX || grp < 0) {
|
||||
|
@ -425,6 +426,7 @@ int androidGetThreadSchedulingGroup(pid_t tid)
|
|||
|
||||
return ret;
|
||||
}
|
||||
#endif
|
||||
|
||||
namespace android {
|
||||
|
||||
|
|
|
@ -0,0 +1,171 @@
|
|||
/*
|
||||
* Copyright (C) 2012 The Android Open Source Project
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
// #define LOG_NDEBUG 0
|
||||
#define LOG_TAG "WorkQueue"
|
||||
|
||||
#include <utils/Log.h>
|
||||
#include <utils/WorkQueue.h>
|
||||
|
||||
namespace android {
|
||||
|
||||
// --- WorkQueue ---
|
||||
|
||||
WorkQueue::WorkQueue(size_t maxThreads, bool canCallJava) :
|
||||
mMaxThreads(maxThreads), mCanCallJava(canCallJava),
|
||||
mCanceled(false), mFinished(false), mIdleThreads(0) {
|
||||
}
|
||||
|
||||
WorkQueue::~WorkQueue() {
|
||||
if (!cancel()) {
|
||||
finish();
|
||||
}
|
||||
}
|
||||
|
||||
status_t WorkQueue::schedule(WorkUnit* workUnit, size_t backlog) {
|
||||
AutoMutex _l(mLock);
|
||||
|
||||
if (mFinished || mCanceled) {
|
||||
return INVALID_OPERATION;
|
||||
}
|
||||
|
||||
if (mWorkThreads.size() < mMaxThreads
|
||||
&& mIdleThreads < mWorkUnits.size() + 1) {
|
||||
sp<WorkThread> workThread = new WorkThread(this, mCanCallJava);
|
||||
status_t status = workThread->run("WorkQueue::WorkThread");
|
||||
if (status) {
|
||||
return status;
|
||||
}
|
||||
mWorkThreads.add(workThread);
|
||||
mIdleThreads += 1;
|
||||
} else if (backlog) {
|
||||
while (mWorkUnits.size() >= mMaxThreads * backlog) {
|
||||
mWorkDequeuedCondition.wait(mLock);
|
||||
if (mFinished || mCanceled) {
|
||||
return INVALID_OPERATION;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
mWorkUnits.add(workUnit);
|
||||
mWorkChangedCondition.broadcast();
|
||||
return OK;
|
||||
}
|
||||
|
||||
status_t WorkQueue::cancel() {
|
||||
AutoMutex _l(mLock);
|
||||
|
||||
return cancelLocked();
|
||||
}
|
||||
|
||||
status_t WorkQueue::cancelLocked() {
|
||||
if (mFinished) {
|
||||
return INVALID_OPERATION;
|
||||
}
|
||||
|
||||
if (!mCanceled) {
|
||||
mCanceled = true;
|
||||
|
||||
size_t count = mWorkUnits.size();
|
||||
for (size_t i = 0; i < count; i++) {
|
||||
delete mWorkUnits.itemAt(i);
|
||||
}
|
||||
mWorkUnits.clear();
|
||||
mWorkChangedCondition.broadcast();
|
||||
mWorkDequeuedCondition.broadcast();
|
||||
}
|
||||
return OK;
|
||||
}
|
||||
|
||||
status_t WorkQueue::finish() {
|
||||
{ // acquire lock
|
||||
AutoMutex _l(mLock);
|
||||
|
||||
if (mFinished) {
|
||||
return INVALID_OPERATION;
|
||||
}
|
||||
|
||||
mFinished = true;
|
||||
mWorkChangedCondition.broadcast();
|
||||
} // release lock
|
||||
|
||||
// It is not possible for the list of work threads to change once the mFinished
|
||||
// flag has been set, so we can access mWorkThreads outside of the lock here.
|
||||
size_t count = mWorkThreads.size();
|
||||
for (size_t i = 0; i < count; i++) {
|
||||
mWorkThreads.itemAt(i)->join();
|
||||
}
|
||||
mWorkThreads.clear();
|
||||
return OK;
|
||||
}
|
||||
|
||||
bool WorkQueue::threadLoop() {
|
||||
WorkUnit* workUnit;
|
||||
{ // acquire lock
|
||||
AutoMutex _l(mLock);
|
||||
|
||||
for (;;) {
|
||||
if (mCanceled) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!mWorkUnits.isEmpty()) {
|
||||
workUnit = mWorkUnits.itemAt(0);
|
||||
mWorkUnits.removeAt(0);
|
||||
mIdleThreads -= 1;
|
||||
mWorkDequeuedCondition.broadcast();
|
||||
break;
|
||||
}
|
||||
|
||||
if (mFinished) {
|
||||
return false;
|
||||
}
|
||||
|
||||
mWorkChangedCondition.wait(mLock);
|
||||
}
|
||||
} // release lock
|
||||
|
||||
bool shouldContinue = workUnit->run();
|
||||
delete workUnit;
|
||||
|
||||
{ // acquire lock
|
||||
AutoMutex _l(mLock);
|
||||
|
||||
mIdleThreads += 1;
|
||||
|
||||
if (!shouldContinue) {
|
||||
cancelLocked();
|
||||
return false;
|
||||
}
|
||||
} // release lock
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
// --- WorkQueue::WorkThread ---
|
||||
|
||||
WorkQueue::WorkThread::WorkThread(WorkQueue* workQueue, bool canCallJava) :
|
||||
Thread(canCallJava), mWorkQueue(workQueue) {
|
||||
}
|
||||
|
||||
WorkQueue::WorkThread::~WorkThread() {
|
||||
}
|
||||
|
||||
bool WorkQueue::WorkThread::threadLoop() {
|
||||
return mWorkQueue->threadLoop();
|
||||
}
|
||||
|
||||
}; // namespace android
|
Loading…
Reference in New Issue