Refactored GStreamer element linking
[mediagoblin.git] / mediagoblin / media_types / video / transcoders.py
CommitLineData
26729e02
JW
1# GNU MediaGoblin -- federated, autonomous media hosting
2# Copyright (C) 2011 MediaGoblin contributors. See AUTHORS.
3#
4# This program is free software: you can redistribute it and/or modify
5# it under the terms of the GNU Affero General Public License as published by
6# the Free Software Foundation, either version 3 of the License, or
7# (at your option) any later version.
8#
9# This program is distributed in the hope that it will be useful,
10# but WITHOUT ANY WARRANTY; without even the implied warranty of
11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12# GNU Affero General Public License for more details.
13#
14# You should have received a copy of the GNU Affero General Public License
15# along with this program. If not, see <http://www.gnu.org/licenses/>.
16
a249b6d3 17from __future__ import division
206ef749
JW
18
19import os
206ef749
JW
20os.putenv('GST_DEBUG_DUMP_DOT_DIR', '/tmp')
21
26729e02
JW
22import sys
23import logging
e9c1b938 24import pdb
206ef749 25import urllib
a249b6d3 26
26729e02
JW
27_log = logging.getLogger(__name__)
28logging.basicConfig()
a249b6d3 29_log.setLevel(logging.DEBUG)
26729e02 30
64fd0462
JW
# Number of CPU cores available to this process' host.  Stays at 2 when the
# multiprocessing module is missing or cannot report a core count.
CPU_COUNT = 2
try:
    import multiprocessing
except ImportError:
    _log.warning('Could not import multiprocessing, defaulting to 2 CPU cores')
else:
    try:
        CPU_COUNT = multiprocessing.cpu_count()
    except NotImplementedError:
        _log.warning('multiprocessing.cpu_count not implemented')
42
315266b4
JW
# PyGTK, PyGObject and the GStreamer 0.10 bindings are hard requirements of
# this module.  Each import is guarded so that a missing binding is reported
# with a short, specific message.
#
# ``except Exception`` is used instead of a bare ``except`` so that
# KeyboardInterrupt and SystemExit are not swallowed, and the underlying
# failure is logged before it is replaced by the short message.
try:
    import gtk
except Exception:
    _log.exception('Importing gtk failed')
    raise Exception('Could not find pygtk')

try:
    import gobject
    # Must be called before any other thread touches gobject
    gobject.threads_init()
except Exception:
    _log.exception('Importing/initializing gobject failed')
    raise Exception('gobject could not be found')

try:
    import pygst
    # Select the 0.10 API before ``gst`` itself is imported
    pygst.require('0.10')
    import gst
    from gst.extend import discoverer
except Exception:
    _log.exception('Importing gst/pygst 0.10 failed')
    raise Exception('gst/pygst 0.10 could not be found')
