[icinga-checkins] icinga.org: icinga2/master: Make the event queue adaptive.

git at icinga.org git at icinga.org
Sat Mar 23 12:23:39 CET 2013


Module: icinga2
Branch: master
Commit: d04a04d8972926075903cb843277d9a5fcb602a7
URL:    https://git.icinga.org/?p=icinga2.git;a=commit;h=d04a04d8972926075903cb843277d9a5fcb602a7

Author: Gunnar Beutner <gunnar at beutner.name>
Date:   Sat Mar 23 12:23:13 2013 +0100

Make the event queue adaptive.

---

 lib/base/eventqueue.cpp |  119 ++++++++++++++++++++++++++++++++---------------
 lib/base/eventqueue.h   |   12 ++++-
 2 files changed, 91 insertions(+), 40 deletions(-)

diff --git a/lib/base/eventqueue.cpp b/lib/base/eventqueue.cpp
index 0834945..8913903 100644
--- a/lib/base/eventqueue.cpp
+++ b/lib/base/eventqueue.cpp
@@ -29,23 +29,13 @@
 using namespace icinga;
 
 EventQueue::EventQueue(void)
-	: m_Stopped(false)
+	: m_Stopped(false), m_ThreadDeaths(0), m_Latency(0), m_LatencyCount(0)
 {
-	m_ThreadCount = boost::thread::hardware_concurrency();
+	for (int i = 0; i < sizeof(m_ThreadStates) / sizeof(m_ThreadStates[0]); i++)
+		m_ThreadStates[i] = ThreadDead;
 
-	if (m_ThreadCount == 0)
-		m_ThreadCount = 1;
-
-	m_ThreadCount *= 8;
-
-	m_ThreadCount = 128;
-
-	m_States = new ThreadState[m_ThreadCount];
-
-	for (int i = 0; i < m_ThreadCount; i++) {
-		m_States[i] = ThreadIdle;
-		m_Threads.create_thread(boost::bind(&EventQueue::QueueThreadProc, this, i));
-	}
+	for (int i = 0; i < 8; i++)
+		SpawnWorker();
 
 	boost::thread reportThread(boost::bind(&EventQueue::ReportThreadProc, this));
 	reportThread.detach();
@@ -69,7 +59,13 @@ void EventQueue::Stop(void)
  */
 void EventQueue::Join(void)
 {
-	m_Threads.join_all();
+	boost::mutex::scoped_lock lock(m_Mutex);
+
+	while (!m_Stopped || !m_Events.empty()) {
+		lock.unlock();
+		Utility::Sleep(0.5);
+		lock.lock();
+	}
 }
 
 /**
@@ -83,18 +79,26 @@ void EventQueue::QueueThreadProc(int tid)
 		{
 			boost::mutex::scoped_lock lock(m_Mutex);
 
-			m_States[tid] = ThreadIdle;
+			m_ThreadStates[tid] = ThreadIdle;
 
-			while (m_Events.empty() && !m_Stopped)
+			while (m_Events.empty() && !m_Stopped && m_ThreadDeaths == 0)
 				m_CV.wait(lock);
 
+			if (m_ThreadDeaths > 0) {
+				m_ThreadDeaths--;
+				break;
+			}
+
 			if (m_Events.empty() && m_Stopped)
 				break;
 
 			event = m_Events.front();
 			m_Events.pop_front();
 
-			m_States[tid] = ThreadBusy;
+			m_ThreadStates[tid] = ThreadBusy;
+
+			m_Latency += Utility::GetTime() - event.Timestamp;
+			m_LatencyCount++;
 		}
 
 #ifdef _DEBUG
@@ -150,6 +154,8 @@ void EventQueue::QueueThreadProc(int tid)
 		}
 #endif /* _DEBUG */
 	}
