Add priority to the celery tasks
[mediagoblin.git] / mediagoblin / media_types / video / processing.py
CommitLineData
93bdab9d 1# GNU MediaGoblin -- federated, autonomous media hosting
cf29e8a8 2# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
93bdab9d
JW
3#
4# This program is free software: you can redistribute it and/or modify
5# it under the terms of the GNU Affero General Public License as published by
6# the Free Software Foundation, either version 3 of the License, or
7# (at your option) any later version.
8#
9# This program is distributed in the hope that it will be useful,
10# but WITHOUT ANY WARRANTY; without even the implied warranty of
11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12# GNU Affero General Public License for more details.
13#
14# You should have received a copy of the GNU Affero General Public License
15# along with this program. If not, see <http://www.gnu.org/licenses/>.
16
57d1cb3c 17import argparse
2ed6afb0 18import os.path
e9c1b938 19import logging
d0ceb506 20import datetime
9a27fa60 21import celery
93bdab9d 22
896d00fb
BP
23import six
24
25ecdec9 25from celery import group, chord
93bdab9d 26from mediagoblin import mg_globals as mgg
347ef583
RE
27from mediagoblin.processing import (
28 FilenameBuilder, BaseProcessingFail,
29 ProgressCallback, MediaProcessor,
30 ProcessingManager, request_from_args,
1cefccc7 31 get_process_filename, store_public,
347ef583 32 copy_original)
81c59ef0 33from mediagoblin.processing.task import ProcessMedia
51eb0267 34from mediagoblin.tools.translate import lazy_pass_to_ugettext as _
54b4b28f 35from mediagoblin.media_types import MissingComponents
51eb0267 36
26729e02 37from . import transcoders
25ecdec9 38from .util import skip_transcode, ACCEPTED_RESOLUTIONS
5c754fda 39
8e5f9746
JW
40_log = logging.getLogger(__name__)
41_log.setLevel(logging.DEBUG)
93bdab9d 42
cbac4a7f
RE
43MEDIA_TYPE = 'mediagoblin.media_types.video'
44
93bdab9d 45
51eb0267
JW
class VideoTranscodingFail(BaseProcessingFail):
    """Processing failure raised when transcoding a video does not succeed."""
    general_message = _(u'Video transcoding failed')
51
52
54b4b28f
BB
def sniffer(media_file):
    """
    New-style sniffer used in the two-step media type check.

    :param media_file: object exposing ``.name``, the path handed to
        ``transcoders.discover``.
    :returns: MEDIA_TYPE when the file contains a usable video stream.
    :raises MissingComponents: if discovery itself fails, if no video
        stream is found, or if the discoverer reports a non-zero result
        (for instance a missing GStreamer plugin).
    """
    _log.info('Sniffing {0}'.format(MEDIA_TYPE))

    try:
        discovered = transcoders.discover(media_file.name)
    except Exception as err:
        # Usually a GLib.GError; the concrete type does not matter here.
        reason = u'GStreamer: {0}'.format(six.text_type(err))
        _log.warning(reason)
        raise MissingComponents(reason)

    _log.debug('Discovered: {0}'.format(discovered))

    if not discovered.get_video_streams():
        raise MissingComponents('No video streams found in this video')

    if discovered.get_result() != 0:  # a result of 0 means success
        try:
            missing = discovered.get_misc().get_string('name')
            _log.warning('GStreamer: missing {0}'.format(missing))
        except AttributeError:
            # On GStreamer > 1.4 get_misc() returns None, so ask the
            # dedicated API for the missing plugins instead.  The full
            # details go to the admin log; the user gets a short summary
            # built from the 4th and 5th '|'-separated fields.
            details = discovered.get_missing_elements_installer_details()
            _log.warning('GStreamer: missing: {0}'.format(', '.join(details)))
            missing = u', '.join(
                u'{0} ({1})'.format(*detail.split('|')[3:])
                for detail in details)
        raise MissingComponents(u'{0} is missing'.format(missing))

    return MEDIA_TYPE
