Merge pull request #23764 from eileenmcnaughton/activity_update_mutli
[civicrm-core.git] / CRM / Queue / Queue.php
index a8b98b8b64aef01441a21a8a6ed124ba09758a01..83c27b0f4986726ebfb934cb8c8fb9b03ae12e51 100644 (file)
@@ -20,6 +20,8 @@
  */
 abstract class CRM_Queue_Queue {
 
+  const DEFAULT_LEASE_TIME = 3600;
+
   /**
    * @var string
    */
@@ -44,6 +46,39 @@ abstract class CRM_Queue_Queue {
   public function __construct($queueSpec) {
     $this->_name = $queueSpec['name'];
     $this->queueSpec = $queueSpec;
+    unset($this->queueSpec['status']);
+    // Status may be meaningfully + independently toggled (eg when using type=SqlParallel,error=abort).
+    // Retaining a copy of 'status' in here would be misleading.
+  }
+
+  /**
+   * Determine whether this queue is currently active.
+   *
+   * @return bool
+   *   TRUE if runners should continue claiming new tasks from this queue
+   * @throws \CRM_Core_Exception
+   */
+  public function isActive(): bool {
+    $status = CRM_Core_DAO::getFieldValue('CRM_Queue_DAO_Queue', $this->_name, 'status', 'name', TRUE);
+    // Note: In the future, we may want to incorporate other data (like maintenance-mode or upgrade-status) in deciding active queues.
+    return ($status === 'active');
+  }
+
+  /**
+   * Change the status of the queue.
+   *
+   * @param string $status
+   *   Ex: 'active', 'draft', 'aborted'
+   */
+  public function setStatus(string $status): void {
+    $result = CRM_Core_DAO::executeQuery('UPDATE civicrm_queue SET status = %1 WHERE name = %2', [
+      1 => [$status, 'String'],
+      2 => [$this->getName(), 'String'],
+    ]);
+    // If multiple workers try to setStatus('completed') at roughly the same time, only one will fire an event.
+    if ($result->affectedRows() > 0) {
+      CRM_Utils_Hook::queueStatus($this, $status);
+    }
   }
 
   /**
@@ -108,13 +143,13 @@ abstract class CRM_Queue_Queue {
   /**
    * Get the next item.
    *
-   * @param int $lease_time
-   *   Seconds.
-   *
+   * @param int|null $lease_time
+   *   Hold a lease on the claimed item for $X seconds.
+   *   If NULL, inherit a default.
    * @return object
    *   with key 'data' that matches the inputted data
    */
-  abstract public function claimItem($lease_time = 3600);
+  abstract public function claimItem($lease_time = NULL);
 
   /**
    * Get the next item, even if there's an active lease
@@ -125,7 +160,7 @@ abstract class CRM_Queue_Queue {
    * @return object
    *   with key 'data' that matches the inputted data
    */
-  abstract public function stealItem($lease_time = 3600);
+  abstract public function stealItem($lease_time = NULL);
 
   /**
    * Remove an item from the queue.
@@ -135,6 +170,17 @@ abstract class CRM_Queue_Queue {
    */
   abstract public function deleteItem($item);
 
+  /**
+   * Get the full data for an item.
+   *
+   * This is a passive peek - it does not claim/steal/release anything.
+   *
+   * @param int|string $id
+   *   The unique ID of the task within the queue.
+   * @return CRM_Queue_DAO_QueueItem|object|null $dao
+   */
+  abstract public function fetchItem($id);
+
   /**
    * Return an item that could not be processed.
    *