[icinga-checkins] icinga.org: icinga2/master: Implemented poll() support for Process tasks.

git at icinga.org git at icinga.org
Sun Feb 10 23:32:01 CET 2013


Module: icinga2
Branch: master
Commit: bddd9ebf0b153ebfec1134a551a93c8e91576dd7
URL:    https://git.icinga.org/?p=icinga2.git;a=commit;h=bddd9ebf0b153ebfec1134a551a93c8e91576dd7

Author: Gunnar Beutner <gunnar.beutner at netways.de>
Date:   Sun Feb 10 23:31:11 2013 +0100

Implemented poll() support for Process tasks.

Fixes #3035

---

 lib/base/process.cpp |  127 +++++++++++++++++++++++++++++---------------------
 lib/base/unix.h      |    1 +
 2 files changed, 74 insertions(+), 54 deletions(-)

diff --git a/lib/base/process.cpp b/lib/base/process.cpp
index 882fb77..0efc585 100644
--- a/lib/base/process.cpp
+++ b/lib/base/process.cpp
@@ -143,34 +143,36 @@ void Process::Run(void)
 void Process::WorkerThreadProc(int taskFd)
 {
 	map<int, Process::Ptr> tasks;
+	pollfd *pfds;
 
 	for (;;) {
 		map<int, Process::Ptr>::iterator it, prev;
 
 #ifndef _MSC_VER
-		fd_set readfds;
-		int nfds = 0;
+		pfds = (pollfd *)realloc(pfds, (1 + tasks.size()) * sizeof(pollfd));
 
-		FD_ZERO(&readfds);
+		if (pfds == NULL)
+			BOOST_THROW_EXCEPTION(PosixException("realloc() failed.", errno));
 
-		if (tasks.size() < MaxTasksPerThread)
-			FD_SET(taskFd, &readfds);
+		int idx = 0;
 
-		if (taskFd > nfds)
-			nfds = taskFd;
+		if (tasks.size() < MaxTasksPerThread) {
+			pfds[idx].fd = taskFd;
+			pfds[idx].events = POLLIN;
+			idx++;
+		}
 
 		int fd;
 		BOOST_FOREACH(tie(fd, tuples::ignore), tasks) {
-			if (fd > nfds)
-				nfds = fd;
-
-			FD_SET(fd, &readfds);
+			pfds[idx].fd = fd;
+			pfds[idx].events = POLLIN;
+			idx++;
 		}
 
-		int rc = select(nfds + 1, &readfds, NULL, NULL, NULL);
+		int rc = poll(pfds, idx, -1);
 
 		if (rc < 0 && errno != EINTR)
-			BOOST_THROW_EXCEPTION(PosixException("select() failed.", errno));
+			BOOST_THROW_EXCEPTION(PosixException("poll() failed.", errno));
 
 		if (rc == 0)
 			continue;
@@ -180,67 +182,84 @@ void Process::WorkerThreadProc(int taskFd)
 #endif /* _MSC_VER */
 
 #ifndef _MSC_VER
-		if (FD_ISSET(taskFd, &readfds)) {
+		for (int i = 0; i < idx; i++) {
+			if ((pfds[i].revents & POLLIN) == 0)
+				continue;
+
+			if (pfds[i].fd == taskFd) {
 #endif /* _MSC_VER */
 
-			while (tasks.size() < MaxTasksPerThread) {
-				Process::Ptr task;
+				while (tasks.size() < MaxTasksPerThread) {
+					Process::Ptr task;
 
-				{
-					boost::mutex::scoped_lock lock(m_Mutex);
+					{
+						boost::mutex::scoped_lock lock(m_Mutex);
 
-					/* Read one byte for every task we take from the pending tasks list. */
-					char buffer;
-					int rc = read(taskFd, &buffer, sizeof(buffer));
+#ifndef _MSC_VER
+						/* Read one byte for every task we take from the pending tasks list. */
+						char buffer;
+						int rc = read(taskFd, &buffer, sizeof(buffer));
 
-					if (rc < 0) {
-						if (errno == EAGAIN)
-							break; /* Someone else was faster and took our task. */
+						if (rc < 0) {
+							if (errno == EAGAIN)
+								break; /* Someone else was faster and took our task. */
 
-						BOOST_THROW_EXCEPTION(PosixException("read() failed.", errno));
-					}
+							BOOST_THROW_EXCEPTION(PosixException("read() failed.", errno));
+						}
 
-					assert(!m_Tasks.empty());
+						assert(!m_Tasks.empty());
+#else /* _MSC_VER */
+						if (m_Tasks.empty())
+							break;
+#endif /* _MSC_VER */
 
-					task = m_Tasks.front();
-					m_Tasks.pop_front();
-				}
+						task = m_Tasks.front();
+						m_Tasks.pop_front();
+					}
 
-				try {
-					task->InitTask();
+					try {
+						task->InitTask();
 
-					int fd = fileno(task->m_FP);
-					if (fd >= 0)
-						tasks[fd] = task;
-				} catch (...) {
-					Event::Post(boost::bind(&Process::FinishException, task, boost::current_exception()));
+						int fd = fileno(task->m_FP);
+						if (fd >= 0)
+							tasks[fd] = task;
+					} catch (...) {
+						Event::Post(boost::bind(&Process::FinishException, task, boost::current_exception()));
+					}
 				}
-			}
 #ifndef _MSC_VER
-		}
-#endif /* _MSC_VER */
 
-		for (it = tasks.begin(); it != tasks.end(); ) {
-			int fd = it->first;
-			Process::Ptr task = it->second;
-
-#ifndef _MSC_VER
-			if (!FD_ISSET(fd, &readfds)) {
-				it++;
 				continue;
 			}
+
+			it = tasks.find(pfds[i].fd);
+
+			if (it == tasks.end())
+				continue;
+#else /* _MSC_VER */
+			for (it = tasks.begin(); it != tasks.end(); ) {
+				int fd = it->first;
 #endif /* _MSC_VER */
+				Process::Ptr task = it->second;
 
-			if (!task->RunTask()) {
-				prev = it;
-				it++;
-				tasks.erase(prev);
+				if (!task->RunTask()) {
+					prev = it;
+#ifdef _MSC_VER
+					it++;
+#endif /* _MSC_VER */
+					tasks.erase(prev);
 
-				Event::Post(boost::bind(&Process::FinishResult, task, task->m_Result));
-			} else {
-				it++;
+					Event::Post(boost::bind(&Process::FinishResult, task, task->m_Result));
+#ifdef _MSC_VER
+				} else {
+					it++;
+#endif /* _MSC_VER */
+				}
+#ifdef _MSC_VER
 			}
+#else /* _MSC_VER */
 		}
+#endif /* _MSC_VER */
 	}
 }
 
diff --git a/lib/base/unix.h b/lib/base/unix.h
index e39af51..af58c92 100644
--- a/lib/base/unix.h
+++ b/lib/base/unix.h
@@ -35,6 +35,7 @@
 #include <syslog.h>
 #include <sys/file.h>
 #include <sys/wait.h>
+#include <poll.h>
 #include <glob.h>
 #include <ltdl.h>
 





More information about the icinga-checkins mailing list