93bdab9d 83
bfd68cce 84
54b4b28f
BB
def sniff_handler(media_file, filename):
    """
    Old-style sniff hook.

    :param media_file: file object passed on to sniffer().
    :param filename: name used only for logging.
    :returns: MEDIA_TYPE if sniffer() accepts the file, otherwise None.

    A failed sniff only means "not a video we can handle", so errors are
    logged and turned into None.  Only Exception subclasses are caught, so
    KeyboardInterrupt / SystemExit still propagate.
    """
    try:
        return sniffer(media_file)
    except Exception:
        _log.error('Could not discover {0}'.format(filename))
        # Keep the actual reason available for debugging.
        _log.debug('Sniffer failure details', exc_info=True)
        return None
91
2d1e8905
BB
def get_tags(stream_info):
    """
    Return the tags of a GStreamer stream-info object as a plain dict.

    Only the first value of each tag is kept.  'date' and 'datetime' values
    are rendered as strings ("Y-M-D" and ISO 8601 respectively), and any
    value whose type is not JSON-friendly (dict, list, string, int, float,
    bool, None) is dropped.  An object without tags yields {}.
    """
    taglist = stream_info.get_tags()
    if not taglist:
        return {}

    collected = {}

    def _collect(tag_list, tag_name):
        collected[tag_name] = tag_list.get_value_index(tag_name, 0)

    taglist.foreach(_collect)

    # date/datetime arrive as GDate/GDateTime and must become strings
    if 'date' in collected:
        gdate = collected['date']
        collected['date'] = "%s-%s-%s" % (gdate.year, gdate.month, gdate.day)

    if 'datetime' in collected:
        # TODO: handle timezone info; gst.get_time_zone_offset +
        # python's tzinfo should help
        gdt = collected['datetime']
        collected['datetime'] = datetime.datetime(
            gdt.get_year(), gdt.get_month(), gdt.get_day(), gdt.get_hour(),
            gdt.get_minute(), gdt.get_second(),
            gdt.get_microsecond()).isoformat()

    json_types = (dict, list, six.string_types, int, float, bool, type(None))
    return {name: value for name, value in collected.items()
            if isinstance(value, json_types)}
122
29adab46
CAW
def store_metadata(media_entry, metadata):
    """
    Save the discovered stream information of a video on its media entry.

    :param media_entry: entry whose media_data_init() receives the result
        as ``orig_metadata``.
    :param metadata: discoverer result offering get_audio_streams(),
        get_video_streams(), get_duration() and get_tags().

    Per-stream details go under 'audio' and 'video' (each only when at
    least one such stream exists); the duration and container-level tags
    always go under 'common'.
    """
    def _describe_audio(info):
        return {
            'channels': info.get_channels(),
            'bitrate': info.get_bitrate(),
            'depth': info.get_depth(),
            # (sic) existing stored key name, kept for compatibility
            'languange': info.get_language(),
            'sample_rate': info.get_sample_rate(),
            'tags': get_tags(info),
        }

    def _describe_video(info):
        return {
            'width': info.get_width(),
            'height': info.get_height(),
            'bitrate': info.get_bitrate(),
            'depth': info.get_depth(),
            'videorate': [info.get_framerate_num(),
                          info.get_framerate_denom()],
            'tags': get_tags(info),
        }

    stored = {}

    audio_streams = metadata.get_audio_streams()
    if audio_streams:
        stored['audio'] = [_describe_audio(info) for info in audio_streams]

    video_streams = metadata.get_video_streams()
    if video_streams:
        stored['video'] = [_describe_video(info) for info in video_streams]

    stored['common'] = {
        'duration': metadata.get_duration(),
        'tags': get_tags(metadata),
    }

    # 'common' is always filled in above, so this guard never skips the
    # save in practice; it only protects against an empty dict.
    if stored:
        media_entry.media_data_init(orig_metadata=stored)
347ef583 164
7cc9b6d1 165# =====================
166
167
@celery.task()
def main_task(resolution, medium_size, **process_info):
    """
    Celery task: transcode the entry at *resolution*, then create its
    thumbnail and store the original file's metadata.

    :param resolution: resolution name selecting the output media file
        (passed to CommonVideoProcessor.common_setup).
    :param medium_size: (max_width, max_height) for the transcode, or None
        to use the configured medium size.
    :param process_info: must contain 'manager' and 'entry'.  It may contain
        'vp8_quality', 'vp8_threads', 'vorbis_quality' and 'thumb_size'.
        A missing setting behaves like None, i.e. the configured default
        is used.
    """
    # NOTE(review): assumes MediaProcessor's __enter__ creates
    # self.workbench (common_setup() and transcode() dereference it) and
    # __exit__ cleans it up -- confirm against MediaProcessor.
    with CommonVideoProcessor(process_info['manager'],
                              process_info['entry']) as processor:
        processor.common_setup(resolution)
        processor.transcode(medium_size=medium_size,
                            vp8_quality=process_info.get('vp8_quality'),
                            vp8_threads=process_info.get('vp8_threads'),
                            vorbis_quality=process_info.get('vorbis_quality'))
        processor.generate_thumb(thumb_size=process_info.get('thumb_size'))
        processor.store_orig_metadata()
