Celery Priority testing with debug statements
[mediagoblin.git] / mediagoblin / media_types / video / processing.py
CommitLineData
93bdab9d 1# GNU MediaGoblin -- federated, autonomous media hosting
cf29e8a8 2# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
93bdab9d
JW
3#
4# This program is free software: you can redistribute it and/or modify
5# it under the terms of the GNU Affero General Public License as published by
6# the Free Software Foundation, either version 3 of the License, or
7# (at your option) any later version.
8#
9# This program is distributed in the hope that it will be useful,
10# but WITHOUT ANY WARRANTY; without even the implied warranty of
11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12# GNU Affero General Public License for more details.
13#
14# You should have received a copy of the GNU Affero General Public License
15# along with this program. If not, see <http://www.gnu.org/licenses/>.
16
57d1cb3c 17import argparse
2ed6afb0 18import os.path
e9c1b938 19import logging
d0ceb506 20import datetime
9a27fa60 21import celery
93bdab9d 22
896d00fb
BP
23import six
24
25ecdec9 25from celery import group, chord
93bdab9d 26from mediagoblin import mg_globals as mgg
347ef583
RE
27from mediagoblin.processing import (
28 FilenameBuilder, BaseProcessingFail,
29 ProgressCallback, MediaProcessor,
30 ProcessingManager, request_from_args,
1cefccc7 31 get_process_filename, store_public,
d77eb562 32 copy_original, get_entry_and_processing_manager)
81c59ef0 33from mediagoblin.processing.task import ProcessMedia
51eb0267 34from mediagoblin.tools.translate import lazy_pass_to_ugettext as _
54b4b28f 35from mediagoblin.media_types import MissingComponents
51eb0267 36
26729e02 37from . import transcoders
25ecdec9 38from .util import skip_transcode, ACCEPTED_RESOLUTIONS
5c754fda 39
8e5f9746
JW
MEDIA_TYPE = 'mediagoblin.media_types.video'

# Module logger; its own level is forced to DEBUG so debug records from
# this module are not filtered out at the logger level.
_log = logging.getLogger(__name__)
_log.setLevel(logging.DEBUG)
44
93bdab9d 45
51eb0267
JW
class VideoTranscodingFail(BaseProcessingFail):
    """Processing failure raised when a video cannot be transcoded."""

    # Translatable message shown for this failure.
    general_message = _(u'Video transcoding failed')
51
52
54b4b28f
BB
def sniffer(media_file):
    """
    New-style sniffer used in the two-step media type check.

    *media_file* must expose a ``.name`` attribute holding its path.

    Returns MEDIA_TYPE when GStreamer discovery succeeds and finds at
    least one video stream.  Raises MissingComponents when discovery
    itself fails, when no video stream is found, or when discovery
    reports missing elements.
    """
    _log.info('Sniffing {0}'.format(MEDIA_TYPE))
    try:
        discovered = transcoders.discover(media_file.name)
    except Exception as err:
        # Usually GLib.GError, but any discovery failure is handled alike.
        reason = u'GStreamer: {0}'.format(six.text_type(err))
        _log.warning(reason)
        raise MissingComponents(reason)
    _log.debug('Discovered: {0}'.format(discovered))

    if not discovered.get_video_streams():
        raise MissingComponents('No video streams found in this video')

    # get_result() is 0 on success.
    if discovered.get_result() == 0:
        return MEDIA_TYPE

    try:
        missing = discovered.get_misc().get_string('name')
        _log.warning('GStreamer: missing {0}'.format(missing))
    except AttributeError:
        # On gstreamer > 1.4 get_misc() returns None, so the missing
        # plugin info comes from a dedicated call instead: the full
        # details are logged for the admin, a short form goes to the user.
        details = discovered.get_missing_elements_installer_details()
        _log.warning('GStreamer: missing: {0}'.format(', '.join(details)))
        missing = u', '.join(u'{0} ({1})'.format(*entry.split('|')[3:])
                             for entry in details)
    raise MissingComponents(u'{0} is missing'.format(missing))
