SqlParallel - Implement BatchQueueInterface
[civicrm-core.git] / CRM / Queue / Queue / SqlParallel.php
index bf48d45e9386702b29e2d4c80dad70e68588aeda..8573b7b99883b9aad03cfe0e3ccfdf6457a27a14 100644 (file)
@@ -12,7 +12,7 @@
 /**
  * A queue implementation which stores items in the CiviCRM SQL database
  */
-class CRM_Queue_Queue_SqlParallel extends CRM_Queue_Queue {
+class CRM_Queue_Queue_SqlParallel extends CRM_Queue_Queue implements CRM_Queue_Queue_BatchQueueInterface {
 
   use CRM_Queue_Queue_SqlTrait;
 
@@ -35,28 +35,32 @@ class CRM_Queue_Queue_SqlParallel extends CRM_Queue_Queue {
   }
 
   /**
-   * Get the next item.
-   *
-   * @param int $lease_time
-   *   Seconds.
-   *
-   * @return object
-   *   With key 'data' that matches the inputted data.
+   * @inheritDoc
+   */
+  public function claimItem($lease_time = NULL) {
+    $items = $this->claimItems(1, $lease_time);
+    return $items[0] ?? NULL;
+  }
+
+  /**
+   * @inheritDoc
    */
-  public function claimItem($lease_time = 3600) {
+  public function claimItems(int $limit, ?int $lease_time = NULL): array {
+    $lease_time = $lease_time ?: $this->getSpec('lease_time') ?: static::DEFAULT_LEASE_TIME;
+    $limit = $this->getSpec('batch_limit') ? min($limit, $this->getSpec('batch_limit')) : $limit;
 
-    $result = NULL;
     $dao = CRM_Core_DAO::executeQuery('LOCK TABLES civicrm_queue_item WRITE;');
-    $sql = "SELECT id, queue_name, submit_time, release_time, data
+    $sql = "SELECT id, queue_name, submit_time, release_time, run_count, data
         FROM civicrm_queue_item
         WHERE queue_name = %1
               AND (release_time IS NULL OR release_time < %2)
         ORDER BY weight ASC, id ASC
-        LIMIT 1
+        LIMIT %3
       ";
     $params = [
       1 => [$this->getName(), 'String'],
       2 => [CRM_Utils_Time::getTime(), 'Timestamp'],
+      3 => [$limit, 'Integer'],
     ];
     $dao = CRM_Core_DAO::executeQuery($sql, $params, TRUE, 'CRM_Queue_DAO_QueueItem');
     if (is_a($dao, 'DB_Error')) {
@@ -64,19 +68,22 @@ class CRM_Queue_Queue_SqlParallel extends CRM_Queue_Queue {
       CRM_Core_Error::fatal();
     }
 
-    if ($dao->fetch()) {
+    $result = [];
+    while ($dao->fetch()) {
+      $result[] = (object) [
+        'id' => $dao->id,
+        'data' => unserialize($dao->data),
+        'queue_name' => $dao->queue_name,
+        'run_count' => 1 + (int) $dao->run_count,
+      ];
+    }
+    if ($result) {
       $nowEpoch = CRM_Utils_Time::getTimeRaw();
-      CRM_Core_DAO::executeQuery("UPDATE civicrm_queue_item SET release_time = %1 WHERE id = %2", [
-        '1' => [date('YmdHis', $nowEpoch + $lease_time), 'String'],
-        '2' => [$dao->id, 'Integer'],
+      $sql = CRM_Utils_SQL::interpolate('UPDATE civicrm_queue_item SET release_time = @RT, run_count = 1+run_count WHERE id IN (#ids)', [
+        'RT' => date('YmdHis', $nowEpoch + $lease_time),
+        'ids' => CRM_Utils_Array::collect('id', $result),
       ]);
-      // (Comment by artfulrobot Sep 2019: Not sure what the below comment means, should be removed/clarified?)
-      // work-around: inconsistent date-formatting causes unintentional breakage
-      #        $dao->submit_time = date('YmdHis', strtotime($dao->submit_time));
-      #        $dao->release_time = date('YmdHis', $nowEpoch + $lease_time);
-      #        $dao->save();
-      $dao->data = unserialize($dao->data);
-      $result = $dao;
+      CRM_Core_DAO::executeQuery($sql);
     }
 
     $dao = CRM_Core_DAO::executeQuery('UNLOCK TABLES;');
@@ -87,15 +94,17 @@ class CRM_Queue_Queue_SqlParallel extends CRM_Queue_Queue {
   /**
    * Get the next item, even if there's an active lease
    *
-   * @param int $lease_time
-   *   Seconds.
-   *
+   * @param int|null $lease_time
+   *   Hold a lease on the claimed item for $X seconds.
+   *   If NULL, inherit a queue default (`$queueSpec['lease_time']`) or system default (`DEFAULT_LEASE_TIME`).
    * @return object
    *   With key 'data' that matches the inputted data.
    */
-  public function stealItem($lease_time = 3600) {
+  public function stealItem($lease_time = NULL) {
+    $lease_time = $lease_time ?: $this->getSpec('lease_time') ?: static::DEFAULT_LEASE_TIME;
+
     $sql = "
-      SELECT id, queue_name, submit_time, release_time, data
+      SELECT id, queue_name, submit_time, release_time, run_count, data
       FROM civicrm_queue_item
       WHERE queue_name = %1
       ORDER BY weight ASC, id ASC
@@ -107,9 +116,11 @@ class CRM_Queue_Queue_SqlParallel extends CRM_Queue_Queue {
     $dao = CRM_Core_DAO::executeQuery($sql, $params, TRUE, 'CRM_Queue_DAO_QueueItem');
     if ($dao->fetch()) {
       $nowEpoch = CRM_Utils_Time::getTimeRaw();
-      CRM_Core_DAO::executeQuery("UPDATE civicrm_queue_item SET release_time = %1 WHERE id = %2", [
+      $dao->run_count++;
+      CRM_Core_DAO::executeQuery("UPDATE civicrm_queue_item SET release_time = %1, run_count = %3 WHERE id = %2", [
         '1' => [date('YmdHis', $nowEpoch + $lease_time), 'String'],
         '2' => [$dao->id, 'Integer'],
+        '3' => [$dao->run_count, 'Integer'],
       ]);
       $dao->data = unserialize($dao->data);
       return $dao;