diff options
Diffstat (limited to 'src/exchangedb')
| -rw-r--r-- | src/exchangedb/Makefile.am | 4 | ||||
| -rw-r--r-- | src/exchangedb/drop0002.sql | 2 | ||||
| -rw-r--r-- | src/exchangedb/drop0003.sql | 26 | ||||
| -rw-r--r-- | src/exchangedb/exchange-0003.sql | 75 | ||||
| -rw-r--r-- | src/exchangedb/plugin_exchangedb_postgres.c | 397 | ||||
| -rw-r--r-- | src/exchangedb/test_exchangedb.c | 15 | 
6 files changed, 493 insertions, 26 deletions
| diff --git a/src/exchangedb/Makefile.am b/src/exchangedb/Makefile.am index 5114fb9c..eae03726 100644 --- a/src/exchangedb/Makefile.am +++ b/src/exchangedb/Makefile.am @@ -18,8 +18,10 @@ sql_DATA = \    exchange-0000.sql \    exchange-0001.sql \    exchange-0002.sql \ +  exchange-0003.sql \    drop0001.sql \ -  drop0002.sql +  drop0002.sql \ +  drop0003.sql  EXTRA_DIST = \    exchangedb.conf \ diff --git a/src/exchangedb/drop0002.sql b/src/exchangedb/drop0002.sql index 5bffab66..12db64c5 100644 --- a/src/exchangedb/drop0002.sql +++ b/src/exchangedb/drop0002.sql @@ -17,8 +17,6 @@  -- Everything in one big transaction  BEGIN; --- exchange-0002 did not create new tables, so nothing to do here. -  -- Unregister patch (0002.sql)  SELECT _v.unregister_patch('exchange-0002'); diff --git a/src/exchangedb/drop0003.sql b/src/exchangedb/drop0003.sql new file mode 100644 index 00000000..fbdab04c --- /dev/null +++ b/src/exchangedb/drop0003.sql @@ -0,0 +1,26 @@ +-- +-- This file is part of TALER +-- Copyright (C) 2020 Taler Systems SA +-- +-- TALER is free software; you can redistribute it and/or modify it under the +-- terms of the GNU General Public License as published by the Free Software +-- Foundation; either version 3, or (at your option) any later version. +-- +-- TALER is distributed in the hope that it will be useful, but WITHOUT ANY +-- WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +-- A PARTICULAR PURPOSE.  See the GNU General Public License for more details. +-- +-- You should have received a copy of the GNU General Public License along with +-- TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/> +-- + +-- Everything in one big transaction +BEGIN; + +-- Unregister patch (0003.sql) +SELECT _v.unregister_patch('exchange-0003'); + +DROP TABLE IF EXISTS revolving_work_shards CASCADE; + +-- And we're out of here... +COMMIT; diff --git a/src/exchangedb/exchange-0003.sql b/src/exchangedb/exchange-0003.sql new file mode 100644 index 00000000..e1c9273f --- /dev/null +++ b/src/exchangedb/exchange-0003.sql @@ -0,0 +1,75 @@ +-- +-- This file is part of TALER +-- Copyright (C) 2021 Taler Systems SA +-- +-- TALER is free software; you can redistribute it and/or modify it under the +-- terms of the GNU General Public License as published by the Free Software +-- Foundation; either version 3, or (at your option) any later version. +-- +-- TALER is distributed in the hope that it will be useful, but WITHOUT ANY +-- WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +-- A PARTICULAR PURPOSE.  See the GNU General Public License for more details. +-- +-- You should have received a copy of the GNU General Public License along with +-- TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/> +-- + +-- Everything in one big transaction +BEGIN; + +-- Check patch versioning is in place. +SELECT _v.register_patch('exchange-0003', NULL, NULL); + + + +ALTER TABLE deposits +  ADD COLUMN shard INT4 NOT NULL DEFAULT 0; +COMMENT ON COLUMN deposits.shard +  IS 'Used for load sharding. Should be set based on h_wire, merchant_pub and a service salt. Default of 0 onlyapplies for colums migrated from a previous version without sharding support. 64-bit value because we need an *unsigned* 32-bit value.'; + +DROP INDEX deposits_get_ready_index; +CREATE INDEX deposits_get_ready_index +  ON deposits +  (shard +  ,tiny +  ,done +  ,wire_deadline +  ,refund_deadline +  ); +COMMENT ON INDEX deposits_get_ready_index +  IS 'for deposits_get_ready'; + + + +CREATE UNLOGGED TABLE IF NOT EXISTS revolving_work_shards +  (shard_serial_id BIGSERIAL UNIQUE +  ,last_attempt INT8 NOT NULL +  ,start_row INT4 NOT NULL +  ,end_row INT4 NOT NULL +  ,active BOOLEAN NOT NULL DEFAULT FALSE +  ,job_name VARCHAR NOT NULL +  ,PRIMARY KEY (job_name, start_row) +  ); +CREATE INDEX IF NOT EXISTS revolving_work_shards_index +  ON revolving_work_shards +  (job_name +  ,active +  ,last_attempt +  ); +COMMENT ON TABLE revolving_work_shards +  IS 'coordinates work between multiple processes working on the same job with partitions that need to be repeatedly processed; unlogged because on system crashes the locks represented by this table will have to be cleared anyway, typically using "taler-exchange-dbinit -s"'; +COMMENT ON COLUMN revolving_work_shards.shard_serial_id +  IS 'unique serial number identifying the shard'; +COMMENT ON COLUMN revolving_work_shards.last_attempt +  IS 'last time a worker attempted to work on the shard'; +COMMENT ON COLUMN revolving_work_shards.active +  IS 'set to TRUE when a worker is active on the shard'; +COMMENT ON COLUMN revolving_work_shards.start_row +  IS 'row at which the shard scope starts, inclusive'; +COMMENT ON COLUMN revolving_work_shards.end_row +  IS 'row at which the shard scope ends, exclusive'; +COMMENT ON COLUMN revolving_work_shards.job_name +  IS 'unique name of the job the workers on this shard are performing'; + +-- Complete transaction +COMMIT; diff --git a/src/exchangedb/plugin_exchangedb_postgres.c b/src/exchangedb/plugin_exchangedb_postgres.c index ae090baf..70c337c5 100644 --- a/src/exchangedb/plugin_exchangedb_postgres.c +++ b/src/exchangedb/plugin_exchangedb_postgres.c @@ -874,11 +874,12 @@ prepare_statements (struct PostgresClosure *pg)                              ",coin_sig"                              ",wire"                              ",exchange_timestamp" +                            ",shard"                              ") SELECT known_coin_id, $2, $3, $4, $5, $6, " -                            " $7, $8, $9, $10, $11, $12" +                            " $7, $8, $9, $10, $11, $12, $13"                              "    FROM known_coins"                              "   WHERE coin_pub=$1;", -                            12), +                            13),      /* Fetch an existing deposit request, used to ensure idempotency         during /deposit processing. Used in #postgres_have_deposit(). */      GNUNET_PQ_make_prepare ("get_deposit", @@ -958,13 +959,18 @@ prepare_statements (struct PostgresClosure *pg)                              " FROM deposits"                              "    JOIN known_coins kc USING (known_coin_id)"                              "    JOIN denominations denom USING (denominations_serial)" -                            " WHERE tiny=FALSE" -                            "    AND done=FALSE" -                            "    AND wire_deadline<=$1" -                            "    AND refund_deadline<$1" -                            " ORDER BY wire_deadline ASC" +                            " WHERE " +                            "       shard >= $2" +                            "   AND shard <= $3" +                            "   AND tiny=FALSE" +                            "   AND done=FALSE" +                            "   AND wire_deadline<=$1" +                            "   AND refund_deadline<$1" +                            " ORDER BY " +                            "   shard ASC" +                            "  ,wire_deadline ASC"                              " LIMIT 1;", -                            1), +                            3),      /* Used in #postgres_iterate_matching_deposits() */      GNUNET_PQ_make_prepare ("deposits_iterate_matching",                              "SELECT" @@ -2399,6 +2405,18 @@ prepare_statements (struct PostgresClosure *pg)                              " ORDER BY last_attempt ASC"                              " LIMIT 1;",                              2), +    /* Used in #postgres_begin_revolving_shard() */ +    GNUNET_PQ_make_prepare ("get_open_revolving_shard", +                            "SELECT" +                            " start_row" +                            ",end_row" +                            " FROM revolving_work_shards" +                            " WHERE job_name=$1" +                            "   AND active=FALSE" +                            " ORDER BY last_attempt ASC" +                            " LIMIT 1;", +                            2), +    /* Used in #postgres_begin_shard() */      GNUNET_PQ_make_prepare ("reclaim_shard",                              "UPDATE work_shards"                              " SET last_attempt=$2" @@ -2406,6 +2424,16 @@ prepare_statements (struct PostgresClosure *pg)                              "   AND start_row=$3"                              "   AND end_row=$4",                              4), +    /* Used in #postgres_begin_revolving_shard() */ +    GNUNET_PQ_make_prepare ("reclaim_revolving_shard", +                            "UPDATE revolving_work_shards" +                            " SET last_attempt=$2" +                            "    ,active=TRUE" +                            " WHERE job_name=$1" +                            "   AND start_row=$3" +                            "   AND end_row=$4", +                            4), +    /* Used in #postgres_begin_shard() */      GNUNET_PQ_make_prepare ("get_last_shard",                              "SELECT"                              " end_row" @@ -2414,6 +2442,16 @@ prepare_statements (struct PostgresClosure *pg)                              " ORDER BY end_row DESC"                              " LIMIT 1;",                              1), +    /* Used in #postgres_begin_revolving_shard() */ +    GNUNET_PQ_make_prepare ("get_last_revolving_shard", +                            "SELECT" +                            " end_row" +                            " FROM revolving_work_shards" +                            " WHERE job_name=$1" +                            " ORDER BY end_row DESC" +                            " LIMIT 1;", +                            1), +    /* Used in #postgres_begin_shard() */      GNUNET_PQ_make_prepare ("claim_next_shard",                              "INSERT INTO work_shards"                              "(job_name" @@ -2423,6 +2461,17 @@ prepare_statements (struct PostgresClosure *pg)                              ") VALUES "                              "($1, $2, $3, $4);",                              4), +    /* Used in #postgres_claim_revolving_shard() */ +    GNUNET_PQ_make_prepare ("create_revolving_shard", +                            "INSERT INTO revolving_work_shards" +                            "(job_name" +                            ",last_attempt" +                            ",start_row" +                            ",end_row" +                            ",active" +                            ") VALUES " +                            "($1, $2, $3, $4, TRUE);", +                            4),      /* Used in #postgres_complete_shard() */      GNUNET_PQ_make_prepare ("complete_shard",                              "UPDATE work_shards" @@ -2431,6 +2480,18 @@ prepare_statements (struct PostgresClosure *pg)                              "   AND start_row=$2"                              "   AND end_row=$3",                              3), +    /* Used in #postgres_complete_shard() */ +    GNUNET_PQ_make_prepare ("release_revolving_shard", +                            "UPDATE revolving_work_shards" +                            " SET active=FALSE" +                            " WHERE job_name=$1" +                            "   AND start_row=$2" +                            "   AND end_row=$3", +                            3), +    /* Used in #postgres_delete_revolving_shards() */ +    GNUNET_PQ_make_prepare ("delete_revolving_shards", +                            "DELETE FROM revolving_work_shards", +                            0),      GNUNET_PQ_PREPARED_STATEMENT_END    }; @@ -4462,12 +4523,16 @@ postgres_mark_deposit_done (void *cls,   * execution time must be in the past.   *   * @param cls the @e cls of this struct with the plugin-specific state + * @param start_shard_row minimum shard row to select + * @param end_shard_row maximum shard row to select (inclusive)   * @param deposit_cb function to call for ONE such deposit   * @param deposit_cb_cls closure for @a deposit_cb   * @return transaction status code   */  static enum GNUNET_DB_QueryStatus  postgres_get_ready_deposit (void *cls, +                            uint32_t start_shard_row, +                            uint32_t end_shard_row,                              TALER_EXCHANGEDB_DepositIterator deposit_cb,                              void *deposit_cb_cls)  { @@ -4475,6 +4540,8 @@ postgres_get_ready_deposit (void *cls,    struct GNUNET_TIME_Absolute now = GNUNET_TIME_absolute_get ();    struct GNUNET_PQ_QueryParam params[] = {      TALER_PQ_query_param_absolute_time (&now), +    GNUNET_PQ_query_param_uint32 (&start_shard_row), +    GNUNET_PQ_query_param_uint32 (&end_shard_row),      GNUNET_PQ_query_param_end    };    struct TALER_Amount amount_with_fee; @@ -4504,6 +4571,8 @@ postgres_get_ready_deposit (void *cls,    enum GNUNET_DB_QueryStatus qs;    (void) GNUNET_TIME_round_abs (&now); +  GNUNET_assert (start_shard_row < end_shard_row); +  GNUNET_assert (end_shard_row <= INT32_MAX);    GNUNET_log (GNUNET_ERROR_TYPE_INFO,                "Finding ready deposits by deadline %s (%llu)\n",                GNUNET_STRINGS_absolute_time_to_string (now), @@ -4901,6 +4970,35 @@ postgres_ensure_coin_known (void *cls,  /** + * Compute the shard number of a given @a deposit + * + * @param deposit deposit to compute shard for + * @return shard number + */ +static uint32_t +compute_shard (const struct TALER_EXCHANGEDB_Deposit *deposit) +{ +  uint32_t res; + +  GNUNET_assert (GNUNET_YES == +                 GNUNET_CRYPTO_kdf (&res, +                                    sizeof (res), +                                    &deposit->h_wire, +                                    sizeof (deposit->h_wire), +                                    &deposit->merchant_pub, +                                    sizeof (deposit->merchant_pub), +                                    NULL, 0)); +  /* interpret hash result as NBO for platform independence, +     convert to HBO and map to [0..2^31-1] range */ +  res = ntohl (res); +  if (res > INT32_MAX) +    res += INT32_MIN; +  GNUNET_assert (res <= INT32_MAX); +  return res; +} + + +/**   * Insert information about deposited coin into the database.   *   * @param cls the `struct PostgresClosure` with the plugin-specific state @@ -4914,6 +5012,7 @@ postgres_insert_deposit (void *cls,                           const struct TALER_EXCHANGEDB_Deposit *deposit)  {    struct PostgresClosure *pg = cls; +  uint32_t shard = compute_shard (deposit);    struct GNUNET_PQ_QueryParam params[] = {      GNUNET_PQ_query_param_auto_from_type (&deposit->coin.coin_pub),      TALER_PQ_query_param_amount (&deposit->amount_with_fee), @@ -4926,9 +5025,11 @@ postgres_insert_deposit (void *cls,      GNUNET_PQ_query_param_auto_from_type (&deposit->csig),      TALER_PQ_query_param_json (deposit->receiver_wire_account),      TALER_PQ_query_param_absolute_time (&exchange_timestamp), +    GNUNET_PQ_query_param_uint32 (&shard),      GNUNET_PQ_query_param_end    }; +  GNUNET_assert (shard <= INT32_MAX);    GNUNET_log (GNUNET_ERROR_TYPE_INFO,                "Inserting deposit to be executed at %s (%llu/%llu)\n",                GNUNET_STRINGS_absolute_time_to_string (deposit->wire_deadline), @@ -6933,18 +7034,19 @@ postgres_wire_prepare_data_get (void *cls,  /** - * Start a transaction where we transiently violate the foreign + * Starts a READ COMMITTED transaction where we transiently violate the foreign   * constraints on the "wire_out" table as we insert aggregations   * and only add the wire transfer out at the end.   *   * @param cls the @e cls of this struct with the plugin-specific state   * @return #GNUNET_OK on success   */ -static int +static enum GNUNET_GenericReturnValue  postgres_start_deferred_wire_out (void *cls)  {    struct PostgresClosure *pg = cls;    struct GNUNET_PQ_ExecuteStatement es[] = { +    GNUNET_PQ_make_execute ("START TRANSACTION ISOLATION LEVEL READ COMMITTED"),      GNUNET_PQ_make_execute ("SET CONSTRAINTS wire_out_ref DEFERRED"),      GNUNET_PQ_EXECUTE_STATEMENT_END    }; @@ -6953,10 +7055,6 @@ postgres_start_deferred_wire_out (void *cls)        postgres_preflight (pg))      return GNUNET_SYSERR;    if (GNUNET_OK != -      postgres_start (pg, -                      "deferred wire out")) -    return GNUNET_SYSERR; -  if (GNUNET_OK !=        GNUNET_PQ_exec_statements (pg->conn,                                   es))    { @@ -6966,6 +7064,7 @@ postgres_start_deferred_wire_out (void *cls)      postgres_rollback (pg);      return GNUNET_SYSERR;    } +  pg->transaction_name = "deferred wire out";    return GNUNET_OK;  } @@ -8041,7 +8140,7 @@ struct RecoupSerialContext    /**     * Status code, set to #GNUNET_SYSERR on hard errors.     */ -  int status; +  enum GNUNET_GenericReturnValue status;  }; @@ -10381,6 +10480,268 @@ postgres_complete_shard (void *cls,  /** + * Function called to grab a revolving work shard on an operation @a op. Runs + * in its own transaction. Returns the oldest inactive shard. + * + * @param cls the @e cls of this struct with the plugin-specific state + * @param job_name name of the operation to grab a revolving shard for + * @param shard_size desired shard size + * @param shard_limit exclusive end of the shard range + * @param[out] start_row inclusive start row of the shard (returned) + * @param[out] end_row exclusive end row of the shard (returned) + * @return transaction status code + */ +static enum GNUNET_DB_QueryStatus +postgres_begin_revolving_shard (void *cls, +                                const char *job_name, +                                uint32_t shard_size, +                                uint32_t shard_limit, +                                uint32_t *start_row, +                                uint32_t *end_row) +{ +  struct PostgresClosure *pg = cls; + +  GNUNET_assert (shard_limit <= 1U + (uint32_t) INT32_MAX); +  GNUNET_assert (shard_limit > 0); +  GNUNET_assert (shard_size > 0); +  for (unsigned int retries = 0; retries<3; retries++) +  { +    if (GNUNET_OK != +        postgres_start (pg, +                        "begin_revolving_shard")) +    { +      GNUNET_break (0); +      return GNUNET_DB_STATUS_HARD_ERROR; +    } + +    /* First, find last 'end_row' */ +    { +      enum GNUNET_DB_QueryStatus qs; +      struct GNUNET_PQ_QueryParam params[] = { +        GNUNET_PQ_query_param_string (job_name), +        GNUNET_PQ_query_param_end +      }; +      struct GNUNET_PQ_ResultSpec rs[] = { +        GNUNET_PQ_result_spec_uint32 ("end_row", +                                      start_row), +        GNUNET_PQ_result_spec_end +      }; + +      qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn, +                                                     "get_last_revolving_shard", +                                                     params, +                                                     rs); +      switch (qs) +      { +      case GNUNET_DB_STATUS_HARD_ERROR: +        GNUNET_break (0); +        postgres_rollback (pg); +        return qs; +      case GNUNET_DB_STATUS_SOFT_ERROR: +        postgres_rollback (pg); +        continue; +      case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: +        break; +      case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: +        *start_row = 0; /* base-case: no shards yet */ +        break; /* continued below */ +      } +    } /* get_last_shard */ + +    if (*start_row < shard_limit) +    { +      /* Claim fresh shard */ +      enum GNUNET_DB_QueryStatus qs; +      struct GNUNET_TIME_Absolute now; +      struct GNUNET_PQ_QueryParam params[] = { +        GNUNET_PQ_query_param_string (job_name), +        GNUNET_PQ_query_param_absolute_time (&now), +        GNUNET_PQ_query_param_uint32 (start_row), +        GNUNET_PQ_query_param_uint32 (end_row), +        GNUNET_PQ_query_param_end +      }; + +      *end_row = GNUNET_MIN (shard_limit, +                             *start_row + shard_size); +      now = GNUNET_TIME_absolute_get (); +      GNUNET_log (GNUNET_ERROR_TYPE_INFO, +                  "Trying to claim shard %llu-%llu\n", +                  (unsigned long long) *start_row, +                  (unsigned long long) *end_row); +      qs = GNUNET_PQ_eval_prepared_non_select (pg->conn, +                                               "create_revolving_shard", +                                               params); +      switch (qs) +      { +      case GNUNET_DB_STATUS_HARD_ERROR: +        GNUNET_break (0); +        postgres_rollback (pg); +        return qs; +      case GNUNET_DB_STATUS_SOFT_ERROR: +        postgres_rollback (pg); +        continue; +      case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: +        /* continued below (with commit) */ +        break; +      case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: +        /* someone else got this shard already, +           try again */ +        postgres_rollback (pg); +        continue; +      } +    } /* end create fresh reovlving shard */ +    else +    { +      /* claim oldest existing shard */ +      enum GNUNET_DB_QueryStatus qs; +      struct GNUNET_PQ_QueryParam params[] = { +        GNUNET_PQ_query_param_string (job_name), +        GNUNET_PQ_query_param_end +      }; +      struct GNUNET_PQ_ResultSpec rs[] = { +        GNUNET_PQ_result_spec_uint32 ("start_row", +                                      start_row), +        GNUNET_PQ_result_spec_uint32 ("end_row", +                                      end_row), +        GNUNET_PQ_result_spec_end +      }; + +      qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn, +                                                     "get_open_revolving_shard", +                                                     params, +                                                     rs); +      switch (qs) +      { +      case GNUNET_DB_STATUS_HARD_ERROR: +        GNUNET_break (0); +        postgres_rollback (pg); +        return qs; +      case GNUNET_DB_STATUS_SOFT_ERROR: +        postgres_rollback (pg); +        continue; +      case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: +        /* no open shards available */ +        postgres_rollback (pg); +        return qs; +      case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: +        { +          enum GNUNET_DB_QueryStatus qs; +          struct GNUNET_TIME_Absolute now = GNUNET_TIME_absolute_get (); +          struct GNUNET_PQ_QueryParam params[] = { +            GNUNET_PQ_query_param_string (job_name), +            GNUNET_PQ_query_param_absolute_time (&now), +            GNUNET_PQ_query_param_uint32 (start_row), +            GNUNET_PQ_query_param_uint32 (end_row), +            GNUNET_PQ_query_param_end +          }; + +          qs = GNUNET_PQ_eval_prepared_non_select (pg->conn, +                                                   "reclaim_revolving_shard", +                                                   params); +          switch (qs) +          { +          case GNUNET_DB_STATUS_HARD_ERROR: +            GNUNET_break (0); +            postgres_rollback (pg); +            return qs; +          case GNUNET_DB_STATUS_SOFT_ERROR: +            postgres_rollback (pg); +            continue; +          case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: +            break; /* continue with commit */ +          case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: +            GNUNET_break (0); /* logic error, should be impossible */ +            postgres_rollback (pg); +            return GNUNET_DB_STATUS_HARD_ERROR; +          } +        } +        break; /* continue with commit */ +      } +    } /* end claim oldest existing shard */ + +    /* commit */ +    { +      enum GNUNET_DB_QueryStatus qs; + +      qs = postgres_commit (pg); +      switch (qs) +      { +      case GNUNET_DB_STATUS_HARD_ERROR: +        GNUNET_break (0); +        postgres_rollback (pg); +        return qs; +      case GNUNET_DB_STATUS_SOFT_ERROR: +        postgres_rollback (pg); +        continue; +      case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: +      case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: +        return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; +      } +    } +  } /* retry 'for' loop */ +  return GNUNET_DB_STATUS_SOFT_ERROR; +} + + +/** + * Function called to release a revolving shard + * back into the work pool.  Clears the + * "completed" flag. + * + * @param cls the @e cls of this struct with the plugin-specific state + * @param job_name name of the operation to grab a word shard for + * @param start_row inclusive start row of the shard + * @param end_row exclusive end row of the shard + * @return transaction status code + */ +enum GNUNET_DB_QueryStatus +postgres_release_revolving_shard (void *cls, +                                  const char *job_name, +                                  uint32_t start_row, +                                  uint32_t end_row) +{ +  struct PostgresClosure *pg = cls; +  struct GNUNET_PQ_QueryParam params[] = { +    GNUNET_PQ_query_param_string (job_name), +    GNUNET_PQ_query_param_uint32 (&start_row), +    GNUNET_PQ_query_param_uint32 (&end_row), +    GNUNET_PQ_query_param_end +  }; + +  GNUNET_log (GNUNET_ERROR_TYPE_INFO, +              "Releasing revolving shard %s %u-%u\n", +              job_name, +              (unsigned int) start_row, +              (unsigned int) end_row); +  return GNUNET_PQ_eval_prepared_non_select (pg->conn, +                                             "release_revolving_shard", +                                             params); +} + + +/** + * Function called to delete all revolving shards. + * To be used after a crash or when the shard size is + * changed. + * + * @param cls the @e cls of this struct with the plugin-specific state + * @return transaction status code + */ +enum GNUNET_DB_QueryStatus +postgres_delete_revolving_shards (void *cls) +{ +  struct PostgresClosure *pg = cls; +  struct GNUNET_PQ_QueryParam params[] = { +    GNUNET_PQ_query_param_end +  }; + +  return GNUNET_PQ_eval_prepared_non_select (pg->conn, +                                             "delete_revolving_shards", +                                             params); +} + + +/**   * Initialize Postgres database subsystem.   *   * @param cls a configuration instance @@ -10592,6 +10953,12 @@ libtaler_plugin_exchangedb_postgres_init (void *cls)      = &postgres_begin_shard;    plugin->complete_shard      = &postgres_complete_shard; +  plugin->begin_revolving_shard +    = &postgres_begin_revolving_shard; +  plugin->release_revolving_shard +    = &postgres_release_revolving_shard; +  plugin->delete_revolving_shards +    = &postgres_delete_revolving_shards;    return plugin;  } diff --git a/src/exchangedb/test_exchangedb.c b/src/exchangedb/test_exchangedb.c index 1cccb23c..8478fac0 100644 --- a/src/exchangedb/test_exchangedb.c +++ b/src/exchangedb/test_exchangedb.c @@ -804,29 +804,22 @@ static uint64_t deposit_rowid;   * @param cls closure a `struct TALER_EXCHANGEDB_Deposit *`   * @param rowid unique ID for the deposit in our DB, used for marking   *              it as 'tiny' or 'done' - * @param exchange_timestamp when did the deposit happen - * @param wallet_timestamp when did the wallet sign the contract   * @param merchant_pub public key of the merchant   * @param coin_pub public key of the coin   * @param amount_with_fee amount that was deposited including fee   * @param deposit_fee amount the exchange gets to keep as transaction fees   * @param h_contract_terms hash of the proposal data known to merchant and customer - * @param wire_deadline by which the merchant advised that he would like the - *        wire transfer to be executed   * @param wire wire details for the merchant   * @return transaction status code, #GNUNET_DB_STATUS_SUCCESS_ONE_RESULT to continue to iterate   */  static enum GNUNET_DB_QueryStatus  deposit_cb (void *cls,              uint64_t rowid, -            struct GNUNET_TIME_Absolute exchange_timestamp, -            struct GNUNET_TIME_Absolute wallet_timestamp,              const struct TALER_MerchantPublicKeyP *merchant_pub,              const struct TALER_CoinSpendPublicKeyP *coin_pub,              const struct TALER_Amount *amount_with_fee,              const struct TALER_Amount *deposit_fee,              const struct GNUNET_HashCode *h_contract_terms, -            struct GNUNET_TIME_Absolute wire_deadline,              const json_t *wire)  {    struct TALER_EXCHANGEDB_Deposit *deposit = cls; @@ -1896,9 +1889,11 @@ run (void *cls)                                               &matching_deposit_cb,                                               &deposit,                                               2)); -  sleep (2); /* giv deposit time to be ready */ +  sleep (2); /* give deposit time to be ready */    FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=            plugin->get_ready_deposit (plugin->cls, +                                     0, +                                     INT32_MAX,                                       &deposit_cb,                                       &deposit));    FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != @@ -1911,11 +1906,15 @@ run (void *cls)                                       deposit_rowid));    FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=            plugin->get_ready_deposit (plugin->cls, +                                     0, +                                     INT32_MAX,                                       &deposit_cb,                                       &deposit));    plugin->rollback (plugin->cls);    FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=            plugin->get_ready_deposit (plugin->cls, +                                     0, +                                     INT32_MAX,                                       &deposit_cb,                                       &deposit));    FAILIF (GNUNET_OK != | 
