[icinga-checkins] icinga.org: icinga2/master: Fixed 1 second delay for child processes.
git at icinga.org
git at icinga.org
Sun Feb 10 01:36:14 CET 2013
Module: icinga2
Branch: master
Commit: fc6df0ecbd059ed2a64113befa1eec4e9318f479
URL: https://git.icinga.org/?p=icinga2.git;a=commit;h=fc6df0ecbd059ed2a64113befa1eec4e9318f479
Author: Gunnar Beutner <gunnar.beutner at netways.de>
Date: Sun Feb 10 01:35:40 2013 +0100
Fixed 1 second delay for child processes.
---
lib/base/process.cpp | 135 +++++++++++++++++++++++++++++---------------------
lib/base/process.h | 4 +-
2 files changed, 81 insertions(+), 58 deletions(-)
diff --git a/lib/base/process.cpp b/lib/base/process.cpp
index 2ee52b5..d57ae7b 100644
--- a/lib/base/process.cpp
+++ b/lib/base/process.cpp
@@ -24,7 +24,7 @@ using namespace icinga;
bool Process::m_ThreadCreated = false;
boost::mutex Process::m_Mutex;
deque<Process::Ptr> Process::m_Tasks;
-condition_variable Process::m_TasksCV;
+int Process::m_TaskFd;
Process::Process(const String& command, const Dictionary::Ptr& environment)
: AsyncTask<Process, ProcessResult>(), m_Command(command),
@@ -33,7 +33,14 @@ Process::Process(const String& command, const Dictionary::Ptr& environment)
assert(Application::IsMainThread());
if (!m_ThreadCreated) {
- thread t(&Process::WorkerThreadProc);
+ int fds[2];
+
+ if (pipe(fds) < 0)
+ BOOST_THROW_EXCEPTION(PosixException("pipe() failed.", errno));
+
+ m_TaskFd = fds[1];
+
+ thread t(&Process::WorkerThreadProc, fds[0]);
t.detach();
m_ThreadCreated = true;
@@ -42,87 +49,103 @@ Process::Process(const String& command, const Dictionary::Ptr& environment)
void Process::Run(void)
{
- boost::mutex::scoped_lock lock(m_Mutex);
- m_Tasks.push_back(GetSelf());
- m_TasksCV.notify_all();
+ {
+ boost::mutex::scoped_lock lock(m_Mutex);
+ m_Tasks.push_back(GetSelf());
+ }
+
+ /**
+ * This little gem, commonly known as the "self-pipe trick",
+ * takes care of waking up the select() call in the worker thread.
+ */
+ if (write(m_TaskFd, "T", 1) < 0)
+ BOOST_THROW_EXCEPTION(PosixException("write() failed.", errno));
}
-void Process::WorkerThreadProc(void)
+void Process::WorkerThreadProc(int taskFd)
{
- boost::mutex::scoped_lock lock(m_Mutex);
-
map<int, Process::Ptr> tasks;
for (;;) {
- while (m_Tasks.empty() || tasks.size() >= MaxTasksPerThread) {
- lock.unlock();
-
- map<int, Process::Ptr>::iterator it, prev;
+ map<int, Process::Ptr>::iterator it, prev;
#ifndef _MSC_VER
- fd_set readfds;
- int nfds = 0;
+ fd_set readfds;
+ int nfds = 0;
- FD_ZERO(&readfds);
+ FD_ZERO(&readfds);
+ FD_SET(taskFd, &readfds);
- int fd;
- BOOST_FOREACH(tie(fd, tuples::ignore), tasks) {
- if (fd > nfds)
- nfds = fd;
+ if (taskFd > nfds)
+ nfds = taskFd;
- FD_SET(fd, &readfds);
- }
+ int fd;
+ BOOST_FOREACH(tie(fd, tuples::ignore), tasks) {
+ if (fd > nfds)
+ nfds = fd;
+
+ FD_SET(fd, &readfds);
+ }
- timeval tv;
- tv.tv_sec = 1;
- tv.tv_usec = 0;
- select(nfds + 1, &readfds, NULL, NULL, &tv);
+ timeval tv;
+ tv.tv_sec = 1;
+ tv.tv_usec = 0;
+ select(nfds + 1, &readfds, NULL, NULL, &tv);
#else /* _MSC_VER */
- Utility::Sleep(1);
+ Utility::Sleep(1);
#endif /* _MSC_VER */
- for (it = tasks.begin(); it != tasks.end(); ) {
- int fd = it->first;
- Process::Ptr task = it->second;
+ if (FD_ISSET(taskFd, &readfds)) {
+ /* clear pipe */
+ char buffer[512];
+ int rc = read(taskFd, buffer, sizeof(buffer));
+ assert(rc >= 1);
-#ifndef _MSC_VER
- if (!FD_ISSET(fd, &readfds)) {
- it++;
- continue;
+ while (tasks.size() < MaxTasksPerThread) {
+ Process::Ptr task;
+
+ {
+ boost::mutex::scoped_lock lock(m_Mutex);
+
+ if (m_Tasks.empty())
+ break;
+
+ task = m_Tasks.front();
+ m_Tasks.pop_front();
}
-#endif /* _MSC_VER */
- if (!task->RunTask()) {
- prev = it;
- it++;
- tasks.erase(prev);
+ try {
+ task->InitTask();
- Event::Post(boost::bind(&Process::FinishResult, task, task->m_Result));
- } else {
- it++;
+ int fd = task->GetFD();
+ if (fd >= 0)
+ tasks[fd] = task;
+ } catch (...) {
+ Event::Post(boost::bind(&Process::FinishException, task, boost::current_exception()));
}
}
-
- lock.lock();
}
- while (!m_Tasks.empty() && tasks.size() < MaxTasksPerThread) {
- Process::Ptr task = m_Tasks.front();
- m_Tasks.pop_front();
+ for (it = tasks.begin(); it != tasks.end(); ) {
+ int fd = it->first;
+ Process::Ptr task = it->second;
- lock.unlock();
+#ifndef _MSC_VER
+ if (!FD_ISSET(fd, &readfds)) {
+ it++;
+ continue;
+ }
+#endif /* _MSC_VER */
- try {
- task->InitTask();
+ if (!task->RunTask()) {
+ prev = it;
+ it++;
+ tasks.erase(prev);
- int fd = task->GetFD();
- if (fd >= 0)
- tasks[fd] = task;
- } catch (...) {
- Event::Post(boost::bind(&Process::FinishException, task, boost::current_exception()));
+ Event::Post(boost::bind(&Process::FinishResult, task, task->m_Result));
+ } else {
+ it++;
}
-
- lock.lock();
}
}
}
diff --git a/lib/base/process.h b/lib/base/process.h
index 1c0c2f5..6044345 100644
--- a/lib/base/process.h
+++ b/lib/base/process.h
@@ -71,9 +71,9 @@ private:
static boost::mutex m_Mutex;
static deque<Process::Ptr> m_Tasks;
- static condition_variable m_TasksCV;
+ static int m_TaskFd;
- static void WorkerThreadProc(void);
+ static void WorkerThreadProc(int taskFd);
void InitTask(void);
bool RunTask(void);
More information about the icinga-checkins
mailing list