Replace incorrect class comment
[civicrm-core.git] / CRM / Queue / Service.php
index 72bf45574ef7bdc64e288d76be42736d421a155b..2a9815ecfcf396a9e8931d126a115c92d3d2d667 100644 (file)
@@ -37,6 +37,14 @@ class CRM_Queue_Service {
 
   protected static $_singleton;
 
+  /**
+   * List of fields which are shared by `$queueSpec` and `civicrm_queue`.
+   *
+   * @var string[]
+   * @readonly
+   */
+  private static $commonFields = ['name', 'type', 'runner', 'batch_limit', 'lease_time', 'retry_limit', 'retry_interval'];
+
   /**
    * FIXME: Singleton pattern should be removed when dependency-injection
    * becomes available.
@@ -79,7 +87,13 @@ class CRM_Queue_Service {
    *   - reset: bool, optional; if a queue is found, then it should be
    *     flushed; default to TRUE
    *   - (additional keys depending on the queue provider).
-   *
+   *   - is_persistent: bool, optional; if true, then this queue is loaded from `civicrm_queue` list
+   *   - runner: string, optional; if given, then items in this queue can run
+   *     automatically via `hook_civicrm_queueRun_{$runner}`
+   *   - batch_limit: int, Maximum number of items in a batch.
+   *   - lease_time: int, When claiming an item (or batch of items) for work, how long should the item(s) be reserved. (Seconds)
+   *   - retry_limit: int, Number of permitted retries. Set to zero (0) to disable.
+   *   - retry_interval: int, Number of seconds to wait before retrying a failed execution.
    * @return CRM_Queue_Queue
    */
   public function create($queueSpec) {
@@ -87,6 +101,9 @@ class CRM_Queue_Service {
       return $this->queues[$queueSpec['name']];
     }
 
+    if (!empty($queueSpec['is_persistent'])) {
+      $queueSpec = $this->findCreateQueueSpec($queueSpec);
+    }
     $queue = $this->instantiateQueueObject($queueSpec);
     $exists = $queue->existsQueue();
     if (!$exists) {
@@ -103,6 +120,47 @@ class CRM_Queue_Service {
     return $queue;
   }
 
+  /**
+   * Find/create the queue-spec. Specifically:
+   *
+   * - If there is a stored queue, use its spec.
+   * - If there is no stored queue, and if we have enough information, then create queue.
+   *
+   * @param array $queueSpec
+   * @return array
+   *   Updated queueSpec.
+   * @throws \CRM_Core_Exception
+   */
+  protected function findCreateQueueSpec(array $queueSpec): array {
+    $loaded = $this->findQueueSpec($queueSpec);
+    if ($loaded !== NULL) {
+      return $loaded;
+    }
+
+    if (empty($queueSpec['type'])) {
+      throw new \CRM_Core_Exception(sprintf('Failed to find or create persistent queue "%s". Missing field "%s".',
+        $queueSpec['name'], 'type'));
+    }
+
+    $dao = new CRM_Queue_DAO_Queue();
+    $dao->name = $queueSpec['name'];
+    $dao->copyValues($queueSpec);
+    $dao->insert();
+
+    return $this->findQueueSpec($queueSpec);
+  }
+
+  protected function findQueueSpec(array $queueSpec): ?array {
+    $dao = new CRM_Queue_DAO_Queue();
+    $dao->name = $queueSpec['name'];
+    if ($dao->find(TRUE)) {
+      return array_merge($queueSpec, CRM_Utils_Array::subset($dao->toArray(), static::$commonFields));
+    }
+    else {
+      return NULL;
+    }
+  }
+
   /**
    * Look up an existing queue.
    *
@@ -111,6 +169,7 @@ class CRM_Queue_Service {
    *   - type: string, required, e.g. `Sql`, `SqlParallel`, `Memory`
    *   - name: string, required, e.g. "upgrade-tasks"
    *   - (additional keys depending on the queue provider).
+   *   - is_persistent: bool, optional; if true, then this queue is loaded from `civicrm_queue` list
    *
    * @return CRM_Queue_Queue
    */
@@ -118,6 +177,9 @@ class CRM_Queue_Service {
     if (is_object($this->queues[$queueSpec['name']] ?? NULL)) {
       return $this->queues[$queueSpec['name']];
     }
+    if (!empty($queueSpec['is_persistent'])) {
+      $queueSpec = $this->findCreateQueueSpec($queueSpec);
+    }
     $queue = $this->instantiateQueueObject($queueSpec);
     $queue->loadQueue();
     $this->queues[$queueSpec['name']] = $queue;