# Simple hack to handle main workflow problem
# [mediagoblin.git] / mediagoblin / media_types / video / processing.py
1 # GNU MediaGoblin -- federated, autonomous media hosting
2 # Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16
17 import argparse
18 import os.path
19 import logging
20 import datetime
21 import celery
22
23 import six
24
25 from celery import group, chord
26 from mediagoblin import mg_globals as mgg
27 from mediagoblin.processing import (
28 FilenameBuilder, BaseProcessingFail,
29 ProgressCallback, MediaProcessor,
30 ProcessingManager, request_from_args,
31 get_process_filename, store_public,
32 copy_original, get_entry_and_processing_manager)
33 from mediagoblin.tools.translate import lazy_pass_to_ugettext as _
34 from mediagoblin.media_types import MissingComponents
35
36 from . import transcoders
37 from .util import skip_transcode, ACCEPTED_RESOLUTIONS
38
39 _log = logging.getLogger(__name__)
40 _log.setLevel(logging.DEBUG)
41
42 MEDIA_TYPE = 'mediagoblin.media_types.video'
43
44
class VideoTranscodingFail(BaseProcessingFail):
    '''
    Error raised if video transcoding fails
    '''
    # User-visible message; translated lazily via lazy_pass_to_ugettext.
    general_message = _(u'Video transcoding failed')
50
51
def sniffer(media_file):
    '''New style sniffer, used in two-steps check; requires to have .name

    Runs GStreamer discovery on ``media_file.name`` and returns MEDIA_TYPE
    when at least one video stream is found.  Raises MissingComponents when
    discovery fails, when no video stream is present, or when GStreamer
    reports missing plugins.
    '''
    _log.info('Sniffing {0}'.format(MEDIA_TYPE))
    try:
        data = transcoders.discover(media_file.name)
    except Exception as e:
        # this is usually GLib.GError, but we don't really care which one
        _log.warning(u'GStreamer: {0}'.format(six.text_type(e)))
        raise MissingComponents(u'GStreamer: {0}'.format(six.text_type(e)))
    _log.debug('Discovered: {0}'.format(data))

    if not data.get_video_streams():
        raise MissingComponents('No video streams found in this video')

    if data.get_result() != 0:  # it's 0 if success
        # Discovery finished with an error; try to name what is missing.
        try:
            missing = data.get_misc().get_string('name')
            _log.warning('GStreamer: missing {0}'.format(missing))
        except AttributeError as e:
            # AttributeError happens here on gstreamer >1.4, when get_misc
            # returns None. There is a special function to get info about
            # missing plugin. This info should be printed to logs for admin and
            # showed to the user in a short and nice version
            details = data.get_missing_elements_installer_details()
            _log.warning('GStreamer: missing: {0}'.format(', '.join(details)))
            # Installer detail strings are '|'-separated; fields 3+ hold the
            # human-readable name and caps, e.g. "decoder (video/x-h264)".
            missing = u', '.join([u'{0} ({1})'.format(*d.split('|')[3:])
                                  for d in details])
        raise MissingComponents(u'{0} is missing'.format(missing))

    return MEDIA_TYPE
82
83
def sniff_handler(media_file, filename):
    """Media-type sniff hook: return MEDIA_TYPE if this looks like a video.

    Returns None when discovery fails for any reason; ``filename`` is only
    used for logging.
    """
    try:
        return sniffer(media_file)
    except Exception:
        # Was a bare `except:`, which also swallowed KeyboardInterrupt and
        # SystemExit; catch Exception so those still propagate.
        _log.error('Could not discover {0}'.format(filename))
        return None
90
def get_tags(stream_info):
    'gets all tags and their values from stream info'
    taglist = stream_info.get_tags()
    if not taglist:
        return {}

    # Collect (tag, first-value) pairs from the GstTagList.
    collected = []
    taglist.foreach(
        lambda lst, tag: collected.append((tag, lst.get_value_index(tag, 0))))
    tags = dict(collected)

    # date/datetime should be converted from GDate/GDateTime to strings
    if 'date' in tags:
        gdate = tags['date']
        tags['date'] = "%s-%s-%s" % (gdate.year, gdate.month, gdate.day)

    if 'datetime' in tags:
        # TODO: handle timezone info; gst.get_time_zone_offset +
        # python's tzinfo should help
        gdt = tags['datetime']
        tags['datetime'] = datetime.datetime(
            gdt.get_year(), gdt.get_month(), gdt.get_day(),
            gdt.get_hour(), gdt.get_minute(), gdt.get_second(),
            gdt.get_microsecond()).isoformat()

    # Keep only values JSON can represent; drop anything else (GObject
    # boxed types and the like).
    json_safe = (dict, list, six.string_types, int, float, bool, type(None))
    return {key: value for key, value in tags.items()
            if isinstance(value, json_safe)}
