From 677170bdd275721d2187d8ecd1d8734163342bf3 Mon Sep 17 00:00:00 2001 From: Tim Otten Date: Tue, 7 Jun 2022 01:46:34 -0700 Subject: [PATCH] Queue - When UserJob.queue_id works down to zero tasks, update status and fire hook Suppose you setup a queue with several tasks and then run them all. What happens to the queue's status? Before ------ The status always remains `active`. After ----- Depends on the use-case: * If you have an open-ended queue providing an on-going service (no `UserJob`), then the status remains `active`. * If you have a fixed-purpose queue attached to a `UserJob`, then the status changes from `active` to `completed`, and it fires an event: ``` function hook_civicrm_queueStatus(CRM_Queue_Queue $queue, string $status) ``` Technical Details ----------------- * There are two main ways that items get removed from a queue (`Queue.runNext` API and `civicrm/queue/ajax/runNext`). Both of these fire an internal event (`civi.queue.check`) to consult the status. --- CRM/Core/BAO/UserJob.php | 61 +++++++++++- CRM/Queue/Queue.php | 8 +- CRM/Queue/Runner.php | 6 ++ CRM/Utils/Hook.php | 15 +++ Civi/Api4/Action/Queue/RunItems.php | 6 ++ tests/phpunit/CRM/Queue/QueueTest.php | 31 +++++++ tests/phpunit/api/v4/Entity/QueueTest.php | 107 ++++++++++++++++++++++ 7 files changed, 231 insertions(+), 3 deletions(-) diff --git a/CRM/Core/BAO/UserJob.php b/CRM/Core/BAO/UserJob.php index 7802f4a4ba..ae476bbd1f 100644 --- a/CRM/Core/BAO/UserJob.php +++ b/CRM/Core/BAO/UserJob.php @@ -18,7 +18,66 @@ /** * This class contains user jobs functionality. */ -class CRM_Core_BAO_UserJob extends CRM_Core_DAO_UserJob { +class CRM_Core_BAO_UserJob extends CRM_Core_DAO_UserJob implements \Civi\Core\HookInterface { + + /** + * Check on the status of a queue. + * + * Queues that are attached to a UserJob are necessarily finite - so we can mark them 'completed' + * when the task-list reaches empty. + * + * Note that this runs after processing *every item* in *every queue* (foreground, background, + * import, upgrade, ad nauseum). The capacity to handle heavy tasks here is subjective (depending + * on the specific queue/use-case). We try to be conservative about I/O until we know that + * we're in a suitable context. + */ + public static function on_civi_queue_check(\Civi\Core\Event\GenericHookEvent $e) { + /** @var \CRM_Queue_Queue $queue */ + $queue = $e->queue; + $userJobId = static::findUserJobId($queue->getName()); + if ($userJobId && $queue->numberOfItems() < 1) { + $queue->setStatus('completed'); + } + } + + /** + * If the `civicrm_queue` changes status, then the `civicrm_user_job` should also change status. + * + * @param \CRM_Queue_Queue $queue + * @param string $status + * @throws \API_Exception + * @throws \Civi\API\Exception\UnauthorizedException + * @see \CRM_Utils_Hook::queueStatus() + */ + public static function hook_civicrm_queueStatus(CRM_Queue_Queue $queue, string $status) { + $userJobId = static::findUserJobId($queue->getName()); + if ($userJobId && $status === 'completed') { + \Civi\Api4\UserJob::update() + ->addWhere('id', '=', $userJobId) + ->setValues(['status_id' => 1]) + ->execute(); + } + } + + private static function findUserJobId(string $queueName): ?int { + if (CRM_Core_Config::isUpgradeMode()) { + return NULL; + } + + $key = 'userJobId_' . $queueName; + if (!isset(Civi::$statics[__CLASS__][$key])) { + // Part of the primary structure/purpose of the queue. Shouldn't change. + $userJobId = CRM_Core_DAO::singleValueQuery(' + SELECT uj.id FROM civicrm_queue q + INNER JOIN civicrm_user_job uj ON q.id = uj.queue_id + WHERE q.name = %1 + ', [ + 1 => [$queueName, 'String'], + ]); + Civi::$statics[__CLASS__][$key] = $userJobId; + } + return Civi::$statics[__CLASS__][$key]; + } /** * Restrict access to the relevant user. diff --git a/CRM/Queue/Queue.php b/CRM/Queue/Queue.php index ed2d0f4388..83c27b0f49 100644 --- a/CRM/Queue/Queue.php +++ b/CRM/Queue/Queue.php @@ -71,10 +71,14 @@ abstract class CRM_Queue_Queue { * Ex: 'active', 'draft', 'aborted' */ public function setStatus(string $status): void { - CRM_Core_DAO::executeQuery('UPDATE civicrm_queue SET status = %1 WHERE name = %2', [ - 1 => ['aborted', 'String'], + $result = CRM_Core_DAO::executeQuery('UPDATE civicrm_queue SET status = %1 WHERE name = %2', [ + 1 => [$status, 'String'], 2 => [$this->getName(), 'String'], ]); + // If multiple workers try to setStatus('completed') at roughly the same time, only one will fire an event. + if ($result->affectedRows() > 0) { + CRM_Utils_Hook::queueStatus($this, $status); + } } /** diff --git a/CRM/Queue/Runner.php b/CRM/Queue/Runner.php index ee69cb6310..150ae4bef4 100644 --- a/CRM/Queue/Runner.php +++ b/CRM/Queue/Runner.php @@ -9,6 +9,8 @@ +--------------------------------------------------------------------+ */ +use Civi\Core\Event\GenericHookEvent; + /** * `CRM_Queue_Runner` runs a list tasks from a queue. It originally supported the database-upgrade * queue. Consequently, this runner is optimal for queues which are: @@ -229,6 +231,10 @@ class CRM_Queue_Runner { $this->releaseErrorItem($item); } + \Civi::dispatcher()->dispatch('civi.queue.check', GenericHookEvent::create([ + 'queue' => $this->queue, + ])); + return $this->formatTaskResult($isOK, $exception); } else { diff --git a/CRM/Utils/Hook.php b/CRM/Utils/Hook.php index d2fb744c88..c44273c842 100644 --- a/CRM/Utils/Hook.php +++ b/CRM/Utils/Hook.php @@ -2755,6 +2755,21 @@ abstract class CRM_Utils_Hook { ); } + /** + * Fired if the status of a queue changes. + * + * @param \CRM_Queue_Queue $queue + * @param string $status + * New status. + * Ex: 'completed', 'active', 'aborted' + */ + public static function queueStatus(CRM_Queue_Queue $queue, string $status): void { + self::singleton()->invoke(['queue', 'status'], $queue, $status, + self::$_nullObject, self::$_nullObject, self::$_nullObject, self::$_nullObject, + 'civicrm_queueStatus' + ); + } + /** * This is called if automatic execution of a queue-task fails. * diff --git a/Civi/Api4/Action/Queue/RunItems.php b/Civi/Api4/Action/Queue/RunItems.php index b65ccf7def..52a2503239 100644 --- a/Civi/Api4/Action/Queue/RunItems.php +++ b/Civi/Api4/Action/Queue/RunItems.php @@ -2,6 +2,8 @@ namespace Civi\Api4\Action\Queue; +use Civi\Core\Event\GenericHookEvent; + /** * Run an enqueued item (task). * @@ -91,6 +93,10 @@ class RunItems extends \Civi\Api4\Generic\AbstractAction { foreach ($items as $itemPos => $item) { $result[] = ['outcome' => $outcomes[$itemPos], 'item' => $this->createItemStub($item)]; } + + \Civi::dispatcher()->dispatch('civi.queue.check', GenericHookEvent::create([ + 'queue' => $queue, + ])); } private function validateItemStubs(): void { diff --git a/tests/phpunit/CRM/Queue/QueueTest.php b/tests/phpunit/CRM/Queue/QueueTest.php index 43912aaeae..781da94e5f 100644 --- a/tests/phpunit/CRM/Queue/QueueTest.php +++ b/tests/phpunit/CRM/Queue/QueueTest.php @@ -462,4 +462,35 @@ class CRM_Queue_QueueTest extends CiviUnitTestCase { $this->assertEquals(0, $this->queue->numberOfItems()); } + public function testSetStatus() { + $fired = ['status-changes' => []]; + \Civi::dispatcher()->addListener('hook_civicrm_queueStatus', function ($e) use (&$fired) { + $fired[$e->queue->getName()][] = $e->status; + }); + + $q = Civi::queue('status-changes', [ + 'type' => 'Sql', + ]); + $this->assertEquals([], $fired['status-changes']); + + $q->setStatus('draft'); + $this->assertEquals(['draft'], $fired['status-changes']); + + $q->setStatus('draft'); + $q->setStatus('draft'); + $q->setStatus('draft'); + $this->assertEquals(['draft'], $fired['status-changes']); + + $q->setStatus('active'); + $this->assertEquals(['draft', 'active'], $fired['status-changes']); + + $q->setStatus('active'); + $q->setStatus('active'); + $q->setStatus('active'); + $this->assertEquals(['draft', 'active'], $fired['status-changes']); + + $q->setStatus('completed'); + $this->assertEquals(['draft', 'active', 'completed'], $fired['status-changes']); + } + } diff --git a/tests/phpunit/api/v4/Entity/QueueTest.php b/tests/phpunit/api/v4/Entity/QueueTest.php index 1e0f70557d..5f87fddcdc 100644 --- a/tests/phpunit/api/v4/Entity/QueueTest.php +++ b/tests/phpunit/api/v4/Entity/QueueTest.php @@ -20,6 +20,7 @@ namespace api\v4\Entity; use api\v4\Api4TestBase; use Civi\Api4\Queue; +use Civi\Api4\UserJob; use Civi\Core\Event\GenericHookEvent; /** @@ -356,6 +357,112 @@ class QueueTest extends Api4TestBase { $this->assertEquals(['playinghooky_err', 'playinghooky_err', 'playinghooky_err'], \Civi::$statics[__CLASS__]['doSomethingLog']); } + /** + * If a queue is created as part of a user-job, then it has a fixed scope-of-work. The status + * should flip after completing its work. + * + * @throws \API_Exception + * @throws \CRM_Core_Exception + * @throws \Civi\API\Exception\UnauthorizedException + */ + public function testUserJobQueue_Completion() { + $queueName = 'QueueTest_' . md5(random_bytes(32)) . '_userjob'; + + $firedQueueStatus = []; + \Civi::dispatcher()->addListener('hook_civicrm_queueStatus', function($e) use (&$firedQueueStatus) { + $firedQueueStatus[$e->queue->getName()] = $e->status; + }); + + $queue = \Civi::queue($queueName, [ + 'type' => 'Sql', + 'runner' => 'task', + 'error' => 'delete', + ]); + $this->assertEquals(0, $queue->numberOfItems()); + + $userJob = \Civi\Api4\UserJob::create(FALSE)->setValues([ + 'type_id:label' => 'Contact Import', + 'status_id:name' => 'in_progress', + 'queue_id.name' => $queue->getName(), + ])->execute()->single(); + + \Civi::queue($queueName)->createItem(new \CRM_Queue_Task( + [QueueTest::class, 'doSomething'], + ['first'] + )); + \Civi::queue($queueName)->createItem(new \CRM_Queue_Task( + [QueueTest::class, 'doSomething'], + ['second'] + )); + + // Verify initial status + $this->assertEquals(2, $queue->numberOfItems()); + $this->assertEquals(FALSE, isset($firedQueueStatus[$queueName])); + $this->assertEquals(TRUE, $queue->isActive()); + $this->assertEquals(4, UserJob::get()->addWhere('id', '=', $userJob['id'])->execute()->first()['status_id']); + + // OK, let's run both items - and check status afterward. + Queue::runItems(FALSE)->setQueue($queueName)->execute()->single(); + $this->assertEquals(1, $queue->numberOfItems()); + $this->assertEquals(FALSE, isset($firedQueueStatus[$queueName])); + $this->assertEquals(TRUE, $queue->isActive()); + $this->assertEquals(4, UserJob::get()->addWhere('id', '=', $userJob['id'])->execute()->first()['status_id']); + + Queue::runItems(FALSE)->setQueue($queueName)->execute()->single(); + $this->assertEquals(0, $queue->numberOfItems()); + $this->assertEquals('completed', $firedQueueStatus[$queueName]); + $this->assertEquals(FALSE, $queue->isActive()); + $this->assertEquals(1, UserJob::get()->addWhere('id', '=', $userJob['id'])->execute()->first()['status_id']); + } + + /** + * If a queue is created as a long-term service, then its work is never complete. + * + * @throws \API_Exception + * @throws \CRM_Core_Exception + * @throws \Civi\API\Exception\UnauthorizedException + */ + public function testServiceQueue_NeverComplete() { + $queueName = 'QueueTest_' . md5(random_bytes(32)) . '_service'; + + $firedQueueStatus = []; + \Civi::dispatcher()->addListener('hook_civicrm_queueStatus', function($e) use (&$firedQueueStatus) { + $firedQueueStatus[$e->queue->getName()] = $e->status; + }); + + $queue = \Civi::queue($queueName, [ + 'type' => 'Sql', + 'runner' => 'task', + 'error' => 'delete', + ]); + $this->assertEquals(0, $queue->numberOfItems()); + + \Civi::queue($queueName)->createItem(new \CRM_Queue_Task( + [QueueTest::class, 'doSomething'], + ['first'] + )); + \Civi::queue($queueName)->createItem(new \CRM_Queue_Task( + [QueueTest::class, 'doSomething'], + ['second'] + )); + + // Verify initial status + $this->assertEquals(2, $queue->numberOfItems()); + $this->assertEquals(FALSE, isset($firedQueueStatus[$queueName])); + $this->assertEquals(TRUE, $queue->isActive()); + + // OK, let's run both items - and check status afterward. + Queue::runItems(FALSE)->setQueue($queueName)->execute()->single(); + $this->assertEquals(1, $queue->numberOfItems()); + $this->assertEquals(FALSE, isset($firedQueueStatus[$queueName])); + $this->assertEquals(TRUE, $queue->isActive()); + + Queue::runItems(FALSE)->setQueue($queueName)->execute()->single(); + $this->assertEquals(0, $queue->numberOfItems()); + $this->assertEquals(FALSE, isset($firedQueueStatus[$queueName])); + $this->assertEquals(TRUE, $queue->isActive()); + } + public static function doSomething(\CRM_Queue_TaskContext $ctx, string $something) { $ok = \Civi::$statics[__CLASS__]['doSomethingResult']; \Civi::$statics[__CLASS__]['doSomethingLog'][] = $something . ($ok ? '_ok' : '_err'); -- 2.25.1