make use of r36977/78 API improvement in GNUnet

This commit is contained in:
Christian Grothoff 2016-04-04 14:04:44 +02:00
parent 9909a04d5b
commit 7401433644
3 changed files with 387 additions and 200 deletions

View File

@ -121,7 +121,7 @@ handle_mhd_completion_callback (void *cls,
/**
* Handle a request coming from libmicrohttpd.
* Handle incoming HTTP request.
*
* @param cls closure for MHD daemon (unused)
* @param connection the connection

View File

@ -24,122 +24,18 @@
#include "platform.h"
#include <gnunet/gnunet_util_lib.h>
#include <gnunet/gnunet_json_lib.h>
#include "taler_json_lib.h"
#include "taler-exchange-httpd_parsing.h"
#include "taler-exchange-httpd_responses.h"
/**
* Initial size for POST request buffer.
*/
#define REQUEST_BUFFER_INITIAL (2*1024)
/**
* Maximum POST request size.
*/
#define REQUEST_BUFFER_MAX (1024*1024)
/**
* Buffer for POST requests.
*/
struct Buffer
{
/**
* Allocated memory
*/
char *data;
/**
* Number of valid bytes in buffer.
*/
size_t fill;
/**
* Number of allocated bytes in buffer.
*/
size_t alloc;
};
/**
* Initialize a buffer.
*
* @param buf the buffer to initialize
* @param data the initial data
* @param data_size size of the initial data
* @param alloc_size size of the buffer
* @param max_size maximum size that the buffer can grow to
* @return a GNUnet result code
*/
static int
buffer_init (struct Buffer *buf,
const void *data,
size_t data_size,
size_t alloc_size,
size_t max_size)
{
if (data_size > max_size || alloc_size > max_size)
return GNUNET_SYSERR;
if (data_size > alloc_size)
alloc_size = data_size;
buf->data = GNUNET_malloc (alloc_size);
memcpy (buf->data, data, data_size);
return GNUNET_OK;
}
/**
* Free the data in a buffer. Does *not* free
* the buffer object itself.
*
* @param buf buffer to de-initialize
*/
static void
buffer_deinit (struct Buffer *buf)
{
GNUNET_free (buf->data);
buf->data = NULL;
}
/**
* Append data to a buffer, growing the buffer if necessary.
*
* @param buf the buffer to append to
* @param data the data to append
* @param data_size the size of @a data
* @param max_size maximum size that the buffer can grow to
* @return #GNUNET_OK on success,
* #GNUNET_NO if the buffer can't accomodate for the new data
*/
static int
buffer_append (struct Buffer *buf,
const void *data,
size_t data_size,
size_t max_size)
{
if (buf->fill + data_size > max_size)
return GNUNET_NO;
if (data_size + buf->fill > buf->alloc)
{
char *new_buf;
size_t new_size = buf->alloc;
while (new_size < buf->fill + data_size)
new_size += 2;
if (new_size > max_size)
return GNUNET_NO;
new_buf = GNUNET_malloc (new_size);
memcpy (new_buf, buf->data, buf->fill);
GNUNET_free (buf->data);
buf->data = new_buf;
buf->alloc = new_size;
}
memcpy (buf->data + buf->fill, data, data_size);
buf->fill += data_size;
return GNUNET_OK;
}
/**
* Process a POST request containing a JSON object. This function
@ -171,75 +67,37 @@ TMH_PARSE_post_json (struct MHD_Connection *connection,
size_t *upload_data_size,
json_t **json)
{
struct Buffer *r = *con_cls;
enum GNUNET_JSON_PostResult pr;
*json = NULL;
if (NULL == *con_cls)
pr = GNUNET_JSON_post_parser (REQUEST_BUFFER_MAX,
con_cls,
upload_data,
upload_data_size,
json);
switch (pr)
{
/* We are seeing a fresh POST request. */
r = GNUNET_new (struct Buffer);
if (GNUNET_OK !=
buffer_init (r,
upload_data,
*upload_data_size,
REQUEST_BUFFER_INITIAL,
REQUEST_BUFFER_MAX))
{
*con_cls = NULL;
buffer_deinit (r);
GNUNET_free (r);
return (MHD_NO ==
TMH_RESPONSE_reply_internal_error (connection,
"out of memory"))
? GNUNET_SYSERR : GNUNET_NO;
}
/* everything OK, wait for more POST data */
*upload_data_size = 0;
*con_cls = r;
case GNUNET_JSON_PR_OUT_OF_MEMORY:
return (MHD_NO ==
TMH_RESPONSE_reply_internal_error (connection,
"out of memory"))
? GNUNET_SYSERR : GNUNET_NO;
case GNUNET_JSON_PR_CONTINUE:
return GNUNET_YES;
}
if (0 != *upload_data_size)
{
/* We are seeing an old request with more data available. */
if (GNUNET_OK !=
buffer_append (r,
upload_data,
*upload_data_size,
REQUEST_BUFFER_MAX))
{
/* Request too long */
*con_cls = NULL;
buffer_deinit (r);
GNUNET_free (r);
return (MHD_NO ==
TMH_RESPONSE_reply_request_too_large (connection))
? GNUNET_SYSERR : GNUNET_NO;
}
/* everything OK, wait for more POST data */
*upload_data_size = 0;
return GNUNET_YES;
}
/* We have seen the whole request. */
*json = json_loadb (r->data,
r->fill,
0,
NULL);
if (NULL == *json)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"Failed to parse JSON request body\n");
case GNUNET_JSON_PR_REQUEST_TOO_LARGE:
return (MHD_NO ==
TMH_RESPONSE_reply_request_too_large (connection))
? GNUNET_SYSERR : GNUNET_NO;
case GNUNET_JSON_PR_JSON_INVALID:
return (MHD_YES ==
TMH_RESPONSE_reply_invalid_json (connection))
? GNUNET_NO : GNUNET_SYSERR;
case GNUNET_JSON_PR_SUCCESS:
GNUNET_break (NULL != *json);
return GNUNET_YES;
}
buffer_deinit (r);
GNUNET_free (r);
*con_cls = NULL;
return GNUNET_YES;
/* this should never happen */
GNUNET_break (0);
return GNUNET_SYSERR;
}
@ -253,13 +111,7 @@ TMH_PARSE_post_json (struct MHD_Connection *connection,
void
TMH_PARSE_post_cleanup_callback (void *con_cls)
{
struct Buffer *r = con_cls;
if (NULL != r)
{
buffer_deinit (r);
GNUNET_free (r);
}
GNUNET_JSON_post_parser_cleanup (con_cls);
}

View File

@ -22,6 +22,71 @@
#include "platform.h"
#include "taler_util.h"
#include "taler_exchangedb_plugin.h"
#include <microhttpd.h>
/**
* Commands for the interpreter.
*/
enum OpCode {
/**
* Terminate testcase with 'skipped' result.
*/
OPCODE_TERMINATE_SKIP,
/**
* Run taler-exchange-aggregator.
*/
OPCODE_RUN_AGGREGATOR,
/**
* Finish testcase with success.
*/
OPCODE_TERMINATE_SUCCESS
};
/**
* Command state for the interpreter.
*/
struct Command
{
enum OpCode opcode;
};
/**
* State of the interpreter.
*/
struct State
{
/**
* Array of commands to run.
*/
struct Command* commands;
/**
* Offset of the next command to be run.
*/
unsigned int ioff;
};
/**
* Pipe used to communicate child death via signal.
*/
static struct GNUNET_DISK_PipeHandle *sigpipe;
/**
* ID of task called whenever we get a SIGCHILD.
*/
static struct GNUNET_SCHEDULER_Task *child_death_task;
/**
* ID of task called whenever are shutting down.
*/
static struct GNUNET_SCHEDULER_Task *shutdown_task;
/**
* Return value from main().
@ -33,26 +98,257 @@ static int result;
*/
static char *config_filename;
/**
* Database plugin.
*/
static struct TALER_EXCHANGEDB_Plugin *plugin;
/**
* Runs the aggregator process.
* Our session with the database.
*/
static struct TALER_EXCHANGEDB_Session *session;
/**
* The handle for the aggregator process that we are testing.
*/
static struct GNUNET_OS_Process *aggregator_proc;
/**
* State of our interpreter while we are running the aggregator
* process.
*/
static struct State *aggregator_state;
/**
* HTTP server we run to pretend to be the "test" bank.
*/
static struct MHD_Daemon *mhd_bank;
/**
* Task running HTTP server for the "test" bank.
*/
static struct GNUNET_SCHEDULER_Task *mhd_task;
/**
* Interprets the commands from the test program.
*
* @param cls the `struct State` of the interpreter
* @param tc scheduler context
*/
static void
run_aggregator ()
{
struct GNUNET_OS_Process *proc;
interpreter (void *cls,
const struct GNUNET_SCHEDULER_TaskContext *tc);
proc = GNUNET_OS_start_process (GNUNET_NO,
GNUNET_OS_INHERIT_STD_ALL,
NULL, NULL, NULL,
"taler-exchange-aggregator",
"taler-exchange-aggregator",
/* "-c", config_filename, */
"-d", "test-exchange-home",
"-t", /* enable temporary tables */
NULL);
GNUNET_OS_process_wait (proc);
GNUNET_OS_process_destroy (proc);
/**
* Task triggered whenever we are to shutdown.
*
* @param cls closure, NULL if we need to self-restart
* @param tc context
*/
static void
shutdown_action (void *cls,
const struct GNUNET_SCHEDULER_TaskContext *tc)
{
shutdown_task = NULL;
if (NULL != mhd_task)
{
GNUNET_SCHEDULER_cancel (mhd_task);
mhd_task = NULL;
}
if (NULL != mhd_bank)
{
MHD_stop_daemon (mhd_bank);
mhd_bank = NULL;
}
if (NULL == aggregator_proc)
{
GNUNET_SCHEDULER_cancel (child_death_task);
child_death_task = NULL;
}
else
{
GNUNET_break (0 == GNUNET_OS_process_kill (aggregator_proc,
SIGKILL));
}
plugin->drop_temporary (plugin->cls,
session);
TALER_EXCHANGEDB_plugin_unload (plugin);
plugin = NULL;
}
/**
* Task triggered whenever we receive a SIGCHLD (child
* process died).
*
* @param cls closure, NULL if we need to self-restart
* @param tc context
*/
static void
maint_child_death (void *cls,
const struct GNUNET_SCHEDULER_TaskContext *tc)
{
const struct GNUNET_DISK_FileHandle *pr;
char c[16];
struct State *state;
child_death_task = NULL;
pr = GNUNET_DISK_pipe_handle (sigpipe, GNUNET_DISK_PIPE_END_READ);
if (0 == (tc->reason & GNUNET_SCHEDULER_REASON_READ_READY))
{
/* shutdown scheduled us, ignore! */
child_death_task =
GNUNET_SCHEDULER_add_read_file (GNUNET_TIME_UNIT_FOREVER_REL,
pr,
&maint_child_death,
NULL);
return;
}
GNUNET_break (0 < GNUNET_DISK_file_read (pr, &c, sizeof (c)));
GNUNET_OS_process_wait (aggregator_proc);
GNUNET_OS_process_destroy (aggregator_proc);
aggregator_proc = NULL;
aggregator_state->ioff++;
state = aggregator_state;
aggregator_state = NULL;
interpreter (state, NULL);
if (NULL == shutdown_task)
return;
child_death_task = GNUNET_SCHEDULER_add_read_file (GNUNET_TIME_UNIT_FOREVER_REL,
pr,
&maint_child_death, NULL);
}
/**
* Interprets the commands from the test program.
*
* @param cls the `struct State` of the interpreter
* @param tc scheduler context
*/
static void
interpreter (void *cls,
const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct State *state = cls;
struct Command *cmd = &state->commands[state->ioff];
switch (cmd->opcode)
{
case OPCODE_TERMINATE_SKIP:
/* return skip: test not finished, but did not fail either */
result = 77;
GNUNET_SCHEDULER_shutdown ();
return;
case OPCODE_RUN_AGGREGATOR:
GNUNET_assert (NULL == aggregator_state);
aggregator_state = state;
aggregator_proc
= GNUNET_OS_start_process (GNUNET_NO,
GNUNET_OS_INHERIT_STD_ALL,
NULL, NULL, NULL,
"taler-exchange-aggregator",
"taler-exchange-aggregator",
/* "-c", config_filename, */
"-d", "test-exchange-home",
"-t", /* enable temporary tables */
NULL);
return;
case OPCODE_TERMINATE_SUCCESS:
result = 0;
GNUNET_SCHEDULER_shutdown ();
return;
}
}
/**
* Contains the test program. Here each step of the testcase
* is defined.
*/
static void
run_test ()
{
static struct Command commands[] = {
/* FIXME: prime DB */
{
.opcode = OPCODE_RUN_AGGREGATOR
},
{
.opcode = OPCODE_TERMINATE_SKIP
}
};
static struct State state = {
.commands = commands
};
GNUNET_SCHEDULER_add_now (&interpreter,
&state);
}
/**
* Function called whenever MHD is done with a request. If the
* request was a POST, we may have stored a `struct Buffer *` in the
* @a con_cls that might still need to be cleaned up. Call the
* respective function to free the memory.
*
* @param cls client-defined closure
* @param connection connection handle
* @param con_cls value as set by the last call to
* the #MHD_AccessHandlerCallback
* @param toe reason for request termination
* @see #MHD_OPTION_NOTIFY_COMPLETED
* @ingroup request
*/
static void
handle_mhd_completion_callback (void *cls,
struct MHD_Connection *connection,
void **con_cls,
enum MHD_RequestTerminationCode toe)
{
TMH_PARSE_post_cleanup_callback (*con_cls);
*con_cls = NULL;
}
/**
* Handle incoming HTTP request.
*
* @param cls closure for MHD daemon (unused)
* @param connection the connection
* @param url the requested url
* @param method the method (POST, GET, ...)
* @param version HTTP version (ignored)
* @param upload_data request data
* @param upload_data_size size of @a upload_data in bytes
* @param con_cls closure for request (a `struct Buffer *`)
* @return MHD result code
*/
static int
handle_mhd_request (void *cls,
struct MHD_Connection *connection,
const char *url,
const char *method,
const char *version,
const char *upload_data,
size_t *upload_data_size,
void **con_cls)
{
if (0 != strcasecmp (url,
"/admin/add/incoming"))
{
/* Unexpected URI path, just close the connection. */
/* we're rather impolite here, but it's a testcase. */
GNUNET_break_op (0);
return MHD_NO;
}
/* FIXME: to be implemented! */
GNUNET_break (0);
return MHD_NO;
}
@ -67,8 +363,6 @@ run (void *cls,
const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct GNUNET_CONFIGURATION_Handle *cfg = cls;
struct TALER_EXCHANGEDB_Plugin *plugin;
struct TALER_EXCHANGEDB_Session *session;
plugin = TALER_EXCHANGEDB_plugin_load (cfg);
if (GNUNET_OK !=
@ -81,15 +375,48 @@ run (void *cls,
}
session = plugin->get_session (plugin->cls,
GNUNET_YES);
/* FIXME: prime DB */
/* FIXME: launch bank on 8082! */
run_aggregator ();
/* FIXME: check DB and bank */
GNUNET_assert (NULL != session);
child_death_task =
GNUNET_SCHEDULER_add_read_file (GNUNET_TIME_UNIT_FOREVER_REL,
GNUNET_DISK_pipe_handle (sigpipe,
GNUNET_DISK_PIPE_END_READ),
&maint_child_death, NULL);
shutdown_task =
GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
&shutdown_action,
NULL);
result = 1; /* test failed for undefined reason */
mhd_bank = MHD_start_daemon (MHD_USE_SELECT_INTERNALLY | MHD_USE_DEBUG,
8082,
NULL, NULL,
&handle_mhd_request, NULL,
MHD_OPTION_NOTIFY_COMPLETED, &handle_mhd_completion_callback, NULL,
MHD_OPTION_END);
if (NULL == mhd_bank)
{
GNUNET_SCHEDULER_shutdown ();
return;
}
mhd_task = FIXME;
run_test ();
}
plugin->drop_temporary (plugin->cls,
session);
TALER_EXCHANGEDB_plugin_unload (plugin);
result = 77; /* skip: not finished */
/**
* Signal handler called for SIGCHLD. Triggers the
* respective handler by writing to the trigger pipe.
*/
static void
sighandler_child_death ()
{
static char c;
int old_errno = errno; /* back-up errno */
GNUNET_break (1 ==
GNUNET_DISK_file_write (GNUNET_DISK_pipe_handle
(sigpipe, GNUNET_DISK_PIPE_END_WRITE),
&c, sizeof (c)));
errno = old_errno; /* restore errno */
}
@ -100,6 +427,7 @@ main (int argc,
const char *plugin_name;
char *testname;
struct GNUNET_CONFIGURATION_Handle *cfg;
struct GNUNET_SIGNAL_Context *shc_chld;
result = -1;
if (NULL == (plugin_name = strrchr (argv[0], (int) '-')))
@ -122,7 +450,14 @@ main (int argc,
GNUNET_free (testname);
return 2;
}
sigpipe = GNUNET_DISK_pipe (GNUNET_NO, GNUNET_NO, GNUNET_NO, GNUNET_NO);
GNUNET_assert (NULL != sigpipe);
shc_chld =
GNUNET_SIGNAL_handler_install (GNUNET_SIGCHLD, &sighandler_child_death);
GNUNET_SCHEDULER_run (&run, cfg);
GNUNET_SIGNAL_handler_uninstall (shc_chld);
shc_chld = NULL;
GNUNET_DISK_pipe_close (sigpipe);
GNUNET_CONFIGURATION_destroy (cfg);
GNUNET_free (config_filename);
GNUNET_free (testname);