121
def store_metadata(media_entry, metadata):
    """
    Store metadata from this video for this media entry.
    """
    stored_metadata = {}

    # Per-stream audio information.
    audio_streams = metadata.get_audio_streams()
    if audio_streams:
        stored_metadata['audio'] = [
            {
                'channels': stream.get_channels(),
                'bitrate': stream.get_bitrate(),
                'depth': stream.get_depth(),
                # NOTE(review): key is misspelled ('languange') upstream;
                # kept as-is so previously stored data stays consistent.
                'languange': stream.get_language(),
                'sample_rate': stream.get_sample_rate(),
                'tags': get_tags(stream),
            }
            for stream in audio_streams]

    # Per-stream video information.
    video_streams = metadata.get_video_streams()
    if video_streams:
        stored_metadata['video'] = [
            {
                'width': stream.get_width(),
                'height': stream.get_height(),
                'bitrate': stream.get_bitrate(),
                'depth': stream.get_depth(),
                'videorate': [stream.get_framerate_num(),
                              stream.get_framerate_denom()],
                'tags': get_tags(stream),
            }
            for stream in video_streams]

    # Container-level information.
    stored_metadata['common'] = {
        'duration': metadata.get_duration(),
        'tags': get_tags(metadata),
    }

    # Only save this field if there's something to save
    if len(stored_metadata):
        media_entry.media_data_init(orig_metadata=stored_metadata)
163
164 # =====================
165
166
@celery.task()
def main_task(entry_id, resolution, medium_size, **process_info):
    """Celery task: transcode the primary resolution for *entry_id*, then
    generate the thumbnail and store the original's metadata.

    ``process_info`` carries vp8_quality/vp8_threads/vorbis_quality and
    thumb_size (each possibly None to use configured defaults).
    """
    entry, manager = get_entry_and_processing_manager(entry_id)
    # Was a py2-only `print` statement; use the module logger instead.
    _log.debug('Entered main_task')
    with CommonVideoProcessor(manager, entry) as processor:
        processor.common_setup(resolution)
        processor.transcode(
            medium_size=tuple(medium_size),
            vp8_quality=process_info['vp8_quality'],
            vp8_threads=process_info['vp8_threads'],
            vorbis_quality=process_info['vorbis_quality'])
        processor.generate_thumb(thumb_size=process_info['thumb_size'])
        processor.store_orig_metadata()
    _log.debug('Exited main_task')
178
179
@celery.task()
def complimentary_task(entry_id, resolution, medium_size, **process_info):
    """Celery task: transcode one additional resolution for *entry_id*.

    Unlike main_task it does not touch thumbnails or metadata.
    """
    entry, manager = get_entry_and_processing_manager(entry_id)
    # Was a py2-only `print` statement; use the module logger instead.
    _log.debug('Entered complimentary_task')
    with CommonVideoProcessor(manager, entry) as processor:
        processor.common_setup(resolution)
        processor.transcode(
            medium_size=tuple(medium_size),
            vp8_quality=process_info['vp8_quality'],
            vp8_threads=process_info['vp8_threads'],
            vorbis_quality=process_info['vorbis_quality'])
    _log.debug('Exited complimentary_task')
189
190
@celery.task()
def processing_cleanup(entry_id):
    """Celery chord callback: delete the queue file for *entry_id* once all
    transcoding tasks have finished."""
    # Was a py2-only `print` statement; use the module logger instead.
    _log.debug('Entered processing_cleanup()')
    entry, manager = get_entry_and_processing_manager(entry_id)
    with CommonVideoProcessor(manager, entry) as processor:
        processor.delete_queue_file()
    _log.debug('Deleted queue_file')
