protected static $_singleton;
+ /**
+ * List of fields which are shared by `$queueSpec` and `civicrm_queue`.
+ *
+ * @var string[]
+ * @readonly
+ */
+ private static $commonFields = ['name', 'type', 'runner', 'status', 'error', 'batch_limit', 'lease_time', 'retry_limit', 'retry_interval'];
+
/**
* FIXME: Singleton pattern should be removed when dependency-injection
* becomes available.
* flushed; default to TRUE
* - (additional keys depending on the queue provider).
* - is_persistent: bool, optional; if true, then this queue is loaded from `civicrm_queue` list
- * - is_autorun: bool, optional; if true, then this queue will be auto-scanned
- * by background task-runners
- *
+ * - runner: string, optional; if given, then items in this queue can run
+ * automatically via `hook_civicrm_queueRun_{$runner}`
+ * - status: string, required for runnable-queues; specify whether the runner is currently active
+ * ex: 'active', 'draft', 'completed'
+ * - error: string, required for runnable-queues; specify what to do with unhandled errors
+ * ex: "drop" or "abort"
+ * - batch_limit: int, Maximum number of items in a batch.
+ * - lease_time: int, When claiming an item (or batch of items) for work, how long should the item(s) be reserved. (Seconds)
+ * - retry_limit: int, Number of permitted retries. Set to zero (0) to disable.
+ * - retry_interval: int, Number of seconds to wait before retrying a failed execution.
* @return CRM_Queue_Queue
*/
public function create($queueSpec) {
if (!empty($queueSpec['is_persistent'])) {
$queueSpec = $this->findCreateQueueSpec($queueSpec);
}
+ $this->validateQueueSpec($queueSpec);
$queue = $this->instantiateQueueObject($queueSpec);
$exists = $queue->existsQueue();
if (!$exists) {
* @throws \CRM_Core_Exception
*/
protected function findCreateQueueSpec(array $queueSpec): array {
- $storageFields = ['type', 'is_autorun'];
- $dao = new CRM_Queue_DAO_Queue();
- $dao->name = $queueSpec['name'];
- if ($dao->find(TRUE)) {
- return array_merge($queueSpec, CRM_Utils_Array::subset($dao->toArray(), $storageFields));
+ $loaded = $this->findQueueSpec($queueSpec);
+ if ($loaded !== NULL) {
+ return $loaded;
}
- if (empty($queueSpec['type'])) {
- throw new \CRM_Core_Exception(sprintf('Failed to find or create persistent queue "%s". Missing field "%s".',
- $queueSpec['name'], 'type'));
+ if (isset($queueSpec['template'])) {
+ $base = $this->findQueueSpec(['name' => $queueSpec['template']]);
+ $reset = ['is_template' => 0];
+ $queueSpec = array_merge($base, $reset, $queueSpec);
}
- $queueSpec = array_merge(['is_autorun' => FALSE], $queueSpec);
+
+ $this->validateQueueSpec($queueSpec);
+
+ $dao = new CRM_Queue_DAO_Queue();
+ $dao->name = $queueSpec['name'];
$dao->copyValues($queueSpec);
$dao->insert();
- return $queueSpec;
+ return $this->findQueueSpec($queueSpec);
+ }
+
+ protected function findQueueSpec(array $queueSpec): ?array {
+ $dao = new CRM_Queue_DAO_Queue();
+ $dao->name = $queueSpec['name'];
+ if ($dao->find(TRUE)) {
+ return array_merge($queueSpec, CRM_Utils_Array::subset($dao->toArray(), static::$commonFields));
+ }
+ else {
+ return NULL;
+ }
}
/**
return $class->newInstance($queueSpec);
}
+ /**
+ * Assert that the queueSpec is well-formed.
+ *
+ * @param array $queueSpec
+ * @throws \CRM_Core_Exception
+ */
+ public function validateQueueSpec(array $queueSpec): void {
+ $throw = function(string $message, ...$args) use ($queueSpec) {
+ $prefix = sprintf('Failed to create queue "%s". ', $queueSpec['name']);
+ throw new CRM_Core_Exception($prefix . sprintf($message, ...$args));
+ };
+
+ if (empty($queueSpec['type'])) {
+ $throw('Missing field "type".');
+ }
+
+ // The rest of the validations only apply to persistent, runnable queues.
+ if (empty($queueSpec['is_persistent']) || empty($queueSpec['runner'])) {
+ return;
+ }
+
+ $statuses = CRM_Queue_BAO_Queue::getStatuses();
+ $status = $queueSpec['status'] ?? NULL;
+ if (!isset($statuses[$status])) {
+ $throw('Invalid queue status "%s".', $status);
+ }
+
+ $errorModes = CRM_Queue_BAO_Queue::getErrorModes();
+ $errorMode = $queueSpec['error'] ?? NULL;
+ if ($queueSpec['runner'] === 'task' && !isset($errorModes[$errorMode])) {
+ $throw('Invalid error mode "%s".', $errorMode);
+ }
+ }
+
}