[icinga-checkins] icinga.org: icinga-core/next: Make database transactions more coarse.

git at icinga.org git at icinga.org
Sat Jun 22 17:44:42 CEST 2013


Module: icinga-core
Branch: next
Commit: 3f2b48a14c1503ecf33b4048c2754b2d2fc6e442
URL:    https://git.icinga.org/?p=icinga-core.git;a=commit;h=3f2b48a14c1503ecf33b4048c2754b2d2fc6e442

Author: Gunnar Beutner <gunnar.beutner at netways.de>
Date:   Mon Apr 29 13:40:11 2013 +0200

Make database transactions more coarse.

---

 module/idoutils/include/ido2db.h |   11 +++-
 module/idoutils/src/ido2db.c     |  138 +++++++++++++++++++++++++++++++-------
 2 files changed, 122 insertions(+), 27 deletions(-)

diff --git a/module/idoutils/include/ido2db.h b/module/idoutils/include/ido2db.h
index 73ecd87..fcb409a 100644
--- a/module/idoutils/include/ido2db.h
+++ b/module/idoutils/include/ido2db.h
@@ -257,12 +257,19 @@ typedef struct ido2db_input_data_info_struct{
 	ido2db_dbconninfo dbinfo;
         }ido2db_idi;
 
+typedef struct ido2db_proxy_struct {
+	pthread_mutex_t mutex;
+	size_t size_left;
+	size_t size_right;
+	int refs;
+	}ido2db_proxy;
+
 typedef struct ido2db_proxy_args_struct{
 	int fd_left;
 	int fd_right;
+	ido2db_proxy *proxy;
 	}ido2db_proxy_args;
 
-
 /*************** DB server types ***************/
 #define IDO2DB_DBSERVER_NONE                            0
 #define IDO2DB_DBSERVER_MYSQL                           1
@@ -427,7 +434,7 @@ int ido2db_free_connection_memory(ido2db_idi *);
 
 /* client connection */
 int ido2db_wait_for_connections(void);
-int ido2db_handle_client_connection(int);
+int ido2db_handle_client_connection(int, ido2db_proxy *);
 int ido2db_idi_init(ido2db_idi *);
 int ido2db_check_for_client_input(ido2db_idi *);
 int ido2db_handle_client_input(ido2db_idi *,char *);
diff --git a/module/idoutils/src/ido2db.c b/module/idoutils/src/ido2db.c
index 2a0777a..92688b2 100644
--- a/module/idoutils/src/ido2db.c
+++ b/module/idoutils/src/ido2db.c
@@ -311,7 +311,7 @@ int main(int argc, char **argv) {
 		open("/dev/null", O_WRONLY);
 
 		/* handle the connection */
-		ido2db_handle_client_connection(0);
+		ido2db_handle_client_connection(0, NULL);
 	}
 
 	/* standalone daemon... */
