add worker thread logic

This commit is contained in:
Christian Grothoff 2020-11-15 16:17:57 +01:00
parent 9f60c8c041
commit 736997ffe8
No known key found for this signature in database
GPG Key ID: 939E6BE1E29FC3CC

View File

@ -38,13 +38,15 @@
* assigned and collected by the main thread).
*
* TODO:
* - actual networking
* - networking: sending signature replies
* - actual signing
*/
#include "platform.h"
#include "taler_util.h"
#include "taler-helper-crypto-rsa.h"
#include <gcrypt.h>
#include <pthread.h>
#include <sys/eventfd.h>
/**
@ -160,6 +162,12 @@ struct Denomination
};
/**
* Actively worked on client request.
*/
struct WorkItem;
/**
* Information we keep for a client connected to us.
*/
@ -176,6 +184,11 @@ struct Client
*/
struct Client *prev;
/**
* Work created by this client, NULL for none.
*/
struct WorkItem *work;
/**
* Client socket.
*/
@ -186,6 +199,44 @@ struct Client
*/
struct GNUNET_SCHEDULER_Task *task;
/**
* Flag set to true if this client has disconnected. Used
* by the workers to detect that they must free the client
* instead of returning the result.
*/
bool gone;
};
struct WorkItem
{
/**
* Kept in a DLL.
*/
struct WorkItem *next;
/**
* Kept in a DLL.
*/
struct WorkItem *prev;
/**
* The client that created the request.
*/
struct Client *client;
/**
* Key to be used for this operation.
*/
struct DenominationKey *dk;
/**
* Hash of the value to sign (FDH still to be computed!).
*/
struct GNUNET_HashCode h_message;
};
@ -274,6 +325,138 @@ static struct Client *clients_head;
*/
static struct Client *clients_tail;
/**
* Head of DLL with pending signing operations.
*/
static struct WorkItem *work_head;
/**
* Tail of DLL with pending signing operations.
*/
static struct WorkItem *work_tail;
/**
* Lock for the work queue.
*/
static pthread_mutex_t work_lock;
/**
* Condition variable for the semaphore of the work queue.
*/
static pthread_cond_t work_cond = PTHREAD_COND_INITIALIZER;
/**
* Number of items in the work queue. Also used as the semaphore counter.
*/
static unsigned long long work_counter;
/**
* Head of DLL with completed signing operations.
*/
static struct WorkItem *done_head;
/**
* Tail of DLL with completed signing operations.
*/
static struct WorkItem *done_tail;
/**
* Lock for the done queue.
*/
static pthread_mutex_t done_lock;
/**
* Task waiting for work to be done.
*/
static struct GNUNET_SCHEDULER_Task *done_task;
/**
* Signal used by threads to notify the #done_task that they
* completed work that is now in the done queue.
*/
static struct GNUNET_NETWORK_Handle *done_signal;
/**
* Set once we are in shutdown and workers should terminate.
*/
static volatile bool in_shutdown;
/**
* Array of #num_worker sign_worker() threads.
*/
static pthread_t *workers;
/**
* Length of the #workers array.
*/
static unsigned int num_workers;
/**
* Function that performs the actual signature for the work @a wi
*
* @param[in,out] wi signature work we should do
*/
static void
do_sign (struct WorkItem *wi)
{
// FIXME!
}
/**
* Main function of a worker thread that signs.
*
* @param cls NULL
* @return NULL
*/
static void *
sign_worker (void *cls)
{
(void) cls;
GNUNET_assert (0 == pthread_mutex_lock (&work_lock));
while (! in_shutdown)
{
struct WorkItem *wi;
while (NULL != (wi = work_head))
{
/* take work from queue */
GNUNET_CONTAINER_DLL_remove (work_head,
work_tail,
wi);
work_counter--;
GNUNET_assert (0 == pthread_mutex_unlock (&work_lock));
do_sign (wi);
/* put completed work into done queue */
GNUNET_assert (0 == pthread_mutex_lock (&done_lock));
GNUNET_CONTAINER_DLL_insert (done_head,
done_tail,
wi);
GNUNET_assert (0 == pthread_mutex_unlock (&done_lock));
{
uint64_t val = GNUNET_htonll (1);
/* raise #done_signal */
if (sizeof(val) !=
GNUNET_NETWORK_socket_send (done_signal,
&val,
sizeof (val)))
GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING,
"send(eventfd)");
}
GNUNET_assert (0 == pthread_mutex_lock (&work_lock));
}
/* queue is empty, wait for work */
GNUNET_assert (0 ==
pthread_cond_wait (&work_cond,
&work_lock));
}
GNUNET_assert (0 ==
pthread_mutex_unlock (&work_lock));
return NULL;
}
/**
* Free @a client, releasing all (remaining) state.
@ -289,10 +472,14 @@ free_client (struct Client *client)
client->task = NULL;
}
GNUNET_NETWORK_socket_close (client->sock);
client->sock = NULL;
GNUNET_CONTAINER_DLL_remove (clients_head,
clients_tail,
client);
GNUNET_free (client);
if (NULL != client->work)
client->gone = true;
else
GNUNET_free (client);
}
@ -337,13 +524,84 @@ free_dk (struct DenominationKey *dk)
/**
* Process completed tasks that are in the #done_head queue, sending
* the result back to the client (and resuming the client).
*
* @param cls NULL
*/
static void
handle_done (void *cls)
{
uint64_t data;
(void) cls;
/* consume #done_signal */
if (sizeof (data) !=
GNUNET_NETWORK_socket_recv (done_signal,
&data,
sizeof (data)))
GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING,
"recv(eventfd)");
done_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
done_signal,
&handle_done,
NULL);
GNUNET_assert (0 == pthread_mutex_lock (&done_lock));
while (NULL != done_head)
{
struct WorkItem *wi = done_head;
GNUNET_CONTAINER_DLL_remove (done_head,
done_tail,
wi);
GNUNET_assert (0 == pthread_mutex_unlock (&done_lock));
// FIXME: send response to client!
GNUNET_free (wi);
GNUNET_assert (0 == pthread_mutex_lock (&done_lock));
}
GNUNET_assert (0 == pthread_mutex_unlock (&done_lock));
}
/**
* Handle @a client request @a sr to create signature. Create the
* signature using the respective key and return the result to
* the client.
*
* @param client the client making the request
* @param sr the request details
*/
static void
handle_sign_request (struct Client *client,
const struct TALER_CRYPTO_SignRequest *sr)
{
// FIXME ...
struct DenominationKey *dk;
struct WorkItem *wi;
dk = GNUNET_CONTAINER_multihashmap_get (keys,
&sr->h_denom_pub);
if (NULL == dk)
{
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Signing request failed, denomination key unknown\n");
// FIXME: send failure response to client!
client_next (client);
return;
}
wi = GNUNET_new (struct WorkItem);
wi->client = client;
wi->dk = dk;
dk->rc++;
wi->h_message = sr->h_message;
GNUNET_assert (0 == pthread_mutex_lock (&work_lock));
work_counter++;
GNUNET_CONTAINER_DLL_insert (work_head,
work_tail,
wi);
GNUNET_assert (0 == pthread_mutex_unlock (&work_lock));
GNUNET_assert (0 == pthread_cond_signal (&work_cond));
}
@ -1364,6 +1622,25 @@ do_shutdown (void *cls)
GNUNET_SCHEDULER_cancel (keygen_task);
keygen_task = NULL;
}
if (NULL != done_task)
{
GNUNET_SCHEDULER_cancel (done_task);
done_task = NULL;
}
/* shut down worker threads */
GNUNET_assert (0 == pthread_mutex_lock (&work_lock));
in_shutdown = true;
GNUNET_assert (0 == pthread_cond_broadcast (&work_cond));
GNUNET_assert (0 == pthread_mutex_unlock (&work_lock));
for (unsigned int i = 0; i<num_workers; i++)
GNUNET_assert (0 == pthread_join (workers[i],
NULL));
if (NULL != done_signal)
{
GNUNET_break (GNUNET_OK ==
GNUNET_NETWORK_socket_close (done_signal));
done_signal = NULL;
}
}
@ -1518,6 +1795,38 @@ run (void *cls,
/* start job to keep keys up-to-date */
keygen_task = GNUNET_SCHEDULER_add_now (&update_denominations,
NULL);
/* start job to handle completed work */
{
int fd;
fd = eventfd (0,
EFD_NONBLOCK | EFD_CLOEXEC);
if (-1 == fd)
{
GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR,
"eventfd");
global_ret = 6;
GNUNET_SCHEDULER_shutdown ();
return;
}
done_signal = GNUNET_NETWORK_socket_box_native (fd);
}
done_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
done_signal,
&handle_done,
NULL);
/* start crypto workers */
num_workers = 1; // for now...
workers = GNUNET_new_array (num_workers,
pthread_t);
for (unsigned int i = 0; i<num_workers; i++)
GNUNET_assert (0 ==
pthread_create (&workers[i],
NULL,
&sign_worker,
NULL));
}