Add main_task, complimentary_task and cleanup
[mediagoblin.git] / mediagoblin / media_types / video / processing.py
1 # GNU MediaGoblin -- federated, autonomous media hosting
2 # Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16
17 import argparse
18 import os.path
19 import logging
20 import datetime
21
22 import six
23
24 from mediagoblin import mg_globals as mgg
25 from mediagoblin.processing import (
26 FilenameBuilder, BaseProcessingFail,
27 ProgressCallback, MediaProcessor,
28 ProcessingManager, request_from_args,
29 get_process_filename, store_public,
30 copy_original)
31 from mediagoblin.processing.task import ProcessMedia
32 from mediagoblin.tools.translate import lazy_pass_to_ugettext as _
33 from mediagoblin.media_types import MissingComponents
34
35 from . import transcoders
36 from .util import skip_transcode
37
_log = logging.getLogger(__name__)
# NOTE(review): pinning this module's logger to DEBUG at import time
# overrides whatever level the application's logging config chose —
# confirm this is intentional and not leftover debugging.
_log.setLevel(logging.DEBUG)

# Media-type identifier under which this plugin registers its config/handlers.
MEDIA_TYPE = 'mediagoblin.media_types.video'
42
43
class VideoTranscodingFail(BaseProcessingFail):
    """Processing failure raised when a video cannot be transcoded.

    Carries a translatable message shown to the user when the
    transcoding step of video processing fails.
    """
    general_message = _(u'Video transcoding failed')
49
50
def sniffer(media_file):
    '''New style sniffer, used in two-steps check; requires to have .name

    Probes ``media_file`` with GStreamer's discoverer and decides whether
    this media type can handle it.

    :param media_file: file-like object; only its ``.name`` (on-disk path)
        is used for discovery
    :returns: ``MEDIA_TYPE`` when the file contains a usable video stream
    :raises MissingComponents: when discovery fails, no video stream is
        found, or GStreamer reports missing plugins
    '''
    _log.info('Sniffing {0}'.format(MEDIA_TYPE))
    try:
        data = transcoders.discover(media_file.name)
    except Exception as e:
        # this is usually GLib.GError, but we don't really care which one
        _log.warning(u'GStreamer: {0}'.format(six.text_type(e)))
        raise MissingComponents(u'GStreamer: {0}'.format(six.text_type(e)))
    _log.debug('Discovered: {0}'.format(data))

    if not data.get_video_streams():
        raise MissingComponents('No video streams found in this video')

    if data.get_result() != 0:  # it's 0 if success
        try:
            missing = data.get_misc().get_string('name')
            _log.warning('GStreamer: missing {0}'.format(missing))
        except AttributeError:
            # FIX: the exception was bound (``as e``) but never used.
            # AttributeError happens here on gstreamer >1.4, when get_misc
            # returns None. There is a special function to get info about
            # missing plugin. This info should be printed to logs for admin and
            # showed to the user in a short and nice version
            details = data.get_missing_elements_installer_details()
            _log.warning('GStreamer: missing: {0}'.format(', '.join(details)))
            missing = u', '.join([u'{0} ({1})'.format(*d.split('|')[3:])
                                  for d in details])
        raise MissingComponents(u'{0} is missing'.format(missing))

    return MEDIA_TYPE
81
82
def sniff_handler(media_file, filename):
    """Media-type sniff hook: return ``MEDIA_TYPE`` if the file is a video.

    Returns ``None`` instead of raising, so other media types can try to
    claim the file when discovery fails.
    """
    try:
        return sniffer(media_file)
    except Exception:
        # FIX: was a bare ``except:`` which also swallowed SystemExit and
        # KeyboardInterrupt; Exception is already broad enough here.
        _log.error('Could not discover {0}'.format(filename))
        return None