176
177
@celery.task()
def complimentary_task(resolution, medium_size, **process_info):
    """
    Celery task: transcode the entry at an additional *resolution*.

    Unlike main_task it produces no thumbnail and no original metadata.

    :param resolution: resolution name selecting the output media file
        (passed to CommonVideoProcessor.common_setup).
    :param medium_size: (max_width, max_height) for the transcode, or None
        to use the configured medium size.
    :param process_info: must contain 'manager' and 'entry'.  It may contain
        'vp8_quality', 'vp8_threads' and 'vorbis_quality'; a missing setting
        behaves like None, i.e. the configured default is used.
    """
    # NOTE(review): assumes MediaProcessor's __enter__ creates
    # self.workbench (common_setup() and transcode() dereference it) and
    # __exit__ cleans it up -- confirm against MediaProcessor.
    with CommonVideoProcessor(process_info['manager'],
                              process_info['entry']) as processor:
        processor.common_setup(resolution)
        processor.transcode(medium_size=medium_size,
                            vp8_quality=process_info.get('vp8_quality'),
                            vp8_threads=process_info.get('vp8_threads'),
                            vorbis_quality=process_info.get('vorbis_quality'))
184
185
@celery.task()
def processing_cleanup(entry, manager):
    """
    Celery task: remove the queued upload once processing is over.

    Delegates to CommonVideoProcessor.delete_queue_file().
    """
    # The processor takes (manager, entry), the same order main_task and
    # complimentary_task use.
    CommonVideoProcessor(manager, entry).delete_queue_file()
