From 86ffee74755870de44119ca5592f3d5a64ec58c5 Mon Sep 17 00:00:00 2001 From: Tim Otten Date: Tue, 31 May 2022 00:02:20 -0700 Subject: [PATCH] Civi::queue(...$queueSpec) - Accept the 'status' and 'error' options Note: We only enforce this requirement for queues that use the `runner` option. Older queue-users may not specify a `runner` -- in which case they're responsible for establishing their own runner, and their runner problem doesn't care what is specified here. --- CRM/Queue/Service.php | 46 ++++++++++++++++++++++++--- Civi.php | 4 +-- tests/phpunit/CRM/Queue/QueueTest.php | 45 ++++++++++++++++++++++++++ 3 files changed, 88 insertions(+), 7 deletions(-) diff --git a/CRM/Queue/Service.php b/CRM/Queue/Service.php index 2a9815ecfc..0fd874a96b 100644 --- a/CRM/Queue/Service.php +++ b/CRM/Queue/Service.php @@ -43,7 +43,7 @@ class CRM_Queue_Service { * @var string[] * @readonly */ - private static $commonFields = ['name', 'type', 'runner', 'batch_limit', 'lease_time', 'retry_limit', 'retry_interval']; + private static $commonFields = ['name', 'type', 'runner', 'status', 'error', 'batch_limit', 'lease_time', 'retry_limit', 'retry_interval']; /** * FIXME: Singleton pattern should be removed when dependency-injection @@ -90,6 +90,10 @@ class CRM_Queue_Service { * - is_persistent: bool, optional; if true, then this queue is loaded from `civicrm_queue` list * - runner: string, optional; if given, then items in this queue can run * automatically via `hook_civicrm_queueRun_{$runner}` + * - status: string, required for runnable-queues; specify whether the runner is currently active + * ex: 'active', 'draft', 'completed' + * - error: string, required for runnable-queues; specify what to do with unhandled errors + * ex: "drop" or "abort" * - batch_limit: int, Maximum number of items in a batch. * - lease_time: int, When claiming an item (or batch of items) for work, how long should the item(s) be reserved. (Seconds) * - retry_limit: int, Number of permitted retries. Set to zero (0) to disable. @@ -104,6 +108,7 @@ class CRM_Queue_Service { if (!empty($queueSpec['is_persistent'])) { $queueSpec = $this->findCreateQueueSpec($queueSpec); } + $this->validateQueueSpec($queueSpec); $queue = $this->instantiateQueueObject($queueSpec); $exists = $queue->existsQueue(); if (!$exists) { @@ -137,10 +142,7 @@ class CRM_Queue_Service { return $loaded; } - if (empty($queueSpec['type'])) { - throw new \CRM_Core_Exception(sprintf('Failed to find or create persistent queue "%s". Missing field "%s".', - $queueSpec['name'], 'type')); - } + $this->validateQueueSpec($queueSpec); $dao = new CRM_Queue_DAO_Queue(); $dao->name = $queueSpec['name']; @@ -217,4 +219,38 @@ class CRM_Queue_Service { return $class->newInstance($queueSpec); } + /** + * Assert that the queueSpec is well-formed. + * + * @param array $queueSpec + * @throws \CRM_Core_Exception + */ + public function validateQueueSpec(array $queueSpec): void { + $throw = function(string $message, ...$args) use ($queueSpec) { + $prefix = sprintf('Failed to create queue "%s". ', $queueSpec['name']); + throw new CRM_Core_Exception($prefix . sprintf($message, ...$args)); + }; + + if (empty($queueSpec['type'])) { + $throw('Missing field "type".'); + } + + // The rest of the validations only apply to persistent, runnable queues. + if (empty($queueSpec['is_persistent']) || empty($queueSpec['runner'])) { + return; + } + + $statuses = CRM_Queue_BAO_Queue::getStatuses(); + $status = $queueSpec['status'] ?? NULL; + if (!isset($statuses[$status])) { + $throw('Invalid queue status "%s".', $status); + } + + $errorModes = CRM_Queue_BAO_Queue::getErrorModes(); + $errorMode = $queueSpec['error'] ?? NULL; + if ($queueSpec['runner'] === 'task' && !isset($errorModes[$errorMode])) { + $throw('Invalid error mode "%s".', $errorMode); + } + } + } diff --git a/Civi.php b/Civi.php index 3c9d71f929..7408a10f04 100644 --- a/Civi.php +++ b/Civi.php @@ -117,13 +117,13 @@ class Civi { * Specification for a queue. * This is not required for accessing an existing queue. * Specify this if you wish to auto-create the queue or to include advanced options (eg `reset`). - * Example: ['type' => 'SqlParallel'] + * Example: ['type' => 'SqlParallel', 'error' => 'drop'] * Defaults: ['reset'=>FALSE, 'is_persistent'=>TRUE, 'is_autorun'=>FALSE] * @return \CRM_Queue_Queue * @see \CRM_Queue_Service */ public static function queue(string $name, array $params = []): CRM_Queue_Queue { - $defaults = ['reset' => FALSE, 'is_persistent' => TRUE]; + $defaults = ['reset' => FALSE, 'is_persistent' => TRUE, 'status' => 'active']; $params = array_merge($defaults, $params, ['name' => $name]); return CRM_Queue_Service::singleton()->create($params); } diff --git a/tests/phpunit/CRM/Queue/QueueTest.php b/tests/phpunit/CRM/Queue/QueueTest.php index 38c8bd2b8d..1b20bb7e51 100644 --- a/tests/phpunit/CRM/Queue/QueueTest.php +++ b/tests/phpunit/CRM/Queue/QueueTest.php @@ -62,6 +62,51 @@ class CRM_Queue_QueueTest extends CiviUnitTestCase { parent::tearDown(); } + /** + * If the queue has an automatic background runner (`runner`), then it + * must also have an `error` policy. + */ + public function testRunnerRequiresErrorPolicy() { + try { + $q1 = Civi::queue('test/incomplete/1', [ + 'type' => 'Sql', + 'runner' => 'task', + ]); + $this->fail('Should fail without error policy'); + } + catch (CRM_Core_Exception $e) { + $this->assertRegExp('/Invalid error mode/', $e->getMessage()); + } + + $q2 = Civi::queue('test/complete/2', [ + 'type' => 'Sql', + 'runner' => 'task', + 'error' => 'delete', + ]); + $this->assertTrue($q2 instanceof CRM_Queue_Queue_Sql); + } + + public function testStatuses() { + $q1 = Civi::queue('test/valid/default', [ + 'type' => 'Sql', + 'runner' => 'task', + 'error' => 'delete', + ]); + $this->assertTrue($q1 instanceof CRM_Queue_Queue_Sql); + $this->assertDBQuery('active', "SELECT status FROM civicrm_queue WHERE name = 'test/valid/default'"); + + foreach (['draft', 'active', 'complete', 'aborted'] as $n => $exampleStatus) { + $q1 = Civi::queue("test/valid/$n", [ + 'type' => 'Sql', + 'runner' => 'task', + 'error' => 'delete', + 'status' => $exampleStatus, + ]); + $this->assertTrue($q1 instanceof CRM_Queue_Queue_Sql); + $this->assertDBQuery($exampleStatus, "SELECT status FROM civicrm_queue WHERE name = 'test/valid/$n'"); + } + } + /** * Create a few queue items; alternately enqueue and dequeue various * -- 2.25.1