93bdab9d 83
bfd68cce 84
54b4b28f
BB
def sniff_handler(media_file, filename):
    """
    Wrap sniffer() so that a failed sniff yields None instead of raising.

    :param media_file: file object passed straight to sniffer().
    :param filename: name used only in the error log message.
    :returns: MEDIA_TYPE if sniffer() succeeds, otherwise None.
    """
    try:
        return sniffer(media_file)
    except Exception:
        # Catch Exception rather than using a bare except, so that
        # KeyboardInterrupt / SystemExit still propagate.
        _log.error('Could not discover {0}'.format(filename))
        return None
91
2d1e8905
BB
def get_tags(stream_info):
    """
    Collect the tags of *stream_info* into a JSON-friendly dict.

    Each tag maps to its first value (index 0).  'date' / 'datetime'
    values are converted from GDate / GDateTime to strings, and any value
    whose type JSON cannot hold is dropped.  Returns {} when the stream
    has no tag list.
    """
    taglist = stream_info.get_tags()
    if not taglist:
        return {}

    collected = {}

    def _grab(tag_list, tag_name):
        collected[tag_name] = tag_list.get_value_index(tag_name, 0)

    taglist.foreach(_grab)

    if 'date' in collected:
        gdate = collected['date']
        collected['date'] = "%s-%s-%s" % (gdate.year, gdate.month, gdate.day)

    if 'datetime' in collected:
        # TODO: handle timezone info; gst.get_time_zone_offset +
        # python's tzinfo should help
        gdt = collected['datetime']
        collected['datetime'] = datetime.datetime(
            gdt.get_year(), gdt.get_month(), gdt.get_day(), gdt.get_hour(),
            gdt.get_minute(), gdt.get_second(),
            gdt.get_microsecond()).isoformat()

    # Keep only the types json accepts.
    json_types = (dict, list, six.string_types, int, float, bool,
                  type(None))
    return {key: value for key, value in collected.items()
            if isinstance(value, json_types)}
122
29adab46
CAW
def store_metadata(media_entry, metadata):
    """
    Save a summary of the discovered *metadata* on *media_entry*.

    *metadata* is the object returned by transcoders.discover() for the
    video.  Audio and video streams are summarized as lists of dicts
    under 'audio' / 'video' (each key present only when such streams
    exist).  Container-level duration and tags go under 'common'.  The
    summary is saved with media_entry.media_data_init(orig_metadata=...).
    """
    summary = {}

    audio_streams = metadata.get_audio_streams()
    if audio_streams:
        summary['audio'] = [
            {
                'channels': info.get_channels(),
                'bitrate': info.get_bitrate(),
                'depth': info.get_depth(),
                # Key spelling kept unchanged; readers of the stored
                # metadata may rely on it.
                'languange': info.get_language(),
                'sample_rate': info.get_sample_rate(),
                'tags': get_tags(info),
            }
            for info in audio_streams
        ]

    video_streams = metadata.get_video_streams()
    if video_streams:
        summary['video'] = [
            {
                'width': info.get_width(),
                'height': info.get_height(),
                'bitrate': info.get_bitrate(),
                'depth': info.get_depth(),
                'videorate': [info.get_framerate_num(),
                              info.get_framerate_denom()],
                'tags': get_tags(info),
            }
            for info in video_streams
        ]

    summary['common'] = {
        'duration': metadata.get_duration(),
        'tags': get_tags(metadata),
    }

    # 'common' is always filled in above, so this check currently always
    # passes; it only guards against saving an empty summary.
    if summary:
        media_entry.media_data_init(orig_metadata=summary)
347ef583 164
7cc9b6d1 165# =====================
166
167
@celery.task()
def main_task(entry_id, resolution, medium_size, **process_info):
    """
    Celery task: main processing of a video entry at *resolution*.

    Transcodes the video to *medium_size*, generates its thumbnail and
    stores the original file's metadata.

    :param entry_id: id of the media entry to process.
    :param resolution: resolution label (e.g. '480p') passed to
        CommonVideoProcessor.common_setup().
    :param medium_size: (width, height) pair; converted to a tuple.
    :param process_info: may contain vp8_quality, vp8_threads,
        vorbis_quality and thumb_size.  Missing keys are passed as None,
        which CommonVideoProcessor replaces with configured defaults.
    """
    entry, manager = get_entry_and_processing_manager(entry_id)
    _log.debug('Entered main_task for entry %s (%s)', entry_id, resolution)
    with CommonVideoProcessor(manager, entry) as processor:
        processor.common_setup(resolution)
        processor.transcode(medium_size=tuple(medium_size),
                            vp8_quality=process_info.get('vp8_quality'),
                            vp8_threads=process_info.get('vp8_threads'),
                            vorbis_quality=process_info.get('vorbis_quality'))
        processor.generate_thumb(thumb_size=process_info.get('thumb_size'))
        processor.store_orig_metadata()
    _log.debug('Exited main_task for entry %s (%s)', entry_id, resolution)