198
199 # =====================
200
201
class CommonVideoProcessor(MediaProcessor):
    """
    Provides a base for various video processing steps
    """
    # Media-file keys that may serve as a processing source.
    # Fix: upstream fused the first two into the single broken string
    # 'original, best_quality', which could never match a media_files key.
    acceptable_files = ['original', 'best_quality', 'webm_144p', 'webm_360p',
                        'webm_480p', 'webm_720p', 'webm_1080p', 'webm_video']

    def common_setup(self, resolution=None):
        """Pull down the file to process and prepare transcoder state.

        :param resolution: optional label such as '480p'; selects the
            media_files key ('webm_<resolution>') and output filename.
            When None, the generic 'webm_video' key is used.
        """
        self.video_config = mgg \
            .global_config['plugins'][MEDIA_TYPE]

        # Pull down and set up the processing file
        self.process_filename = get_process_filename(
            self.entry, self.workbench, self.acceptable_files)
        self.name_builder = FilenameBuilder(self.process_filename)

        self.transcoder = transcoders.VideoTranscoder()
        self.did_transcode = False

        if resolution:
            self.curr_file = 'webm_' + str(resolution)
            self.part_filename = (self.name_builder.fill('{basename}.' +
                                  str(resolution) + '.webm'))
        else:
            self.curr_file = 'webm_video'
            self.part_filename = self.name_builder.fill(
                '{basename}.medium.webm')

        # Was a py2-only debug `print`; use the module logger instead.
        _log.debug('%s: Done common_setup()', self.curr_file)

    def copy_original(self):
        # If we didn't transcode, then we need to keep the original
        raise NotImplementedError

    def _keep_best(self):
        """
        If there is no original, keep the best file that we have
        """
        raise NotImplementedError

    def _skip_processing(self, keyname, **kwargs):
        """Return True when the stored file metadata for *keyname* already
        matches the requested settings, so reprocessing would be a no-op."""
        file_metadata = self.entry.get_file_metadata(keyname)

        if not file_metadata:
            return False
        skip = True

        if 'webm' in keyname:
            # Any differing transcode parameter forces reprocessing.
            if kwargs.get('medium_size') != file_metadata.get('medium_size'):
                skip = False
            elif kwargs.get('vp8_quality') != file_metadata.get('vp8_quality'):
                skip = False
            elif kwargs.get('vp8_threads') != file_metadata.get('vp8_threads'):
                skip = False
            elif kwargs.get('vorbis_quality') != \
                    file_metadata.get('vorbis_quality'):
                skip = False
        elif keyname == 'thumb':
            if kwargs.get('thumb_size') != file_metadata.get('thumb_size'):
                skip = False

        return skip

    def transcode(self, medium_size=None, vp8_quality=None, vp8_threads=None,
                  vorbis_quality=None):
        """Transcode the source file to WebM, unless the existing output
        already matches the requested settings or transcoding can be
        skipped entirely.  Missing parameters fall back to configuration.
        """
        _log.debug('%s: Enter transcode', self.curr_file)
        progress_callback = ProgressCallback(self.entry)
        tmp_dst = os.path.join(self.workbench.dir, self.part_filename)

        if not medium_size:
            medium_size = (
                mgg.global_config['media:medium']['max_width'],
                mgg.global_config['media:medium']['max_height'])
        if not vp8_quality:
            vp8_quality = self.video_config['vp8_quality']
        if not vp8_threads:
            vp8_threads = self.video_config['vp8_threads']
        if not vorbis_quality:
            vorbis_quality = self.video_config['vorbis_quality']

        file_metadata = {'medium_size': medium_size,
                         'vp8_threads': vp8_threads,
                         'vp8_quality': vp8_quality,
                         'vorbis_quality': vorbis_quality}

        if self._skip_processing(self.curr_file, **file_metadata):
            return

        # (The previously computed orig_dst_dimensions tuple was unused
        # and has been removed.)
        metadata = transcoders.discover(self.process_filename)

        # Figure out whether or not we need to transcode this video or
        # if we can skip it
        if skip_transcode(metadata, medium_size):
            _log.debug('Skipping transcoding')

            # If there is an original and transcoded, delete the transcoded
            # since it must be of lower quality then the original
            if self.entry.media_files.get('original') and \
               self.entry.media_files.get(self.curr_file):
                self.entry.media_files[self.curr_file].delete()

        else:
            self.transcoder.transcode(self.process_filename, tmp_dst,
                                      vp8_quality=vp8_quality,
                                      vp8_threads=vp8_threads,
                                      vorbis_quality=vorbis_quality,
                                      progress_callback=progress_callback,
                                      dimensions=tuple(medium_size))
            if self.transcoder.dst_data:
                # Push transcoded video to public storage
                _log.debug('Saving medium...')
                store_public(self.entry, self.curr_file, tmp_dst,
                             self.part_filename)
                _log.debug('Saved medium')

                # TODO(review): confirm this matches the file_metadata
                # contract used elsewhere (was an open question upstream).
                self.entry.set_file_metadata(self.curr_file, **file_metadata)

                self.did_transcode = True
        _log.debug('%s: Done transcode()', self.curr_file)

    def generate_thumb(self, thumb_size=None):
        """Capture a thumbnail frame and push it to public storage.

        :param thumb_size: (max_width,) or (max_width, max_height); only
            the width is used so the aspect ratio is preserved.
        """
        _log.debug('%s: Enter generate_thumb()', self.curr_file)
        # Temporary file for the video thumbnail (cleaned up with workbench)
        tmp_thumb = os.path.join(self.workbench.dir,
                                 self.name_builder.fill(
                                     '{basename}.thumbnail.jpg'))

        if not thumb_size:
            thumb_size = (mgg.global_config['media:thumb']['max_width'],)

        if self._skip_processing('thumb', thumb_size=thumb_size):
            return

        # We will only use the width so that the correct scale is kept
        transcoders.capture_thumb(
            self.process_filename,
            tmp_thumb,
            thumb_size[0])

        # Checking if the thumbnail was correctly created. If it was not,
        # then just give up.
        if not os.path.exists(tmp_thumb):
            return

        # Push the thumbnail to public storage
        _log.debug('Saving thumbnail...')
        store_public(self.entry, 'thumb', tmp_thumb,
                     self.name_builder.fill('{basename}.thumbnail.jpg'))

        self.entry.set_file_metadata('thumb', thumb_size=thumb_size)
        _log.debug('%s: Done generate_thumb()', self.curr_file)

    def store_orig_metadata(self):
        """Extract metadata from the source file and record it on the
        media entry."""
        _log.debug('%s: Enter store_orig_metadata()', self.curr_file)
        # Extract metadata and keep a record of it
        metadata = transcoders.discover(self.process_filename)

        # metadata's stream info here is a DiscovererContainerInfo instance,
        # it gets split into DiscovererAudioInfo and DiscovererVideoInfo;
        # metadata itself has container-related data in tags, like video-codec
        store_metadata(self.entry, metadata)
        _log.debug('%s: Done store_orig_metadata()', self.curr_file)