@@ -1042,7 +1042,7 @@ void ido2db_child_sighandler(int sig) {
 /* UTILITY FUNCTIONS                                                        */
 /****************************************************************************/
 
-static int ido2db_proxy_fill_buffer(void **buffer, size_t *size, int fd) {
+static int ido2db_proxy_fill_buffer(void **buffer, size_t *size, size_t *iostats, int fd) {
 	size_t offset = *size;
 	char temp[4096];
 	int rc;
@@ -1056,10 +1056,13 @@ static int ido2db_proxy_fill_buffer(void **buffer, size_t *size, int fd) {
 	*buffer = realloc(*buffer, *size);
 	memcpy((char *)*buffer + offset, temp, rc);
 
+	if (iostats)
+		*iostats += rc;
+
 	return 0;
 }
 
-static int ido2db_proxy_flush_buffer(void **buffer, size_t *size, int fd) {
+static int ido2db_proxy_flush_buffer(void **buffer, size_t *size, size_t *iostats, int fd) {
 	int rc;
 	void *new_buffer;
 
@@ -1074,14 +1077,32 @@ static int ido2db_proxy_flush_buffer(void **buffer, size_t *size, int fd) {
 	*buffer = new_buffer;
 	*size -= rc;
 
+	if (iostats)
+		*iostats += rc;
+
 	return 0;
 }
 
+static void ido2db_proxy_free(ido2db_proxy *proxy) {
+	int refs;
+
+	pthread_mutex_lock(&(proxy->mutex));
+	proxy->refs--;
+	refs = proxy->refs;
+	pthread_mutex_unlock(&(proxy->mutex));
+
+	if (refs == 0) {
+		pthread_mutex_destroy(&(proxy->mutex));
+		free(proxy);
+	}
+}
+
 static void *ido2db_proxy_thread_proc(void *pargs) {
 	ido2db_proxy_args args = *(ido2db_proxy_args *)pargs;
+	ido2db_proxy *proxy = args.proxy;
 	fd_set readfds, writefds, exceptfds;
 	void *buffer_left = NULL, *buffer_right = NULL;
-	size_t size_left = 0, size_right = 0;
+	size_t size_left = 0, size_right = 0, iostats = 0;
 	int flags, max_fd;
 	time_t now;
 
@@ -1131,26 +1152,31 @@ static void *ido2db_proxy_thread_proc(void *pargs) {
 			break;
 
 		if (FD_ISSET(args.fd_left, &writefds))
-			if (ido2db_proxy_flush_buffer(&buffer_right, &size_right, args.fd_left) < 0)
+			if (ido2db_proxy_flush_buffer(&buffer_right, &size_right, &iostats, args.fd_left) < 0)
 				break;
 
 		if (FD_ISSET(args.fd_right, &writefds))
-			if (ido2db_proxy_flush_buffer(&buffer_left, &size_left, args.fd_right) < 0)
+			if (ido2db_proxy_flush_buffer(&buffer_left, &size_left, &iostats, args.fd_right) < 0)
 				break;
 
 		time(&now);
 		if (ido2db_proxy_last_report < now - 15 && (size_left > 0 || size_right > 0)) {
-			syslog(LOG_INFO, "IDO2DB buffer sizes: left=%d, right=%d\n", (int)size_left, (int)size_right);
+			syslog(LOG_INFO, "IDO2DB proxy stats (p=%p): left=%d, right=%d; iostats=%d\n", proxy, (int)size_left, (int)size_right, (int)(iostats + size_left + size_right) / 2);
 			ido2db_proxy_last_report = now;
 		}
 
 		if (FD_ISSET(args.fd_left, &readfds))
-			if (ido2db_proxy_fill_buffer(&buffer_left, &size_left, args.fd_left) < 0)
+			if (ido2db_proxy_fill_buffer(&buffer_left, &size_left, &iostats, args.fd_left) < 0)
 				break;
 
 		if (FD_ISSET(args.fd_right, &readfds))
-			if (ido2db_proxy_fill_buffer(&buffer_right, &size_right, args.fd_right) < 0)
+			if (ido2db_proxy_fill_buffer(&buffer_right, &size_right, &iostats, args.fd_right) < 0)
 				break;
+
+		pthread_mutex_lock(&proxy->mutex);
+		proxy->size_left = size_left;
+		proxy->size_right = size_right;
+		pthread_mutex_unlock(&proxy->mutex);
 	}
 
 	shutdown(args.fd_left, SHUT_RDWR);
@@ -1162,12 +1188,61 @@ static void *ido2db_proxy_thread_proc(void *pargs) {
 	free(buffer_left);
 	free(buffer_right);
 
+	ido2db_proxy_free(proxy);
+
 	return NULL;
 }
 
+static ido2db_proxy *ido2db_proxy_new(int fd_left, int fd_right) {
+	pthread_t tid;
+
+	ido2db_proxy *proxy = (ido2db_proxy *)malloc(sizeof(ido2db_proxy));
+	pthread_mutex_init(&(proxy->mutex), NULL);
+	proxy->size_left = 0;
+	proxy->size_right = 0;
+	proxy->refs = 2;
+
+	ido2db_proxy_args *pa = (ido2db_proxy_args *)malloc(sizeof(ido2db_proxy_args));
+	pa->fd_left = fd_left;
+	pa->fd_right = fd_right;
+	pa->proxy = proxy;
+
+	if (pthread_create(&tid, NULL, ido2db_proxy_thread_proc, pa) < 0) {
+		proxy->refs = 1;
+		ido2db_proxy_free(proxy);
+		free(pa);
+		return NULL;
+	}
+
+	(void) pthread_detach(tid);
+
+	return proxy;
+}
+
+static size_t ido2db_proxy_get_size_left(ido2db_proxy *proxy) {
+	size_t result;
+
+	pthread_mutex_lock(&(proxy->mutex));
+	result = proxy->size_left;
+	pthread_mutex_unlock(&(proxy->mutex));
+
+	return result;
+}
+
+static size_t ido2db_proxy_get_size_right(ido2db_proxy *proxy) {
+	size_t result;
+
+	pthread_mutex_lock(&(proxy->mutex));
+	result = proxy->size_right;
+	pthread_mutex_unlock(&(proxy->mutex));
+
+	return result;
+}
+
 int ido2db_wait_for_connections(void) {
 	int sd_flag = 1;
 	int new_sd = 0;
+	ido2db_proxy *proxy;
 	pid_t new_pid = -1;
 	struct sockaddr_un server_address_u;
 	struct sockaddr_in server_address_i;
@@ -1260,17 +1335,12 @@ int ido2db_wait_for_connections(void) {
 			int fds[2];
 
 			new_sd = accept(ido2db_sd, (ido2db_socket_type == IDO_SINK_TCPSOCKET) ? (struct sockaddr *)&client_address_i : (struct sockaddr *)&client_address_u, (socklen_t *)&client_address_length);
+			proxy = NULL;
 
 			if (socketpair(AF_UNIX, SOCK_STREAM, PF_UNIX, fds) == 0) {
-				pthread_t tid;
-
-				ido2db_proxy_args *pa = (ido2db_proxy_args *)malloc(sizeof(ido2db_proxy_args));
-				pa->fd_left = new_sd;
-				pa->fd_right = fds[0];
-
-				if (pthread_create(&tid, NULL, ido2db_proxy_thread_proc, pa) == 0) {
-					(void) pthread_detach(tid);
+				proxy = ido2db_proxy_new(new_sd, fds[0]);
 
+				if (proxy) {
 					new_sd = fds[1];
 				} else {
 					close(fds[0]);
@@ -1313,10 +1383,13 @@ int ido2db_wait_for_connections(void) {
 
 			case 0:
 				/* child processes data... */
-				ido2db_handle_client_connection(new_sd);
+				ido2db_handle_client_connection(new_sd, proxy);
 
 				/* close socket when we're done */
 				close(new_sd);
+
+				ido2db_proxy_free(proxy);
+
 				return IDO_OK;
 				break;
 
@@ -1327,10 +1400,12 @@ int ido2db_wait_for_connections(void) {
 			}
 		} else {
 			/* child processes data... */
-			ido2db_handle_client_connection(new_sd);
+			ido2db_handle_client_connection(new_sd, proxy);
 
 			/* close socket when we're done */
 			close(new_sd);
+
+			ido2db_proxy_free(proxy);
 		}
 
 #ifdef DEBUG_IDO2DB_EXIT_AFTER_CONNECTION
@@ -1347,12 +1422,13 @@ int ido2db_wait_for_connections(void) {
 }
 
 
-int ido2db_handle_client_connection(int sd) {
+int ido2db_handle_client_connection(int sd, ido2db_proxy *proxy) {
 	int dbuf_chunk = 2048;
 	ido2db_idi idi;
 	char buf[16 * 1024];
 	int result = 0;
 	int error = IDO_FALSE;
+	int in_transaction = 0, io_since_last_commit = 0;
 
 	int pthread_ret = 0;
 	//sigset_t newmask;
@@ -1533,14 +1609,28 @@ int ido2db_handle_client_connection(int sd) {
 		/* 2011-02-23 MF: only do that in a worker thread */
 		/* 2011-05-02 MF: redo it the old way */
 
-		result = ido2db_db_tx_begin(&idi);
+		io_since_last_commit += result;
+
+		result = IDO_OK;
+
+		if (!in_transaction)
+			result = ido2db_db_tx_begin(&idi);
 
 		ido2db_check_for_client_input(&idi);
 
 		if (result == IDO_OK) {
-			if (ido2db_db_tx_commit(&idi) != IDO_OK) {
-				syslog(LOG_ERR, "IDO2DB commit failed. Some data may have been lost.\n");
+			in_transaction = (proxy && ido2db_proxy_get_size_left(proxy) > 16 * 1024);
+
+			if (io_since_last_commit > 1024 * 1024)
+				in_transaction = 0;
+
+			if (!in_transaction) {
+				io_since_last_commit = 0;
+				printf("Committing...\n");
 			}
+
+			if (!in_transaction && ido2db_db_tx_commit(&idi) != IDO_OK)
+				syslog(LOG_ERR, "IDO2DB commit failed. Some data may have been lost.\n");
 		}
 
 		/* should we disconnect the client? */
@@ -1664,7 +1754,6 @@ int ido2db_check_for_client_input(ido2db_idi *idi) {
 			dbuf.buf[x] = '\x0';
 
 			if ((buf = strdup(dbuf.buf))) {
-
 				ido2db_handle_client_input(idi, buf);
 
 				free(buf);
@@ -2970,4 +3059,3 @@ int terminate_cleanup_thread(void) {
 	return IDO_OK;
 
 }
-





More information about the icinga-checkins mailing list