diff options
Diffstat (limited to 'src/exchangedb')
| -rw-r--r-- | src/exchangedb/exchange_do_reserves_in_insert.sql | 141 | ||||
| -rw-r--r-- | src/exchangedb/pg_reserves_in_insert.c | 89 | 
2 files changed, 222 insertions, 8 deletions
| diff --git a/src/exchangedb/exchange_do_reserves_in_insert.sql b/src/exchangedb/exchange_do_reserves_in_insert.sql index dffcd8b5..bc1431ad 100644 --- a/src/exchangedb/exchange_do_reserves_in_insert.sql +++ b/src/exchangedb/exchange_do_reserves_in_insert.sql @@ -963,3 +963,144 @@ BEGIN    CLOSE curs_transaction_exist;    RETURN;  END $$; + + + + + + + + + + + + + +CREATE OR REPLACE FUNCTION exchange_do_array_reserves_insert( +  IN in_gc_date INT8, +  IN in_reserve_expiration INT8, +  IN ina_reserve_pub BYTEA[], +  IN ina_wire_ref INT8[], +  IN ina_credit_val INT8[], +  IN ina_credit_frac INT4[], +  IN ina_exchange_account_name VARCHAR[], +  IN ina_execution_date INT8[], +  IN ina_wire_source_h_payto BYTEA[], +  IN ina_payto_uri VARCHAR[], +  IN ina_notify TEXT[], +  OUT transaction_duplicate BOOLEAN, +  OUT ruuid INT8) +LANGUAGE plpgsql +AS $$ +DECLARE +  curs REFCURSOR; +DECLARE +  conflict BOOL; +DECLARE +  dup BOOL; +DECLARE +  uuid INT8; +DECLARE +  i RECORD; +BEGIN + +  INSERT INTO wire_targets +    (wire_target_h_payto +    ,payto_uri) +    SELECT +      wire_source_h_payto +     ,payto_uri +    FROM +      UNNEST (ina_wire_source_h_payto) AS wire_source_h_payto +     ,UNNEST (ina_payto_uri) AS payto_uri +  ON CONFLICT DO NOTHING; + +  OPEN curs FOR +  WITH reserve_changes AS ( +    SELECT +      reserve_pub +     ,wire_ref +     ,credit_val +     ,credit_frac +     ,exchange_account_name +     ,execution_date +     ,wire_source_h_payto +     ,payto_uri +     ,notify +    FROM +      UNNEST (ina_reserve_pub) AS reserve_pub +     ,UNNEST (ina_wire_ref) AS wire_ref +     ,UNNEST (ina_credit_val) AS credit_val +     ,UNNEST (ina_credit_frac) AS credit_frac +     ,UNNEST (ina_exchange_account_name) AS exchange_account_name +     ,UNNEST (ina_execution_date) AS execution_date +     ,UNNEST (ina_wire_source_h_payto) AS wire_source_h_payto +     ,UNNEST (ina_notify) AS notify; + + +  <<loop>> LOOP +    FETCH FROM curs INTO i; +    IF NOT FOUND +    THEN +      EXIT loop; +    END IF; + +    INSERT INTO reserves +      (reserve_pub +      ,current_balance_val +      ,current_balance_frac +      ,expiration_date +      ,gc_date +    ) VALUES ( +      i.reserve_pub +     ,i.credit_val +     ,i.credit_frac +     ,in_reserve_expiration +     ,in_gc_date +    ) +    ON CONFLICT DO NOTHING +    RETURNING reserve_uuid +    INTO uuid; +    conflict = NOT FOUND; + +    INSERT INTO reserves_in +      (reserve_pub +      ,wire_reference +      ,credit_val +      ,credit_frac +      ,exchange_account_section +      ,wire_source_h_payto +      ,execution_date +    ) VALUES ( +      i.reserve_pub +     ,i.wire_reference +     ,i.credit_val +     ,i.credit_frac +     ,i.exchange_account_section +     ,i.wire_source_h_payto +     ,i.execution_date +    ON CONFLICT DO NOTHING; + +    IF NOT FOUND +    THEN +      IF conflict +      THEN +        dup = TRUE; +      else +        dup = FALSE; +      END IF; +    ELSE +      IF NOT conflict +      THEN +        EXECUTE FORMAT ( +          'NOTIFY %s' +          ,i.notify); +      END IF; +      dup = FALSE; +    END IF; +    RETURN (dup,uuid); +  END LOOP loop_reserve; +  CLOSE curs; + +  RETURN; +END $$; diff --git a/src/exchangedb/pg_reserves_in_insert.c b/src/exchangedb/pg_reserves_in_insert.c index 1b7e62d9..72fde749 100644 --- a/src/exchangedb/pg_reserves_in_insert.c +++ b/src/exchangedb/pg_reserves_in_insert.c @@ -619,12 +619,36 @@ TEH_PG_reserves_in_insert (  #if 0 +/** + * Closure for our helper_cb() + */  struct Context  { +  /** +   * Array of reserve UUIDs to initialize. +   */    uint64_t *reserve_uuids; + +  /** +   * Array with entries set to 'true' for duplicate transactions. +   */    bool *transaction_duplicates; + +  /** +   * Array with entries set to 'true' for rows with conflicts. +   */    bool *conflicts; + +  /** +   * Set to #GNUNET_SYSERR on failures. +   */    struct GNUNET_GenericReturnValue status; + +  /** +   * Single value (no array) set to true if we need +   * to follow-up with an update. +   */ +  bool *needs_update;  }; @@ -665,6 +689,7 @@ helper_cb (void *cls,        ctx->status = GNUNET_SYSERR;        return;      } +    *ctx->need_update |= ctx->conflicts[i];    }  } @@ -685,7 +710,6 @@ TEH_PG_reserves_in_insertN (    const char *sender_account_details[GNUNET_NZL (reserves_length)];    const char *exchange_account_names[GNUNET_NZL (reserves_length)];    uint64_t wire_references[GNUNET_NZL (reserves_length)]; -    uint64_t reserve_uuids[GNUNET_NZL (reserves_length)];    bool transaction_duplicates[GNUNET_NZL (reserves_length)];    bool conflicts[GNUNET_NZL (reserves_length)]; @@ -693,6 +717,7 @@ TEH_PG_reserves_in_insertN (      = GNUNET_TIME_relative_to_timestamp (pg->idle_reserve_expiration_time);    struct GNUNET_TIME_Timestamp gc      = GNUNET_TIME_relative_to_timestamp (pg->legal_reserve_expiration_time); +  bool needs_update = false;    enum GNUNET_DB_QueryStatus qs;    for (unsigned int i = 0; i<reserves_length; i++) @@ -709,6 +734,25 @@ TEH_PG_reserves_in_insertN (      exchange_account_names[i] = reserve->exchange_account_name;      wire_references[i] = reserve->wire_reference;    } + +  /* NOTE: kind-of pointless to explicitly start a transaction here... */ +  if (GNUNET_OK != +      TEH_PG_preflight (pg)) +  { +    GNUNET_break (0); +    qs = GNUNET_DB_STATUS_HARD_ERROR; +    goto finished; +  } + +  if (GNUNET_OK != +      TEH_PG_start_read_committed (pg, +                                   "READ_COMMITED")) +  { +    GNUNET_break (0); +    qs = GNUNET_DB_STATUS_HARD_ERROR; +    goto finished; +  } +    PREPARE (pg,             "reserves_insert_with_array",             "SELECT" @@ -752,6 +796,7 @@ TEH_PG_reserves_in_insertN (        .reserve_uuids = reserve_uuids,        .transaction_duplicates = transaction_duplicates,        .conflicts = conflicts, +      .needs_update = &needs_update,        .status = GNUNET_OK      }; @@ -766,12 +811,42 @@ TEH_PG_reserves_in_insertN (        GNUNET_log (GNUNET_ERROR_TYPE_WARNING,                    "Failed to insert into reserves (%d)\n",                    qs); -      for (unsigned int i = 0; i<reserves_length; i++) -        GNUNET_free (rrs[i].notify_s); -      return qs; +      goto finished;      }    } +  { +    enum GNUNET_DB_QueryStatus cs; + +    cs = TEH_PG_commit (pg); +    if (cs < 0) +    { +      GNUNET_log (GNUNET_ERROR_TYPE_WARNING, +                  "Failed to commit\n"); +      qs = cs; +      goto finished; +    } +  } +  for (unsigned int i = 0; i<reserves_length; i++) +  { +    results[i] = transaction_duplicates[i] +      ? GNUNET_DB_STATUS_SUCCESS_NO_RESULTS +      : GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; +  } + +  if (! need_update) +  { +    qs = reserves_length; +    goto finished; +  } +  if (GNUNET_OK != +      TEH_PG_start (pg, +                    "reserve-insert-continued")) +  { +    GNUNET_break (0); +    qs = GNUNET_DB_STATUS_HARD_ERROR; +    goto finished; +  }    for (unsigned int i = 0; i<reserves_length; i++)    { @@ -806,16 +881,14 @@ TEH_PG_reserves_in_insertN (                      "Failed to update reserves (%d)\n",                      qs);          results[i] = qs; -        for (unsigned int i = 0; i<reserves_length; i++) -          GNUNET_free (rrs[i].notify_s); -        return qs; +        goto finished;        }        results[i] = duplicate            ? GNUNET_DB_STATUS_SUCCESS_NO_RESULTS            : GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;      }    } -  // FIXME: convert results back for caller, too! +finished:    for (unsigned int i = 0; i<reserves_length; i++)      GNUNET_free (rrs[i].notify_s);    return qs; | 
