From 1b2abf82123fcc09a21da8df56b71caec30d0693 Mon Sep 17 00:00:00 2001 From: Tim Otten Date: Fri, 29 Sep 2023 18:29:52 -0700 Subject: [PATCH] Queue::runLoop() - Add API for running several tasks --- Civi/Api4/Action/Queue/RunLoop.php | 128 ++++++++++++++++++++++ Civi/Api4/Queue.php | 16 +++ tests/phpunit/api/v4/Entity/QueueTest.php | 43 ++++++++ 3 files changed, 187 insertions(+) create mode 100644 Civi/Api4/Action/Queue/RunLoop.php diff --git a/Civi/Api4/Action/Queue/RunLoop.php b/Civi/Api4/Action/Queue/RunLoop.php new file mode 100644 index 0000000000..5a777b3ab2 --- /dev/null +++ b/Civi/Api4/Action/Queue/RunLoop.php @@ -0,0 +1,128 @@ +queue); + $startTime = microtime(TRUE); + $requests = 0; + $errors = 0; + $successes = 0; + $message = NULL; + $perm = $this->getCheckPermissions(); + + while (TRUE) { + if (!$queue->isActive()) { + break; + } + + if (static::isFinite($this->maxRequests) && $requests >= $this->maxRequests) { + $message = sprintf('Reached request limit (%d)', $this->maxRequests); + break; + } + if (static::isFinite($this->maxDuration) && (microtime(TRUE) - $startTime) >= $this->maxDuration) { + $message = sprintf('Reached duration limit (%d)', $this->maxDuration); + break; + } + + try { + $requests++; + + $claims = Queue::claimItems($perm)->setQueue($this->queue)->execute()->getArrayCopy(); + if (empty($claims)) { + $message = 'No claimable items'; + break; + } + + $batchResults = Queue::runItems($perm)->setQueue($this->queue)->setItems($claims)->execute(); + } + catch (\Throwable $t) { + $errors++; + $message = sprintf('Queue-item raised unhandled exception (%s: %s)', get_class($t), $t->getMessage()); + break; + } + + foreach ($batchResults as $batchResult) { + if ($batchResult['outcome'] === 'ok') { + $successes++; + } + else { + $errors++; + // Should we stop? No, we're just reporting stats. + // What about queues with policy "error=>abort"? They must update ("status=>aborted") under the aegis of runItems(). + // Stopping here would obscure problems that affect all main-loops. + } + } + } + + $result[] = [ + 'loop_duration' => sprintf('%.3f', microtime(TRUE) - $startTime), + 'loop_requests' => $requests, + 'item_successes' => $successes, + 'item_errors' => $errors, + 'queue_ready' => $queue->getStatistic('ready'), + 'queue_blocked' => $queue->getStatistic('blocked'), + 'queue_total' => $queue->getStatistic('total'), + 'exit_message' => $message, + ]; + } + + private static function isFinite($value): bool { + return $value !== NULL && $value >= 0; + } + +} diff --git a/Civi/Api4/Queue.php b/Civi/Api4/Queue.php index f504862457..ef468032e9 100644 --- a/Civi/Api4/Queue.php +++ b/Civi/Api4/Queue.php @@ -12,6 +12,7 @@ namespace Civi\Api4; use Civi\Api4\Action\Queue\ClaimItems; use Civi\Api4\Action\Queue\RunItems; +use Civi\Api4\Action\Queue\RunLoop; /** * Track a list of durable/scannable queues. @@ -65,4 +66,19 @@ class Queue extends \Civi\Api4\Generic\DAOEntity { ->setCheckPermissions($checkPermissions); } + /** + * Run a series of items from a queue. + * + * Note: This is appropriate for persistent, auto-run queues. + * + * Note: `runLoop()` executes several units-of-work. It may handle multiple batches. + * + * @param bool $checkPermissions + * @return \Civi\Api4\Action\Queue\RunLoop + */ + public static function runLoop($checkPermissions = TRUE) { + return (new RunLoop(static::getEntityName(), __FUNCTION__)) + ->setCheckPermissions($checkPermissions); + } + } diff --git a/tests/phpunit/api/v4/Entity/QueueTest.php b/tests/phpunit/api/v4/Entity/QueueTest.php index 720dc53709..e15eb9e364 100644 --- a/tests/phpunit/api/v4/Entity/QueueTest.php +++ b/tests/phpunit/api/v4/Entity/QueueTest.php @@ -171,6 +171,49 @@ class QueueTest extends Api4TestBase { $this->assertEquals([6], \Civi::$statics[__CLASS__]['onHookQueueRunLog'][2]); } + public function testRunLoop() { + $queueName = 'QueueTest_' . md5(random_bytes(32)) . '_runloop'; + \Civi::dispatcher()->addListener('hook_civicrm_queueRun_testStuff', [$this, 'onHookQueueRun']); + $queue = \Civi::queue($queueName, [ + 'type' => 'SqlParallel', + 'runner' => 'testStuff', + 'error' => 'delete', + 'batch_limit' => 4, + ]); + $this->assertQueueStats(0, 0, 0, $queue); + + for ($i = 0; $i < 20; $i++) { + \Civi::queue($queueName)->createItem(['thingy' => $i]); + } + + // 20 items ==> 4 per batch ==> 5 batches. Let's run the first 3... + $result = Queue::runLoop(0)->setQueue($queueName)->setMaxRequests(3)->execute(); + $this->assertEquals([0, 1, 2, 3], \Civi::$statics[__CLASS__]['onHookQueueRunLog'][0], 'Scope of first batch'); + $this->assertEquals([4, 5, 6, 7], \Civi::$statics[__CLASS__]['onHookQueueRunLog'][1], 'Scope of second batch'); + $this->assertEquals([8, 9, 10, 11], \Civi::$statics[__CLASS__]['onHookQueueRunLog'][2], 'Scope of third batch'); + $this->assertEquals(12, $result[0]['item_successes']); + $this->assertEquals(0, $result[0]['item_errors']); + $this->assertEquals(3, $result[0]['loop_requests']); + $this->assertTrue(is_numeric($result[0]['loop_duration'])); + $this->assertEquals('Reached request limit (3)', $result[0]['exit_message']); + $this->assertEquals(0, $result[0]['queue_blocked']); + $this->assertEquals(8, $result[0]['queue_ready'], 'Due to request limit, we left some items in queue'); + $this->assertEquals(8, $result[0]['queue_total'], 'Due to request limit, we left some items in queue'); + + // And run any remaining batches... + $result = Queue::runLoop(0)->setQueue($queueName)->setMaxRequests(10)->execute(); + $this->assertEquals([12, 13, 14, 15], \Civi::$statics[__CLASS__]['onHookQueueRunLog'][3], 'Scope of fourth batch'); + $this->assertEquals([16, 17, 18, 19], \Civi::$statics[__CLASS__]['onHookQueueRunLog'][4], 'Scope of fifth batch'); + $this->assertEquals(8, $result[0]['item_successes']); + $this->assertEquals(0, $result[0]['item_errors']); + $this->assertEquals(2 + 1, $result[0]['loop_requests']); + $this->assertTrue(is_numeric($result[0]['loop_duration'])); + $this->assertEquals('No claimable items', $result[0]['exit_message']); + $this->assertEquals(0, $result[0]['queue_blocked'], 'Queue should be empty'); + $this->assertEquals(0, $result[0]['queue_ready'], 'Queue should be empty'); + $this->assertEquals(0, $result[0]['queue_total'], 'Queue should be empty'); + } + /** * @param \Civi\Core\Event\GenericHookEvent $e * @see CRM_Utils_Hook::queueRun() -- 2.25.1