From 8369e17be0a33fa041370498929108e3d20b9729 Mon Sep 17 00:00:00 2001 From: Tim Otten Date: Wed, 2 Feb 2022 00:41:03 -0800 Subject: [PATCH] Civi\Api4\Queue - Add APIs for claiming and running enqueued tasks --- Civi/Api4/Action/Queue/ClaimItems.php | 90 +++++ Civi/Api4/Action/Queue/RunItems.php | 119 +++++++ Civi/Api4/Queue.php | 30 ++ tests/phpunit/api/v4/Entity/QueueTest.php | 409 ++++++++++++++++++++++ 4 files changed, 648 insertions(+) create mode 100644 Civi/Api4/Action/Queue/ClaimItems.php create mode 100644 Civi/Api4/Action/Queue/RunItems.php create mode 100644 tests/phpunit/api/v4/Entity/QueueTest.php diff --git a/Civi/Api4/Action/Queue/ClaimItems.php b/Civi/Api4/Action/Queue/ClaimItems.php new file mode 100644 index 0000000000..8d05495060 --- /dev/null +++ b/Civi/Api4/Action/Queue/ClaimItems.php @@ -0,0 +1,90 @@ +select = empty($this->select) ? ['id', 'data', 'queue'] : $this->select; + $queue = $this->queue(); + if (!$queue->isActive()) { + return; + } + + $isBatch = $queue instanceof \CRM_Queue_Queue_BatchQueueInterface; + $limit = $queue->getSpec('batch_limit') ?: 1; + if ($limit > 1 && !$isBatch) { + throw new \API_Exception(sprintf('Queue "%s" (%s) does not support batching.', $queue->getName(), get_class($queue))); + // Note 1: Simply looping over `claimItem()` is unlikley to help the consumer b/c + // drivers like Sql+Memory are linear+blocking. + // Note 2: The default is batch_limit=1. So someone has specifically chosen an invalid configuration... + } + $items = $isBatch ? $queue->claimItems($limit) : [$queue->claimItem()]; + + foreach ($items as $item) { + if ($item) { + $result[] = $this->convertItemToStub($item); + } + } + } + + /** + * @param \CRM_Queue_DAO_QueueItem|\stdClass $item + * @return array + */ + protected function convertItemToStub(object $item): array { + $array = []; + foreach ($this->select as $field) { + switch ($field) { + case 'id': + $array['id'] = $item->id; + break; + + case 'data': + $array['data'] = (array) $item->data; + break; + + case 'queue': + $array['queue'] = $this->queue; + break; + + } + } + return $array; + } + + protected function queue(): \CRM_Queue_Queue { + if (empty($this->queue)) { + throw new \API_Exception('Missing required parameter: $queue'); + } + return \Civi::queue($this->queue); + } + +} diff --git a/Civi/Api4/Action/Queue/RunItems.php b/Civi/Api4/Action/Queue/RunItems.php new file mode 100644 index 0000000000..b65ccf7def --- /dev/null +++ b/Civi/Api4/Action/Queue/RunItems.php @@ -0,0 +1,119 @@ +setQueue($item)->setLeaseTime(600)->execute()->single(); + * $result = Civi\Api4\Queue::runItem()->setItem($todo)->execute()->single(); + * assert(in_array($result['outcome'], ['ok', 'retry', 'fail'])) + * + * $result = Civi\Api4\Queue::runItem()->setQueue('foo')->execute()->first(); + * assert(in_array($result['outcome'], ['ok', 'retry', 'fail'])) + * ``` + * + * Valid outcomes are: + * - 'ok': Task executed normally. Removed from queue. + * - 'retry': Task encountered an error. Will try again later. + * - 'fail': Task encountered an error. Will not try again later. Removed from queue. + * + * @method $this setItem(?array $item) + * @method ?array getItem() + * @method ?string setQueue + * @method $this setQueue(?string $queue) + */ +class RunItems extends \Civi\Api4\Generic\AbstractAction { + + /** + * Previously claimed item - which should now be released. + * + * @var array|null + * Fields: {id: scalar, queue: string} + */ + protected $items; + + /** + * Name of the target queue. + * + * @var string|null + */ + protected $queue; + + public function _run(\Civi\Api4\Generic\Result $result) { + if (!empty($this->items)) { + $this->validateItemStubs(); + $queue = \Civi::queue($this->items[0]['queue']); + $ids = \CRM_Utils_Array::collect('id', $this->items); + if (count($ids) > 1 && !($queue instanceof \CRM_Queue_Queue_BatchQueueInterface)) { + throw new \API_Exception("runItems: Error: Running multiple items requires BatchQueueInterface"); + } + if (count($ids) > 1) { + $items = $queue->fetchItems($ids); + } + else { + $items = [$queue->fetchItem($ids[0])]; + } + } + elseif (!empty($this->queue)) { + $queue = \Civi::queue($this->queue); + if (!$queue->isActive()) { + return; + } + $items = $queue instanceof \CRM_Queue_Queue_BatchQueueInterface + ? $queue->claimItems($queue->getSpec('batch_limit') ?: 1) + : [$queue->claimItem()]; + } + else { + throw new \API_Exception("runItems: Requires either 'queue' or 'item'."); + } + + if (empty($items)) { + return; + } + + $outcomes = []; + \CRM_Utils_Hook::queueRun($queue, $items, $outcomes); + if (empty($outcomes)) { + throw new \API_Exception(sprintf('Failed to run queue items (name=%s, runner=%s, itemCount=%d, outcomeCount=%d)', + $queue->getName(), $queue->getSpec('runner'), count($items), count($outcomes))); + } + foreach ($items as $itemPos => $item) { + $result[] = ['outcome' => $outcomes[$itemPos], 'item' => $this->createItemStub($item)]; + } + } + + private function validateItemStubs(): void { + $queueNames = []; + if (!isset($this->items[0])) { + throw new \API_Exception("Queue items must be given as numeric array."); + } + foreach ($this->items as $item) { + if (empty($item['queue'])) { + throw new \API_Exception("Queue item requires property 'queue'."); + } + if (empty($item['id'])) { + throw new \API_Exception("Queue item requires property 'id'."); + } + $queueNames[$item['queue']] = 1; + } + if (count($queueNames) > 1) { + throw new \API_Exception("Queue items cannot be mixed. Found queues: " . implode(', ', array_keys($queueNames))); + } + } + + private function createItemStub($item): array { + return ['id' => $item->id, 'queue' => $item->queue_name]; + } + +} diff --git a/Civi/Api4/Queue.php b/Civi/Api4/Queue.php index e3ae1acbf3..86173f6d3c 100644 --- a/Civi/Api4/Queue.php +++ b/Civi/Api4/Queue.php @@ -10,6 +10,9 @@ */ namespace Civi\Api4; +use Civi\Api4\Action\Queue\ClaimItems; +use Civi\Api4\Action\Queue\RunItems; + /** * Track a list of durable/scannable queues. * @@ -31,7 +34,34 @@ class Queue extends \Civi\Api4\Generic\DAOEntity { return [ 'meta' => ['access CiviCRM'], 'default' => ['administer queues'], + 'runItem' => [\CRM_Core_Permission::ALWAYS_DENY_PERMISSION], ]; } + /** + * Claim an item from the queue. Returns zero or one items. + * + * Note: This is appropriate for persistent, auto-run queues. + * + * @param bool $checkPermissions + * @return \Civi\Api4\Action\Queue\ClaimItems + */ + public static function claimItems($checkPermissions = TRUE) { + return (new ClaimItems(static::getEntityName(), __FUNCTION__)) + ->setCheckPermissions($checkPermissions); + } + + /** + * Run an item from the queue. + * + * Note: This is appropriate for persistent, auto-run queues. + * + * @param bool $checkPermissions + * @return \Civi\Api4\Action\Queue\RunItems + */ + public static function runItems($checkPermissions = TRUE) { + return (new RunItems(static::getEntityName(), __FUNCTION__)) + ->setCheckPermissions($checkPermissions); + } + } diff --git a/tests/phpunit/api/v4/Entity/QueueTest.php b/tests/phpunit/api/v4/Entity/QueueTest.php new file mode 100644 index 0000000000..1e0f70557d --- /dev/null +++ b/tests/phpunit/api/v4/Entity/QueueTest.php @@ -0,0 +1,409 @@ + TRUE, + 'doSomethingLog' => [], + 'onHookQueueRunLog' => [], + ]; + parent::setUp(); + } + + /** + * Setup a queue with a line of back-to-back tasks. + * + * The first task runs normally. The second task fails at first, but it is retried, and then + * succeeds. + * + * @throws \API_Exception + * @throws \Civi\API\Exception\UnauthorizedException + */ + public function testBasicLinearPolling() { + $queueName = 'QueueTest_' . md5(random_bytes(32)) . '_linear'; + $queue = \Civi::queue($queueName, [ + 'type' => 'Sql', + 'runner' => 'task', + 'error' => 'delete', + 'retry_limit' => 2, + 'retry_interval' => 4, + ]); + $this->assertEquals(0, $queue->numberOfItems()); + + \Civi::queue($queueName)->createItem(new \CRM_Queue_Task( + [QueueTest::class, 'doSomething'], + ['first'] + )); + \Civi::queue($queueName)->createItem(new \CRM_Queue_Task( + [QueueTest::class, 'doSomething'], + ['second'] + )); + + // Get item #1. Run it. Finish it. + $first = Queue::claimItems()->setQueue($queueName)->execute()->single(); + $this->assertCallback('doSomething', ['first'], $first); + $this->assertEquals(0, count(Queue::claimItems()->setQueue($queueName)->execute()), 'Linear queue should not return more items while first item is pending.'); + $firstResult = Queue::runItems(0)->setItems([$first])->execute()->single(); + $this->assertEquals('ok', $firstResult['outcome']); + $this->assertEquals($first['id'], $firstResult['item']['id']); + $this->assertEquals($first['queue'], $firstResult['item']['queue']); + $this->assertEquals(['first_ok'], \Civi::$statics[__CLASS__]['doSomethingLog']); + + // Get item #2. Run it - but fail! + $second = Queue::claimItems()->setQueue($queueName)->execute()->single(); + $this->assertCallback('doSomething', ['second'], $second); + \Civi::$statics[__CLASS__]['doSomethingResult'] = FALSE; + $secondResult = Queue::runItems(0)->setItems([$second])->execute()->single(); + \Civi::$statics[__CLASS__]['doSomethingResult'] = TRUE; + $this->assertEquals('retry', $secondResult['outcome']); + $this->assertEquals(['first_ok', 'second_err'], \Civi::$statics[__CLASS__]['doSomethingLog']); + + // Item #2 is delayed... it'll take a few seconds to come up... + $waitCount = $this->waitFor(1.0, 10, function() use ($queueName, &$retrySecond): bool { + $retrySecond = Queue::claimItems()->setQueue($queueName)->execute()->first(); + return !empty($retrySecond); + }); + $this->assertTrue($waitCount > 0, 'Failed task should not become available immediately. It should take a few seconds.'); + $this->assertCallback('doSomething', ['second'], $retrySecond); + $retrySecondResult = Queue::runItems(0)->setItems([$retrySecond])->execute()->single(); + $this->assertEquals('ok', $retrySecondResult['outcome']); + $this->assertEquals(['first_ok', 'second_err', 'second_ok'], \Civi::$statics[__CLASS__]['doSomethingLog']); + + // All done. + $this->assertEquals(0, $queue->numberOfItems()); + } + + public function testBasicParallelPolling() { + $queueName = 'QueueTest_' . md5(random_bytes(32)) . '_parallel'; + $queue = \Civi::queue($queueName, ['type' => 'SqlParallel', 'runner' => 'task', 'error' => 'delete']); + $this->assertEquals(0, $queue->numberOfItems()); + + \Civi::queue($queueName)->createItem(new \CRM_Queue_Task( + [QueueTest::class, 'doSomething'], + ['first'] + )); + \Civi::queue($queueName)->createItem(new \CRM_Queue_Task( + [QueueTest::class, 'doSomething'], + ['second'] + )); + + $first = Queue::claimItems()->setQueue($queueName)->execute()->single(); + $second = Queue::claimItems()->setQueue($queueName)->execute()->single(); + + $this->assertCallback('doSomething', ['first'], $first); + $this->assertCallback('doSomething', ['second'], $second); + + // Just for fun, let's run these tasks in opposite order. + + Queue::runItems(0)->setItems([$second])->execute(); + $this->assertEquals(['second_ok'], \Civi::$statics[__CLASS__]['doSomethingLog']); + + Queue::runItems(0)->setItems([$first])->execute(); + $this->assertEquals(['second_ok', 'first_ok'], \Civi::$statics[__CLASS__]['doSomethingLog']); + + $this->assertEquals(0, $queue->numberOfItems()); + } + + /** + * Create a parallel queue. Claim and execute tasks as batches. + * + * Batches are executed via `hook_civicrm_queueRun_{runner}`. + * + * @throws \API_Exception + * @throws \Civi\API\Exception\UnauthorizedException + */ + public function testBatchParallelPolling() { + $queueName = 'QueueTest_' . md5(random_bytes(32)) . '_parallel'; + \Civi::dispatcher()->addListener('hook_civicrm_queueRun_testStuff', [$this, 'onHookQueueRun']); + $queue = \Civi::queue($queueName, [ + 'type' => 'SqlParallel', + 'runner' => 'testStuff', + 'error' => 'delete', + 'batch_limit' => 3, + ]); + $this->assertEquals(0, $queue->numberOfItems()); + + for ($i = 0; $i < 7; $i++) { + \Civi::queue($queueName)->createItem(['thingy' => $i]); + } + + $result = Queue::runItems(0)->setQueue($queueName)->execute(); + $this->assertEquals(3, count($result)); + $this->assertEquals([0, 1, 2], \Civi::$statics[__CLASS__]['onHookQueueRunLog'][0]); + + $result = Queue::runItems(0)->setQueue($queueName)->execute(); + $this->assertEquals(3, count($result)); + $this->assertEquals([3, 4, 5], \Civi::$statics[__CLASS__]['onHookQueueRunLog'][1]); + + $result = Queue::runItems(0)->setQueue($queueName)->execute(); + $this->assertEquals(1, count($result)); + $this->assertEquals([6], \Civi::$statics[__CLASS__]['onHookQueueRunLog'][2]); + } + + /** + * @param \Civi\Core\Event\GenericHookEvent $e + * @see CRM_Utils_Hook::queueRun() + */ + public function onHookQueueRun(GenericHookEvent $e): void { + \Civi::$statics[__CLASS__]['onHookQueueRunLog'][] = array_map( + function($item) { + return $item->data['thingy']; + }, + $e->items + ); + + foreach ($e->items as $itemKey => $item) { + $e->outcomes[$itemKey] = 'ok'; + $e->queue->deleteItem($item); + } + } + + public function testSelect() { + $queueName = 'QueueTest_' . md5(random_bytes(32)) . '_parallel'; + $queue = \Civi::queue($queueName, ['type' => 'SqlParallel', 'runner' => 'task', 'error' => 'delete']); + $this->assertEquals(0, $queue->numberOfItems()); + + \Civi::queue($queueName)->createItem(new \CRM_Queue_Task( + [QueueTest::class, 'doSomething'], + ['first'] + )); + + $first = Queue::claimItems()->setQueue($queueName)->setSelect(['id', 'queue'])->execute()->single(); + $this->assertTrue(is_numeric($first['id'])); + $this->assertEquals($queueName, $first['queue']); + $this->assertFalse(isset($first['data'])); + } + + public function testEmptyPoll() { + $queueName = 'QueueTest_' . md5(random_bytes(32)) . '_linear'; + $queue = \Civi::queue($queueName, ['type' => 'Sql', 'runner' => 'task', 'error' => 'delete']); + $this->assertEquals(0, $queue->numberOfItems()); + + $startResult = Queue::claimItems()->setQueue($queueName)->execute(); + $this->assertEquals(0, $startResult->count()); + } + + public function getErrorModes(): array { + return [ + 'delete' => ['delete'], + 'abort' => ['abort'], + ]; + } + + /** + * Add a task which is never going to succeed. Try it multiple times (until we run out + * of retries). + * + * @param string $errorMode + * Either 'delete' or 'abort' + * @dataProvider getErrorModes + */ + public function testRetryWithPoliteExhaustion(string $errorMode) { + $queueName = 'QueueTest_' . md5(random_bytes(32)) . '_linear'; + $queue = \Civi::queue($queueName, [ + 'type' => 'Sql', + 'runner' => 'task', + 'error' => $errorMode, + 'retry_limit' => 2, + 'retry_interval' => 1, + ]); + $this->assertEquals(0, $queue->numberOfItems()); + + \Civi::queue($queueName)->createItem(new \CRM_Queue_Task( + [QueueTest::class, 'doSomething'], + ['nogooddirtyscoundrel'] + )); + + \Civi::$statics[__CLASS__]['doSomethingResult'] = FALSE; + $outcomes = []; + $this->waitFor(0.5, 15, function() use ($queueName, &$outcomes) { + $claimed = Queue::claimItems(0)->setQueue($queueName)->execute()->first(); + if (!$claimed) { + return FALSE; + } + $result = Queue::runItems(0)->setItems([$claimed])->execute()->first(); + $outcomes[] = $result['outcome']; + return ($result['outcome'] !== 'retry'); + }); + + $this->assertEquals(['retry', 'retry', $errorMode], $outcomes); + $this->assertEquals( + ['nogooddirtyscoundrel_err', 'nogooddirtyscoundrel_err', 'nogooddirtyscoundrel_err'], + \Civi::$statics[__CLASS__]['doSomethingLog'] + ); + + $expectActive = ['delete' => TRUE, 'abort' => FALSE]; + $this->assertEquals($expectActive[$errorMode], $queue->isActive()); + } + + /** + * Add a task. The task-running agent is a bit delinquent... so it forgets the first + * few tasks. But the third one works! + */ + public function testRetryWithDelinquencyAndSuccess() { + $queueName = 'QueueTest_' . md5(random_bytes(32)) . '_linear'; + $queue = \Civi::queue($queueName, [ + 'type' => 'Sql', + 'runner' => 'task', + 'error' => 'delete', + 'retry_limit' => 2, + 'retry_interval' => 0, + 'lease_time' => 1, + ]); + $this->assertEquals(0, $queue->numberOfItems()); + + \Civi::queue($queueName)->createItem(new \CRM_Queue_Task( + [QueueTest::class, 'doSomething'], + ['playinghooky'] + )); + $this->assertEquals(1, $queue->numberOfItems()); + + $claim1 = $this->waitForClaim(0.5, 5, $queueName); + // Oops, don't do anything with claim #1! + $this->assertEquals(1, $queue->numberOfItems()); + $this->assertEquals([], \Civi::$statics[__CLASS__]['doSomethingLog']); + + $claim2 = $this->waitForClaim(0.5, 5, $queueName); + // Oops, don't do anything with claim #2! + $this->assertEquals(1, $queue->numberOfItems()); + $this->assertEquals([], \Civi::$statics[__CLASS__]['doSomethingLog']); + + $claim3 = $this->waitForClaim(0.5, 5, $queueName); + $this->assertEquals(1, $queue->numberOfItems()); + $result = Queue::runItems(0)->setItems([$claim3])->execute()->first(); + $this->assertEquals(0, $queue->numberOfItems()); + $this->assertEquals(['playinghooky_ok'], \Civi::$statics[__CLASS__]['doSomethingLog']); + $this->assertEquals('ok', $result['outcome']); + } + + /** + * Add a task which is never going to succeed. The task fails every time, and eventually + * we either delete it or abort the queue. + * + * @param string $errorMode + * Either 'delete' or 'abort' + * @dataProvider getErrorModes + */ + public function testRetryWithEventualFailure(string $errorMode) { + \Civi::$statics[__CLASS__]['doSomethingResult'] = FALSE; + + $queueName = 'QueueTest_' . md5(random_bytes(32)) . '_linear'; + $queue = \Civi::queue($queueName, [ + 'type' => 'Sql', + 'runner' => 'task', + 'error' => $errorMode, + 'retry_limit' => 2, + 'retry_interval' => 0, + 'lease_time' => 1, + ]); + $this->assertEquals(0, $queue->numberOfItems()); + + \Civi::queue($queueName)->createItem(new \CRM_Queue_Task( + [QueueTest::class, 'doSomething'], + ['playinghooky'] + )); + $this->assertEquals(1, $queue->numberOfItems()); + + $claimAndRun = function($expectOutcome, $expectEndCount) use ($queue, $queueName) { + $claim = $this->waitForClaim(0.5, 5, $queueName); + $this->assertEquals(1, $queue->numberOfItems()); + $result = Queue::runItems(0)->setItems([$claim])->execute()->first(); + $this->assertEquals($expectEndCount, $queue->numberOfItems()); + $this->assertEquals($expectOutcome, $result['outcome']); + }; + + $claimAndRun('retry', 1); + $claimAndRun('retry', 1); + switch ($errorMode) { + case 'delete': + $claimAndRun('delete', 0); + $this->assertEquals(TRUE, $queue->isActive()); + break; + + case 'abort': + $claimAndRun('abort', 1); + $this->assertEquals(FALSE, $queue->isActive()); + break; + } + + $this->assertEquals(['playinghooky_err', 'playinghooky_err', 'playinghooky_err'], \Civi::$statics[__CLASS__]['doSomethingLog']); + } + + public static function doSomething(\CRM_Queue_TaskContext $ctx, string $something) { + $ok = \Civi::$statics[__CLASS__]['doSomethingResult']; + \Civi::$statics[__CLASS__]['doSomethingLog'][] = $something . ($ok ? '_ok' : '_err'); + return $ok; + } + + protected function assertCallback($expectMethod, $expectArgs, $actualTask) { + $this->assertEquals([QueueTest::class, $expectMethod], $actualTask['data']['callback'], 'Claimed task should have expected method'); + $this->assertEquals($expectArgs, $actualTask['data']['arguments'], 'Claimed task should have expected arguments'); + } + + protected function waitForClaim(float $interval, float $timeout, string $queueName): ?array { + $claims = []; + $this->waitFor($interval, $timeout, function() use ($queueName, &$claims) { + $claimed = Queue::claimItems(0)->setQueue($queueName)->execute()->first(); + if (!$claimed) { + return FALSE; + } + $claims[] = $claimed; + return TRUE; + }); + return $claims[0] ?? NULL; + } + + /** + * Repeatedly check $condition until it returns true (or until we exhaust timeout). + * + * @param float $interval + * Seconds to wait between checks. + * @param float $timeout + * Total maximum seconds to wait across all checks. + * @param callable $condition + * The condition to check. + * @return int + * Total number of intervals we had to wait/sleep. + */ + protected function waitFor(float $interval, float $timeout, callable $condition): int { + $end = microtime(TRUE) + $timeout; + $interval *= round($interval * 1000 * 1000); + $waitCount = 0; + $ready = $condition(); + while (!$ready && microtime(TRUE) <= $end) { + usleep($interval); + $waitCount++; + $ready = $condition(); + } + $this->assertTrue($ready, 'Wait condition not met'); + return $waitCount; + } + +} -- 2.25.1