89
def get_tags(stream_info):
    """Return all tags and their values from *stream_info* as a JSON-safe dict.

    GDate values become ``"Y-M-D"`` strings and GDateTime values become ISO
    strings; any remaining value of a type json cannot serialize is dropped.

    :param stream_info: GStreamer stream/container info exposing
        ``get_tags()`` (presumably a Gst.TagList — TODO confirm)
    :returns: dict mapping tag name to a json-serializable value
    """
    taglist = stream_info.get_tags()
    if not taglist:
        return {}
    tags = []
    # FIX: the callback parameter was named ``list``, shadowing the builtin.
    taglist.foreach(
        lambda tag_list, tag: tags.append(
            (tag, tag_list.get_value_index(tag, 0))))
    tags = dict(tags)

    # date/datetime should be converted from GDate/GDateTime to strings
    if 'date' in tags:
        date = tags['date']
        tags['date'] = "%s-%s-%s" % (
            date.year, date.month, date.day)

    if 'datetime' in tags:
        # TODO: handle timezone info; gst.get_time_zone_offset +
        # python's tzinfo should help
        dt = tags['datetime']
        tags['datetime'] = datetime.datetime(
            dt.get_year(), dt.get_month(), dt.get_day(), dt.get_hour(),
            dt.get_minute(), dt.get_second(),
            dt.get_microsecond()).isoformat()

    # iterate over a snapshot so entries can be deleted while walking
    for k, v in tags.copy().items():
        # types below are accepted by json; others must not be present
        if not isinstance(v, (dict, list, six.string_types, int, float, bool,
                              type(None))):
            del tags[k]
    # FIX: ``tags`` is already a dict; the extra dict(tags) copy was redundant.
    return tags
120
def store_metadata(media_entry, metadata):
    """
    Store metadata from this video for this media entry.

    *metadata* is the result of ``transcoders.discover`` (presumably a
    GstPbutils DiscovererInfo — TODO confirm); its per-stream audio/video
    info and container-level tags are flattened into a plain dict and
    attached to the entry as ``orig_metadata``.
    """
    stored_metadata = dict()
    audio_info_list = metadata.get_audio_streams()
    if audio_info_list:
        stored_metadata['audio'] = []
        for audio_info in audio_info_list:
            stored_metadata['audio'].append(
                {
                    'channels': audio_info.get_channels(),
                    'bitrate': audio_info.get_bitrate(),
                    'depth': audio_info.get_depth(),
                    # NOTE(review): 'languange' looks like a typo for
                    # 'language', but it is the key already written to stored
                    # media_data — renaming would break existing consumers;
                    # confirm before fixing.
                    'languange': audio_info.get_language(),
                    'sample_rate': audio_info.get_sample_rate(),
                    'tags': get_tags(audio_info)
                })

    video_info_list = metadata.get_video_streams()
    if video_info_list:
        stored_metadata['video'] = []
        for video_info in video_info_list:
            stored_metadata['video'].append(
                {
                    'width': video_info.get_width(),
                    'height': video_info.get_height(),
                    'bitrate': video_info.get_bitrate(),
                    'depth': video_info.get_depth(),
                    # framerate stored as [numerator, denominator]
                    'videorate': [video_info.get_framerate_num(),
                                  video_info.get_framerate_denom()],
                    'tags': get_tags(video_info)
                })

    stored_metadata['common'] = {
        'duration': metadata.get_duration(),
        'tags': get_tags(metadata),
    }
    # Only save this field if there's something to save
    # NOTE(review): 'common' is unconditionally set above, so this check is
    # always true as written.
    if len(stored_metadata):
        media_entry.media_data_init(orig_metadata=stored_metadata)
162
163 # =====================
164
165
def main_task(**process_info):
    """Run the primary transcode for an entry.

    Performs common setup at the requested resolution, transcodes with the
    supplied VP8/Vorbis parameters, generates the thumbnail and records the
    original file's metadata.
    """
    entry = process_info['entry']
    manager = process_info['manager']
    worker = CommonVideoProcessor(manager, entry)
    worker.common_setup(process_info['resolution'])
    worker.transcode(
        medium_size=process_info['medium_size'],
        vp8_quality=process_info['vp8_quality'],
        vp8_threads=process_info['vp8_threads'],
        vorbis_quality=process_info['vorbis_quality'])
    worker.generate_thumb(thumb_size=process_info['thumb_size'])
    worker.store_orig_metadata()
173
174
def complimentary_task(**process_info):
    """Transcode the entry at one additional resolution.

    Same pipeline as ``main_task`` minus thumbnail generation and metadata
    extraction, which the main task handles once.
    """
    worker = CommonVideoProcessor(process_info['manager'],
                                  process_info['entry'])
    worker.common_setup(process_info['resolution'])
    worker.transcode(
        medium_size=process_info['medium_size'],
        vp8_quality=process_info['vp8_quality'],
        vp8_threads=process_info['vp8_threads'],
        vorbis_quality=process_info['vorbis_quality'])
