Optimized pending file queue process logic.
This commit is contained in:
parent
b9da7ad708
commit
d4939e118d
|
@ -455,26 +455,26 @@ bool IndexGenerator::deleteAllIndex(QStringList *pathlist) {
|
|||
QStringList *list = pathlist;
|
||||
if(list->isEmpty())
|
||||
return true;
|
||||
try {
|
||||
for(int i = 0; i < list->size(); i++) {
|
||||
QString doc = list->at(i);
|
||||
std::string uniqueterm = FileUtils::makeDocUterm(doc);
|
||||
try {
|
||||
qDebug() << "--delete start--";
|
||||
m_database_path->delete_document(uniqueterm);
|
||||
m_database_content->delete_document(uniqueterm);
|
||||
qDebug() << "delete path" << doc;
|
||||
qDebug() << "delete md5" << QString::fromStdString(uniqueterm);
|
||||
qDebug() << "--delete finish--";
|
||||
// qDebug()<<"m_database_path->get_lastdocid()!!!"<<m_database_path->get_lastdocid();
|
||||
// qDebug()<<"m_database_path->get_doccount()!!!"<<m_database_path->get_doccount();
|
||||
}
|
||||
m_database_path->commit();
|
||||
m_database_content->commit();
|
||||
qDebug() << "--delete finish--";
|
||||
// qDebug()<<"m_database_path->get_lastdocid()!!!"<<m_database_path->get_lastdocid();
|
||||
|
||||
// qDebug()<<"m_database_path->get_doccount()!!!"<<m_database_path->get_doccount();
|
||||
} catch(const Xapian::Error &e) {
|
||||
qWarning() << QString::fromStdString(e.get_description());
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
Q_EMIT this->transactionFinished();
|
||||
return true;
|
||||
}
|
||||
|
|
|
@ -198,6 +198,11 @@ void InotifyWatch::run()
|
|||
slotEvent(buf, len);
|
||||
free(buf);
|
||||
}
|
||||
} else if(rc < 0) {
|
||||
// error
|
||||
qWarning() << "select result < 0, error!";
|
||||
IndexStatusRecorder::getInstance()->setStatus(INOTIFY_NORMAL_EXIT, "1");
|
||||
assert(false);
|
||||
}
|
||||
}
|
||||
if(FileUtils::SearchMethod::DIRECTSEARCH == FileUtils::searchMethod) {
|
||||
|
@ -220,7 +225,6 @@ void InotifyWatch::slotEvent(char *buf, ssize_t len)
|
|||
if(pid == 0) {
|
||||
prctl(PR_SET_PDEATHSIG, SIGTERM);
|
||||
prctl(PR_SET_NAME, "inotify-index");
|
||||
|
||||
this->eventProcess(buf, len);
|
||||
fd_set read_fds;
|
||||
int rc;
|
||||
|
@ -230,11 +234,10 @@ void InotifyWatch::slotEvent(char *buf, ssize_t len)
|
|||
for(;;) {
|
||||
FD_ZERO(&read_fds);
|
||||
FD_SET(m_inotifyFd, &read_fds);
|
||||
qDebug() << read_timeout->tv_sec;
|
||||
rc = select(m_inotifyFd + 1, &read_fds, NULL, NULL, read_timeout);
|
||||
if(rc < 0) {
|
||||
// error
|
||||
qWarning() << "select result < 0, error!";
|
||||
qWarning() << "fork select result < 0, error!";
|
||||
IndexStatusRecorder::getInstance()->setStatus(INOTIFY_NORMAL_EXIT, "1");
|
||||
assert(false);
|
||||
} else if(rc == 0) {
|
||||
|
@ -263,9 +266,9 @@ void InotifyWatch::slotEvent(char *buf, ssize_t len)
|
|||
PendingFileQueue::getInstance()->~PendingFileQueue();
|
||||
::_exit(0);
|
||||
} else {
|
||||
qDebug() << "Select remain:" <<read_timeout->tv_sec;
|
||||
// qDebug() << "Select remain:" <<read_timeout->tv_sec;
|
||||
this->eventProcess(m_inotifyFd);
|
||||
qDebug() << "Select remain:" <<read_timeout->tv_sec;
|
||||
// qDebug() << "Select remain:" <<read_timeout->tv_sec;
|
||||
}
|
||||
}
|
||||
} else if(pid > 0) {
|
||||
|
@ -323,7 +326,7 @@ char * InotifyWatch::filter()
|
|||
}
|
||||
void InotifyWatch::eventProcess(int socket)
|
||||
{
|
||||
qDebug()<< "Enter eventProcess!";
|
||||
// qDebug()<< "Enter eventProcess!";
|
||||
int avail;
|
||||
if (ioctl(socket, FIONREAD, &avail) == EINVAL) {
|
||||
qWarning() << "Did not receive an entire inotify event.";
|
||||
|
@ -342,8 +345,8 @@ void InotifyWatch::eventProcess(int socket)
|
|||
while (i < len) {
|
||||
const struct inotify_event* event = (struct inotify_event*)&buffer[i];
|
||||
if(event->name[0] != '.') {
|
||||
qDebug() << "Read Event: " << currentPath[event->wd] << QString(event->name) << event->cookie << event->wd << event->mask;
|
||||
qDebug("mask:0x%x,",event->mask);
|
||||
// qDebug() << "Read Event: " << currentPath[event->wd] << QString(event->name) << event->cookie << event->wd << event->mask;
|
||||
// qDebug("mask:0x%x,",event->mask);
|
||||
break;
|
||||
}
|
||||
i += sizeof(struct inotify_event) + event->len;
|
||||
|
@ -358,8 +361,7 @@ void InotifyWatch::eventProcess(int socket)
|
|||
|
||||
void InotifyWatch::eventProcess(const char *buffer, ssize_t len)
|
||||
{
|
||||
qDebug()<< "Begin eventProcess! len:" << len;
|
||||
IndexStatusRecorder::getInstance()->setStatus(INOTIFY_NORMAL_EXIT, "0");
|
||||
// qDebug()<< "Begin eventProcess! len:" << len;
|
||||
|
||||
char * p = const_cast<char*>(buffer);
|
||||
while (p < buffer + len) {
|
||||
|
@ -370,7 +372,7 @@ void InotifyWatch::eventProcess(const char *buffer, ssize_t len)
|
|||
QString path = currentPath[event->wd] + '/' + event->name;
|
||||
//Create top dir first, traverse it last.
|
||||
if(event->mask & IN_CREATE) {
|
||||
qDebug() << "IN_CREATE";
|
||||
// qDebug() << "IN_CREATE";
|
||||
PendingFile f(path);
|
||||
if(event->mask & IN_ISDIR) {
|
||||
f.setIsDir();
|
||||
|
@ -401,7 +403,7 @@ void InotifyWatch::eventProcess(const char *buffer, ssize_t len)
|
|||
continue;
|
||||
}
|
||||
if(event->mask & IN_MODIFY) {
|
||||
qDebug() << "IN_MODIFY";
|
||||
// qDebug() << "IN_MODIFY";
|
||||
if(!(event->mask & IN_ISDIR)) {
|
||||
PendingFileQueue::getInstance()->enqueue(PendingFile(path));
|
||||
}
|
||||
|
@ -437,7 +439,7 @@ void InotifyWatch::eventProcess(const char *buffer, ssize_t len)
|
|||
next:
|
||||
p += sizeof(struct inotify_event) + event->len;
|
||||
}
|
||||
qDebug()<< "Finish eventProcess!";
|
||||
// qDebug()<< "Finish eventProcess!";
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -40,10 +40,11 @@ PendingFileQueue::PendingFileQueue(QObject *parent) : QThread(parent)
|
|||
// connect(this, &PendingFileQueue::minProcessTimerStart, m_minProcessTimer, f,Qt::DirectConnection);
|
||||
connect(this, SIGNAL(cacheTimerStart()), m_cacheTimer, SLOT(start()));
|
||||
connect(this, SIGNAL(minProcessTimerStart()), m_minProcessTimer, SLOT(start()));
|
||||
connect(this, &PendingFileQueue::timerStop, m_cacheTimer, &QTimer::stop);
|
||||
connect(this, &PendingFileQueue::timerStop, m_minProcessTimer, &QTimer::stop);
|
||||
|
||||
connect(m_cacheTimer, &QTimer::timeout, this, &PendingFileQueue::processCache, Qt::DirectConnection);
|
||||
connect(m_minProcessTimer, &QTimer::timeout, this, &PendingFileQueue::processCache, Qt::DirectConnection);
|
||||
|
||||
}
|
||||
|
||||
PendingFileQueue *PendingFileQueue::getInstance(QObject *parent)
|
||||
|
@ -71,13 +72,18 @@ PendingFileQueue::~PendingFileQueue()
|
|||
void PendingFileQueue::forceFinish()
|
||||
{
|
||||
QThread::msleep(600);
|
||||
Q_EMIT timerStop();
|
||||
this->quit();
|
||||
this->wait();
|
||||
}
|
||||
void PendingFileQueue::enqueue(const PendingFile &file)
|
||||
{
|
||||
qDebug() << "enqueuq file: " << file.path();
|
||||
// qDebug() << "enqueuq file: " << file.path();
|
||||
m_mutex.lock();
|
||||
m_enqueuetimes++;
|
||||
if(m_cache.isEmpty()) {
|
||||
IndexStatusRecorder::getInstance()->setStatus(INOTIFY_NORMAL_EXIT, "0");
|
||||
}
|
||||
// Remove all indexs of files under a dir which is to about be deleted,but keep delete signals.
|
||||
// Because our datebase need to delete those indexs one by one.
|
||||
if(file.shouldRemoveIndex() && file.isDir()) {
|
||||
|
@ -94,29 +100,29 @@ void PendingFileQueue::enqueue(const PendingFile &file)
|
|||
}
|
||||
int i = m_cache.indexOf(file);
|
||||
if (i == -1) {
|
||||
qDebug() << "insert file" << file.path() << file.shouldRemoveIndex();
|
||||
// qDebug() << "insert file" << file.path() << file.shouldRemoveIndex();
|
||||
m_cache << file;
|
||||
} else {
|
||||
qDebug() << "merge file" << file.path() << file.shouldRemoveIndex();
|
||||
// qDebug() << "merge file" << file.path() << file.shouldRemoveIndex();
|
||||
m_cache[i].merge(file);
|
||||
}
|
||||
|
||||
if(!m_cacheTimer->isActive()) {
|
||||
qDebug()<<"m_cacheTimer-----start!!";
|
||||
// qDebug()<<"m_cacheTimer-----start!!";
|
||||
// m_cacheTimer->start();
|
||||
Q_EMIT cacheTimerStart();
|
||||
}
|
||||
Q_EMIT minProcessTimerStart();
|
||||
// m_minProcessTimer->start();
|
||||
qDebug()<<"m_minProcessTimer-----start!!";
|
||||
// qDebug()<<"m_minProcessTimer-----start!!";
|
||||
m_mutex.unlock();
|
||||
qDebug() << "Current cache-------------";
|
||||
for(PendingFile i : m_cache) {
|
||||
qDebug() << "|" << i.path();
|
||||
qDebug() << "|" <<i.shouldRemoveIndex();
|
||||
}
|
||||
qDebug() << "Current cache-------------";
|
||||
qDebug()<<"enqueuq file finish!!"<<file.path();
|
||||
// qDebug() << "Current cache-------------";
|
||||
// for(PendingFile i : m_cache) {
|
||||
// qDebug() << "|" << i.path();
|
||||
// qDebug() << "|" <<i.shouldRemoveIndex();
|
||||
// }
|
||||
// qDebug() << "Current cache-------------";
|
||||
// qDebug()<<"enqueuq file finish!!"<<file.path();
|
||||
}
|
||||
|
||||
void PendingFileQueue::run()
|
||||
|
@ -126,13 +132,21 @@ void PendingFileQueue::run()
|
|||
|
||||
void PendingFileQueue::processCache()
|
||||
{
|
||||
qDebug()<< "Begin processCache!";
|
||||
qDebug()<< "Begin processCache!" ;
|
||||
m_mutex.lock();
|
||||
qDebug() << "Events:" << m_enqueuetimes;
|
||||
m_enqueuetimes = 0;
|
||||
m_cache.swap(m_pendingFiles);
|
||||
// m_pendingFiles = m_cache;
|
||||
// m_cache.clear();
|
||||
// m_cache.squeeze();
|
||||
m_mutex.unlock();
|
||||
qDebug() << "Current process-------------";
|
||||
for(PendingFile i : m_pendingFiles) {
|
||||
qDebug() << "|" << i.path();
|
||||
qDebug() << "|" <<i.shouldRemoveIndex();
|
||||
}
|
||||
qDebug() << "Current process-------------";
|
||||
if(m_pendingFiles.isEmpty()) {
|
||||
qDebug()<< "Empty, finish processCache!";
|
||||
return;
|
||||
|
|
|
@ -48,6 +48,7 @@ protected:
|
|||
Q_SIGNALS:
|
||||
void cacheTimerStart();
|
||||
void minProcessTimerStart();
|
||||
void timerStop();
|
||||
private:
|
||||
void processCache();
|
||||
explicit PendingFileQueue(QObject *parent = nullptr);
|
||||
|
@ -59,6 +60,7 @@ private:
|
|||
|
||||
QThread *m_timerThread = nullptr;
|
||||
bool m_timeout = false;
|
||||
int m_enqueuetimes = 0;
|
||||
|
||||
};
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue