Added option to skip transcoding
[mediagoblin.git] / mediagoblin / media_types / video / transcoders.py
1 # GNU MediaGoblin -- federated, autonomous media hosting
2 # Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16
17 from __future__ import division
18
19 import os
20 import sys
21 import logging
22 import urllib
23 import multiprocessing
24 import gobject
25 import pygst
26 pygst.require('0.10')
27 import gst
28 import struct
29 import Image
30
31 from gst.extend import discoverer
32
_log = logging.getLogger(__name__)

gobject.threads_init()

# Number of CPU cores used to size the encoder thread pool. Platforms where
# multiprocessing cannot report a count fall back to two cores.
try:
    CPU_COUNT = multiprocessing.cpu_count()
except NotImplementedError:
    CPU_COUNT = 2
    _log.warning('multiprocessing.cpu_count not implemented')

# Directory handed to GStreamer through the environment for its debug
# pipeline graph dumps.
os.putenv('GST_DEBUG_DUMP_DOT_DIR', '/tmp')
45
46
def pixbuf_to_pilbuf(buf):
    '''
    Convert a packed 24-bit RGB byte buffer into a list of pixel tuples.

    Every three consecutive bytes of ``buf`` are unpacked as unsigned bytes
    and become one ``(r, g, b)`` tuple. The tuples are returned in buffer
    order, which is the form ``Image.putdata()`` is fed with in this file.
    '''
    pixel_offsets = range(0, len(buf), 3)

    return [
        struct.unpack('BBB', buf[offset:offset + 3])
        for offset in pixel_offsets]