7cc9b6d1 179
180
@celery.task()
def complimentary_task(entry_id, resolution, medium_size, **process_info):
    """
    Celery task: transcode a video entry to an additional *resolution*.

    Unlike main_task, this only transcodes; it does not generate a
    thumbnail or store the original metadata.

    :param entry_id: id of the media entry to process.
    :param resolution: resolution label (e.g. '360p') passed to
        CommonVideoProcessor.common_setup().
    :param medium_size: (width, height) pair; converted to a tuple.
    :param process_info: may contain vp8_quality, vp8_threads and
        vorbis_quality.  Missing keys are passed as None, which
        CommonVideoProcessor replaces with configured defaults.
    """
    entry, manager = get_entry_and_processing_manager(entry_id)
    _log.debug('Entered complimentary_task for entry %s (%s)',
               entry_id, resolution)
    with CommonVideoProcessor(manager, entry) as processor:
        processor.common_setup(resolution)
        processor.transcode(medium_size=tuple(medium_size),
                            vp8_quality=process_info.get('vp8_quality'),
                            vp8_threads=process_info.get('vp8_threads'),
                            vorbis_quality=process_info.get('vorbis_quality'))
    _log.debug('Exited complimentary_task for entry %s (%s)',
               entry_id, resolution)
7cc9b6d1 190
191
@celery.task()
def processing_cleanup(entry_id):
    """
    Celery task: call delete_queue_file() for the given media entry.

    VideoProcessingManager.workflow() runs this as the chord callback,
    after the transcoding tasks have finished.
    """
    entry, manager = get_entry_and_processing_manager(entry_id)
    with CommonVideoProcessor(manager, entry) as processor:
        processor.delete_queue_file()
    _log.debug('Deleted queue_file for entry %s', entry_id)
