Queues - Allow passing `$options['release_time']` for new tasks
authorTim Otten <totten@civicrm.org>
Fri, 26 Aug 2022 23:01:13 +0000 (16:01 -0700)
committerTim Otten <totten@civicrm.org>
Fri, 26 Aug 2022 23:01:13 +0000 (16:01 -0700)
CRM/Queue/Queue.php
CRM/Queue/Queue/Memory.php
CRM/Queue/Queue/SqlTrait.php
tests/phpunit/api/v4/Entity/QueueTest.php

index 83c27b0f4986726ebfb934cb8c8fb9b03ae12e51..9ff7d9b10c722ad9447a44617723f64fdbbc755b 100644 (file)
@@ -130,6 +130,7 @@ abstract class CRM_Queue_Queue {
    * @param array $options
    *   Queue-dependent options; for example, if this is a
    *   priority-queue, then $options might specify the item's priority.
+   *   Ex: ['release_time' => strtotime('+3 hours')]
    */
   abstract public function createItem($data, $options = []);
 
index b11630f2f516a89912c25acf66b7ab16c10c304b..7fd1ab6ae211f811a6f9776a906750c02fba524b 100644 (file)
@@ -97,12 +97,16 @@ class CRM_Queue_Queue_Memory extends CRM_Queue_Queue {
    * @param array $options
    *   Queue-dependent options; for example, if this is a
    *   priority-queue, then $options might specify the item's priority.
+   *   Ex: ['release_time' => strtotime('+3 hours')]
    */
   public function createItem($data, $options = []) {
     $id = $this->nextQueueItemId++;
     // force copy, no unintendedsharing effects from pointers
     $this->items[$id] = serialize($data);
     $this->runCounts[$id] = 0;
+    if (isset($options['release_time'])) {
+      $this->releaseTimes[$id] = $options['release_time'];
+    }
   }
 
   /**
index 0f68620c712612739ea04e097b8677c0c621ddae..886eafa0c323e731f799efbb7008c231976f45bd 100644 (file)
@@ -73,6 +73,7 @@ trait CRM_Queue_Queue_SqlTrait {
    * @param array $options
    *   Queue-dependent options; for example, if this is a
    *   priority-queue, then $options might specify the item's priority.
+   *   Ex: ['release_time' => strtotime('+3 hours')]
    */
   public function createItem($data, $options = []) {
     $dao = new CRM_Queue_DAO_QueueItem();
@@ -80,6 +81,9 @@ trait CRM_Queue_Queue_SqlTrait {
     $dao->submit_time = CRM_Utils_Time::getTime('YmdHis');
     $dao->data = serialize($data);
     $dao->weight = CRM_Utils_Array::value('weight', $options, 0);
+    if (isset($options['release_time'])) {
+      $dao->release_time = date('Y-m-d H:i:s', $options['release_time']);
+    }
     $dao->save();
   }
 
index 379b34c08c0ae8569f5b18daa0cda5295c253a06..6f74de2eccc8d636daa1ee9394060da80a7a46ec 100644 (file)
@@ -211,6 +211,39 @@ class QueueTest extends Api4TestBase {
     $this->assertEquals(0, $startResult->count());
   }
 
+  public function getDelayableDrivers(): array {
+    return [
+      'Sql' => [['type' => 'Sql', 'runner' => 'task', 'error' => 'delete']],
+      'SqlParallel' => [['type' => 'SqlParallel', 'runner' => 'task', 'error' => 'delete']],
+      'Memory' => [['type' => 'Memory', 'runner' => 'task', 'error' => 'delete']],
+    ];
+  }
+
+  /**
+   * @dataProvider getDelayableDrivers
+   */
+  public function testDelayedStart(array $queueSpec) {
+    $queueName = 'QueueTest_' . md5(random_bytes(32)) . '_delayed';
+    $queue = \Civi::queue($queueName, $queueSpec);
+    $this->assertEquals(0, $queue->numberOfItems());
+
+    $releaseTime = \CRM_Utils_Time::strtotime('+3 seconds');
+    \Civi::queue($queueName)->createItem(new \CRM_Queue_Task(
+      [QueueTest::class, 'doSomething'],
+      ['itwillstartanymomentnow']
+    ), ['release_time' => $releaseTime]);
+    $this->assertEquals(1, $queue->numberOfItems());
+
+    // Not available... yet...
+    $claim1 = $queue->claimItem();
+    $this->assertEquals(NULL, $claim1);
+
+    // OK, it'll come in a few seconds...
+    $claim2 = $this->waitForClaim(0.5, 6, $queueName);
+    $this->assertEquals('itwillstartanymomentnow', $claim2['data']['arguments'][0]);
+    $this->assertTrue(\CRM_Utils_Time::time() >= $releaseTime);
+  }
+
   public function getErrorModes(): array {
     return [
       'delete' => ['delete'],