3 +--------------------------------------------------------------------+
4 | CiviCRM version 4.3 |
5 +--------------------------------------------------------------------+
6 | Copyright CiviCRM LLC (c) 2004-2013 |
7 +--------------------------------------------------------------------+
8 | This file is a part of CiviCRM. |
10 | CiviCRM is free software; you can copy, modify, and distribute it |
11 | under the terms of the GNU Affero General Public License |
12 | Version 3, 19 November 2007 and the CiviCRM Licensing Exception. |
14 | CiviCRM is distributed in the hope that it will be useful, but |
15 | WITHOUT ANY WARRANTY; without even the implied warranty of |
16 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. |
17 | See the GNU Affero General Public License for more details. |
19 | You should have received a copy of the GNU Affero General Public |
20 | License and the CiviCRM Licensing Exception along |
21 | with this program; if not, contact CiviCRM LLC |
22 | at info[AT]civicrm[DOT]org. If you have questions about the |
23 | GNU Affero General Public License or the licensing of CiviCRM, |
24 | see the CiviCRM license FAQ at http://civicrm.org/licensing |
25 +--------------------------------------------------------------------+
31 * @copyright CiviCRM LLC (c) 2004-2013
36 require_once 'Mail.php';
37 class CRM_Mailing_BAO_Job
extends CRM_Mailing_DAO_Job
{
38 CONST MAX_CONTACTS_TO_PROCESS
= 1000;
/**
 * Class constructor.
 *
 * Does nothing beyond delegating to the parent constructor.
 */
43 function __construct() {
44 parent
::__construct();
/**
 * Build a new mailing job from $params and compute the mailing recipients.
 *
 * The visible statements copy 'mailing_id', 'status', 'scheduled_date' and
 * 'is_test' from $params onto a fresh CRM_Mailing_BAO_Job, then call
 * CRM_Mailing_BAO_Mailing::getRecipients() with the job id and mailing id.
 *
 * NOTE(review): the listing has gaps around these statements; the call that
 * persists the job and the return statement are not visible here, so the
 * return value is not documented. Presumably the job is saved before
 * $job->id is read -- confirm against the full file.
 *
 * NOTE(review): the four $params keys are read unconditionally, so a caller
 * omitting any of them triggers an undefined-index notice.
 *
 * @param array $params expects the keys mailing_id, status, scheduled_date, is_test
 */
47 function create ($params){
48 $job = new CRM_Mailing_BAO_Job();
49 $job->mailing_id
= $params['mailing_id'];
50 $job->status
= $params['status'];
51 $job->scheduled_date
= $params['scheduled_date'];
52 $job->is_test
= $params['is_test'];
54 $mailing = new CRM_Mailing_BAO_Mailing();
55 $eq = $mailing->getRecipients($job->id
, $params['mailing_id'], NULL, NULL, true, false);
59 * Initiate all pending/ready jobs
/**
 * Run the child mailing jobs that are due: queue their recipients, deliver
 * them and mark them complete.
 *
 * NOTE(review): this listing has gaps (closing braces, the start of each SQL
 * string and several statements are not visible); the summary below covers
 * only what the visible lines show.
 *
 * Job selection:
 *  - with non-empty $testParams, the job is looked up by
 *    $testParams['job_id'];
 *  - otherwise child jobs of the current domain are selected that are either
 *    'Scheduled' (no start_date, scheduled_date <= now) or 'Running' with no
 *    end_date, restricted by a clause on m.sms_provider_id (NULL / NOT NULL).
 *
 * For each fetched job:
 *  1. a per-job CRM_Core_Lock named "civimail.job.<id>" is requested;
 *  2. for non-test runs the status is re-read from the database, and statuses
 *     other than 'Running' / 'Scheduled' are treated specially (CRM-4246);
 *  3. when the job is not yet 'Running', its recipients are queued via
 *     queue() and the job is stamped with start_date and status 'Running'
 *     inside a CRM_Core_Transaction;
 *  4. the mailer is chosen: $config->getMailer(TRUE) when $mode is NULL,
 *     CRM_SMS_Provider::singleton() when $mode == 'sms';
 *  5. deliver() is called and the 'create' post hook is fired for
 *     'CRM_Mailing_DAO_Spool' with the job id and the deliver() result;
 *  6. the job is stamped with end_date and status 'Complete' inside a
 *     transaction (the condition guarding this step is not visible).
 *
 * NOTE(review): $testParams['job_id'] is interpolated directly into the SQL
 * text; it should be validated as an integer or passed as a query parameter.
 *
 * NOTE(review): $mailer is only assigned for $mode === NULL and
 * $mode == 'sms' in the visible code; any other mode would reach deliver()
 * with $mailer undefined unless a hidden branch handles it.
 *
 * @param array|null $testParams test-mailing parameters; 'job_id' is read here
 * @param string|null $mode NULL for regular mail, 'sms' for SMS delivery
 */
65 public static function runJobs($testParams = NULL, $mode = NULL) {
66 $job = new CRM_Mailing_BAO_Job();
68 $config = CRM_Core_Config
::singleton();
69 $jobTable = CRM_Mailing_DAO_Job
::getTableName();
70 $mailingTable = CRM_Mailing_DAO_Mailing
::getTableName();
72 if (!empty($testParams)) {
76 WHERE id = {$testParams['job_id']}";
80 $currentTime = date('YmdHis');
81 $mailingACL = CRM_Mailing_BAO_Mailing
::mailingACL('m');
82 $domainID = CRM_Core_Config
::domainID();
84 $modeClause = 'AND m.sms_provider_id IS NULL';
86 $modeClause = 'AND m.sms_provider_id IS NOT NULL';
89 // Select the first child job that is scheduled
95 WHERE m.id = j.mailing_id AND m.domain_id = {$domainID}
98 AND ( ( j.start_date IS null
99 AND j.scheduled_date <= $currentTime
100 AND j.status = 'Scheduled' )
101 OR ( j.status = 'Running'
102 AND j.end_date IS null ) )
103 AND (j.job_type = 'child')
105 ORDER BY j.mailing_id,
113 while ($job->fetch()) {
114 // still use job level lock for each child job
115 $lockName = "civimail.job.{$job->id}";
117 $lock = new CRM_Core_Lock($lockName);
118 if (!$lock->isAcquired()) {
122 // for test jobs we do not change anything, since its on a short-circuit path
123 if (empty($testParams)) {
124 // we've got the lock, but while we were waiting and processing
125 // other emails, this job might have changed under us
126 // lets get the job status again and check
127 $job->status
= CRM_Core_DAO
::getFieldValue('CRM_Mailing_DAO_Job',
132 if ($job->status
!= 'Running' &&
133 $job->status
!= 'Scheduled'
135 // this includes Cancelled and other statuses, CRM-4246
141 /* Queue up recipients for the child job being launched */
143 if ($job->status
!= 'Running') {
144 $transaction = new CRM_Core_Transaction();
146 // have to queue it up based on the offset and limits
147 // get the parent ID, and limit and offset
148 $job->queue($testParams);
150 // Mark up the starting time
151 $saveJob = new CRM_Mailing_DAO_Job();
152 $saveJob->id
= $job->id
;
153 $saveJob->start_date
= date('YmdHis');
154 $saveJob->status
= 'Running';
157 $transaction->commit();
161 // make it a persistent connection, CRM-9349
162 if ($mode === NULL) {
163 $mailer = $config->getMailer(TRUE);
165 elseif ($mode == 'sms') {
166 $mailer = CRM_SMS_Provider
::singleton(array('mailing_id' => $job->mailing_id
));
169 // Compose and deliver each child job
170 $isComplete = $job->deliver($mailer, $testParams);
172 CRM_Utils_Hook
::post('create', 'CRM_Mailing_DAO_Spool', $job->id
, $isComplete);
174 // Mark the child complete
178 $transaction = new CRM_Core_Transaction();
180 $saveJob = new CRM_Mailing_DAO_Job();
181 $saveJob->id
= $job->id
;
182 $saveJob->end_date
= date('YmdHis');
183 $saveJob->status
= 'Complete';
186 $transaction->commit();
188 // don't mark the mailing as complete
191 // Release the child joblock
200 // post process to determine if the parent job
201 // as well as the mailing is complete after the run
/**
 * Close out parent jobs (and their mailings) whose child jobs have all
 * finished.
 *
 * The visible query selects jobs of the current domain that are 'Running',
 * have no end_date, have scheduled_date <= now and are not of job_type
 * 'child'. For each such job a second query looks for child jobs whose
 * status is not 'Complete', bound to the parent job id through parameter %1.
 * When that query yields nothing, a transaction stamps the parent job with
 * end_date / status 'Complete' and sets is_completed = TRUE on the mailing
 * identified by the job's mailing_id.
 *
 * NOTE(review): the listing has gaps -- the SELECT heads, the child-query
 * clause that uses %1 and the save() calls are not visible. $mode is not
 * referenced in any visible line; confirm whether it is used at all.
 *
 * @param string|null $mode delivery mode passed by the caller
 */
202 public static function runJobs_post($mode = NULL) {
204 $job = new CRM_Mailing_BAO_Job();
206 $mailing = new CRM_Mailing_BAO_Mailing();
208 $config = CRM_Core_Config
::singleton();
209 $jobTable = CRM_Mailing_DAO_Job
::getTableName();
210 $mailingTable = CRM_Mailing_DAO_Mailing
::getTableName();
212 $currentTime = date('YmdHis');
213 $mailingACL = CRM_Mailing_BAO_Mailing
::mailingACL('m');
214 $domainID = CRM_Core_Config
::domainID();
220 WHERE m.id = j.mailing_id AND m.domain_id = {$domainID}
222 AND j.scheduled_date <= $currentTime
223 AND j.status = 'Running'
224 AND j.end_date IS null
225 AND (j.job_type != 'child' OR j.job_type is NULL)
226 ORDER BY j.scheduled_date,
231 // For each parent job that is running, let's look at their child jobs
232 while ($job->fetch()) {
234 $child_job = new CRM_Mailing_BAO_Job();
238 FROM civicrm_mailing_job j, civicrm_mailing m
239 WHERE m.id = j.mailing_id
240 AND j.job_type = 'child'
242 AND j.status <> 'Complete'";
243 $params = array(1 => array($job->id
, 'Integer'));
245 $anyChildLeft = CRM_Core_DAO
::singleValueQuery($child_job_sql, $params);
247 // all of the child jobs are complete, update
248 // the parent job as well as the mailing status
249 if (!$anyChildLeft) {
251 $transaction = new CRM_Core_Transaction();
253 $saveJob = new CRM_Mailing_DAO_Job();
254 $saveJob->id
= $job->id
;
255 $saveJob->end_date
= date('YmdHis');
256 $saveJob->status
= 'Complete';
260 $mailing->id
= $job->mailing_id
;
261 $mailing->is_completed
= TRUE;
263 $transaction->commit();
269 // before we run jobs, we need to split the jobs
/**
 * Split every due parent job into child jobs before delivery starts.
 *
 * The visible query selects non-child jobs of the current domain that are
 * 'Scheduled', have neither start_date nor end_date and have
 * scheduled_date <= now. The mode clause defaults to
 * "m.sms_provider_id IS NULL" and becomes "IS NOT NULL" when $mode == 'sms'.
 * A workflow clause is obtained from workflowClause(); presumably it is
 * appended to the query in a line that is not visible.
 *
 * For each fetched job:
 *  1. a CRM_Core_Lock named "civimail.job.<id>" is requested;
 *  2. the status is re-read from the database to guard against a concurrent
 *     change, and jobs that are no longer 'Scheduled' are handled separately
 *     (the body of that branch is not visible);
 *  3. split_job($offset) creates the child jobs;
 *  4. a transaction stamps the parent with start_date and status 'Running'.
 *
 * @param int $offset recipients per child job, forwarded to split_job()
 * @param string|null $mode 'sms' restricts the run to SMS mailings
 */
270 public static function runJobs_pre($offset = 200, $mode = NULL) {
271 $job = new CRM_Mailing_BAO_Job();
273 $config = CRM_Core_Config
::singleton();
274 $jobTable = CRM_Mailing_DAO_Job
::getTableName();
275 $mailingTable = CRM_Mailing_DAO_Mailing
::getTableName();
277 $currentTime = date('YmdHis');
278 $mailingACL = CRM_Mailing_BAO_Mailing
::mailingACL('m');
281 $workflowClause = CRM_Mailing_BAO_Job
::workflowClause();
283 $domainID = CRM_Core_Config
::domainID();
285 $modeClause = 'AND m.sms_provider_id IS NULL';
286 if ($mode == 'sms') {
287 $modeClause = 'AND m.sms_provider_id IS NOT NULL';
290 // Select all the mailing jobs that are created from
291 // when the mailing is submitted or scheduled.
296 WHERE m.id = j.mailing_id AND m.domain_id = {$domainID}
300 AND ( ( j.start_date IS null
301 AND j.scheduled_date <= $currentTime
302 AND j.status = 'Scheduled'
303 AND j.end_date IS null ) )
304 AND ((j.job_type is NULL) OR (j.job_type <> 'child'))
305 ORDER BY j.scheduled_date,
312 // For each of the "Parent Jobs" we find, we split them into
313 // X Number of child jobs
314 while ($job->fetch()) {
315 // still use job level lock for each child job
316 $lockName = "civimail.job.{$job->id}";
318 $lock = new CRM_Core_Lock($lockName);
319 if (!$lock->isAcquired()) {
323 // Re-fetch the job status in case things
324 // changed between the first query and now
325 // to avoid race conditions
326 $job->status
= CRM_Core_DAO
::getFieldValue('CRM_Mailing_DAO_Job',
330 if ($job->status
!= 'Scheduled') {
335 $job->split_job($offset);
337 // update the status of the parent job
338 $transaction = new CRM_Core_Transaction();
340 $saveJob = new CRM_Mailing_DAO_Job();
341 $saveJob->id
= $job->id
;
342 $saveJob->start_date
= date('YmdHis');
343 $saveJob->status
= 'Running';
346 $transaction->commit();
348 // Release the job lock
353 // Split the parent job into n number of child job based on an offset
354 // If null or 0 , we create only one child job
/**
 * Insert the child job rows for this (parent) job.
 *
 * The recipient count comes from CRM_Mailing_BAO_Recipients::mailingSize()
 * for this job's mailing. Each child row is inserted into
 * civicrm_mailing_job with this job's mailing_id and scheduled_date, status
 * 'Scheduled', job_type 'child', parent_id = this job's id, plus a
 * job_offset (%6) and job_limit (%7).
 *
 *  - When $offset is empty (NULL or 0) or the recipient count does not exceed
 *    it, a single child is inserted with job_offset 0 and job_limit equal to
 *    the recipient count.
 *  - Otherwise one child is inserted per step of $offset up to the recipient
 *    count, each with job_limit = $offset.
 *
 * NOTE(review): within the loop only $params[7][0] is visibly updated; the
 * statement that sets the per-child job_offset ($params[6][0]) is not in
 * this listing -- confirm it exists, otherwise every child would start at 0.
 *
 * NOTE(review): $jobTable and $dao are assigned but not used in any visible
 * line (the INSERT names civicrm_mailing_job literally).
 *
 * @param int $offset maximum number of recipients per child job
 */
355 public function split_job($offset = 200) {
356 $recipient_count = CRM_Mailing_BAO_Recipients
::mailingSize($this->mailing_id
);
358 $jobTable = CRM_Mailing_DAO_Job
::getTableName();
361 $dao = new CRM_Core_DAO();
364 INSERT INTO civicrm_mailing_job
365 (`mailing_id`, `scheduled_date`, `status`, `job_type`, `parent_id`, `job_offset`, `job_limit`)
366 VALUES (%1, %2, %3, %4, %5, %6, %7)
368 $params = array(1 => array($this->mailing_id
, 'Integer'),
369 2 => array($this->scheduled_date
, 'String'),
370 3 => array('Scheduled', 'String'),
371 4 => array('child', 'String'),
372 5 => array($this->id
, 'Integer'),
373 6 => array(0, 'Integer'),
374 7 => array($recipient_count, 'Integer'),
377 // create one child job if the mailing size is less than the offset
378 // probably use a CRM_Mailing_DAO_Job( );
379 if (empty($offset) ||
380 $recipient_count <= $offset
382 CRM_Core_DAO
::executeQuery($sql, $params);
385 // Creating 'child jobs'
386 for ($i = 0; $i < $recipient_count; $i = $i +
$offset) {
388 $params[7][0] = $offset;
389 CRM_Core_DAO
::executeQuery($sql, $params);
/**
 * Create the event-queue rows for this job's recipients.
 *
 * With non-empty $testParams the work is delegated to
 * CRM_Mailing_BAO_Mailing::getTestRecipients(). Otherwise the recipients are
 * read with CRM_Mailing_BAO_Recipients::mailingQuery() using this job's
 * mailing_id, job_offset and job_limit, collected into $params (email_id,
 * contact_id, phone_id are the visible fields) and written in batches with
 * CRM_Mailing_Event_BAO_Queue::bulkCreate(): once whenever the running count
 * is a multiple of CRM_Core_DAO::BULK_MAIL_INSERT_COUNT, and once more at the
 * end for any remainder.
 *
 * NOTE(review): the unused side of each recipient is set to the string
 * "null" (email_id when phone_id is set; phone_id in what appears to be the
 * else branch). This only works if bulkCreate() places the value into SQL
 * unquoted -- confirm against that method.
 *
 * NOTE(review): the initialisation of $params, $count and $now is not
 * visible in this listing.
 *
 * @param array|null $testParams when non-empty, queue test recipients instead
 */
394 public function queue($testParams = NULL) {
395 $mailing = new CRM_Mailing_BAO_Mailing();
396 $mailing->id
= $this->mailing_id
;
397 if (!empty($testParams)) {
398 $mailing->getTestRecipients($testParams);
401 // We are still getting all the recipients from the parent job
402 // so we don't mess with the include/exclude logic.
403 $recipients = CRM_Mailing_BAO_Recipients
::mailingQuery($this->mailing_id
, $this->job_offset
, $this->job_limit
);
405 // FIXME: this is not very smart, we should move this to one DB call
406 // INSERT INTO ... SELECT FROM ..
407 // the thing we need to figure out is how to generate the hash automatically
411 while ($recipients->fetch()) {
412 if ($recipients->phone_id
) {
413 $recipients->email_id
= "null";
416 $recipients->phone_id
= "null";
421 $recipients->email_id
,
422 $recipients->contact_id
,
423 $recipients->phone_id
,
426 if ($count % CRM_Core_DAO
::BULK_MAIL_INSERT_COUNT
== 0) {
427 CRM_Mailing_Event_BAO_Queue
::bulkCreate($params, $now);
433 if (!empty($params)) {
434 CRM_Mailing_Event_BAO_Queue
::bulkCreate($params, $now);
442 * @param object $mailer A Mail object to send the messages
/**
 * Send this job's queued messages that have neither a delivery nor a bounce
 * record yet.
 *
 * Steps shown by the visible lines:
 *  1. load the mailing for this job;
 *  2. build the pending-recipient query: queue rows of this job joined to
 *     the email and contact tables, with the delivered and bounce tables
 *     joined so that rows with either event are excluded, and contacts with
 *     is_opt_out = 0 only. When the mailing has an sms_provider_id the query
 *     is rebuilt against the phone table instead of the email table;
 *  3. for test runs, wrap from_name in 'CiviCRM Test Mailer (%1)' and prefix
 *     the subject with 'Test Mailing:';
 *  4. run CRM_Mailing_BAO_Mailing::tokenReplace(), fetch attachments, and
 *     register the Smarty string resource when CIVICRM_MAIL_SMARTY is set;
 *  5. iterate the queue rows, collecting per-recipient fields and handing
 *     them to deliverGroup() each time MAX_CONTACTS_TO_PROCESS rows have
 *     accumulated, and once more for the remainder.
 *
 * When $config->mailerBatchLimit is positive and the static counter
 * $mailsProcessed has reached it, the rows collected so far are delivered;
 * what follows that call is not visible here.
 *
 * $config and $mailsProcessed are static locals, so they persist across
 * calls to this method.
 *
 * NOTE(review): $this->id is concatenated into the SQL text.
 *
 * NOTE(review): the return statements are not visible in this listing;
 * runJobs() stores the result in a variable named $isComplete, so presumably
 * a boolean is returned -- confirm.
 *
 * @param object $mailer mailer (or SMS provider) handed on to deliverGroup()
 * @param array|null $testParams non-empty for a test mailing
 */
447 public function deliver(&$mailer, $testParams = NULL) {
448 $mailing = new CRM_Mailing_BAO_Mailing();
449 $mailing->id
= $this->mailing_id
;
450 $mailing->find(TRUE);
453 $eq = new CRM_Mailing_Event_BAO_Queue();
454 $eqTable = CRM_Mailing_Event_BAO_Queue
::getTableName();
455 $emailTable = CRM_Core_BAO_Email
::getTableName();
456 $phoneTable = CRM_Core_DAO_Phone
::getTableName();
457 $contactTable = CRM_Contact_BAO_Contact
::getTableName();
458 $edTable = CRM_Mailing_Event_BAO_Delivered
::getTableName();
459 $ebTable = CRM_Mailing_Event_BAO_Bounce
::getTableName();
461 $query = " SELECT $eqTable.id,
462 $emailTable.email as email,
467 INNER JOIN $emailTable
468 ON $eqTable.email_id = $emailTable.id
469 INNER JOIN $contactTable
470 ON $contactTable.id = $emailTable.contact_id
472 ON $eqTable.id = $edTable.event_queue_id
474 ON $eqTable.id = $ebTable.event_queue_id
475 WHERE $eqTable.job_id = " . $this->id
. "
476 AND $edTable.id IS null
477 AND $ebTable.id IS null
478 AND $contactTable.is_opt_out = 0";
480 if ($mailing->sms_provider_id
) {
483 $phoneTable.phone as phone,
488 INNER JOIN $phoneTable
489 ON $eqTable.phone_id = $phoneTable.id
490 INNER JOIN $contactTable
491 ON $contactTable.id = $phoneTable.contact_id
493 ON $eqTable.id = $edTable.event_queue_id
495 ON $eqTable.id = $ebTable.event_queue_id
496 WHERE $eqTable.job_id = " . $this->id
. "
497 AND $edTable.id IS null
498 AND $ebTable.id IS null
499 AND $contactTable.is_opt_out = 0";
503 static $config = NULL;
504 static $mailsProcessed = 0;
506 if ($config == NULL) {
507 $config = CRM_Core_Config
::singleton();
510 $job_date = CRM_Utils_Date
::isoToMysql($this->scheduled_date
);
513 if (!empty($testParams)) {
514 $mailing->from_name
= ts('CiviCRM Test Mailer (%1)',
515 array(1 => $mailing->from_name
)
517 $mailing->subject
= ts('Test Mailing:') . ' ' . $mailing->subject
;
520 CRM_Mailing_BAO_Mailing
::tokenReplace($mailing);
522 // get and format attachments
523 $attachments = CRM_Core_BAO_File
::getEntityFile('civicrm_mailing', $mailing->id
);
525 if (defined('CIVICRM_MAIL_SMARTY') && CIVICRM_MAIL_SMARTY
) {
526 CRM_Core_Smarty
::registerStringResource();
529 $isDelivered = FALSE;
531 // make sure that there's no more than $config->mailerBatchLimit mails processed in a run
532 while ($eq->fetch()) {
533 // if ( ( $mailsProcessed % 100 ) == 0 ) {
534 // CRM_Utils_System::xMemory( "$mailsProcessed: " );
537 if ($config->mailerBatchLimit
> 0 &&
538 $mailsProcessed >= $config->mailerBatchLimit
540 if (!empty($fields)) {
541 $this->deliverGroup($fields, $mailing, $mailer, $job_date, $attachments);
551 'contact_id' => $eq->contact_id
,
552 'email' => $eq->email
,
553 'phone' => $eq->phone
,
555 if (count($fields) == self
::MAX_CONTACTS_TO_PROCESS
) {
556 $isDelivered = $this->deliverGroup($fields, $mailing, $mailer, $job_date, $attachments);
567 if (!empty($fields)) {
568 $isDelivered = $this->deliverGroup($fields, $mailing, $mailer, $job_date, $attachments);
/**
 * Compose and send one batch of queued messages, recording the outcome of
 * each.
 *
 * Steps shown by the visible lines:
 *  1. abort with CRM_Core_Error::fatal() when $mailer is not a usable object
 *     (the second half of that condition is not visible);
 *  2. gather the contact ids of the batch and load their token details with
 *     CRM_Utils_Token::getTokenDetails();
 *  3. per recipient: compose the message via $mailing->compose(); an empty
 *     message is left in the queue (CRM-9833). For SMS mailings the body and
 *     headers come from the SMS provider instead. The recipient is taken
 *     from the encoded 'To' header (CRM-5743) and the message is handed to
 *     $mailer->send();
 *  4. a PEAR_Error result is treated as a failure:
 *       - for socket-write style errors the message and code are logged and
 *         the static counter $smtpConnectionErrors is incremented; once it
 *         exceeds 5 the pending results are flushed with writeToDB(), a
 *         message is logged and the process exits through
 *         CRM_Utils_System::civiExit();
 *       - otherwise a bounce event is created from the queue id, job id,
 *         hash and the CRM_Mailing_BAO_BouncePattern::match() result;
 *  5. on success the queue id and contact id are appended to
 *     $deliveredParams / $targetParams; every
 *     CRM_Core_DAO::BULK_MAIL_INSERT_COUNT messages these are flushed with
 *     writeToDB() and the job status is re-read so that a job that is no
 *     longer 'Running' can be stopped (CRM-4246);
 *  6. after a delivery or bounce the connection-error counter is decremented
 *     when positive, and usleep() is applied when
 *     $config->mailThrottleTime is set and positive;
 *  7. a final writeToDB() flushes whatever remains.
 *
 * $smtpConnectionErrors is a static local, so the error count persists across
 * batches within one process.
 *
 * NOTE(review): $params first holds contact ids and is later reused for the
 * bounce-event parameters; the re-initialisation between the two uses is not
 * visible in this listing.
 *
 * NOTE(review): the return statements are not visible; deliver() stores the
 * result in $isDelivered, so presumably a boolean is returned -- confirm.
 *
 * @param array $fields queue rows to send; keys read here are id, hash, contact_id, email
 * @param object $mailing the mailing being sent
 * @param object $mailer object exposing send()
 * @param string $job_date job date, passed on towards writeToDB() -- TODO confirm format
 * @param array $attachments attachments passed to compose()
 */
573 public function deliverGroup(&$fields, &$mailing, &$mailer, &$job_date, &$attachments) {
574 static $smtpConnectionErrors = 0;
576 if (!is_object($mailer) ||
579 CRM_Core_Error
::fatal();
582 // get the return properties
583 $returnProperties = $mailing->getReturnProperties();
584 $params = $targetParams = $deliveredParams = array();
587 foreach ($fields as $key => $field) {
588 $params[] = $field['contact_id'];
591 $details = CRM_Utils_Token
::getTokenDetails(
595 $mailing->getFlattenedTokens(),
600 $config = CRM_Core_Config
::singleton();
601 foreach ($fields as $key => $field) {
602 $contactID = $field['contact_id'];
603 if (!array_key_exists($contactID, $details[0])) {
604 $details[0][$contactID] = array();
606 /* Compose the mailing */
608 $recipient = $replyToEmail = NULL;
609 $replyValue = strcmp($mailing->replyto_email
, $mailing->from_email
);
611 $replyToEmail = $mailing->replyto_email
;
614 $message = &$mailing->compose($this->id
, $field['id'], $field['hash'],
615 $field['contact_id'], $field['email'],
616 $recipient, FALSE, $details[0][$contactID], $attachments,
617 FALSE, NULL, $replyToEmail
619 if (empty($message)) {
620 // lets keep the message in the queue
621 // most likely a permissions related issue with smarty templates
622 // or a bad contact id? CRM-9833
626 /* Send the mailing */
628 $body = &$message->get();
629 $headers = &$message->headers();
631 if ($mailing->sms_provider_id
) {
632 $provider = CRM_SMS_Provider
::singleton(array('mailing_id' => $mailing->id
));
633 $body = $provider->getMessage($message, $field['contact_id'], $details[0][$contactID]);
634 $headers = $provider->getRecipientDetails($field, $details[0][$contactID]);
637 // make $recipient actually be the *encoded* header, so as not to baffle Mail_RFC822, CRM-5743
638 $recipient = $headers['To'];
641 // disable error reporting on real mailings (but leave error reporting for tests), CRM-5744
643 CRM_Core_Error
::ignoreException();
646 $result = $mailer->send($recipient, $headers, $body, $this->id
);
649 CRM_Core_Error
::setCallback();
652 if (is_a($result, 'PEAR_Error')) {
654 $message = $result->getMessage();
656 'Failed to write to socket'
658 // lets log this message and code
659 $code = $result->getCode();
660 CRM_Core_Error
::debug_log_message("SMTP Socket Error. Message: $message, Code: $code");
662 // these are socket write errors which most likely means smtp connection errors
664 $smtpConnectionErrors++
;
665 if ($smtpConnectionErrors <= 5) {
670 // seems like we have too many of them in a row, we should
671 // write stuff to disk and abort the cron job
672 $this->writeToDB($deliveredParams,
678 CRM_Core_Error
::debug_log_message("Too many SMTP Socket Errors. Exiting");
679 CRM_Utils_System
::civiExit();
682 /* Register the bounce event */
685 'event_queue_id' => $field['id'],
686 'job_id' => $this->id
,
687 'hash' => $field['hash'],
689 $params = array_merge($params,
690 CRM_Mailing_BAO_BouncePattern
::match($result->getMessage())
692 CRM_Mailing_Event_BAO_Bounce
::create($params);
695 /* Register the delivery event */
697 $deliveredParams[] = $field['id'];
698 $targetParams[] = $field['contact_id'];
701 if ($count % CRM_Core_DAO
::BULK_MAIL_INSERT_COUNT
== 0) {
702 $this->writeToDB($deliveredParams,
709 // hack to stop mailing job at run time, CRM-4246.
710 // to avoid making too many DB calls for this rare case
711 // lets do it when we snapshot
712 $status = CRM_Core_DAO
::getFieldValue('CRM_Mailing_DAO_Job',
716 if ($status != 'Running') {
724 // seems like a successful delivery or bounce, lets decrement error count
725 // only if we have smtp connection errors
726 if ($smtpConnectionErrors > 0) {
727 $smtpConnectionErrors--;
730 // If we have enabled the Throttle option, this is the time to enforce it.
731 if (isset($config->mailThrottleTime
) && $config->mailThrottleTime
> 0) {
732 usleep((int ) $config->mailThrottleTime
);
736 $result = $this->writeToDB($deliveredParams,
748 * @param int $mailingId the id of the mailing to be canceled
/**
 * Cancel a mailing: mark its parent job and any unfinished child jobs as
 * 'Canceled'.
 *
 * The visible query looks up the job of the given mailing whose job_type is
 * NULL or not 'child'. When that job's status is one of 'Scheduled',
 * 'Running' or 'Paused', a CRM_Mailing_BAO_Job carrying the same id is given
 * end_date = now and status 'Canceled'; child jobs in those same three
 * statuses are then updated to 'Canceled' by a single UPDATE (parameter %1 is
 * the parent job id, %2 the current timestamp), and a success message is set
 * on the session.
 *
 * NOTE(review): the fetch() and save() calls and the first half of the status
 * condition are not visible in this listing.
 *
 * @param int $mailingId the id of the mailing to be canceled
 */
751 public static function cancel($mailingId) {
754 FROM civicrm_mailing_job
755 WHERE mailing_id = %1
757 AND ( ( job_type IS NULL ) OR
758 job_type <> 'child' )
760 $params = array(1 => array($mailingId, 'Integer'));
761 $job = CRM_Core_DAO
::executeQuery($sql, $params);
763 in_array($job->status
, array('Scheduled', 'Running', 'Paused'))
766 $newJob = new CRM_Mailing_BAO_Job();
767 $newJob->id
= $job->id
;
768 $newJob->end_date
= date('YmdHis');
769 $newJob->status
= 'Canceled';
772 // also cancel all child jobs
774 UPDATE civicrm_mailing_job
775 SET status = 'Canceled',
779 AND job_type = 'child'
780 AND status IN ( 'Scheduled', 'Running', 'Paused' )
782 $params = array(1 => array($job->id
, 'Integer'),
783 2 => array(date('YmdHis'), 'Timestamp'),
785 CRM_Core_DAO
::executeQuery($sql, $params);
787 CRM_Core_Session
::setStatus(ts('The mailing has been canceled.'), ts('Canceled'), 'success');
792 * Return a translated status enum string
794 * @param string $status The status enum
796 * @return string The translated version
/**
 * Map a job status value to its translated label.
 *
 * The label table covers 'Scheduled', 'Running', 'Complete', 'Paused' and
 * 'Canceled'. It is built on first use and kept in a static local so ts()
 * is not re-run for every lookup. The lookup goes through
 * CRM_Utils_Array::value() with ts('Not scheduled') as the default.
 *
 * @param string $status the status enum value
 *
 * @return string the translated label, or the 'Not scheduled' translation
 *   (presumably used when $status is not in the table -- confirm against
 *   CRM_Utils_Array::value())
 */
800 public static function status($status) {
801 static $translation = NULL;
803 if (empty($translation)) {
804 $translation = array(
805 'Scheduled' => ts('Scheduled'),
806 'Running' => ts('Running'),
807 'Complete' => ts('Complete'),
808 'Paused' => ts('Paused'),
809 'Canceled' => ts('Canceled'),
812 return CRM_Utils_Array
::value($status, $translation, ts('Not scheduled'));
816 * Return a workflow clause for use in SQL queries,
817 * to only process jobs that are approved.
819 * @return string For use in a WHERE clause
/**
 * Build the SQL fragment that limits job processing to approved mailings.
 *
 * When CRM_Mailing_Info::workflowEnabled() is true, an option value is looked
 * up in the 'mail_approval_status' option group; if one is found the method
 * returns " AND m.approval_status_id = <id> ", which relies on the caller's
 * query aliasing the mailing table as "m".
 *
 * NOTE(review): the remaining arguments of the getValue() call and the
 * return taken on the other paths (workflow disabled, or no option value)
 * are not visible in this listing.
 *
 * @return string fragment to append to a WHERE clause
 */
823 public static function workflowClause() {
824 // add an additional check and only process
825 // jobs that are approved
826 if (CRM_Mailing_Info
::workflowEnabled()) {
827 $approveOptionID = CRM_Core_OptionGroup
::getValue('mail_approval_status',
831 if ($approveOptionID) {
832 return " AND m.approval_status_id = $approveOptionID ";
/**
 * Persist the results of a delivery batch: delivered events plus the bulk
 * mailing activity and its target contacts.
 *
 * NOTE(review): the signature is cut off in this listing after
 * $deliveredParams; the body also reads $targetParams, $mailing and
 * $job_date, which are presumably the remaining parameters -- confirm.
 *
 * Steps shown by the visible lines:
 *  1. non-empty $deliveredParams are written with
 *     CRM_Mailing_Event_BAO_Delivered::bulkCreate() and the array is reset;
 *  2. when $targetParams is non-empty and the mailing has a scheduled_id, an
 *     activity is prepared:
 *       - the activity type id is looked up in the 'activity_type' option
 *         group and cached in a static local; for mailings with an
 *         sms_provider_id the subject is replaced by the mailing name and
 *         the activity type is looked up again;
 *       - CRM_Core_Error::fatal() is called when no activity type id is
 *         found;
 *       - an existing activity with the same type and source_record_id
 *         (this job's mailing_id) is searched for and, when found, reused by
 *         setting $activity['id'];
 *       - when CRM_Core_BAO_Email::isMultipleBulkMail() is true, targets
 *         already linked to that activity are dropped from the list;
 *       - the activity is created with CRM_Activity_BAO_Activity::create();
 *  3. $targetParams is reset to an empty array.
 *
 * NOTE(review): the duplicate-target check runs one query per target contact
 * and interpolates $activityID and $targetID into the SQL text. When no
 * existing activity was found, $activityID is presumably empty at that point,
 * which would produce malformed SQL -- confirm against the lines that are not
 * visible here.
 *
 * NOTE(review): the return statements are not visible in this listing.
 */
838 public function writeToDB(&$deliveredParams,
843 static $activityTypeID = NULL;
845 if (!empty($deliveredParams)) {
846 CRM_Mailing_Event_BAO_Delivered
::bulkCreate($deliveredParams);
847 $deliveredParams = array();
851 if (!empty($targetParams) &&
852 !empty($mailing->scheduled_id
)
855 if (!$activityTypeID) {
856 $activityTypeID = CRM_Core_OptionGroup
::getValue('activity_type',
860 if ($mailing->sms_provider_id
) {
861 $mailing->subject
= $mailing->name
;
862 $activityTypeID = CRM_Core_OptionGroup
::getValue('activity_type',
867 if (!$activityTypeID) {
868 CRM_Core_Error
::fatal();
874 'source_contact_id' => $mailing->scheduled_id
,
// NOTE(review): 'target_contact_id' is listed twice below. PHP keeps the
// last value for a repeated key, so the array_unique() de-duplication on
// the first entry is discarded; one of the two entries should be removed.
876 'target_contact_id' => array_unique($targetParams),
877 'target_contact_id' => $targetParams,
878 'activity_type_id' => $activityTypeID,
879 'source_record_id' => $this->mailing_id
,
880 'activity_date_time' => $job_date,
881 'subject' => $mailing->subject
,
883 'deleteActivityTarget' => FALSE,
884 'campaign_id' => $mailing->campaign_id
,
887 //check whether activity is already created for this mailing.
888 //if yes then create only target contact record.
891 FROM civicrm_activity
892 WHERE civicrm_activity.activity_type_id = %1
893 AND civicrm_activity.source_record_id = %2";
895 $queryParams = array(1 => array($activityTypeID, 'Integer'),
896 2 => array($this->mailing_id
, 'Integer'),
898 $activityID = CRM_Core_DAO
::singleValueQuery($query,
903 $activity['id'] = $activityID;
906 if (CRM_Core_BAO_Email
::isMultipleBulkMail()) {
907 // make sure we don't attempt to duplicate the target activity
908 foreach ($activity['target_contact_id'] as $key => $targetID) {
909 $sql = "SELECT id FROM civicrm_activity_target WHERE activity_id = $activityID AND target_contact_id = $targetID;";
910 if (CRM_Core_DAO
::singleValueQuery($sql)) {
911 unset($activity['target_contact_id'][$key]);
917 if (is_a(CRM_Activity_BAO_Activity
::create($activity),
923 $targetParams = array();