26729e02 61
89d764cd 62
class VideoThumbnailer:
    '''
    Save one frame of a video file as a JPEG thumbnail.

    Instantiating the class does all the work: ``__init__`` sets up a
    playbin to discover the video and then blocks in a gobject.MainLoop
    until ``shutdown()`` has been called (after the thumbnail was saved,
    when a buffer arrived without caps, or when seeking failed).

    Error codes collected during the run are available in ``self.errors``.

    NOTE: a bus error is only logged by ``_on_bus_error``; it does not stop
    the main loop.
    '''
    # Declaration of thumbnailer states
    STATE_NULL = 0
    STATE_HALTING = 1
    STATE_PROCESSING = 2

    # The current thumbnailer state
    state = STATE_NULL

    # This will contain the thumbnailing pipeline, its bus and the id of
    # the 'message' handler connected to that bus
    thumbnail_pipeline = None
    thumbnail_bus = None
    thumbnail_watch_id = None

    # Class-level fallbacks only.  _init_state() rebinds both names on every
    # instance so that probes and errors are never shared between instances.
    buffer_probes = {}
    errors = []

    def __init__(self, source_path, dest_path):
        '''
        Set up playbin pipeline in order to get video properties.

        Initializes and runs the gobject.MainLoop()

         - source_path: path of the video file to read
         - dest_path: path the JPEG thumbnail is written to
        '''
        self.source_path = source_path
        self.dest_path = dest_path

        self._init_state()

        self.loop = gobject.MainLoop()

        # Set up the playbin. It will be used to discover certain
        # properties of the input file
        self.playbin = gst.element_factory_make('playbin')

        self.videosink = gst.element_factory_make('fakesink', 'videosink')
        self.playbin.set_property('video-sink', self.videosink)

        self.bus = self.playbin.get_bus()
        self.bus.add_signal_watch()
        self.watch_id = self.bus.connect('message', self._on_bus_message)

        self.playbin.set_property('uri', 'file:{0}'.format(
                urllib.pathname2url(self.source_path)))

        self.playbin.set_state(gst.STATE_PAUSED)

        self.run()

    def _init_state(self):
        '''
        Give this instance its own, empty probe registry and error list.
        '''
        # Maps sink element name -> buffer probe handler id
        self.buffer_probes = {}
        # Error codes (strings) collected while thumbnailing
        self.errors = []

    def run(self):
        '''
        Run the main loop; returns once the loop has been quit.
        '''
        self.loop.run()

    def _on_bus_message(self, bus, message):
        '''
        Handle messages from the playbin bus.

        Schedules ``_on_bus_error`` on errors and ``_on_bus_paused`` once
        the playbin itself has reached the PAUSED state.
        '''
        _log.debug(' BUS MESSAGE: {0}'.format(message))

        if message.type == gst.MESSAGE_ERROR:
            gobject.idle_add(self._on_bus_error)

        elif message.type == gst.MESSAGE_STATE_CHANGED:
            # The pipeline state has changed
            # Parse state changing data
            _prev, state, _pending = message.parse_state_changed()

            _log.debug('State changed: {0}'.format(state))

            # State changes of child elements are posted on the same bus;
            # only react to the playbin itself
            if state == gst.STATE_PAUSED and message.src == self.playbin:
                gobject.idle_add(self._on_bus_paused)

    def _on_bus_paused(self):
        '''
        Set up thumbnailing pipeline

        Runs as an idle callback; returns False so that it is not
        scheduled again.
        '''
        current_video = self.playbin.get_property('current-video')

        if current_video == 0:
            _log.debug('Found current video from playbin')
        else:
            _log.error('Could not get any current video from playbin!')

        self.duration = self._get_duration(self.playbin)
        _log.info('Video length: {0}'.format(self.duration / gst.SECOND))

        _log.info('Setting up thumbnailing pipeline')
        self.thumbnail_pipeline = gst.parse_launch(
            'filesrc location="{0}" ! decodebin ! '
            'ffmpegcolorspace ! videoscale ! '
            'video/x-raw-rgb,depth=24,bpp=24,pixel-aspect-ratio=1/1,width=180 ! '
            'fakesink signal-handoffs=True'.format(self.source_path))

        self.thumbnail_bus = self.thumbnail_pipeline.get_bus()
        self.thumbnail_bus.add_signal_watch()
        self.thumbnail_watch_id = self.thumbnail_bus.connect(
            'message', self._on_thumbnail_bus_message)

        self.thumbnail_pipeline.set_state(gst.STATE_PAUSED)

        return False

    def _on_thumbnail_bus_message(self, bus, message):
        '''
        Handle messages from the thumbnail pipeline bus.

        The first time the thumbnail pipeline itself reaches PAUSED, a
        buffer probe is attached to its fakesink and the pipeline is asked
        to seek to the frame that will become the thumbnail.
        '''
        _log.debug('Thumbnail bus called, message: {0}'.format(message))

        if message.type == gst.MESSAGE_ERROR:
            _log.error(message)
            gobject.idle_add(self._on_bus_error)

        if message.type == gst.MESSAGE_STATE_CHANGED:
            _prev, state, _pending = message.parse_state_changed()

            if (state == gst.STATE_PAUSED and
                not self.state == self.STATE_PROCESSING and
                message.src == self.thumbnail_pipeline):
                _log.info('Pipeline paused, processing')
                self.state = self.STATE_PROCESSING

                self._attach_buffer_probe()
                self._seek_to_thumbnail_position(message)

    def _attach_buffer_probe(self):
        '''
        Attach buffer_probe_handler to the first fakesink of the thumbnail
        pipeline and remember the probe id under the sink's name.
        '''
        for sink in self.thumbnail_pipeline.sinks():
            name = sink.get_name()
            factoryname = sink.get_factory().get_name()

            if factoryname == 'fakesink':
                sinkpad = sink.get_pad('sink')

                self.buffer_probes[name] = sinkpad.add_buffer_probe(
                    self.buffer_probe_handler, name)

                _log.info('Added buffer probe')

                break

    def _seek_to_thumbnail_position(self, message):
        '''
        Seek the thumbnail pipeline to 30% of the video duration (at least
        one second in).  Shuts the thumbnailer down if the seek fails.

         - message: the bus message that triggered the seek; only logged
           when the seek fails
        '''
        # Apply the wadsworth constant, fallback to 1 second
        seek_amount = max(self.duration / 100 * 30, 1 * gst.SECOND)

        _log.debug('seek amount: {0}'.format(seek_amount))

        seek_result = self.thumbnail_pipeline.seek(
            1.0,
            gst.FORMAT_TIME,
            gst.SEEK_FLAG_FLUSH | gst.SEEK_FLAG_ACCURATE,
            gst.SEEK_TYPE_SET,
            seek_amount,
            gst.SEEK_TYPE_NONE,
            0)

        if not seek_result:
            self.errors.append('COULD_NOT_SEEK')
            _log.error('Couldn\'t seek! result: {0}'.format(
                    seek_result))
            _log.info(message)
            self.shutdown()

    def buffer_probe_handler_real(self, pad, buff, name):
        '''
        Capture buffers as gdk_pixbufs when told to.

        Saves the buffer as a JPEG at ``self.dest_path`` and shuts the
        thumbnailer down.  Always returns False so that the idle source
        that called it is not run again.
        '''
        try:
            caps = buff.caps
            if caps is None:
                _log.error('No caps passed to buffer probe handler!')
                self.shutdown()
                return False

            _log.debug('caps: {0}'.format(caps))

            filters = caps[0]
            width = filters["width"]
            height = filters["height"]

            # The thumbnail pipeline's caps request 24 bit RGB, hence
            # 8 bits per sample and a rowstride of width * 3 bytes
            pixbuf = gtk.gdk.pixbuf_new_from_data(
                buff.data, gtk.gdk.COLORSPACE_RGB, False, 8,
                width, height, width * 3)

            pixbuf.save(self.dest_path, 'jpeg')
            _log.info('Saved thumbnail')
            del pixbuf
            self.shutdown()
        except gst.QueryError:
            pass
        return False

    def buffer_probe_handler(self, pad, buff, name):
        '''
        Proxy function for buffer_probe_handler_real

        Defers the actual work to an idle callback of the main loop and
        returns True right away.
        '''
        gobject.idle_add(
            lambda: self.buffer_probe_handler_real(pad, buff, name))

        return True

    def _get_duration(self, pipeline, retries=0):
        '''
        Get the duration of a pipeline.

        Makes up to five attempts in total (``retries`` counts the attempts
        already made) and returns 0 if none of them succeeds.
        '''
        for _attempt in range(retries, 5):
            try:
                return pipeline.query_duration(gst.FORMAT_TIME)[0]
            except gst.QueryError:
                continue

        return 0

    def _on_timeout(self):
        '''
        Timeout callback: give up and shut down.
        '''
        _log.error('TIMEOUT! DROP EVERYTHING!')
        self.shutdown()

    def _on_bus_error(self, *args):
        '''
        Log a bus error.  Does not stop the main loop.
        '''
        _log.error('AHAHAHA! Error! args: {0}'.format(args))

    def shutdown(self):
        '''
        Stop the pipelines and tell gobject to quit the main loop when it
        is idle.
        '''
        _log.info('Shutting down')
        self.__halt()

    def __halt(self):
        '''
        Halt all pipelines and shut down the main loop
        '''
        _log.info('Halting...')
        self.state = self.STATE_HALTING

        self.__disconnect()

        gobject.idle_add(self.__halt_final)

    def _remove_buffer_probes(self, pipeline):
        '''
        Remove every buffer probe registered for a sink of ``pipeline``.
        '''
        for sink in pipeline.sinks():
            name = sink.get_name()

            _log.debug('Disconnecting {0}'.format(name))

            # Only sinks that actually got a probe are in the registry;
            # looking the name up unconditionally would raise KeyError
            probe_id = self.buffer_probes.pop(name, None)
            if probe_id is not None:
                sink.get_pad('sink').remove_buffer_probe(probe_id)

    def __disconnect(self):
        '''
        Stop both pipelines and disconnect the bus message handlers.

        Safe to call more than once: everything that has been released is
        set to None and skipped on later calls.
        '''
        _log.debug('Disconnecting...')
        if self.playbin is not None:
            self.playbin.set_state(gst.STATE_NULL)
            self._remove_buffer_probes(self.playbin)

            self.playbin = None

        if self.bus is not None:
            self.bus.disconnect(self.watch_id)
            self.bus = None

        # The buffer probe lives on the thumbnail pipeline, so that is
        # where it has to be removed; the pipeline must be stopped as well
        if self.thumbnail_pipeline is not None:
            self._remove_buffer_probes(self.thumbnail_pipeline)
            self.thumbnail_pipeline.set_state(gst.STATE_NULL)
            self.thumbnail_pipeline = None

        if self.thumbnail_bus is not None:
            self.thumbnail_bus.disconnect(self.thumbnail_watch_id)
            self.thumbnail_bus = None

    def __halt_final(self):
        '''
        Log collected errors, if any, and quit the main loop.
        '''
        _log.info('Done')
        if self.errors:
            _log.error(','.join(self.errors))

        self.loop.quit()
