diff options
Diffstat (limited to 'src/exchange/taler-exchange-aggregator.c')
| -rw-r--r-- | src/exchange/taler-exchange-aggregator.c | 327 | 
1 files changed, 167 insertions, 160 deletions
diff --git a/src/exchange/taler-exchange-aggregator.c b/src/exchange/taler-exchange-aggregator.c index 9b1c7e3e..b84c837e 100644 --- a/src/exchange/taler-exchange-aggregator.c +++ b/src/exchange/taler-exchange-aggregator.c @@ -287,7 +287,8 @@ static int reserves_idle;   * Note: do not change here, Postgres requires us to hard-code the   * LIMIT in the prepared statement.   */ -static unsigned int aggregation_limit = TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT; +static unsigned int aggregation_limit = +  TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT; @@ -347,13 +348,13 @@ update_fees (struct WireAccount *wa,         p = p->next)    {      qs = db_plugin->insert_wire_fee (db_plugin->cls, -				     session, -				     wa->wire_plugin->method, -				     p->start_date, -				     p->end_date, -				     &p->wire_fee, -				     &p->closing_fee, -				     &p->master_sig); +                                     session, +                                     wa->wire_plugin->method, +                                     p->start_date, +                                     p->end_date, +                                     &p->wire_fee, +                                     &p->closing_fee, +                                     &p->master_sig);      if (qs < 0)      {        TALER_EXCHANGEDB_fees_free (wa->af); @@ -486,8 +487,9 @@ shutdown_task (void *cls)    {      if (NULL != wpd->eh)      { -      wpd->wa->wire_plugin->execute_wire_transfer_cancel (wpd->wa->wire_plugin->cls, -                                                          wpd->eh); +      wpd->wa->wire_plugin->execute_wire_transfer_cancel ( +        wpd->wa->wire_plugin->cls, +        wpd->eh);        wpd->eh = NULL;      }      db_plugin->rollback (db_plugin->cls, @@ -499,8 +501,9 @@ shutdown_task (void *cls)    {      if (NULL != au->ph)      { -      au->wa->wire_plugin->prepare_wire_transfer_cancel (au->wa->wire_plugin->cls, -                                                         au->ph); +      au->wa->wire_plugin->prepare_wire_transfer_cancel ( +        au->wa->wire_plugin->cls, +        au->ph);        au->ph = NULL;      }      db_plugin->rollback (db_plugin->cls, @@ -509,8 +512,9 @@ shutdown_task (void *cls)    }    if (NULL != ctc)    { -    ctc->wa->wire_plugin->prepare_wire_transfer_cancel (ctc->wa->wire_plugin->cls, -                                                        ctc->ph); +    ctc->wa->wire_plugin->prepare_wire_transfer_cancel ( +      ctc->wa->wire_plugin->cls, +      ctc->ph);      ctc->ph = NULL;      db_plugin->rollback (db_plugin->cls,                           ctc->session); @@ -613,12 +617,12 @@ exchange_serve_process_config ()   */  static int  refund_by_coin_cb (void *cls, -		   const struct TALER_MerchantPublicKeyP *merchant_pub, -		   const struct TALER_MerchantSignatureP *merchant_sig, -		   const struct GNUNET_HashCode *h_contract, -		   uint64_t rtransaction_id, -		   const struct TALER_Amount *amount_with_fee, -		   const struct TALER_Amount *refund_fee) +                   const struct TALER_MerchantPublicKeyP *merchant_pub, +                   const struct TALER_MerchantSignatureP *merchant_sig, +                   const struct GNUNET_HashCode *h_contract, +                   uint64_t rtransaction_id, +                   const struct TALER_Amount *amount_with_fee, +                   const struct TALER_Amount *refund_fee)  {    struct AggregationUnit *aux = cls; @@ -632,8 +636,8 @@ refund_by_coin_cb (void *cls,      return GNUNET_OK; /* different contract */    if (GNUNET_OK !=        TALER_amount_subtract (&aux->total_amount, -			     &aux->total_amount, -			     amount_with_fee)) +                             &aux->total_amount, +                             amount_with_fee))    {      GNUNET_break (0);      return GNUNET_SYSERR; @@ -680,17 +684,17 @@ deposit_cb (void *cls,      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,                  "Fatally malformed record at row %llu over %s\n",                  (unsigned long long) row_id, -		TALER_amount2s (amount_with_fee)); +                TALER_amount2s (amount_with_fee));      return GNUNET_DB_STATUS_HARD_ERROR;    }    au->row_id = row_id;    au->h_contract = h_contract_terms;    qs = db_plugin->select_refunds_by_coin (db_plugin->cls, -					  au->session, -					  coin_pub, -					  &refund_by_coin_cb, -					  au); +                                          au->session, +                                          coin_pub, +                                          &refund_by_coin_cb, +                                          au);    au->h_contract = NULL;    if (0 > qs)    { @@ -738,8 +742,8 @@ deposit_cb (void *cls,    au->execution_time = GNUNET_TIME_absolute_get ();    (void) GNUNET_TIME_round_abs (&au->execution_time);    qs = update_fees (au->wa, -		    au->execution_time, -		    au->session); +                    au->execution_time, +                    au->session);    if (qs <= 0)    {      if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs) @@ -750,17 +754,17 @@ deposit_cb (void *cls,    au->wire_fee = au->wa->af->wire_fee;    qs = db_plugin->insert_aggregation_tracking (db_plugin->cls, -					       au->session, -					       &au->wtid, -					       row_id); +                                               au->session, +                                               &au->wtid, +                                               row_id);    if (qs <= 0)    {      GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);      return qs;    }    qs = db_plugin->mark_deposit_done (db_plugin->cls, -				     au->session, -				     row_id); +                                     au->session, +                                     row_id);    if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs)    {      GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); @@ -811,13 +815,13 @@ aggregate_cb (void *cls,      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,                  "Fatally malformed record at %llu over amount %s\n",                  (unsigned long long) row_id, -		TALER_amount2s (amount_with_fee)); +                TALER_amount2s (amount_with_fee));      return GNUNET_DB_STATUS_HARD_ERROR;    }    /* add to total */    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, -	      "Adding transaction amount %s to aggregation\n", -	      TALER_amount2s (&delta)); +              "Adding transaction amount %s to aggregation\n", +              TALER_amount2s (&delta));    if (GNUNET_OK !=        TALER_amount_add (&au->total_amount,                          &au->total_amount, @@ -832,10 +836,10 @@ aggregate_cb (void *cls,    au->h_contract = h_contract_terms;    qs = db_plugin->select_refunds_by_coin (db_plugin->cls, -					  au->session, -					  coin_pub, -					  &refund_by_coin_cb, -					  au); +                                          au->session, +                                          coin_pub, +                                          &refund_by_coin_cb, +                                          au);    au->h_contract = NULL;    if (0 > qs)    { @@ -857,25 +861,25 @@ aggregate_cb (void *cls,    au->additional_rows[au->rows_offset++] = row_id;    /* insert into aggregation tracking table */    qs = db_plugin->insert_aggregation_tracking (db_plugin->cls, -					       au->session, -					       &au->wtid, -					       row_id); +                                               au->session, +                                               &au->wtid, +                                               row_id);    if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs)    {      GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);      return qs;    }    qs = db_plugin->mark_deposit_done (db_plugin->cls, -				     au->session, -				     row_id); +                                     au->session, +                                     row_id);    if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs)    {      GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);      return qs;    }    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, -	      "Added row %llu with %s to aggregation\n", -	      (unsigned long long) row_id, +              "Added row %llu with %s to aggregation\n", +              (unsigned long long) row_id,                TALER_amount2s (&delta));    return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;  } @@ -958,8 +962,8 @@ commit_or_warn (struct TALER_EXCHANGEDB_Session *session)   */  static void  prepare_close_cb (void *cls, -		  const char *buf, -		  size_t buf_size) +                  const char *buf, +                  size_t buf_size)  {    enum GNUNET_DB_QueryStatus qs; @@ -984,10 +988,10 @@ prepare_close_cb (void *cls,    /* Commit our intention to execute the wire transfer! */    qs = db_plugin->wire_prepare_data_insert (db_plugin->cls, -					    ctc->session, -					    ctc->method, -					    buf, -					    buf_size); +                                            ctc->session, +                                            ctc->method, +                                            buf, +                                            buf_size);    if (GNUNET_DB_STATUS_HARD_ERROR == qs)    {      GNUNET_break (0); @@ -1021,7 +1025,7 @@ prepare_close_cb (void *cls,    GNUNET_log (GNUNET_ERROR_TYPE_INFO,                "Reserve closure committed, running transfer\n");    task = GNUNET_SCHEDULER_add_now (&run_transfers, -				   NULL); +                                   NULL);  } @@ -1059,10 +1063,10 @@ struct ExpiredReserveContext   */  static enum GNUNET_DB_QueryStatus  expired_reserve_cb (void *cls, -		    const struct TALER_ReservePublicKeyP *reserve_pub, -		    const struct TALER_Amount *left, -		    const char *account_details, -		    struct GNUNET_TIME_Absolute expiration_date) +                    const struct TALER_ReservePublicKeyP *reserve_pub, +                    const struct TALER_Amount *left, +                    const char *account_details, +                    struct GNUNET_TIME_Absolute expiration_date)  {    struct ExpiredReserveContext *erc = cls;    struct TALER_EXCHANGEDB_Session *session = erc->session; @@ -1090,8 +1094,8 @@ expired_reserve_cb (void *cls,    /* lookup `closing_fee` */    qs = update_fees (wa, -		    now, -		    session); +                    now, +                    session);    if (qs <= 0)    {      if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs) @@ -1106,8 +1110,8 @@ expired_reserve_cb (void *cls,    /* calculate transfer amount */    ret = TALER_amount_subtract (&amount_without_fee, -			       left, -			       closing_fee); +                               left, +                               closing_fee);    if ( (GNUNET_SYSERR == ret) ||         (GNUNET_NO == ret) )    { @@ -1122,21 +1126,21 @@ expired_reserve_cb (void *cls,    /* NOTE: sizeof (*reserve_pub) == sizeof (wtid) right now, but to       be future-compatible, we use the memset + min construction */    memset (&wtid, -	  0, -	  sizeof (wtid)); +          0, +          sizeof (wtid));    memcpy (&wtid, -	  reserve_pub, -	  GNUNET_MIN (sizeof (wtid), -		      sizeof (*reserve_pub))); +          reserve_pub, +          GNUNET_MIN (sizeof (wtid), +                      sizeof (*reserve_pub)));    qs = db_plugin->insert_reserve_closed (db_plugin->cls, -					 session, -					 reserve_pub, -					 now, -					 account_details, -					 &wtid, -					 left, -					 closing_fee); +                                         session, +                                         reserve_pub, +                                         now, +                                         account_details, +                                         &wtid, +                                         left, +                                         closing_fee);    GNUNET_log (GNUNET_ERROR_TYPE_INFO,                "Closing reserve %s over %s (%d, %d)\n", @@ -1149,8 +1153,8 @@ expired_reserve_cb (void *cls,    {      /* success, perform wire transfer */      if (GNUNET_SYSERR == -	wa->wire_plugin->amount_round (wa->wire_plugin->cls, -				       &amount_without_fee)) +        wa->wire_plugin->amount_round (wa->wire_plugin->cls, +                                       &amount_without_fee))      {        GNUNET_break (0);        global_ret = GNUNET_SYSERR; @@ -1164,12 +1168,12 @@ expired_reserve_cb (void *cls,      ctc->ph        = wa->wire_plugin->prepare_wire_transfer (wa->wire_plugin->cls,                                                  wa->section_name, -						account_details, -						&amount_without_fee, -						exchange_base_url, -						&wtid, -						&prepare_close_cb, -						ctc); +                                                account_details, +                                                &amount_without_fee, +                                                exchange_base_url, +                                                &wtid, +                                                &prepare_close_cb, +                                                ctc);      if (NULL == ctc->ph)      {        GNUNET_break (0); @@ -1233,7 +1237,7 @@ run_reserve_closures (void *cls)                          session);    if (GNUNET_OK !=        db_plugin->start (db_plugin->cls, -			session, +                        session,                          "aggregator reserve closures"))    {      GNUNET_log (GNUNET_ERROR_TYPE_ERROR, @@ -1247,10 +1251,10 @@ run_reserve_closures (void *cls)    now = GNUNET_TIME_absolute_get ();    (void) GNUNET_TIME_round_abs (&now);    qs = db_plugin->get_expired_reserves (db_plugin->cls, -					session, -					now, -					&expired_reserve_cb, -					&erc); +                                        session, +                                        now, +                                        &expired_reserve_cb, +                                        &erc);    switch (qs)    {    case GNUNET_DB_STATUS_HARD_ERROR: @@ -1264,7 +1268,7 @@ run_reserve_closures (void *cls)      db_plugin->rollback (db_plugin->cls,                           session);      task = GNUNET_SCHEDULER_add_now (&run_reserve_closures, -				     NULL); +                                     NULL);      return;    case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:      GNUNET_log (GNUNET_ERROR_TYPE_INFO, @@ -1273,14 +1277,14 @@ run_reserve_closures (void *cls)      db_plugin->rollback (db_plugin->cls,                           session);      task = GNUNET_SCHEDULER_add_now (&run_aggregation, -				     NULL); +                                     NULL);      return;    case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:      if (GNUNET_YES == erc.async_cont)        break;      (void) commit_or_warn (session);      task = GNUNET_SCHEDULER_add_now (&run_reserve_closures, -				     NULL); +                                     NULL);      return;    }  } @@ -1307,7 +1311,7 @@ run_aggregation (void *cls)    if (0 == (++swap % 2))    {      task = GNUNET_SCHEDULER_add_now (&run_reserve_closures, -				     NULL); +                                     NULL);      return;    }    GNUNET_log (GNUNET_ERROR_TYPE_INFO, @@ -1333,9 +1337,9 @@ run_aggregation (void *cls)    au = GNUNET_new (struct AggregationUnit);    au->session = session;    qs = db_plugin->get_ready_deposit (db_plugin->cls, -				     session, -				     &deposit_cb, -				     au); +                                     session, +                                     &deposit_cb, +                                     au);    if (0 >= qs)    {      cleanup_au (); @@ -1354,7 +1358,7 @@ run_aggregation (void *cls)        /* should re-try immediately */        swap--; /* do not count failed attempts */        task = GNUNET_SCHEDULER_add_now (&run_aggregation, -				       NULL); +                                       NULL);        return;      }      GNUNET_log (GNUNET_ERROR_TYPE_INFO, @@ -1368,15 +1372,15 @@ run_aggregation (void *cls)      else      {        if ( (GNUNET_NO == reserves_idle) || -	   (GNUNET_YES == test_mode) ) -	/* Possibly more to on reserves, go for it immediately */ -	task = GNUNET_SCHEDULER_add_now (&run_reserve_closures, -					 NULL); +           (GNUNET_YES == test_mode) ) +        /* Possibly more to on reserves, go for it immediately */ +        task = GNUNET_SCHEDULER_add_now (&run_reserve_closures, +                                         NULL);        else -	/* nothing to do, sleep for a minute and try again */ -	task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, -					     &run_aggregation, -					     NULL); +        /* nothing to do, sleep for a minute and try again */ +        task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, +                                             &run_aggregation, +                                             NULL);      }      return;    } @@ -1386,12 +1390,12 @@ run_aggregation (void *cls)                "Found ready deposit for %s, aggregating\n",                TALER_B2S (&au->merchant_pub));    qs = db_plugin->iterate_matching_deposits (db_plugin->cls, -					     session, -					     &au->h_wire, -					     &au->merchant_pub, -					     &aggregate_cb, -					     au, -					     aggregation_limit); +                                             session, +                                             &au->h_wire, +                                             &au->merchant_pub, +                                             &aggregate_cb, +                                             au, +                                             aggregation_limit);    if ( (GNUNET_DB_STATUS_HARD_ERROR == qs) ||         (GNUNET_YES == au->failed) )    { @@ -1412,7 +1416,7 @@ run_aggregation (void *cls)      db_plugin->rollback (db_plugin->cls,                           session);      task = GNUNET_SCHEDULER_add_now (&run_aggregation, -				     NULL); +                                     NULL);      return;    } @@ -1431,8 +1435,8 @@ run_aggregation (void *cls)    {      GNUNET_log (GNUNET_ERROR_TYPE_INFO,                  "Aggregate value too low for transfer (%d/%s)\n", -		qs, -		TALER_amount2s (&au->final_amount)); +                qs, +                TALER_amount2s (&au->final_amount));      /* Rollback ongoing transaction, as we will not use the respective         WTID and thus need to remove the tracking data */      db_plugin->rollback (db_plugin->cls, @@ -1454,35 +1458,35 @@ run_aggregation (void *cls)      }      /* Mark transactions by row_id as minor */      qs = db_plugin->mark_deposit_tiny (db_plugin->cls, -				       session, -				       au->row_id); +                                       session, +                                       au->row_id);      if (0 <= qs)      { -      for (unsigned int i=0;i<au->rows_offset;i++) +      for (unsigned int i = 0; i<au->rows_offset; i++)        {          qs = db_plugin->mark_deposit_tiny (db_plugin->cls, -					   session, -					   au->additional_rows[i]); -	if (0 > qs) -	  break; +                                           session, +                                           au->additional_rows[i]); +        if (0 > qs) +          break;        }      }      if (GNUNET_DB_STATUS_SOFT_ERROR == qs)      {        GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, -		  "Serialization issue, trying again later!\n"); +                  "Serialization issue, trying again later!\n");        db_plugin->rollback (db_plugin->cls, -			   session); +                           session);        cleanup_au ();        /* start again */        task = GNUNET_SCHEDULER_add_now (&run_aggregation, -				       NULL); +                                       NULL);        return;      }      if (GNUNET_DB_STATUS_HARD_ERROR == qs)      {        db_plugin->rollback (db_plugin->cls, -			   session); +                           session);        cleanup_au ();        GNUNET_SCHEDULER_shutdown ();        return; @@ -1509,14 +1513,15 @@ run_aggregation (void *cls)      char *url;      url = TALER_JSON_wire_to_payto (au->wire); -    au->ph = au->wa->wire_plugin->prepare_wire_transfer (au->wa->wire_plugin->cls, -                                                         au->wa->section_name, -                                                         url, -                                                         &au->final_amount, -                                                         exchange_base_url, -                                                         &au->wtid, -                                                         &prepare_cb, -                                                         au); +    au->ph = au->wa->wire_plugin->prepare_wire_transfer ( +      au->wa->wire_plugin->cls, +      au->wa->section_name, +      url, +      &au->final_amount, +      exchange_base_url, +      &au->wtid, +      &prepare_cb, +      au);      GNUNET_free (url);    }    if (NULL == au->ph) @@ -1563,24 +1568,24 @@ prepare_cb (void *cls,    }    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, -	      "Storing %u bytes of wire prepare data\n", -	      (unsigned int) buf_size); +              "Storing %u bytes of wire prepare data\n", +              (unsigned int) buf_size);    /* Commit our intention to execute the wire transfer! */    qs = db_plugin->wire_prepare_data_insert (db_plugin->cls, -					    session, -					    au->wa->wire_plugin->method, -					    buf, -					    buf_size); +                                            session, +                                            au->wa->wire_plugin->method, +                                            buf, +                                            buf_size);    /* Commit the WTID data to 'wire_out' to finally satisfy aggregation       table constraints */    if (qs >= 0)      qs = db_plugin->store_wire_transfer_out (db_plugin->cls, -					     session, -					     au->execution_time, -					     &au->wtid, -					     au->wire, +                                             session, +                                             au->execution_time, +                                             &au->wtid, +                                             au->wire,                                               au->wa->section_name, -					     &au->final_amount); +                                             &au->final_amount);    cleanup_au ();    if (GNUNET_DB_STATUS_SOFT_ERROR == qs)    { @@ -1672,8 +1677,8 @@ wire_confirm_cb (void *cls,      return;    }    qs = db_plugin->wire_prepare_data_mark_finished (db_plugin->cls, -						   session, -						   wpd->row_id); +                                                   session, +                                                   wpd->row_id);    if (0 >= qs)    {      GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); @@ -1683,7 +1688,7 @@ wire_confirm_cb (void *cls,      {        /* try again */        task = GNUNET_SCHEDULER_add_now (&run_aggregation, -				       NULL); +                                       NULL);      }      else      { @@ -1759,11 +1764,12 @@ wire_prepare_cb (void *cls,      wpd = NULL;      return;    } -  wpd->eh = wpd->wa->wire_plugin->execute_wire_transfer (wpd->wa->wire_plugin->cls, -                                                         buf, -                                                         buf_size, -                                                         &wire_confirm_cb, -                                                         NULL); +  wpd->eh = wpd->wa->wire_plugin->execute_wire_transfer ( +    wpd->wa->wire_plugin->cls, +    buf, +    buf_size, +    &wire_confirm_cb, +    NULL);    if (NULL == wpd->eh)    {      GNUNET_break (0); /* why? how to best recover? */ @@ -1821,13 +1827,13 @@ run_transfers (void *cls)    wpd = GNUNET_new (struct WirePrepareData);    wpd->session = session;    qs = db_plugin->wire_prepare_data_get (db_plugin->cls, -					 session, -					 &wire_prepare_cb, -					 NULL); +                                         session, +                                         &wire_prepare_cb, +                                         NULL);    if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qs)      return;  /* continues in #wire_prepare_cb() */    db_plugin->rollback (db_plugin->cls, -		       session); +                       session);    GNUNET_free (wpd);    wpd = NULL;    switch (qs) @@ -1840,7 +1846,7 @@ run_transfers (void *cls)    case GNUNET_DB_STATUS_SOFT_ERROR:      /* try again */      task = GNUNET_SCHEDULER_add_now (&run_transfers, -				     NULL); +                                     NULL);      return;    case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:      /* no more prepared wire transfers, go back to aggregation! */ @@ -1910,9 +1916,9 @@ main (int argc,  {    struct GNUNET_GETOPT_CommandLineOption options[] = {      GNUNET_GETOPT_option_flag ('t', -			       "test", -			       "run in test mode and exit when idle", -			       &test_mode), +                               "test", +                               "run in test mode and exit when idle", +                               &test_mode),      GNUNET_GETOPT_option_version (VERSION "-" VCS_VERSION),      GNUNET_GETOPT_OPTION_END    }; @@ -1923,7 +1929,8 @@ main (int argc,    if (GNUNET_OK !=        GNUNET_PROGRAM_run (argc, argv,                            "taler-exchange-aggregator", -                          gettext_noop ("background process that aggregates and executes wire transfers to merchants"), +                          gettext_noop ( +                            "background process that aggregates and executes wire transfers to merchants"),                            options,                            &run, NULL))    {  | 
