Merge pull request #23776 from eileenmcnaughton/electro
[civicrm-core.git] / CRM / Queue / Service.php
index 119c76efe16a91c8ffe8fc29a6513392a65aea91..338c46eac253919d62c41741a960cd0327985394 100644 (file)
@@ -37,6 +37,14 @@ class CRM_Queue_Service {
 
   protected static $_singleton;
 
+  /**
+   * List of fields which are shared by `$queueSpec` and `civicrm_queue`.
+   *
+   * @var string[]
+   * @readonly
+   */
+  private static $commonFields = ['name', 'type', 'runner', 'status', 'error', 'batch_limit', 'lease_time', 'retry_limit', 'retry_interval'];
+
   /**
    * FIXME: Singleton pattern should be removed when dependency-injection
    * becomes available.
@@ -74,13 +82,22 @@ class CRM_Queue_Service {
    *
    * @param array $queueSpec
    *   Array with keys:
-   *   - type: string, required, e.g. "interactive", "immediate", "stomp",
-   *    "beanstalk"
+   *   - type: string, required, e.g. `Sql`, `SqlParallel`, `Memory`
    *   - name: string, required, e.g. "upgrade-tasks"
    *   - reset: bool, optional; if a queue is found, then it should be
    *     flushed; default to TRUE
    *   - (additional keys depending on the queue provider).
-   *
+   *   - is_persistent: bool, optional; if true, then this queue is loaded from `civicrm_queue` list
+   *   - runner: string, optional; if given, then items in this queue can run
+   *     automatically via `hook_civicrm_queueRun_{$runner}`
+   *   - status: string, required for runnable-queues; specify whether the runner is currently active
+   *     ex: 'active', 'draft', 'completed'
+   *   - error: string, required for runnable-queues; specify what to do with unhandled errors
+   *     ex: "drop" or "abort"
+   *   - batch_limit: int, Maximum number of items in a batch.
+   *   - lease_time: int, When claiming an item (or batch of items) for work, how long should the item(s) be reserved. (Seconds)
+   *   - retry_limit: int, Number of permitted retries. Set to zero (0) to disable.
+   *   - retry_interval: int, Number of seconds to wait before retrying a failed execution.
    * @return CRM_Queue_Queue
    */
   public function create($queueSpec) {
@@ -88,6 +105,10 @@ class CRM_Queue_Service {
       return $this->queues[$queueSpec['name']];
     }
 
+    if (!empty($queueSpec['is_persistent'])) {
+      $queueSpec = $this->findCreateQueueSpec($queueSpec);
+    }
+    $this->validateQueueSpec($queueSpec);
     $queue = $this->instantiateQueueObject($queueSpec);
     $exists = $queue->existsQueue();
     if (!$exists) {
@@ -104,15 +125,59 @@ class CRM_Queue_Service {
     return $queue;
   }
 
+  /**
+   * Find/create the queue-spec. Specifically:
+   *
+   * - If there is a stored queue, use its spec.
+   * - If there is no stored queue, and if we have enough information, then create queue.
+   *
+   * @param array $queueSpec
+   * @return array
+   *   Updated queueSpec.
+   * @throws \CRM_Core_Exception
+   */
+  protected function findCreateQueueSpec(array $queueSpec): array {
+    $loaded = $this->findQueueSpec($queueSpec);
+    if ($loaded !== NULL) {
+      return $loaded;
+    }
+
+    if (isset($queueSpec['template'])) {
+      $base = $this->findQueueSpec(['name' => $queueSpec['template']]);
+      $reset = ['is_template' => 0];
+      $queueSpec = array_merge($base, $reset, $queueSpec);
+    }
+
+    $this->validateQueueSpec($queueSpec);
+
+    $dao = new CRM_Queue_DAO_Queue();
+    $dao->name = $queueSpec['name'];
+    $dao->copyValues($queueSpec);
+    $dao->insert();
+
+    return $this->findQueueSpec($queueSpec);
+  }
+
+  protected function findQueueSpec(array $queueSpec): ?array {
+    $dao = new CRM_Queue_DAO_Queue();
+    $dao->name = $queueSpec['name'];
+    if ($dao->find(TRUE)) {
+      return array_merge($queueSpec, CRM_Utils_Array::subset($dao->toArray(), static::$commonFields));
+    }
+    else {
+      return NULL;
+    }
+  }
+
   /**
    * Look up an existing queue.
    *
    * @param array $queueSpec
    *   Array with keys:
-   *   - type: string, required, e.g. "interactive", "immediate", "stomp",
-   *     "beanstalk"
+   *   - type: string, required, e.g. `Sql`, `SqlParallel`, `Memory`
    *   - name: string, required, e.g. "upgrade-tasks"
    *   - (additional keys depending on the queue provider).
+   *   - is_persistent: bool, optional; if true, then this queue is loaded from `civicrm_queue` list
    *
    * @return CRM_Queue_Queue
    */
@@ -120,6 +185,9 @@ class CRM_Queue_Service {
     if (is_object($this->queues[$queueSpec['name']] ?? NULL)) {
       return $this->queues[$queueSpec['name']];
     }
+    if (!empty($queueSpec['is_persistent'])) {
+      $queueSpec = $this->findCreateQueueSpec($queueSpec);
+    }
     $queue = $this->instantiateQueueObject($queueSpec);
     $queue->loadQueue();
     $this->queues[$queueSpec['name']] = $queue;
@@ -130,8 +198,7 @@ class CRM_Queue_Service {
    * Convert a queue "type" name to a class name.
    *
    * @param string $type
-   *   E.g. "interactive", "immediate", "stomp", "beanstalk".
-   *
+   *   - type: string, required, e.g. `Sql`, `SqlParallel`, `Memory`
    * @return string
    *   Class-name
    */
@@ -158,4 +225,38 @@ class CRM_Queue_Service {
     return $class->newInstance($queueSpec);
   }
 
+  /**
+   * Assert that the queueSpec is well-formed.
+   *
+   * @param array $queueSpec
+   * @throws \CRM_Core_Exception
+   */
+  public function validateQueueSpec(array $queueSpec): void {
+    $throw = function(string $message, ...$args) use ($queueSpec) {
+      $prefix = sprintf('Failed to create queue "%s". ', $queueSpec['name']);
+      throw new CRM_Core_Exception($prefix . sprintf($message, ...$args));
+    };
+
+    if (empty($queueSpec['type'])) {
+      $throw('Missing field "type".');
+    }
+
+    // The rest of the validations only apply to persistent, runnable queues.
+    if (empty($queueSpec['is_persistent']) || empty($queueSpec['runner'])) {
+      return;
+    }
+
+    $statuses = CRM_Queue_BAO_Queue::getStatuses();
+    $status = $queueSpec['status'] ?? NULL;
+    if (!isset($statuses[$status])) {
+      $throw('Invalid queue status "%s".', $status);
+    }
+
+    $errorModes = CRM_Queue_BAO_Queue::getErrorModes();
+    $errorMode = $queueSpec['error'] ?? NULL;
+    if ($queueSpec['runner'] === 'task' && !isset($errorModes[$errorMode])) {
+      $throw('Invalid error mode "%s".', $errorMode);
+    }
+  }
+
 }