*/
abstract class CRM_Queue_Queue {
+ const DEFAULT_LEASE_TIME = 3600;
+
/**
* @var string
*/
public function __construct($queueSpec) {
$this->_name = $queueSpec['name'];
$this->queueSpec = $queueSpec;
+ unset($this->queueSpec['status']);
+ // Status may be meaningfully + independently toggled (eg when using type=SqlParallel,error=abort).
+ // Retaining a copy of 'status' in here would be misleading.
+ }
+
+ /**
+ * Determine whether this queue is currently active.
+ *
+ * @return bool
+ * TRUE if runners should continue claiming new tasks from this queue
+ * @throws \CRM_Core_Exception
+ */
+ public function isActive(): bool {
+ $status = CRM_Core_DAO::getFieldValue('CRM_Queue_DAO_Queue', $this->_name, 'status', 'name', TRUE);
+ // Note: In the future, we may want to incorporate other data (like maintenance-mode or upgrade-status) in deciding active queues.
+ return ($status === 'active');
+ }
+
+ /**
+ * Change the status of the queue.
+ *
+ * @param string $status
+ * Ex: 'active', 'draft', 'aborted'
+ */
+ public function setStatus(string $status): void {
+ $result = CRM_Core_DAO::executeQuery('UPDATE civicrm_queue SET status = %1 WHERE name = %2', [
+ 1 => [$status, 'String'],
+ 2 => [$this->getName(), 'String'],
+ ]);
+ // If multiple workers try to setStatus('completed') at roughly the same time, only one will fire an event.
+ if ($result->affectedRows() > 0) {
+ CRM_Utils_Hook::queueStatus($this, $status);
+ }
}
/**
/**
* Get the next item.
*
- * @param int $lease_time
- * Seconds.
- *
+ * @param int|null $lease_time
+ * Hold a lease on the claimed item for $X seconds.
+ * If NULL, inherit a default.
* @return object
* with key 'data' that matches the inputted data
*/
- abstract public function claimItem($lease_time = 3600);
+ abstract public function claimItem($lease_time = NULL);
/**
* Get the next item, even if there's an active lease
* @return object
* with key 'data' that matches the inputted data
*/
- abstract public function stealItem($lease_time = 3600);
+ abstract public function stealItem($lease_time = NULL);
/**
* Remove an item from the queue.
*/
abstract public function deleteItem($item);
+ /**
+ * Get the full data for an item.
+ *
+ * This is a passive peek - it does not claim/steal/release anything.
+ *
+ * @param int|string $id
+ * The unique ID of the task within the queue.
+ * @return CRM_Queue_DAO_QueueItem|object|null $dao
+ */
+ abstract public function fetchItem($id);
+
/**
* Return an item that could not be processed.
*