SqlParallel - Implement BatchQueueInterface
[civicrm-core.git] / CRM / Queue / Queue / SqlParallel.php
index 9eb12ac883dba0ab11b8adea2f10368cff433dbb..8573b7b99883b9aad03cfe0e3ccfdf6457a27a14 100644 (file)
@@ -12,7 +12,9 @@
 /**
  * A queue implementation which stores items in the CiviCRM SQL database
  */
-class CRM_Queue_Queue_SqlParallel extends CRM_Queue_Queue {
+class CRM_Queue_Queue_SqlParallel extends CRM_Queue_Queue implements CRM_Queue_Queue_BatchQueueInterface {
+
+  use CRM_Queue_Queue_SqlTrait;
 
   /**
    * Create a reference to queue. After constructing the queue, one should
@@ -33,95 +35,32 @@ class CRM_Queue_Queue_SqlParallel extends CRM_Queue_Queue {
   }
 
   /**
-   * Perform any registation or resource-allocation for a new queue
-   */
-  public function createQueue() {
-    // nothing to do -- just start CRUDing items in the appropriate table
-  }
-
-  /**
-   * Perform any loading or pre-fetch for an existing queue.
-   */
-  public function loadQueue() {
-    // nothing to do -- just start CRUDing items in the appropriate table
-  }
-
-  /**
-   * Release any resources claimed by the queue (memory, DB rows, etc)
-   */
-  public function deleteQueue() {
-    return CRM_Core_DAO::singleValueQuery("
-      DELETE FROM civicrm_queue_item
-      WHERE queue_name = %1
-    ", [
-      1 => [$this->getName(), 'String'],
-    ]);
-  }
-
-  /**
-   * Check if the queue exists.
-   *
-   * @return bool
-   */
-  public function existsQueue() {
-    return ($this->numberOfItems() > 0);
-  }
-
-  /**
-   * Add a new item to the queue.
-   *
-   * @param mixed $data
-   *   Serializable PHP object or array.
-   * @param array $options
-   *   Queue-dependent options; for example, if this is a
-   *   priority-queue, then $options might specify the item's priority.
-   */
-  public function createItem($data, $options = []) {
-    $dao = new CRM_Queue_DAO_QueueItem();
-    $dao->queue_name = $this->getName();
-    $dao->submit_time = CRM_Utils_Time::getTime('YmdHis');
-    $dao->data = serialize($data);
-    $dao->weight = CRM_Utils_Array::value('weight', $options, 0);
-    $dao->save();
-  }
-
-  /**
-   * Determine number of items remaining in the queue.
-   *
-   * @return int
+   * @inheritDoc
    */
-  public function numberOfItems() {
-    return CRM_Core_DAO::singleValueQuery("
-      SELECT count(*)
-      FROM civicrm_queue_item
-      WHERE queue_name = %1
-    ", [
-      1 => [$this->getName(), 'String'],
-    ]);
+  public function claimItem($lease_time = NULL) {
+    $items = $this->claimItems(1, $lease_time);
+    return $items[0] ?? NULL;
   }
 
   /**
-   * Get the next item.
-   *
-   * @param int $lease_time
-   *   Seconds.
-   *
-   * @return object
-   *   With key 'data' that matches the inputted data.
+   * @inheritDoc
    */
-  public function claimItem($lease_time = 3600) {
+  public function claimItems(int $limit, ?int $lease_time = NULL): array {
+    $lease_time = $lease_time ?: $this->getSpec('lease_time') ?: static::DEFAULT_LEASE_TIME;
+    $limit = $this->getSpec('batch_limit') ? min($limit, $this->getSpec('batch_limit')) : $limit;
 
-    $result = NULL;
     $dao = CRM_Core_DAO::executeQuery('LOCK TABLES civicrm_queue_item WRITE;');
-    $sql = "SELECT id, queue_name, submit_time, release_time, data
+    $sql = "SELECT id, queue_name, submit_time, release_time, run_count, data
         FROM civicrm_queue_item
         WHERE queue_name = %1
-              AND release_time IS NULL
+              AND (release_time IS NULL OR release_time < %2)
         ORDER BY weight ASC, id ASC
-        LIMIT 1
+        LIMIT %3
       ";
     $params = [
       1 => [$this->getName(), 'String'],
+      2 => [CRM_Utils_Time::getTime(), 'Timestamp'],
+      3 => [$limit, 'Integer'],
     ];
     $dao = CRM_Core_DAO::executeQuery($sql, $params, TRUE, 'CRM_Queue_DAO_QueueItem');
     if (is_a($dao, 'DB_Error')) {
@@ -129,19 +68,22 @@ class CRM_Queue_Queue_SqlParallel extends CRM_Queue_Queue {
       CRM_Core_Error::fatal();
     }
 
-    if ($dao->fetch()) {
+    $result = [];
+    while ($dao->fetch()) {
+      $result[] = (object) [
+        'id' => $dao->id,
+        'data' => unserialize($dao->data),
+        'queue_name' => $dao->queue_name,
+        'run_count' => 1 + (int) $dao->run_count,
+      ];
+    }
+    if ($result) {
       $nowEpoch = CRM_Utils_Time::getTimeRaw();
-      CRM_Core_DAO::executeQuery("UPDATE civicrm_queue_item SET release_time = %1 WHERE id = %2", [
-        '1' => [date('YmdHis', $nowEpoch + $lease_time), 'String'],
-        '2' => [$dao->id, 'Integer'],
+      $sql = CRM_Utils_SQL::interpolate('UPDATE civicrm_queue_item SET release_time = @RT, run_count = 1+run_count WHERE id IN (#ids)', [
+        'RT' => date('YmdHis', $nowEpoch + $lease_time),
+        'ids' => CRM_Utils_Array::collect('id', $result),
       ]);
-      // (Comment by artfulrobot Sep 2019: Not sure what the below comment means, should be removed/clarified?)
-      // work-around: inconsistent date-formatting causes unintentional breakage
-      #        $dao->submit_time = date('YmdHis', strtotime($dao->submit_time));
-      #        $dao->release_time = date('YmdHis', $nowEpoch + $lease_time);
-      #        $dao->save();
-      $dao->data = unserialize($dao->data);
-      $result = $dao;
+      CRM_Core_DAO::executeQuery($sql);
     }
 
     $dao = CRM_Core_DAO::executeQuery('UNLOCK TABLES;');
@@ -152,15 +94,17 @@ class CRM_Queue_Queue_SqlParallel extends CRM_Queue_Queue {
   /**
    * Get the next item, even if there's an active lease
    *
-   * @param int $lease_time
-   *   Seconds.
-   *
+   * @param int|null $lease_time
+   *   Hold a lease on the claimed item for $X seconds.
+   *   If NULL, inherit a queue default (`$queueSpec['lease_time']`) or system default (`DEFAULT_LEASE_TIME`).
    * @return object
    *   With key 'data' that matches the inputted data.
    */
-  public function stealItem($lease_time = 3600) {
+  public function stealItem($lease_time = NULL) {
+    $lease_time = $lease_time ?: $this->getSpec('lease_time') ?: static::DEFAULT_LEASE_TIME;
+
     $sql = "
-      SELECT id, queue_name, submit_time, release_time, data
+      SELECT id, queue_name, submit_time, release_time, run_count, data
       FROM civicrm_queue_item
       WHERE queue_name = %1
       ORDER BY weight ASC, id ASC
@@ -172,39 +116,15 @@ class CRM_Queue_Queue_SqlParallel extends CRM_Queue_Queue {
     $dao = CRM_Core_DAO::executeQuery($sql, $params, TRUE, 'CRM_Queue_DAO_QueueItem');
     if ($dao->fetch()) {
       $nowEpoch = CRM_Utils_Time::getTimeRaw();
-      CRM_Core_DAO::executeQuery("UPDATE civicrm_queue_item SET release_time = %1 WHERE id = %2", [
+      $dao->run_count++;
+      CRM_Core_DAO::executeQuery("UPDATE civicrm_queue_item SET release_time = %1, run_count = %3 WHERE id = %2", [
         '1' => [date('YmdHis', $nowEpoch + $lease_time), 'String'],
         '2' => [$dao->id, 'Integer'],
+        '3' => [$dao->run_count, 'Integer'],
       ]);
       $dao->data = unserialize($dao->data);
       return $dao;
     }
   }
 
-  /**
-   * Remove an item from the queue.
-   *
-   * @param CRM_Core_DAO $dao
-   *   The item returned by claimItem.
-   */
-  public function deleteItem($dao) {
-    $dao->delete();
-    $dao->free();
-  }
-
-  /**
-   * Return an item that could not be processed.
-   *
-   * @param CRM_Core_DAO $dao
-   *   The item returned by claimItem.
-   */
-  public function releaseItem($dao) {
-    $sql = "UPDATE civicrm_queue_item SET release_time = NULL WHERE id = %1";
-    $params = [
-      1 => [$dao->id, 'Integer'],
-    ];
-    CRM_Core_DAO::executeQuery($sql, $params);
-    $dao->free();
-  }
-
 }