diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/exchange/taler-exchange-wirewatch.c | 19 | ||||
| -rw-r--r-- | src/exchangedb/plugin_exchangedb_postgres.c | 112 | 
2 files changed, 113 insertions, 18 deletions
| diff --git a/src/exchange/taler-exchange-wirewatch.c b/src/exchange/taler-exchange-wirewatch.c index 5d35eba5..03f6e9e8 100644 --- a/src/exchange/taler-exchange-wirewatch.c +++ b/src/exchange/taler-exchange-wirewatch.c @@ -116,7 +116,7 @@ struct WireAccount    /**     * How much do we incremnt @e batch_size on success?     */ -  unsigned int batch_increment; +  unsigned int batch_thresh;    /**     * How many transactions did we see in the current batch? @@ -375,8 +375,8 @@ handle_soft_error (struct WireAccount *wa)                         wa->session);    if (1 < wa->batch_size)    { +    wa->batch_thresh = wa->batch_size;      wa->batch_size /= 2; -    wa->batch_increment = 0;      GNUNET_log (GNUNET_ERROR_TYPE_INFO,                  "Reduced batch size to %llu due to serialization issue\n",                  (unsigned long long) wa->batch_size); @@ -451,9 +451,13 @@ do_commit (struct WireAccount *wa)    wa->session = NULL;     /* should not be needed */    if (wa->batch_size < MAXIMUM_BATCH_SIZE)    { -    wa->batch_increment++; +    int delta; + +    delta = ((int) wa->batch_thresh - (int) wa->batch_size) / 4; +    if (delta < 0) +      delta = -delta;      wa->batch_size = GNUNET_MIN (MAXIMUM_BATCH_SIZE, -                                 wa->batch_size + wa->batch_increment); +                                 wa->batch_size + delta + 1);      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,                  "Increasing batch size to %llu\n",                  (unsigned long long) wa->batch_size); @@ -669,9 +673,9 @@ find_transfers (void *cls)      }    }    if (GNUNET_OK != -      db_plugin->start (db_plugin->cls, -                        session, -                        "wirewatch check for incoming wire transfers")) +      db_plugin->start_read_committed (db_plugin->cls, +                                       session, +                                       "wirewatch check for incoming wire transfers"))    {      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,                  "Failed to start database transaction!\n"); @@ -679,6 +683,7 @@ find_transfers (void *cls)      GNUNET_SCHEDULER_shutdown ();      return;    } +    limit = GNUNET_MIN (wa_pos->batch_size,                        wa_pos->shard_end - wa_pos->batch_start);    GNUNET_assert (NULL == wa_pos->hh); diff --git a/src/exchangedb/plugin_exchangedb_postgres.c b/src/exchangedb/plugin_exchangedb_postgres.c index d06fc6a1..886d26ed 100644 --- a/src/exchangedb/plugin_exchangedb_postgres.c +++ b/src/exchangedb/plugin_exchangedb_postgres.c @@ -426,7 +426,6 @@ postgres_get_session (void *cls)                                " WHERE reserve_pub=$1"                                " LIMIT 1;",                                1), -      /* Used in #postgres_reserves_in_insert() when the reserve is new */        GNUNET_PQ_make_prepare ("reserve_create",                                "INSERT INTO reserves "                                "(reserve_pub" @@ -2571,6 +2570,47 @@ postgres_start (void *cls,  /** + * Start a READ COMMITTED transaction. + * + * @param cls the `struct PostgresClosure` with the plugin-specific state + * @param session the database connection + * @param name unique name identifying the transaction (for debugging) + *             must point to a constant + * @return #GNUNET_OK on success + */ +static int +postgres_start_read_committed (void *cls, +                               struct TALER_EXCHANGEDB_Session *session, +                               const char *name) +{ +  struct GNUNET_PQ_ExecuteStatement es[] = { +    GNUNET_PQ_make_execute ("START TRANSACTION ISOLATION LEVEL READ COMMITTED"), +    GNUNET_PQ_EXECUTE_STATEMENT_END +  }; + +  (void) cls; +  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, +              "Starting transaction named: %s\n", +              name); +  postgres_preflight (cls, +                      session); +  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, +              "Starting transaction on %p\n", +              session->conn); +  if (GNUNET_OK != +      GNUNET_PQ_exec_statements (session->conn, +                                 es)) +  { +    TALER_LOG_ERROR ("Failed to start transaction\n"); +    GNUNET_break (0); +    return GNUNET_SYSERR; +  } +  session->transaction_name = name; +  return GNUNET_OK; +} + + +/**   * Roll back the current transaction of a database connection.   *   * @param cls the `struct PostgresClosure` with the plugin-specific state @@ -3446,11 +3486,18 @@ postgres_reserves_in_insert (void *cls,    enum GNUNET_DB_QueryStatus qs1;    struct TALER_EXCHANGEDB_Reserve reserve;    struct GNUNET_TIME_Absolute expiry; +  struct GNUNET_TIME_Absolute gc; +  struct GNUNET_TIME_Absolute now; +  now = GNUNET_TIME_absolute_get (); +  (void) GNUNET_TIME_round_abs (&now);    reserve.pub = *reserve_pub;    expiry = GNUNET_TIME_absolute_add (execution_time,                                       pg->idle_reserve_expiration_time);    (void) GNUNET_TIME_round_abs (&expiry); +  gc = GNUNET_TIME_absolute_add (now, +                                 pg->legal_reserve_expiration_time); +  (void) GNUNET_TIME_round_abs (&gc);    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,                "Creating reserve %s with expiration in %s\n",                TALER_B2S (reserve_pub), @@ -3467,7 +3514,7 @@ postgres_reserves_in_insert (void *cls,        GNUNET_PQ_query_param_string (sender_account_details),        TALER_PQ_query_param_amount (balance),        TALER_PQ_query_param_absolute_time (&expiry), -      TALER_PQ_query_param_absolute_time (&expiry), +      TALER_PQ_query_param_absolute_time (&gc),        GNUNET_PQ_query_param_end      }; @@ -3505,6 +3552,13 @@ postgres_reserves_in_insert (void *cls,      }      if (0 >= qs2)      { +      if ( (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs2) && +           (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != qs1) ) +      { +        GNUNET_break (0); /* should be impossible: reserve was fresh, +                             but transaction already known */ +        return GNUNET_DB_STATUS_HARD_ERROR; +      }        /* Transaction was already known or error. We are finished. */        return qs2;      } @@ -3515,6 +3569,22 @@ postgres_reserves_in_insert (void *cls,    /* we were wrong with our optimistic assumption:       reserve does exist, need to do an update instead */    { +    enum GNUNET_DB_QueryStatus cs; + +    cs = postgres_commit (cls, +                          session); +    if (cs < 0) +      return cs; +    if (GNUNET_OK != +        postgres_start (cls, +                        session, +                        "reserve-update-serializable")) +    { +      GNUNET_break (0); +      return GNUNET_DB_STATUS_HARD_ERROR; +    } +  } +  {      enum GNUNET_DB_QueryStatus reserve_exists;      reserve_exists = postgres_reserves_get (cls, @@ -3560,7 +3630,7 @@ postgres_reserves_in_insert (void *cls,      updated_reserve.expiry = GNUNET_TIME_absolute_max (expiry,                                                         reserve.expiry);      (void) GNUNET_TIME_round_abs (&updated_reserve.expiry); -    updated_reserve.gc = GNUNET_TIME_absolute_max (updated_reserve.expiry, +    updated_reserve.gc = GNUNET_TIME_absolute_max (gc,                                                     reserve.gc);      (void) GNUNET_TIME_round_abs (&updated_reserve.gc);      qs3 = reserves_update (cls, @@ -3581,8 +3651,26 @@ postgres_reserves_in_insert (void *cls,        /* continued below */        break;      } -    return qs3;    } + +  /* Go back to original transaction mode */ +  { +    enum GNUNET_DB_QueryStatus cs; + +    cs = postgres_commit (cls, +                          session); +    if (cs < 0) +      return cs; +    if (GNUNET_OK != +        postgres_start_read_committed (cls, +                                       session, +                                       "reserve-insert-continued")) +    { +      GNUNET_break (0); +      return GNUNET_DB_STATUS_HARD_ERROR; +    } +  } +  return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;  } @@ -3698,7 +3786,7 @@ postgres_insert_withdraw_info (    struct PostgresClosure *pg = cls;    struct TALER_EXCHANGEDB_Reserve reserve;    struct GNUNET_TIME_Absolute now; -  struct GNUNET_TIME_Absolute expiry; +  struct GNUNET_TIME_Absolute gc;    struct GNUNET_PQ_QueryParam params[] = {      GNUNET_PQ_query_param_auto_from_type (&collectable->h_coin_envelope),      GNUNET_PQ_query_param_auto_from_type (&collectable->denom_pub_hash), @@ -3749,9 +3837,9 @@ postgres_insert_withdraw_info (                  TALER_B2S (&collectable->reserve_pub));      return GNUNET_DB_STATUS_HARD_ERROR;    } -  expiry = GNUNET_TIME_absolute_add (now, -                                     pg->legal_reserve_expiration_time); -  reserve.gc = GNUNET_TIME_absolute_max (expiry, +  gc = GNUNET_TIME_absolute_add (now, +                                 pg->legal_reserve_expiration_time); +  reserve.gc = GNUNET_TIME_absolute_max (gc,                                           reserve.gc);    (void) GNUNET_TIME_round_abs (&reserve.gc);    qs = reserves_update (cls, @@ -8474,6 +8562,7 @@ postgres_insert_recoup_request (  {    struct PostgresClosure *pg = cls;    struct GNUNET_TIME_Absolute expiry; +  struct GNUNET_TIME_Absolute gc;    struct TALER_EXCHANGEDB_Reserve reserve;    struct GNUNET_PQ_QueryParam params[] = {      GNUNET_PQ_query_param_auto_from_type (&coin->coin_pub), @@ -8517,9 +8606,9 @@ postgres_insert_recoup_request (    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,                "Inserting recoup for coin %s\n",                TALER_B2S (&coin->coin_pub)); -  expiry = GNUNET_TIME_absolute_add (timestamp, -                                     pg->legal_reserve_expiration_time); -  reserve.gc = GNUNET_TIME_absolute_max (expiry, +  gc = GNUNET_TIME_absolute_add (timestamp, +                                 pg->legal_reserve_expiration_time); +  reserve.gc = GNUNET_TIME_absolute_max (gc,                                           reserve.gc);    (void) GNUNET_TIME_round_abs (&reserve.gc);    expiry = GNUNET_TIME_absolute_add (timestamp, @@ -10549,6 +10638,7 @@ libtaler_plugin_exchangedb_postgres_init (void *cls)    plugin->drop_tables = &postgres_drop_tables;    plugin->create_tables = &postgres_create_tables;    plugin->start = &postgres_start; +  plugin->start_read_committed = &postgres_start_read_committed;    plugin->commit = &postgres_commit;    plugin->preflight = &postgres_preflight;    plugin->rollback = &postgres_rollback; | 