370
371
class InitialProcessor(CommonVideoProcessor):
    """
    Initial processing steps for new video
    """
    name = "initial"
    description = "Initial processing"

    @classmethod
    def media_is_eligible(cls, entry=None, state=None):
        # Fall back to the entry's own state when none was supplied.
        state = state or entry.state
        return state in ("unprocessed", "failed")

    @classmethod
    def generate_parser(cls):
        """Build the CLI parser used by `gmg reprocess` for this step."""
        parser = argparse.ArgumentParser(
            description=cls.description, prog=cls.name)

        # Both size options share the same two-integer shape.
        size_options = dict(
            nargs=2, metavar=('max_width', 'max_height'), type=int)
        parser.add_argument('--medium_size', **size_options)
        parser.add_argument('--vp8_quality', type=int, help='Range 0..10')
        parser.add_argument('--vp8_threads', type=int,
                            help='0 means number_of_CPUs - 1')
        parser.add_argument('--vorbis_quality', type=float,
                            help='Range -0.1..1')
        parser.add_argument('--thumb_size', **size_options)

        return parser

    @classmethod
    def args_to_request(cls, args):
        wanted = ['medium_size', 'vp8_quality', 'vp8_threads',
                  'vorbis_quality', 'thumb_size']
        return request_from_args(args, wanted)

    def process(self, medium_size=None, vp8_threads=None, vp8_quality=None,
                vorbis_quality=None, thumb_size=None, resolution=None):
        """Run the full initial pipeline: setup, metadata, transcode,
        thumbnail, then remove the queue file."""
        self.common_setup(resolution=resolution)
        self.store_orig_metadata()
        self.transcode(medium_size=medium_size, vp8_quality=vp8_quality,
                       vp8_threads=vp8_threads, vorbis_quality=vorbis_quality)
        self.generate_thumb(thumb_size=thumb_size)
        self.delete_queue_file()
