diff options
| author | Christian Grothoff <christian@grothoff.org> | 2021-06-20 16:41:04 +0200 | 
|---|---|---|
| committer | Christian Grothoff <christian@grothoff.org> | 2021-06-20 16:41:04 +0200 | 
| commit | 108bf57d048a135cb71f9453540c9d6579ae2028 (patch) | |
| tree | 6a67bbcf7cb1ab049a5d9ba426e878b1b493dd10 /src/exchangedb | |
| parent | 0271e848138a94e27f472196f5341879fd3ab8ba (diff) | |
preparations for sharded wirewatch
Diffstat (limited to 'src/exchangedb')
| -rw-r--r-- | src/exchangedb/exchange-0002.sql | 34 | ||||
| -rw-r--r-- | src/exchangedb/plugin_exchangedb_postgres.c | 297 | 
2 files changed, 329 insertions, 2 deletions
| diff --git a/src/exchangedb/exchange-0002.sql b/src/exchangedb/exchange-0002.sql index b03a7b51..361b69b8 100644 --- a/src/exchangedb/exchange-0002.sql +++ b/src/exchangedb/exchange-0002.sql @@ -1,6 +1,6 @@  --  -- This file is part of TALER --- Copyright (C) 2020 Taler Systems SA +-- Copyright (C) 2020-2021 Taler Systems SA  --  -- TALER is free software; you can redistribute it and/or modify it under the  -- terms of the GNU General Public License as published by the Free Software @@ -374,5 +374,37 @@ COMMENT ON TABLE signkey_revocations    IS 'remembering which online signing keys have been revoked'; + +CREATE TABLE IF NOT EXISTS work_shards +  (shard_serial_id BIGSERIAL UNIQUE +  ,last_attempt INT8 NOT NULL +  ,start_row INT8 NOT NULL +  ,end_row INT8 NOT NULL +  ,completed BOOLEAN NOT NULL +  ,job_name VARCHAR NOT NULL +  ,PRIMARY KEY (job_name, start_row) +  ); +CREATE INDEX IF NOT EXISTS work_shards_index +  ON work_shards +  (job_name +  ,completed +  ,last_attempt +  ); +COMMENT ON TABLE work_shards +  IS 'coordinates work between multiple processes working on the same job'; +COMMENT ON COLUMN work_shards.shard_serial_id +  IS 'unique serial number identifying the shard'; +COMMENT ON COLUMN work_shards.last_attempt +  IS 'last time a worker attempted to work on the shard'; +COMMENT ON COLUMN work_shards.completed +  IS 'set to TRUE once the shard is finished by a worker'; +COMMENT ON COLUMN work_shards.start_row +  IS 'row at which the shard scope starts, inclusive'; +COMMENT ON COLUMN work_shards.end_row +  IS 'row at which the shard scope ends, exclusive'; +COMMENT ON COLUMN work_shards.job_name +  IS 'unique name of the job the workers on this shard are performing'; + +  -- Complete transaction  COMMIT; diff --git a/src/exchangedb/plugin_exchangedb_postgres.c b/src/exchangedb/plugin_exchangedb_postgres.c index 04dc03cd..e61a1ac7 100644 --- a/src/exchangedb/plugin_exchangedb_postgres.c +++ b/src/exchangedb/plugin_exchangedb_postgres.c @@ -1,6 +1,6 @@  /*    This file is part of TALER -  Copyright (C) 2014--2020 Taler Systems SA +  Copyright (C) 2014--2021 Taler Systems SA    TALER is free software; you can redistribute it and/or modify it under the    terms of the GNU General Public License as published by the Free Software @@ -2438,6 +2438,52 @@ postgres_get_session (void *cls)                                ") VALUES "                                "($1, $2, $3, $4, $5, $6, $7, $8);",                                8), + +      /* Used in #postgres_begin_shard() */ +      GNUNET_PQ_make_prepare ("get_open_shard", +                              "SELECT" +                              " start_row" +                              ",end_row" +                              " FROM work_shards" +                              " WHERE job_name=$1" +                              "   AND last_attempt<$2" +                              "   AND completed=FALSE" +                              " ORDER BY last_attempt ASC" +                              " LIMIT 1;", +                              2), +      GNUNET_PQ_make_prepare ("reclaim_shard", +                              "UPDATE work_shards" +                              " SET last_attempt=$2" +                              " WHERE job_name=$1" +                              "   AND start_row=$3" +                              "   AND end_row=$4", +                              4), +      GNUNET_PQ_make_prepare ("get_last_shard", +                              "SELECT" +                              " end_row" +                              " FROM work_shards" +                              " WHERE job_name=$1" +                              "   AND completed=FALSE" +                              " ORDER BY end_row DESC" +                              " LIMIT 1;", +                              1), +      GNUNET_PQ_make_prepare ("claim_next_shard", +                              "INSERT INTO work_shards" +                              "(job_name" +                              ",last_attempt" +                              ",start_row" +                              ",end_row" +                              ") VALUES " +                              "($1, $2, $3, $4);", +                              4), +      /* Used in #postgres_complete_shard() */ +      GNUNET_PQ_make_prepare ("complete_shard", +                              "UPDATE work_shards" +                              " SET completed=TRUE" +                              " WHERE job_name=$1" +                              "   AND start_row=$2" +                              "   AND end_row=$3", +                              3),        GNUNET_PQ_PREPARED_STATEMENT_END      }; @@ -10150,6 +10196,251 @@ postgres_insert_records_by_table (void *cls,  /** + * Function called to grab a work shard on an operation @a op. Runs in its + * own transaction (hence no session provided). + * + * @param cls the @e cls of this struct with the plugin-specific state + * @param job_name name of the operation to grab a word shard for + * @param delay minimum age of a shard to grab + * @param size desired shard size + * @param[out] start_row inclusive start row of the shard (returned) + * @param[out] end_row exclusive end row of the shard (returned) + * @return transaction status code + */ +static enum GNUNET_DB_QueryStatus +postgres_begin_shard (void *cls, +                      const char *job_name, +                      struct GNUNET_TIME_Relative delay, +                      uint64_t shard_size, +                      uint64_t *start_row, +                      uint64_t *end_row) +{ +  struct TALER_EXCHANGEDB_Session *session; + +  session = postgres_get_session (cls); +  if (NULL == session) +    return GNUNET_DB_STATUS_HARD_ERROR; +  for (unsigned int retries = 0; retries<3; retries++) +  { +    if (GNUNET_OK != +        postgres_start (cls, +                        session, +                        "begin_shard")) +    { +      GNUNET_break (0); +      return GNUNET_DB_STATUS_HARD_ERROR; +    } + +    { +      struct GNUNET_TIME_Absolute past; +      enum GNUNET_DB_QueryStatus qs; +      struct GNUNET_PQ_QueryParam params[] = { +        GNUNET_PQ_query_param_string (job_name), +        GNUNET_PQ_query_param_absolute_time (&past), +        GNUNET_PQ_query_param_end +      }; +      struct GNUNET_PQ_ResultSpec rs[] = { +        GNUNET_PQ_result_spec_uint64 ("start_row", +                                      start_row), +        GNUNET_PQ_result_spec_uint64 ("end_row", +                                      end_row), +        GNUNET_PQ_result_spec_end +      }; + +      past = GNUNET_TIME_absolute_subtract (GNUNET_TIME_absolute_get (), +                                            delay); +      qs = GNUNET_PQ_eval_prepared_singleton_select (session->conn, +                                                     "get_open_shard", +                                                     params, +                                                     rs); +      switch (qs) +      { +      case GNUNET_DB_STATUS_HARD_ERROR: +        GNUNET_break (0); +        postgres_rollback (cls, +                           session); +        return qs; +      case GNUNET_DB_STATUS_SOFT_ERROR: +        postgres_rollback (cls, +                           session); +        continue; +      case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: +        { +          enum GNUNET_DB_QueryStatus qs; +          struct GNUNET_TIME_Absolute now; +          struct GNUNET_PQ_QueryParam params[] = { +            GNUNET_PQ_query_param_string (job_name), +            GNUNET_PQ_query_param_absolute_time (&now), +            GNUNET_PQ_query_param_uint64 (start_row), +            GNUNET_PQ_query_param_uint64 (end_row), +            GNUNET_PQ_query_param_end +          }; + +          now = GNUNET_TIME_absolute_get (); +          qs = GNUNET_PQ_eval_prepared_non_select (session->conn, +                                                   "reclaim_shard", +                                                   params); +          switch (qs) +          { +          case GNUNET_DB_STATUS_HARD_ERROR: +            GNUNET_break (0); +            postgres_rollback (cls, +                               session); +            return qs; +          case GNUNET_DB_STATUS_SOFT_ERROR: +            postgres_rollback (cls, +                               session); +            continue; +          case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: +            goto commit; +          case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: +            GNUNET_break (0); /* logic error, should be impossible */ +            postgres_rollback (cls, +                               session); +            return GNUNET_DB_STATUS_HARD_ERROR; +          } +        } +        break; /* actually unreachable */ +      case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: +        break; /* continued below */ +      } +    } /* get_open_shard */ + +    /* No open shard, find last 'end_row' */ +    { +      enum GNUNET_DB_QueryStatus qs; +      struct GNUNET_PQ_QueryParam params[] = { +        GNUNET_PQ_query_param_string (job_name), +        GNUNET_PQ_query_param_end +      }; +      struct GNUNET_PQ_ResultSpec rs[] = { +        GNUNET_PQ_result_spec_uint64 ("end_row", +                                      start_row), +        GNUNET_PQ_result_spec_end +      }; + +      qs = GNUNET_PQ_eval_prepared_singleton_select (session->conn, +                                                     "get_last_shard", +                                                     params, +                                                     rs); +      switch (qs) +      { +      case GNUNET_DB_STATUS_HARD_ERROR: +        GNUNET_break (0); +        postgres_rollback (cls, +                           session); +        return qs; +      case GNUNET_DB_STATUS_SOFT_ERROR: +        postgres_rollback (cls, +                           session); +        continue; +      case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: +        break; +      case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: +        *start_row = 0; /* base-case: no shards yet */ +        break; /* continued below */ +      } +      *end_row = *start_row + shard_size; +    } /* get_last_shard */ + +    /* Claim fresh shard */ +    { +      enum GNUNET_DB_QueryStatus qs; +      struct GNUNET_TIME_Absolute now; +      struct GNUNET_PQ_QueryParam params[] = { +        GNUNET_PQ_query_param_string (job_name), +        GNUNET_PQ_query_param_absolute_time (&now), +        GNUNET_PQ_query_param_uint64 (start_row), +        GNUNET_PQ_query_param_uint64 (end_row), +        GNUNET_PQ_query_param_end +      }; + +      now = GNUNET_TIME_absolute_get (); +      qs = GNUNET_PQ_eval_prepared_non_select (session->conn, +                                               "claim_next_shard", +                                               params); +      switch (qs) +      { +      case GNUNET_DB_STATUS_HARD_ERROR: +        GNUNET_break (0); +        postgres_rollback (cls, +                           session); +        return qs; +      case GNUNET_DB_STATUS_SOFT_ERROR: +        postgres_rollback (cls, +                           session); +        continue; +      case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: +        /* continued below */ +        break; +      case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: +        GNUNET_break (0); +        postgres_rollback (cls, +                           session); +        continue; +      } +    } /* claim_next_shard */ + +    /* commit */ +commit: +    { +      enum GNUNET_DB_QueryStatus qs; + +      qs = postgres_commit (cls, +                            session); +      switch (qs) +      { +      case GNUNET_DB_STATUS_HARD_ERROR: +        GNUNET_break (0); +        postgres_rollback (cls, +                           session); +        return qs; +      case GNUNET_DB_STATUS_SOFT_ERROR: +        postgres_rollback (cls, +                           session); +        continue; +      case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: +      case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: +        return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; +      } +    } +  } /* retry 'for' loop */ +  return GNUNET_DB_STATUS_SOFT_ERROR; +} + + +/** + * Function called to persist that work on a shard was completed. + * + * @param cls the @e cls of this struct with the plugin-specific state + * @param session a session + * @param job_name name of the operation to grab a word shard for + * @param start_row inclusive start row of the shard + * @param end_row exclusive end row of the shard + * @return transaction status code + */ +enum GNUNET_DB_QueryStatus +postgres_complete_shard (void *cls, +                         struct TALER_EXCHANGEDB_Session *session, +                         const char *job_name, +                         uint64_t start_row, +                         uint64_t end_row) +{ +  struct GNUNET_PQ_QueryParam params[] = { +    GNUNET_PQ_query_param_string (job_name), +    GNUNET_PQ_query_param_uint64 (&start_row), +    GNUNET_PQ_query_param_uint64 (&end_row), +    GNUNET_PQ_query_param_end +  }; + +  (void) cls; +  return GNUNET_PQ_eval_prepared_non_select (session->conn, +                                             "complete_shard", +                                             params); +} + + +/**   * Initialize Postgres database subsystem.   *   * @param cls a configuration instance @@ -10353,6 +10644,10 @@ libtaler_plugin_exchangedb_postgres_init (void *cls)      = &postgres_lookup_records_by_table;    plugin->insert_records_by_table      = &postgres_insert_records_by_table; +  plugin->begin_shard +    = &postgres_begin_shard; +  plugin->complete_shard +    = &postgres_complete_shard;    return plugin;  } | 