54
55
class VideoThumbnailer:
    '''
    Save a single frame of a video file as a thumbnail image.

    Constructing an instance does all the work: ``source_path`` is loaded
    into a playbin to read its duration, then a second pipeline decodes the
    file into 180 pixel wide RGB frames, seeks into the video and writes a
    captured frame to ``dest_path``. The constructor blocks inside a
    gobject.MainLoop() until the thumbnailer shuts itself down.

    VideoThumbnailerMarkII is the rewrite of this class.
    '''
    # Declaration of thumbnailer states
    STATE_NULL = 0
    STATE_HALTING = 1
    STATE_PROCESSING = 2

    def __init__(self, source_path, dest_path):
        '''
        Set up playbin pipeline in order to get video properties.

        Initializes and runs the gobject.MainLoop(), so this call blocks
        until the thumbnailer has shut down.

        - source_path: path of the video file to take the thumbnail from
        - dest_path: path the thumbnail image is saved to
        '''
        # The current thumbnailer state
        self.state = self.STATE_NULL

        # These will contain the thumbnailing pipeline and its bus watch
        self.thumbnail_pipeline = None
        self.thumbnail_bus = None
        self.thumbnail_watch_id = None

        # Probe ids keyed by the name of the sink they are attached to
        self.buffer_probes = {}
        self.errors = []
        self.duration = 0

        self.source_path = source_path
        self.dest_path = dest_path

        self.loop = gobject.MainLoop()

        # Set up the playbin. It will be used to discover certain
        # properties of the input file
        self.playbin = gst.element_factory_make('playbin')

        self.videosink = gst.element_factory_make('fakesink', 'videosink')
        self.playbin.set_property('video-sink', self.videosink)

        self.audiosink = gst.element_factory_make('fakesink', 'audiosink')
        self.playbin.set_property('audio-sink', self.audiosink)

        self.bus = self.playbin.get_bus()
        self.bus.add_signal_watch()
        self.watch_id = self.bus.connect('message', self._on_bus_message)

        self.playbin.set_property('uri', 'file:{0}'.format(
            urllib.pathname2url(self.source_path)))

        self.playbin.set_state(gst.STATE_PAUSED)

        self.run()

    def run(self):
        '''
        Run the main loop; returns once the loop has been told to quit.
        '''
        self.loop.run()

    def _on_bus_message(self, bus, message):
        '''
        Handle messages from the playbin bus.

        Errors are forwarded to _on_bus_error, and the playbin reaching the
        PAUSED state triggers _on_bus_paused. Both are scheduled as idle
        callbacks instead of being called directly.
        '''
        _log.debug(' thumbnail playbin: {0}'.format(message))

        if message.type == gst.MESSAGE_ERROR:
            _log.error('thumbnail playbin: {0}'.format(message))
            gobject.idle_add(self._on_bus_error)

        elif message.type == gst.MESSAGE_STATE_CHANGED:
            # The pipeline state has changed
            # Parse state changing data
            _prev, state, _pending = message.parse_state_changed()

            _log.debug('State changed: {0}'.format(state))

            if state == gst.STATE_PAUSED:
                if message.src == self.playbin:
                    gobject.idle_add(self._on_bus_paused)

    def _on_bus_paused(self):
        '''
        Set up thumbnailing pipeline

        Returns False so that gobject does not repeat the idle callback.
        '''
        # The playbin may report PAUSED more than once, and a queued callback
        # can still arrive after shutdown; only build the pipeline once.
        if self.playbin is None or self.thumbnail_pipeline is not None:
            return False

        current_video = self.playbin.get_property('current-video')

        if current_video == 0:
            _log.debug('Found current video from playbin')
        else:
            _log.error('Could not get any current video from playbin!')

        self.duration = self._get_duration(self.playbin)
        _log.info('Video length: {0}'.format(self.duration / gst.SECOND))

        _log.info('Setting up thumbnailing pipeline')
        self.thumbnail_pipeline = gst.parse_launch(
            'filesrc location="{0}" ! decodebin ! '
            'ffmpegcolorspace ! videoscale ! '
            'video/x-raw-rgb,depth=24,bpp=24,pixel-aspect-ratio=1/1,width=180 ! '
            'fakesink signal-handoffs=True'.format(self.source_path))

        self.thumbnail_bus = self.thumbnail_pipeline.get_bus()
        self.thumbnail_bus.add_signal_watch()
        self.thumbnail_watch_id = self.thumbnail_bus.connect(
            'message', self._on_thumbnail_bus_message)

        self.thumbnail_pipeline.set_state(gst.STATE_PAUSED)

        #gobject.timeout_add(3000, self._on_timeout)

        return False

    def _seek_position(self, duration, second):
        '''
        Return the position, in the unit of ``duration``, to take the frame at.

        Applies the wadsworth constant (30% into the video) with a floor of
        ``second``. The floor is only applied when the video is at least
        that long, so short videos are not seeked past their end. An unknown
        duration (0) keeps the old fallback of ``second``.
        '''
        position = int(duration) * 30 // 100

        if duration >= second:
            return max(position, second)

        if duration > 0:
            return position

        return second

    def _on_thumbnail_bus_message(self, bus, message):
        '''
        Handle messages from the thumbnailing pipeline bus.

        The first time the thumbnail pipeline itself reaches PAUSED, a
        buffer probe is attached to its fakesink and the pipeline is seeked
        to the position the frame should be taken at.
        '''
        _log.debug('thumbnail: {0}'.format(message))

        if message.type == gst.MESSAGE_ERROR:
            _log.error(message)
            gobject.idle_add(self._on_bus_error)

        if message.type == gst.MESSAGE_STATE_CHANGED:
            _log.debug('State changed')
            _prev, state, _pending = message.parse_state_changed()

            if (state == gst.STATE_PAUSED and
                    not self.state == self.STATE_PROCESSING and
                    message.src == self.thumbnail_pipeline):
                _log.info('Pipeline paused, processing')
                self.state = self.STATE_PROCESSING

                for sink in self.thumbnail_pipeline.sinks():
                    name = sink.get_name()
                    factoryname = sink.get_factory().get_name()

                    if factoryname == 'fakesink':
                        sinkpad = sink.get_pad('sink')

                        self.buffer_probes[name] = sinkpad.add_buffer_probe(
                            self.buffer_probe_handler, name)

                        _log.info('Added buffer probe')

                        break

                # Apply the wadsworth constant, fallback to 1 second
                seek_amount = self._seek_position(self.duration, gst.SECOND)

                _log.debug('seek amount: {0}'.format(seek_amount))

                seek_result = self.thumbnail_pipeline.seek(
                    1.0,
                    gst.FORMAT_TIME,
                    gst.SEEK_FLAG_FLUSH | gst.SEEK_FLAG_ACCURATE,
                    gst.SEEK_TYPE_SET,
                    seek_amount,
                    gst.SEEK_TYPE_NONE,
                    0)

                if not seek_result:
                    self.errors.append('COULD_NOT_SEEK')
                    _log.error('Couldn\'t seek! result: {0}'.format(
                        seek_result))
                    _log.info(message)
                    self.shutdown()
                else:
                    _log.debug('Seek successful')
                    self.thumbnail_pipeline.set_state(gst.STATE_PAUSED)
            else:
                _log.debug('Won\'t seek: \t{0}\n\t{1}'.format(
                    self.state,
                    message.src))

    def buffer_probe_handler_real(self, pad, buff, name):
        '''
        Save the given buffer as the thumbnail image and shut down.

        Runs as a gobject idle callback and always returns False so that it
        is not repeated.
        '''
        # Probes scheduled before shutdown may still be waiting in the idle
        # queue; do not write the file again once halting has started.
        if self.state == self.STATE_HALTING:
            _log.debug('Thumbnailer is halting, will not capture frame')
            return False

        _log.info('Capturing frame')
        try:
            caps = buff.caps
            if caps is None:
                _log.error('No caps passed to buffer probe handler!')
                self.shutdown()
                return False

            _log.debug('caps: {0}'.format(caps))

            filters = caps[0]
            width = filters["width"]
            height = filters["height"]

            im = Image.new('RGB', (width, height))

            data = pixbuf_to_pilbuf(buff.data)

            im.putdata(data)

            im.save(self.dest_path)

            _log.info('Saved thumbnail')

            self.shutdown()

        except gst.QueryError as e:
            _log.error('QueryError: {0}'.format(e))

        return False

    def buffer_probe_handler(self, pad, buff, name):
        '''
        Proxy function for buffer_probe_handler_real

        Defers the actual capture to a gobject idle callback and returns
        True to the probe.
        '''
        _log.debug('Attaching real buffer handler to gobject idle event')
        gobject.idle_add(
            lambda: self.buffer_probe_handler_real(pad, buff, name))

        return True

    def _get_duration(self, pipeline, retries=0):
        '''
        Get the duration of a pipeline.

        Retries 5 times, then gives up and returns 0.
        '''
        if retries == 5:
            return 0

        try:
            return pipeline.query_duration(gst.FORMAT_TIME)[0]
        except gst.QueryError:
            return self._get_duration(pipeline, retries + 1)

    def _on_timeout(self):
        '''
        Timeout callback: give up and shut down.
        '''
        _log.error('Timeout in thumbnailer!')
        self.shutdown()

        return False

    def _on_bus_error(self, *args):
        '''
        React to an error message from either pipeline.

        The error is recorded and the thumbnailer is shut down; otherwise
        the main loop started by the constructor would never return.
        '''
        _log.error('AHAHAHA! Error! args: {0}'.format(args))
        self.errors.append('BUS_ERROR')

        if self.state != self.STATE_HALTING:
            self.shutdown()

        return False

    def shutdown(self):
        '''
        Stop the pipelines right away and quit the main loop once it is
        idle.
        '''
        _log.info('Shutting down')
        self.__halt()

    def __halt(self):
        '''
        Halt all pipelines and shut down the main loop
        '''
        _log.info('Halting...')
        self.state = self.STATE_HALTING

        self.__disconnect()

        gobject.idle_add(self.__halt_final)

    def _remove_buffer_probes(self, pipeline):
        '''
        Remove the buffer probes this object attached to sinks of pipeline.
        '''
        for sink in pipeline.sinks():
            name = sink.get_name()

            # Only sinks a probe was registered for have an entry
            probe_id = self.buffer_probes.pop(name, None)
            if probe_id is None:
                continue

            _log.debug('Disconnecting {0}'.format(name))
            sink.get_pad('sink').remove_buffer_probe(probe_id)

    def __disconnect(self):
        '''
        Release both pipelines and their bus handlers.

        Safe to call more than once; everything released is set to None.
        '''
        _log.debug('Disconnecting...')
        if self.thumbnail_pipeline is not None:
            # The probes live on the thumbnail pipeline's sink, not on the
            # playbin.
            self._remove_buffer_probes(self.thumbnail_pipeline)
            self.thumbnail_pipeline.set_state(gst.STATE_NULL)
            self.thumbnail_pipeline = None

        if self.thumbnail_bus is not None:
            self.thumbnail_bus.disconnect(self.thumbnail_watch_id)
            self.thumbnail_bus = None

        if self.playbin is not None:
            self.playbin.set_state(gst.STATE_NULL)
            self.playbin = None

        if self.bus is not None:
            self.bus.disconnect(self.watch_id)
            self.bus = None

    def __halt_final(self):
        '''
        Log any collected errors and quit the main loop.
        '''
        _log.info('Done')
        if self.errors:
            _log.error(','.join(self.errors))

        self.loop.quit()

        return False
