* - reset: bool, optional; if a queue is found, then it should be
* flushed; default to TRUE
* - (additional keys depending on the queue provider).
+ * - is_persistent: bool, optional; if true, then this queue is loaded from `civicrm_queue` list
+ * - is_autorun: bool, optional; if true, then this queue will be auto-scanned
+ * by background task-runners
*
* @return CRM_Queue_Queue
*/
return $this->queues[$queueSpec['name']];
}
+ if (!empty($queueSpec['is_persistent'])) {
+ $queueSpec = $this->findCreateQueueSpec($queueSpec);
+ }
$queue = $this->instantiateQueueObject($queueSpec);
$exists = $queue->existsQueue();
if (!$exists) {
return $queue;
}
+ /**
+ * Find/create the queue-spec. Specifically:
+ *
+ * - If there is a stored queue, use its spec.
+ * - If there is no stored queue, and if we have enough information, then create queue.
+ *
+ * @param array $queueSpec
+ * @return array
+ * Updated queueSpec.
+ * @throws \CRM_Core_Exception
+ */
+ protected function findCreateQueueSpec(array $queueSpec): array {
+ $storageFields = ['type', 'is_autorun'];
+ $dao = new CRM_Queue_DAO_Queue();
+ $dao->name = $queueSpec['name'];
+ if ($dao->find(TRUE)) {
+ return array_merge($queueSpec, CRM_Utils_Array::subset($dao->toArray(), $storageFields));
+ }
+
+ if (empty($queueSpec['type'])) {
+ throw new \CRM_Core_Exception(sprintf('Failed to find or create persistent queue "%s". Missing field "%s".',
+ $queueSpec['name'], 'type'));
+ }
+ $queueSpec = array_merge(['is_autorun' => FALSE], $queueSpec);
+ $dao->copyValues($queueSpec);
+ $dao->insert();
+
+ return $queueSpec;
+ }
+
/**
* Look up an existing queue.
*
* - type: string, required, e.g. `Sql`, `SqlParallel`, `Memory`
* - name: string, required, e.g. "upgrade-tasks"
* - (additional keys depending on the queue provider).
+ * - is_persistent: bool, optional; if true, then this queue is loaded from `civicrm_queue` list
*
* @return CRM_Queue_Queue
*/
if (is_object($this->queues[$queueSpec['name']] ?? NULL)) {
return $this->queues[$queueSpec['name']];
}
+ if (!empty($queueSpec['is_persistent'])) {
+ $queueSpec = $this->findCreateQueueSpec($queueSpec);
+ }
$queue = $this->instantiateQueueObject($queueSpec);
$queue->loadQueue();
$this->queues[$queueSpec['name']] = $queue;