使用信号量控制进程顺序,规避由于首次索引时间过长导致inotify队列过长问题。

This commit is contained in:
iaom 2022-03-15 13:53:38 +08:00
parent 6eb9acb07a
commit 4e3a9d93c6
9 changed files with 56 additions and 68 deletions

View File

@ -8,6 +8,7 @@
#define DIR_SEARCH_VALUE "1" #define DIR_SEARCH_VALUE "1"
#define LABEL_MAX_WIDTH 300 #define LABEL_MAX_WIDTH 300
#define HOME_PATH QDir::homePath() #define HOME_PATH QDir::homePath()
#define INDEX_SEM "ukui-search-index-sem"
static const QStringList allAppPath = { static const QStringList allAppPath = {
QDir::homePath()+"/.local/share/applications/", QDir::homePath()+"/.local/share/applications/",
"/usr/share/applications/" "/usr/share/applications/"

View File

@ -23,14 +23,23 @@
#include "dir-watcher.h" #include "dir-watcher.h"
#include <QDebug> #include <QDebug>
#define NEW_QUEUE(a) a = new QQueue<QString>(); qDebug("---------------------------%s %s %s new at %d..",__FILE__,__FUNCTION__,#a,__LINE__);
//#define DELETE_QUEUE(a )
using namespace UkuiSearch; using namespace UkuiSearch;
FirstIndex::FirstIndex() { FirstIndex *FirstIndex::m_instance = nullptr;
std::once_flag g_firstIndexInstanceFlag;
FirstIndex::FirstIndex() : m_semaphore(INDEX_SEM, 1, QSystemSemaphore::AccessMode::Open)
{
m_pool.setMaxThreadCount(2); m_pool.setMaxThreadCount(2);
m_pool.setExpiryTimeout(100); m_pool.setExpiryTimeout(100);
} }
FirstIndex *FirstIndex::getInstance()
{
std::call_once(g_firstIndexInstanceFlag, [] () {
m_instance = new FirstIndex;
});
return m_instance;
}
FirstIndex::~FirstIndex() { FirstIndex::~FirstIndex() {
qDebug() << "~FirstIndex"; qDebug() << "~FirstIndex";
if(this->q_index) if(this->q_index)
@ -127,17 +136,6 @@ void FirstIndex::run() {
this->q_content_index = new QQueue<QPair<QString,qint64>>(); this->q_content_index = new QQueue<QPair<QString,qint64>>();
this->m_ocr_index = new QQueue<QPair<QString,qint64>>(); this->m_ocr_index = new QQueue<QPair<QString,qint64>>();
int fifo_fd;
char buffer[2];
memset(buffer, 0, sizeof(buffer));
buffer[0] = 0x1;
buffer[1] = '\0';
fifo_fd = open(UKUI_SEARCH_PIPE_PATH, O_RDWR);
if(fifo_fd == -1) {
perror("open fifo error\n");
assert(false);
}
++FileUtils::indexStatus; ++FileUtils::indexStatus;
pid_t pid; pid_t pid;
pid = fork(); pid = fork();
@ -161,23 +159,18 @@ void FirstIndex::run() {
QMutex mutex1, mutex2, mutex3; QMutex mutex1, mutex2, mutex3;
mutex1.lock(); mutex1.lock();
mutex2.lock(); mutex2.lock();
mutex3.lock();
sem.acquire(4);
sem.acquire(1);
mutex1.unlock();
qInfo() << "index dir" << DirWatcher::getDirWatcher()->currentindexableDir(); qInfo() << "index dir" << DirWatcher::getDirWatcher()->currentindexableDir();
qInfo() << "index block dir" << DirWatcher::getDirWatcher()->currentBlackListOfIndex(); qInfo() << "index block dir" << DirWatcher::getDirWatcher()->currentBlackListOfIndex();
setPath(DirWatcher::getDirWatcher()->currentindexableDir()); setPath(DirWatcher::getDirWatcher()->currentindexableDir());
setBlockPath(DirWatcher::getDirWatcher()->currentBlackListOfIndex()); setBlockPath(DirWatcher::getDirWatcher()->currentBlackListOfIndex());
Traverse(); this->Traverse();
FileUtils::_max_index_count = this->q_index->length(); FileUtils::_max_index_count = this->q_index->length();
qDebug() << "max_index_count:" << FileUtils::_max_index_count; qDebug() << "max_index_count:" << FileUtils::_max_index_count;
sem.release(5);
QtConcurrent::run(&m_pool, [&]() { QtConcurrent::run(&m_pool, [&]() {
sem.acquire(2); sem.acquire(2);
mutex2.unlock(); mutex1.unlock();
qDebug() << "index start;"; qDebug() << "index start;";
QQueue<QVector<QString>>* tmp1 = new QQueue<QVector<QString>>(); QQueue<QVector<QString>>* tmp1 = new QQueue<QVector<QString>>();
while(!this->q_index->empty()) { while(!this->q_index->empty()) {
@ -193,7 +186,7 @@ void FirstIndex::run() {
}); });
QtConcurrent::run(&m_pool,[&]() { QtConcurrent::run(&m_pool,[&]() {
sem.acquire(2); sem.acquire(2);
mutex3.unlock(); mutex2.unlock();
QQueue<QString>* tmp2 = new QQueue<QString>(); QQueue<QString>* tmp2 = new QQueue<QString>();
qDebug() << "q_content_index:" << q_content_index->size(); qDebug() << "q_content_index:" << q_content_index->size();
while(!this->q_content_index->empty()) { while(!this->q_content_index->empty()) {
@ -220,9 +213,10 @@ void FirstIndex::run() {
qDebug() << "content index end;"; qDebug() << "content index end;";
sem.release(2); sem.release(2);
}); });
//OCR功能暂时屏蔽 // OCR功能暂时屏蔽
// QtConcurrent::run(&m_pool,[&]() { // QtConcurrent::run(&m_pool,[&]() {
// sem.acquire(5); // sem.acquire(5);
// mutex3.unlock();
// QQueue<QString>* tmpOcr = new QQueue<QString>(); // QQueue<QString>* tmpOcr = new QQueue<QString>();
// qDebug() << "m_ocr_index:" << m_ocr_index->size(); // qDebug() << "m_ocr_index:" << m_ocr_index->size();
// while(!this->m_ocr_index->empty()) { // while(!this->m_ocr_index->empty()) {
@ -250,11 +244,11 @@ void FirstIndex::run() {
// }); // });
mutex1.lock(); mutex1.lock();
mutex2.lock(); mutex2.lock();
mutex3.lock(); // mutex3.lock();
sem.acquire(5); sem.acquire(5);
mutex1.unlock(); mutex1.unlock();
mutex2.unlock(); mutex2.unlock();
mutex3.unlock(); // mutex3.unlock();
if(this->q_index) if(this->q_index)
delete this->q_index; delete this->q_index;
@ -279,12 +273,13 @@ void FirstIndex::run() {
--FileUtils::indexStatus; --FileUtils::indexStatus;
} }
m_semaphore.release(1);
IndexStatusRecorder::getInstance()->setStatus(INOTIFY_NORMAL_EXIT, "2"); IndexStatusRecorder::getInstance()->setStatus(INOTIFY_NORMAL_EXIT, "2");
int retval1 = write(fifo_fd, buffer, strlen(buffer)); // int retval1 = write(fifo_fd, buffer, strlen(buffer));
if(retval1 == -1) { // if(retval1 == -1) {
qWarning("write error\n"); // qWarning("write error\n");
} // }
qDebug("write data ok!\n"); // qDebug("write data ok!\n");
QTime t2 = QTime::currentTime(); QTime t2 = QTime::currentTime();
qWarning() << t1; qWarning() << t1;
qWarning() << t2; qWarning() << t2;

View File

@ -25,6 +25,7 @@
#include <QtConcurrent/QtConcurrent> #include <QtConcurrent/QtConcurrent>
#include <signal.h> #include <signal.h>
#include <QSemaphore> #include <QSemaphore>
#include <QSystemSemaphore>
#include<sys/types.h> #include<sys/types.h>
#include <stdio.h> #include <stdio.h>
#include <unistd.h> #include <unistd.h>
@ -43,14 +44,16 @@
namespace UkuiSearch { namespace UkuiSearch {
class FirstIndex : public QThread, public Traverse_BFS { class FirstIndex : public QThread, public Traverse_BFS {
public: public:
FirstIndex(); static FirstIndex* getInstance();
~FirstIndex(); ~FirstIndex();
virtual void DoSomething(const QFileInfo &) final; virtual void DoSomething(const QFileInfo &) final;
protected: protected:
void run() override; void run() override;
private: private:
FirstIndex();
FirstIndex(const FirstIndex&) = delete; FirstIndex(const FirstIndex&) = delete;
void operator=(const FirstIndex&) = delete; void operator=(const FirstIndex&) = delete;
static FirstIndex *m_instance;
bool bool_dataBaseStatusOK = false; bool bool_dataBaseStatusOK = false;
bool bool_dataBaseExist = false; bool bool_dataBaseExist = false;
IndexGenerator* p_indexGenerator = nullptr; IndexGenerator* p_indexGenerator = nullptr;
@ -64,6 +67,8 @@ private:
QQueue<QPair<QString,qint64>>* m_ocr_index; QQueue<QPair<QString,qint64>>* m_ocr_index;
//xapian will auto commit per 10,000 changes, donnot change it!!! //xapian will auto commit per 10,000 changes, donnot change it!!!
const size_t u_send_length = 8192; const size_t u_send_length = 8192;
QSystemSemaphore m_semaphore;
}; };
} }

View File

@ -175,28 +175,6 @@ void InotifyWatch::run()
setBlockPath(DirWatcher::getDirWatcher()->currentBlackListOfIndex()); setBlockPath(DirWatcher::getDirWatcher()->currentBlackListOfIndex());
firstTraverse(); firstTraverse();
int fifo_fd;
char buffer[2];
memset(buffer, 0, sizeof(buffer));
fifo_fd = open(UKUI_SEARCH_PIPE_PATH, O_RDWR);
if(fifo_fd == -1) {
qWarning() << "Open fifo error\n";
assert(false);
}
int retval = read(fifo_fd, buffer, sizeof(buffer));
if(retval == -1) {
qWarning() << "read error\n";
assert(false);
}
qDebug("Read fifo[%s]", buffer);
qDebug("Read data ok");
close(fifo_fd);
if(buffer[0] & 0x1) {
qDebug("Data confirmed\n");
}
unlink(UKUI_SEARCH_PIPE_PATH);
while(FileUtils::SearchMethod::INDEXSEARCH == FileUtils::searchMethod) { while(FileUtils::SearchMethod::INDEXSEARCH == FileUtils::searchMethod) {
fd_set fds; fd_set fds;
FD_ZERO(&fds); FD_ZERO(&fds);

View File

@ -2,17 +2,20 @@
#define INOTIFYWATCH_H #define INOTIFYWATCH_H
#include <QThread> #include <QThread>
#include <unistd.h> #include <QBuffer>
#include <sys/inotify.h>
#include <QSocketNotifier> #include <QSocketNotifier>
#include <QDataStream> #include <QDataStream>
#include <QSharedMemory> #include <QSharedMemory>
#include <sys/prctl.h>
#include <sys/wait.h>
#include <sys/inotify.h>
#include <unistd.h>
#include "traverse_bfs.h" #include "traverse_bfs.h"
#include "ukui-search-qdbus.h" #include "ukui-search-qdbus.h"
#include "index-status-recorder.h" #include "index-status-recorder.h"
#include "file-utils.h" #include "file-utils.h"
#include "first-index.h"
#include "pending-file-queue.h" #include "pending-file-queue.h"
#include "common.h" #include "common.h"
namespace UkuiSearch { namespace UkuiSearch {
@ -21,7 +24,6 @@ class InotifyWatch : public QThread, public Traverse_BFS
Q_OBJECT Q_OBJECT
public: public:
static InotifyWatch* getInstance(); static InotifyWatch* getInstance();
bool addWatch(const QString &path); bool addWatch(const QString &path);
bool removeWatch(const QString &path, bool removeFromDatabase = true); bool removeWatch(const QString &path, bool removeFromDatabase = true);
virtual void DoSomething(const QFileInfo &info) final; virtual void DoSomething(const QFileInfo &info) final;

View File

@ -22,7 +22,7 @@
#include <malloc.h> #include <malloc.h>
using namespace UkuiSearch; using namespace UkuiSearch;
static PendingFileQueue *global_instance_pending_file_queue = nullptr; static PendingFileQueue *global_instance_pending_file_queue = nullptr;
PendingFileQueue::PendingFileQueue(QObject *parent) : QThread(parent) PendingFileQueue::PendingFileQueue(QObject *parent) : QThread(parent), m_semaphore(INDEX_SEM, 0, QSystemSemaphore::AccessMode::Open)
{ {
this->start(); this->start();
@ -72,10 +72,11 @@ PendingFileQueue::~PendingFileQueue()
void PendingFileQueue::forceFinish() void PendingFileQueue::forceFinish()
{ {
QThread::msleep(600);
Q_EMIT timerStop(); Q_EMIT timerStop();
this->quit(); this->quit();
this->wait(); this->wait();
processCache();
m_semaphore.release(1);
} }
void PendingFileQueue::enqueue(const PendingFile &file) void PendingFileQueue::enqueue(const PendingFile &file)
{ {
@ -128,6 +129,8 @@ void PendingFileQueue::enqueue(const PendingFile &file)
void PendingFileQueue::run() void PendingFileQueue::run()
{ {
//阻塞线程直到first-index进程结束
m_semaphore.acquire();
exec(); exec();
} }

View File

@ -25,6 +25,7 @@
#include <QTimer> #include <QTimer>
#include <QThread> #include <QThread>
#include <QMutex> #include <QMutex>
#include <QSystemSemaphore>
#include "pending-file.h" #include "pending-file.h"
#include "index-generator.h" #include "index-generator.h"
@ -57,6 +58,7 @@ private:
QVector<PendingFile> m_pendingFiles; QVector<PendingFile> m_pendingFiles;
QMutex m_mutex; QMutex m_mutex;
QMutex m_timeoutMutex; QMutex m_timeoutMutex;
QSystemSemaphore m_semaphore;
QThread *m_timerThread = nullptr; QThread *m_timerThread = nullptr;
bool m_timeout = false; bool m_timeout = false;

View File

@ -1,12 +1,11 @@
#include "search-method-manager.h" #include "search-method-manager.h"
#include "dir-watcher.h"
using namespace UkuiSearch; using namespace UkuiSearch;
static SearchMethodManager* global_instance = nullptr; static SearchMethodManager* global_instance = nullptr;
SearchMethodManager::SearchMethodManager() : m_semaphore(INDEX_SEM, 1, QSystemSemaphore::AccessMode::Create)
SearchMethodManager::SearchMethodManager()
{ {
qDebug() << m_semaphore.errorString();
m_fi = FirstIndex::getInstance();
m_iw = InotifyWatch::getInstance(); m_iw = InotifyWatch::getInstance();
} }
SearchMethodManager *SearchMethodManager::getInstance() SearchMethodManager *SearchMethodManager::getInstance()
@ -41,7 +40,8 @@ void SearchMethodManager::searchMethod(FileUtils::SearchMethod sm) {
} }
qDebug() << "create fifo success\n"; qDebug() << "create fifo success\n";
qWarning() << "start first index"; qWarning() << "start first index";
m_fi.start(); m_semaphore.acquire();
m_fi->start();
qWarning() << "start inotify index"; qWarning() << "start inotify index";
// InotifyIndex ii("/home"); // InotifyIndex ii("/home");
// ii.start(); // ii.start();

View File

@ -1,6 +1,7 @@
#ifndef SEARCHMETHODMANAGER_H #ifndef SEARCHMETHODMANAGER_H
#define SEARCHMETHODMANAGER_H #define SEARCHMETHODMANAGER_H
#include <QSystemSemaphore>
#include "first-index.h" #include "first-index.h"
//#include "inotify-index.h" //#include "inotify-index.h"
#include "inotify-watch.h" #include "inotify-watch.h"
@ -11,9 +12,10 @@ public:
void searchMethod(FileUtils::SearchMethod sm); void searchMethod(FileUtils::SearchMethod sm);
private: private:
SearchMethodManager(); SearchMethodManager();
FirstIndex m_fi; FirstIndex *m_fi;
// InotifyIndex* m_ii; // InotifyIndex* m_ii;
InotifyWatch *m_iw = nullptr; InotifyWatch *m_iw = nullptr;
QSystemSemaphore m_semaphore;
}; };
} }