339
340
class VideoTranscoder:
    '''
    Video transcoder

    Transcodes the SRC video file to a VP8 WebM video file at DST

     - Does the same thing as VideoThumbnailer, but produces a WebM vp8
       and vorbis video file.
     - The VideoTranscoder exceeds the VideoThumbnailer in the way
       that it was refined afterwards and therefore is done more
       correctly.

    Instantiating the class runs the whole job: ``__init__`` blocks in a
    gobject.MainLoop until the transcoder has stopped.
    '''
    def __init__(self, src, dst, **kwargs):
        '''
         - src: path of the video file to transcode
         - dst: path of the WebM file to write

        Keyword options:
         - dimensions: (width, height) tuple, defaults to (640, 640)
         - progress_callback: callable that is passed a dict for every
           progress report of the pipeline

        Raises Exception if ``dimensions`` is not a tuple.
        '''
        _log.info('Initializing VideoTranscoder...')

        self.loop = gobject.MainLoop()
        self.source_path = src
        self.destination_path = dst

        # Options
        self.destination_dimensions = kwargs.get('dimensions') or (640, 640)
        self._progress_callback = kwargs.get('progress_callback') or None

        if not isinstance(self.destination_dimensions, tuple):
            raise Exception('dimensions must be tuple: (width, height)')

        self._setup()
        self._run()

    def _setup(self):
        '''
        Create the discoverer and the (still unlinked) pipeline.
        '''
        self._setup_discover()
        self._setup_pipeline()

    def _run(self):
        '''
        Start discovering the source and run the main loop.
        '''
        _log.info('Discovering...')
        self.discoverer.discover()
        _log.info('Done')

        _log.debug('Initializing MainLoop()')
        self.loop.run()

    def _setup_discover(self):
        '''
        Create the discoverer for the source file.
        '''
        _log.debug('Setting up discoverer')
        self.discoverer = discoverer.Discoverer(self.source_path)

        # Connect self.__discovered to the 'discovered' event
        self.discoverer.connect('discovered', self.__discovered)

    def __discovered(self, data, is_media):
        '''
        Callback for media discoverer.

        Links the pipeline and starts transcoding.  If the source is not
        media, the transcoder is stopped and an Exception is raised.

         - data: first argument of the 'discovered' signal; stored as
           ``self.data`` and read later for ``is_audio``, ``videowidth``
           and ``videoheight``
        '''
        if not is_media:
            self.__stop()
            raise Exception('Could not discover {0}'.format(self.source_path))

        _log.debug('__discovered, data: {0}'.format(data.__dict__))

        self.data = data

        # Launch things that should be done after discovery
        self._link_elements()
        self.__setup_videoscale_capsfilter()

        # Tell the transcoding pipeline to start running
        self.pipeline.set_state(gst.STATE_PLAYING)
        _log.info('Transcoding...')

    def _add_element(self, factory_name, name, properties=()):
        '''
        Create a GStreamer element, configure it and add it to the pipeline.

         - factory_name: name of the element factory
         - name: name given to the new element
         - properties: sequence of (property, value) pairs, applied in
           order before the element is added

        Returns the new element.
        '''
        element = gst.element_factory_make(factory_name, name)

        for prop, value in properties:
            element.set_property(prop, value)

        self.pipeline.add(element)

        return element

    def _setup_pipeline(self):
        '''
        Create the pipeline and all of its elements.  Nothing is linked
        here; see _link_elements.
        '''
        _log.debug('Setting up transcoding pipeline')
        # Create the pipeline bin.
        self.pipeline = gst.Pipeline('VideoTranscoderPipeline')

        # Create all GStreamer elements, starting with
        # filesrc & decoder
        self.filesrc = self._add_element(
            'filesrc', 'filesrc',
            [('location', self.source_path)])

        self.decoder = self._add_element('decodebin2', 'decoder')
        self.decoder.connect('new-decoded-pad', self._on_dynamic_pad)

        # Video elements
        self.videoqueue = self._add_element('queue', 'videoqueue')
        self.videorate = self._add_element('videorate', 'videorate')
        self.ffmpegcolorspace = self._add_element(
            'ffmpegcolorspace', 'ffmpegcolorspace')
        self.videoscale = self._add_element('ffvideoscale', 'videoscale')
        # The caps are set in __setup_videoscale_capsfilter, after discovery
        self.capsfilter = self._add_element('capsfilter', 'capsfilter')
        self.vp8enc = self._add_element(
            'vp8enc', 'vp8enc',
            [('quality', 6),
             ('threads', 2)])

        # Audio elements
        self.audioqueue = self._add_element('queue', 'audioqueue')
        self.audiorate = self._add_element(
            'audiorate', 'audiorate',
            [('tolerance', 80000000)])
        self.audioconvert = self._add_element('audioconvert', 'audioconvert')
        self.audiocapsfilter = self._add_element(
            'capsfilter', 'audiocapsfilter',
            [('caps', gst.caps_from_string('audio/x-raw-float'))])
        self.vorbisenc = self._add_element(
            'vorbisenc', 'vorbisenc',
            [('quality', 1)])

        # WebMmux & filesink
        self.webmmux = self._add_element('webmmux', 'webmmux')
        self.filesink = self._add_element(
            'filesink', 'filesink',
            [('location', self.destination_path)])

        # Progressreport, updating every second
        self.progressreport = self._add_element(
            'progressreport', 'progressreport',
            [('update-freq', 1),
             ('silent', True)])

    def _link_elements(self):
        '''
        Link all the elements

        This code depends on data from the discoverer and is called
        from __discovered
        '''
        _log.debug('linking elements')
        # Link the filesrc element to the decoder. The decoder then emits
        # 'new-decoded-pad' which links decoded src pads to either a video
        # or audio sink
        self.filesrc.link(self.decoder)

        # Link all the video elements in a row to webmmux
        gst.element_link_many(
            self.videoqueue,
            self.videorate,
            self.ffmpegcolorspace,
            self.videoscale,
            self.capsfilter,
            self.vp8enc,
            self.webmmux)

        if self.data.is_audio:
            # Link all the audio elements in a row to webmux
            gst.element_link_many(
                self.audioqueue,
                self.audiorate,
                self.audioconvert,
                self.audiocapsfilter,
                self.vorbisenc,
                self.webmmux)

        gst.element_link_many(
            self.webmmux,
            self.progressreport,
            self.filesink)

        # Setup the message bus and connect _on_message to the pipeline
        self._setup_bus()

    def _on_dynamic_pad(self, dbin, pad, islast):
        '''
        Callback called when ``decodebin2`` has a pad that we can connect to

        Pads whose caps intersect with the sink caps of ffmpegcolorspace
        go to the video queue, all other pads go to the audio queue.  A
        pad is ignored if the queue it belongs to is already linked.
        '''
        # Intersect the capabilities of the video sink and the pad src
        # Then check if they have no common capabilities.
        video_caps = self.ffmpegcolorspace.get_pad_template('sink').get_caps()

        if video_caps.intersect(pad.get_caps()).is_empty():
            # It is NOT a video src pad.
            target = self.audioqueue.get_pad('sink')
        else:
            # It IS a video src pad.
            target = self.videoqueue.get_pad('sink')

        # The pipeline has exactly one video and one audio branch.  Any
        # additional stream of the same kind cannot be linked, so skip it
        # instead of trying to link an already linked pad.
        if target.is_linked():
            return

        pad.link(target)

    def _setup_bus(self):
        '''
        Watch the pipeline's bus and route its messages to _on_message.
        '''
        self.bus = self.pipeline.get_bus()
        self.bus.add_signal_watch()
        self.bus.connect('message', self._on_message)

    def __setup_videoscale_capsfilter(self):
        '''
        Sets up the output format (width, height) for the video

        Only one dimension is constrained: the height for portrait
        videos, the width otherwise.
        '''
        caps = ['video/x-raw-yuv', 'pixel-aspect-ratio=1/1', 'framerate=30/1']

        if self.data.videoheight > self.data.videowidth:
            # Whoa! We have ourselves a portrait video!
            caps.append('height={0}'.format(
                    self.destination_dimensions[1]))
        else:
            # It's a landscape, phew, how normal.
            caps.append('width={0}'.format(
                    self.destination_dimensions[0]))

        self.capsfilter.set_property(
            'caps',
            gst.caps_from_string(
                ','.join(caps)))

    def _on_message(self, bus, message):
        '''
        Handle messages from the transcoding pipeline's bus.

         - EOS: discover the destination file, then stop
         - 'progress' element messages: call the progress callback
         - errors: log and stop
        '''
        _log.debug((bus, message, message.type))

        t = message.type

        if t == gst.MESSAGE_EOS:
            self._discover_dst_and_stop()
            _log.info('Done')

        elif t == gst.MESSAGE_ELEMENT:
            if message.structure.get_name() == 'progress':
                data = {
                    'structure': message.structure,
                    'percent': message.structure['percent'],
                    'total': message.structure['total'],
                    'current': message.structure['current']}

                if self._progress_callback:
                    self._progress_callback(data)

                _log.info('{percent}% done...'.format(
                        percent=data['percent']))
                _log.debug(data)

        elif t == gst.MESSAGE_ERROR:
            _log.error((bus, message))
            self.__stop()

    def _discover_dst_and_stop(self):
        '''
        Run a discoverer on the destination file; the transcoder is
        stopped from the discoverer's callback.
        '''
        self.dst_discoverer = discoverer.Discoverer(self.destination_path)

        self.dst_discoverer.connect('discovered', self.__dst_discovered)

        self.dst_discoverer.discover()

    def __dst_discovered(self, data, is_media):
        '''
        Callback for the destination discoverer: keep its data as
        ``self.dst_data`` and stop.
        '''
        self.dst_data = data

        self.__stop()

    def __stop(self):
        '''
        Stop the pipeline and schedule the main loop to quit.
        '''
        _log.debug(self.loop)

        # Stop executing the pipeline
        self.pipeline.set_state(gst.STATE_NULL)

        # This kills the loop, mercifully
        gobject.idle_add(self.__stop_mainloop)

    def __stop_mainloop(self):
        '''
        Wrapper for gobject.MainLoop.quit()

        This wrapper makes us able to see if self.loop.quit has been called
        '''
        _log.info('Terminating MainLoop')

        self.loop.quit()