331
332
class VideoThumbnailerMarkII(object):
    '''
    Creates a thumbnail from a video file. Rewrite of VideoThumbnailer.

    Large parts of the functionality and overall architecture contained
    within this object is taken from Participatory Culture Foundation's
    `gst_extractor.Extractor` object last seen at
    https://github.com/pculture/miro/blob/master/tv/lib/frontends/widgets/gst/gst_extractor.py
    in the `miro` codebase.

    The `miro` codebase and the gst_extractor.py are licensed under the GNU
    General Public License v2 or later.
    '''
    STATE_NULL = 0
    STATE_HALTING = 1
    STATE_PROCESSING = 2
    STATE_PROCESSING_THUMBNAIL = 3

    def __init__(self, source_path, dest_path, width=None, height=None,
            position_callback=None):
        '''
        Load the video into a playbin and run the main loop.

        This call blocks until the thumbnail has been taken, an error was
        reported, or the timeout was reached.

        - source_path: path of the video file, made absolute
        - dest_path: path the thumbnail is saved to, made absolute
        - width, height: optional size constraints for the captured frame
        - position_callback: callable taking (duration, gst) and returning
          the position to seek to; defaults to wadsworth_position_callback
        '''
        self.state = self.STATE_NULL

        self.has_reached_playbin_pause = False

        self.thumbnail_pipeline = None
        self.thumbnail_message_bus = None
        self.thumbnail_bus_watch_id = None

        # Id of the give-up timeout source, None while no timeout is pending
        self.timeout_id = None

        self.permission_to_take_picture = False

        self.buffer_probes = {}

        self.errors = []

        self.duration = 0

        self.source_path = os.path.abspath(source_path)
        self.dest_path = os.path.abspath(dest_path)

        self.width = width
        self.height = height
        self.position_callback = position_callback \
            or self.wadsworth_position_callback

        self.mainloop = gobject.MainLoop()

        self.playbin = gst.element_factory_make('playbin')

        self.videosink = gst.element_factory_make('fakesink', 'videosink')
        self.audiosink = gst.element_factory_make('fakesink', 'audiosink')

        self.playbin.set_property('video-sink', self.videosink)
        self.playbin.set_property('audio-sink', self.audiosink)

        self.playbin_message_bus = self.playbin.get_bus()

        self.playbin_message_bus.add_signal_watch()
        self.playbin_bus_watch_id = self.playbin_message_bus.connect(
            'message',
            self.on_playbin_message)

        self.playbin.set_property(
            'uri',
            'file:{0}'.format(
                urllib.pathname2url(self.source_path)))

        self.playbin.set_state(gst.STATE_PAUSED)

        try:
            self.run()
        except Exception as exc:
            _log.critical(
                'Exception "{0}" caught, disconnecting and re-raising'\
                    .format(exc))
            self.disconnect()
            raise

    def wadsworth_position_callback(self, duration, gst):
        '''
        Default position callback: 30% into the video.

        ``gst`` is accepted because every position callback is called with
        it; it is not used here.
        '''
        return duration / 100 * 30

    def run(self):
        '''
        Run the main loop; returns once the loop has been told to quit.
        '''
        self.mainloop.run()

    def on_playbin_message(self, message_bus, message):
        '''
        Handle messages from the playbin bus.

        Errors are handed to on_playbin_error, and the playbin reaching
        PAUSED triggers on_playbin_paused, both as idle callbacks.
        '''
        _log.debug('playbin message: {0}'.format(message))

        if message.type == gst.MESSAGE_ERROR:
            _log.error('playbin error: {0}'.format(message))
            gobject.idle_add(self.on_playbin_error)

        if message.type == gst.MESSAGE_STATE_CHANGED:
            prev_state, cur_state, pending_state = \
                message.parse_state_changed()

            _log.debug(
                'playbin state changed: \nprev: {0}\ncur: {1}'
                '\npending: {2}'.format(
                    prev_state,
                    cur_state,
                    pending_state))

            if cur_state == gst.STATE_PAUSED:
                if message.src == self.playbin:
                    _log.info('playbin ready')
                    gobject.idle_add(self.on_playbin_paused)

    def on_playbin_error(self):
        '''
        The playbin could not handle the file: give up and quit the loop.
        '''
        _log.error('Could not load video into playbin.')
        self.disconnect()

        return False

    def on_playbin_paused(self):
        '''
        Build and start the thumbnailing pipeline, once.

        Also arms a 3 second timeout after which the thumbnailer gives up.
        Returns False so that gobject does not repeat the idle callback.
        '''
        if self.has_reached_playbin_pause:
            _log.warning(
                'Has already reached logic for playbin pause. Aborting '
                'without doing anything this time.')
            return False

        self.has_reached_playbin_pause = True

        # A queued callback may arrive after disconnect() released the playbin
        if self.playbin is None:
            return False

        current_video = self.playbin.get_property('current-video')

        # NOTE(review): 0 is a valid stream index (VideoThumbnailer treats
        # it as "found"); a negative value is taken to mean no video stream
        # - confirm against the playbin documentation.
        if current_video is None or current_video < 0:
            _log.critical(
                'thumbnail could not get any video data from playbin')

        self.duration = self.get_duration(self.playbin)
        self.permission_to_take_picture = True
        self.buffer_probes = {}

        pipeline = ''.join([
            'filesrc location="%s" ! decodebin ! ' % self.source_path,
            'ffmpegcolorspace ! videoscale ! ',
            'video/x-raw-rgb,depth=24,bpp=24,pixel-aspect-ratio=1/1',
            ',width={0}'.format(self.width) if self.width else '',
            ',height={0}'.format(self.height) if self.height else '',
            ' ! ',
            'fakesink signal-handoffs=True'])

        _log.debug('thumbnail_pipeline: {0}'.format(pipeline))

        self.thumbnail_pipeline = gst.parse_launch(pipeline)
        self.thumbnail_message_bus = self.thumbnail_pipeline.get_bus()
        self.thumbnail_message_bus.add_signal_watch()
        self.thumbnail_bus_watch_id = self.thumbnail_message_bus.connect(
            'message',
            self.on_thumbnail_message)

        self.thumbnail_pipeline.set_state(gst.STATE_PAUSED)

        self.timeout_id = gobject.timeout_add(3000, self.on_gobject_timeout)

        return False

    def on_thumbnail_message(self, message_bus, message):
        '''
        Handle messages from the thumbnailing pipeline bus.

        On the first PAUSED state change a buffer probe is attached to the
        fakesink and the pipeline is seeked to the position returned by the
        position callback.
        '''
        _log.debug('thumbnail message: {0}'.format(message))

        if message.type == gst.MESSAGE_ERROR:
            _log.error('thumbnail error: {0}'.format(message))
            gobject.idle_add(self.on_thumbnail_error)

        if message.type == gst.MESSAGE_STATE_CHANGED:
            prev_state, cur_state, pending_state = \
                message.parse_state_changed()

            _log.debug(
                'thumbnail state changed: \nprev: {0}\ncur: {1}'
                '\npending: {2}'.format(
                    prev_state,
                    cur_state,
                    pending_state))

            if cur_state == gst.STATE_PAUSED and\
               not self.state == self.STATE_PROCESSING_THUMBNAIL:
                self.state = self.STATE_PROCESSING_THUMBNAIL

                # Find the fakesink sink pad and attach the on_buffer_probe
                # handler to it.
                for sink in self.thumbnail_pipeline.sinks():
                    sink_name = sink.get_name()
                    sink_factory_name = sink.get_factory().get_name()

                    if sink_factory_name == 'fakesink':
                        sink_pad = sink.get_pad('sink')

                        self.buffer_probes[sink_name] = sink_pad\
                            .add_buffer_probe(
                                self.on_pad_buffer_probe,
                                sink_name)

                        _log.info('Attached buffer probes: {0}'.format(
                            self.buffer_probes))

                        break

                # The callback may return a float; seek takes an integer
                # position.
                seek_amount = int(self.position_callback(self.duration, gst))

                seek_result = self.thumbnail_pipeline.seek(
                    1.0,
                    gst.FORMAT_TIME,
                    gst.SEEK_FLAG_FLUSH | gst.SEEK_FLAG_ACCURATE,
                    gst.SEEK_TYPE_SET,
                    seek_amount,
                    gst.SEEK_TYPE_NONE,
                    0)

                if not seek_result:
                    _log.critical('Could not seek.')

            elif self.state == self.STATE_PROCESSING_THUMBNAIL:
                _log.debug('Already processing thumbnail')

    def on_pad_buffer_probe(self, *args):
        '''
        Buffer probe: defer taking the snapshot to a gobject idle callback.
        '''
        _log.debug('buffer probe handler: {0}'.format(args))
        gobject.idle_add(lambda: self.take_snapshot(*args))

    def take_snapshot(self, pad, buff, name):
        '''
        Save the given buffer as the thumbnail image and disconnect.

        Does nothing once the thumbnailer is halting. Always returns False
        so that the idle callback is not repeated.
        '''
        if self.state == self.STATE_HALTING:
            _log.debug('Pipeline is halting, will not take snapshot')
            return False

        _log.info('Taking snapshot! ({0})'.format(
            (pad, buff, name)))
        try:
            caps = buff.caps
            if caps is None:
                _log.error('No buffer caps present /take_snapshot')
                self.disconnect()
                # Without caps the frame size is unknown; nothing to save
                return False

            _log.debug('caps: {0}'.format(caps))

            filters = caps[0]
            width = filters['width']
            height = filters['height']

            im = Image.new('RGB', (width, height))

            data = pixbuf_to_pilbuf(buff.data)

            im.putdata(data)

            im.save(self.dest_path)

            _log.info('Saved snapshot!')

            self.disconnect()

        except gst.QueryError as exc:
            _log.error('take_snapshot - QueryError: {0}'.format(exc))

        return False

    def on_thumbnail_error(self):
        '''
        The thumbnailing pipeline reported an error: give up.
        '''
        _log.error('Thumbnailing failed.')
        self.disconnect()

        return False

    def disconnect(self):
        '''
        Release the pipelines, bus handlers and timeout, then quit the loop.

        Safe to call more than once; everything released is set to None.
        '''
        self.state = self.STATE_HALTING

        if self.timeout_id is not None:
            # Keep the timeout from firing on an already finished object
            gobject.source_remove(self.timeout_id)
            self.timeout_id = None

        if self.thumbnail_pipeline is not None:
            # The buffer probes were attached to this pipeline's fakesink
            for sink in self.thumbnail_pipeline.sinks():
                probe_id = self.buffer_probes.pop(sink.get_name(), None)

                if probe_id is not None:
                    sink.get_pad('sink').remove_buffer_probe(probe_id)

            self.thumbnail_pipeline.set_state(gst.STATE_NULL)
            self.thumbnail_pipeline = None

        if self.thumbnail_message_bus is not None:
            self.thumbnail_message_bus.disconnect(
                self.thumbnail_bus_watch_id)
            self.thumbnail_message_bus = None

        if self.playbin is not None:
            self.playbin.set_state(gst.STATE_NULL)
            self.playbin = None

        if self.playbin_message_bus is not None:
            self.playbin_message_bus.disconnect(self.playbin_bus_watch_id)
            self.playbin_message_bus = None

        self.halt()

    def halt(self):
        '''
        Quit the main loop as soon as it is idle.
        '''
        gobject.idle_add(self.mainloop.quit)

    def on_gobject_timeout(self):
        '''
        Timeout callback: the thumbnail took too long, give up.
        '''
        _log.critical('Reached gobject timeout')

        # Returning False below removes the source; forget its id so that
        # disconnect() does not try to remove it a second time.
        self.timeout_id = None
        self.disconnect()

        return False

    def get_duration(self, pipeline, attempt=1):
        '''
        Query the duration of pipeline, retrying on gst.QueryError.

        Returns 0 once ``attempt`` reaches 5.
        '''
        if attempt == 5:
            _log.critical('Pipeline duration query retry limit reached.')
            return 0

        try:
            return pipeline.query_duration(gst.FORMAT_TIME)[0]
        except gst.QueryError as exc:
            _log.error('Could not get duration on attempt {0}: {1}'.format(
                attempt,
                exc))
            return self.get_duration(pipeline, attempt + 1)