190
191# =====================
192
347ef583
RE
193
class CommonVideoProcessor(MediaProcessor):
    """
    Provides a base for various video processing steps.

    common_setup() must run first.  It loads the video plugin config, picks
    the source file, and decides which media file key (self.curr_file) and
    output filename (self.part_filename) a transcode writes to.
    """
    # Media file keys that may serve as the processing source, handed to
    # get_process_filename() (presumably tried in this order).
    acceptable_files = ['original', 'best_quality', 'webm_144p', 'webm_360p',
                        'webm_480p', 'webm_720p', 'webm_1080p', 'webm_video']

    def common_setup(self, resolution=None):
        """
        Prepare state shared by all processing steps.

        :param resolution: optional resolution name (e.g. '360p').  When
            given, output is stored as ``webm_<resolution>`` with filename
            ``<basename>.<resolution>.webm``.  Otherwise it is stored as
            ``webm_video`` with filename ``<basename>.medium.webm``.
        """
        self.video_config = mgg.global_config['plugins'][MEDIA_TYPE]

        # Pull down and set up the processing file
        self.process_filename = get_process_filename(
            self.entry, self.workbench, self.acceptable_files)
        self.name_builder = FilenameBuilder(self.process_filename)

        self.transcoder = transcoders.VideoTranscoder()
        self.did_transcode = False

        if resolution:
            self.curr_file = 'webm_' + str(resolution)
            self.part_filename = self.name_builder.fill(
                '{basename}.' + str(resolution) + '.webm')
        else:
            self.curr_file = 'webm_video'
            self.part_filename = self.name_builder.fill(
                '{basename}.medium.webm')

    def copy_original(self):
        """Keep the original when no transcode happened (not implemented)."""
        raise NotImplementedError

    def _keep_best(self):
        """
        If there is no original, keep the best file that we have
        (not implemented).
        """
        raise NotImplementedError

    def _skip_processing(self, keyname, **kwargs):
        """
        Return True if *keyname* was already produced with the same settings.

        Keys without stored file metadata are never skipped.  For 'webm*'
        keys, medium_size, vp8_quality, vp8_threads and vorbis_quality are
        compared.  For 'thumb', thumb_size is compared.  Any other key that
        has metadata counts as done.
        """
        file_metadata = self.entry.get_file_metadata(keyname)

        if not file_metadata:
            return False
        skip = True

        if 'webm' in keyname:
            if kwargs.get('medium_size') != file_metadata.get('medium_size'):
                skip = False
            elif kwargs.get('vp8_quality') != file_metadata.get('vp8_quality'):
                skip = False
            elif kwargs.get('vp8_threads') != file_metadata.get('vp8_threads'):
                skip = False
            elif kwargs.get('vorbis_quality') != \
                    file_metadata.get('vorbis_quality'):
                skip = False
        elif keyname == 'thumb':
            if kwargs.get('thumb_size') != file_metadata.get('thumb_size'):
                skip = False

        return skip

    def transcode(self, medium_size=None, vp8_quality=None, vp8_threads=None,
                  vorbis_quality=None):
        """
        Transcode the source into self.part_filename and store it publicly
        under self.curr_file (both chosen by common_setup()).

        Any falsy argument falls back to its configured default
        (media:medium max size, or the video plugin's vp8/vorbis settings).
        Nothing happens if self.curr_file was already produced with the same
        settings.  If skip_transcode() says no transcode is needed, an
        existing transcode is deleted when an original is present.
        """
        progress_callback = ProgressCallback(self.entry)
        tmp_dst = os.path.join(self.workbench.dir, self.part_filename)

        if not medium_size:
            medium_size = (
                mgg.global_config['media:medium']['max_width'],
                mgg.global_config['media:medium']['max_height'])
        if not vp8_quality:
            vp8_quality = self.video_config['vp8_quality']
        if not vp8_threads:
            vp8_threads = self.video_config['vp8_threads']
        if not vorbis_quality:
            vorbis_quality = self.video_config['vorbis_quality']

        file_metadata = {'medium_size': medium_size,
                         'vp8_threads': vp8_threads,
                         'vp8_quality': vp8_quality,
                         'vorbis_quality': vorbis_quality}

        if self._skip_processing(self.curr_file, **file_metadata):
            return

        metadata = transcoders.discover(self.process_filename)

        # Figure out whether or not we need to transcode this video or
        # if we can skip it
        if skip_transcode(metadata, medium_size):
            _log.debug('Skipping transcoding')

            # If there is an original and transcoded, delete the transcoded
            # since it must be of lower quality than the original
            if self.entry.media_files.get('original') and \
                    self.entry.media_files.get(self.curr_file):
                self.entry.media_files[self.curr_file].delete()

        else:
            self.transcoder.transcode(self.process_filename, tmp_dst,
                                      vp8_quality=vp8_quality,
                                      vp8_threads=vp8_threads,
                                      vorbis_quality=vorbis_quality,
                                      progress_callback=progress_callback,
                                      dimensions=tuple(medium_size))
            if self.transcoder.dst_data:
                # Push the transcoded video to public storage under the key
                # and filename for this resolution, so transcodes at
                # different resolutions do not overwrite each other.
                _log.debug('Saving medium...')
                store_public(self.entry, self.curr_file, tmp_dst,
                             self.part_filename)
                _log.debug('Saved medium')

                # Remember the settings so _skip_processing() can detect an
                # identical re-run.
                self.entry.set_file_metadata(self.curr_file, **file_metadata)

                self.did_transcode = True

    def generate_thumb(self, thumb_size=None):
        """
        Capture a JPEG thumbnail from the source video and store it as
        'thumb'.

        :param thumb_size: sequence whose first item is the target width;
            defaults to media:thumb max_width.  Only the width is used, so
            the aspect ratio is kept.
        Nothing is stored if the same thumb_size was already used, or if no
        thumbnail file was produced.
        """
        # Temporary file for the video thumbnail (cleaned up with workbench)
        tmp_thumb = os.path.join(self.workbench.dir,
                                 self.name_builder.fill(
                                     '{basename}.thumbnail.jpg'))

        if not thumb_size:
            thumb_size = (mgg.global_config['media:thumb']['max_width'],)

        if self._skip_processing('thumb', thumb_size=thumb_size):
            return

        # We will only use the width so that the correct scale is kept
        transcoders.capture_thumb(
            self.process_filename,
            tmp_thumb,
            thumb_size[0])

        # Checking if the thumbnail was correctly created. If it was not,
        # then just give up.
        if not os.path.exists(tmp_thumb):
            return

        # Push the thumbnail to public storage
        _log.debug('Saving thumbnail...')
        store_public(self.entry, 'thumb', tmp_thumb,
                     self.name_builder.fill('{basename}.thumbnail.jpg'))

        self.entry.set_file_metadata('thumb', thumb_size=thumb_size)

    def store_orig_metadata(self):
        """Discover the source file and save its metadata on the entry."""
        # Extract metadata and keep a record of it
        metadata = transcoders.discover(self.process_filename)

        # metadata's stream info here is a DiscovererContainerInfo instance,
        # it gets split into DiscovererAudioInfo and DiscovererVideoInfo;
        # metadata itself has container-related data in tags, like video-codec
        store_metadata(self.entry, metadata)
