# mediagoblin/media_types/video/processing.py
# GNU MediaGoblin -- federated, autonomous media hosting
# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import argparse
import os.path
import logging
import datetime
import celery

import six

from celery import group, chord
from mediagoblin import mg_globals as mgg
from mediagoblin.processing import (
    FilenameBuilder, BaseProcessingFail,
    ProgressCallback, MediaProcessor,
    ProcessingManager, request_from_args,
    get_process_filename, store_public,
    copy_original)
from mediagoblin.processing.task import ProcessMedia
from mediagoblin.tools.translate import lazy_pass_to_ugettext as _
from mediagoblin.media_types import MissingComponents

from . import transcoders
from .util import skip_transcode, ACCEPTED_RESOLUTIONS
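# ACCEPTED_RESOLUTIONS is defined in .util; judging from its use in
# VideoProcessingManager.workflow below, it maps resolution labels such as
# '480p' to (max_width, max_height) pairs that get passed on as medium_size.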

_log = logging.getLogger(__name__)
_log.setLevel(logging.DEBUG)

MEDIA_TYPE = 'mediagoblin.media_types.video'


class VideoTranscodingFail(BaseProcessingFail):
    '''
    Error raised if video transcoding fails
    '''
    general_message = _(u'Video transcoding failed')


def sniffer(media_file):
    '''New-style sniffer, used in the two-step check; the file object must
    have a .name attribute.'''
    _log.info('Sniffing {0}'.format(MEDIA_TYPE))
    try:
        data = transcoders.discover(media_file.name)
    except Exception as e:
        # this is usually GLib.GError, but we don't really care which one
        _log.warning(u'GStreamer: {0}'.format(six.text_type(e)))
        raise MissingComponents(u'GStreamer: {0}'.format(six.text_type(e)))
    _log.debug('Discovered: {0}'.format(data))

    if not data.get_video_streams():
        raise MissingComponents('No video streams found in this video')

    if data.get_result() != 0:  # it's 0 on success
        try:
            missing = data.get_misc().get_string('name')
            _log.warning('GStreamer: missing {0}'.format(missing))
        except AttributeError as e:
            # AttributeError happens here on gstreamer >1.4, when get_misc
            # returns None. There is a special function to get info about
            # the missing plugin. This info should be logged for the admin
            # and shown to the user in a short, readable form.
            details = data.get_missing_elements_installer_details()
            _log.warning('GStreamer: missing: {0}'.format(', '.join(details)))
            missing = u', '.join([u'{0} ({1})'.format(*d.split('|')[3:])
                                  for d in details])
        raise MissingComponents(u'{0} is missing'.format(missing))

    return MEDIA_TYPE


def sniff_handler(media_file, filename):
    try:
        return sniffer(media_file)
    except Exception:
        _log.error('Could not discover {0}'.format(filename))
        return None
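
# Rough sketch of how the two-step sniffing above gets used (hypothetical
# caller; the real call sites live elsewhere in MediaGoblin):
#
#     with open(queued_filename, 'rb') as media_file:
#         if sniff_handler(media_file, queued_filename) == MEDIA_TYPE:
#             ...  # hand the entry over to VideoProcessingManager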

def get_tags(stream_info):
    'Get all tags and their values from the stream info.'
    taglist = stream_info.get_tags()
    if not taglist:
        return {}
    tags = []
    taglist.foreach(
        lambda list, tag: tags.append((tag, list.get_value_index(tag, 0))))
    tags = dict(tags)

    # date/datetime should be converted from GDate/GDateTime to strings
    if 'date' in tags:
        date = tags['date']
        tags['date'] = "%s-%s-%s" % (
            date.year, date.month, date.day)

    if 'datetime' in tags:
        # TODO: handle timezone info; gst.get_time_zone_offset +
        # python's tzinfo should help
        dt = tags['datetime']
        tags['datetime'] = datetime.datetime(
            dt.get_year(), dt.get_month(), dt.get_day(), dt.get_hour(),
            dt.get_minute(), dt.get_second(),
            dt.get_microsecond()).isoformat()
    for k, v in tags.copy().items():
        # only json-serializable types may stay; others must not be present
        if not isinstance(v, (dict, list, six.string_types, int, float, bool,
                              type(None))):
            del tags[k]
    return dict(tags)
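
# Illustrative only -- with hypothetical tag values, get_tags() on a video
# stream might return something like:
#
#     {'video-codec': 'VP8', 'language-code': 'en', 'bitrate': 1000000,
#      'datetime': '2012-01-01T00:00:00'}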

def store_metadata(media_entry, metadata):
    """
    Store metadata from this video for this media entry.
    """
    stored_metadata = dict()
    audio_info_list = metadata.get_audio_streams()
    if audio_info_list:
        stored_metadata['audio'] = []
        for audio_info in audio_info_list:
            stored_metadata['audio'].append(
                {
                    'channels': audio_info.get_channels(),
                    'bitrate': audio_info.get_bitrate(),
                    'depth': audio_info.get_depth(),
                    'language': audio_info.get_language(),
                    'sample_rate': audio_info.get_sample_rate(),
                    'tags': get_tags(audio_info)
                })

    video_info_list = metadata.get_video_streams()
    if video_info_list:
        stored_metadata['video'] = []
        for video_info in video_info_list:
            stored_metadata['video'].append(
                {
                    'width': video_info.get_width(),
                    'height': video_info.get_height(),
                    'bitrate': video_info.get_bitrate(),
                    'depth': video_info.get_depth(),
                    'videorate': [video_info.get_framerate_num(),
                                  video_info.get_framerate_denom()],
                    'tags': get_tags(video_info)
                })

    stored_metadata['common'] = {
        'duration': metadata.get_duration(),
        'tags': get_tags(metadata),
    }
    # Only save this field if there's something to save
    if len(stored_metadata):
        media_entry.media_data_init(orig_metadata=stored_metadata)
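
# Sketch of the stored dict's shape, with hypothetical values:
#
#     {'audio': [{'channels': 2, 'bitrate': 128000, ...}],
#      'video': [{'width': 1280, 'height': 720, 'videorate': [25, 1], ...}],
#      'common': {'duration': 120000000000, 'tags': {...}}}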

# =====================


@celery.task()
def main_task(resolution, medium_size, **process_info):
    processor = CommonVideoProcessor(process_info['manager'],
                                     process_info['entry'])
    processor.common_setup(resolution)
    processor.transcode(medium_size=medium_size,
                        vp8_quality=process_info['vp8_quality'],
                        vp8_threads=process_info['vp8_threads'],
                        vorbis_quality=process_info['vorbis_quality'])
    processor.generate_thumb(thumb_size=process_info['thumb_size'])
    processor.store_orig_metadata()


@celery.task()
def complimentary_task(resolution, medium_size, **process_info):
    processor = CommonVideoProcessor(process_info['manager'],
                                     process_info['entry'])
    processor.common_setup(resolution)
    processor.transcode(medium_size=medium_size,
                        vp8_quality=process_info['vp8_quality'],
                        vp8_threads=process_info['vp8_threads'],
                        vorbis_quality=process_info['vorbis_quality'])


@celery.task()
def processing_cleanup(entry, manager):
    processor = CommonVideoProcessor(manager, entry)
    processor.delete_queue_file()
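
# Note: main_task transcodes the default resolution and also generates the
# thumbnail and stores the original metadata; complimentary_task only
# transcodes its resolution; processing_cleanup deletes the queued file once
# the whole transcoding group has finished (see workflow() below).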

# =====================


class CommonVideoProcessor(MediaProcessor):
    """
    Provides a base for various video processing steps
    """
    acceptable_files = ['original', 'best_quality', 'webm_144p', 'webm_360p',
                        'webm_480p', 'webm_720p', 'webm_1080p', 'webm_video']

    def common_setup(self, resolution=None):
        self.video_config = mgg \
            .global_config['plugins'][MEDIA_TYPE]

        # Pull down and set up the processing file
        self.process_filename = get_process_filename(
            self.entry, self.workbench, self.acceptable_files)
        self.name_builder = FilenameBuilder(self.process_filename)

        self.transcoder = transcoders.VideoTranscoder()
        self.did_transcode = False

        if resolution:
            self.curr_file = 'webm_' + str(resolution)
            self.part_filename = (self.name_builder.fill('{basename}.' +
                                  str(resolution) + '.webm'))
        else:
            self.curr_file = 'webm_video'
            self.part_filename = self.name_builder.fill('{basename}.medium.webm')
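
        # For instance, common_setup('480p') would leave curr_file as
        # 'webm_480p' and part_filename ending in '.480p.webm', while
        # common_setup() with no resolution falls back to the single
        # 'webm_video' / '{basename}.medium.webm' naming.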

    def copy_original(self):
        # If we didn't transcode, then we need to keep the original
        raise NotImplementedError

    def _keep_best(self):
        """
        If there is no original, keep the best file that we have
        """
        raise NotImplementedError

    def _skip_processing(self, keyname, **kwargs):
        file_metadata = self.entry.get_file_metadata(keyname)

        if not file_metadata:
            return False
        skip = True

        if 'webm' in keyname:
            if kwargs.get('medium_size') != file_metadata.get('medium_size'):
                skip = False
            elif kwargs.get('vp8_quality') != file_metadata.get('vp8_quality'):
                skip = False
            elif kwargs.get('vp8_threads') != file_metadata.get('vp8_threads'):
                skip = False
            elif kwargs.get('vorbis_quality') != \
                    file_metadata.get('vorbis_quality'):
                skip = False
        elif keyname == 'thumb':
            if kwargs.get('thumb_size') != file_metadata.get('thumb_size'):
                skip = False

        return skip
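
    # For example (hypothetical values): if a previous run stored
    # {'medium_size': (640, 640), 'vp8_quality': 8, 'vp8_threads': 0,
    #  'vorbis_quality': 0.3} for 'webm_480p', _skip_processing returns True
    # only when the current call passes exactly the same values again.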

    def transcode(self, medium_size=None, vp8_quality=None, vp8_threads=None,
                  vorbis_quality=None):
        progress_callback = ProgressCallback(self.entry)
        tmp_dst = os.path.join(self.workbench.dir, self.part_filename)

        if not medium_size:
            medium_size = (
                mgg.global_config['media:medium']['max_width'],
                mgg.global_config['media:medium']['max_height'])
        if not vp8_quality:
            vp8_quality = self.video_config['vp8_quality']
        if not vp8_threads:
            vp8_threads = self.video_config['vp8_threads']
        if not vorbis_quality:
            vorbis_quality = self.video_config['vorbis_quality']

        file_metadata = {'medium_size': medium_size,
                         'vp8_threads': vp8_threads,
                         'vp8_quality': vp8_quality,
                         'vorbis_quality': vorbis_quality}

        if self._skip_processing(self.curr_file, **file_metadata):
            return

        metadata = transcoders.discover(self.process_filename)
        orig_dst_dimensions = (metadata.get_video_streams()[0].get_width(),
                               metadata.get_video_streams()[0].get_height())

        # Figure out whether or not we need to transcode this video or
        # if we can skip it
        if skip_transcode(metadata, medium_size):
            _log.debug('Skipping transcoding')

            # If there is an original and transcoded, delete the transcoded
            # since it must be of lower quality than the original
            if self.entry.media_files.get('original') and \
                    self.entry.media_files.get(self.curr_file):
                self.entry.media_files[self.curr_file].delete()

        else:
            self.transcoder.transcode(self.process_filename, tmp_dst,
                                      vp8_quality=vp8_quality,
                                      vp8_threads=vp8_threads,
                                      vorbis_quality=vorbis_quality,
                                      progress_callback=progress_callback,
                                      dimensions=tuple(medium_size))
            if self.transcoder.dst_data:
                # Push the transcoded video to public storage under the key
                # set up in common_setup() (e.g. 'webm_480p'), using the
                # matching per-resolution filename.
                _log.debug('Saving medium...')
                store_public(self.entry, self.curr_file, tmp_dst,
                             self.part_filename)
                _log.debug('Saved medium')

                # Record the transcoding parameters so that a later run with
                # the same settings can be skipped (see _skip_processing).
                self.entry.set_file_metadata(self.curr_file, **file_metadata)

                self.did_transcode = True

    def generate_thumb(self, thumb_size=None):
        # Temporary file for the video thumbnail (cleaned up with workbench)
        tmp_thumb = os.path.join(self.workbench.dir,
                                 self.name_builder.fill(
                                     '{basename}.thumbnail.jpg'))

        if not thumb_size:
            thumb_size = (mgg.global_config['media:thumb']['max_width'],)

        if self._skip_processing('thumb', thumb_size=thumb_size):
            return

        # We will only use the width so that the correct scale is kept
        transcoders.capture_thumb(
            self.process_filename,
            tmp_thumb,
            thumb_size[0])

        # Check whether the thumbnail was correctly created; if it was not,
        # just give up.
        if not os.path.exists(tmp_thumb):
            return

        # Push the thumbnail to public storage
        _log.debug('Saving thumbnail...')
        store_public(self.entry, 'thumb', tmp_thumb,
                     self.name_builder.fill('{basename}.thumbnail.jpg'))

        self.entry.set_file_metadata('thumb', thumb_size=thumb_size)

    def store_orig_metadata(self):

        # Extract metadata and keep a record of it
        metadata = transcoders.discover(self.process_filename)

        # The metadata's stream info here is a DiscovererContainerInfo
        # instance; it gets split into DiscovererAudioInfo and
        # DiscovererVideoInfo, and metadata itself has container-related
        # data in tags, like video-codec
        store_metadata(self.entry, metadata)


class InitialProcessor(CommonVideoProcessor):
    """
    Initial processing steps for new video
    """
    name = "initial"
    description = "Initial processing"

    @classmethod
    def media_is_eligible(cls, entry=None, state=None):
        if not state:
            state = entry.state
        return state in (
            "unprocessed", "failed")

    @classmethod
    def generate_parser(cls):
        parser = argparse.ArgumentParser(
            description=cls.description,
            prog=cls.name)

        parser.add_argument(
            '--medium_size',
            nargs=2,
            metavar=('max_width', 'max_height'),
            type=int)

        parser.add_argument(
            '--vp8_quality',
            type=int,
            help='Range 0..10')

        parser.add_argument(
            '--vp8_threads',
            type=int,
            help='0 means number_of_CPUs - 1')

        parser.add_argument(
            '--vorbis_quality',
            type=float,
            help='Range -0.1..1')

        parser.add_argument(
            '--thumb_size',
            nargs=2,
            metavar=('max_width', 'max_height'),
            type=int)

        return parser

    @classmethod
    def args_to_request(cls, args):
        return request_from_args(
            args, ['medium_size', 'vp8_quality', 'vp8_threads',
                   'vorbis_quality', 'thumb_size'])

    def process(self, medium_size=None, vp8_threads=None, vp8_quality=None,
                vorbis_quality=None, thumb_size=None, resolution=None):
        self.common_setup(resolution=resolution)
        self.store_orig_metadata()
        self.transcode(medium_size=medium_size, vp8_quality=vp8_quality,
                       vp8_threads=vp8_threads, vorbis_quality=vorbis_quality)

        self.generate_thumb(thumb_size=thumb_size)
        self.delete_queue_file()


class Resizer(CommonVideoProcessor):
    """
    Video thumbnail resizing process steps for processed media
    """
    name = 'resize'
    description = 'Resize thumbnail'
    thumb_size = 'thumb_size'

    @classmethod
    def media_is_eligible(cls, entry=None, state=None):
        if not state:
            state = entry.state
        return state == 'processed'

    @classmethod
    def generate_parser(cls):
        parser = argparse.ArgumentParser(
            description=cls.description,
            prog=cls.name)

        parser.add_argument(
            '--thumb_size',
            nargs=2,
            metavar=('max_width', 'max_height'),
            type=int)

        # Needed for gmg reprocess thumbs to work
        parser.add_argument(
            'file',
            nargs='?',
            default='thumb',
            choices=['thumb'])

        return parser

    @classmethod
    def args_to_request(cls, args):
        return request_from_args(
            args, ['thumb_size', 'file'])

    def process(self, thumb_size=None, file=None):
        self.common_setup()
        self.generate_thumb(thumb_size=thumb_size)


class Transcoder(CommonVideoProcessor):
    """
    Transcoding processing steps for processed video
    """
    name = 'transcode'
    description = 'Re-transcode video'

    @classmethod
    def media_is_eligible(cls, entry=None, state=None):
        if not state:
            state = entry.state
        return state == 'processed'

    @classmethod
    def generate_parser(cls):
        parser = argparse.ArgumentParser(
            description=cls.description,
            prog=cls.name)

        parser.add_argument(
            '--medium_size',
            nargs=2,
            metavar=('max_width', 'max_height'),
            type=int)

        parser.add_argument(
            '--vp8_quality',
            type=int,
            help='Range 0..10')

        parser.add_argument(
            '--vp8_threads',
            type=int,
            help='0 means number_of_CPUs - 1')

        parser.add_argument(
            '--vorbis_quality',
            type=float,
            help='Range -0.1..1')

        return parser

    @classmethod
    def args_to_request(cls, args):
        return request_from_args(
            args, ['medium_size', 'vp8_threads', 'vp8_quality',
                   'vorbis_quality'])

    def process(self, medium_size=None, vp8_quality=None, vp8_threads=None,
                vorbis_quality=None):
        self.common_setup()
        self.transcode(medium_size=medium_size, vp8_threads=vp8_threads,
                       vp8_quality=vp8_quality, vorbis_quality=vorbis_quality)
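
# Hypothetical invocation sketch (exact subcommand syntax not verified here;
# check `gmg reprocess --help`).  The flags mirror the parsers defined above:
#
#     gmg reprocess run <media_id> transcode --vp8_quality 8 --medium_size 640 360
#     gmg reprocess run <media_id> resize --thumb_size 180 180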


class VideoProcessingManager(ProcessingManager):
    def __init__(self):
        super(VideoProcessingManager, self).__init__()
        self.add_processor(InitialProcessor)
        self.add_processor(Resizer)
        self.add_processor(Transcoder)

    def workflow(self, entry, manager, feed_url, reprocess_action,
                 reprocess_info=None):

        # reprocess_info may be None or missing keys; fall back to None
        # values so the tasks pick up the configured defaults.
        if not reprocess_info:
            reprocess_info = {}
        for key in ('vp8_quality', 'vp8_threads', 'vorbis_quality',
                    'thumb_size'):
            reprocess_info.setdefault(key, None)

        reprocess_info['entry'] = entry
        reprocess_info['manager'] = manager

        transcoding_tasks = group(
            main_task.signature(args=('480p', ACCEPTED_RESOLUTIONS['480p']),
                                kwargs=reprocess_info, queue='default',
                                priority=5, immutable=True),
            complimentary_task.signature(args=('360p', ACCEPTED_RESOLUTIONS['360p']),
                                         kwargs=reprocess_info, queue='default',
                                         priority=4, immutable=True),
            complimentary_task.signature(args=('720p', ACCEPTED_RESOLUTIONS['720p']),
                                         kwargs=reprocess_info, queue='default',
                                         priority=3, immutable=True),
        )

        cleanup_task = processing_cleanup.signature(args=(entry, manager),
                                                    queue='default', immutable=True)
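
        # chord() runs the cleanup task only after every task in the
        # transcoding group has finished, so the queued file is not deleted
        # while a transcode may still need it.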
        chord(transcoding_tasks)(cleanup_task)