436
437
class Resizer(CommonVideoProcessor):
    """
    Video thumbnail resizing process steps for processed media
    """
    name = 'resize'
    description = 'Resize thumbnail'
    thumb_size = 'thumb_size'

    @classmethod
    def media_is_eligible(cls, entry=None, state=None):
        """Only fully processed media can have its thumbnail resized."""
        if not state:
            state = entry.state
        # Fix: was `state in 'processed'`, a substring test that would also
        # accept e.g. 'process'; equality is what was meant.
        return state == 'processed'

    @classmethod
    def generate_parser(cls):
        """Build the CLI parser used by `gmg reprocess` for this step."""
        parser = argparse.ArgumentParser(
            description=cls.description,
            prog=cls.name)

        parser.add_argument(
            '--thumb_size',
            nargs=2,
            metavar=('max_width', 'max_height'),
            type=int)

        # Needed for gmg reprocess thumbs to work
        parser.add_argument(
            'file',
            nargs='?',
            default='thumb',
            choices=['thumb'])

        return parser

    @classmethod
    def args_to_request(cls, args):
        return request_from_args(
            args, ['thumb_size', 'file'])

    def process(self, thumb_size=None, file=None):
        """Regenerate the thumbnail at the requested size."""
        self.common_setup()
        self.generate_thumb(thumb_size=thumb_size)
481
482
class Transcoder(CommonVideoProcessor):
    """
    Transcoding processing steps for processed video
    """
    name = 'transcode'
    description = 'Re-transcode video'

    @classmethod
    def media_is_eligible(cls, entry=None, state=None):
        """Only fully processed media can be re-transcoded."""
        if not state:
            state = entry.state
        # Fix: was `state in 'processed'`, a substring test that would also
        # accept e.g. 'process'; equality is what was meant.
        return state == 'processed'

    @classmethod
    def generate_parser(cls):
        """Build the CLI parser used by `gmg reprocess` for this step."""
        parser = argparse.ArgumentParser(
            description=cls.description,
            prog=cls.name)

        parser.add_argument(
            '--medium_size',
            nargs=2,
            metavar=('max_width', 'max_height'),
            type=int)

        parser.add_argument(
            '--vp8_quality',
            type=int,
            help='Range 0..10')

        parser.add_argument(
            '--vp8_threads',
            type=int,
            help='0 means number_of_CPUs - 1')

        parser.add_argument(
            '--vorbis_quality',
            type=float,
            help='Range -0.1..1')

        return parser

    @classmethod
    def args_to_request(cls, args):
        return request_from_args(
            args, ['medium_size', 'vp8_threads', 'vp8_quality',
                   'vorbis_quality'])

    def process(self, medium_size=None, vp8_quality=None, vp8_threads=None,
                vorbis_quality=None):
        """Re-run the transcode with the given (or configured) settings."""
        self.common_setup()
        self.transcode(medium_size=medium_size, vp8_threads=vp8_threads,
                       vp8_quality=vp8_quality, vorbis_quality=vorbis_quality)
536
537
class VideoProcessingManager(ProcessingManager):
    """Registers the video processors and schedules the celery workflow."""

    def __init__(self):
        super(VideoProcessingManager, self).__init__()
        for processor in (InitialProcessor, Resizer, Transcoder):
            self.add_processor(processor)

    def workflow(self, entry, feed_url, reprocess_action, reprocess_info=None):
        # Ensure every transcode option key exists, defaulting to None so
        # the tasks fall back to configured values.
        reprocess_info = reprocess_info or {}
        for option in ('vp8_quality', 'vorbis_quality',
                       'vp8_threads', 'thumb_size'):
            reprocess_info.setdefault(option, None)

        def _transcode_signature(task, resolution, priority):
            # All transcoding tasks share the same queue and option set;
            # only the task, resolution and priority vary.
            return task.signature(
                args=(entry.id, resolution, ACCEPTED_RESOLUTIONS[resolution]),
                kwargs=reprocess_info, queue='default',
                priority=priority, immutable=True)

        transcoding_tasks = group([
            _transcode_signature(main_task, '480p', 5),
            _transcode_signature(complimentary_task, '360p', 4),
            _transcode_signature(complimentary_task, '720p', 3),
        ])

        cleanup_task = processing_cleanup.signature(
            args=(entry.id,), queue='default', immutable=True)

        # Run the cleanup once every transcoding task has completed.
        chord(transcoding_tasks)(cleanup_task)

        # Not sure what to return since we are scheduling the task here itself
        return 1