352
353
347ef583
RE
class InitialProcessor(CommonVideoProcessor):
    """
    Processor for media whose state is 'unprocessed' or 'failed'.

    It stores the original's metadata, transcodes the video, generates a
    thumbnail and finally deletes the queued upload.
    """
    name = "initial"
    description = "Initial processing"

    @classmethod
    def media_is_eligible(cls, entry=None, state=None):
        # An explicitly passed state wins over the entry's own state.
        current = state if state else entry.state
        return current in ("unprocessed", "failed")

    @classmethod
    def generate_parser(cls):
        size_metavar = ('max_width', 'max_height')
        parser = argparse.ArgumentParser(prog=cls.name,
                                         description=cls.description)
        parser.add_argument('--medium_size', nargs=2, metavar=size_metavar,
                            type=int)
        parser.add_argument('--vp8_quality', type=int, help='Range 0..10')
        parser.add_argument('--vp8_threads', type=int,
                            help='0 means number_of_CPUs - 1')
        parser.add_argument('--vorbis_quality', type=float,
                            help='Range -0.1..1')
        parser.add_argument('--thumb_size', nargs=2, metavar=size_metavar,
                            type=int)
        return parser

    @classmethod
    def args_to_request(cls, args):
        wanted = ['medium_size', 'vp8_quality', 'vp8_threads',
                  'vorbis_quality', 'thumb_size']
        return request_from_args(args, wanted)

    def process(self, medium_size=None, vp8_threads=None, vp8_quality=None,
                vorbis_quality=None, thumb_size=None, resolution=None):
        self.common_setup(resolution=resolution)
        self.store_orig_metadata()
        self.transcode(medium_size=medium_size, vp8_quality=vp8_quality,
                       vp8_threads=vp8_threads, vorbis_quality=vorbis_quality)
        self.generate_thumb(thumb_size=thumb_size)
        self.delete_queue_file()
418
419
371bcc24
RE
class Resizer(CommonVideoProcessor):
    """
    Video thumbnail resizing process steps for processed media
    """
    name = 'resize'
    description = 'Resize thumbnail'
    # NOTE(review): what reads this class attribute is not visible here;
    # kept unchanged for compatibility.
    thumb_size = 'thumb_size'

    @classmethod
    def media_is_eligible(cls, entry=None, state=None):
        """Only media in the 'processed' state can have its thumb resized."""
        if not state:
            state = entry.state
        # Exact comparison: ``in`` against a str is a substring test that
        # would also accept e.g. '' or 'process'.
        return state == 'processed'

    @classmethod
    def generate_parser(cls):
        parser = argparse.ArgumentParser(
            description=cls.description,
            prog=cls.name)

        parser.add_argument(
            '--thumb_size',
            nargs=2,
            metavar=('max_width', 'max_height'),
            type=int)

        # Needed for gmg reprocess thumbs to work
        parser.add_argument(
            'file',
            nargs='?',
            default='thumb',
            choices=['thumb'])

        return parser

    @classmethod
    def args_to_request(cls, args):
        return request_from_args(
            args, ['thumb_size', 'file'])

    def process(self, thumb_size=None, file=None):
        """
        Regenerate the thumbnail.

        *file* is accepted only because the parser's request includes it;
        'thumb' is the only allowed choice, so it is not otherwise used.
        """
        self.common_setup()
        self.generate_thumb(thumb_size=thumb_size)
