It's 2012 all up in here
[mediagoblin.git] / mediagoblin / media_types / video / transcoders.py
1 # GNU MediaGoblin -- federated, autonomous media hosting
2 # Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16
17 from __future__ import division
18
19 import os
20 os.putenv('GST_DEBUG_DUMP_DOT_DIR', '/tmp')
21
22 import sys
23 import logging
24 import pdb
25 import urllib
26
_log = logging.getLogger(__name__)
logging.basicConfig()
_log.setLevel(logging.DEBUG)

# Number of CPU cores available; stays at 2 when it cannot be determined.
CPU_COUNT = 2
try:
    import multiprocessing
except ImportError:
    _log.warning('Could not import multiprocessing, defaulting to 2 CPU cores')
else:
    try:
        CPU_COUNT = multiprocessing.cpu_count()
    except NotImplementedError:
        # Platform cannot report a core count; keep the default.
        _log.warning('multiprocessing.cpu_count not implemented')
42
# The GTK / GObject / GStreamer 0.10 bindings are hard requirements of this
# module. Catch Exception (not a bare except) so Ctrl-C and SystemExit are
# not swallowed, and keep the underlying error in the re-raised message.
try:
    import gtk
except Exception as err:
    raise Exception('Could not find pygtk: {0}'.format(err))

try:
    import gobject
    # Buffer probes in this module are invoked from GStreamer streaming
    # threads, so gobject's thread support must be enabled.
    gobject.threads_init()
except Exception as err:
    raise Exception('gobject could not be found: {0}'.format(err))

try:
    import pygst
    pygst.require('0.10')
    import gst
    from gst.extend import discoverer
except Exception as err:
    raise Exception('gst/pygst 0.10 could not be found: {0}'.format(err))
