SqlParallel - Implement BatchQueueInterface
authorTim Otten <totten@civicrm.org>
Fri, 11 Feb 2022 09:07:25 +0000 (01:07 -0800)
committerTim Otten <totten@civicrm.org>
Thu, 2 Jun 2022 20:31:59 +0000 (13:31 -0700)
Before
------

Each of the `CRM_Queue_Queue_*` drivers supports a set of methods for
claiming/releasing one queue-item at a time (eg `claimItem()`,
`releaseItem()`, `deleteItem()`).

After
-----

All drivers still support the same queue-item methods.  Additionally, the
`SqlParallel` driver supports batch-oriented equivalents (`claimItems()`,
`deleteItems()`, etc).

Comments
--------

I initially looked at updating all of the drivers to support queues.  There
were a few obstacles.  Firstly, batched-claims seem semantically
questionable for queues that run 1-by-1 (`Sql`, `Memory`).  Secondly, there
are a few extensions in contrib that extend these classes and override
methods (consequently, they're looking for stable signatures).

The approach here seemed to resolve those two concerns in an OOP-safe way:
define an optional interface (`BatchQueueInterface`) which can be used to
mark enhanced functionality on queues where it makes sense (eg
`SqlParallel`).

CRM/Queue/Queue/BatchQueueInterface.php [new file with mode: 0644]
CRM/Queue/Queue/SqlParallel.php
CRM/Queue/Queue/SqlTrait.php
tests/phpunit/CRM/Queue/QueueTest.php

diff --git a/CRM/Queue/Queue/BatchQueueInterface.php b/CRM/Queue/Queue/BatchQueueInterface.php
new file mode 100644 (file)
index 0000000..fd0c836
--- /dev/null
@@ -0,0 +1,57 @@
+<?php
+/*
+ +--------------------------------------------------------------------+
+ | Copyright CiviCRM LLC. All rights reserved.                        |
+ |                                                                    |
+ | This work is published under the GNU AGPLv3 license with some      |
+ | permitted exceptions and without any warranty. For full license    |
+ | and copyright information, see https://civicrm.org/licensing       |
+ +--------------------------------------------------------------------+
+ */
+
+/**
+ * Variation on CRM_Queue_Queue which can claim/release/delete items in batches.
+ */
+interface CRM_Queue_Queue_BatchQueueInterface {
+
+  /**
+   * Get a batch of queue items.
+   *
+   * @param int $limit
+   *   Maximum number of records to claim
+   * @param int|null $lease_time
+   *   Hold a lease on the claimed item for $X seconds.
+   *   If NULL, inherit a default.
+   * @return object
+   *   with key 'data' that matches the inputted data
+   */
+  public function claimItems(int $limit, ?int $lease_time = NULL): array;
+
+  /**
+   * Remove items from the queue.
+   *
+   * @param array $items
+   *   The item returned by claimItem.
+   */
+  public function deleteItems(array $items): void;
+
+  /**
+   * Get the full data for multiple items.
+   *
+   * This is a passive peek - it does not claim/steal/release anything.
+   *
+   * @param array $ids
+   *   The unique IDs of the tasks within the queue.
+   * @return array
+   */
+  public function fetchItems(array $ids): array;
+
+  /**
+   * Return an item that could not be processed.
+   *
+   * @param array $items
+   *   The items returned by claimItem.
+   */
+  public function releaseItems(array $items): void;
+
+}
index 603436e3970c4b7ee630ef64cb4f6677e88eb86c..8573b7b99883b9aad03cfe0e3ccfdf6457a27a14 100644 (file)
@@ -12,7 +12,7 @@
 /**
  * A queue implementation which stores items in the CiviCRM SQL database
  */
-class CRM_Queue_Queue_SqlParallel extends CRM_Queue_Queue {
+class CRM_Queue_Queue_SqlParallel extends CRM_Queue_Queue implements CRM_Queue_Queue_BatchQueueInterface {
 
   use CRM_Queue_Queue_SqlTrait;
 
@@ -35,29 +35,32 @@ class CRM_Queue_Queue_SqlParallel extends CRM_Queue_Queue {
   }
 
   /**
-   * Get the next item.
-   *
-   * @param int|null $lease_time
-   *   Hold a lease on the claimed item for $X seconds.
-   *   If NULL, inherit a queue default (`$queueSpec['lease_time']`) or system default (`DEFAULT_LEASE_TIME`).
-   * @return object
-   *   With key 'data' that matches the inputted data.
+   * @inheritDoc
    */
   public function claimItem($lease_time = NULL) {
+    $items = $this->claimItems(1, $lease_time);
+    return $items[0] ?? NULL;
+  }
+
+  /**
+   * @inheritDoc
+   */
+  public function claimItems(int $limit, ?int $lease_time = NULL): array {
     $lease_time = $lease_time ?: $this->getSpec('lease_time') ?: static::DEFAULT_LEASE_TIME;
+    $limit = $this->getSpec('batch_limit') ? min($limit, $this->getSpec('batch_limit')) : $limit;
 
-    $result = NULL;
     $dao = CRM_Core_DAO::executeQuery('LOCK TABLES civicrm_queue_item WRITE;');
     $sql = "SELECT id, queue_name, submit_time, release_time, run_count, data
         FROM civicrm_queue_item
         WHERE queue_name = %1
               AND (release_time IS NULL OR release_time < %2)
         ORDER BY weight ASC, id ASC
-        LIMIT 1
+        LIMIT %3
       ";
     $params = [
       1 => [$this->getName(), 'String'],
       2 => [CRM_Utils_Time::getTime(), 'Timestamp'],
+      3 => [$limit, 'Integer'],
     ];
     $dao = CRM_Core_DAO::executeQuery($sql, $params, TRUE, 'CRM_Queue_DAO_QueueItem');
     if (is_a($dao, 'DB_Error')) {
@@ -65,22 +68,22 @@ class CRM_Queue_Queue_SqlParallel extends CRM_Queue_Queue {
       CRM_Core_Error::fatal();
     }
 
-    if ($dao->fetch()) {
+    $result = [];
+    while ($dao->fetch()) {
+      $result[] = (object) [
+        'id' => $dao->id,
+        'data' => unserialize($dao->data),
+        'queue_name' => $dao->queue_name,
+        'run_count' => 1 + (int) $dao->run_count,
+      ];
+    }
+    if ($result) {
       $nowEpoch = CRM_Utils_Time::getTimeRaw();
-      $dao->run_count++;
-      CRM_Core_DAO::executeQuery("UPDATE civicrm_queue_item SET release_time = %1, run_count = %3 WHERE id = %2", [
-        '1' => [date('YmdHis', $nowEpoch + $lease_time), 'String'],
-        '2' => [$dao->id, 'Integer'],
-        '3' => [$dao->run_count, 'Integer'],
+      $sql = CRM_Utils_SQL::interpolate('UPDATE civicrm_queue_item SET release_time = @RT, run_count = 1+run_count WHERE id IN (#ids)', [
+        'RT' => date('YmdHis', $nowEpoch + $lease_time),
+        'ids' => CRM_Utils_Array::collect('id', $result),
       ]);
-
-      // (Comment by artfulrobot Sep 2019: Not sure what the below comment means, should be removed/clarified?)
-      // work-around: inconsistent date-formatting causes unintentional breakage
-      #        $dao->submit_time = date('YmdHis', strtotime($dao->submit_time));
-      #        $dao->release_time = date('YmdHis', $nowEpoch + $lease_time);
-      #        $dao->save();
-      $dao->data = unserialize($dao->data);
-      $result = $dao;
+      CRM_Core_DAO::executeQuery($sql);
     }
 
     $dao = CRM_Core_DAO::executeQuery('UNLOCK TABLES;');
index 40a9ad6002d23976198a7e52992a8f44754af844..0f68620c712612739ea04e097b8677c0c621ddae 100644 (file)
@@ -86,12 +86,23 @@ trait CRM_Queue_Queue_SqlTrait {
   /**
    * Remove an item from the queue.
    *
-   * @param CRM_Core_DAO|stdClass $dao
+   * @param CRM_Core_DAO|stdClass $item
    *   The item returned by claimItem.
    */
-  public function deleteItem($dao) {
-    $dao->delete();
-    $dao->free();
+  public function deleteItem($item) {
+    $this->deleteItems([$item]);
+  }
+
+  public function deleteItems($items): void {
+    if (empty($items)) {
+      return;
+    }
+    $sql = CRM_Utils_SQL::interpolate('DELETE FROM civicrm_queue_item WHERE id IN (#ids) AND queue_name = @name', [
+      'ids' => CRM_Utils_Array::collect('id', $items),
+      'name' => $this->getName(),
+    ]);
+    CRM_Core_DAO::executeQuery($sql);
+    $this->freeDAOs($items);
   }
 
   /**
@@ -104,35 +115,60 @@ trait CRM_Queue_Queue_SqlTrait {
    * @return CRM_Queue_DAO_QueueItem|object|null $dao
    */
   public function fetchItem($id) {
-    $dao = new CRM_Queue_DAO_QueueItem();
-    $dao->id = $id;
-    $dao->queue_name = $this->getName();
-    if (!$dao->find(TRUE)) {
-      return NULL;
+    $items = $this->fetchItems([$id]);
+    return $items[0] ?? NULL;
+  }
+
+  public function fetchItems(array $ids): array {
+    $dao = CRM_Utils_SQL_Select::from('civicrm_queue_item')
+      ->select(['id', 'data', 'run_count'])
+      ->where('id IN (#ids)', ['ids' => $ids])
+      ->where('queue_name = @name', ['name' => $this->getName()])
+      ->execute();
+    $result = [];
+    while ($dao->fetch()) {
+      $result[] = (object) [
+        'id' => $dao->id,
+        'data' => unserialize($dao->data),
+        'run_count' => $dao->run_count,
+        'queue_name' => $this->getName(),
+      ];
     }
-    $dao->data = unserialize($dao->data);
-    return $dao;
+    return $result;
   }
 
   /**
    * Return an item that could not be processed.
    *
-   * @param CRM_Core_DAO $dao
+   * @param CRM_Core_DAO $item
    *   The item returned by claimItem.
    */
-  public function releaseItem($dao) {
-    if (empty($this->queueSpec['retry_interval'])) {
-      CRM_Core_DAO::executeQuery('UPDATE civicrm_queue_item SET release_time = NULL WHERE id = %1', [
-        1 => [$dao->id, 'Integer'],
-      ]);
+  public function releaseItem($item) {
+    $this->releaseItems([$item]);
+  }
+
+  public function releaseItems($items): void {
+    if (empty($items)) {
+      return;
     }
-    else {
-      CRM_Core_DAO::executeQuery('UPDATE civicrm_queue_item SET release_time = DATE_ADD(NOW(), INTERVAL %2 SECOND) WHERE id = %1', [
-        1 => [$dao->id, 'Integer'],
-        2 => [$this->queueSpec['retry_interval'], 'Integer'],
-      ]);
+    $sql = empty($this->queueSpec['retry_interval'])
+      ? 'UPDATE civicrm_queue_item SET release_time = NULL WHERE id IN (#ids) AND queue_name = @name'
+      : 'UPDATE civicrm_queue_item SET release_time = DATE_ADD(NOW(), INTERVAL #retry SECOND) WHERE id IN (#ids) AND queue_name = @name';
+    CRM_Core_DAO::executeQuery(CRM_Utils_SQL::interpolate($sql, [
+      'ids' => CRM_Utils_Array::collect('id', $items),
+      'name' => $this->getName(),
+      'retry' => $this->queueSpec['retry_interval'] ?? NULL,
+    ]));
+    $this->freeDAOs($items);
+  }
+
+  protected function freeDAOs($mixed) {
+    $mixed = (array) $mixed;
+    foreach ($mixed as $item) {
+      if ($item instanceof CRM_Core_DAO) {
+        $item->free();
+      }
     }
-    $dao->free();
   }
 
 }
index a5416e5ae80f68d93a0f33281bc29c636d9565b8..43912aaeae72b58d035a0643d48c7222bf820d89 100644 (file)
@@ -412,4 +412,54 @@ class CRM_Queue_QueueTest extends CiviUnitTestCase {
     $queue2->releaseItem($item);
   }
 
+  /**
+   * Grab items from a queue in batches.
+   *
+   * @dataProvider getQueueSpecs
+   * @param $queueSpec
+   */
+  public function testBatchClaim($queueSpec) {
+    $this->queue = $this->queueService->create($queueSpec);
+    $this->assertTrue($this->queue instanceof CRM_Queue_Queue);
+    if (!($this->queue instanceof CRM_Queue_Queue_BatchQueueInterface)) {
+      $this->markTestSkipped("Queue class does not support batch interface: " . get_class($this->queue));
+    }
+
+    for ($i = 0; $i < 9; $i++) {
+      $this->queue->createItem('x' . $i);
+    }
+    $this->assertEquals(9, $this->queue->numberOfItems());
+
+    // We expect this driver to be fully compliant with batching.
+    $claimsA = $this->queue->claimItems(3);
+    $claimsB = $this->queue->claimItems(3);
+    $this->assertEquals(9, $this->queue->numberOfItems());
+
+    $this->assertEquals(['x0', 'x1', 'x2'], CRM_Utils_Array::collect('data', $claimsA));
+    $this->assertEquals(['x3', 'x4', 'x5'], CRM_Utils_Array::collect('data', $claimsB));
+
+    $this->queue->deleteItems([$claimsA[0], $claimsA[1]]); /* x0, x1 */
+    $this->queue->releaseItems([$claimsA[2]]); /* x2: will retry with next claimItems() */
+    $this->queue->deleteItems([$claimsB[0], $claimsB[1]]); /* x3, x4 */
+    /* claimsB[2]: x5: Oops, we're gonna take some time to finish this one. */
+    $this->assertEquals(5, $this->queue->numberOfItems());
+
+    $claimsC = $this->queue->claimItems(3);
+    $this->assertEquals(['x2', 'x6', 'x7'], CRM_Utils_Array::collect('data', $claimsC));
+    $this->queue->deleteItem($claimsC[0]); /* x2 */
+    $this->queue->releaseItem($claimsC[1]); /* x6: will retry with next claimItems() */
+    $this->queue->deleteItem($claimsC[2]); /* x7 */
+    $this->assertEquals(3, $this->queue->numberOfItems());
+
+    $claimsD = $this->queue->claimItems(3);
+    $this->assertEquals(['x6', 'x8'], CRM_Utils_Array::collect('data', $claimsD));
+    $this->queue->deleteItem($claimsD[0]); /* x6 */
+    $this->queue->deleteItem($claimsD[1]); /* x8 */
+    $this->assertEquals(1, $this->queue->numberOfItems());
+
+    // claimsB took a while to wrap-up. But it finally did!
+    $this->queue->deleteItem($claimsB[2]); /* x5 */
+    $this->assertEquals(0, $this->queue->numberOfItems());
+  }
+
 }