26729e02
JW
627
628
if __name__ == '__main__':
    # Command line entry point:
    #   transcoders.py [-v] [-q] -a [ video | thumbnail ] SRC DEST

    # Raise the niceness of this process by 19 before doing any work
    os.nice(19)
    from optparse import OptionParser

    parser = OptionParser(
        usage='%prog [-v] -a [ video | thumbnail ] SRC DEST')

    parser.add_option('-a', '--action',
                      dest='action',
                      help='One of "video" or "thumbnail"')

    parser.add_option('-v',
                      dest='verbose',
                      action='store_true',
                      help='Output debug information')

    parser.add_option('-q',
                      dest='quiet',
                      action='store_true',
                      help='Dear program, please be quiet unless *error*')

    (options, args) = parser.parse_args()

    if options.verbose:
        _log.setLevel(logging.DEBUG)
    else:
        _log.setLevel(logging.INFO)

    # -q wins over -v
    if options.quiet:
        _log.setLevel(logging.ERROR)

    _log.debug(args)

    if len(args) != 2:
        parser.print_help()
        sys.exit()

    if options.action == 'thumbnail':
        VideoThumbnailer(*args)
    elif options.action == 'video':
        def cb(data):
            print('I\'m a callback!')
        VideoTranscoder(*args, progress_callback=cb)
    else:
        # A missing or misspelled action used to exit silently without
        # doing anything; report it as a usage error instead
        parser.error('-a/--action must be one of "video" or "thumbnail"')