diff options
author | Christian Grothoff <christian@grothoff.org> | 2022-08-11 23:35:33 +0200 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2022-08-11 23:35:33 +0200 |
commit | 1009084e94b8e8cf19e3b5568c3cccaba2bd2209 (patch) | |
tree | a346997dedd05f685ba7addc59e288dfa550ad0e /src/exchange/taler-exchange-aggregator.c | |
parent | b061ea85c84facfc78c34edface367c5f040bc9c (diff) |
major rework of the KYC logic, making it more configurable, not complete, but tests pass again
Diffstat (limited to 'src/exchange/taler-exchange-aggregator.c')
-rw-r--r-- | src/exchange/taler-exchange-aggregator.c | 779 |
1 files changed, 545 insertions, 234 deletions
diff --git a/src/exchange/taler-exchange-aggregator.c b/src/exchange/taler-exchange-aggregator.c index 3d30ccd0..2c279535 100644 --- a/src/exchange/taler-exchange-aggregator.c +++ b/src/exchange/taler-exchange-aggregator.c @@ -1,6 +1,6 @@ /* This file is part of TALER - Copyright (C) 2016-2021 Taler Systems SA + Copyright (C) 2016-2022 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software @@ -26,6 +26,7 @@ #include "taler_exchangedb_lib.h" #include "taler_exchangedb_plugin.h" #include "taler_json_lib.h" +#include "taler_kyclogic_lib.h" #include "taler_bank_service.h" @@ -43,6 +44,12 @@ struct AggregationUnit struct TALER_MerchantPublicKeyP merchant_pub; /** + * Transient amount already found aggregated, + * set only if @e have_transient is true. + */ + struct TALER_Amount trans; + + /** * Total amount to be transferred, before subtraction of @e fees.wire and rounding down. */ struct TALER_Amount total_amount; @@ -84,6 +91,19 @@ struct AggregationUnit */ const struct TALER_EXCHANGEDB_AccountInfo *wa; + /** + * Set to #GNUNET_OK during transient checking + * while everything is OK. Otherwise see return + * value of #do_aggregate(). + */ + enum GNUNET_GenericReturnValue ret; + + /** + * Do we have an entry in the transient table for + * this aggregation? + */ + bool have_transient; + }; @@ -151,7 +171,6 @@ static struct TALER_EXCHANGEDB_Plugin *db_plugin; */ static struct GNUNET_SCHEDULER_Task *task; - /** * How long should we sleep when idle before trying to find more work? */ @@ -186,12 +205,12 @@ run_aggregation (void *cls); /** - * Select a shard to work on. + * Work on transactions unlocked by KYC. * * @param cls NULL */ static void -run_shard (void *cls); +drain_kyc_alerts (void *cls); /** @@ -226,6 +245,7 @@ shutdown_task (void *cls) GNUNET_SCHEDULER_cancel (task); task = NULL; } + TALER_KYCLOGIC_kyc_done (); TALER_EXCHANGEDB_plugin_unload (db_plugin); db_plugin = NULL; TALER_EXCHANGEDB_unload_accounts (); @@ -353,109 +373,170 @@ release_shard (struct Shard *s) } +/** + * Trigger the wire transfer for the @a au_active + * and delete the record of the aggregation. + * + * @param au_active information about the aggregation + */ +static enum GNUNET_DB_QueryStatus +trigger_wire_transfer (const struct AggregationUnit *au_active) +{ + enum GNUNET_DB_QueryStatus qs; + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Preparing wire transfer of %s to %s\n", + TALER_amount2s (&au_active->final_amount), + TALER_B2S (&au_active->merchant_pub)); + { + void *buf; + size_t buf_size; + + TALER_BANK_prepare_transfer (au_active->payto_uri, + &au_active->final_amount, + exchange_base_url, + &au_active->wtid, + &buf, + &buf_size); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Storing %u bytes of wire prepare data\n", + (unsigned int) buf_size); + /* Commit our intention to execute the wire transfer! */ + qs = db_plugin->wire_prepare_data_insert (db_plugin->cls, + au_active->wa->method, + buf, + buf_size); + GNUNET_free (buf); + } + /* Commit the WTID data to 'wire_out' */ + if (qs >= 0) + qs = db_plugin->store_wire_transfer_out (db_plugin->cls, + au_active->execution_time, + &au_active->wtid, + &au_active->h_payto, + au_active->wa->section_name, + &au_active->final_amount); + + if ( (qs >= 0) && + au_active->have_transient) + qs = db_plugin->delete_aggregation_transient (db_plugin->cls, + &au_active->h_payto, + &au_active->wtid); + return qs; +} + + +/** + * Callback to return all applicable amounts for the KYC + * decision to @ a cb. + * + * @param cls a `struct AggregationUnit *` + * @param limit time limit for the iteration + * @param cb function to call with the amounts + * @param cb_cls closure for @a cb + */ static void -run_aggregation (void *cls) +return_relevant_amounts (void *cls, + struct GNUNET_TIME_Absolute limit, + TALER_EXCHANGEDB_KycAmountCallback cb, + void *cb_cls) { - struct Shard *s = cls; - struct AggregationUnit au_active; + const struct AggregationUnit *au_active = cls; enum GNUNET_DB_QueryStatus qs; - struct TALER_Amount trans; - bool have_transient = true; /* squash compiler warning */ - task = NULL; GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Checking for ready deposits to aggregate\n"); - /* make sure we have current fees */ - memset (&au_active, - 0, - sizeof (au_active)); - au_active.execution_time = GNUNET_TIME_timestamp_get (); + "Returning amount %s in KYC check\n", + TALER_amount2s (&au_active->total_amount)); if (GNUNET_OK != - db_plugin->start_deferred_wire_out (db_plugin->cls)) + cb (cb_cls, + &au_active->total_amount, + GNUNET_TIME_absolute_get ())) + return; + qs = db_plugin->select_aggregation_amounts_for_kyc_check ( + db_plugin->cls, + &au_active->h_payto, + limit, + cb, + cb_cls); + if (GNUNET_DB_STATUS_HARD_ERROR == qs) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Failed to start database transaction!\n"); - global_ret = EXIT_FAILURE; - GNUNET_SCHEDULER_shutdown (); - release_shard (s); - return; + "Failed to select aggregation amounts for KYC limit check!\n"); } - qs = db_plugin->get_ready_deposit ( +} + + +/** + * Test if KYC is required for a transfer to @a h_payto. + * + * @param au_active aggregation unit to check for + * @return true if KYC checks are satisfied + */ +static bool +kyc_satisfied (const struct AggregationUnit *au_active) +{ + const char *requirement; + uint64_t legi_row; + enum GNUNET_DB_QueryStatus qs; + + requirement = TALER_KYCLOGIC_kyc_test_required ( + TALER_KYCLOGIC_KYC_TRIGGER_DEPOSIT, + &au_active->h_payto, + db_plugin->select_satisfied_kyc_processes, db_plugin->cls, - s->shard_start, - s->shard_end, - kyc_off ? true : false, - &au_active.merchant_pub, - &au_active.payto_uri); - switch (qs) + &return_relevant_amounts, + (void *) au_active); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "KYC requirement for %s is %s\n", + TALER_amount2s (&au_active->total_amount), + requirement); + if (NULL == requirement) + return true; + qs = db_plugin->insert_kyc_requirement_for_account ( + db_plugin->cls, + requirement, + &au_active->h_payto, + &legi_row); + if (qs < 0) { - case GNUNET_DB_STATUS_HARD_ERROR: - cleanup_au (&au_active); - db_plugin->rollback (db_plugin->cls); GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Failed to begin deposit iteration!\n"); - global_ret = EXIT_FAILURE; - GNUNET_SCHEDULER_shutdown (); - release_shard (s); - return; - case GNUNET_DB_STATUS_SOFT_ERROR: - cleanup_au (&au_active); - db_plugin->rollback (db_plugin->cls); - GNUNET_assert (NULL == task); - task = GNUNET_SCHEDULER_add_now (&run_aggregation, - s); - return; - case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: - { - uint64_t counter = s->work_counter; - struct GNUNET_TIME_Relative duration - = GNUNET_TIME_absolute_get_duration (s->start_time.abs_time); - - cleanup_au (&au_active); - db_plugin->rollback (db_plugin->cls); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Completed shard [%u,%u] after %s with %llu deposits\n", - (unsigned int) s->shard_start, - (unsigned int) s->shard_end, - GNUNET_TIME_relative2s (duration, - true), - (unsigned long long) counter); - release_shard (s); - if ( (GNUNET_YES == test_mode) && - (0 == counter) ) - { - /* in test mode, shutdown after a shard is done with 0 work */ - GNUNET_SCHEDULER_shutdown (); - return; - } - GNUNET_assert (NULL == task); - /* If we ended up doing zero work, sleep a bit */ - if (0 == counter) - task = GNUNET_SCHEDULER_add_delayed (aggregator_idle_sleep_interval, - &run_shard, - NULL); - else - task = GNUNET_SCHEDULER_add_now (&run_shard, - NULL); - return; - } - case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: - s->work_counter++; - /* continued below */ - break; + "Failed to persist KYC requirement `%s' in DB!\n", + requirement); + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "New legitimization process %llu started\n", + (unsigned long long) legi_row); } - au_active.wa = TALER_EXCHANGEDB_find_account_by_payto_uri ( - au_active.payto_uri); - if (NULL == au_active.wa) + return false; +} + + +/** + * Perform the main aggregation work for @a au. Expects to be in + * a working transaction, which the caller must also ultimately commit + * (or rollback) depending on our return value. + * + * @param[in,out] au aggregation unit to work on + * @return #GNUNET_OK if aggregation succeeded, + * #GNUNET_NO to rollback and try again (serialization issue) + * #GNUNET_SYSERR hard error, terminate aggregator process + */ +static enum GNUNET_GenericReturnValue +do_aggregate (struct AggregationUnit *au) +{ + enum GNUNET_DB_QueryStatus qs; + + au->wa = TALER_EXCHANGEDB_find_account_by_payto_uri ( + au->payto_uri); + if (NULL == au->wa) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "No exchange account configured for `%s', please fix your setup to continue!\n", - au_active.payto_uri); + au->payto_uri); global_ret = EXIT_FAILURE; - GNUNET_SCHEDULER_shutdown (); - db_plugin->rollback (db_plugin->cls); - release_shard (s); - return; + return GNUNET_SYSERR; } { @@ -464,234 +545,265 @@ run_aggregation (void *cls) struct TALER_MasterSignatureP master_sig; qs = db_plugin->get_wire_fee (db_plugin->cls, - au_active.wa->method, - au_active.execution_time, + au->wa->method, + au->execution_time, &start_date, &end_date, - &au_active.fees, + &au->fees, &master_sig); if (0 >= qs) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Could not get wire fees for %s at %s. Aborting run.\n", - au_active.wa->method, - GNUNET_TIME_timestamp2s (au_active.execution_time)); + au->wa->method, + GNUNET_TIME_timestamp2s (au->execution_time)); global_ret = EXIT_FAILURE; - GNUNET_SCHEDULER_shutdown (); - db_plugin->rollback (db_plugin->cls); - release_shard (s); - return; + return GNUNET_SYSERR; } } - /* Now try to find other deposits to aggregate */ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Found ready deposit for %s, aggregating by target %s\n", - TALER_B2S (&au_active.merchant_pub), - au_active.payto_uri); - TALER_payto_hash (au_active.payto_uri, - &au_active.h_payto); - + TALER_B2S (&au->merchant_pub), + au->payto_uri); qs = db_plugin->select_aggregation_transient (db_plugin->cls, - &au_active.h_payto, - au_active.wa->section_name, - &au_active.wtid, - &trans); + &au->h_payto, + &au->merchant_pub, + au->wa->section_name, + &au->wtid, + &au->trans); switch (qs) { case GNUNET_DB_STATUS_HARD_ERROR: GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to lookup transient aggregates!\n"); - cleanup_au (&au_active); - db_plugin->rollback (db_plugin->cls); global_ret = EXIT_FAILURE; - GNUNET_SCHEDULER_shutdown (); - release_shard (s); - return; + return GNUNET_SYSERR; case GNUNET_DB_STATUS_SOFT_ERROR: /* serializiability issue, try again */ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Serialization issue, trying again later!\n"); - db_plugin->rollback (db_plugin->cls); - cleanup_au (&au_active); - GNUNET_assert (NULL == task); - task = GNUNET_SCHEDULER_add_now (&run_aggregation, - s); - return; + return GNUNET_NO; case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE, - &au_active.wtid, - sizeof (au_active.wtid)); - have_transient = false; + &au->wtid, + sizeof (au->wtid)); + au->have_transient = false; break; case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: - have_transient = true; + au->have_transient = true; break; } qs = db_plugin->aggregate (db_plugin->cls, - &au_active.h_payto, - &au_active.merchant_pub, - &au_active.wtid, - &au_active.total_amount); + &au->h_payto, + &au->merchant_pub, + &au->wtid, + &au->total_amount); if (GNUNET_DB_STATUS_HARD_ERROR == qs) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to execute aggregation!\n"); - cleanup_au (&au_active); - db_plugin->rollback (db_plugin->cls); global_ret = EXIT_FAILURE; - GNUNET_SCHEDULER_shutdown (); - release_shard (s); - return; + return GNUNET_SYSERR; } if (GNUNET_DB_STATUS_SOFT_ERROR == qs) { /* serializiability issue, try again */ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Serialization issue, trying again later!\n"); - db_plugin->rollback (db_plugin->cls); - cleanup_au (&au_active); - GNUNET_assert (NULL == task); - task = GNUNET_SCHEDULER_add_now (&run_aggregation, - s); - return; + return GNUNET_NO; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Aggregation total is %s.\n", - TALER_amount2s (&au_active.total_amount)); + TALER_amount2s (&au->total_amount)); /* Subtract wire transfer fee and round to the unit supported by the wire transfer method; Check if after rounding down, we still have an amount to transfer, and if not mark as 'tiny'. */ - if (have_transient) + if (au->have_transient) GNUNET_assert (0 <= - TALER_amount_add (&au_active.total_amount, - &au_active.total_amount, - &trans)); + TALER_amount_add (&au->total_amount, + &au->total_amount, + &au->trans)); + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Rounding aggregate of %s\n", - TALER_amount2s (&au_active.total_amount)); + TALER_amount2s (&au->total_amount)); if ( (0 >= - TALER_amount_subtract (&au_active.final_amount, - &au_active.total_amount, - &au_active.fees.wire)) || + TALER_amount_subtract (&au->final_amount, + &au->total_amount, + &au->fees.wire)) || (GNUNET_SYSERR == - TALER_amount_round_down (&au_active.final_amount, + TALER_amount_round_down (&au->final_amount, ¤cy_round_unit)) || - (TALER_amount_is_zero (&au_active.final_amount)) ) + (TALER_amount_is_zero (&au->final_amount)) || + (! kyc_satisfied (au)) ) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Aggregate value too low for transfer (%d/%s)\n", + "Not ready for wire transfer (%d/%s)\n", qs, - TALER_amount2s (&au_active.final_amount)); - if (have_transient) + TALER_amount2s (&au->final_amount)); + if (au->have_transient) qs = db_plugin->update_aggregation_transient (db_plugin->cls, - &au_active.h_payto, - &au_active.wtid, - &au_active.total_amount); + &au->h_payto, + &au->wtid, + &au->total_amount); else qs = db_plugin->create_aggregation_transient (db_plugin->cls, - &au_active.h_payto, - au_active.wa->section_name, - &au_active.wtid, - &au_active.total_amount); + &au->h_payto, + au->wa->section_name, + &au->merchant_pub, + &au->wtid, + &au->total_amount); if (GNUNET_DB_STATUS_SOFT_ERROR == qs) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Serialization issue, trying again later!\n"); - db_plugin->rollback (db_plugin->cls); - cleanup_au (&au_active); - /* start again */ - GNUNET_assert (NULL == task); - task = GNUNET_SCHEDULER_add_now (&run_aggregation, - s); - return; + return GNUNET_NO; } if (GNUNET_DB_STATUS_HARD_ERROR == qs) { GNUNET_break (0); - db_plugin->rollback (db_plugin->cls); - cleanup_au (&au_active); global_ret = EXIT_FAILURE; - GNUNET_SCHEDULER_shutdown (); - release_shard (s); - return; + return GNUNET_SYSERR; } /* commit */ - (void) commit_or_warn (); - cleanup_au (&au_active); - - /* start again */ - GNUNET_assert (NULL == task); - task = GNUNET_SCHEDULER_add_now (&run_aggregation, - s); - return; + return GNUNET_OK; } - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Preparing wire transfer of %s to %s\n", - TALER_amount2s (&au_active.final_amount), - TALER_B2S (&au_active.merchant_pub)); - { - void *buf; - size_t buf_size; - TALER_BANK_prepare_transfer (au_active.payto_uri, - &au_active.final_amount, - exchange_base_url, - &au_active.wtid, - &buf, - &buf_size); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Storing %u bytes of wire prepare data\n", - (unsigned int) buf_size); - /* Commit our intention to execute the wire transfer! */ - qs = db_plugin->wire_prepare_data_insert (db_plugin->cls, - au_active.wa->method, - buf, - buf_size); - GNUNET_free (buf); + qs = trigger_wire_transfer (au); + switch (qs) + { + case GNUNET_DB_STATUS_SOFT_ERROR: + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Serialization issue during aggregation; trying again later!\n"); + return GNUNET_NO; + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_break (0); + global_ret = EXIT_FAILURE; + return GNUNET_SYSERR; + default: + return GNUNET_OK; } - /* Commit the WTID data to 'wire_out' */ - if (qs >= 0) - qs = db_plugin->store_wire_transfer_out (db_plugin->cls, - au_active.execution_time, - &au_active.wtid, - &au_active.h_payto, - au_active.wa->section_name, - &au_active.final_amount); +} - if ( (qs >= 0) && - have_transient) - qs = db_plugin->delete_aggregation_transient (db_plugin->cls, - &au_active.h_payto, - &au_active.wtid); - cleanup_au (&au_active); - if (GNUNET_DB_STATUS_SOFT_ERROR == qs) +static void +run_aggregation (void *cls) +{ + struct Shard *s = cls; + struct AggregationUnit au_active; + enum GNUNET_DB_QueryStatus qs; + enum GNUNET_GenericReturnValue ret; + + task = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Checking for ready deposits to aggregate\n"); + /* make sure we have current fees */ + memset (&au_active, + 0, + sizeof (au_active)); + au_active.execution_time = GNUNET_TIME_timestamp_get (); + if (GNUNET_OK != + db_plugin->start_deferred_wire_out (db_plugin->cls)) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Serialization issue for prepared wire data; trying again later!\n"); + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to start database transaction!\n"); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + release_shard (s); + return; + } + qs = db_plugin->get_ready_deposit ( + db_plugin->cls, + s->shard_start, + s->shard_end, + &au_active.merchant_pub, + &au_active.payto_uri); + switch (qs) + { + case GNUNET_DB_STATUS_HARD_ERROR: + cleanup_au (&au_active); + db_plugin->rollback (db_plugin->cls); + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to begin deposit iteration!\n"); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + release_shard (s); + return; + case GNUNET_DB_STATUS_SOFT_ERROR: + cleanup_au (&au_active); db_plugin->rollback (db_plugin->cls); - /* start again */ GNUNET_assert (NULL == task); task = GNUNET_SCHEDULER_add_now (&run_aggregation, s); return; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + { + uint64_t counter = s->work_counter; + struct GNUNET_TIME_Relative duration + = GNUNET_TIME_absolute_get_duration (s->start_time.abs_time); + + cleanup_au (&au_active); + db_plugin->rollback (db_plugin->cls); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Completed shard [%u,%u] after %s with %llu deposits\n", + (unsigned int) s->shard_start, + (unsigned int) s->shard_end, + GNUNET_TIME_relative2s (duration, + true), + (unsigned long long) counter); + release_shard (s); + if ( (GNUNET_YES == test_mode) && + (0 == counter) ) + { + /* in test mode, shutdown after a shard is done with 0 work */ + GNUNET_SCHEDULER_shutdown (); + return; + } + GNUNET_assert (NULL == task); + /* If we ended up doing zero work, sleep a bit */ + if (0 == counter) + task = GNUNET_SCHEDULER_add_delayed (aggregator_idle_sleep_interval, + &drain_kyc_alerts, + NULL); + else + task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts, + NULL); + return; + } + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + s->work_counter++; + /* continued below */ + break; } - if (GNUNET_DB_STATUS_HARD_ERROR == qs) + + TALER_payto_hash (au_active.payto_uri, + &au_active.h_payto); + ret = do_aggregate (&au_active); + cleanup_au (&au_active); + switch (ret) { - GNUNET_break (0); - db_plugin->rollback (db_plugin->cls); - /* die hard */ - global_ret = EXIT_FAILURE; + case GNUNET_SYSERR: GNUNET_SCHEDULER_shutdown (); + db_plugin->rollback (db_plugin->cls); release_shard (s); return; + case GNUNET_NO: + db_plugin->rollback (db_plugin->cls); + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_now (&run_aggregation, + s); + return; + case GNUNET_OK: + /* continued below */ + break; } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Stored wire transfer out instructions\n"); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Committing aggregation result\n"); /* Now we can finally commit the overall transaction, as we are again consistent if all of this passes. */ @@ -699,8 +811,8 @@ run_aggregation (void *cls) { case GNUNET_DB_STATUS_SOFT_ERROR: /* try again */ - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Commit issue for prepared wire data; trying again later!\n"); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Serialization issue on commit; trying again later!\n"); GNUNET_assert (NULL == task); task = GNUNET_SCHEDULER_add_now (&run_aggregation, s); @@ -714,7 +826,7 @@ run_aggregation (void *cls) return; case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Preparation complete, going again\n"); + "Commit complete, going again\n"); GNUNET_assert (NULL == task); task = GNUNET_SCHEDULER_add_now (&run_aggregation, s); @@ -792,6 +904,197 @@ run_shard (void *cls) /** + * Function called on transient aggregations matching + * a particular hash of a payto URI. + * + * @param cls + * @param payto_uri corresponding payto URI + * @param wtid wire transfer identifier of transient aggregation + * @param merchant_pub public key of the merchant + * @param total amount aggregated so far + * @return true to continue to iterate + */ +static bool +handle_transient_cb ( + void *cls, + const char *payto_uri, + const struct TALER_WireTransferIdentifierRawP *wtid, + const struct TALER_MerchantPublicKeyP *merchant_pub, + const struct TALER_Amount *total) +{ + struct AggregationUnit *au = cls; + + if (GNUNET_OK != au->ret) + { + GNUNET_break (0); + return false; + } + au->payto_uri = GNUNET_strdup (payto_uri); + au->wtid = *wtid; + au->merchant_pub = *merchant_pub; + au->trans = *total; + au->have_transient = true; + au->ret = do_aggregate (au); + GNUNET_free (au->payto_uri); + return (GNUNET_OK == au->ret); +} + + +static void +drain_kyc_alerts (void *cls) +{ + enum GNUNET_DB_QueryStatus qs; + struct AggregationUnit au; + + (void) cls; + task = NULL; + memset (&au, + 0, + sizeof (au)); + au.execution_time = GNUNET_TIME_timestamp_get (); + if (GNUNET_SYSERR == + db_plugin->preflight (db_plugin->cls)) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to obtain database connection!\n"); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + return; + } + if (GNUNET_OK != + db_plugin->start (db_plugin->cls, + "handle kyc alerts")) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to start database transaction!\n"); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + return; + } + while (1) + { + qs = db_plugin->drain_kyc_alert (db_plugin->cls, + 1, + &au.h_payto); + switch (qs) + { + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_break (0); + db_plugin->rollback (db_plugin->cls); + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts, + NULL); + return; + case GNUNET_DB_STATUS_SOFT_ERROR: + db_plugin->rollback (db_plugin->cls); + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts, + NULL); + return; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + qs = db_plugin->commit (db_plugin->cls); + if (qs < 0) + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Failed to commit KYC drain\n"); + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_now (&run_shard, + NULL); + return; + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + /* handled below */ + break; + } + + au.ret = GNUNET_OK; + qs = db_plugin->find_aggregation_transient (db_plugin->cls, + &au.h_payto, + &handle_transient_cb, + &au); + switch (qs) + { + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to lookup transient aggregates!\n"); + db_plugin->rollback (db_plugin->cls); + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts, + NULL); + return; + case GNUNET_DB_STATUS_SOFT_ERROR: + /* serializiability issue, try again */ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Serialization issue, trying again later!\n"); + db_plugin->rollback (db_plugin->cls); + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts, + NULL); + return; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + continue; /* while (1) */ + default: + break; + } + break; + } /* while(1) */ + + { + enum GNUNET_GenericReturnValue ret; + + ret = au.ret; + cleanup_au (&au); + switch (ret) + { + case GNUNET_SYSERR: + GNUNET_break (0); + GNUNET_SCHEDULER_shutdown (); + db_plugin->rollback (db_plugin->cls); /* just in case */ + return; + case GNUNET_NO: + db_plugin->rollback (db_plugin->cls); + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts, + NULL); + return; + case GNUNET_OK: + /* continued below */ + break; + } + } + + switch (commit_or_warn ()) + { + case GNUNET_DB_STATUS_SOFT_ERROR: + /* try again */ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Serialization issue on commit; trying again later!\n"); + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts, + NULL); + return; + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_break (0); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + db_plugin->rollback (db_plugin->cls); /* just in case */ + return; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Commit complete, going again\n"); + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts, + NULL); + return; + default: + GNUNET_break (0); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + db_plugin->rollback (db_plugin->cls); /* just in case */ + return; + } +} + + +/** * First task. * * @param cls closure, NULL @@ -811,7 +1114,8 @@ run (void *cls, (void) cfgfile; cfg = c; - if (GNUNET_OK != parse_aggregator_config ()) + if (GNUNET_OK != + parse_aggregator_config ()) { cfg = NULL; global_ret = EXIT_NOTCONFIGURED; @@ -832,11 +1136,18 @@ run (void *cls, shard_size = 1U + INT32_MAX; else shard_size = (uint32_t) ass; + if (GNUNET_OK != + TALER_KYCLOGIC_kyc_init (cfg)) + { + cfg = NULL; + global_ret = EXIT_NOTCONFIGURED; + return; + } + GNUNET_SCHEDULER_add_shutdown (&shutdown_task, + NULL); GNUNET_assert (NULL == task); - task = GNUNET_SCHEDULER_add_now (&run_shard, + task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts, NULL); - GNUNET_SCHEDULER_add_shutdown (&shutdown_task, - cls); } |