180
181
def processing_cleanup(**process_info):
    """Delete the queued source file once all transcode tasks are done."""
    cleaner = CommonVideoProcessor(process_info['manager'],
                                   process_info['entry'])
    cleaner.delete_queue_file()
185
186 # =====================
187
188
class CommonVideoProcessor(MediaProcessor):
    """
    Provides a base for various video processing steps
    """
    # FIX: the first element was the single malformed string
    # 'original, best_quality'; it was clearly meant to be two entries,
    # otherwise get_process_filename could never fall back to the original
    # upload or the best-quality file.
    acceptable_files = ['original', 'best_quality', 'webm_144p', 'webm_360p',
                        'webm_480p', 'webm_720p', 'webm_1080p', 'webm_video']

    def common_setup(self, resolution=None):
        """Pull down the file to process and prepare the transcoder.

        :param resolution: optional resolution tag (e.g. 480); when given,
            the per-resolution media key 'webm_<resolution>' and filename
            '<basename>.<resolution>.webm' are used instead of the generic
            'webm_video' / '<basename>.medium.webm'.
        """
        self.video_config = mgg \
            .global_config['plugins'][MEDIA_TYPE]

        # Pull down and set up the processing file
        self.process_filename = get_process_filename(
            self.entry, self.workbench, self.acceptable_files)
        self.name_builder = FilenameBuilder(self.process_filename)

        self.transcoder = transcoders.VideoTranscoder()
        self.did_transcode = False

        if resolution:
            self.curr_file = 'webm_' + str(resolution)
            self.part_filename = (self.name_builder.fill('{basename}.' +
                                  str(resolution) + '.webm'))
        else:
            self.curr_file = 'webm_video'
            self.part_filename = self.name_builder.fill(
                '{basename}.medium.webm')

    def copy_original(self):
        # If we didn't transcode, then we need to keep the original.
        # NOTE(review): currently unimplemented — InitialProcessor.process
        # calls this, so it will raise until an implementation lands.
        raise NotImplementedError

    def _keep_best(self):
        """
        If there is no original, keep the best file that we have
        """
        raise NotImplementedError

    def _skip_processing(self, keyname, **kwargs):
        """Return True when the file under *keyname* was already produced
        with exactly these parameters (as recorded via set_file_metadata)."""
        file_metadata = self.entry.get_file_metadata(keyname)

        if not file_metadata:
            return False
        skip = True

        if 'webm' in keyname:
            if kwargs.get('medium_size') != file_metadata.get('medium_size'):
                skip = False
            elif kwargs.get('vp8_quality') != file_metadata.get('vp8_quality'):
                skip = False
            elif kwargs.get('vp8_threads') != file_metadata.get('vp8_threads'):
                skip = False
            elif kwargs.get('vorbis_quality') != \
                    file_metadata.get('vorbis_quality'):
                skip = False
        elif keyname == 'thumb':
            if kwargs.get('thumb_size') != file_metadata.get('thumb_size'):
                skip = False

        return skip

    def transcode(self, medium_size=None, vp8_quality=None, vp8_threads=None,
                  vorbis_quality=None):
        """Transcode the source file to WebM (VP8 + Vorbis).

        Missing parameters fall back to the site/plugin configuration.
        Skips work when an identical transcode already exists, or when
        ``skip_transcode`` decides the source is already good enough.
        """
        progress_callback = ProgressCallback(self.entry)
        tmp_dst = os.path.join(self.workbench.dir, self.part_filename)

        if not medium_size:
            medium_size = (
                mgg.global_config['media:medium']['max_width'],
                mgg.global_config['media:medium']['max_height'])
        if not vp8_quality:
            vp8_quality = self.video_config['vp8_quality']
        if not vp8_threads:
            vp8_threads = self.video_config['vp8_threads']
        if not vorbis_quality:
            vorbis_quality = self.video_config['vorbis_quality']

        file_metadata = {'medium_size': medium_size,
                         'vp8_threads': vp8_threads,
                         'vp8_quality': vp8_quality,
                         'vorbis_quality': vorbis_quality}

        if self._skip_processing(self.curr_file, **file_metadata):
            return

        metadata = transcoders.discover(self.process_filename)
        # FIX: removed ``orig_dst_dimensions`` — it was computed here but
        # never used anywhere in this class.

        # Figure out whether or not we need to transcode this video or
        # if we can skip it
        if skip_transcode(metadata, medium_size):
            _log.debug('Skipping transcoding')

            # If there is an original and transcoded, delete the transcoded
            # since it must be of lower quality then the original
            if self.entry.media_files.get('original') and \
                    self.entry.media_files.get(self.curr_file):
                self.entry.media_files[self.curr_file].delete()

        else:
            self.transcoder.transcode(self.process_filename, tmp_dst,
                                      vp8_quality=vp8_quality,
                                      vp8_threads=vp8_threads,
                                      vorbis_quality=vorbis_quality,
                                      progress_callback=progress_callback,
                                      dimensions=tuple(medium_size))
            if self.transcoder.dst_data:
                # Push transcoded video to public storage
                _log.debug('Saving medium...')
                # FIX: was hard-coded to 'webm_video' and
                # '{basename}.medium.webm'; per-resolution runs keyed their
                # metadata on self.curr_file (see set_file_metadata below and
                # _skip_processing above) while the file itself always landed
                # under 'webm_video', clobbering other resolutions. Store
                # under the key/name chosen in common_setup instead.
                store_public(self.entry, self.curr_file, tmp_dst,
                             self.part_filename)
                _log.debug('Saved medium')

                # Record the parameters used so _skip_processing can detect
                # an identical re-run.
                self.entry.set_file_metadata(self.curr_file, **file_metadata)

                self.did_transcode = True

    def generate_thumb(self, thumb_size=None):
        """Capture a thumbnail frame and push it to public storage.

        :param thumb_size: tuple whose first element is the max width; only
            the width is used so the aspect ratio is preserved.
        """
        # Temporary file for the video thumbnail (cleaned up with workbench)
        tmp_thumb = os.path.join(self.workbench.dir,
                                 self.name_builder.fill(
                                     '{basename}.thumbnail.jpg'))

        if not thumb_size:
            thumb_size = (mgg.global_config['media:thumb']['max_width'],)

        if self._skip_processing('thumb', thumb_size=thumb_size):
            return

        # We will only use the width so that the correct scale is kept
        transcoders.capture_thumb(
            self.process_filename,
            tmp_thumb,
            thumb_size[0])

        # Checking if the thumbnail was correctly created. If it was not,
        # then just give up.
        if not os.path.exists(tmp_thumb):
            return

        # Push the thumbnail to public storage
        _log.debug('Saving thumbnail...')
        store_public(self.entry, 'thumb', tmp_thumb,
                     self.name_builder.fill('{basename}.thumbnail.jpg'))

        self.entry.set_file_metadata('thumb', thumb_size=thumb_size)

    def store_orig_metadata(self):
        """Extract metadata from the source file and keep a record of it."""
        metadata = transcoders.discover(self.process_filename)

        # metadata's stream info here is a DiscovererContainerInfo instance,
        # it gets split into DiscovererAudioInfo and DiscovererVideoInfo;
        # metadata itself has container-related data in tags, like video-codec
        store_metadata(self.entry, metadata)
