[icinga-checkins] icinga.org: icinga-core/mfriedrich/workers: core: retain worker between reloads #2956

git at icinga.org git at icinga.org
Sun Aug 5 23:32:12 CEST 2012


Module: icinga-core
Branch: mfriedrich/workers
Commit: ffca1684d1f94aa86dfec08978bf25f143de7929
URL:    https://git.icinga.org/?p=icinga-core.git;a=commit;h=ffca1684d1f94aa86dfec08978bf25f143de7929

Author: Michael Friedrich <michael.friedrich at univie.ac.at>
Date:   Sun Aug  5 16:19:02 2012 +0200

core: retain worker between reloads #2956

It actually makes no sense to kill and respawn the workers over reloads,
since it just means we lose a lot of half-completed checks for no good reason.

While we're at it, we'll fix the worker process memory management, so
that worker processes can safely release each other's memory.

refs #2956

---

 base/icinga.c     |    6 ++
 base/workers.c    |  136 ++++++++++++++++++++++++++++++++++++++--------------
 include/workers.h |    5 ++-
 lib/worker.h      |    4 ++
 4 files changed, 113 insertions(+), 38 deletions(-)

diff --git a/base/icinga.c b/base/icinga.c
index 9c37bb4..80a66fb 100644
--- a/base/icinga.c
+++ b/base/icinga.c
@@ -276,6 +276,8 @@ int             debug_level = DEFAULT_DEBUG_LEVEL;
 int             debug_verbosity = DEFAULT_DEBUG_VERBOSITY;
 unsigned long   max_debug_file_size = DEFAULT_MAX_DEBUG_FILE_SIZE;
 
+extern iobroker_set *icinga_iobs;
+
 int dummy;	/* reduce compiler warnings */
 
 /* MAIN */
