@celery.task()
def processing_cleanup(entry, manager):
- processor = CommonVideoProcessor(manager, entry) # is it manager, entry or entry, manager?
+ processor = CommonVideoProcessor(manager, entry)
processor.delete_queue_file()
# =====================
def workflow(self, entry, manager, feed_url, reprocess_action,
reprocess_info=None):
- reprocess_info['entry'] = entry.id # ?
- reprocess_info['manager'] = manager # can celery serialize this?
-
- # Add args
+ reprocess_info['entry'] = entry
+ reprocess_info['manager'] = manager
transcoding_tasks = group(
- main_task.signature(queue='default', priority=5, immutable=True),
- complimentary_task.signature(queue='default', priority=4, immutable=True),
- complimentary_task.signature(queue='default', priority=3, immutable=True),
- complimentary_task.signature(queue='default', priority=2, immutable=True)
- complimentary_task.signature(queue='default', priority=1, immutable=True)
+ main_task.signature(args=('480p', ACCEPTED_RESOLUTIONS['480p']),
+ kwargs=reprocess_info, queue='default',
+ priority=5, immutable=True),
+ complimentary_task.signature(args=('360p', ACCEPTED_RESOLUTIONS['360p']),
+ kwargs=reprocess_info, queue='default',
+ priority=4, immutable=True),
+ complimentary_task.signature(args=('720p', ACCEPTED_RESOLUTIONS['720p']),
+ kwargs=reprocess_info, queue='default',
+ priority=3, immutable=True),
)
- chord(transcoding_tasks)(processing_cleanup.signature(queue='default', immutable=True))
+ cleanup_task = processing_cleanup.signature(args=(entry, manager),
+ queue='default', immutable=True)
+
+ chord(transcoding_tasks)(cleanup_task)