aboutsummaryrefslogtreecommitdiff
path: root/src/exchange
diff options
context:
space:
mode:
Diffstat (limited to 'src/exchange')
-rw-r--r--src/exchange/taler-exchange-expire.c41
-rw-r--r--src/exchange/taler-exchange-httpd.h4
-rw-r--r--src/exchange/taler-exchange-httpd_purses_get.c31
3 files changed, 66 insertions, 10 deletions
diff --git a/src/exchange/taler-exchange-expire.c b/src/exchange/taler-exchange-expire.c
index c7691930..b5df64a7 100644
--- a/src/exchange/taler-exchange-expire.c
+++ b/src/exchange/taler-exchange-expire.c
@@ -95,6 +95,12 @@ static int global_ret;
*/
static int test_mode;
+/**
+ * If this is a first-time run, we immediately
+ * try to catch up with the present.
+ */
+static bool jump_mode;
+
/**
* Select a shard to work on.
@@ -188,6 +194,7 @@ static void
release_shard (struct Shard *s)
{
enum GNUNET_DB_QueryStatus qs;
+ unsigned long long wc = (unsigned long long) s->work_counter;
qs = db_plugin->complete_shard (
db_plugin->cls,
@@ -209,10 +216,14 @@ release_shard (struct Shard *s)
case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Purse expiration shard completed with %llu purses\n",
- (unsigned long long) s->work_counter);
+ wc);
/* normal case */
break;
}
+ if ( (0 == wc) &&
+ (test_mode) &&
+ (! jump_mode) )
+ GNUNET_SCHEDULER_shutdown ();
}
@@ -262,13 +273,16 @@ run_expire (void *cls)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to obtain database connection!\n");
+ abort_shard (s);
global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
return;
}
- if (db_plugin->start (db_plugin->cls,
+ if (GNUNET_OK !=
+ db_plugin->start (db_plugin->cls,
"expire-purse"))
{
+ GNUNET_break (0);
global_ret = EXIT_FAILURE;
db_plugin->rollback (db_plugin->cls);
abort_shard (s);
@@ -290,6 +304,7 @@ run_expire (void *cls)
case GNUNET_DB_STATUS_SOFT_ERROR:
db_plugin->rollback (db_plugin->cls);
abort_shard (s);
+ GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&run_shard,
NULL);
return;
@@ -303,6 +318,7 @@ run_expire (void *cls)
{
release_shard (s);
}
+ GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&run_shard,
NULL);
return;
@@ -310,6 +326,7 @@ run_expire (void *cls)
/* commit, and go again immediately */
s->work_counter++;
(void) commit_or_warn ();
+ GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&run_expire,
s);
}
@@ -343,9 +360,15 @@ run_shard (void *cls)
qs = db_plugin->begin_shard (db_plugin->cls,
"expire",
shard_size,
- shard_size.rel_value_us,
+ jump_mode
+ ? GNUNET_TIME_absolute_subtract (
+ GNUNET_TIME_absolute_get (),
+ shard_size).
+ abs_value_us
+ : shard_size.rel_value_us,
&s->shard_start.abs_value_us,
&s->shard_end.abs_value_us);
+ jump_mode = false;
if (0 >= qs)
{
if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
@@ -355,6 +378,7 @@ run_shard (void *cls)
GNUNET_free (s);
delay = GNUNET_TIME_randomized_backoff (delay,
GNUNET_TIME_UNIT_SECONDS);
+ GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_delayed (delay,
&run_shard,
NULL);
@@ -368,9 +392,10 @@ run_shard (void *cls)
GNUNET_SCHEDULER_shutdown ();
return;
}
- if (GNUNET_TIME_absolute_is_future (s->shard_end))
+ if (GNUNET_TIME_absolute_is_future (s->shard_start))
{
- task = GNUNET_SCHEDULER_add_at (s->shard_end,
+ GNUNET_assert (NULL == task);
+ task = GNUNET_SCHEDULER_add_at (s->shard_start,
&run_shard,
NULL);
abort_shard (s);
@@ -379,12 +404,12 @@ run_shard (void *cls)
/* If this is a first-time run, we immediately
try to catch up with the present */
if (GNUNET_TIME_absolute_is_zero (s->shard_start))
- s->shard_end = GNUNET_TIME_absolute_get ();
-
+ jump_mode = true;
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Starting shard [%llu:%llu]!\n",
+ "Starting shard [%llu:%llu)!\n",
(unsigned long long) s->shard_start.abs_value_us,
(unsigned long long) s->shard_end.abs_value_us);
+ GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&run_expire,
s);
}
diff --git a/src/exchange/taler-exchange-httpd.h b/src/exchange/taler-exchange-httpd.h
index bb387696..278a05be 100644
--- a/src/exchange/taler-exchange-httpd.h
+++ b/src/exchange/taler-exchange-httpd.h
@@ -309,8 +309,8 @@ struct TEH_RequestHandler
* @return MHD result code
*/
MHD_RESULT
- (*get)(struct TEH_RequestContext *rc,
- const char *const args[]);
+ (*get)(struct TEH_RequestContext *rc,
+ const char *const args[]);
/**
diff --git a/src/exchange/taler-exchange-httpd_purses_get.c b/src/exchange/taler-exchange-httpd_purses_get.c
index 656a34db..12a24489 100644
--- a/src/exchange/taler-exchange-httpd_purses_get.c
+++ b/src/exchange/taler-exchange-httpd_purses_get.c
@@ -333,6 +333,37 @@ TEH_handler_purses_get (struct TEH_RequestContext *rc,
case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
break; /* handled below */
}
+ if (GNUNET_TIME_absolute_cmp (gc->timeout,
+ >,
+ gc->purse_expiration.abs_time))
+ {
+ /* Timeout too high, need to replace event handler */
+ struct TALER_PurseEventP rep = {
+ .header.size = htons (sizeof (rep)),
+ .header.type = htons (
+ gc->wait_for_merge
+ ? TALER_DBEVENT_EXCHANGE_PURSE_MERGED
+ : TALER_DBEVENT_EXCHANGE_PURSE_DEPOSITED),
+ .purse_pub = gc->purse_pub
+ };
+ struct GNUNET_DB_EventHandler *eh2;
+
+ gc->timeout = gc->purse_expiration.abs_time;
+ eh2 = TEH_plugin->event_listen (
+ TEH_plugin->cls,
+ GNUNET_TIME_absolute_get_remaining (gc->timeout),
+ &rep.header,
+ &db_event_cb,
+ rc);
+ if (NULL == eh2)
+ {
+ GNUNET_break (0);
+ gc->timeout = GNUNET_TIME_UNIT_ZERO_ABS;
+ }
+ TEH_plugin->event_listen_cancel (TEH_plugin->cls,
+ gc->eh);
+ gc->eh = eh2;
+ }
}
if (GNUNET_TIME_absolute_is_past (gc->purse_expiration.abs_time))
{