d8290d41d83f7770c1ec9283fe00e9c2752a238d
[mediagoblin.git] / mediagoblin / media_types / video / transcoders.py
1 # GNU MediaGoblin -- federated, autonomous media hosting
2 # Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16
17 from __future__ import division
18
19 import os
20 import sys
21 import logging
22 import urllib
23 import multiprocessing
24 import gobject
25 import pygst
26 pygst.require('0.10')
27 import gst
28 import struct
29 import Image
30
31 from gst.extend import discoverer
32
# Module-level logger shared by everything in this file.
_log = logging.getLogger(__name__)

# Enable gobject's threading support before any main loop is created.
gobject.threads_init()

# Fallback used when the platform cannot report its number of CPU cores.
CPU_COUNT = 2

try:
    CPU_COUNT = multiprocessing.cpu_count()
except NotImplementedError:
    _log.warning('multiprocessing.cpu_count not implemented')

# Directory GStreamer is told to write pipeline graph (.dot) dumps to.
# Assigning through os.environ calls putenv() as before, and additionally
# keeps os.environ (and therefore any child process environment built from
# it) in sync, which a bare os.putenv() does not.
os.environ['GST_DEBUG_DUMP_DOT_DIR'] = '/tmp'
45
46
def pixbuf_to_pilbuf(buf):
    '''
    Convert a packed 24-bit RGB byte buffer into a list of pixel tuples.

    :param buf: byte string laid out as consecutive ``R, G, B`` bytes; its
        length must be a multiple of 3.
    :returns: list of ``(r, g, b)`` integer tuples, one per pixel, in the
        order the pixels appear in ``buf``.
    :raises struct.error: if the buffer ends with an incomplete pixel.
    '''
    raw = bytearray(buf)

    if len(raw) % 3:
        # Same exception type the per-pixel struct.unpack() call raised for
        # a truncated trailing pixel.
        raise struct.error(
            'buffer length {0} is not a multiple of 3'.format(len(raw)))

    # Walk the red, green and blue bytes in lockstep instead of slicing and
    # unpacking every pixel individually.
    return list(zip(raw[0::3], raw[1::3], raw[2::3]))