623
624
class VideoTranscoder:
    '''
    Video transcoder

    Transcodes the SRC video file to a VP8 WebM video file at DST

     - Does the same thing as VideoThumbnailer, but produces a WebM vp8
       and vorbis video file.
     - The VideoTranscoder exceeds the VideoThumbnailer in the way
       that it was refined afterwards and therefore is done more
       correctly.
    '''
    def __init__(self):
        _log.info('Initializing VideoTranscoder...')
        self.progress_percentage = None
        self._progress_callback = None
        self._discovered_data = None
        self.loop = gobject.MainLoop()

    def transcode(self, src, dst, **kwargs):
        '''
        Transcode a video file into a 'medium'-sized version.

        Blocks in the main loop until transcoding has finished or failed.

        - src: path of the video file to read
        - dst: path of the WebM file to write

        Keyword arguments:
        - dimensions: (width, height) tuple, defaults to (640, 640)
        - vp8_quality: vp8enc 'quality' property, defaults to 8
        - vp8_threads: vp8enc 'threads' property, 0 means all counted cores
        - vorbis_quality: vorbisenc 'quality' property, defaults to 0.3
        - progress_callback: called with the percentage whenever it changes

        Raises Exception if dimensions is not a tuple.
        '''
        # Validate first so that a bad call leaves the object untouched
        dimensions = kwargs.get('dimensions', (640, 640))

        if not isinstance(dimensions, tuple):
            raise Exception('dimensions must be tuple: (width, height)')

        self.source_path = src
        self.destination_path = dst

        # vp8enc options
        self.destination_dimensions = dimensions
        self.vp8_quality = kwargs.get('vp8_quality', 8)
        # Number of threads used by vp8enc:
        # number of real cores - 1 as per recommendation on
        # <http://www.webmproject.org/tools/encoder-parameters/#6-multi-threaded-encode-and-decode>
        self.vp8_threads = kwargs.get('vp8_threads', CPU_COUNT - 1)

        # 0 asks for auto-detection. It is also what the default above
        # yields when only one core was counted. Use every counted core in
        # both cases.
        if self.vp8_threads == 0:
            self.vp8_threads = CPU_COUNT

        # vorbisenc options
        self.vorbis_quality = kwargs.get('vorbis_quality', 0.3)

        self._progress_callback = kwargs.get('progress_callback') or None

        self._setup()
        self._run()

    # XXX: This could be a static method.
    def discover(self, src):
        '''
        Discover properties about a media file

        Blocks in the main loop until the discoverer has reported back.
        Returns the attribute dict of the discoverer data, or None if the
        file could not be discovered.
        '''
        _log.info('Discovering {0}'.format(src))

        self.source_path = src

        # Forget the result of any earlier call, so that a failed discovery
        # is not answered with data about another file.
        self._discovered_data = None

        self._setup_discover(discovered_callback=self.__on_discovered)

        self.discoverer.discover()

        self.loop.run()

        if self._discovered_data is None:
            return None

        return self._discovered_data.__dict__

    def __on_discovered(self, data, is_media):
        '''
        Discoverer callback used by discover(): keep the data and stop the
        main loop.
        '''
        _log.debug('Discovered: {0}'.format(data))
        if not is_media:
            self.__stop()
            raise Exception('Could not discover {0}'.format(self.source_path))

        self._discovered_data = data

        self.__stop_mainloop()

    def _setup(self):
        '''
        Prepare the discoverer and the transcoding pipeline.
        '''
        self._setup_discover()
        self._setup_pipeline()

    def _run(self):
        '''
        Start discovery and run the main loop until it is stopped.
        '''
        _log.info('Discovering...')
        self.discoverer.discover()
        _log.info('Done')

        _log.debug('Initializing MainLoop()')
        self.loop.run()

    def _setup_discover(self, **kw):
        '''
        Create the discoverer for self.source_path.

        - discovered_callback: handler for the 'discovered' event, defaults
          to the handler that starts transcoding
        '''
        _log.debug('Setting up discoverer')
        self.discoverer = discoverer.Discoverer(self.source_path)

        # Connect self.__discovered to the 'discovered' event
        self.discoverer.connect(
            'discovered',
            kw.get('discovered_callback', self.__discovered))

    def __discovered(self, data, is_media):
        '''
        Callback for media discoverer.

        Links the pipeline according to the discovered data and starts it.
        '''
        if not is_media:
            self.__stop()
            raise Exception('Could not discover {0}'.format(self.source_path))

        _log.debug('__discovered, data: {0}'.format(data.__dict__))

        self.data = data

        # Launch things that should be done after discovery
        self._link_elements()
        self.__setup_videoscale_capsfilter()

        # Tell the transcoding pipeline to start running
        self.pipeline.set_state(gst.STATE_PLAYING)
        _log.info('Transcoding...')

    def _setup_pipeline(self):
        '''
        Create the pipeline and add every element to it, without linking.
        '''
        _log.debug('Setting up transcoding pipeline')
        # Create the pipeline bin.
        self.pipeline = gst.Pipeline('VideoTranscoderPipeline')

        # Create all GStreamer elements, starting with
        # filesrc & decoder
        self.filesrc = gst.element_factory_make('filesrc', 'filesrc')
        self.filesrc.set_property('location', self.source_path)
        self.pipeline.add(self.filesrc)

        self.decoder = gst.element_factory_make('decodebin2', 'decoder')
        self.decoder.connect('new-decoded-pad', self._on_dynamic_pad)
        self.pipeline.add(self.decoder)

        # Video elements
        self.videoqueue = gst.element_factory_make('queue', 'videoqueue')
        self.pipeline.add(self.videoqueue)

        self.videorate = gst.element_factory_make('videorate', 'videorate')
        self.pipeline.add(self.videorate)

        self.ffmpegcolorspace = gst.element_factory_make(
            'ffmpegcolorspace', 'ffmpegcolorspace')
        self.pipeline.add(self.ffmpegcolorspace)

        self.videoscale = gst.element_factory_make('ffvideoscale', 'videoscale')
        #self.videoscale.set_property('method', 2)  # I'm not sure this works
        #self.videoscale.set_property('add-borders', 0)
        self.pipeline.add(self.videoscale)

        self.capsfilter = gst.element_factory_make('capsfilter', 'capsfilter')
        self.pipeline.add(self.capsfilter)

        self.vp8enc = gst.element_factory_make('vp8enc', 'vp8enc')
        self.vp8enc.set_property('quality', self.vp8_quality)
        self.vp8enc.set_property('threads', self.vp8_threads)
        self.vp8enc.set_property('max-latency', 25)
        self.pipeline.add(self.vp8enc)

        # Audio elements
        self.audioqueue = gst.element_factory_make('queue', 'audioqueue')
        self.pipeline.add(self.audioqueue)

        self.audiorate = gst.element_factory_make('audiorate', 'audiorate')
        self.audiorate.set_property('tolerance', 80000000)
        self.pipeline.add(self.audiorate)

        self.audioconvert = gst.element_factory_make('audioconvert', 'audioconvert')
        self.pipeline.add(self.audioconvert)

        self.audiocapsfilter = gst.element_factory_make('capsfilter',
                                                        'audiocapsfilter')
        audiocaps = ['audio/x-raw-float']
        self.audiocapsfilter.set_property(
            'caps',
            gst.caps_from_string(
                ','.join(audiocaps)))
        self.pipeline.add(self.audiocapsfilter)

        self.vorbisenc = gst.element_factory_make('vorbisenc', 'vorbisenc')
        self.vorbisenc.set_property('quality', self.vorbis_quality)
        self.pipeline.add(self.vorbisenc)

        # WebMmux & filesink
        self.webmmux = gst.element_factory_make('webmmux', 'webmmux')
        self.pipeline.add(self.webmmux)

        self.filesink = gst.element_factory_make('filesink', 'filesink')
        self.filesink.set_property('location', self.destination_path)
        self.pipeline.add(self.filesink)

        # Progressreport
        self.progressreport = gst.element_factory_make(
            'progressreport', 'progressreport')
        # Update every second
        self.progressreport.set_property('update-freq', 1)
        self.progressreport.set_property('silent', True)
        self.pipeline.add(self.progressreport)

    def _link_elements(self):
        '''
        Link all the elements

        This code depends on data from the discoverer and is called
        from __discovered
        '''
        _log.debug('linking elements')
        # Link the filesrc element to the decoder. The decoder then emits
        # 'new-decoded-pad' which links decoded src pads to either a video
        # or audio sink
        self.filesrc.link(self.decoder)

        # Link all the video elements in a row to webmmux
        gst.element_link_many(
            self.videoqueue,
            self.videorate,
            self.ffmpegcolorspace,
            self.videoscale,
            self.capsfilter,
            self.vp8enc,
            self.webmmux)

        if self.data.is_audio:
            # Link all the audio elements in a row to webmux
            gst.element_link_many(
                self.audioqueue,
                self.audiorate,
                self.audioconvert,
                self.audiocapsfilter,
                self.vorbisenc,
                self.webmmux)

        gst.element_link_many(
            self.webmmux,
            self.progressreport,
            self.filesink)

        # Setup the message bus and connect _on_message to the pipeline
        self._setup_bus()

    def _on_dynamic_pad(self, dbin, pad, islast):
        '''
        Callback called when ``decodebin2`` has a pad that we can connect to
        '''
        # Intersect the capabilities of the video sink and the pad src
        # Then check if they have no common capabilities.
        video_caps = self.ffmpegcolorspace.get_pad_template('sink').get_caps()

        if video_caps.intersect(pad.get_caps()).is_empty():
            # It is NOT a video src pad.
            pad.link(self.audioqueue.get_pad('sink'))
        else:
            # It IS a video src pad.
            pad.link(self.videoqueue.get_pad('sink'))

    def _setup_bus(self):
        '''
        Watch the pipeline bus and route its messages to _on_message.
        '''
        self.bus = self.pipeline.get_bus()
        self.bus.add_signal_watch()
        self.bus.connect('message', self._on_message)

    def __setup_videoscale_capsfilter(self):
        '''
        Sets up the output format (width, height) for the video

        Only the longer side is fixed: the height for portrait videos, the
        width otherwise.
        '''
        caps = ['video/x-raw-yuv', 'pixel-aspect-ratio=1/1', 'framerate=30/1']

        if self.data.videoheight > self.data.videowidth:
            # Whoa! We have ourselves a portrait video!
            caps.append('height={0}'.format(
                self.destination_dimensions[1]))
        else:
            # It's a landscape, phew, how normal.
            caps.append('width={0}'.format(
                self.destination_dimensions[0]))

        self.capsfilter.set_property(
            'caps',
            gst.caps_from_string(
                ','.join(caps)))

    def _on_message(self, bus, message):
        '''
        Handle messages from the transcoding pipeline bus.

        - end of stream: discover the written file, then stop
        - 'progress' element messages: record the percentage and pass it to
          the progress callback when it has changed
        - errors: stop
        '''
        _log.debug((bus, message, message.type))

        t = message.type

        if t == gst.MESSAGE_EOS:
            self._discover_dst_and_stop()
            _log.info('Done')

        elif t == gst.MESSAGE_ELEMENT:
            if message.structure.get_name() == 'progress':
                data = dict(message.structure)
                percent = data.get('percent')

                # Update progress state if it has changed
                if self.progress_percentage != percent:
                    self.progress_percentage = percent
                    if self._progress_callback:
                        self._progress_callback(percent)

                _log.info('{percent}% done...'.format(
                        percent=percent))
                _log.debug(data)

        elif t == gst.MESSAGE_ERROR:
            _log.error((bus, message))
            self.__stop()

    def _discover_dst_and_stop(self):
        '''
        Run a discoverer on the written file; stops once it reports back.
        '''
        self.dst_discoverer = discoverer.Discoverer(self.destination_path)

        self.dst_discoverer.connect('discovered', self.__dst_discovered)

        self.dst_discoverer.discover()

    def __dst_discovered(self, data, is_media):
        '''
        Keep the discoverer data of the written file and stop.
        '''
        self.dst_data = data

        self.__stop()

    def __stop(self):
        '''
        Stop the pipeline, if there is one, and quit the main loop once it
        is idle.
        '''
        _log.debug(self.loop)

        # discover() never builds a pipeline
        pipeline = getattr(self, 'pipeline', None)

        if pipeline is not None:
            # Stop executing the pipeline
            pipeline.set_state(gst.STATE_NULL)

        # This kills the loop, mercifully
        gobject.idle_add(self.__stop_mainloop)

    def __stop_mainloop(self):
        '''
        Wrapper for gobject.MainLoop.quit()

        This wrapper makes us able to see if self.loop.quit has been called
        '''
        _log.info('Terminating MainLoop')

        self.loop.quit()
