diff options
Diffstat (limited to 'src/exchange/taler-exchange-aggregator.c')
| -rw-r--r-- | src/exchange/taler-exchange-aggregator.c | 273 | 
1 files changed, 212 insertions, 61 deletions
diff --git a/src/exchange/taler-exchange-aggregator.c b/src/exchange/taler-exchange-aggregator.c index e202290d..0fc13c14 100644 --- a/src/exchange/taler-exchange-aggregator.c +++ b/src/exchange/taler-exchange-aggregator.c @@ -108,6 +108,35 @@ struct AggregationUnit  /** + * Work shard we are processing. + */ +struct Shard +{ + +  /** +   * When did we start processing the shard? +   */ +  struct GNUNET_TIME_Absolute start_time; + +  /** +   * Starting row of the shard. +   */ +  uint32_t shard_start; + +  /** +   * Exclusive end row of the shard. +   */ +  uint32_t shard_end; + +  /** +   * Number of starting points found in the shard. +   */ +  uint64_t work_counter; + +}; + + +/**   * What is the smallest unit we support for wire transfers?   * We will need to round down to a multiple of this amount.   */ @@ -135,12 +164,20 @@ static struct TALER_EXCHANGEDB_Plugin *db_plugin;   */  static struct GNUNET_SCHEDULER_Task *task; +  /**   * How long should we sleep when idle before trying to find more work?   */  static struct GNUNET_TIME_Relative aggregator_idle_sleep_interval;  /** + * How big are the shards we are processing? Is an inclusive offset, so every + * shard ranges from [X,X+shard_size) exclusive.  So a shard covers + * shard_size slots.  The maximum value for shard_size is INT32_MAX+1. + */ +static uint32_t shard_size; + +/**   * Value to return from main(). 0 on success, non-zero on errors.   */  static int global_ret; @@ -162,6 +199,15 @@ run_aggregation (void *cls);  /** + * Select a shard to work on. + * + * @param cls NULL + */ +static void +run_shard (void *cls); + + +/**   * Free data stored in @a au, but not @a au itself (stack allocated).   *   * @param au aggregation unit to clean up @@ -612,30 +658,56 @@ commit_or_warn (void)  /** + * Release lock on shard @a s in the database. + * On error, terminates this process. + * + * @param[in] s shard to free (and memory to release) + */ +static void +release_shard (struct Shard *s) +{ +  enum GNUNET_DB_QueryStatus qs; + +  qs = db_plugin->release_revolving_shard ( +    db_plugin->cls, +    "aggregator", +    s->shard_start, +    s->shard_end); +  GNUNET_free (s); +  switch (qs) +  { +  case GNUNET_DB_STATUS_HARD_ERROR: +  case GNUNET_DB_STATUS_SOFT_ERROR: +    GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR != qs); +    GNUNET_break (0); +    GNUNET_SCHEDULER_shutdown (); +    return; +  case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: +    /* Strange, but let's just continue */ +    break; +  case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: +    /* normal case */ +    break; +  } +} + + +/**   * Main work function that queries the DB and aggregates transactions   * into larger wire transfers.   * - * @param cls NULL + * @param cls a `struct Shard *`   */  static void  run_aggregation (void *cls)  { +  struct Shard *s = cls;    struct AggregationUnit au_active;    enum GNUNET_DB_QueryStatus qs; -  (void) cls;    task = NULL;    GNUNET_log (GNUNET_ERROR_TYPE_INFO,                "Checking for ready deposits to aggregate\n"); -  if (GNUNET_SYSERR == -      db_plugin->preflight (db_plugin->cls)) -  { -    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, -                "Failed to obtain database connection!\n"); -    global_ret = EXIT_FAILURE; -    GNUNET_SCHEDULER_shutdown (); -    return; -  }    if (GNUNET_OK !=        db_plugin->start_deferred_wire_out (db_plugin->cls))    { @@ -643,50 +715,70 @@ run_aggregation (void *cls)                  "Failed to start database transaction!\n");      global_ret = EXIT_FAILURE;      GNUNET_SCHEDULER_shutdown (); +    release_shard (s);      return;    }    memset (&au_active,            0,            sizeof (au_active)); -  qs = db_plugin->get_ready_deposit (db_plugin->cls, -                                     &deposit_cb, -                                     &au_active); -  if (0 >= qs) +  qs = db_plugin->get_ready_deposit ( +    db_plugin->cls, +    s->shard_start, +    s->shard_end - 1, /* -1: exclusive->inclusive */ +    &deposit_cb, +    &au_active); +  switch (qs)    { +  case GNUNET_DB_STATUS_HARD_ERROR:      cleanup_au (&au_active);      db_plugin->rollback (db_plugin->cls); -    if (GNUNET_DB_STATUS_HARD_ERROR == qs) -    { -      GNUNET_log (GNUNET_ERROR_TYPE_ERROR, -                  "Failed to execute deposit iteration!\n"); -      global_ret = EXIT_FAILURE; -      GNUNET_SCHEDULER_shutdown (); -      return; -    } -    if (GNUNET_DB_STATUS_SOFT_ERROR == qs) +    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, +                "Failed to begin deposit iteration!\n"); +    global_ret = EXIT_FAILURE; +    GNUNET_SCHEDULER_shutdown (); +    release_shard (s); +    return; +  case GNUNET_DB_STATUS_SOFT_ERROR: +    cleanup_au (&au_active); +    db_plugin->rollback (db_plugin->cls); +    GNUNET_assert (NULL == task); +    task = GNUNET_SCHEDULER_add_now (&run_aggregation, +                                     s); +    return; +  case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:      { -      /* should re-try immediately */ +      uint64_t counter = s->work_counter; +      struct GNUNET_TIME_Relative duration +        = GNUNET_TIME_absolute_get_duration (s->start_time); + +      cleanup_au (&au_active); +      db_plugin->rollback (db_plugin->cls); +      GNUNET_log (GNUNET_ERROR_TYPE_INFO, +                  "Completed shard after %s\n", +                  GNUNET_STRINGS_relative_time_to_string (duration, +                                                          GNUNET_YES)); +      release_shard (s); +      if (GNUNET_YES == test_mode) +      { +        /* in test mode, shutdown after a shard is done */ +        GNUNET_SCHEDULER_shutdown (); +        return; +      }        GNUNET_assert (NULL == task); -      task = GNUNET_SCHEDULER_add_now (&run_aggregation, -                                       NULL); +      /* If we ended up doing zero work, sleep a bit */ +      if (0 == counter) +        task = GNUNET_SCHEDULER_add_delayed (aggregator_idle_sleep_interval, +                                             &run_shard, +                                             NULL); +      else +        task = GNUNET_SCHEDULER_add_now (&run_shard, +                                         NULL);        return;      } -    GNUNET_log (GNUNET_ERROR_TYPE_INFO, -                "No more ready deposits, going to sleep\n"); -    if (GNUNET_YES == test_mode) -    { -      /* in test mode, shutdown if we end up being idle */ -      GNUNET_SCHEDULER_shutdown (); -    } -    else -    { -      /* nothing to do, sleep for a minute and try again */ -      GNUNET_assert (NULL == task); -      task = GNUNET_SCHEDULER_add_delayed (aggregator_idle_sleep_interval, -                                           &run_aggregation, -                                           NULL); -    } -    return; +  case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: +    s->work_counter++; +    /* continued below */ +    break;    }    /* Now try to find other deposits to aggregate */ @@ -707,6 +799,7 @@ run_aggregation (void *cls)      db_plugin->rollback (db_plugin->cls);      global_ret = EXIT_FAILURE;      GNUNET_SCHEDULER_shutdown (); +    release_shard (s);      return;    }    if (GNUNET_DB_STATUS_SOFT_ERROR == qs) @@ -718,7 +811,7 @@ run_aggregation (void *cls)      cleanup_au (&au_active);      GNUNET_assert (NULL == task);      task = GNUNET_SCHEDULER_add_now (&run_aggregation, -                                     NULL); +                                     s);      return;    } @@ -754,6 +847,7 @@ run_aggregation (void *cls)        global_ret = EXIT_FAILURE;        cleanup_au (&au_active);        GNUNET_SCHEDULER_shutdown (); +      release_shard (s);        return;      }      /* Mark transactions by row_id as minor */ @@ -778,7 +872,7 @@ run_aggregation (void *cls)        /* start again */        GNUNET_assert (NULL == task);        task = GNUNET_SCHEDULER_add_now (&run_aggregation, -                                       NULL); +                                       s);        return;      }      if (GNUNET_DB_STATUS_HARD_ERROR == qs) @@ -787,6 +881,7 @@ run_aggregation (void *cls)        cleanup_au (&au_active);        global_ret = EXIT_FAILURE;        GNUNET_SCHEDULER_shutdown (); +      release_shard (s);        return;      }      /* commit */ @@ -796,20 +891,13 @@ run_aggregation (void *cls)      /* start again */      GNUNET_assert (NULL == task);      task = GNUNET_SCHEDULER_add_now (&run_aggregation, -                                     NULL); +                                     s);      return;    } -  { -    char *amount_s; - -    amount_s = TALER_amount_to_string (&au_active.final_amount); -    GNUNET_log (GNUNET_ERROR_TYPE_INFO, -                "Preparing wire transfer of %s to %s\n", -                amount_s, -                TALER_B2S (&au_active.merchant_pub)); -    GNUNET_free (amount_s); -  } - +  GNUNET_log (GNUNET_ERROR_TYPE_INFO, +              "Preparing wire transfer of %s to %s\n", +              TALER_amount2s (&au_active.final_amount), +              TALER_B2S (&au_active.merchant_pub));    {      void *buf;      size_t buf_size; @@ -856,7 +944,7 @@ run_aggregation (void *cls)      /* start again */      GNUNET_assert (NULL == task);      task = GNUNET_SCHEDULER_add_now (&run_aggregation, -                                     NULL); +                                     s);      return;    }    if (GNUNET_DB_STATUS_HARD_ERROR == qs) @@ -866,6 +954,7 @@ run_aggregation (void *cls)      /* die hard */      global_ret = EXIT_FAILURE;      GNUNET_SCHEDULER_shutdown (); +    release_shard (s);      return;    } @@ -882,26 +971,72 @@ run_aggregation (void *cls)                  "Commit issue for prepared wire data; trying again later!\n");      GNUNET_assert (NULL == task);      task = GNUNET_SCHEDULER_add_now (&run_aggregation, -                                     NULL); +                                     s);      return;    case GNUNET_DB_STATUS_HARD_ERROR:      GNUNET_break (0);      global_ret = EXIT_FAILURE;      GNUNET_SCHEDULER_shutdown (); +    release_shard (s);      return;    case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:      GNUNET_log (GNUNET_ERROR_TYPE_INFO,                  "Preparation complete, going again\n");      GNUNET_assert (NULL == task);      task = GNUNET_SCHEDULER_add_now (&run_aggregation, -                                     NULL); +                                     s);      return;    default:      GNUNET_break (0);      global_ret = EXIT_FAILURE;      GNUNET_SCHEDULER_shutdown (); +    release_shard (s); +    return; +  } +} + + +/** + * Select a shard to work on. + * + * @param cls NULL + */ +static void +run_shard (void *cls) +{ +  struct Shard *s; +  enum GNUNET_DB_QueryStatus qs; + +  (void) cls; +  task = NULL; +  if (GNUNET_SYSERR == +      db_plugin->preflight (db_plugin->cls)) +  { +    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, +                "Failed to obtain database connection!\n"); +    global_ret = EXIT_FAILURE; +    GNUNET_SCHEDULER_shutdown ();      return;    } +  s = GNUNET_new (struct Shard); +  s->start_time = GNUNET_TIME_absolute_get (); +  qs = db_plugin->begin_revolving_shard (db_plugin->cls, +                                         "aggregator", +                                         shard_size, +                                         1U + INT32_MAX, +                                         &s->shard_start, +                                         &s->shard_end); +  if (0 >= qs) +  { +    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, +                "Failed to begin shard!\n"); +    GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR != qs); +    global_ret = EXIT_FAILURE; +    GNUNET_SCHEDULER_shutdown (); +    return; +  } +  task = GNUNET_SCHEDULER_add_now (&run_aggregation, +                                   s);  } @@ -919,6 +1054,7 @@ run (void *cls,       const char *cfgfile,       const struct GNUNET_CONFIGURATION_Handle *c)  { +  unsigned long long ass;    (void) cls;    (void) args;    (void) cfgfile; @@ -930,8 +1066,23 @@ run (void *cls,      global_ret = EXIT_NOTCONFIGURED;      return;    } +  if (GNUNET_OK != +      GNUNET_CONFIGURATION_get_value_number (cfg, +                                             "exchange", +                                             "AGGREGATOR_SHARD_SIZE", +                                             &ass)) +  { +    cfg = NULL; +    global_ret = EXIT_NOTCONFIGURED; +    return; +  } +  if ( (0 == ass) || +       (ass > INT32_MAX) ) +    shard_size = 1U + INT32_MAX; +  else +    shard_size = (uint32_t) ass;    GNUNET_assert (NULL == task); -  task = GNUNET_SCHEDULER_add_now (&run_aggregation, +  task = GNUNET_SCHEDULER_add_now (&run_shard,                                     NULL);    GNUNET_SCHEDULER_add_shutdown (&shutdown_task,                                   cls);  | 