7cc9b6d1 198
199# =====================
200
347ef583
RE
201
class CommonVideoProcessor(MediaProcessor):
    """
    Provides a base for various video processing steps.

    common_setup() must be called first: it locates the file to process
    and sets the attributes the other steps read (process_filename,
    name_builder, transcoder, curr_file, part_filename).  The transcoded
    output is stored under the media_files key 'webm_<resolution>', or
    'webm_video' when no resolution is given.
    """
    # Candidate media_files keys passed to get_process_filename() when
    # locating the file to process.
    acceptable_files = ['original', 'best_quality', 'webm_144p', 'webm_360p',
                        'webm_480p', 'webm_720p', 'webm_1080p', 'webm_video']

    def common_setup(self, resolution=None):
        """
        Prepare the processor for one output resolution.

        :param resolution: optional label such as '480p'.  It selects the
            media_files key 'webm_<resolution>' and the output name
            '<basename>.<resolution>.webm'.  Without it the key is
            'webm_video' and the name '<basename>.medium.webm'.
        """
        self.video_config = mgg.global_config['plugins'][MEDIA_TYPE]

        # Pull down and set up the processing file
        self.process_filename = get_process_filename(
            self.entry, self.workbench, self.acceptable_files)
        self.name_builder = FilenameBuilder(self.process_filename)

        self.transcoder = transcoders.VideoTranscoder()
        self.did_transcode = False

        if resolution:
            self.curr_file = 'webm_' + str(resolution)
            self.part_filename = self.name_builder.fill(
                '{basename}.' + str(resolution) + '.webm')
        else:
            self.curr_file = 'webm_video'
            self.part_filename = self.name_builder.fill(
                '{basename}.medium.webm')

        _log.debug('%s: Done common_setup()', self.curr_file)

    def copy_original(self):
        """Keep the original when no transcode happened (not implemented)."""
        raise NotImplementedError

    def _keep_best(self):
        """
        If there is no original, keep the best file that we have
        (not implemented).
        """
        raise NotImplementedError

    def _skip_processing(self, keyname, **kwargs):
        """
        Return True if media file *keyname* was already produced with the
        settings in *kwargs*.

        For keys containing 'webm', medium_size, vp8_quality, vp8_threads
        and vorbis_quality are compared with the stored file metadata.
        For 'thumb', thumb_size is compared.  Returns False when nothing
        is stored for *keyname*.
        """
        file_metadata = self.entry.get_file_metadata(keyname)

        if not file_metadata:
            return False

        if 'webm' in keyname:
            compared = ('medium_size', 'vp8_quality', 'vp8_threads',
                        'vorbis_quality')
        elif keyname == 'thumb':
            compared = ('thumb_size',)
        else:
            compared = ()

        # NOTE(review): if file metadata round-trips through JSON, stored
        # tuples come back as lists and never equal a tuple argument,
        # so nothing would be skipped -- confirm.
        return all(kwargs.get(key) == file_metadata.get(key)
                   for key in compared)

    def transcode(self, medium_size=None, vp8_quality=None, vp8_threads=None,
                  vorbis_quality=None):
        """
        Transcode the processing file to WebM and store it under
        self.curr_file.

        Arguments left as None fall back to the media:medium size and the
        video plugin configuration; an explicit 0 / 0.0 is honoured.
        Nothing happens if stored file metadata already matches these
        settings.  When skip_transcode() accepts the source as-is, no new
        file is produced.
        """
        _log.debug('%s: Enter transcode()', self.curr_file)
        progress_callback = ProgressCallback(self.entry)
        tmp_dst = os.path.join(self.workbench.dir, self.part_filename)

        if not medium_size:
            medium_size = (
                mgg.global_config['media:medium']['max_width'],
                mgg.global_config['media:medium']['max_height'])
        # `is None` rather than falsiness: 0 is a valid vp8_quality
        # (0..10) and vp8_threads, and 0.0 a valid vorbis_quality.
        if vp8_quality is None:
            vp8_quality = self.video_config['vp8_quality']
        if vp8_threads is None:
            vp8_threads = self.video_config['vp8_threads']
        if vorbis_quality is None:
            vorbis_quality = self.video_config['vorbis_quality']

        file_metadata = {'medium_size': medium_size,
                         'vp8_threads': vp8_threads,
                         'vp8_quality': vp8_quality,
                         'vorbis_quality': vorbis_quality}

        if self._skip_processing(self.curr_file, **file_metadata):
            return

        metadata = transcoders.discover(self.process_filename)

        # Figure out whether or not we need to transcode this video or
        # if we can skip it
        if skip_transcode(metadata, medium_size):
            _log.debug('Skipping transcoding')

            # If there is an original and transcoded, delete the transcoded
            # since it must be of lower quality than the original.
            # NOTE(review): confirm media_files values support .delete().
            if self.entry.media_files.get('original') and \
                    self.entry.media_files.get(self.curr_file):
                self.entry.media_files[self.curr_file].delete()
        else:
            self.transcoder.transcode(self.process_filename, tmp_dst,
                                      vp8_quality=vp8_quality,
                                      vp8_threads=vp8_threads,
                                      vorbis_quality=vorbis_quality,
                                      progress_callback=progress_callback,
                                      dimensions=tuple(medium_size))
            if self.transcoder.dst_data:
                # Push transcoded video to public storage under this
                # resolution's key -- the same key the skip check and the
                # file metadata use.
                _log.debug('Saving medium...')
                store_public(self.entry, self.curr_file, tmp_dst,
                             self.part_filename)
                _log.debug('Saved medium')

                # Record the settings so _skip_processing() can later tell
                # that this file is up to date.
                self.entry.set_file_metadata(self.curr_file, **file_metadata)

                self.did_transcode = True
        _log.debug('%s: Done transcode()', self.curr_file)

    def generate_thumb(self, thumb_size=None):
        """
        Capture a thumbnail from the processing file and store it as 'thumb'.

        Only thumb_size[0] (the width) is passed to capture_thumb() so the
        scale is kept.  It defaults to media:thumb max_width.  Returns
        without storing anything if no thumbnail file was produced.
        """
        _log.debug('%s: Enter generate_thumb()', self.curr_file)
        # Temporary file for the video thumbnail (cleaned up with workbench)
        tmp_thumb = os.path.join(self.workbench.dir,
                                 self.name_builder.fill(
                                     '{basename}.thumbnail.jpg'))

        if not thumb_size:
            thumb_size = (mgg.global_config['media:thumb']['max_width'],)

        if self._skip_processing('thumb', thumb_size=thumb_size):
            return

        # We will only use the width so that the correct scale is kept
        transcoders.capture_thumb(
            self.process_filename,
            tmp_thumb,
            thumb_size[0])

        # Checking if the thumbnail was correctly created. If it was not,
        # then just give up.
        if not os.path.exists(tmp_thumb):
            return

        # Push the thumbnail to public storage
        _log.debug('Saving thumbnail...')
        store_public(self.entry, 'thumb', tmp_thumb,
                     self.name_builder.fill('{basename}.thumbnail.jpg'))

        self.entry.set_file_metadata('thumb', thumb_size=thumb_size)
        _log.debug('%s: Done generate_thumb()', self.curr_file)

    def store_orig_metadata(self):
        """Discover the processing file and save its metadata on the entry."""
        _log.debug('%s: Enter store_orig_metadata()', self.curr_file)
        # Extract metadata and keep a record of it
        metadata = transcoders.discover(self.process_filename)

        # metadata's stream info here is a DiscovererContainerInfo instance,
        # it gets split into DiscovererAudioInfo and DiscovererVideoInfo;
        # metadata itself has container-related data in tags, like video-codec
        store_metadata(self.entry, metadata)
