Skip to content

Commit

Permalink
A worker pool is only needed for rpc instances that serve call backs
Browse files Browse the repository at this point in the history
Since the glib-2.0 patch
4d2e77a55 GThreadPool: Always use the thread-spawning thread for the global shared thread pool
GNOME/glib@4d2e77a
creating a threadpool spawns a new thread. Previously, this was only the
case for non-Linux platforms.
This can cause issues for daemons that fork and depend on libraries that
load apteryx on LT_INIT, as the glib threadpool spawned thread will not
be copied on the fork.
Additionally, now libraries that only use get calls will be running an
extra thread from the threadpool.

Move the threadpool creation into request_cb() to only spawn the pool
managing thread for applications that require callbacks using workers.
  • Loading branch information
coledishington authored and carlgsmith committed Apr 3, 2024
1 parent 13ecc9f commit 21bffd3
Showing 1 changed file with 22 additions and 26 deletions.
48 changes: 22 additions & 26 deletions rpc.c
Original file line number Diff line number Diff line change
Expand Up @@ -354,20 +354,15 @@ request_cb (rpc_socket sock, rpc_id id, void *buffer, size_t len)
submit_slow_work (rpc, work, 0);
pthread_mutex_unlock (&rpc->lock);
}
else if (rpc->workers)
g_thread_pool_push (rpc->workers, work, NULL);
else
goto error;

return;

error:
if (work)
{
g_free (work);
if (!rpc->workers)
{
rpc->workers = g_thread_pool_new ((GFunc)worker_func, (gpointer)&rpc->worker_sigmask,
8, FALSE, NULL);
}
g_thread_pool_push (rpc->workers, work, NULL);
}
rpc_socket_deref (sock);
return;
}

rpc_instance
Expand Down Expand Up @@ -396,8 +391,6 @@ rpc_init (int timeout, bool reuse_sock, rpc_msg_handler handler)
rpc->handler = handler;
rpc->server = server;
rpc->clients = g_hash_table_new (g_str_hash, g_str_equal);
rpc->workers = g_thread_pool_new ((GFunc)worker_func, (gpointer)&rpc->worker_sigmask,
8, FALSE, NULL);

DEBUG ("RPC: New Instance (%p)\n", rpc);
return rpc;
Expand Down Expand Up @@ -458,23 +451,26 @@ rpc_shutdown (rpc_instance rpc)
DEBUG ("RPC: Shutdown Instance (%p)\n", rpc);

/* Need to wait until all threads are cleaned up */
for (i=0; i<10; i++)
if (rpc->workers)
{
g_thread_pool_stop_unused_threads ();
if (g_thread_pool_unprocessed (rpc->workers) == 0 &&
g_thread_pool_get_num_threads (rpc->workers) == 0 &&
g_thread_pool_get_num_unused_threads () == 0)
{
break;
}
else if (i >= 9)
for (i=0; i<10; i++)
{
ERROR ("RPC: Worker threads not shutting down\n");
g_thread_pool_stop_unused_threads ();
if (g_thread_pool_unprocessed (rpc->workers) == 0 &&
g_thread_pool_get_num_threads (rpc->workers) == 0 &&
g_thread_pool_get_num_unused_threads () == 0)
{
break;
}
else if (i >= 9)
{
ERROR ("RPC: Worker threads not shutting down\n");
}
g_usleep (RPC_TIMEOUT_US / 10);
}
g_usleep (RPC_TIMEOUT_US / 10);
g_thread_pool_free (rpc->workers, FALSE, TRUE);
rpc->workers = NULL;
}
g_thread_pool_free (rpc->workers, FALSE, TRUE);
rpc->workers = NULL;
if (rpc->slow_loop)
{
for (i=0; i<10; i++)
Expand Down

0 comments on commit 21bffd3

Please sign in to comment.