+
+	m_ThreadStates[tid] = ThreadDead;
 }
 
 /**
@@ -161,6 +167,9 @@ void EventQueue::Post(const EventQueueCallback& callback)
 {
 	boost::mutex::scoped_lock lock(m_Mutex);
 
+	if (m_Stopped)
+		BOOST_THROW_EXCEPTION(std::runtime_error("EventQueue has been stopped."));
+
 	EventQueueWorkItem event;
 	event.Callback = callback;
 	event.Timestamp = Utility::GetTime();
@@ -171,43 +180,79 @@ void EventQueue::Post(const EventQueueCallback& callback)
 
 void EventQueue::ReportThreadProc(void)
 {
+	double last_adjustment = 0;
+
 	for (;;) {
 		Utility::Sleep(5);
 
 		double now = Utility::GetTime();
 
-		int pending, busy;
-		double max_latency, avg_latency;
+		int pending, alive, busy;
+		double avg_latency;
 
 		{
 			boost::mutex::scoped_lock lock(m_Mutex);
 			pending = m_Events.size();
 
+			alive = 0;
 			busy = 0;
 
-			for (int i = 0; i < m_ThreadCount; i++) {
-				if (m_States[i] == ThreadBusy)
+			for (int i = 0; i < sizeof(m_ThreadStates) / sizeof(m_ThreadStates[0]); i++) {
+				if (m_ThreadStates[i] != ThreadDead)
+					alive++;
+
+				if (m_ThreadStates[i] == ThreadBusy)
 					busy++;
 			}
 
-			max_latency = 0;
-			avg_latency = 0;
+			if (m_LatencyCount > 0)
+				avg_latency = m_Latency / (m_LatencyCount * 1.0);
+			else
+				avg_latency = 0;
+
+			m_Latency = 0;
+			m_LatencyCount = 0;
+
+			if (pending > 0) {
+				/* Spawn a few additional workers. */
+				for (int i = 0; i < 2; i++)
+					SpawnWorker();
+			} else if (last_adjustment < now - 30) {
+				KillWorker();
+				last_adjustment = now;
+			}
+		}
 
-			BOOST_FOREACH(const EventQueueWorkItem& event, m_Events) {
-				double latency = now - event.Timestamp;
+		std::ostringstream msgbuf;
+		msgbuf << "Pending tasks: " << pending << "; Busy threads: " << busy << "; Idle threads: " << alive - busy << "; Average latency: " << (long)(avg_latency * 1000) << "ms";
+		Log(LogInformation, "base", msgbuf.str());
+	}
+}
 
-				avg_latency += latency;
+/**
+ * Note: Caller must hold m_Mutex.
+ */
+void EventQueue::SpawnWorker(void)
+{
+	for (int i = 0; i < sizeof(m_ThreadStates) / sizeof(m_ThreadStates[0]); i++) {
+		if (m_ThreadStates[i] == ThreadDead) {
+			Log(LogInformation, "debug", "Spawning worker thread.");
 
-				if (latency > max_latency)
-					max_latency = latency;
-			}
+			m_ThreadStates[i] = ThreadIdle;
+			boost::thread worker(boost::bind(&EventQueue::QueueThreadProc, this, i));
+			worker.detach();
 
-			avg_latency /= pending;
+			break;
 		}
-
-		Log(LogInformation, "base", "Pending tasks: " + Convert::ToString(pending) + "; Busy threads: " +
-		    Convert::ToString(busy) + "; Idle threads: " + Convert::ToString(m_ThreadCount - busy) +
-		    "; Maximum latency: " + Convert::ToString((long)max_latency * 1000) + "ms"
-		    "; Average latency: " + Convert::ToString((long)avg_latency * 1000) + "ms");
 	}
 }
+
+/**
+ * Note: Caller must hold m_Mutex.
+ */
+void EventQueue::KillWorker(void)
+{
+	Log(LogInformation, "base", "Killing worker thread.");
+
+	m_ThreadDeaths++;
+}
diff --git a/lib/base/eventqueue.h b/lib/base/eventqueue.h
index 2b31879..f173bdb 100644
--- a/lib/base/eventqueue.h
+++ b/lib/base/eventqueue.h
@@ -32,6 +32,7 @@ namespace icinga
 
 enum ThreadState
 {
+	ThreadDead,
 	ThreadIdle,
 	ThreadBusy
 };
@@ -61,9 +62,11 @@ public:
 	void Post(const EventQueueCallback& callback);
 
 private:
-	boost::thread_group m_Threads;
-	ThreadState *m_States;
-	int m_ThreadCount;
+	ThreadState m_ThreadStates[512];
+	int m_ThreadDeaths;
+
+	double m_Latency;
+	int m_LatencyCount;
 
 	boost::mutex m_Mutex;
 	boost::condition_variable m_CV;
@@ -73,6 +76,9 @@ private:
 
 	void QueueThreadProc(int tid);
 	void ReportThreadProc(void);
+
+	void SpawnWorker(void);
+	void KillWorker(void);
 };
 
 }





More information about the icinga-checkins mailing list