265 lines
7.2 KiB
C++
265 lines
7.2 KiB
C++
/*
|
|
* s3fs - FUSE-based file system backed by Amazon S3
|
|
*
|
|
* Copyright(C) 2007 Takeshi Nakatani <ggtakec.com>
|
|
*
|
|
* This program is free software; you can redistribute it and/or
|
|
* modify it under the terms of the GNU General Public License
|
|
* as published by the Free Software Foundation; either version 2
|
|
* of the License, or (at your option) any later version.
|
|
*
|
|
* This program is distributed in the hope that it will be useful,
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
* GNU General Public License for more details.
|
|
*
|
|
* You should have received a copy of the GNU General Public License
|
|
* along with this program; if not, write to the Free Software
|
|
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
|
*/
|
|
|
|
#include <cerrno>
|
|
#include <cstdint>
|
|
#include <cstdio>
|
|
#include <cstdlib>
|
|
|
|
#include "s3fs_logger.h"
|
|
#include "threadpoolman.h"
|
|
#include "autolock.h"
|
|
|
|
//------------------------------------------------
|
|
// ThreadPoolMan class variables
|
|
//------------------------------------------------
|
|
// Pointer to the one ThreadPoolMan instance. It is nullptr until Initialize()
// allocates the object, and is reset to nullptr again by Destroy().
ThreadPoolMan* ThreadPoolMan::singleton = nullptr;
|
|
|
|
//------------------------------------------------
|
|
// ThreadPoolMan class methods
|
|
//------------------------------------------------
|
|
bool ThreadPoolMan::Initialize(int count)
|
|
{
|
|
if(ThreadPoolMan::singleton){
|
|
S3FS_PRN_WARN("Already singleton for Thread Manager is existed, then re-create it.");
|
|
ThreadPoolMan::Destroy();
|
|
}
|
|
ThreadPoolMan::singleton = new ThreadPoolMan(count);
|
|
return true;
|
|
}
|
|
|
|
//
// Delete the singleton thread pool manager, if one exists, and reset the
// singleton pointer to nullptr.
//
void ThreadPoolMan::Destroy()
{
    if(nullptr == ThreadPoolMan::singleton){
        // nothing to tear down
        return;
    }
    delete ThreadPoolMan::singleton;
    ThreadPoolMan::singleton = nullptr;
}
|
|
|
|
bool ThreadPoolMan::Instruct(std::unique_ptr<thpoolman_param> pparam)
|
|
{
|
|
if(!ThreadPoolMan::singleton){
|
|
S3FS_PRN_WARN("The singleton object is not initialized yet.");
|
|
return false;
|
|
}
|
|
return ThreadPoolMan::singleton->SetInstruction(std::move(pparam));
|
|
}
|
|
|
|
//
|
|
// Thread worker
|
|
//
|
|
void* ThreadPoolMan::Worker(void* arg)
|
|
{
|
|
ThreadPoolMan* psingleton = static_cast<ThreadPoolMan*>(arg);
|
|
|
|
if(!psingleton){
|
|
S3FS_PRN_ERR("The parameter for worker thread is invalid.");
|
|
return reinterpret_cast<void*>(-EIO);
|
|
}
|
|
S3FS_PRN_INFO3("Start worker thread in ThreadPoolMan.");
|
|
|
|
while(!psingleton->IsExit()){
|
|
// wait
|
|
psingleton->thpoolman_sem.wait();
|
|
|
|
if(psingleton->IsExit()){
|
|
break;
|
|
}
|
|
|
|
// get instruction
|
|
std::unique_ptr<thpoolman_param> pparam;
|
|
{
|
|
AutoLock auto_lock(&(psingleton->thread_list_lock));
|
|
|
|
if(!psingleton->instruction_list.empty()){
|
|
pparam = std::move(psingleton->instruction_list.front());
|
|
psingleton->instruction_list.pop_front();
|
|
if(!pparam){
|
|
S3FS_PRN_WARN("Got a semaphore, but the instruction is empty.");
|
|
}
|
|
}else{
|
|
S3FS_PRN_WARN("Got a semaphore, but there is no instruction.");
|
|
pparam = nullptr;
|
|
}
|
|
}
|
|
|
|
if(pparam){
|
|
void* retval = pparam->pfunc(pparam->args);
|
|
if(nullptr != retval){
|
|
S3FS_PRN_WARN("The instruction function returned with somthign error code(%ld).", reinterpret_cast<long>(retval));
|
|
}
|
|
if(pparam->psem){
|
|
pparam->psem->post();
|
|
}
|
|
}
|
|
}
|
|
|
|
return nullptr;
|
|
}
|
|
|
|
//------------------------------------------------
|
|
// ThreadPoolMan methods
|
|
//------------------------------------------------
|
|
//
// Construct the manager: initialize thread_list_lock and start `count`
// worker threads.
//
// [count]  number of worker threads, must be 1 or more.
//
// Any failure here (count under 1, a singleton already registered, mutex
// attribute/mutex initialization failure, thread start failure) logs a
// message and calls abort().
//
ThreadPoolMan::ThreadPoolMan(int count) : is_exit(false), thpoolman_sem(0), is_lock_init(false)
{
    if(count < 1){
        S3FS_PRN_CRIT("Failed to creating singleton for Thread Manager, because thread count(%d) is under 1.", count);
        abort();
    }
    if(ThreadPoolMan::singleton){
        S3FS_PRN_CRIT("Already singleton for Thread Manager is existed.");
        abort();
    }

    int                 result;
    pthread_mutexattr_t attr;
    if(0 != (result = pthread_mutexattr_init(&attr))){
        S3FS_PRN_CRIT("failed to init mutex attribute for thread_list_lock: %d", result);
        abort();
    }
#if S3FS_PTHREAD_ERRORCHECK
    pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK);