463
464
57d1cb3c
RE
class Transcoder(CommonVideoProcessor):
    """
    Transcoding processing steps for processed video
    """
    name = 'transcode'
    description = 'Re-transcode video'

    @classmethod
    def media_is_eligible(cls, entry=None, state=None):
        """Only media in the 'processed' state can be re-transcoded."""
        if not state:
            state = entry.state
        # Exact comparison: ``in`` against a str is a substring test that
        # would also accept e.g. '' or 'process'.
        return state == 'processed'

    @classmethod
    def generate_parser(cls):
        parser = argparse.ArgumentParser(
            description=cls.description,
            prog=cls.name)

        parser.add_argument(
            '--medium_size',
            nargs=2,
            metavar=('max_width', 'max_height'),
            type=int)

        parser.add_argument(
            '--vp8_quality',
            type=int,
            help='Range 0..10')

        parser.add_argument(
            '--vp8_threads',
            type=int,
            help='0 means number_of_CPUs - 1')

        parser.add_argument(
            '--vorbis_quality',
            type=float,
            help='Range -0.1..1')

        return parser

    @classmethod
    def args_to_request(cls, args):
        return request_from_args(
            args, ['medium_size', 'vp8_threads', 'vp8_quality',
                   'vorbis_quality'])

    def process(self, medium_size=None, vp8_quality=None, vp8_threads=None,
                vorbis_quality=None):
        """Re-transcode into the default 'webm_video' output."""
        self.common_setup()
        self.transcode(medium_size=medium_size, vp8_threads=vp8_threads,
                       vp8_quality=vp8_quality, vorbis_quality=vorbis_quality)
518
519
347ef583
RE
class VideoProcessingManager(ProcessingManager):
    """Processing manager for videos: registers the video processors."""

    def __init__(self):
        super(VideoProcessingManager, self).__init__()
        self.add_processor(InitialProcessor)
        self.add_processor(Resizer)
        self.add_processor(Transcoder)

    def workflow(self, entry, manager, feed_url, reprocess_action,
                 reprocess_info=None):
        """
        Fan the transcoding of *entry* out over prioritised celery tasks and
        run processing_cleanup once all of them have finished (a chord).

        :param entry: media entry to process; only ``entry.id`` is sent on.
        :param manager: processing manager forwarded to the tasks.
        :param feed_url: not used here.
        :param reprocess_action: not used here.
        :param reprocess_info: optional dict of settings (vp8_quality,
            vp8_threads, vorbis_quality, thumb_size, ...) forwarded to the
            tasks as keyword arguments.  It is copied, never modified.
        """
        process_info = dict(reprocess_info or {})
        # resolution / medium_size are passed positionally per task below;
        # drop any copies so the task call cannot get them twice.
        process_info.pop('resolution', None)
        process_info.pop('medium_size', None)

        # TODO(review): the tasks build a CommonVideoProcessor from these.
        # Confirm it copes with an entry id instead of the entry object, and
        # that the configured celery serializer can handle the manager.
        entry_id = entry.id
        process_info['entry'] = entry_id
        process_info['manager'] = manager

        # (task, resolution, priority).  main_task also makes the thumbnail
        # and stores the original metadata, so it gets the top priority.
        # The resolutions mirror the webm_* keys in acceptable_files; which
        # one counts as "main" is a placeholder -- TODO confirm.
        plan = [
            (main_task, '480p', 5),
            (complimentary_task, '360p', 4),
            (complimentary_task, '720p', 3),
            (complimentary_task, '144p', 2),
            (complimentary_task, '1080p', 1),
        ]

        # ACCEPTED_RESOLUTIONS is assumed to map a resolution name to its
        # (width, height).  An unknown name yields None, which transcode()
        # replaces with the configured medium size.
        transcoding_tasks = group([
            task.signature(args=(resolution,
                                 ACCEPTED_RESOLUTIONS.get(resolution)),
                           kwargs=process_info, queue='default',
                           priority=priority, immutable=True)
            for task, resolution, priority in plan
        ])

        cleanup = processing_cleanup.signature(
            args=(entry_id, manager), queue='default', immutable=True)

        chord(transcoding_tasks)(cleanup)