54
55
class VideoThumbnailer:
    '''
    Save a single frame of a video file as a 180 pixel wide thumbnail.

    Instantiating the class does all the work: ``__init__`` blocks inside a
    ``gobject.MainLoop`` until the frame has been written to ``dest_path``
    or the attempt has been abandoned.  Problems met on the way are
    collected as short codes in ``self.errors``.
    '''
    # Declaration of thumbnailer states
    STATE_NULL = 0
    STATE_HALTING = 1
    STATE_PROCESSING = 2

    def __init__(self, source_path, dest_path):
        '''
        Set up playbin pipeline in order to get video properties.

        Initializes and runs the gobject.MainLoop()

        Abstract
        - Set up a playbin with a fake audio sink and video sink. Load the
          video into the playbin
        - Once the playbin is paused, ``_on_bus_paused`` builds the actual
          thumbnailing pipeline

        :param source_path: path of the video file to read
        :param dest_path: path the thumbnail image is saved to
        '''
        # The current thumbnailer state
        self.state = self.STATE_NULL
        self.errors = []

        self.source_path = source_path
        self.dest_path = dest_path

        # Thumbnailing pipeline and its bus; created in _on_bus_paused()
        self.thumbnail_pipeline = None
        self.thumbnail_bus = None
        self.thumbnail_watch_id = None
        # Maps sink element name -> buffer probe handler id
        self.buffer_probes = {}

        self.loop = gobject.MainLoop()

        # Set up the playbin. It will be used to discover certain
        # properties of the input file
        self.playbin = gst.element_factory_make('playbin')

        self.videosink = gst.element_factory_make('fakesink', 'videosink')
        self.playbin.set_property('video-sink', self.videosink)

        self.audiosink = gst.element_factory_make('fakesink', 'audiosink')
        self.playbin.set_property('audio-sink', self.audiosink)

        self.bus = self.playbin.get_bus()
        self.bus.add_signal_watch()
        self.watch_id = self.bus.connect('message', self._on_bus_message)

        self.playbin.set_property('uri', 'file:{0}'.format(
            urllib.pathname2url(self.source_path)))

        self.playbin.set_state(gst.STATE_PAUSED)

        self.run()

    def run(self):
        '''
        Block in the main loop until ``__halt_final`` quits it.
        '''
        self.loop.run()

    def _on_bus_message(self, bus, message):
        '''
        Handle messages posted on the playbin bus.
        '''
        _log.debug(' thumbnail playbin: {0}'.format(message))

        if message.type == gst.MESSAGE_ERROR:
            _log.error('thumbnail playbin: {0}'.format(message))
            gobject.idle_add(self._on_bus_error)

        elif message.type == gst.MESSAGE_STATE_CHANGED:
            # The pipeline state has changed
            # Parse state changing data
            _prev, state, _pending = message.parse_state_changed()

            _log.debug('State changed: {0}'.format(state))

            # Only the playbin itself reaching PAUSED is interesting, not
            # its child elements.
            if state == gst.STATE_PAUSED and message.src == self.playbin:
                gobject.idle_add(self._on_bus_paused)

    def _on_bus_paused(self):
        '''
        Set up thumbnailing pipeline

        Runs as a gobject idle callback; returns False so it is not called
        again.
        '''
        if self.playbin is None or self.thumbnail_pipeline is not None:
            # Either we are already shutting down, or the pipeline has been
            # built by an earlier PAUSED notification.
            return False

        current_video = self.playbin.get_property('current-video')

        if current_video == 0:
            _log.debug('Found current video from playbin')
        else:
            _log.error('Could not get any current video from playbin!')

        self.duration = self._get_duration(self.playbin)
        _log.info('Video length: {0}'.format(self.duration / gst.SECOND))

        _log.info('Setting up thumbnailing pipeline')
        self.thumbnail_pipeline = gst.parse_launch(
            'filesrc location="{0}" ! decodebin ! '
            'ffmpegcolorspace ! videoscale ! '
            'video/x-raw-rgb,depth=24,bpp=24,pixel-aspect-ratio=1/1,width=180 ! '
            'fakesink signal-handoffs=True'.format(self.source_path))

        self.thumbnail_bus = self.thumbnail_pipeline.get_bus()
        self.thumbnail_bus.add_signal_watch()
        self.thumbnail_watch_id = self.thumbnail_bus.connect(
            'message', self._on_thumbnail_bus_message)

        self.thumbnail_pipeline.set_state(gst.STATE_PAUSED)

        #gobject.timeout_add(3000, self._on_timeout)

        return False

    def _seek_position(self):
        '''
        Return the position (in gst time units) the thumbnail is taken at.

        Applies the wadsworth constant (30% into the video) with a one
        second minimum.  Clips known to be shorter than one second are not
        pushed past their own end by that minimum.
        '''
        one_second = 1 * gst.SECOND
        thirty_percent = self.duration / 100 * 30

        if 0 < self.duration < one_second:
            return thirty_percent

        return max(thirty_percent, one_second)

    def _on_thumbnail_bus_message(self, bus, message):
        '''
        Handle messages posted on the thumbnailing pipeline's bus.

        The first time the pipeline itself reaches PAUSED, a buffer probe is
        attached to its fakesink and the pipeline is seeked to the snapshot
        position.
        '''
        _log.debug('thumbnail: {0}'.format(message))

        if message.type == gst.MESSAGE_ERROR:
            _log.error(message)
            gobject.idle_add(self._on_bus_error)

        if message.type == gst.MESSAGE_STATE_CHANGED:
            _log.debug('State changed')
            _prev, state, _pending = message.parse_state_changed()

            # Start only from the pristine state: neither while already
            # processing nor while halting.
            if (state == gst.STATE_PAUSED and
                    self.state == self.STATE_NULL and
                    message.src == self.thumbnail_pipeline):
                _log.info('Pipeline paused, processing')
                self.state = self.STATE_PROCESSING

                for sink in self.thumbnail_pipeline.sinks():
                    name = sink.get_name()
                    factoryname = sink.get_factory().get_name()

                    if factoryname == 'fakesink':
                        sinkpad = sink.get_pad('sink')

                        self.buffer_probes[name] = sinkpad.add_buffer_probe(
                            self.buffer_probe_handler, name)

                        _log.info('Added buffer probe')

                        break

                seek_amount = self._seek_position()

                _log.debug('seek amount: {0}'.format(seek_amount))

                seek_result = self.thumbnail_pipeline.seek(
                    1.0,
                    gst.FORMAT_TIME,
                    gst.SEEK_FLAG_FLUSH | gst.SEEK_FLAG_ACCURATE,
                    gst.SEEK_TYPE_SET,
                    seek_amount,
                    gst.SEEK_TYPE_NONE,
                    0)

                if not seek_result:
                    self.errors.append('COULD_NOT_SEEK')
                    _log.error('Couldn\'t seek! result: {0}'.format(
                        seek_result))
                    _log.info(message)
                    self.shutdown()
                else:
                    _log.debug('Seek successful')
                    self.thumbnail_pipeline.set_state(gst.STATE_PAUSED)
            else:
                _log.debug('Won\'t seek: \t{0}\n\t{1}'.format(
                    self.state,
                    message.src))

    def buffer_probe_handler_real(self, pad, buff, name):
        '''
        Save the probed buffer as the thumbnail image, then shut down.

        Runs as a gobject idle callback; always returns False so it is not
        called again.
        '''
        if self.state == self.STATE_HALTING:
            # A previous buffer has already been handled (or we gave up).
            _log.debug('Thumbnailer is halting, ignoring buffer')
            return False

        _log.info('Capturing frame')
        try:
            caps = buff.caps
            if caps is None:
                _log.error('No caps passed to buffer probe handler!')
                self.shutdown()
                return False

            _log.debug('caps: {0}'.format(caps))

            filters = caps[0]
            width = filters["width"]
            height = filters["height"]

            im = Image.new('RGB', (width, height))

            data = pixbuf_to_pilbuf(buff.data)

            im.putdata(data)

            im.save(self.dest_path)

            _log.info('Saved thumbnail')

            self.shutdown()

        except gst.QueryError as e:
            _log.error('QueryError: {0}'.format(e))

        return False

    def buffer_probe_handler(self, pad, buff, name):
        '''
        Proxy function for buffer_probe_handler_real

        Defers the real work to a gobject idle callback and returns True.
        '''
        _log.debug('Attaching real buffer handler to gobject idle event')
        gobject.idle_add(
            lambda: self.buffer_probe_handler_real(pad, buff, name))

        return True

    def _get_duration(self, pipeline, retries=0):
        '''
        Get the duration of a pipeline.

        Retries 5 times.

        :param pipeline: element to query
        :param retries: number of attempts already used up
        :returns: the duration in gst.FORMAT_TIME units, or 0 if every
            attempt raised gst.QueryError
        '''
        for _attempt in range(retries, 5):
            try:
                return pipeline.query_duration(gst.FORMAT_TIME)[0]
            except gst.QueryError:
                continue

        return 0

    def _on_timeout(self):
        '''
        Abort thumbnailing; meant to be used as a gobject timeout callback.
        '''
        _log.error('Timeout in thumbnailer!')
        self.shutdown()

    def _on_bus_error(self, *args):
        '''
        React to an error message from either pipeline.

        The error is recorded and the thumbnailer shut down; otherwise the
        main loop, and with it the constructor, would never return.
        '''
        _log.error('AHAHAHA! Error! args: {0}'.format(args))
        self.errors.append('GSTREAMER_ERROR')
        self.shutdown()

    def shutdown(self):
        '''
        Stop all pipelines and schedule the main loop to quit.
        '''
        _log.info('Shutting down')
        self.__halt()

    def __halt(self):
        '''
        Halt all pipelines and shut down the main loop
        '''
        if self.state == self.STATE_HALTING:
            # Already on the way out; do not tear things down twice.
            return

        _log.info('Halting...')
        self.state = self.STATE_HALTING

        self.__disconnect()

        gobject.idle_add(self.__halt_final)

    def __disconnect(self):
        '''
        Remove buffer probes and bus watches, and stop both pipelines.
        '''
        _log.debug('Disconnecting...')

        if self.thumbnail_pipeline is not None:
            # The probes were attached to sinks of the thumbnailing
            # pipeline, so that is where they have to be removed from.
            for sink in self.thumbnail_pipeline.sinks():
                name = sink.get_name()
                probe_id = self.buffer_probes.pop(name, None)

                if probe_id is not None:
                    _log.debug('Disconnecting {0}'.format(name))
                    sink.get_pad('sink').remove_buffer_probe(probe_id)

            self.thumbnail_pipeline.set_state(gst.STATE_NULL)
            self.thumbnail_pipeline = None

        if self.thumbnail_bus is not None:
            self.thumbnail_bus.disconnect(self.thumbnail_watch_id)
            self.thumbnail_bus.remove_signal_watch()
            self.thumbnail_bus = None

        if self.playbin is not None:
            self.playbin.set_state(gst.STATE_NULL)
            self.playbin = None

        if self.bus is not None:
            self.bus.disconnect(self.watch_id)
            self.bus.remove_signal_watch()
            self.bus = None

    def __halt_final(self):
        '''
        Log collected errors and quit the main loop.
        '''
        _log.info('Done')
        if self.errors:
            _log.error(','.join(self.errors))

        self.loop.quit()