373
374
347ef583
RE
class InitialProcessor(CommonVideoProcessor):
    """
    Processor for videos in the "unprocessed" or "failed" state.

    Stores the original's metadata, transcodes it, generates a thumbnail
    and finally calls delete_queue_file().
    """
    name = "initial"
    description = "Initial processing"

    @classmethod
    def media_is_eligible(cls, entry=None, state=None):
        # Use the entry's own state when no state is passed in.
        return (state or entry.state) in ("unprocessed", "failed")

    @classmethod
    def generate_parser(cls):
        parser = argparse.ArgumentParser(
            description=cls.description,
            prog=cls.name)

        # Both size options take a "max_width max_height" integer pair.
        size_pair = {'nargs': 2,
                     'metavar': ('max_width', 'max_height'),
                     'type': int}

        parser.add_argument('--medium_size', **size_pair)
        parser.add_argument('--vp8_quality', type=int, help='Range 0..10')
        parser.add_argument('--vp8_threads', type=int,
                            help='0 means number_of_CPUs - 1')
        parser.add_argument('--vorbis_quality', type=float,
                            help='Range -0.1..1')
        parser.add_argument('--thumb_size', **size_pair)

        return parser

    @classmethod
    def args_to_request(cls, args):
        wanted = ['medium_size', 'vp8_quality', 'vp8_threads',
                  'vorbis_quality', 'thumb_size']
        return request_from_args(args, wanted)

    def process(self, medium_size=None, vp8_threads=None, vp8_quality=None,
                vorbis_quality=None, thumb_size=None, resolution=None):
        """Run every initial processing step, in order."""
        self.common_setup(resolution=resolution)
        self.store_orig_metadata()
        self.transcode(medium_size=medium_size,
                       vp8_quality=vp8_quality,
                       vp8_threads=vp8_threads,
                       vorbis_quality=vorbis_quality)
        self.generate_thumb(thumb_size=thumb_size)
        self.delete_queue_file()