61
62
class VideoThumbnailer:
    '''
    Save a JPEG thumbnail of a video file.

    Construction does all the work: a ``playbin`` is paused on the source to
    learn its duration, then a second pipeline decodes the video to 180 pixel
    wide 24-bit RGB and seeks to 30% of the duration (at least one second
    in). The first frame seen by the buffer probe on its fakesink is written
    to ``dest_path`` as a JPEG. The constructor returns once the
    gobject.MainLoop it runs has been quit; problems are collected as short
    strings in ``self.errors``.
    '''
    # Declaration of thumbnailer states
    STATE_NULL = 0
    STATE_HALTING = 1
    STATE_PROCESSING = 2

    # The current thumbnailer state
    state = STATE_NULL

    # This will contain the thumbnailing pipeline
    thumbnail_pipeline = None

    def __init__(self, source_path, dest_path):
        '''
        Set up playbin pipeline in order to get video properties.

        Initializes and runs the gobject.MainLoop(); returns once the loop
        has been quit by shutdown().
        '''
        self.errors = []

        # Maps fakesink name -> buffer probe id. Kept per instance: as a
        # class attribute this dict was shared by every VideoThumbnailer.
        self.buffer_probes = {}

        self.source_path = source_path
        self.dest_path = dest_path

        # Filled in by _on_bus_paused; initialised here so that shutdown is
        # safe no matter how far processing got.
        self.duration = 0
        self.thumbnail_bus = None
        self.thumbnail_watch_id = None

        self.loop = gobject.MainLoop()

        # Set up the playbin. It will be used to discover certain
        # properties of the input file
        self.playbin = gst.element_factory_make('playbin')

        self.videosink = gst.element_factory_make('fakesink', 'videosink')
        self.playbin.set_property('video-sink', self.videosink)

        self.audiosink = gst.element_factory_make('fakesink', 'audiosink')
        self.playbin.set_property('audio-sink', self.audiosink)

        self.bus = self.playbin.get_bus()
        self.bus.add_signal_watch()
        self.watch_id = self.bus.connect('message', self._on_bus_message)

        # Use an absolute path: a relative one would not form a usable
        # file: URI.
        self.playbin.set_property('uri', 'file:{0}'.format(
            urllib.pathname2url(os.path.abspath(self.source_path))))

        self.playbin.set_state(gst.STATE_PAUSED)

        self.run()

    def run(self):
        '''Run the main loop until shutdown() quits it.'''
        self.loop.run()

    def _on_bus_message(self, bus, message):
        '''
        Handle messages from the playbin bus.

        Errors are forwarded to _on_bus_error; once the playbin itself
        reaches PAUSED, _on_bus_paused is scheduled to build the
        thumbnailing pipeline.
        '''
        _log.debug(' BUS MESSAGE: {0}'.format(message))

        if message.type == gst.MESSAGE_ERROR:
            gobject.idle_add(self._on_bus_error, message)

        elif message.type == gst.MESSAGE_STATE_CHANGED:
            # The pipeline state has changed
            # Parse state changing data
            _prev, state, _pending = message.parse_state_changed()

            _log.debug('State changed: {0}'.format(state))

            # Child elements also post state changes; only react to the
            # playbin itself.
            if state == gst.STATE_PAUSED and message.src == self.playbin:
                gobject.idle_add(self._on_bus_paused)

    def _on_bus_paused(self):
        '''
        Set up thumbnailing pipeline

        Runs as a gobject idle callback and returns False so it is not
        repeated.
        '''
        if (self.state == self.STATE_HALTING or self.playbin is None or
                self.thumbnail_pipeline is not None):
            # Already shut down, or the pipeline has already been built.
            return False

        current_video = self.playbin.get_property('current-video')

        if current_video == 0:
            _log.debug('Found current video from playbin')
        else:
            _log.error('Could not get any current video from playbin!')

        self.duration = self._get_duration(self.playbin)
        _log.info('Video length: {0}'.format(self.duration / gst.SECOND))

        _log.info('Setting up thumbnailing pipeline')
        self.thumbnail_pipeline = gst.parse_launch(
            'filesrc name=thumbnail_src ! decodebin ! '
            'ffmpegcolorspace ! videoscale ! '
            'video/x-raw-rgb,depth=24,bpp=24,pixel-aspect-ratio=1/1,width=180 ! '
            'fakesink signal-handoffs=True')
        # Set the location as a property instead of formatting it into the
        # launch string, so quotes or other special characters in the path
        # cannot break the pipeline description.
        self.thumbnail_pipeline.get_by_name('thumbnail_src').set_property(
            'location', self.source_path)

        self.thumbnail_bus = self.thumbnail_pipeline.get_bus()
        self.thumbnail_bus.add_signal_watch()
        self.thumbnail_watch_id = self.thumbnail_bus.connect(
            'message', self._on_thumbnail_bus_message)

        self.thumbnail_pipeline.set_state(gst.STATE_PAUSED)

        return False

    def _on_thumbnail_bus_message(self, bus, message):
        '''
        Handle messages from the thumbnailing pipeline's bus.

        The first time the pipeline reaches PAUSED, a buffer probe is added
        to its fakesink and a flushing, accurate seek is issued; the probe
        then receives the frame at the seek position.
        '''
        _log.debug('Thumbnail bus called, message: {0}'.format(message))

        if message.type == gst.MESSAGE_ERROR:
            _log.error(message)
            gobject.idle_add(self._on_bus_error, message)
            return

        if message.type != gst.MESSAGE_STATE_CHANGED:
            return

        _prev, state, _pending = message.parse_state_changed()

        # Only start processing once: the seek below makes the pipeline
        # re-enter PAUSED, and nothing should start after shutdown.
        if not (state == gst.STATE_PAUSED and
                self.state == self.STATE_NULL and
                message.src == self.thumbnail_pipeline):
            return

        _log.info('Pipeline paused, processing')
        self.state = self.STATE_PROCESSING

        for sink in self.thumbnail_pipeline.sinks():
            name = sink.get_name()
            factoryname = sink.get_factory().get_name()

            if factoryname == 'fakesink':
                sinkpad = sink.get_pad('sink')

                self.buffer_probes[name] = sinkpad.add_buffer_probe(
                    self.buffer_probe_handler, name)

                _log.info('Added buffer probe')

                break
        else:
            # Without a probe no frame would ever be captured and the main
            # loop would never be quit.
            self.errors.append('NO_FAKESINK')
            _log.error('Could not find a fakesink in the thumbnail pipeline')
            self.shutdown()
            return

        # Apply the wadsworth constant (skip the first 30%), falling back to
        # 1 second. Seek positions are integer nanoseconds.
        seek_amount = self._seek_position(self.duration, gst.SECOND)

        _log.debug('seek amount: {0}'.format(seek_amount))

        seek_result = self.thumbnail_pipeline.seek(
            1.0,
            gst.FORMAT_TIME,
            gst.SEEK_FLAG_FLUSH | gst.SEEK_FLAG_ACCURATE,
            gst.SEEK_TYPE_SET,
            seek_amount,
            gst.SEEK_TYPE_NONE,
            0)

        if not seek_result:
            self.errors.append('COULD_NOT_SEEK')
            _log.error('Couldn\'t seek! result: {0}'.format(
                seek_result))
            _log.info(message)
            self.shutdown()

    @staticmethod
    def _seek_position(duration, minimum, percent=30):
        '''
        Return the position to seek to for the thumbnail frame.

        :param duration: stream duration, in any integer unit
        :param minimum: lower bound for the result, in the same unit
        :param percent: how far into the stream to seek
        :returns: ``max(duration * percent // 100, minimum)`` as an int
        '''
        return max(int(duration * percent // 100), int(minimum))

    def buffer_probe_handler_real(self, pad, buff, name):
        '''
        Save the probed buffer as a JPEG at ``dest_path``, then shut down.

        Runs from the main loop (scheduled by buffer_probe_handler) and
        always returns False so gobject does not call it again.
        '''
        # Each probed buffer schedules one of these calls; only the first
        # may write the thumbnail.
        if self.state == self.STATE_HALTING:
            return False

        try:
            caps = buff.caps
            if caps is None:
                _log.error('No caps passed to buffer probe handler!')
                self.errors.append('NO_CAPS')
            else:
                _log.debug('caps: {0}'.format(caps))

                filters = caps[0]
                width = filters["width"]
                height = filters["height"]

                # Raw RGB rows may be padded, so derive the row stride from
                # the buffer size rather than assuming width * 3.
                rowstride = len(buff.data) // height if height else width * 3

                pixbuf = gtk.gdk.pixbuf_new_from_data(
                    buff.data, gtk.gdk.COLORSPACE_RGB, False, 8,
                    width, height, rowstride)

                pixbuf.save(self.dest_path, 'jpeg')
                _log.info('Saved thumbnail')
                del pixbuf
        except Exception:
            # An exception escaping this idle callback would leave the main
            # loop running forever; record it and shut down instead.
            _log.exception('Could not save thumbnail')
            self.errors.append('COULD_NOT_SAVE_THUMBNAIL')

        self.shutdown()
        return False

    def buffer_probe_handler(self, pad, buff, name):
        '''
        Proxy function for buffer_probe_handler_real

        Called from a GStreamer streaming thread; hands the buffer to the
        main loop and returns True so the buffer continues downstream.
        '''
        if self.state != self.STATE_HALTING:
            gobject.idle_add(self.buffer_probe_handler_real, pad, buff, name)

        return True

    def _get_duration(self, pipeline, retries=0):
        '''
        Get the duration of a pipeline, in gst.FORMAT_TIME units.

        Makes up to ``5 - retries`` queries back to back and returns 0 if
        none of them succeed.
        '''
        for _attempt in range(retries, 5):
            try:
                return pipeline.query_duration(gst.FORMAT_TIME)[0]
            except gst.QueryError:
                _log.debug('Duration query failed, retrying')
        return 0

    def _on_timeout(self):
        '''Abort processing; returns False so the timeout does not repeat.'''
        _log.error('TIMEOUT! DROP EVERYTHING!')
        self.shutdown()
        return False

    def _on_bus_error(self, *args):
        '''
        Handle an ERROR message from either bus.

        GStreamer errors are fatal to the pipeline, so record the failure
        and shut down instead of leaving the main loop running forever.
        '''
        _log.error('AHAHAHA! Error! args: {0}'.format(args))
        self.errors.append('PIPELINE_ERROR')
        self.shutdown()
        return False

    def shutdown(self):
        '''
        Halt the pipelines and schedule the main loop to quit.

        Calling it again after the first call has no effect.
        '''
        if self.state == self.STATE_HALTING:
            return
        _log.info('Shutting down')
        self.__halt()

    def __halt(self):
        '''
        Halt all pipelines and shut down the main loop
        '''
        _log.info('Halting...')
        self.state = self.STATE_HALTING

        self.__disconnect()

        gobject.idle_add(self.__halt_final)

    def __disconnect(self):
        '''
        Stop both pipelines, remove buffer probes and bus watches.
        '''
        _log.debug('Disconnecting...')

        if self.thumbnail_pipeline is not None:
            # Stop streaming first so no probe callback is running while
            # the probes are removed.
            self.thumbnail_pipeline.set_state(gst.STATE_NULL)

            # The buffer probes live on the thumbnail pipeline's sinks.
            for sink in self.thumbnail_pipeline.sinks():
                name = sink.get_name()
                probe_id = self.buffer_probes.pop(name, None)
                if probe_id is not None:
                    _log.debug('Disconnecting {0}'.format(name))
                    sink.get_pad('sink').remove_buffer_probe(probe_id)

            if self.thumbnail_bus is not None:
                self.thumbnail_bus.disconnect(self.thumbnail_watch_id)
                self.thumbnail_bus.remove_signal_watch()
                self.thumbnail_bus = None

            self.thumbnail_pipeline = None

        if self.playbin is not None:
            self.playbin.set_state(gst.STATE_NULL)
            self.playbin = None

        if self.bus is not None:
            self.bus.disconnect(self.watch_id)
            self.bus.remove_signal_watch()
            self.bus = None

    def __halt_final(self):
        '''Log collected errors and quit the main loop (idle callback).'''
        _log.info('Done')
        if self.errors:
            _log.error(','.join(self.errors))

        self.loop.quit()
        return False
330
331
class VideoTranscoder:
    '''
    Video transcoder

    Transcodes the SRC video file to a VP8 WebM video file at DST

    - Does the same thing as VideoThumbnailer, but produces a WebM vp8
      and vorbis video file.
    - The VideoTranscoder exceeds the VideoThumbnailer in the way
      that it was refined afterwards and therefore is done more
      correctly.

    Keyword arguments:
    - dimensions: (width, height) tuple. The larger side of the source is
      scaled to the matching value. Defaults to (640, 640).
    - progress_callback: called with a dict of the ``progressreport``
      element's message fields.

    The constructor runs the whole transcode in a gobject.MainLoop and
    returns once it has finished or failed. Pipeline errors are recorded as
    strings in ``self.errors``.
    '''
    def __init__(self, src, dst, **kwargs):
        _log.info('Initializing VideoTranscoder...')

        self.loop = gobject.MainLoop()
        self.source_path = src
        self.destination_path = dst
        # Descriptions of anything that went wrong during transcoding.
        self.errors = []

        # Options
        self.destination_dimensions = kwargs.get('dimensions') or (640, 640)
        self._progress_callback = kwargs.get('progress_callback')

        if not isinstance(self.destination_dimensions, tuple):
            raise Exception('dimensions must be tuple: (width, height)')

        self._setup()
        self._run()

    def _setup(self):
        '''Create the discoverer and the (not yet linked) pipeline.'''
        self._setup_discover()
        self._setup_pipeline()

    def _run(self):
        '''Start discovery and block in the main loop until stopped.'''
        _log.info('Discovering...')
        self.discoverer.discover()
        _log.info('Done')

        _log.debug('Initializing MainLoop()')
        self.loop.run()

    def _setup_discover(self):
        _log.debug('Setting up discoverer')
        self.discoverer = discoverer.Discoverer(self.source_path)

        # Connect self.__discovered to the 'discovered' event
        self.discoverer.connect('discovered', self.__discovered)

    def __discovered(self, data, is_media):
        '''
        Callback for media discoverer.

        Links the pipeline using the discovered stream information and sets
        it to PLAYING. Stops and raises if the source is not media or has no
        video stream.
        '''
        if not is_media:
            self.errors.append(
                'Could not discover {0}'.format(self.source_path))
            self.__stop()
            raise Exception('Could not discover {0}'.format(self.source_path))

        _log.debug('__discovered, data: {0}'.format(data.__dict__))

        self.data = data

        if not data.is_video:
            # The video branch is always linked to webmmux; without a video
            # stream it would never receive data and the muxer would wait
            # forever.
            self.errors.append(
                'No video stream found in {0}'.format(self.source_path))
            self.__stop()
            raise Exception(
                'No video stream found in {0}'.format(self.source_path))

        # Launch things that should be done after discovery
        self._link_elements()
        self.__setup_videoscale_capsfilter()

        # Tell the transcoding pipeline to start running
        self.pipeline.set_state(gst.STATE_PLAYING)
        _log.info('Transcoding...')

    def _setup_pipeline(self):
        '''Create all pipeline elements and add them to the pipeline bin.'''
        _log.debug('Setting up transcoding pipeline')
        # Create the pipeline bin.
        self.pipeline = gst.Pipeline('VideoTranscoderPipeline')

        # Create all GStreamer elements, starting with
        # filesrc & decoder
        self.filesrc = gst.element_factory_make('filesrc', 'filesrc')
        self.filesrc.set_property('location', self.source_path)
        self.pipeline.add(self.filesrc)

        self.decoder = gst.element_factory_make('decodebin2', 'decoder')
        self.decoder.connect('new-decoded-pad', self._on_dynamic_pad)
        self.pipeline.add(self.decoder)

        # Video elements
        self.videoqueue = gst.element_factory_make('queue', 'videoqueue')
        self.pipeline.add(self.videoqueue)

        self.videorate = gst.element_factory_make('videorate', 'videorate')
        self.pipeline.add(self.videorate)

        self.ffmpegcolorspace = gst.element_factory_make(
            'ffmpegcolorspace', 'ffmpegcolorspace')
        self.pipeline.add(self.ffmpegcolorspace)

        self.videoscale = gst.element_factory_make('ffvideoscale', 'videoscale')
        self.pipeline.add(self.videoscale)

        self.capsfilter = gst.element_factory_make('capsfilter', 'capsfilter')
        self.pipeline.add(self.capsfilter)

        self.vp8enc = gst.element_factory_make('vp8enc', 'vp8enc')
        self.vp8enc.set_property('quality', 6)
        # NOTE(review): the module computes CPU_COUNT but this stays fixed
        # at 2 encoder threads; consider using CPU_COUNT here.
        self.vp8enc.set_property('threads', 2)
        self.pipeline.add(self.vp8enc)

        # Audio elements
        self.audioqueue = gst.element_factory_make('queue', 'audioqueue')
        self.pipeline.add(self.audioqueue)

        self.audiorate = gst.element_factory_make('audiorate', 'audiorate')
        self.audiorate.set_property('tolerance', 80000000)
        self.pipeline.add(self.audiorate)

        self.audioconvert = gst.element_factory_make(
            'audioconvert', 'audioconvert')
        self.pipeline.add(self.audioconvert)

        self.audiocapsfilter = gst.element_factory_make(
            'capsfilter', 'audiocapsfilter')
        audiocaps = ['audio/x-raw-float']
        self.audiocapsfilter.set_property(
            'caps',
            gst.caps_from_string(
                ','.join(audiocaps)))
        self.pipeline.add(self.audiocapsfilter)

        self.vorbisenc = gst.element_factory_make('vorbisenc', 'vorbisenc')
        self.vorbisenc.set_property('quality', 1)
        self.pipeline.add(self.vorbisenc)

        # WebMmux & filesink
        self.webmmux = gst.element_factory_make('webmmux', 'webmmux')
        self.pipeline.add(self.webmmux)

        self.filesink = gst.element_factory_make('filesink', 'filesink')
        self.filesink.set_property('location', self.destination_path)
        self.pipeline.add(self.filesink)

        # Progressreport
        self.progressreport = gst.element_factory_make(
            'progressreport', 'progressreport')
        # Update every second
        self.progressreport.set_property('update-freq', 1)
        self.progressreport.set_property('silent', True)
        self.pipeline.add(self.progressreport)

    def _link_elements(self):
        '''
        Link all the elements

        This code depends on data from the discoverer and is called
        from __discovered
        '''
        _log.debug('linking elements')
        # Link the filesrc element to the decoder. The decoder then emits
        # 'new-decoded-pad' which links decoded src pads to either a video
        # or audio sink
        self.filesrc.link(self.decoder)

        # Link all the video elements in a row to webmmux
        gst.element_link_many(
            self.videoqueue,
            self.videorate,
            self.ffmpegcolorspace,
            self.videoscale,
            self.capsfilter,
            self.vp8enc,
            self.webmmux)

        if self.data.is_audio:
            # Link all the audio elements in a row to webmux
            gst.element_link_many(
                self.audioqueue,
                self.audiorate,
                self.audioconvert,
                self.audiocapsfilter,
                self.vorbisenc,
                self.webmmux)

        gst.element_link_many(
            self.webmmux,
            self.progressreport,
            self.filesink)

        # Setup the message bus and connect _on_message to the pipeline
        self._setup_bus()

    def _on_dynamic_pad(self, dbin, pad, islast):
        '''
        Callback called when ``decodebin2`` has a pad that we can connect to

        Pads whose caps intersect ffmpegcolorspace's sink template go to the
        video branch. ``audio/`` pads go to the audio branch, but only when
        the discoverer reported audio (otherwise that branch is not linked
        to the muxer). Any other pad, or a further stream of a kind that is
        already connected, is left unlinked.
        '''
        caps = pad.get_caps()
        video_caps = self.ffmpegcolorspace.get_pad_template('sink').get_caps()

        if not video_caps.intersect(caps).is_empty():
            # It IS a video src pad.
            target = self.videoqueue.get_pad('sink')
        elif self.data.is_audio and caps.to_string().startswith('audio/'):
            target = self.audioqueue.get_pad('sink')
        else:
            _log.debug('Ignoring decoded pad with caps {0}'.format(
                caps.to_string()))
            return

        if target.is_linked():
            # Only the first stream of each kind is transcoded.
            _log.warning('Ignoring extra stream with caps {0}'.format(
                caps.to_string()))
            return

        pad.link(target)

    def _setup_bus(self):
        '''Watch the pipeline bus and route its messages to _on_message.'''
        self.bus = self.pipeline.get_bus()
        self.bus.add_signal_watch()
        self.bus.connect('message', self._on_message)

    @staticmethod
    def _videoscale_caps(src_width, src_height, dimensions):
        '''
        Return the caps string for the video scaling capsfilter.

        Only the larger source side is constrained (height for portrait,
        width otherwise), so the scaler keeps the aspect ratio.

        :param dimensions: (width, height) target sizes
        '''
        caps = ['video/x-raw-yuv', 'pixel-aspect-ratio=1/1', 'framerate=30/1']

        if src_height > src_width:
            # Whoa! We have ourselves a portrait video!
            caps.append('height={0}'.format(dimensions[1]))
        else:
            # It's a landscape, phew, how normal.
            caps.append('width={0}'.format(dimensions[0]))

        return ','.join(caps)

    def __setup_videoscale_capsfilter(self):
        '''
        Sets up the output format (width, height) for the video
        '''
        self.capsfilter.set_property(
            'caps',
            gst.caps_from_string(self._videoscale_caps(
                self.data.videowidth,
                self.data.videoheight,
                self.destination_dimensions)))

    def _on_message(self, bus, message):
        '''
        Handle messages from the transcoding pipeline's bus.

        - EOS: discover the written file, then stop.
        - ``progress`` element messages: pass the fields to the progress
          callback (if any) and log the percentage.
        - ERROR: record the error in ``self.errors`` and stop.
        '''
        _log.debug((bus, message, message.type))

        t = message.type

        if t == gst.MESSAGE_EOS:
            self._discover_dst_and_stop()
            _log.info('Done')

        elif t == gst.MESSAGE_ELEMENT:
            if message.structure.get_name() == 'progress':
                data = dict(message.structure)

                if self._progress_callback:
                    self._progress_callback(data)

                _log.info('{percent}% done...'.format(
                    percent=data.get('percent')))
                _log.debug(data)

        elif t == gst.MESSAGE_ERROR:
            _log.error((bus, message))
            gerror, debug = message.parse_error()
            self.errors.append(str(gerror))
            _log.debug(debug)
            self.__stop()

    def _discover_dst_and_stop(self):
        '''Run the discoverer on the output file; it then stops the loop.'''
        self.dst_discoverer = discoverer.Discoverer(self.destination_path)

        self.dst_discoverer.connect('discovered', self.__dst_discovered)

        self.dst_discoverer.discover()

    def __dst_discovered(self, data, is_media):
        '''Store the output file's discoverer data and stop.'''
        self.dst_data = data

        if not is_media:
            self.errors.append(
                'Could not discover {0}'.format(self.destination_path))
            _log.error('Output file {0} is not recognised as media'.format(
                self.destination_path))

        self.__stop()

    def __stop(self):
        '''Set the pipeline to NULL and schedule the main loop to quit.'''
        _log.debug(self.loop)

        # Stop executing the pipeline
        self.pipeline.set_state(gst.STATE_NULL)

        # This kills the loop, mercifully
        gobject.idle_add(self.__stop_mainloop)

    def __stop_mainloop(self):
        '''
        Wrapper for gobject.MainLoop.quit()

        This wrapper makes us able to see if self.loop.quit has been called.
        Returns False so the idle callback runs only once.
        '''
        _log.info('Terminating MainLoop')

        self.loop.quit()
        return False
614
615
if __name__ == '__main__':
    # Transcoding is CPU heavy; run at the lowest scheduling priority.
    os.nice(19)
    from optparse import OptionParser

    parser = OptionParser(
        usage='%prog [-v] -a [ video | thumbnail ] SRC DEST')

    parser.add_option('-a', '--action',
                      dest='action',
                      help='One of "video" or "thumbnail"')

    parser.add_option('-v',
                      dest='verbose',
                      action='store_true',
                      help='Output debug information')

    parser.add_option('-q',
                      dest='quiet',
                      action='store_true',
                      help='Dear program, please be quiet unless *error*')

    (options, args) = parser.parse_args()

    # -q takes precedence over -v when both are given.
    if options.quiet:
        _log.setLevel(logging.ERROR)
    elif options.verbose:
        _log.setLevel(logging.DEBUG)
    else:
        _log.setLevel(logging.INFO)

    _log.debug(args)

    # A wrong argument count or a missing/unknown action is a usage error:
    # show help and exit non-zero instead of silently doing nothing.
    if len(args) != 2 or options.action not in ('thumbnail', 'video'):
        parser.print_help()
        sys.exit(1)

    if options.action == 'thumbnail':
        worker = VideoThumbnailer(*args)
    else:
        def cb(data):
            print('I\'m a callback!')
        worker = VideoTranscoder(*args, progress_callback=cb)

    # Report failure through the exit status as well as the log.
    if getattr(worker, 'errors', None):
        sys.exit(1)