diff options
Diffstat (limited to 'src/exchange/taler-exchange-aggregator.c')
| -rw-r--r-- | src/exchange/taler-exchange-aggregator.c | 150 | 
1 files changed, 93 insertions, 57 deletions
diff --git a/src/exchange/taler-exchange-aggregator.c b/src/exchange/taler-exchange-aggregator.c index 10240190..8dd46f7f 100644 --- a/src/exchange/taler-exchange-aggregator.c +++ b/src/exchange/taler-exchange-aggregator.c @@ -812,6 +812,8 @@ prepare_close_cb (void *cls,  		  const char *buf,  		  size_t buf_size)  { +  enum GNUNET_DB_QueryStatus qs; +        GNUNET_assert (cls == ctc);    GNUNET_log (GNUNET_ERROR_TYPE_INFO, @@ -832,14 +834,25 @@ prepare_close_cb (void *cls,    }    /* Commit our intention to execute the wire transfer! */ -  if (GNUNET_OK != -      db_plugin->wire_prepare_data_insert (db_plugin->cls, -                                           ctc->session, -                                           ctc->type, -                                           buf, -                                           buf_size)) +  qs = db_plugin->wire_prepare_data_insert (db_plugin->cls, +					    ctc->session, +					    ctc->type, +					    buf, +					    buf_size); +  if (GNUNET_DB_STATUS_HARD_ERROR == qs) +  { +    GNUNET_break (0);  +    db_plugin->rollback (db_plugin->cls, +                         ctc->session); +    global_ret = GNUNET_SYSERR; +    GNUNET_SCHEDULER_shutdown (); +    GNUNET_free (ctc->type); +    GNUNET_free (ctc); +    ctc = NULL; +    return; +  } +  if (GNUNET_DB_STATUS_SOFT_ERROR == qs)    { -    GNUNET_break (0); /* why? how to best recover? */      db_plugin->rollback (db_plugin->cls,                           ctc->session);      /* start again */ @@ -864,32 +877,52 @@ prepare_close_cb (void *cls,  /** + * Closure for #expired_reserve_cb(). + */ +struct ExpiredReserveContext +{ + +  /** +   * Database session we are using. +   */ +  struct TALER_EXCHANGEDB_Session *session; + +  /** +   * Set to #GNUNET_YES if the transaction continues +   * asynchronously. +   */ +  int async_cont; +}; + + +/**   * Function called with details about expired reserves.   * We trigger the reserve closure by inserting the respective   * closing record and prewire instructions into the respective   * tables.   * - * @param cls a `struct TALER_EXCHANGEDB_Session *` + * @param cls a `struct ExpiredReserveContext *`   * @param reserve_pub public key of the reserve   * @param left amount left in the reserve   * @param account_details information about the reserve's bank account   * @param expiration_date when did the reserve expire - * @return #GNUNET_OK to continue to iterate, #GNUNET_SYSERR to stop + * @return transaction status code   */ -static int +static enum GNUNET_DB_QueryStatus  expired_reserve_cb (void *cls,  		    const struct TALER_ReservePublicKeyP *reserve_pub,  		    const struct TALER_Amount *left,  		    const json_t *account_details,  		    struct GNUNET_TIME_Absolute expiration_date)  { -  struct TALER_EXCHANGEDB_Session *session = cls; +  struct ExpiredReserveContext *erc = cls; +  struct TALER_EXCHANGEDB_Session *session = erc->session;    struct GNUNET_TIME_Absolute now;    struct TALER_WireTransferIdentifierRawP wtid;    struct TALER_Amount amount_without_fee;    const struct TALER_Amount *closing_fee;    int ret; -  int iret; +  enum GNUNET_DB_QueryStatus qs;    const char *type;    struct WirePlugin *wp; @@ -901,21 +934,17 @@ expired_reserve_cb (void *cls,    if (NULL == type)    {      GNUNET_break (0); -    db_plugin->rollback (db_plugin->cls, -			 session);      global_ret = GNUNET_SYSERR;      GNUNET_SCHEDULER_shutdown (); -    return GNUNET_SYSERR; +    return GNUNET_DB_STATUS_HARD_ERROR;    }    wp = find_plugin (type);    if (NULL == wp)    {      GNUNET_break (0); -    db_plugin->rollback (db_plugin->cls, -			 session);      global_ret = GNUNET_SYSERR;      GNUNET_SCHEDULER_shutdown (); -    return GNUNET_SYSERR; +    return GNUNET_DB_STATUS_HARD_ERROR;    }    /* lookup `closing_fee` */ @@ -925,11 +954,9 @@ expired_reserve_cb (void *cls,  		   session))    {      GNUNET_break (0); -    db_plugin->rollback (db_plugin->cls, -			 session);      global_ret = GNUNET_SYSERR;      GNUNET_SCHEDULER_shutdown (); -    return GNUNET_SYSERR; +    return GNUNET_DB_STATUS_HARD_ERROR;    }    closing_fee = &wp->af->closing_fee; @@ -956,22 +983,22 @@ expired_reserve_cb (void *cls,  	  reserve_pub,  	  GNUNET_MIN (sizeof (wtid),  		      sizeof (*reserve_pub))); -  iret = db_plugin->insert_reserve_closed (db_plugin->cls, -					   session, -					   reserve_pub, -					   now, -					   account_details, -					   &wtid, -					   left, -					   closing_fee); +  qs = db_plugin->insert_reserve_closed (db_plugin->cls, +					 session, +					 reserve_pub, +					 now, +					 account_details, +					 &wtid, +					 left, +					 closing_fee);    GNUNET_log (GNUNET_ERROR_TYPE_INFO,                "Closing reserve %s over %s (%d, %d)\n",                TALER_B2S (reserve_pub),                TALER_amount2s (left),                ret, -              iret); +              qs);    if ( (GNUNET_OK == ret) && -       (GNUNET_OK == iret) ) +       (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qs) )    {      /* success, perform wire transfer */      if (GNUNET_SYSERR == @@ -979,11 +1006,9 @@ expired_reserve_cb (void *cls,  				       &amount_without_fee))      {        GNUNET_break (0); -      db_plugin->rollback (db_plugin->cls, -			   session);        global_ret = GNUNET_SYSERR;        GNUNET_SCHEDULER_shutdown (); -      return GNUNET_SYSERR; +      return GNUNET_DB_STATUS_HARD_ERROR;      }      ctc = GNUNET_new (struct CloseTransferContext);      ctc->wp = wp; @@ -1000,33 +1025,29 @@ expired_reserve_cb (void *cls,      if (NULL == ctc->ph)      {        GNUNET_break (0); -      db_plugin->rollback (db_plugin->cls, -			   session);        global_ret = GNUNET_SYSERR;        GNUNET_SCHEDULER_shutdown ();        GNUNET_free (ctc->type);        GNUNET_free (ctc);        ctc = NULL; +      return GNUNET_DB_STATUS_HARD_ERROR;      } -    return GNUNET_SYSERR; +    erc->async_cont = GNUNET_YES; +    return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;    }    /* Check for hard failure */ -  if (GNUNET_SYSERR == iret) +  if ( (GNUNET_SYSERR == ret) || +       (GNUNET_DB_STATUS_HARD_ERROR == qs) )    {      GNUNET_break (0); -    db_plugin->rollback (db_plugin->cls, -                         session);      global_ret = GNUNET_SYSERR;      GNUNET_SCHEDULER_shutdown (); -    return GNUNET_SYSERR; +    return GNUNET_DB_STATUS_HARD_ERROR;    } -  /* Reserve balance was almost zero; just commit */ +  /* Reserve balance was almost zero OR soft error */    GNUNET_log (GNUNET_ERROR_TYPE_INFO,                "Reserve was virtually empty, moving on\n"); -  (void) commit_or_warn (session); -  task = GNUNET_SCHEDULER_add_now (&run_reserve_closures, -				   NULL); -  return GNUNET_SYSERR; +  return qs;  } @@ -1040,9 +1061,10 @@ static void  run_reserve_closures (void *cls)  {    struct TALER_EXCHANGEDB_Session *session; -  int ret; +  enum GNUNET_DB_QueryStatus qs;    const struct GNUNET_SCHEDULER_TaskContext *tc; - +  struct ExpiredReserveContext erc; +      task = NULL;    reserves_idle = GNUNET_NO;    tc = GNUNET_SCHEDULER_get_task_context (); @@ -1068,22 +1090,29 @@ run_reserve_closures (void *cls)      GNUNET_SCHEDULER_shutdown ();      return;    } -  ret = db_plugin->get_expired_reserves (db_plugin->cls, -					 session, -					 GNUNET_TIME_absolute_get (), -					 &expired_reserve_cb, -					 session); -  if (GNUNET_SYSERR == ret) +  erc.session = session; +  erc.async_cont = GNUNET_NO; +  qs = db_plugin->get_expired_reserves (db_plugin->cls, +					session, +					GNUNET_TIME_absolute_get (), +					&expired_reserve_cb, +					&erc); +  switch (qs)     { +  case GNUNET_DB_STATUS_HARD_ERROR:          GNUNET_break (0);      db_plugin->rollback (db_plugin->cls,                           session);      global_ret = GNUNET_SYSERR;      GNUNET_SCHEDULER_shutdown ();      return; -  } -  if (GNUNET_NO == ret) -  { +  case GNUNET_DB_STATUS_SOFT_ERROR: +    db_plugin->rollback (db_plugin->cls, +                         session); +    task = GNUNET_SCHEDULER_add_now (&run_reserve_closures, +				     NULL); +    return; +  case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:      GNUNET_log (GNUNET_ERROR_TYPE_INFO,                  "No more idle reserves, going back to aggregation\n");      reserves_idle = GNUNET_YES; @@ -1092,6 +1121,13 @@ run_reserve_closures (void *cls)      task = GNUNET_SCHEDULER_add_now (&run_aggregation,  				     NULL);      return; +  case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: +    if (GNUNET_YES == erc.async_cont) +      break; +    (void) commit_or_warn (session); +    task = GNUNET_SCHEDULER_add_now (&run_reserve_closures, +				     NULL); +    return;    }  }  | 