331
332
class VideoThumbnailerMarkII(object):
    '''
    Creates a thumbnail from a video file. Rewrite of VideoThumbnailer.

    Large parts of the functionality and overall architecture contained
    within this object is taken from Participatory Culture Foundation's
    `gst_extractor.Extractor` object last seen at
    https://github.com/pculture/miro/blob/master/tv/lib/frontends/widgets/gst/gst_extractor.py
    in the `miro` codebase.

    The `miro` codebase and the gst_extractor.py are licensed under the GNU
    General Public License v2 or later.

    Instantiating the class does all the work: ``__init__`` blocks inside a
    ``gobject.MainLoop`` until the snapshot has been saved, an error has
    occurred, or the 3 second timeout has expired.  Error codes are
    collected in ``self.errors``.
    '''
    STATE_NULL = 0
    STATE_HALTING = 1
    STATE_PROCESSING = 2
    STATE_PROCESSING_THUMBNAIL = 3

    def __init__(self, source_path, dest_path, width=None, height=None,
                 position_callback=None):
        '''
        Load the video into a playbin and run the main loop.

        :param source_path: path of the video file to read
        :param dest_path: path the thumbnail image is saved to
        :param width: thumbnail width in pixels; left to GStreamer if unset
        :param height: thumbnail height in pixels; left to GStreamer if unset
        :param position_callback: callable ``(duration, gst)`` returning
            the position to take the snapshot at; defaults to
            ``wadsworth_position_callback``
        '''
        self.state = self.STATE_NULL

        self.has_reached_playbin_pause = False

        self.thumbnail_pipeline = None
        self.thumbnail_message_bus = None
        self.thumbnail_bus_watch_id = None

        # Id of the gobject timeout source guarding the thumbnailing step
        self.timeout_id = None

        self.permission_to_take_picture = False

        # Maps sink element name -> buffer probe handler id
        self.buffer_probes = {}

        self.errors = []

        self.source_path = os.path.abspath(source_path)
        self.dest_path = os.path.abspath(dest_path)

        self.width = width
        self.height = height
        self.position_callback = position_callback \
            or self.wadsworth_position_callback

        self.mainloop = gobject.MainLoop()

        self.playbin = gst.element_factory_make('playbin')

        self.videosink = gst.element_factory_make('fakesink', 'videosink')
        self.audiosink = gst.element_factory_make('fakesink', 'audiosink')

        self.playbin.set_property('video-sink', self.videosink)
        self.playbin.set_property('audio-sink', self.audiosink)

        self.playbin_message_bus = self.playbin.get_bus()

        self.playbin_message_bus.add_signal_watch()
        self.playbin_bus_watch_id = self.playbin_message_bus.connect(
            'message',
            self.on_playbin_message)

        self.playbin.set_property(
            'uri',
            'file:{0}'.format(
                urllib.pathname2url(self.source_path)))

        self.playbin.set_state(gst.STATE_PAUSED)

        try:
            self.run()
        except Exception as exc:
            _log.critical(
                'Exception "{0}" caught, disconnecting and re-raising'
                .format(exc))
            self.disconnect()
            raise

    def wadsworth_position_callback(self, duration, gst):
        '''
        Default position callback: 30% into the video.

        :param duration: duration of the video
        :param gst: the gst module (unused here, part of the callback
            signature)
        '''
        return duration / 100 * 30

    def run(self):
        '''
        Block in the main loop until ``halt`` quits it.
        '''
        self.mainloop.run()

    def on_playbin_message(self, message_bus, message):
        '''
        Handle messages posted on the playbin bus.
        '''
        _log.debug('playbin message: {0}'.format(message))

        if message.type == gst.MESSAGE_ERROR:
            _log.error('playbin error: {0}'.format(message))
            gobject.idle_add(self.on_playbin_error)

        if message.type == gst.MESSAGE_STATE_CHANGED:
            prev_state, cur_state, pending_state = \
                message.parse_state_changed()

            _log.debug(
                'playbin state changed: \nprev: {0}\ncur: {1}\n '
                'pending: {2}'.format(
                    prev_state,
                    cur_state,
                    pending_state))

            if cur_state == gst.STATE_PAUSED:
                if message.src == self.playbin:
                    _log.info('playbin ready')
                    gobject.idle_add(self.on_playbin_paused)

    def on_playbin_error(self):
        '''
        Give up after the playbin reported an error.

        Runs as a gobject idle callback.  Without it the main loop, and
        with it the constructor, would never return.
        '''
        self.errors.append('PLAYBIN_ERROR')
        self.disconnect()

        return False

    def on_playbin_paused(self):
        '''
        Build and start the thumbnailing pipeline.

        Runs as a gobject idle callback once the playbin is paused; returns
        False so it is not called again.
        '''
        if self.has_reached_playbin_pause:
            _log.warning(
                'Has already reached logic for playbin pause. Aborting '
                'without doing anything this time.')
            return False

        if self.state == self.STATE_HALTING or self.playbin is None:
            # disconnect() ran before this idle callback got its turn.
            return False

        self.has_reached_playbin_pause = True

        current_video = self.playbin.get_property('current-video')

        # 'current-video' is a stream index, so 0 is a perfectly good value
        # (VideoThumbnailer treats 0 as "found").  A negative index is
        # presumably playbin's "no video stream" marker -- verify against
        # the playbin documentation.
        if current_video is None or current_video < 0:
            _log.critical(
                'thumbnail could not get any video data from playbin')

        self.duration = self.get_duration(self.playbin)
        self.permission_to_take_picture = True
        self.buffer_probes = {}

        pipeline = ''.join([
            'filesrc location="%s" ! decodebin ! ' % self.source_path,
            'ffmpegcolorspace ! videoscale ! ',
            'video/x-raw-rgb,depth=24,bpp=24,pixel-aspect-ratio=1/1',
            ',width={0}'.format(self.width) if self.width else '',
            ',height={0}'.format(self.height) if self.height else '',
            ' ! ',
            'fakesink signal-handoffs=True'])

        _log.debug('thumbnail_pipeline: {0}'.format(pipeline))

        self.thumbnail_pipeline = gst.parse_launch(pipeline)
        self.thumbnail_message_bus = self.thumbnail_pipeline.get_bus()
        self.thumbnail_message_bus.add_signal_watch()
        self.thumbnail_bus_watch_id = self.thumbnail_message_bus.connect(
            'message',
            self.on_thumbnail_message)

        self.thumbnail_pipeline.set_state(gst.STATE_PAUSED)

        # Remember the source id so disconnect() can cancel the timeout;
        # otherwise it would fire into whatever main loop runs next.
        self.timeout_id = gobject.timeout_add(
            3000, self.on_gobject_timeout)

        return False

    def on_thumbnail_message(self, message_bus, message):
        '''
        Handle messages posted on the thumbnailing pipeline's bus.

        The first time the pipeline itself reaches PAUSED, a buffer probe is
        attached to its fakesink and the pipeline is seeked to the position
        returned by ``self.position_callback``.
        '''
        _log.debug('thumbnail message: {0}'.format(message))

        if message.type == gst.MESSAGE_ERROR:
            _log.error('thumbnail error: {0}'.format(message.parse_error()))
            gobject.idle_add(self.on_thumbnail_error, message)

        if message.type == gst.MESSAGE_STATE_CHANGED:
            prev_state, cur_state, pending_state = \
                message.parse_state_changed()

            _log.debug(
                'thumbnail state changed: \nprev: {0}\ncur: {1}\n '
                'pending: {2}'.format(
                    prev_state,
                    cur_state,
                    pending_state))

            if self.state == self.STATE_PROCESSING_THUMBNAIL:
                _log.debug('Already processing thumbnail')

            elif (cur_state == gst.STATE_PAUSED and
                    self.state != self.STATE_HALTING and
                    message.src == self.thumbnail_pipeline):
                # As in VideoThumbnailer, wait for the pipeline as a whole
                # (not just one of its elements) to be paused before
                # seeking.
                self.state = self.STATE_PROCESSING_THUMBNAIL

                # Find the fakesink sink pad and attach the on_buffer_probe
                # handler to it.
                for sink in self.thumbnail_pipeline.sinks():
                    sink_name = sink.get_name()
                    sink_factory_name = sink.get_factory().get_name()

                    if sink_factory_name == 'fakesink':
                        sink_pad = sink.get_pad('sink')

                        self.buffer_probes[sink_name] = \
                            sink_pad.add_buffer_probe(
                                self.on_pad_buffer_probe,
                                sink_name)

                        _log.info('Attached buffer probes: {0}'.format(
                            self.buffer_probes))

                        break

                seek_amount = self.position_callback(self.duration, gst)

                seek_result = self.thumbnail_pipeline.seek(
                    1.0,
                    gst.FORMAT_TIME,
                    gst.SEEK_FLAG_FLUSH | gst.SEEK_FLAG_ACCURATE,
                    gst.SEEK_TYPE_SET,
                    seek_amount,
                    gst.SEEK_TYPE_NONE,
                    0)

                if not seek_result:
                    _log.critical('Could not seek.')

    def on_pad_buffer_probe(self, *args):
        '''
        Buffer probe callback; defers the snapshot to a gobject idle
        callback.
        '''
        _log.debug('buffer probe handler: {0}'.format(args))
        gobject.idle_add(lambda: self.take_snapshot(*args))

    def take_snapshot(self, pad, buff, name):
        '''
        Save the probed buffer as the thumbnail image, then disconnect.

        Runs as a gobject idle callback; always returns False so it is not
        called again.
        '''
        if self.state == self.STATE_HALTING:
            _log.debug('Pipeline is halting, will not take snapshot')
            return False

        _log.info('Taking snapshot! ({0})'.format(
            (pad, buff, name)))
        try:
            caps = buff.caps
            if caps is None:
                _log.error('No buffer caps present /take_snapshot')
                self.errors.append('NO_BUFFER_CAPS')
                self.disconnect()
                # Nothing to read the frame size from; do not go on to
                # index into None.
                return False

            _log.debug('caps: {0}'.format(caps))

            filters = caps[0]
            width = filters['width']
            height = filters['height']

            im = Image.new('RGB', (width, height))

            data = pixbuf_to_pilbuf(buff.data)

            im.putdata(data)

            im.save(self.dest_path)

            _log.info('Saved snapshot!')

            self.disconnect()

        except gst.QueryError as exc:
            _log.error('take_snapshot - QueryError: {0}'.format(exc))

        return False

    def on_thumbnail_error(self, message):
        '''
        React to an error message from the thumbnailing pipeline.

        If videoscale failed with its integer overflow error, the whole
        process is retried once with both output dimensions given
        explicitly.
        '''
        scaling_failed = False

        if 'Error calculating the output scaled size - integer overflow' \
                in message.parse_error()[1]:
            # GStreamer videoscale sometimes fails to calculate the
            # dimensions given only one of the destination dimensions and
            # the source dimensions. This is a workaround in case videoscale
            # returns an error that indicates this has happened.
            scaling_failed = True
            _log.error('Thumbnailing failed because of videoscale integer'
                       ' overflow. Will retry with fallback.')
        else:
            _log.error(
                'Thumbnailing failed: {0}'.format(message.parse_error()))

        # Remember what was asked for before anything gets reset.
        had_both_dimensions = bool(self.width and self.height)
        target_width = self.width or 180

        # Kill the current mainloop
        self.disconnect()

        if not scaling_failed:
            self.errors.append('THUMBNAIL_ERROR')
            return False

        if had_both_dimensions:
            # Both dimensions were already explicit, so the fallback has
            # nothing left to change; retrying would loop forever.
            _log.error('Scaling failed with explicit sizes, giving up.')
            self.errors.append('THUMBNAIL_ERROR')
            return False

        # Manually scale the destination dimensions
        _log.info('Retrying with manually set sizes...')

        info = VideoTranscoder().discover(self.source_path)

        if not info:
            _log.error('Could not discover source dimensions, giving up.')
            self.errors.append('THUMBNAIL_ERROR')
            return False

        ratio = float(target_width) / int(info['videowidth'])
        scaled_height = int(info['videoheight'] * ratio)

        self.__init__(self.source_path, self.dest_path, target_width,
                      scaled_height, self.position_callback)

        return False

    def disconnect(self):
        '''
        Stop both pipelines, release probes, watches and the timeout, and
        schedule the main loop to quit.
        '''
        self.state = self.STATE_HALTING

        if self.timeout_id is not None:
            gobject.source_remove(self.timeout_id)
            self.timeout_id = None

        if self.playbin is not None:
            self.playbin.set_state(gst.STATE_NULL)
            self.playbin = None

        if self.thumbnail_pipeline is not None:
            # The probes were attached to sinks of the thumbnailing
            # pipeline, so that is where they have to be removed from.
            for sink in self.thumbnail_pipeline.sinks():
                probe_id = self.buffer_probes.pop(sink.get_name(), None)

                if probe_id is not None:
                    sink.get_pad('sink').remove_buffer_probe(probe_id)

            self.thumbnail_pipeline.set_state(gst.STATE_NULL)
            self.thumbnail_pipeline = None

        if self.thumbnail_message_bus is not None:
            self.thumbnail_message_bus.disconnect(
                self.thumbnail_bus_watch_id)
            self.thumbnail_message_bus.remove_signal_watch()
            self.thumbnail_message_bus = None

        if self.playbin_message_bus is not None:
            self.playbin_message_bus.disconnect(self.playbin_bus_watch_id)
            self.playbin_message_bus.remove_signal_watch()
            self.playbin_message_bus = None

        self.halt()

    def halt(self):
        '''
        Quit the main loop once it is idle.
        '''
        gobject.idle_add(self.mainloop.quit)

    def on_gobject_timeout(self):
        '''
        Give up when thumbnailing has not finished in time.

        Returns False, which also removes the timeout source.
        '''
        _log.critical('Reached gobject timeout')
        # This source goes away by itself when we return False, so
        # disconnect() must not try to remove it again.
        self.timeout_id = None
        self.errors.append('TIMEOUT')
        self.disconnect()

        return False

    def get_duration(self, pipeline, attempt=1):
        '''
        Query the duration of ``pipeline``, retrying on gst.QueryError.

        :param pipeline: element to query
        :param attempt: number of the first attempt; querying stops once
            attempt 5 would be reached
        :returns: the duration in gst.FORMAT_TIME units, or 0 if the retry
            limit was reached
        '''
        for current_attempt in range(attempt, 5):
            try:
                return pipeline.query_duration(gst.FORMAT_TIME)[0]
            except gst.QueryError as exc:
                _log.error(
                    'Could not get duration on attempt {0}: {1}'.format(
                        current_attempt,
                        exc))

        _log.critical('Pipeline duration query retry limit reached.')
        return 0
