[icinga-checkins] icinga.org: icinga2/master: Use multiple worker threads for Process tasks.

git at icinga.org git at icinga.org
Sun Feb 10 22:29:51 CET 2013


Module: icinga2
Branch: master
Commit: 7ce98ed374ddc6cd694dd8c4c5a83bcaec1c594a
URL:    https://git.icinga.org/?p=icinga2.git;a=commit;h=7ce98ed374ddc6cd694dd8c4c5a83bcaec1c594a

Author: Gunnar Beutner <gunnar.beutner at netways.de>
Date:   Sun Feb 10 22:29:42 2013 +0100

Use multiple worker threads for Process tasks.

---

 lib/base/process.cpp |   61 ++++++++++++++++++++++++++++++-------------------
 1 files changed, 37 insertions(+), 24 deletions(-)

diff --git a/lib/base/process.cpp b/lib/base/process.cpp
index 6ae7ea2..882fb77 100644
--- a/lib/base/process.cpp
+++ b/lib/base/process.cpp
@@ -39,22 +39,38 @@ Process::Process(const vector<String>& arguments, const Dictionary::Ptr& extraEn
 	assert(Application::IsMainThread());
 
 	if (!m_ThreadCreated) {
-		int childTaskFd;
-
-#ifdef _MSC_VER
-		childTaskFd = 0;
-#else /* _MSC_VER */
+#ifndef _MSC_VER
 		int fds[2];
 
 		if (pipe(fds) < 0)
 			BOOST_THROW_EXCEPTION(PosixException("pipe() failed.", errno));
 
-		childTaskFd = fds[0];
 		m_TaskFd = fds[1];
 #endif /* _MSC_VER */
 
-		thread t(&Process::WorkerThreadProc, childTaskFd);
-		t.detach();
+		for (int i = 0; i < thread::hardware_concurrency(); i++) {
+			int childTaskFd;
+
+#ifdef _MSC_VER
+			childTaskFd = 0;
+#else /* _MSC_VER */
+			childTaskFd = dup(fds[0]);
+
+			if (childTaskFd < 0)
+				BOOST_THROW_EXCEPTION(PosixException("dup() failed.", errno));
+
+			int flags;
+			flags = fcntl(childTaskFd, F_GETFL, 0);
+			if (flags < 0)
+				BOOST_THROW_EXCEPTION(PosixException("fcntl failed", errno));
+
+			if (fcntl(childTaskFd, F_SETFL, flags | O_NONBLOCK) < 0)
+				BOOST_THROW_EXCEPTION(PosixException("fcntl failed", errno));
+#endif /* _MSC_VER */
+
+			thread t(&Process::WorkerThreadProc, childTaskFd);
+			t.detach();
+		}
 
 		m_ThreadCreated = true;
 	}
@@ -166,28 +182,25 @@ void Process::WorkerThreadProc(int taskFd)
 #ifndef _MSC_VER
 		if (FD_ISSET(taskFd, &readfds)) {
 #endif /* _MSC_VER */
-			/* Figure out how many tasks we'd ideally want. */
-			int tasknum = MaxTasksPerThread - tasks.size();
-
-#ifndef _MSC_VER
-			/* Read one byte for every task we take from the pending tasks list. */
-			char buffer[MaxTasksPerThread];
-			tasknum = read(taskFd, &buffer, tasknum);
-
-			if (tasknum < 0)
-				BOOST_THROW_EXCEPTION(PosixException("read() failed.", errno));
-
-			assert(tasknum >= 1);
-#endif /* _MSC_VER */
 
-			for (int i = 0; i < tasknum; i++) {
+			while (tasks.size() < MaxTasksPerThread) {
 				Process::Ptr task;
 
 				{
 					boost::mutex::scoped_lock lock(m_Mutex);
 
-					if (m_Tasks.empty())
-						break;
+					/* Read one byte for every task we take from the pending tasks list. */
+					char buffer;
+					int rc = read(taskFd, &buffer, sizeof(buffer));
+
+					if (rc < 0) {
+						if (errno == EAGAIN)
+							break; /* Someone else was faster and took our task. */
+
+						BOOST_THROW_EXCEPTION(PosixException("read() failed.", errno));
+					}
+
+					assert(!m_Tasks.empty());
 
 					task = m_Tasks.front();
 					m_Tasks.pop_front();





More information about the icinga-checkins mailing list