-/* Put the chain of addrs on the defer list. Retry will happen
-on the next queue run, earlier if triggered by a new message.
-Loop for the next set of addresses. */
+/* Check transport for the given concurrency limit. Return TRUE if over
+the limit (or an expansion failure), else FALSE and if there was a limit,
+the key for the hints database used for the concurrency count. */
-static void
-deferlist_chain(address_item * addr)
+static BOOL
+tpt_parallel_check(transport_instance * tp, address_item * addr, uschar ** key)
{
-address_item * next;
-for (next = addr; next->next; next = next->next) ;
-next->next = addr_defer;
-addr_defer = addr;
+unsigned max_parallel;
+
+if (!tp->max_parallel) return FALSE;
+
+max_parallel = (unsigned) expand_string_integer(tp->max_parallel, TRUE);
+if (expand_string_message)
+ {
+ log_write(0, LOG_MAIN|LOG_PANIC, "Failed to expand max_parallel option "
+ "in %s transport (%s): %s", tp->name, addr->address,
+ expand_string_message);
+ return TRUE;
+ }
+
+if (max_parallel > 0)
+ {
+ uschar * serialize_key = string_sprintf("tpt-serialize-%s", tp->name);
+ if (!enq_start(serialize_key, max_parallel))
+ {
+ address_item * next;
+ DEBUG(D_transport)
+ debug_printf("skipping tpt %s because concurrency limit %u reached\n",
+ tp->name, max_parallel);
+ do
+ {
+ next = addr->next;
+ addr->message = US"concurrency limit reached for transport";
+ addr->basic_errno = ERRNO_TRETRY;
+ post_process_one(addr, DEFER, LOG_MAIN, DTYPE_TRANSPORT, 0);
+ } while ((addr = next));
+ return TRUE;
+ }
+ *key = serialize_key;
+ }
+return FALSE;
}
We use a hints DB entry, incremented here and decremented after
the transport (and any shadow transport) completes. */
- if (tp->max_parallel)
+ if (tpt_parallel_check(tp, addr, &serialize_key))
{
- int_eximarith_t max_parallel =
- expand_string_integer(tp->max_parallel, TRUE);
if (expand_string_message)
{
logflags |= LOG_PANIC;
- log_write(0, LOG_MAIN|LOG_PANIC, "Failed to expand max_parallel option "
- "in %s transport (%s): %s", tp->name, addr->address,
- expand_string_message);
- for (addr2 = addr->next; addr; addr = addr2, addr2 = addr2->next)
+ do
+ {
+ addr = addr->next;
post_process_one(addr, DEFER, logflags, DTYPE_TRANSPORT, 0);
- continue;
- }
- if ( max_parallel > 0
- && !enq_start(
- serialize_key = string_sprintf("tpt-serialize-%s", tp->name),
- (unsigned) max_parallel)
- )
- {
- DEBUG(D_transport)
- debug_printf("skipping tpt %s because parallelism limit %u reached\n",
- tp->name, (unsigned) max_parallel);
-
- deferlist_chain(addr);
- continue;
+ } while ((addr = addr2));
}
+ continue; /* Loop for the next set of addresses. */
}
The hints DB entry is decremented in par_reduce(), when we reap the
transport process. */
- if (tp->max_parallel)
- {
- int_eximarith_t max_parallel =
- expand_string_integer(tp->max_parallel, TRUE);
- if (expand_string_message)
- {
- panicmsg = expand_string_message;
+ if (tpt_parallel_check(tp, addr, &serialize_key))
+ if ((panicmsg = expand_string_message))
goto panic_continue;
- }
- if ( max_parallel > 0
- && !enq_start(
- serialize_key = string_sprintf("tpt-serialize-%s", tp->name),
- (unsigned) max_parallel)
- )
- {
- DEBUG(D_transport)
- debug_printf("skipping tpt %s because parallelism limit %u reached\n",
- tp->name, (unsigned) max_parallel);
-
- deferlist_chain(addr);
- continue;
- }
- }
+ else
+ continue; /* Loop for the next set of addresses. */
/* Set up the expansion variables for this set of addresses */
}
else
- deferlist_chain(addr);
+ {
+ while (next->next) next = next->next;
+ next->next = addr_defer;
+ addr_defer = addr;
+ }
continue;
}
so just queue them all. */
if (queue_run_local)
- {
while (addr_remote)
{
address_item *addr = addr_remote;
addr->message = US"remote deliveries suppressed";
(void)post_process_one(addr, DEFER, LOG_MAIN, DTYPE_TRANSPORT, 0);
}
- }
/* Handle remote deliveries */
1999-03-02 09:44:33 10HmaX-0005vi-00 <= CALLER@test.ex U=CALLER P=local S=sss for a b c
1999-03-02 09:44:33 exim x.yz daemon started: pid=pppp, no queue runs, listening for SMTP on port 1225
1999-03-02 09:44:33 Start queue run: pid=pppp
+1999-03-02 09:44:33 10HmaX-0005vi-00 == c@test.ex R=rmt_client T=smtp defer (-56): concurrency limit reached for transport
1999-03-02 09:44:33 10HmaY-0005vi-00 <= CALLER@test.ex H=localhost (myhost.test.ex) [127.0.0.1] P=esmtp S=sss id=E10HmaX-0005vi-00@myhost.test.ex for a@test.ex
1999-03-02 09:44:33 10HmaY-0005vi-00 => :blackhole: <a@test.ex> R=server
1999-03-02 09:44:33 10HmaY-0005vi-00 Completed
1999-03-02 09:44:33 End queue run: pid=pppp
1999-03-02 09:44:33 10HmbB-0005vi-00 <= CALLER@test.ex U=CALLER P=local S=sss for y
1999-03-02 09:44:33 10HmbC-0005vi-00 <= CALLER@test.ex U=CALLER P=local S=sss for z
+1999-03-02 09:44:33 10HmbC-0005vi-00 == z@test.ex R=lcl_client T=pipe defer (-56): concurrency limit reached for transport
1999-03-02 09:44:33 10HmbB-0005vi-00 => y <y@test.ex> R=lcl_client T=pipe
1999-03-02 09:44:33 10HmbB-0005vi-00 Completed
1999-03-02 09:44:33 Start queue run: pid=pppp