650
651
class VideoTranscoder:
    '''
    Video transcoder

    Transcodes the SRC video file to a VP8 WebM video file at DST

     - Does the same thing as VideoThumbnailer, but produces a WebM vp8
       and vorbis video file.
     - The VideoTranscoder exceeds the VideoThumbnailer in the way
       that it was refined afterwards and therefore is done more
       correctly.
    '''
    def __init__(self):
        _log.info('Initializing VideoTranscoder...')
        # Last percentage reported by the progressreport element
        self.progress_percentage = None
        self.loop = gobject.MainLoop()

    def transcode(self, src, dst, **kwargs):
        '''
        Transcode a video file into a 'medium'-sized version.

        Blocks in the main loop until transcoding has finished or failed.

        :param src: path of the video file to read
        :param dst: path the WebM file is written to
        :keyword dimensions: ``(width, height)`` tuple, default (640, 640)
        :keyword vp8_quality: vp8enc 'quality' property, default 8
        :keyword vp8_threads: vp8enc 'threads' property; 0 means "use all
            cores"; default is one less than the number of cores
        :keyword vorbis_quality: vorbisenc 'quality' property, default 0.3
        :keyword progress_callback: callable invoked with the new
            percentage whenever the reported progress changes
        :raises Exception: if ``dimensions`` is not a tuple
        '''
        # Validate before touching any state, so a bad call leaves the
        # transcoder exactly as it was.
        dimensions = kwargs.get('dimensions', (640, 640))

        if not isinstance(dimensions, tuple):
            raise Exception('dimensions must be tuple: (width, height)')

        self.source_path = src
        self.destination_path = dst

        # vp8enc options
        self.destination_dimensions = dimensions
        self.vp8_quality = kwargs.get('vp8_quality', 8)
        # Number of threads used by vp8enc:
        # number of real cores - 1 as per recommendation on
        # <http://www.webmproject.org/tools/encoder-parameters/#6-multi-threaded-encode-and-decode>
        self.vp8_threads = kwargs.get('vp8_threads', CPU_COUNT - 1)

        # 0 means auto-detect, but dict.get() only falls back to CPU_COUNT
        # if value is None, this will correct our incompatibility with
        # dict.get()
        # This will also correct cases where there's only 1 CPU core, see
        # original self.vp8_threads assignment above.
        if self.vp8_threads == 0:
            self.vp8_threads = CPU_COUNT

        # vorbisenc options
        self.vorbis_quality = kwargs.get('vorbis_quality', 0.3)

        self._progress_callback = kwargs.get('progress_callback') or None

        self._setup()
        self._run()

    # XXX: This could be a static method.
    def discover(self, src):
        '''
        Discover properties about a media file

        Blocks in the main loop until the discoverer has reported back.

        :param src: path of the media file to inspect
        :returns: the ``__dict__`` of the discoverer's result, or None if
            nothing was discovered
        '''
        _log.info('Discovering {0}'.format(src))

        self.source_path = src
        # Forget the result of any earlier discover() call so a failure now
        # cannot return the previous file's data.
        self._discovered_data = None
        self._setup_discover(discovered_callback=self.__on_discovered)

        self.discoverer.discover()

        self.loop.run()

        if self._discovered_data is None:
            return None

        return self._discovered_data.__dict__

    def __on_discovered(self, data, is_media):
        '''
        Discoverer callback used by ``discover``: store the result and quit
        the main loop.
        '''
        _log.debug('Discovered: {0}'.format(data))
        if not is_media:
            self.__stop()
            raise Exception('Could not discover {0}'.format(self.source_path))

        self._discovered_data = data

        self.__stop_mainloop()

    def _setup(self):
        '''
        Create the discoverer and the (still unlinked) transcoding pipeline.
        '''
        self._setup_discover()
        self._setup_pipeline()

    def _run(self):
        '''
        Kick off discovery and block in the main loop.
        '''
        _log.info('Discovering...')
        self.discoverer.discover()
        _log.info('Done')

        _log.debug('Initializing MainLoop()')
        self.loop.run()

    def _setup_discover(self, **kw):
        '''
        Create a discoverer for ``self.source_path``.

        :keyword discovered_callback: handler for the 'discovered' signal;
            defaults to ``self.__discovered``, which starts transcoding
        '''
        _log.debug('Setting up discoverer')
        self.discoverer = discoverer.Discoverer(self.source_path)

        # Connect self.__discovered to the 'discovered' event
        self.discoverer.connect(
            'discovered',
            kw.get('discovered_callback', self.__discovered))

    def __discovered(self, data, is_media):
        '''
        Callback for media discoverer.

        Links the pipeline using the discovered data and starts it.
        '''
        if not is_media:
            self.__stop()
            raise Exception('Could not discover {0}'.format(self.source_path))

        _log.debug('__discovered, data: {0}'.format(data.__dict__))

        self.data = data

        # Launch things that should be done after discovery
        self._link_elements()
        self.__setup_videoscale_capsfilter()

        # Tell the transcoding pipeline to start running
        self.pipeline.set_state(gst.STATE_PLAYING)
        _log.info('Transcoding...')

    def _make_element(self, factory_name, name):
        '''
        Create a GStreamer element and add it to ``self.pipeline``.
        '''
        element = gst.element_factory_make(factory_name, name)
        self.pipeline.add(element)

        return element

    def _setup_pipeline(self):
        '''
        Create every element of the transcoding pipeline and add it to the
        pipeline bin.  Linking happens later, in ``_link_elements``.
        '''
        _log.debug('Setting up transcoding pipeline')
        # Create the pipeline bin.
        self.pipeline = gst.Pipeline('VideoTranscoderPipeline')

        # Create all GStreamer elements, starting with
        # filesrc & decoder
        self.filesrc = self._make_element('filesrc', 'filesrc')
        self.filesrc.set_property('location', self.source_path)

        self.decoder = self._make_element('decodebin2', 'decoder')
        self.decoder.connect('new-decoded-pad', self._on_dynamic_pad)

        # Video elements
        self.videoqueue = self._make_element('queue', 'videoqueue')
        self.videorate = self._make_element('videorate', 'videorate')
        self.ffmpegcolorspace = self._make_element(
            'ffmpegcolorspace', 'ffmpegcolorspace')

        self.videoscale = self._make_element('ffvideoscale', 'videoscale')
        #self.videoscale.set_property('method', 2)  # I'm not sure this works
        #self.videoscale.set_property('add-borders', 0)

        self.capsfilter = self._make_element('capsfilter', 'capsfilter')

        self.vp8enc = self._make_element('vp8enc', 'vp8enc')
        self.vp8enc.set_property('quality', self.vp8_quality)
        self.vp8enc.set_property('threads', self.vp8_threads)
        self.vp8enc.set_property('max-latency', 25)

        # Audio elements
        self.audioqueue = self._make_element('queue', 'audioqueue')

        self.audiorate = self._make_element('audiorate', 'audiorate')
        self.audiorate.set_property('tolerance', 80000000)

        self.audioconvert = self._make_element(
            'audioconvert', 'audioconvert')

        self.audiocapsfilter = self._make_element(
            'capsfilter', 'audiocapsfilter')
        audiocaps = ['audio/x-raw-float']
        self.audiocapsfilter.set_property(
            'caps',
            gst.caps_from_string(
                ','.join(audiocaps)))

        self.vorbisenc = self._make_element('vorbisenc', 'vorbisenc')
        self.vorbisenc.set_property('quality', self.vorbis_quality)

        # WebMmux & filesink
        self.webmmux = self._make_element('webmmux', 'webmmux')

        self.filesink = self._make_element('filesink', 'filesink')
        self.filesink.set_property('location', self.destination_path)

        # Progressreport
        self.progressreport = self._make_element(
            'progressreport', 'progressreport')
        # Update every second
        self.progressreport.set_property('update-freq', 1)
        self.progressreport.set_property('silent', True)

    def _link_elements(self):
        '''
        Link all the elements

        This code depends on data from the discoverer and is called
        from __discovered
        '''
        _log.debug('linking elements')
        # Link the filesrc element to the decoder. The decoder then emits
        # 'new-decoded-pad' which links decoded src pads to either a video
        # or audio sink
        self.filesrc.link(self.decoder)

        # Link all the video elements in a row to webmmux
        gst.element_link_many(
            self.videoqueue,
            self.videorate,
            self.ffmpegcolorspace,
            self.videoscale,
            self.capsfilter,
            self.vp8enc,
            self.webmmux)

        if self.data.is_audio:
            # Link all the audio elements in a row to webmux
            gst.element_link_many(
                self.audioqueue,
                self.audiorate,
                self.audioconvert,
                self.audiocapsfilter,
                self.vorbisenc,
                self.webmmux)

        gst.element_link_many(
            self.webmmux,
            self.progressreport,
            self.filesink)

        # Setup the message bus and connect _on_message to the pipeline
        self._setup_bus()

    def _on_dynamic_pad(self, dbin, pad, islast):
        '''
        Callback called when ``decodebin2`` has a pad that we can connect to
        '''
        # Intersect the capabilities of the video sink and the pad src
        # Then check if they have no common capabilities.
        video_caps = self.ffmpegcolorspace.get_pad_template('sink').get_caps()

        if video_caps.intersect(pad.get_caps()).is_empty():
            # It is NOT a video src pad.
            pad.link(self.audioqueue.get_pad('sink'))
        else:
            # It IS a video src pad.
            pad.link(self.videoqueue.get_pad('sink'))

    def _setup_bus(self):
        '''
        Route messages from the pipeline's bus to ``_on_message``.
        '''
        self.bus = self.pipeline.get_bus()
        self.bus.add_signal_watch()
        self.bus.connect('message', self._on_message)

    def __setup_videoscale_capsfilter(self):
        '''
        Sets up the output format (width, height) for the video

        Only the longer side is constrained; the other dimension is left
        for the pipeline to negotiate.
        '''
        caps = ['video/x-raw-yuv', 'pixel-aspect-ratio=1/1', 'framerate=30/1']

        if self.data.videoheight > self.data.videowidth:
            # Whoa! We have ourselves a portrait video!
            caps.append('height={0}'.format(
                self.destination_dimensions[1]))
        else:
            # It's a landscape, phew, how normal.
            caps.append('width={0}'.format(
                self.destination_dimensions[0]))

        self.capsfilter.set_property(
            'caps',
            gst.caps_from_string(
                ','.join(caps)))

    def _on_message(self, bus, message):
        '''
        Handle messages posted on the transcoding pipeline's bus:
        end-of-stream, progress reports and errors.
        '''
        _log.debug((bus, message, message.type))

        t = message.type

        if t == gst.MESSAGE_EOS:
            self._discover_dst_and_stop()
            _log.info('Done')

        elif t == gst.MESSAGE_ELEMENT:
            if message.structure.get_name() == 'progress':
                data = dict(message.structure)
                percent = data.get('percent')

                # Update progress state if it has changed
                if self.progress_percentage != percent:
                    self.progress_percentage = percent
                    if self._progress_callback:
                        self._progress_callback(percent)

                    _log.info('{percent}% done...'.format(
                        percent=percent))

                _log.debug(data)

        elif t == gst.MESSAGE_ERROR:
            _log.error((bus, message))
            self.__stop()

    def _discover_dst_and_stop(self):
        '''
        Run a discoverer on the finished destination file; its callback
        stops the transcoder.
        '''
        self.dst_discoverer = discoverer.Discoverer(self.destination_path)

        self.dst_discoverer.connect('discovered', self.__dst_discovered)

        self.dst_discoverer.discover()

    def __dst_discovered(self, data, is_media):
        '''
        Store what was discovered about the destination file and stop.
        '''
        self.dst_data = data

        self.__stop()

    def __stop(self):
        '''
        Stop the pipeline (if one was created) and schedule the main loop
        to quit.
        '''
        _log.debug(self.loop)

        if hasattr(self, 'pipeline'):
            # Stop executing the pipeline
            self.pipeline.set_state(gst.STATE_NULL)

        # This kills the loop, mercifully
        gobject.idle_add(self.__stop_mainloop)

    def __stop_mainloop(self):
        '''
        Wrapper for gobject.MainLoop.quit()

        This wrapper makes us able to see if self.loop.quit has been called
        '''
        _log.info('Terminating MainLoop')

        self.loop.quit()