963
964
if __name__ == '__main__':
    # Command line entry point: thumbnail, transcode or discover a video.
    os.nice(19)
    logging.basicConfig()
    from optparse import OptionParser

    parser = OptionParser(
        usage='%prog [-v] -a [ video | thumbnail | discover ] SRC [ DEST ]')

    parser.add_option('-a', '--action',
            dest='action',
            help='One of "video", "discover" or "thumbnail"')

    parser.add_option('-v',
            dest='verbose',
            action='store_true',
            help='Output debug information')

    parser.add_option('-q',
            dest='quiet',
            action='store_true',
            help='Dear program, please be quiet unless *error*')

    (options, args) = parser.parse_args()

    if options.verbose:
        _log.setLevel(logging.DEBUG)
    else:
        _log.setLevel(logging.INFO)

    if options.quiet:
        _log.setLevel(logging.ERROR)

    _log.debug(args)

    # 'discover' only reads SRC; the other actions need SRC and DEST
    if options.action == 'discover':
        expected_args = 1
    else:
        expected_args = 2

    if len(args) != expected_args:
        parser.print_help()
        sys.exit()

    transcoder = VideoTranscoder()

    if options.action == 'thumbnail':
        VideoThumbnailerMarkII(*args)
    elif options.action == 'video':
        def cb(data):
            print('I\'m a callback!')
        transcoder.transcode(*args, progress_callback=cb)
    elif options.action == 'discover':
        # discover() already returns a plain dict, or None on failure
        print(transcoder.discover(*args))