347
348
class InitialProcessor(CommonVideoProcessor):
    """
    Initial processing steps for new video
    """
    name = "initial"
    description = "Initial processing"

    @classmethod
    def media_is_eligible(cls, entry=None, state=None):
        """Eligible whenever the entry has not yet processed successfully."""
        if not state:
            state = entry.state
        return state in ("unprocessed", "failed")

    @classmethod
    def generate_parser(cls):
        """Build the CLI parser for initial-processing options."""
        parser = argparse.ArgumentParser(
            prog=cls.name,
            description=cls.description)

        # --medium_size and --thumb_size share the same two-int shape
        size_spec = dict(nargs=2, metavar=('max_width', 'max_height'),
                         type=int)
        parser.add_argument('--medium_size', **size_spec)
        parser.add_argument('--vp8_quality', type=int,
                            help='Range 0..10')
        parser.add_argument('--vp8_threads', type=int,
                            help='0 means number_of_CPUs - 1')
        parser.add_argument('--vorbis_quality', type=float,
                            help='Range -0.1..1')
        parser.add_argument('--thumb_size', **size_spec)

        return parser

    @classmethod
    def args_to_request(cls, args):
        """Turn parsed CLI args into a processing request dict."""
        return request_from_args(
            args, ['medium_size', 'vp8_quality', 'vp8_threads',
                   'vorbis_quality', 'thumb_size'])

    def process(self, medium_size=None, vp8_threads=None, vp8_quality=None,
                vorbis_quality=None, thumb_size=None, resolution=None):
        """Run the full first-pass pipeline for a fresh upload."""
        self.common_setup(resolution=resolution)
        self.store_orig_metadata()
        self.transcode(medium_size=medium_size, vp8_quality=vp8_quality,
                       vp8_threads=vp8_threads, vorbis_quality=vorbis_quality)

        # NOTE(review): CommonVideoProcessor.copy_original currently raises
        # NotImplementedError, so this call will fail unless an override or
        # implementation is added — confirm intent.
        self.copy_original()
        self.generate_thumb(thumb_size=thumb_size)
        self.delete_queue_file()
