CRM_Queue_Service - Add support for persistent queues
authorTim Otten <totten@civicrm.org>
Sat, 29 Jan 2022 06:17:45 +0000 (22:17 -0800)
committerTim Otten <totten@civicrm.org>
Tue, 1 Feb 2022 01:15:17 +0000 (17:15 -0800)
Before:

* `CRM_Queue_Service::create()` traditionally provided support for
  instantiating queue objects.
* `civicrm_queue` was recently added for tracking persistent
  metadata about queues.

After:

* `CRM_Queue_Service::create()` works as before, and it also accepts
   parameters `is_persistent` and `is_autorun`.

CRM/Queue/Service.php

index 72bf45574ef7bdc64e288d76be42736d421a155b..576f76cdf2daede1fc8fad8960e9b9aef40251c7 100644 (file)
@@ -79,6 +79,9 @@ class CRM_Queue_Service {
    *   - reset: bool, optional; if a queue is found, then it should be
    *     flushed; default to TRUE
    *   - (additional keys depending on the queue provider).
+   *   - is_persistent: bool, optional; if true, then this queue is loaded from `civicrm_queue` list
+   *   - is_autorun: bool, optional; if true, then this queue will be auto-scanned
+   *     by background task-runners
    *
    * @return CRM_Queue_Queue
    */
@@ -87,6 +90,9 @@ class CRM_Queue_Service {
       return $this->queues[$queueSpec['name']];
     }
 
+    if (!empty($queueSpec['is_persistent'])) {
+      $queueSpec = $this->findCreateQueueSpec($queueSpec);
+    }
     $queue = $this->instantiateQueueObject($queueSpec);
     $exists = $queue->existsQueue();
     if (!$exists) {
@@ -103,6 +109,36 @@ class CRM_Queue_Service {
     return $queue;
   }
 
+  /**
+   * Find/create the queue-spec. Specifically:
+   *
+   * - If there is a stored queue, use its spec.
+   * - If there is no stored queue, and if we have enough information, then create queue.
+   *
+   * @param array $queueSpec
+   * @return array
+   *   Updated queueSpec.
+   * @throws \CRM_Core_Exception
+   */
+  protected function findCreateQueueSpec(array $queueSpec): array {
+    $storageFields = ['type', 'is_autorun'];
+    $dao = new CRM_Queue_DAO_Queue();
+    $dao->name = $queueSpec['name'];
+    if ($dao->find(TRUE)) {
+      return array_merge($queueSpec, CRM_Utils_Array::subset($dao->toArray(), $storageFields));
+    }
+
+    if (empty($queueSpec['type'])) {
+      throw new \CRM_Core_Exception(sprintf('Failed to find or create persistent queue "%s". Missing field "%s".',
+        $queueSpec['name'], 'type'));
+    }
+    $queueSpec = array_merge(['is_autorun' => FALSE], $queueSpec);
+    $dao->copyValues($queueSpec);
+    $dao->insert();
+
+    return $queueSpec;
+  }
+
   /**
    * Look up an existing queue.
    *
@@ -111,6 +147,7 @@ class CRM_Queue_Service {
    *   - type: string, required, e.g. `Sql`, `SqlParallel`, `Memory`
    *   - name: string, required, e.g. "upgrade-tasks"
    *   - (additional keys depending on the queue provider).
+   *   - is_persistent: bool, optional; if true, then this queue is loaded from `civicrm_queue` list
    *
    * @return CRM_Queue_Queue
    */
@@ -118,6 +155,9 @@ class CRM_Queue_Service {
     if (is_object($this->queues[$queueSpec['name']] ?? NULL)) {
       return $this->queues[$queueSpec['name']];
     }
+    if (!empty($queueSpec['is_persistent'])) {
+      $queueSpec = $this->findCreateQueueSpec($queueSpec);
+    }
     $queue = $this->instantiateQueueObject($queueSpec);
     $queue->loadQueue();
     $this->queues[$queueSpec['name']] = $queue;