[icinga-checkins] icinga.org: icinga2/master: Fixed 1 second delay for child processes.

git at icinga.org git at icinga.org
Sun Feb 10 01:36:14 CET 2013


Module: icinga2
Branch: master
Commit: fc6df0ecbd059ed2a64113befa1eec4e9318f479
URL:    https://git.icinga.org/?p=icinga2.git;a=commit;h=fc6df0ecbd059ed2a64113befa1eec4e9318f479

Author: Gunnar Beutner <gunnar.beutner at netways.de>
Date:   Sun Feb 10 01:35:40 2013 +0100

Fixed 1 second delay for child processes.

---

 lib/base/process.cpp |  135 +++++++++++++++++++++++++++++---------------------
 lib/base/process.h   |    4 +-
 2 files changed, 81 insertions(+), 58 deletions(-)

diff --git a/lib/base/process.cpp b/lib/base/process.cpp
index 2ee52b5..d57ae7b 100644
--- a/lib/base/process.cpp
+++ b/lib/base/process.cpp
@@ -24,7 +24,7 @@ using namespace icinga;
 bool Process::m_ThreadCreated = false;
 boost::mutex Process::m_Mutex;
 deque<Process::Ptr> Process::m_Tasks;
-condition_variable Process::m_TasksCV;
+int Process::m_TaskFd;
 
 Process::Process(const String& command, const Dictionary::Ptr& environment)
 	: AsyncTask<Process, ProcessResult>(), m_Command(command),
@@ -33,7 +33,14 @@ Process::Process(const String& command, const Dictionary::Ptr& environment)
 	assert(Application::IsMainThread());
 
 	if (!m_ThreadCreated) {
-		thread t(&Process::WorkerThreadProc);
+		int fds[2];
+
+		if (pipe(fds) < 0)
+			BOOST_THROW_EXCEPTION(PosixException("pipe() failed.", errno));
+
+		m_TaskFd = fds[1];
+
+		thread t(&Process::WorkerThreadProc, fds[0]);
 		t.detach();
 
 		m_ThreadCreated = true;
@@ -42,87 +49,103 @@ Process::Process(const String& command, const Dictionary::Ptr& environment)
 
 void Process::Run(void)
 {
-	boost::mutex::scoped_lock lock(m_Mutex);
-	m_Tasks.push_back(GetSelf());
-	m_TasksCV.notify_all();
+	{
+		boost::mutex::scoped_lock lock(m_Mutex);
+		m_Tasks.push_back(GetSelf());
+	}
+
+	/**
+	 * This little gem which is commonly known as the "self-pipe trick"
+	 * takes care of waking up the select() call in the worker thread.
+	 */
+	if (write(m_TaskFd, "T", 1) < 0)
+		BOOST_THROW_EXCEPTION(PosixException("write() failed.", errno));
 }
 
-void Process::WorkerThreadProc(void)
+void Process::WorkerThreadProc(int taskFd)
 {
-	boost::mutex::scoped_lock lock(m_Mutex);
-
 	map<int, Process::Ptr> tasks;
 
 	for (;;) {
-		while (m_Tasks.empty() || tasks.size() >= MaxTasksPerThread) {
-			lock.unlock();
-
-			map<int, Process::Ptr>::iterator it, prev;
+		map<int, Process::Ptr>::iterator it, prev;
 
 #ifndef _MSC_VER
-			fd_set readfds;
-			int nfds = 0;
+		fd_set readfds;
+		int nfds = 0;
 
-			FD_ZERO(&readfds);
+		FD_ZERO(&readfds);
+		FD_SET(taskFd, &readfds);
 
-			int fd;
-			BOOST_FOREACH(tie(fd, tuples::ignore), tasks) {
-				if (fd > nfds)
-					nfds = fd;
+		if (taskFd > nfds)
+			nfds = taskFd;
 
-				FD_SET(fd, &readfds);
-			}
+		int fd;
+		BOOST_FOREACH(tie(fd, tuples::ignore), tasks) {
+			if (fd > nfds)
+				nfds = fd;
+
+			FD_SET(fd, &readfds);
+		}
 
-			timeval tv;
-			tv.tv_sec = 1;
-			tv.tv_usec = 0;
-			select(nfds + 1, &readfds, NULL, NULL, &tv);
+		timeval tv;
+		tv.tv_sec = 1;
+		tv.tv_usec = 0;
+		select(nfds + 1, &readfds, NULL, NULL, &tv);
 #else /* _MSC_VER */
-			Utility::Sleep(1);
+		Utility::Sleep(1);
 #endif /* _MSC_VER */
 
-			for (it = tasks.begin(); it != tasks.end(); ) {
-				int fd = it->first;
-				Process::Ptr task = it->second;
+		if (FD_ISSET(taskFd, &readfds)) {
+			/* clear pipe */
+			char buffer[512];
+			int rc = read(taskFd, buffer, sizeof(buffer));
+			assert(rc >= 1);
 
-#ifndef _MSC_VER
-				if (!FD_ISSET(fd, &readfds)) {
-					it++;
-					continue;
+			while (tasks.size() < MaxTasksPerThread) {
+				Process::Ptr task;
+
+				{
+					boost::mutex::scoped_lock lock(m_Mutex);
+
+					if (m_Tasks.empty())
+						break;
+
+					task = m_Tasks.front();
+					m_Tasks.pop_front();
 				}
-#endif /* _MSC_VER */
 
-				if (!task->RunTask()) {
-					prev = it;
-					it++;
-					tasks.erase(prev);
+				try {
+					task->InitTask();
 
-					Event::Post(boost::bind(&Process::FinishResult, task, task->m_Result));
-				} else {
-					it++;
+					int fd = task->GetFD();
+					if (fd >= 0)
+						tasks[fd] = task;
+				} catch (...) {
+					Event::Post(boost::bind(&Process::FinishException, task, boost::current_exception()));
 				}
 			}
-
-			lock.lock();
 		}
 
-		while (!m_Tasks.empty() && tasks.size() < MaxTasksPerThread) {
-			Process::Ptr task = m_Tasks.front();
-			m_Tasks.pop_front();
+		for (it = tasks.begin(); it != tasks.end(); ) {
+			int fd = it->first;
+			Process::Ptr task = it->second;
 
-			lock.unlock();
+#ifndef _MSC_VER
+			if (!FD_ISSET(fd, &readfds)) {
+				it++;
+				continue;
+			}
+#endif /* _MSC_VER */
 
-			try {
-				task->InitTask();
+			if (!task->RunTask()) {
+				prev = it;
+				it++;
+				tasks.erase(prev);
 
-				int fd = task->GetFD();
-				if (fd >= 0)
-					tasks[fd] = task;
-			} catch (...) {
-				Event::Post(boost::bind(&Process::FinishException, task, boost::current_exception()));
+				Event::Post(boost::bind(&Process::FinishResult, task, task->m_Result));
+			} else {
+				it++;
 			}
-
-			lock.lock();
 		}
 	}
 }
diff --git a/lib/base/process.h b/lib/base/process.h
index 1c0c2f5..6044345 100644
--- a/lib/base/process.h
+++ b/lib/base/process.h
@@ -71,9 +71,9 @@ private:
 
 	static boost::mutex m_Mutex;
 	static deque<Process::Ptr> m_Tasks;
-	static condition_variable m_TasksCV;
+	static int m_TaskFd;
 
-	static void WorkerThreadProc(void);
+	static void WorkerThreadProc(int taskFd);
 
 	void InitTask(void);
 	bool RunTask(void);





More information about the icinga-checkins mailing list