#endif

    result = pthread_mutex_init(&thread_list_lock, &attr);

    // The attribute object is only needed for pthread_mutex_init(); destroying
    // it afterwards does not affect the mutex and releases whatever the
    // implementation attached to it.
    pthread_mutexattr_destroy(&attr);

    if(0 != result){
        S3FS_PRN_CRIT("failed to init thread_list_lock: %d", result);
        abort();
    }
    is_lock_init = true;

    // create threads
    if(!StartThreads(count)){
        S3FS_PRN_ERR("Failed starting threads at initializing.");
        abort();
    }
}
|
|
|
|
//
// Stop all worker threads, then destroy thread_list_lock if it was
// initialized. A failure to destroy the mutex is logged and aborts.
//
ThreadPoolMan::~ThreadPoolMan()
{
    StopThreads();

    if(!is_lock_init){
        return;
    }

    const int destroy_result = pthread_mutex_destroy(&thread_list_lock);
    if(0 != destroy_result){
        S3FS_PRN_CRIT("failed to destroy thread_list_lock: %d", destroy_result);
        abort();
    }
    is_lock_init = false;
}
|
|
|
|
bool ThreadPoolMan::IsExit() const
|
|
{
|
|
return is_exit;
|
|
}
|
|
|
|
void ThreadPoolMan::SetExitFlag(bool exit_flag)
|
|
{
|
|
is_exit = exit_flag;
|
|
}
|
|
|
|
bool ThreadPoolMan::StopThreads()
|
|
{
|
|
if(thread_list.empty()){
|
|
S3FS_PRN_INFO("Any threads are running now, then nothing to do.");
|
|
return true;
|
|
}
|
|
|
|
// all threads to exit
|
|
SetExitFlag(true);
|
|
for(size_t waitcnt = thread_list.size(); 0 < waitcnt; --waitcnt){
|
|
thpoolman_sem.post();
|
|
}
|
|
|
|
// wait for threads exiting
|
|
for(thread_list_t::const_iterator iter = thread_list.begin(); iter != thread_list.end(); ++iter){
|
|
void* retval = nullptr;
|
|
int result = pthread_join(*iter, &retval);
|
|
if(result){
|
|
S3FS_PRN_ERR("failed pthread_join - result(%d)", result);
|
|
}else{
|
|
S3FS_PRN_DBG("succeed pthread_join - return code(%ld)", reinterpret_cast<long>(retval));
|
|
}
|
|
}
|
|
thread_list.clear();
|
|
|
|
// reset semaphore(to zero)
|
|
while(thpoolman_sem.try_wait()){
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
bool ThreadPoolMan::StartThreads(int count)
|
|
{
|
|
if(count < 1){
|
|
S3FS_PRN_ERR("Failed to creating threads, because thread count(%d) is under 1.", count);
|
|
return false;
|
|
}
|
|
|
|
// stop all thread if they are running.
|
|
// cppcheck-suppress unmatchedSuppression
|
|
// cppcheck-suppress knownConditionTrueFalse
|
|
if(!StopThreads()){
|
|
S3FS_PRN_ERR("Failed to stop existed threads.");
|
|
return false;
|
|
}
|
|
|
|
// create all threads
|
|
SetExitFlag(false);
|
|
for(int cnt = 0; cnt < count; ++cnt){
|
|
// run thread
|
|
pthread_t thread;
|
|
int result;
|
|
if(0 != (result = pthread_create(&thread, nullptr, ThreadPoolMan::Worker, static_cast<void*>(this)))){
|
|
S3FS_PRN_ERR("failed pthread_create with return code(%d)", result);
|
|
StopThreads(); // if possible, stop all threads
|
|
return false;
|
|
}
|
|
thread_list.push_back(thread);
|
|
}
|
|
return true;
|
|
}
|
|
|
|
bool ThreadPoolMan::SetInstruction(std::unique_ptr<thpoolman_param> pparam)
|
|
{
|
|
if(!pparam){
|
|
S3FS_PRN_ERR("The parameter value is nullptr.");
|
|
return false;
|
|
}
|
|
|
|
// set parameter to list
|
|
{
|
|
AutoLock auto_lock(&thread_list_lock);
|
|
instruction_list.push_back(std::move(pparam));
|
|
}
|
|
|
|
// run thread
|
|
thpoolman_sem.post();
|
|
|
|
return true;
|
|
}
|
|
|
|
/*
|
|
* Local variables:
|
|
* tab-width: 4
|
|
* c-basic-offset: 4
|
|
* End:
|
|
* vim600: expandtab sw=4 ts=4 fdm=marker
|
|
* vim<600: expandtab sw=4 ts=4
|
|
*/
|