439
440
371bcc24
RE
class Resizer(CommonVideoProcessor):
    """
    Video thumbnail resizing process steps for processed media
    """
    name = 'resize'
    description = 'Resize thumbnail'
    thumb_size = 'thumb_size'

    @classmethod
    def media_is_eligible(cls, entry=None, state=None):
        """Return True only for media in the 'processed' state."""
        if not state:
            state = entry.state
        # Exact match: a substring test against 'processed' would also
        # accept values such as '' or 'process'.
        return state == 'processed'

    @classmethod
    def generate_parser(cls):
        parser = argparse.ArgumentParser(
            description=cls.description,
            prog=cls.name)

        parser.add_argument(
            '--thumb_size',
            nargs=2,
            metavar=('max_width', 'max_height'),
            type=int)

        # Needed for gmg reprocess thumbs to work
        parser.add_argument(
            'file',
            nargs='?',
            default='thumb',
            choices=['thumb'])

        return parser

    @classmethod
    def args_to_request(cls, args):
        return request_from_args(
            args, ['thumb_size', 'file'])

    def process(self, thumb_size=None, file=None):
        """
        Regenerate the thumbnail at *thumb_size*.

        *file* comes from the parser (only 'thumb' is allowed) and is not
        otherwise used here.
        """
        self.common_setup()
        self.generate_thumb(thumb_size=thumb_size)
484
485
57d1cb3c
RE
class Transcoder(CommonVideoProcessor):
    """
    Transcoding processing steps for processed video
    """
    name = 'transcode'
    description = 'Re-transcode video'

    @classmethod
    def media_is_eligible(cls, entry=None, state=None):
        """Return True only for media in the 'processed' state."""
        if not state:
            state = entry.state
        # Exact match: a substring test against 'processed' would also
        # accept values such as '' or 'process'.
        return state == 'processed'

    @classmethod
    def generate_parser(cls):
        parser = argparse.ArgumentParser(
            description=cls.description,
            prog=cls.name)

        parser.add_argument(
            '--medium_size',
            nargs=2,
            metavar=('max_width', 'max_height'),
            type=int)

        parser.add_argument(
            '--vp8_quality',
            type=int,
            help='Range 0..10')

        parser.add_argument(
            '--vp8_threads',
            type=int,
            help='0 means number_of_CPUs - 1')

        parser.add_argument(
            '--vorbis_quality',
            type=float,
            help='Range -0.1..1')

        return parser

    @classmethod
    def args_to_request(cls, args):
        return request_from_args(
            args, ['medium_size', 'vp8_threads', 'vp8_quality',
                   'vorbis_quality'])

    def process(self, medium_size=None, vp8_quality=None, vp8_threads=None,
                vorbis_quality=None):
        """Re-run the transcode step with the given (optional) settings."""
        self.common_setup()
        self.transcode(medium_size=medium_size, vp8_threads=vp8_threads,
                       vp8_quality=vp8_quality, vorbis_quality=vorbis_quality)
539
540
347ef583
RE
class VideoProcessingManager(ProcessingManager):
    """Processing manager registering the video processors."""

    def __init__(self):
        super(VideoProcessingManager, self).__init__()
        self.add_processor(InitialProcessor)
        self.add_processor(Resizer)
        self.add_processor(Transcoder)

    def workflow(self, entry_id, feed_url, reprocess_action, reprocess_info=None):
        """
        Queue the celery tasks that process video entry *entry_id*.

        A celery group holding the 480p main_task is run as a chord.
        processing_cleanup is the chord callback, so it runs after the
        group's tasks.  *feed_url* and *reprocess_action* are accepted for
        interface compatibility but not used here.
        """
        # Work on a copy so the caller's dict is left untouched.  Make sure
        # every option the tasks read is present (None means "use the
        # configured default").
        reprocess_info = dict(reprocess_info or {})
        for option in ('vp8_quality', 'vorbis_quality', 'vp8_threads',
                       'thumb_size'):
            reprocess_info.setdefault(option, None)

        # TODO: add complimentary_task signatures for other resolutions
        # (e.g. 360p, 720p) with lower priorities once priority handling
        # has been verified.
        transcoding_tasks = group([
            main_task.signature(
                args=(entry_id, '480p', ACCEPTED_RESOLUTIONS['480p']),
                kwargs=reprocess_info, queue='default',
                priority=5, immutable=True),
        ])

        cleanup_task = processing_cleanup.signature(
            args=(entry_id,), queue='default', immutable=True)

        chord(transcoding_tasks)(cleanup_task)