1 # GNU MediaGoblin -- federated, autonomous media hosting
2 # Copyright (C) 2011 MediaGoblin contributors. See AUTHORS.
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 from __future__
import division
20 os
.putenv('GST_DEBUG_DUMP_DOT_DIR', '/tmp')
27 _log
= logging
.getLogger(__name__
)
29 _log
.setLevel(logging
.DEBUG
)
33 import multiprocessing
35 CPU_COUNT
= multiprocessing
.cpu_count()
36 except NotImplementedError:
37 _log
.warning('multiprocessing.cpu_count not implemented')
40 _log
.warning('Could not import multiprocessing, defaulting to 2 CPU cores')
46 raise Exception('Could not find pygtk')
50 gobject
.threads_init()
52 raise Exception('gobject could not be found')
58 from gst
.extend
import discoverer
60 raise Exception('gst/pygst 0.10 could not be found')
63 class VideoThumbnailer
:
64 # Declaration of thumbnailer states
69 # The current thumbnailer state
72 # This will contain the thumbnailing pipeline
73 thumbnail_pipeline
= None
77 def __init__(self
, source_path
, dest_path
):
79 Set up playbin pipeline in order to get video properties.
81 Initializes and runs the gobject.MainLoop()
85 self
.source_path
= source_path
86 self
.dest_path
= dest_path
88 self
.loop
= gobject
.MainLoop()
90 # Set up the playbin. It will be used to discover certain
91 # properties of the input file
92 self
.playbin
= gst
.element_factory_make('playbin')
94 self
.videosink
= gst
.element_factory_make('fakesink', 'videosink')
95 self
.playbin
.set_property('video-sink', self
.videosink
)
97 self
.audiosink
= gst
.element_factory_make('fakesink', 'audiosink')
98 self
.playbin
.set_property('audio-sink', self
.audiosink
)
100 self
.bus
= self
.playbin
.get_bus()
101 self
.bus
.add_signal_watch()
102 self
.watch_id
= self
.bus
.connect('message', self
._on
_bus
_message
)
104 self
.playbin
.set_property('uri', 'file:{0}'.format(
105 urllib
.pathname2url(self
.source_path
)))
107 self
.playbin
.set_state(gst
.STATE_PAUSED
)
114 def _on_bus_message(self
, bus
, message
):
115 _log
.debug(' BUS MESSAGE: {0}'.format(message
))
117 if message
.type == gst
.MESSAGE_ERROR
:
118 gobject
.idle_add(self
._on
_bus
_error
)
120 elif message
.type == gst
.MESSAGE_STATE_CHANGED
:
121 # The pipeline state has changed
122 # Parse state changing data
123 _prev
, state
, _pending
= message
.parse_state_changed()
125 _log
.debug('State changed: {0}'.format(state
))
127 if state
== gst
.STATE_PAUSED
:
128 if message
.src
== self
.playbin
:
129 gobject
.idle_add(self
._on
_bus
_paused
)
131 def _on_bus_paused(self
):
133 Set up thumbnailing pipeline
135 current_video
= self
.playbin
.get_property('current-video')
137 if current_video
== 0:
138 _log
.debug('Found current video from playbin')
140 _log
.error('Could not get any current video from playbin!')
142 self
.duration
= self
._get
_duration
(self
.playbin
)
143 _log
.info('Video length: {0}'.format(self
.duration
/ gst
.SECOND
))
145 _log
.info('Setting up thumbnailing pipeline')
146 self
.thumbnail_pipeline
= gst
.parse_launch(
147 'filesrc location="{0}" ! decodebin ! '
148 'ffmpegcolorspace ! videoscale ! '
149 'video/x-raw-rgb,depth=24,bpp=24,pixel-aspect-ratio=1/1,width=180 ! '
150 'fakesink signal-handoffs=True'.format(self
.source_path
))
152 self
.thumbnail_bus
= self
.thumbnail_pipeline
.get_bus()
153 self
.thumbnail_bus
.add_signal_watch()
154 self
.thumbnail_watch_id
= self
.thumbnail_bus
.connect(
155 'message', self
._on
_thumbnail
_bus
_message
)
157 self
.thumbnail_pipeline
.set_state(gst
.STATE_PAUSED
)
159 #gobject.timeout_add(3000, self._on_timeout)
163 def _on_thumbnail_bus_message(self
, bus
, message
):
164 _log
.debug('Thumbnail bus called, message: {0}'.format(message
))
166 if message
.type == gst
.MESSAGE_ERROR
:
168 gobject
.idle_add(self
._on
_bus
_error
)
170 if message
.type == gst
.MESSAGE_STATE_CHANGED
:
171 _prev
, state
, _pending
= message
.parse_state_changed()
173 if (state
== gst
.STATE_PAUSED
and
174 not self
.state
== self
.STATE_PROCESSING
and
175 message
.src
== self
.thumbnail_pipeline
):
176 _log
.info('Pipeline paused, processing')
177 self
.state
= self
.STATE_PROCESSING
179 for sink
in self
.thumbnail_pipeline
.sinks():
180 name
= sink
.get_name()
181 factoryname
= sink
.get_factory().get_name()
183 if factoryname
== 'fakesink':
184 sinkpad
= sink
.get_pad('sink')
186 self
.buffer_probes
[name
] = sinkpad
.add_buffer_probe(
187 self
.buffer_probe_handler
, name
)
189 _log
.info('Added buffer probe')
193 # Apply the wadsworth constant, fallback to 1 second
194 seek_amount
= max(self
.duration
/ 100 * 30, 1 * gst
.SECOND
)
196 _log
.debug('seek amount: {0}'.format(seek_amount
))
198 seek_result
= self
.thumbnail_pipeline
.seek(
201 gst
.SEEK_FLAG_FLUSH | gst
.SEEK_FLAG_ACCURATE
,
208 self
.errors
.append('COULD_NOT_SEEK')
209 _log
.error('Couldn\'t seek! result: {0}'.format(
215 #self.thumbnail_pipeline.set_state(gst.STATE_PAUSED)
218 def buffer_probe_handler_real(self
, pad
, buff
, name
):
220 Capture buffers as gdk_pixbufs when told to.
225 _log
.error('No caps passed to buffer probe handler!')
229 _log
.debug('caps: {0}'.format(caps
))
232 width
= filters
["width"]
233 height
= filters
["height"]
235 pixbuf
= gtk
.gdk
.pixbuf_new_from_data(
236 buff
.data
, gtk
.gdk
.COLORSPACE_RGB
, False, 8,
237 width
, height
, width
* 3)
239 # NOTE: 200x136 is sort of arbitrary. it's larger than what
240 # the ui uses at the time of this writing.
241 # new_width, new_height = scaled_size((width, height), (200, 136))
243 #pixbuf = pixbuf.scale_simple(
244 #new_width, new_height, gtk.gdk.INTERP_BILINEAR)
246 pixbuf
.save(self
.dest_path
, 'jpeg')
247 _log
.info('Saved thumbnail')
250 except gst
.QueryError
:
254 def buffer_probe_handler(self
, pad
, buff
, name
):
256 Proxy function for buffer_probe_handler_real
259 lambda: self
.buffer_probe_handler_real(pad
, buff
, name
))
263 def _get_duration(self
, pipeline
, retries
=0):
265 Get the duration of a pipeline.
273 return pipeline
.query_duration(gst
.FORMAT_TIME
)[0]
274 except gst
.QueryError
:
275 return self
._get
_duration
(pipeline
, retries
+ 1)
277 def _on_timeout(self
):
278 _log
.error('TIMEOUT! DROP EVERYTHING!')
281 def _on_bus_error(self
, *args
):
282 _log
.error('AHAHAHA! Error! args: {0}'.format(args
))
286 Tell gobject to call __halt when the mainloop is idle.
288 _log
.info('Shutting down')
293 Halt all pipelines and shut down the main loop
295 _log
.info('Halting...')
296 self
.state
= self
.STATE_HALTING
300 gobject
.idle_add(self
.__halt
_final
)
302 def __disconnect(self
):
303 _log
.debug('Disconnecting...')
304 if not self
.playbin
is None:
305 self
.playbin
.set_state(gst
.STATE_NULL
)
306 for sink
in self
.playbin
.sinks():
307 name
= sink
.get_name()
308 factoryname
= sink
.get_factory().get_name()
310 _log
.debug('Disconnecting {0}'.format(name
))
312 if factoryname
== "fakesink":
313 pad
= sink
.get_pad("sink")
314 pad
.remove_buffer_probe(self
.buffer_probes
[name
])
315 del self
.buffer_probes
[name
]
319 if self
.bus
is not None:
320 self
.bus
.disconnect(self
.watch_id
)
324 def __halt_final(self
):
327 _log
.error(','.join(self
.errors
))
332 class VideoTranscoder
:
336 Transcodes the SRC video file to a VP8 WebM video file at DST
338 - Does the same thing as VideoThumbnailer, but produces a WebM vp8
339 and vorbis video file.
340 - The VideoTranscoder exceeds the VideoThumbnailer in the way
341 that it was refined afterwards and therefore is done more
344 def __init__(self
, src
, dst
, **kwargs
):
345 _log
.info('Initializing VideoTranscoder...')
347 self
.loop
= gobject
.MainLoop()
348 self
.source_path
= src
349 self
.destination_path
= dst
352 self
.destination_dimensions
= kwargs
.get('dimensions') or (640, 640)
353 self
._progress
_callback
= kwargs
.get('progress_callback') or None
355 if not type(self
.destination_dimensions
) == tuple:
356 raise Exception('dimensions must be tuple: (width, height)')
362 self
._setup
_discover
()
363 self
._setup
_pipeline
()
366 _log
.info('Discovering...')
367 self
.discoverer
.discover()
370 _log
.debug('Initializing MainLoop()')
373 def _setup_discover(self
):
374 _log
.debug('Setting up discoverer')
375 self
.discoverer
= discoverer
.Discoverer(self
.source_path
)
377 # Connect self.__discovered to the 'discovered' event
378 self
.discoverer
.connect('discovered', self
.__discovered
)
380 def __discovered(self
, data
, is_media
):
382 Callback for media discoverer.
386 raise Exception('Could not discover {0}'.format(self
.source_path
))
388 _log
.debug('__discovered, data: {0}'.format(data
.__dict
__))
392 # Launch things that should be done after discovery
393 self
._link
_elements
()
394 self
.__setup
_videoscale
_capsfilter
()
396 # Tell the transcoding pipeline to start running
397 self
.pipeline
.set_state(gst
.STATE_PLAYING
)
398 _log
.info('Transcoding...')
400 def _setup_pipeline(self
):
401 _log
.debug('Setting up transcoding pipeline')
402 # Create the pipeline bin.
403 self
.pipeline
= gst
.Pipeline('VideoTranscoderPipeline')
405 # Create all GStreamer elements, starting with
407 self
.filesrc
= gst
.element_factory_make('filesrc', 'filesrc')
408 self
.filesrc
.set_property('location', self
.source_path
)
409 self
.pipeline
.add(self
.filesrc
)
411 self
.decoder
= gst
.element_factory_make('decodebin2', 'decoder')
412 self
.decoder
.connect('new-decoded-pad', self
._on
_dynamic
_pad
)
413 self
.pipeline
.add(self
.decoder
)
416 self
.videoqueue
= gst
.element_factory_make('queue', 'videoqueue')
417 self
.pipeline
.add(self
.videoqueue
)
419 self
.videorate
= gst
.element_factory_make('videorate', 'videorate')
420 self
.pipeline
.add(self
.videorate
)
422 self
.ffmpegcolorspace
= gst
.element_factory_make(
423 'ffmpegcolorspace', 'ffmpegcolorspace')
424 self
.pipeline
.add(self
.ffmpegcolorspace
)
426 self
.videoscale
= gst
.element_factory_make('ffvideoscale', 'videoscale')
427 #self.videoscale.set_property('method', 2) # I'm not sure this works
428 #self.videoscale.set_property('add-borders', 0)
429 self
.pipeline
.add(self
.videoscale
)
431 self
.capsfilter
= gst
.element_factory_make('capsfilter', 'capsfilter')
432 self
.pipeline
.add(self
.capsfilter
)
434 self
.vp8enc
= gst
.element_factory_make('vp8enc', 'vp8enc')
435 self
.vp8enc
.set_property('quality', 6)
436 self
.vp8enc
.set_property('threads', 2)
437 self
.pipeline
.add(self
.vp8enc
)
440 self
.audioqueue
= gst
.element_factory_make('queue', 'audioqueue')
441 self
.pipeline
.add(self
.audioqueue
)
443 self
.audiorate
= gst
.element_factory_make('audiorate', 'audiorate')
444 self
.audiorate
.set_property('tolerance', 80000000)
445 self
.pipeline
.add(self
.audiorate
)
447 self
.audioconvert
= gst
.element_factory_make('audioconvert', 'audioconvert')
448 self
.pipeline
.add(self
.audioconvert
)
450 self
.audiocapsfilter
= gst
.element_factory_make('capsfilter', 'audiocapsfilter')
451 audiocaps
= ['audio/x-raw-float']
452 self
.audiocapsfilter
.set_property(
454 gst
.caps_from_string(
455 ','.join(audiocaps
)))
456 self
.pipeline
.add(self
.audiocapsfilter
)
458 self
.vorbisenc
= gst
.element_factory_make('vorbisenc', 'vorbisenc')
459 self
.vorbisenc
.set_property('quality', 1)
460 self
.pipeline
.add(self
.vorbisenc
)
463 self
.webmmux
= gst
.element_factory_make('webmmux', 'webmmux')
464 self
.pipeline
.add(self
.webmmux
)
466 self
.filesink
= gst
.element_factory_make('filesink', 'filesink')
467 self
.filesink
.set_property('location', self
.destination_path
)
468 self
.pipeline
.add(self
.filesink
)
471 self
.progressreport
= gst
.element_factory_make(
472 'progressreport', 'progressreport')
473 # Update every second
474 self
.progressreport
.set_property('update-freq', 1)
475 self
.progressreport
.set_property('silent', True)
476 self
.pipeline
.add(self
.progressreport
)
478 def _link_elements(self
):
480 Link all the elements
482 This code depends on data from the discoverer and is called
485 _log
.debug('linking elements')
486 # Link the filesrc element to the decoder. The decoder then emits
487 # 'new-decoded-pad' which links decoded src pads to either a video
489 self
.filesrc
.link(self
.decoder
)
491 # Link all the video elements in a row to webmmux
492 gst
.element_link_many(
495 self
.ffmpegcolorspace
,
501 if self
.data
.is_audio
:
502 # Link all the audio elements in a row to webmux
503 gst
.element_link_many(
507 self
.audiocapsfilter
,
511 gst
.element_link_many(
516 # Setup the message bus and connect _on_message to the pipeline
520 def _on_dynamic_pad(self
, dbin
, pad
, islast
):
522 Callback called when ``decodebin2`` has a pad that we can connect to
524 # Intersect the capabilities of the video sink and the pad src
525 # Then check if they have no common capabilities.
526 if self
.ffmpegcolorspace
.get_pad_template('sink')\
527 .get_caps().intersect(pad
.get_caps()).is_empty():
528 # It is NOT a video src pad.
529 pad
.link(self
.audioqueue
.get_pad('sink'))
531 # It IS a video src pad.
532 pad
.link(self
.videoqueue
.get_pad('sink'))
534 def _setup_bus(self
):
535 self
.bus
= self
.pipeline
.get_bus()
536 self
.bus
.add_signal_watch()
537 self
.bus
.connect('message', self
._on
_message
)
539 def __setup_videoscale_capsfilter(self
):
541 Sets up the output format (width, height) for the video
543 caps
= ['video/x-raw-yuv', 'pixel-aspect-ratio=1/1', 'framerate=30/1']
545 if self
.data
.videoheight
> self
.data
.videowidth
:
546 # Whoa! We have ourselves a portrait video!
547 caps
.append('height={0}'.format(
548 self
.destination_dimensions
[1]))
550 # It's a landscape, phew, how normal.
551 caps
.append('width={0}'.format(
552 self
.destination_dimensions
[0]))
554 self
.capsfilter
.set_property(
556 gst
.caps_from_string(
559 def _on_message(self
, bus
, message
):
560 _log
.debug((bus
, message
, message
.type))
564 if t
== gst
.MESSAGE_EOS
:
565 self
._discover
_dst
_and
_stop
()
568 elif t
== gst
.MESSAGE_ELEMENT
:
569 if message
.structure
.get_name() == 'progress':
570 data
= dict(message
.structure
)
572 if self
._progress
_callback
:
573 self
._progress
_callback
(data
)
575 _log
.info('{percent}% done...'.format(
576 percent
=data
.get('percent')))
579 elif t
== gst
.MESSAGE_ERROR
:
580 _log
.error((bus
, message
))
583 def _discover_dst_and_stop(self
):
584 self
.dst_discoverer
= discoverer
.Discoverer(self
.destination_path
)
586 self
.dst_discoverer
.connect('discovered', self
.__dst
_discovered
)
588 self
.dst_discoverer
.discover()
591 def __dst_discovered(self
, data
, is_media
):
597 _log
.debug(self
.loop
)
599 # Stop executing the pipeline
600 self
.pipeline
.set_state(gst
.STATE_NULL
)
602 # This kills the loop, mercifully
603 gobject
.idle_add(self
.__stop
_mainloop
)
605 def __stop_mainloop(self
):
607 Wrapper for gobject.MainLoop.quit()
609 This wrapper makes us able to see if self.loop.quit has been called
611 _log
.info('Terminating MainLoop')
616 if __name__
== '__main__':
618 from optparse
import OptionParser
620 parser
= OptionParser(
621 usage
='%prog [-v] -a [ video | thumbnail ] SRC DEST')
623 parser
.add_option('-a', '--action',
625 help='One of "video" or "thumbnail"')
627 parser
.add_option('-v',
630 help='Output debug information')
632 parser
.add_option('-q',
635 help='Dear program, please be quiet unless *error*')
637 (options
, args
) = parser
.parse_args()
640 _log
.setLevel(logging
.DEBUG
)
642 _log
.setLevel(logging
.INFO
)
645 _log
.setLevel(logging
.ERROR
)
649 if not len(args
) == 2:
653 if options
.action
== 'thumbnail':
654 VideoThumbnailer(*args
)
655 elif options
.action
== 'video':
657 print('I\'m a callback!')
658 transcoder
= VideoTranscoder(*args
, progress_callback
=cb
)