@@ -645,6 +647,8 @@ int main(int argc, char **argv, char **env) {
 	/* else start to monitor things... */
 	else {
 
+		icinga_iobs = iobroker_create();
+
 		/* make sure gcc3 won't hit here */
 #ifndef GCCTOOOLD
 		/* This is Sparta! */
@@ -911,6 +915,8 @@ int main(int argc, char **argv, char **env) {
 
 			/* shutdown stuff... */
 			if (sigshutdown == TRUE) {
+				free_worker_memory(WPROC_FORCE);
+				iobroker_destroy(icinga_iobs, IOBROKER_CLOSE_SOCKETS);
 
 				/* make sure lock file has been removed - it may not have been if we received a shutdown command */
 				if (daemon_mode == TRUE)
diff --git a/base/workers.c b/base/workers.c
index a305c78..9a4d866 100644
--- a/base/workers.c
+++ b/base/workers.c
@@ -52,7 +52,6 @@ typedef struct wproc_object_job {
 	char *contact_name;
 	char *host_name;
 	char *service_description;
-	struct squeue_event *sq_evt;
 } wproc_object_job;
 
 typedef struct wproc_result {
@@ -75,6 +74,8 @@ typedef struct wproc_result {
 	struct kvvec *response;
 } wproc_result;
 
+extern int nagios_pid;
+
 #define tv2float(tv) ((float)((tv)->tv_sec) + ((float)(tv)->tv_usec) / 1000000.0)
 
 static worker_job *create_job(int type, void *arg, time_t timeout, const char *command) {
@@ -160,49 +161,94 @@ static void destroy_job(worker_process *wp, worker_job *job) {
 	my_free(job->command);
 
 	wp->jobs[job->id % wp->max_jobs] = NULL;
+	wp->jobs_running--;
+
 	free(job);
 }
 
-static void free_wproc_memory(worker_process *wp) {
-	int i = 0, destroyed = 0;
+static int wproc_is_alive(worker_process *wp) {
+
+	if (!wp || !wp->pid)
+		return 0;
+	if(kill(wp->pid, 0) == 0 && iobroker_is_registered(icinga_iobs, wp->sd))
+		return 1;
+
+	return 0;
+}
+
+int wproc_destroy (worker_process *wp, int flags) {
+	int i = 0, destroyed = 0, force = 0, sd, self;
 
 	if (!wp)
-		return;
+		return 0;
+
+	force = !!(flags & WPROC_FORCE);
 
+	self = getpid();
+
+	/* master retains workers through restarts */
+	if (self == nagios_pid && !force)
+		return 0;
+
+	/* free all memory when either forcing or a worker called us */
 	iocache_destroy(wp->ioc);
 	wp->ioc = NULL;
 
-	for (i = 0; i < wp->max_jobs; i++) {
-		if (!wp->jobs[i])
-			continue;
+	if (wp->jobs) {
 
-		destroy_job(wp, wp->jobs[i]);
-		/* we can (often) break out early */
-		if (++destroyed >= wp->jobs_running)
-			break;
+		for (i = 0; i < wp->max_jobs; i++) {
+			if (!wp->jobs[i])
+				continue;
+
+			destroy_job(wp, wp->jobs[i]);
+			/* we can (often) break out early */
+			if (++destroyed >= wp->jobs_running)
+				break;
+		}
+
+		/* this triggers a double-free() for some reason */
+		/* free(wp->jobs); */
+		wp->jobs = NULL;
 	}
 
-	free(wp->jobs);
+	sd = wp->sd;
+	free(wp);
+
+	/* workers must never control other workers, so they return early */
+	if (self != nagios_pid)
+		return 0;
+
+	/* kill(0, SIGKILL) equals suicide, so we avoid it */
+	if (wp->pid) {
+		kill(wp->pid, SIGKILL);
+	}
+
+	iobroker_close(icinga_iobs, sd);
+
+	/* reap our possibly lost children */
+	while (waitpid(-1, &i, WNOHANG) > 0)
+		; /* do nothing */
+
+	return 0;
 }
 
 /*
  * This gets called from both parent and worker process, so
  * we must take care not to blindly shut down everything here
  */
-void free_worker_memory(void) {
-	unsigned int i;
+void free_worker_memory(int flags) {
 
-	for (i = 0; i < num_workers; i++) {
-		if (!workers[i])
-			continue;
+	if (workers) {
+		unsigned int i;
 
-		/* workers die when master socket close()s */
-		iobroker_close(icinga_iobs, workers[i]->sd);
-		free_wproc_memory(workers[i]);
-		my_free(workers[i]);
+		for (i = 0; i < num_workers; i++) {
+			if (!workers[i])
+				continue;
+			wproc_destroy(workers[i], flags);
+			workers[i] = NULL;
+		}
+		free(workers);
 	}
-	iobroker_destroy(icinga_iobs, 0);
-	icinga_iobs = NULL;
 	workers = NULL;
 	num_workers = 0;
 	worker_index = 0;
@@ -238,8 +284,6 @@ static int handle_worker_check(wproc_result *wpres, worker_process *wp, worker_j
 	int result = ERROR;
 	check_result *cr = (check_result *)job->arg;
 
-	cr->output_file = NULL;
-	cr->output_file_fp = NULL;
 	memcpy(&cr->rusage, &wpres->rusage, sizeof(wpres->rusage));
 	cr->start_time.tv_sec = wpres->start.tv_sec;
 	cr->start_time.tv_usec = wpres->start.tv_usec;
@@ -508,19 +552,23 @@ static int handle_worker_result(int sd, int events, void *arg) {
 			break;
 		}
 		destroy_job(wp, job);
-		wp->jobs_running--;
 	}
 
 	return 0;
 }
 
-static int init_iobroker(void) {
-	if (!icinga_iobs)
-		icinga_iobs = iobroker_create();
+int workers_alive(void) {
+	int i, alive = 0;
 
-	if (icinga_iobs)
+	if(!workers)
 		return 0;
-	return -1;
+
+	for (i = 0; i < num_workers; i++) {
+		if (wproc_is_alive(workers[i]))
+			alive++;
+	}
+
+	return alive;
 }
 
 int init_workers(int desired_workers) {
@@ -531,7 +579,8 @@ int init_workers(int desired_workers) {
 		desired_workers = 4;
 	}
 
-	init_iobroker();
+	if (workers_alive() == desired_workers)
+		return 0;
 
 	/* can't shrink the number of workers (yet) */
 	if (desired_workers < num_workers)
@@ -552,21 +601,33 @@ int init_workers(int desired_workers) {
 	}
 
 	workers = wps;
-	for (; num_workers < desired_workers; num_workers++) {
+	for (i = 0; i < desired_workers; i++) {
+		int ret;
 		worker_process *wp;
 
+		if (wps[i])
+			continue;
+
 		wp = spawn_worker(worker_init_func, (void *)get_global_macros());
 		if (!wp) {
 			logit(NSLOG_RUNTIME_WARNING, TRUE, "Failed to spawn worker: %s\n", strerror(errno));
-			free_worker_memory();
+			free_worker_memory(0);
 			return ERROR;
 		}
 
-		wps[num_workers] = wp;
-		iobroker_register(icinga_iobs, wp->sd, wp, handle_worker_result);
+		set_socket_options(wp->sd, 256 * 1024);
+
+		wps[i] = wp;
+		ret = iobroker_register(icinga_iobs, wp->sd, wp, handle_worker_result);
+		if (ret < 0) {
+			printf("Failed to register worker socket with iobroker %p\n", icinga_iobs);
+			exit(1);
+		}
 	}
 
-	logit(NSLOG_PROCESS_INFO, TRUE, "Spawned %d core workers.\n", num_workers);
+	num_workers = desired_workers;
+
+	logit(NSLOG_INFO_MESSAGE, TRUE, "Workers spawned: %d.\n", num_workers);
 
 	return 0;
 }
@@ -586,6 +647,7 @@ static worker_process *get_worker(worker_job *job) {
 		/* XXX FIXME Fiddle with finding a new, less busy, worker here */
 	}
 	wp->jobs[job->id % wp->max_jobs] = job;
+	job->wp = wp;
 	return wp;
 
 	/* dead code below. for now */
diff --git a/include/workers.h b/include/workers.h
index a71cc8a..1c3714b 100644
--- a/include/workers.h
+++ b/include/workers.h
@@ -39,7 +39,9 @@
 #define WPJOB_GLOBAL_HOST_EVTHANDLER 6
 #define WPJOB_HOST_EVTHANDLER 7
 
-extern void free_worker_memory(void);
+#define WPROC_FORCE (1 << 0)
+
+extern void free_worker_memory(int flags);
 extern int init_workers(int desired_workers);
 extern int wproc_run_check(check_result *cr, char *cmd, icinga_macros *mac);
 extern int wproc_notify(char *cname, char *hname, char *sdesc, char *cmd, icinga_macros *mac);
@@ -47,4 +49,5 @@ extern void wproc_poll(int timeout_ms);
 extern int wproc_run(int job_type, char *cmd, int timeout, icinga_macros *mac);
 extern int wproc_run_service_job(int jtype, int timeout, service *svc, char *cmd, icinga_macros *mac);
 extern int wproc_run_host_job(int jtype, int timeout, host *hst, char *cmd, icinga_macros *mac);
+extern int wproc_destroy(worker_process *wp, int flags);
 #endif
diff --git a/lib/worker.h b/lib/worker.h
index 4efa2e3..bc9b0c9 100644
--- a/lib/worker.h
+++ b/lib/worker.h
@@ -55,6 +55,8 @@
 #define PAIR_SEP 0 				/**< pair separator for buf2kvvec() and kvvec2buf() */
 #define KV_SEP '=' 				/**< key/value separator for buf2kvvec() and kvvec2buf() */
 
+struct worker_process;
+
 /** Worker job data */
 typedef struct worker_job {
 	int id;         /**< job id */
@@ -62,10 +64,12 @@ typedef struct worker_job {
 	time_t timeout; /**< timeout, in absolute time */
 	char *command;  /**< command string for this job */
 	void *arg;      /**< any random argument */
+	struct worker_process *wp; /**< worker process running this job */
 } worker_job;
 
 /** A worker process as seen from its controller */
 typedef struct worker_process {
+	const char* type;		/**< identifying typename of this worker */
 	int sd;    			/**< communication socket */
 	pid_t pid; 			/**< pid */
 	int max_jobs; 			/**< Max number of jobs we can handle */





More information about the icinga-checkins mailing list