414
415
class Resizer(CommonVideoProcessor):
    """
    Video thumbnail resizing process steps for processed media
    """
    name = 'resize'
    description = 'Resize thumbnail'
    thumb_size = 'thumb_size'

    @classmethod
    def media_is_eligible(cls, entry=None, state=None):
        """Only already-processed media can have its thumbnail resized."""
        if not state:
            state = entry.state
        # FIX: was ``state in 'processed'`` — a substring containment test
        # that also matched '' and partial words like 'process'; an exact
        # comparison is what is meant.
        return state == 'processed'

    @classmethod
    def generate_parser(cls):
        """Build the CLI parser for thumbnail-resize options."""
        parser = argparse.ArgumentParser(
            description=cls.description,
            prog=cls.name)

        parser.add_argument(
            '--thumb_size',
            nargs=2,
            metavar=('max_width', 'max_height'),
            type=int)

        # Needed for gmg reprocess thumbs to work
        parser.add_argument(
            'file',
            nargs='?',
            default='thumb',
            choices=['thumb'])

        return parser

    @classmethod
    def args_to_request(cls, args):
        """Turn parsed CLI args into a processing request dict."""
        return request_from_args(
            args, ['thumb_size', 'file'])

    def process(self, thumb_size=None, file=None):
        """Regenerate the thumbnail at the requested size."""
        self.common_setup()
        self.generate_thumb(thumb_size=thumb_size)
459
460
class Transcoder(CommonVideoProcessor):
    """
    Transcoding processing steps for processed video
    """
    name = 'transcode'
    description = 'Re-transcode video'

    @classmethod
    def media_is_eligible(cls, entry=None, state=None):
        """Only already-processed media can be re-transcoded."""
        if not state:
            state = entry.state
        # FIX: was ``state in 'processed'`` — a substring containment test
        # that also matched '' and partial words like 'process'; an exact
        # comparison is what is meant.
        return state == 'processed'

    @classmethod
    def generate_parser(cls):
        """Build the CLI parser for re-transcode options."""
        parser = argparse.ArgumentParser(
            description=cls.description,
            prog=cls.name)

        parser.add_argument(
            '--medium_size',
            nargs=2,
            metavar=('max_width', 'max_height'),
            type=int)

        parser.add_argument(
            '--vp8_quality',
            type=int,
            help='Range 0..10')

        parser.add_argument(
            '--vp8_threads',
            type=int,
            help='0 means number_of_CPUs - 1')

        parser.add_argument(
            '--vorbis_quality',
            type=float,
            help='Range -0.1..1')

        return parser

    @classmethod
    def args_to_request(cls, args):
        """Turn parsed CLI args into a processing request dict."""
        return request_from_args(
            args, ['medium_size', 'vp8_threads', 'vp8_quality',
                   'vorbis_quality'])

    def process(self, medium_size=None, vp8_quality=None, vp8_threads=None,
                vorbis_quality=None):
        """Re-run the WebM transcode with (possibly changed) parameters."""
        self.common_setup()
        self.transcode(medium_size=medium_size, vp8_threads=vp8_threads,
                       vp8_quality=vp8_quality, vorbis_quality=vorbis_quality)
514
515
class VideoProcessingManager(ProcessingManager):
    """Registers the video processors and dispatches celery processing."""

    def __init__(self):
        super(VideoProcessingManager, self).__init__()
        for processor in (InitialProcessor, Resizer, Transcoder):
            self.add_processor(processor)

    def workflow(self, entry, feed_url, reprocess_action, reprocess_info=None):
        """Queue the ProcessMedia task for *entry* under its task id."""
        task_args = [entry.id, feed_url, reprocess_action, reprocess_info]
        ProcessMedia().apply_async(task_args, {},
                                   task_id=entry.queued_task_id)