From 5891a2af8adf0f79282a3f35c688be9aec101531 Mon Sep 17 00:00:00 2001 From: Tim Otten Date: Fri, 26 Aug 2022 16:01:13 -0700 Subject: [PATCH] Queues - Allow passing `$options['release_time']` for new tasks --- CRM/Queue/Queue.php | 1 + CRM/Queue/Queue/Memory.php | 4 +++ CRM/Queue/Queue/SqlTrait.php | 4 +++ tests/phpunit/api/v4/Entity/QueueTest.php | 33 +++++++++++++++++++++++ 4 files changed, 42 insertions(+) diff --git a/CRM/Queue/Queue.php b/CRM/Queue/Queue.php index 83c27b0f49..9ff7d9b10c 100644 --- a/CRM/Queue/Queue.php +++ b/CRM/Queue/Queue.php @@ -130,6 +130,7 @@ abstract class CRM_Queue_Queue { * @param array $options * Queue-dependent options; for example, if this is a * priority-queue, then $options might specify the item's priority. + * Ex: ['release_time' => strtotime('+3 hours')] */ abstract public function createItem($data, $options = []); diff --git a/CRM/Queue/Queue/Memory.php b/CRM/Queue/Queue/Memory.php index b11630f2f5..7fd1ab6ae2 100644 --- a/CRM/Queue/Queue/Memory.php +++ b/CRM/Queue/Queue/Memory.php @@ -97,12 +97,16 @@ class CRM_Queue_Queue_Memory extends CRM_Queue_Queue { * @param array $options * Queue-dependent options; for example, if this is a * priority-queue, then $options might specify the item's priority. + * Ex: ['release_time' => strtotime('+3 hours')] */ public function createItem($data, $options = []) { $id = $this->nextQueueItemId++; // force copy, no unintendedsharing effects from pointers $this->items[$id] = serialize($data); $this->runCounts[$id] = 0; + if (isset($options['release_time'])) { + $this->releaseTimes[$id] = $options['release_time']; + } } /** diff --git a/CRM/Queue/Queue/SqlTrait.php b/CRM/Queue/Queue/SqlTrait.php index 0f68620c71..886eafa0c3 100644 --- a/CRM/Queue/Queue/SqlTrait.php +++ b/CRM/Queue/Queue/SqlTrait.php @@ -73,6 +73,7 @@ trait CRM_Queue_Queue_SqlTrait { * @param array $options * Queue-dependent options; for example, if this is a * priority-queue, then $options might specify the item's priority. + * Ex: ['release_time' => strtotime('+3 hours')] */ public function createItem($data, $options = []) { $dao = new CRM_Queue_DAO_QueueItem(); @@ -80,6 +81,9 @@ trait CRM_Queue_Queue_SqlTrait { $dao->submit_time = CRM_Utils_Time::getTime('YmdHis'); $dao->data = serialize($data); $dao->weight = CRM_Utils_Array::value('weight', $options, 0); + if (isset($options['release_time'])) { + $dao->release_time = date('Y-m-d H:i:s', $options['release_time']); + } $dao->save(); } diff --git a/tests/phpunit/api/v4/Entity/QueueTest.php b/tests/phpunit/api/v4/Entity/QueueTest.php index 379b34c08c..6f74de2ecc 100644 --- a/tests/phpunit/api/v4/Entity/QueueTest.php +++ b/tests/phpunit/api/v4/Entity/QueueTest.php @@ -211,6 +211,39 @@ class QueueTest extends Api4TestBase { $this->assertEquals(0, $startResult->count()); } + public function getDelayableDrivers(): array { + return [ + 'Sql' => [['type' => 'Sql', 'runner' => 'task', 'error' => 'delete']], + 'SqlParallel' => [['type' => 'SqlParallel', 'runner' => 'task', 'error' => 'delete']], + 'Memory' => [['type' => 'Memory', 'runner' => 'task', 'error' => 'delete']], + ]; + } + + /** + * @dataProvider getDelayableDrivers + */ + public function testDelayedStart(array $queueSpec) { + $queueName = 'QueueTest_' . md5(random_bytes(32)) . '_delayed'; + $queue = \Civi::queue($queueName, $queueSpec); + $this->assertEquals(0, $queue->numberOfItems()); + + $releaseTime = \CRM_Utils_Time::strtotime('+3 seconds'); + \Civi::queue($queueName)->createItem(new \CRM_Queue_Task( + [QueueTest::class, 'doSomething'], + ['itwillstartanymomentnow'] + ), ['release_time' => $releaseTime]); + $this->assertEquals(1, $queue->numberOfItems()); + + // Not available... yet... + $claim1 = $queue->claimItem(); + $this->assertEquals(NULL, $claim1); + + // OK, it'll come in a few seconds... + $claim2 = $this->waitForClaim(0.5, 6, $queueName); + $this->assertEquals('itwillstartanymomentnow', $claim2['data']['arguments'][0]); + $this->assertTrue(\CRM_Utils_Time::time() >= $releaseTime); + } + public function getErrorModes(): array { return [ 'delete' => ['delete'], -- 2.25.1