990
991
if __name__ == '__main__':
    # Command line entry point:
    #   -a video SRC DEST      transcode SRC into a WebM file at DEST
    #   -a thumbnail SRC DEST  save a thumbnail of SRC at DEST
    #   -a discover SRC        print the discovered properties of SRC
    # Run at the lowest scheduling priority.
    os.nice(19)
    logging.basicConfig()
    from optparse import OptionParser

    parser = OptionParser(
        usage='%prog [-v] -a [ video | thumbnail | discover ] SRC [ DEST ]')

    parser.add_option('-a', '--action',
                      dest='action',
                      help='One of "video", "discover" or "thumbnail"')

    parser.add_option('-v',
                      dest='verbose',
                      action='store_true',
                      help='Output debug information')

    parser.add_option('-q',
                      dest='quiet',
                      action='store_true',
                      help='Dear program, please be quiet unless *error*')

    (options, args) = parser.parse_args()

    if options.verbose:
        _log.setLevel(logging.DEBUG)
    else:
        _log.setLevel(logging.INFO)

    # -q wins over -v
    if options.quiet:
        _log.setLevel(logging.ERROR)

    _log.debug(args)

    # Number of positional arguments each action takes.  An unknown or
    # missing action, or the wrong number of paths, is a usage error rather
    # than a silent no-op or a TypeError further down.
    expected_arg_counts = {'video': 2, 'thumbnail': 2, 'discover': 1}

    if expected_arg_counts.get(options.action) != len(args):
        parser.print_help()
        sys.exit(2)

    if options.action == 'thumbnail':
        VideoThumbnailerMarkII(*args)
    elif options.action == 'video':
        def cb(data):
            print('I\'m a callback!')
        VideoTranscoder().transcode(*args, progress_callback=cb)
    elif options.action == 'discover':
        print(VideoTranscoder().discover(*args))