diff options
Diffstat (limited to 'src/exchange/taler-exchange-aggregator.c')
| -rw-r--r-- | src/exchange/taler-exchange-aggregator.c | 116 | 
1 files changed, 69 insertions, 47 deletions
| diff --git a/src/exchange/taler-exchange-aggregator.c b/src/exchange/taler-exchange-aggregator.c index cfc11a5f..96922eff 100644 --- a/src/exchange/taler-exchange-aggregator.c +++ b/src/exchange/taler-exchange-aggregator.c @@ -20,7 +20,6 @@   * @author Christian Grothoff   *   * TODO: - * - simplify global_ret: make it a global!   * - handle shutdown more nicely (call 'cancel' method on wire transfers)   */  #include "platform.h" @@ -68,6 +67,12 @@ static struct TALER_WIRE_Plugin *wire_plugin;  static struct GNUNET_SCHEDULER_Task *task;  /** + * Value to return from main(). #GNUNET_OK on success, #GNUNET_SYSERR + * on serious errors. + */ +static int global_ret; + +/**   * #GNUNET_YES if we are in test mode and are using temporary tables.   */  static int test_mode; @@ -85,6 +90,25 @@ static unsigned int aggregation_limit = TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT  /** + * We're being aborted with CTRL-C (or SIGTERM). Shut down. + * + * @param cls closure + * @param tc scheduler context + */ +static void +shutdown_task (void *cls, +               const struct GNUNET_SCHEDULER_TaskContext *tc) +{ +  if (NULL != task) +  { +    GNUNET_SCHEDULER_cancel (task); +    task = NULL; +  } +  /* FIXME: other shutdown stuff here! */ +} + + +/**   * Load configuration parameters for the exchange   * server into the corresponding global variables.   * @@ -217,11 +241,6 @@ struct AggregationUnit    unsigned long long *additional_rows;    /** -   * Pointer to global return value. Closure for #run(). -   */ -  int *global_ret; - -  /**     * Offset specifying how many #additional_rows are in use.     */    unsigned int rows_offset; @@ -314,7 +333,6 @@ deposit_cb (void *cls,  } -  /**   * Function called with details about another deposit we   * can aggregate into an existing aggregation unit. @@ -433,14 +451,13 @@ prepare_cb (void *cls,   * Main work function that queries the DB and aggregates transactions   * into larger wire transfers.   * - * @param cls pointer to an `int` which we will return from main() + * @param cls NULL   * @param tc scheduler context   */  static void  run_aggregation (void *cls,                   const struct GNUNET_SCHEDULER_TaskContext *tc)  { -  int *global_ret = cls;    struct TALER_EXCHANGEDB_Session *session;    struct AggregationUnit *au;    unsigned int i; @@ -453,7 +470,7 @@ run_aggregation (void *cls,    {      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,                  "Failed to obtain database session!\n"); -    *global_ret = GNUNET_SYSERR; +    global_ret = GNUNET_SYSERR;      return;    }    if (GNUNET_OK != @@ -462,7 +479,7 @@ run_aggregation (void *cls,    {      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,                  "Failed to start database transaction!\n"); -    *global_ret = GNUNET_SYSERR; +    global_ret = GNUNET_SYSERR;      return;    }    au = GNUNET_new (struct AggregationUnit); @@ -482,7 +499,7 @@ run_aggregation (void *cls,      {        GNUNET_log (GNUNET_ERROR_TYPE_ERROR,                    "Failed to execute deposit iteration!\n"); -      *global_ret = GNUNET_SYSERR; +      global_ret = GNUNET_SYSERR;        return;      }      if (GNUNET_YES == test_mode) @@ -495,7 +512,7 @@ run_aggregation (void *cls,        /* nothing to do, sleep for a minute and try again */        task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,                                             &run_aggregation, -                                           global_ret); +                                           NULL);      }      return;    } @@ -518,7 +535,7 @@ run_aggregation (void *cls,      GNUNET_free (au);      db_plugin->rollback (db_plugin->cls,                           session); -    *global_ret = GNUNET_SYSERR; +    global_ret = GNUNET_SYSERR;      return;    }    /* Round to the unit supported by the wire transfer method */ @@ -541,7 +558,7 @@ run_aggregation (void *cls,      {        GNUNET_log (GNUNET_ERROR_TYPE_ERROR,                    "Failed to start database transaction!\n"); -      *global_ret = GNUNET_SYSERR; +      global_ret = GNUNET_SYSERR;        GNUNET_free_non_null (au->additional_rows);        if (NULL != au->wire)          json_decref (au->wire); @@ -576,10 +593,9 @@ run_aggregation (void *cls,      GNUNET_free (au);      /* start again */      task = GNUNET_SCHEDULER_add_now (&run_aggregation, -                                     global_ret); +                                     NULL);      return;    } -  au->global_ret = global_ret;    au->ph = wire_plugin->prepare_wire_transfer (wire_plugin->cls,                                                 au->wire,                                                 &au->total_amount, @@ -600,7 +616,7 @@ run_aggregation (void *cls,      GNUNET_free (au);      /* start again */      task = GNUNET_SCHEDULER_add_now (&run_aggregation, -                                     global_ret); +                                     NULL);      return;    }    /* otherwise we continue with #prepare_cb(), see below */ @@ -632,7 +648,6 @@ prepare_cb (void *cls,              size_t buf_size)  {    struct AggregationUnit *au = cls; -  int *global_ret = au->global_ret;    struct TALER_EXCHANGEDB_Session *session = au->session;    GNUNET_free_non_null (au->additional_rows); @@ -646,7 +661,7 @@ prepare_cb (void *cls,                           session);      /* start again */      task = GNUNET_SCHEDULER_add_now (&run_aggregation, -                                     global_ret); +                                     NULL);      return;    } @@ -663,7 +678,7 @@ prepare_cb (void *cls,                           session);      /* start again */      task = GNUNET_SCHEDULER_add_now (&run_aggregation, -                                     global_ret); +                                     NULL);      return;    } @@ -677,13 +692,13 @@ prepare_cb (void *cls,                  "Failed to commit database transaction!\n");      /* try again */      task = GNUNET_SCHEDULER_add_now (&run_aggregation, -                                     global_ret); +                                     NULL);      return;    }    /* run alternative task: actually do wire transfer! */    task = GNUNET_SCHEDULER_add_now (&run_transfers, -                                   &global_ret); +                                   NULL);  } @@ -704,12 +719,6 @@ struct WirePrepareData    struct TALER_WIRE_ExecuteHandle *eh;    /** -   * Pointer to global return value. Closure for #run(). -   */ -  int *global_ret; - - -  /**     * Row ID of the transfer.     */    unsigned long long row_id; @@ -730,7 +739,6 @@ wire_confirm_cb (void *cls,                   const char *emsg)  {    struct WirePrepareData *wpd = cls; -  int *global_ret = wpd->global_ret;    struct TALER_EXCHANGEDB_Session *session = wpd->session;    wpd->eh = NULL; @@ -741,7 +749,7 @@ wire_confirm_cb (void *cls,                  emsg);      db_plugin->rollback (db_plugin->cls,                           session); -    *global_ret = GNUNET_SYSERR; +    global_ret = GNUNET_SYSERR;      GNUNET_free (wpd);      return;    } @@ -753,7 +761,7 @@ wire_confirm_cb (void *cls,      GNUNET_break (0); /* why!? */      db_plugin->rollback (db_plugin->cls,                           session); -    *global_ret = GNUNET_SYSERR; +    global_ret = GNUNET_SYSERR;      GNUNET_free (wpd);      return;    } @@ -766,13 +774,13 @@ wire_confirm_cb (void *cls,                  "Failed to commit database transaction!\n");      /* try again */      task = GNUNET_SCHEDULER_add_now (&run_aggregation, -                                     global_ret); +                                     NULL);      return;    }    /* continue with #run_transfers(), just to guard       against the unlikely case that there are more. */    task = GNUNET_SCHEDULER_add_now (&run_transfers, -                                   &global_ret); +                                   NULL);  } @@ -792,7 +800,6 @@ wire_prepare_cb (void *cls,                   size_t buf_size)  {    struct WirePrepareData *wpd = cls; -  int *global_ret = wpd->global_ret;    wpd->row_id = rowid;    wpd->eh = wire_plugin->execute_wire_transfer (wire_plugin->cls, @@ -808,7 +815,7 @@ wire_prepare_cb (void *cls,      GNUNET_break (0); /* why? how to best recover? */      db_plugin->rollback (db_plugin->cls,                           wpd->session); -    *global_ret = GNUNET_SYSERR; +    global_ret = GNUNET_SYSERR;      GNUNET_free (wpd);      return;    } @@ -819,14 +826,13 @@ wire_prepare_cb (void *cls,   * Execute the wire transfers that we have committed to   * do.   * - * @param cls pointer to an `int` which we will return from main() + * @param cls NULL   * @param tc scheduler context   */  static void  run_transfers (void *cls,                 const struct GNUNET_SCHEDULER_TaskContext *tc)  { -  int *global_ret = cls;    int ret;    struct WirePrepareData *wpd;    struct TALER_EXCHANGEDB_Session *session; @@ -838,7 +844,7 @@ run_transfers (void *cls,    {      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,                  "Failed to obtain database session!\n"); -    *global_ret = GNUNET_SYSERR; +    global_ret = GNUNET_SYSERR;      return;    }    if (GNUNET_OK != @@ -847,12 +853,11 @@ run_transfers (void *cls,    {      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,                  "Failed to start database transaction!\n"); -    *global_ret = GNUNET_SYSERR; +    global_ret = GNUNET_SYSERR;      return;    }    wpd = GNUNET_new (struct WirePrepareData);    wpd->session = session; -  wpd->global_ret = global_ret;    ret = db_plugin->wire_prepare_data_get (db_plugin->cls,                                            session,                                            exchange_wireformat, @@ -863,7 +868,7 @@ run_transfers (void *cls,      GNUNET_break (0); /* why? how to best recover? */      db_plugin->rollback (db_plugin->cls,                           session); -    *global_ret = GNUNET_SYSERR; +    global_ret = GNUNET_SYSERR;      GNUNET_free (wpd);      return;    } @@ -873,7 +878,7 @@ run_transfers (void *cls,      db_plugin->rollback (db_plugin->cls,                           session);      task = GNUNET_SCHEDULER_add_now (&run_aggregation, -                                     global_ret); +                                     NULL);      GNUNET_free (wpd);      return;    } @@ -882,6 +887,24 @@ run_transfers (void *cls,  /** + * First task. + * + * @param cls closure, NULL + * @param tc scheduler context + */ +static void +run (void *cls, +     const struct GNUNET_SCHEDULER_TaskContext *tc) +{ +  task = GNUNET_SCHEDULER_add_now (&run_transfers, +                                   cls); +  GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, +                                &shutdown_task, +                                cls); +} + + +/**   * The main function of the taler-exchange-httpd server ("the exchange").   *   * @param argc number of arguments from the command line @@ -907,7 +930,6 @@ main (int argc,      GNUNET_GETOPT_OPTION_VERSION (VERSION "-" VCS_VERSION),      GNUNET_GETOPT_OPTION_END    }; -  int ret = GNUNET_OK;    GNUNET_assert (GNUNET_OK ==                   GNUNET_log_setup ("taler-exchange-aggregator", @@ -929,12 +951,12 @@ main (int argc,    {      return 1;    } - -  GNUNET_SCHEDULER_run (&run_transfers, &ret); +  global_ret = GNUNET_OK; +  GNUNET_SCHEDULER_run (&run, NULL);    TALER_EXCHANGEDB_plugin_unload (db_plugin);    TALER_WIRE_plugin_unload (wire_plugin); -  return (GNUNET_SYSERR == ret) ? 1 : 0; +  return (GNUNET_SYSERR == global_ret) ? 1 : 0;  }  /* end of taler-exchange-aggregator.c */ | 
