# GNU MediaGoblin -- federated, autonomous media hosting
# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import division
import os
import sys
import logging
import urllib
import multiprocessing
import gobject
old_argv = sys.argv
sys.argv = []
import pygst
pygst.require('0.10')
import gst
sys.argv = old_argv
import struct
try:
from PIL import Image
except ImportError:
import Image
from gst.extend import discoverer
_log = logging.getLogger(__name__)

# Enable GObject thread support; GStreamer invokes some Python callbacks
# (such as pad probes) from its own streaming threads.
gobject.threads_init()

# Number of CPUs, used to pick the vp8enc thread count. Assume two cores
# when the platform cannot report it.
try:
    CPU_COUNT = multiprocessing.cpu_count()
except NotImplementedError:
    CPU_COUNT = 2
    _log.warning('multiprocessing.cpu_count not implemented')

# Directory GStreamer writes pipeline graph (.dot) dumps to.
os.putenv('GST_DEBUG_DUMP_DOT_DIR', '/tmp')
def pixbuf_to_pilbuf(buf):
    '''
    Convert packed 24-bit RGB bytes into a list of ``(r, g, b)`` tuples.

    The result is the per-pixel sequence ``PIL.Image.putdata`` accepts for an
    ``RGB`` image. Bytes left over after the last complete pixel are ignored.

    :param buf: byte string of packed R, G, B octets (``str`` on Python 2,
        ``bytes`` on Python 3).
    :returns: list of ``(r, g, b)`` int tuples, one per complete pixel.
    '''
    # bytearray yields ints on both Python 2 and 3. Taking every third byte
    # for each channel and zipping them keeps the per-pixel work in C,
    # instead of calling struct.unpack once per pixel. zip() also stops at
    # the shortest channel, so a trailing partial pixel cannot raise.
    octets = bytearray(buf)
    return list(zip(octets[0::3], octets[1::3], octets[2::3]))
class VideoThumbnailerMarkII(object):
    '''
    Creates a thumbnail from a video file. Rewrite of VideoThumbnailer.

    Large parts of the functionality and overall architecture contained within
    this object is taken from Participatory Culture Foundation's
    `gst_extractor.Extractor` object last seen at
    https://github.com/pculture/miro/blob/master/tv/lib/frontends/widgets/gst/gst_extractor.py
    in the `miro` codebase.

    The `miro` codebase and the gst_extractor.py are licensed under the GNU
    General Public License v2 or later.

    Flow (driven by the GObject main loop that ``__init__`` runs, so the
    constructor blocks until the work is finished or aborted):

    1. A ``playbin`` is put into PAUSED so the duration can be queried.
    2. ``on_playbin_paused`` builds a second pipeline that decodes and scales
       the video to raw 24-bit RGB, ending in a ``fakesink``.
    3. When that pipeline reaches PAUSED, ``on_thumbnail_message`` seeks to
       the position returned by ``position_callback`` and attaches a buffer
       probe to the fakesink.
    4. The probe schedules ``take_snapshot``, which writes the frame to
       ``dest_path`` with PIL and shuts everything down.

    A 3 second timeout, armed in ``on_playbin_paused``, aborts the run if no
    snapshot has been taken by then.
    '''
    STATE_NULL = 0
    STATE_HALTING = 1
    STATE_PROCESSING = 2
    STATE_PROCESSING_THUMBNAIL = 3

    def __init__(self, source_path, dest_path, width=None, height=None,
                 position_callback=None):
        '''
        :param source_path: path of the video to take a thumbnail from.
        :param dest_path: path the thumbnail is saved to with PIL's
            ``Image.save`` (format chosen from the file extension).
        :param width: optional output width in pixels.
        :param height: optional output height in pixels.
        :param position_callback: callable ``(duration, gst)`` returning the
            seek position in nanoseconds; defaults to
            ``wadsworth_position_callback`` (30% into the video).
        '''
        self.state = self.STATE_NULL

        self.has_reached_playbin_pause = False

        # Everything disconnect() may touch is initialised up front so that
        # tear-down is safe at any stage.
        self.playbin = None
        self.playbin_message_bus = None
        self.playbin_bus_watch_id = None
        self.thumbnail_pipeline = None
        self.thumbnail_message_bus = None
        self.thumbnail_bus_watch_id = None
        self.timeout_id = None
        self.duration = 0

        self.permission_to_take_picture = False

        # sink element name -> buffer probe id
        self.buffer_probes = {}

        self.errors = []

        self.source_path = os.path.abspath(source_path)
        self.dest_path = os.path.abspath(dest_path)

        self.width = width
        self.height = height
        self.position_callback = (position_callback
                                  or self.wadsworth_position_callback)

        self.mainloop = gobject.MainLoop()

        # The playbin is only used to learn the video's duration; its output
        # is discarded by the fake sinks.
        self.playbin = gst.element_factory_make('playbin')

        self.videosink = gst.element_factory_make('fakesink', 'videosink')
        self.audiosink = gst.element_factory_make('fakesink', 'audiosink')

        self.playbin.set_property('video-sink', self.videosink)
        self.playbin.set_property('audio-sink', self.audiosink)

        self.playbin_message_bus = self.playbin.get_bus()
        self.playbin_message_bus.add_signal_watch()
        self.playbin_bus_watch_id = self.playbin_message_bus.connect(
            'message',
            self.on_playbin_message)

        self.playbin.set_property(
            'uri',
            'file:{0}'.format(
                urllib.pathname2url(self.source_path)))

        self.playbin.set_state(gst.STATE_PAUSED)

        try:
            self.run()
        except Exception as exc:
            _log.critical(
                'Exception "{0}" caught, shutting down mainloop and re-raising'
                .format(exc))
            self.disconnect()
            raise

    def wadsworth_position_callback(self, duration, gst):
        '''
        Default position callback: seek 30% into the video.

        :param duration: video duration in nanoseconds (GStreamer time).
        :param gst: the ``gst`` module (unused; part of the callback
            signature).
        :returns: the seek position in nanoseconds, as an int.
        '''
        # Use the argument instead of self.duration so the callback honours
        # what it is given. Integer arithmetic keeps the position an int,
        # matching the int64 start position the seek expects.
        return int(duration) * 30 // 100

    def run(self):
        '''Run the main loop; returns once ``halt`` has quit it.'''
        self.mainloop.run()

    def on_playbin_message(self, message_bus, message):
        '''
        Bus handler for the playbin.

        Schedules ``on_playbin_error`` on errors, and ``on_playbin_paused``
        once the playbin itself (not one of its children) reaches PAUSED.
        '''
        # Silenced to prevent clobbering of output
        #_log.debug('playbin message: {0}'.format(message))

        if message.type == gst.MESSAGE_ERROR:
            _log.error('playbin error: {0}'.format(message))
            gobject.idle_add(self.on_playbin_error)

        if message.type == gst.MESSAGE_STATE_CHANGED:
            prev_state, cur_state, pending_state = \
                message.parse_state_changed()

            _log.debug('playbin state changed: \nprev: {0}\ncur: {1}\n'
                       ' pending: {2}'.format(
                           prev_state,
                           cur_state,
                           pending_state))

            if cur_state == gst.STATE_PAUSED and message.src == self.playbin:
                _log.info('playbin ready')
                gobject.idle_add(self.on_playbin_paused)

    def on_playbin_error(self):
        '''
        Idle callback scheduled when the playbin reports an error.

        Shuts everything down. The abort timeout is only armed once the
        playbin reaches PAUSED, so without this an early error would leave
        the main loop running forever. Returns False to remove the idle
        source.
        '''
        _log.error('Aborting thumbnailing because of a playbin error')
        self.disconnect()
        return False

    def on_playbin_paused(self):
        '''
        Idle callback run once the playbin is PAUSED.

        Reads the duration, builds and pauses the thumbnail pipeline, and
        arms the abort timeout. Returns False to remove the idle source.
        '''
        if self.has_reached_playbin_pause:
            _log.warning('Has already reached on_playbin_paused. Aborting '
                         'without doing anything this time.')
            return False

        if self.state == self.STATE_HALTING or self.playbin is None:
            # disconnect() already ran (e.g. after a playbin error).
            return False

        self.has_reached_playbin_pause = True

        # Only used for logging. NOTE(review): 'current-video' is a stream
        # index (0 for the first video stream), so only a negative value is
        # treated as "no video" here -- verify against playbin's docs.
        current_video = self.playbin.get_property('current-video')

        if current_video is None or current_video < 0:
            _log.critical('Could not get any video data '
                          'from playbin')
        else:
            _log.info('Got video data from playbin')

        self.duration = self.get_duration(self.playbin)
        self.permission_to_take_picture = True
        self.buffer_probes = {}

        # Decode, convert and scale to packed 24-bit RGB. A missing width or
        # height is left for videoscale to derive.
        # NOTE(review): a source path containing '"' would break this
        # pipeline description.
        pipeline = ''.join([
            'filesrc location="%s" ! decodebin ! ' % self.source_path,
            'ffmpegcolorspace ! videoscale ! ',
            'video/x-raw-rgb,depth=24,bpp=24,pixel-aspect-ratio=1/1',
            ',width={0}'.format(self.width) if self.width else '',
            ',height={0}'.format(self.height) if self.height else '',
            ' ! ',
            'fakesink signal-handoffs=True'])

        _log.debug('thumbnail_pipeline: {0}'.format(pipeline))

        self.thumbnail_pipeline = gst.parse_launch(pipeline)
        self.thumbnail_message_bus = self.thumbnail_pipeline.get_bus()
        self.thumbnail_message_bus.add_signal_watch()
        self.thumbnail_bus_watch_id = self.thumbnail_message_bus.connect(
            'message',
            self.on_thumbnail_message)

        self.thumbnail_pipeline.set_state(gst.STATE_PAUSED)

        # Abort if no snapshot has been taken within 3 seconds. The id is
        # kept so disconnect() can cancel the timeout; otherwise it could
        # fire during a later run, such as the retry in on_thumbnail_error.
        self.timeout_id = gobject.timeout_add(3000, self.on_gobject_timeout)

        return False

    def on_thumbnail_message(self, message_bus, message):
        '''
        Bus handler for the thumbnail pipeline.

        Schedules ``on_thumbnail_error`` on errors. Once the pipeline itself
        has prerolled (reached PAUSED), it seeks and attaches the buffer
        probe.
        '''
        # This is silenced to prevent clobbering of the terminal window
        #_log.debug('thumbnail message: {0}'.format(message))

        if message.type == gst.MESSAGE_ERROR:
            _log.error('thumbnail error: {0}'.format(message.parse_error()))
            gobject.idle_add(self.on_thumbnail_error, message)

        if message.type == gst.MESSAGE_STATE_CHANGED:
            prev_state, cur_state, pending_state = \
                message.parse_state_changed()

            _log.debug('thumbnail state changed: \nprev: {0}\ncur: {1}\n'
                       ' pending: {2}'.format(
                           prev_state,
                           cur_state,
                           pending_state))

            # Child elements reach PAUSED before the pipeline has prerolled,
            # and seeking at that point is unreliable. Only react to the
            # pipeline's own transition.
            if self.thumbnail_pipeline is None or \
                    message.src != self.thumbnail_pipeline:
                return

            if cur_state == gst.STATE_PAUSED and \
                    not self.state == self.STATE_PROCESSING_THUMBNAIL:
                self._seek_and_attach_probe()
            elif self.state == self.STATE_PROCESSING_THUMBNAIL:
                _log.info('Already processing thumbnail')

    def _seek_and_attach_probe(self):
        '''
        Seek the prerolled thumbnail pipeline to the thumbnail position, then
        attach ``on_pad_buffer_probe`` to its fakesink's sink pad.
        '''
        seek_amount = int(self.position_callback(self.duration, gst))

        seek_result = self.thumbnail_pipeline.seek(
            1.0,
            gst.FORMAT_TIME,
            gst.SEEK_FLAG_FLUSH | gst.SEEK_FLAG_ACCURATE,
            gst.SEEK_TYPE_SET,
            seek_amount,
            gst.SEEK_TYPE_NONE,
            0)

        if not seek_result:
            # The abort timeout will eventually shut things down.
            _log.info('Could not seek.')
            return

        _log.info('Seek successful, attaching buffer probe')
        self.state = self.STATE_PROCESSING_THUMBNAIL

        for sink in self.thumbnail_pipeline.sinks():
            if sink.get_factory().get_name() != 'fakesink':
                continue

            sink_name = sink.get_name()
            sink_pad = sink.get_pad('sink')
            self.buffer_probes[sink_name] = sink_pad.add_buffer_probe(
                self.on_pad_buffer_probe,
                sink_name)
            _log.info('Attached buffer probes: {0}'.format(
                self.buffer_probes))
            break

    def on_pad_buffer_probe(self, *args):
        '''
        Buffer probe on the fakesink pad; ``args`` is
        ``(pad, buffer, sink_name)``.

        Defers the snapshot to the main loop and returns True so the buffer
        is passed on. Returning a false value would drop it, so the paused
        sink would keep waiting for (and the probe keep firing on) further
        buffers.
        '''
        _log.debug('buffer probe handler: {0}'.format(args))
        gobject.idle_add(lambda: self.take_snapshot(*args))
        return True

    def take_snapshot(self, pad, buff, name):
        '''
        Idle callback: save the probed RGB frame to ``dest_path``, then shut
        down. Returns False to remove the idle source.
        '''
        if self.state == self.STATE_HALTING:
            _log.debug('Pipeline is halting, will not take snapshot')
            return False

        _log.info('Taking snapshot! ({0})'.format(
            (pad, buff, name)))
        try:
            caps = buff.caps
            if caps is None:
                _log.error('No buffer caps present /take_snapshot')
                self.disconnect()
                # Without caps there is no width/height to decode with.
                return False

            _log.debug('caps: {0}'.format(caps))

            filters = caps[0]
            width = filters['width']
            height = filters['height']

            im = Image.new('RGB', (width, height))

            data = pixbuf_to_pilbuf(
                self._strip_row_padding(buff.data, width, height))

            im.putdata(data)

            im.save(self.dest_path)

            _log.info('Saved snapshot!')

            self.disconnect()

        except gst.QueryError as exc:
            _log.error('take_snapshot - QueryError: {0}'.format(exc))
        except Exception:
            # Stop right away instead of idling until the abort timeout.
            _log.exception('take_snapshot failed')
            self.disconnect()

        return False

    @staticmethod
    def _strip_row_padding(data, width, height):
        '''
        Return ``data`` with per-row padding removed.

        NOTE(review): GStreamer 0.10 is assumed to pad each row of 24-bit RGB
        to a multiple of 4 bytes -- verify for the version in use. Data
        whose length does not match that padded layout is returned
        unchanged, so unpadded frames are unaffected.
        '''
        row_bytes = width * 3
        stride = (row_bytes + 3) & ~3
        if stride == row_bytes or len(data) != stride * height:
            return data
        return b''.join(data[row * stride:row * stride + row_bytes]
                        for row in range(height))

    def on_thumbnail_error(self, message):
        '''
        Idle callback for thumbnail pipeline errors.

        Shuts down. If the error is videoscale's integer-overflow failure,
        it retries with both dimensions set explicitly. Returns False to
        remove the idle source.
        '''
        debug_info = message.parse_error()[1] or ''
        scaling_failed = False

        if 'Error calculating the output scaled size - integer overflow' \
                in debug_info:
            # GStreamer videoscale sometimes fails to calculate the dimensions
            # given only one of the destination dimensions and the source
            # dimensions. This is a workaround in case videoscale returns an
            # error that indicates this has happened.
            scaling_failed = True
            _log.error('Thumbnailing failed because of videoscale integer'
                       ' overflow. Will retry with fallback.')
        else:
            _log.error('Thumbnailing failed: {0}'.format(message.parse_error()))

        # Kill the current mainloop
        self.disconnect()

        if scaling_failed:
            self._retry_with_explicit_size()

        return False

    def _retry_with_explicit_size(self):
        '''
        Re-run the thumbnailer on this object with both width and height
        given, keeping the source aspect ratio and the position callback.
        '''
        _log.info('Retrying with manually set sizes...')

        info = VideoTranscoder().discover(self.source_path)
        if not info or not info.get('videowidth'):
            _log.error('Could not discover the size of {0}; giving up on '
                       'the fallback'.format(self.source_path))
            return

        target_width = self.width or 180
        ratio = target_width / int(info['videowidth'])
        target_height = max(1, int(info['videoheight'] * ratio))

        self.__init__(self.source_path, self.dest_path, target_width,
                      target_height,
                      position_callback=self.position_callback)

    def disconnect(self):
        '''
        Tear down both pipelines, the buffer probes, the bus watches and the
        abort timeout, then ask the main loop to quit. Safe to call more
        than once.
        '''
        self.state = self.STATE_HALTING

        if self.timeout_id is not None:
            gobject.source_remove(self.timeout_id)
            self.timeout_id = None

        if self.playbin is not None:
            self.playbin.set_state(gst.STATE_NULL)
            self.playbin = None

        if self.thumbnail_pipeline is not None:
            # The buffer probes were attached to this pipeline's sinks.
            for sink in self.thumbnail_pipeline.sinks():
                probe_id = self.buffer_probes.pop(sink.get_name(), None)
                if probe_id is not None:
                    sink.get_pad('sink').remove_buffer_probe(probe_id)

            self.thumbnail_pipeline.set_state(gst.STATE_NULL)
            self.thumbnail_pipeline = None

        if self.playbin_message_bus is not None:
            self.playbin_message_bus.disconnect(self.playbin_bus_watch_id)
            self.playbin_message_bus.remove_signal_watch()
            self.playbin_message_bus = None

        if self.thumbnail_message_bus is not None:
            self.thumbnail_message_bus.disconnect(self.thumbnail_bus_watch_id)
            self.thumbnail_message_bus.remove_signal_watch()
            self.thumbnail_message_bus = None

        self.halt()

    def halt(self):
        '''Quit the main loop from an idle callback.'''
        gobject.idle_add(self.mainloop.quit)

    def on_gobject_timeout(self):
        '''Abort timeout callback: shut down. Returns False (one-shot).'''
        _log.critical('Reached gobject timeout')
        # Returning False removes this source; forget its id so disconnect()
        # does not try to remove it a second time.
        self.timeout_id = None
        self.disconnect()
        return False

    def get_duration(self, pipeline, attempt=1):
        '''
        Return ``pipeline``'s duration in nanoseconds (GStreamer time).

        Failed queries are retried. Once attempt number 5 would be reached,
        it gives up and returns 0.
        '''
        for current in range(attempt, 5):
            try:
                return pipeline.query_duration(gst.FORMAT_TIME)[0]
            except gst.QueryError as exc:
                _log.error('Could not get duration on attempt {0}: {1}'.format(
                    current,
                    exc))

        _log.critical('Pipeline duration query retry limit reached.')
        return 0
class VideoTranscoder(object):
    '''
    Video transcoder

    Transcodes the SRC video file to a VP8 WebM video file at DST

     - Does the same thing as VideoThumbnailer, but produces a WebM vp8
       and vorbis video file.
     - The VideoTranscoder exceeds the VideoThumbnailer in the way
       that it was refined afterwards and therefore is done more
       correctly.

    Both ``transcode`` and ``discover`` block while ``self.loop`` runs.
    '''
    def __init__(self):
        _log.info('Initializing VideoTranscoder...')

        self.progress_percentage = None
        self._discovered_data = None
        self.loop = gobject.MainLoop()

    @staticmethod
    def _option(options, key, default):
        '''
        Return ``options[key]``, or ``default`` when the key is missing or
        explicitly None (``dict.get`` only covers the missing case).
        '''
        value = options.get(key)
        return default if value is None else value

    def transcode(self, src, dst, **kwargs):
        '''
        Transcode a video file into a 'medium'-sized version.

        :param src: path of the source video.
        :param dst: path the WebM file is written to.
        :keyword dimensions: ``(width, height)`` tuple; landscape videos are
            scaled to the width, portrait ones to the height. Defaults to
            ``(640, 640)``.
        :keyword vp8_quality: vp8enc ``quality``; defaults to 8.
        :keyword vp8_threads: vp8enc ``threads``; defaults to the CPU count
            minus one, and 0 means "use every CPU".
        :keyword vorbis_quality: vorbisenc ``quality``; defaults to 0.3.
        :keyword progress_callback: called with the progress percentage
            whenever it changes.
        :raises Exception: if ``dimensions`` is not a tuple.

        Options passed explicitly as None fall back to their defaults.
        '''
        self.destination_dimensions = self._option(
            kwargs, 'dimensions', (640, 640))

        # Validate before any other state is touched.
        if not isinstance(self.destination_dimensions, tuple):
            raise Exception('dimensions must be tuple: (width, height)')

        self.source_path = src
        self.destination_path = dst

        # vp8enc options
        self.vp8_quality = self._option(kwargs, 'vp8_quality', 8)

        # Number of threads used by vp8enc; by default one less than the
        # number of CPUs.
        self.vp8_threads = self._option(kwargs, 'vp8_threads', CPU_COUNT - 1)

        # 0 -- passed explicitly, or the default on a single-CPU machine --
        # means "use every CPU".
        if self.vp8_threads == 0:
            self.vp8_threads = CPU_COUNT

        # vorbisenc options
        self.vorbis_quality = self._option(kwargs, 'vorbis_quality', 0.3)

        self._progress_callback = kwargs.get('progress_callback') or None

        self._setup()
        self._run()

    def discover(self, src):
        '''
        Discover properties about a media file

        :returns: the discoverer's attribute dict (including ``videowidth``,
            ``videoheight`` and ``is_audio``), or None if ``src`` could not
            be discovered as media.
        '''
        _log.info('Discovering {0}'.format(src))

        self.source_path = src
        # Forget results from an earlier call so a failed discovery cannot
        # return stale data.
        self._discovered_data = None
        self._setup_discover(discovered_callback=self.__on_discovered)

        self.discoverer.discover()
        self.loop.run()

        if self._discovered_data is not None:
            return self._discovered_data.__dict__
        return None

    def __on_discovered(self, data, is_media):
        '''Discoverer callback used by ``discover``; stores ``data``.'''
        _log.debug('Discovered: {0}'.format(data))
        if not is_media:
            self.__stop()
            # NOTE(review): PyGObject normally prints exceptions raised in
            # signal callbacks instead of propagating them out of
            # loop.run(), so discover() returns None here -- verify.
            raise Exception('Could not discover {0}'.format(self.source_path))

        self._discovered_data = data

        self.__stop_mainloop()

    def _setup(self):
        self._setup_discover()
        self._setup_pipeline()

    def _run(self):
        _log.info('Discovering...')
        self.discoverer.discover()
        _log.info('Done')

        _log.debug('Initializing MainLoop()')
        self.loop.run()

    def _setup_discover(self, **kw):
        _log.debug('Setting up discoverer')
        self.discoverer = discoverer.Discoverer(self.source_path)

        # Connect self.__discovered (or the given callback) to the
        # 'discovered' event
        self.discoverer.connect(
            'discovered',
            kw.get('discovered_callback', self.__discovered))

    def __discovered(self, data, is_media):
        '''
        Callback for media discoverer.

        Links the pipeline using the discovered stream info and starts
        transcoding.
        '''
        if not is_media:
            self.__stop()
            raise Exception('Could not discover {0}'.format(self.source_path))

        _log.debug('__discovered, data: {0}'.format(data.__dict__))

        self.data = data

        # Launch things that should be done after discovery
        self._link_elements()
        self.__setup_videoscale_capsfilter()

        # Tell the transcoding pipeline to start running
        self.pipeline.set_state(gst.STATE_PLAYING)
        _log.info('Transcoding...')

    def _setup_pipeline(self):
        _log.debug('Setting up transcoding pipeline')
        # Create the pipeline bin.
        self.pipeline = gst.Pipeline('VideoTranscoderPipeline')

        # Create all GStreamer elements, starting with
        # filesrc & decoder
        self.filesrc = gst.element_factory_make('filesrc', 'filesrc')
        self.filesrc.set_property('location', self.source_path)
        self.pipeline.add(self.filesrc)

        self.decoder = gst.element_factory_make('decodebin2', 'decoder')
        self.decoder.connect('new-decoded-pad', self._on_dynamic_pad)
        self.pipeline.add(self.decoder)

        # Video elements
        self.videoqueue = gst.element_factory_make('queue', 'videoqueue')
        self.pipeline.add(self.videoqueue)

        self.videorate = gst.element_factory_make('videorate', 'videorate')
        self.pipeline.add(self.videorate)

        self.ffmpegcolorspace = gst.element_factory_make(
            'ffmpegcolorspace', 'ffmpegcolorspace')
        self.pipeline.add(self.ffmpegcolorspace)

        self.videoscale = gst.element_factory_make('ffvideoscale', 'videoscale')
        #self.videoscale.set_property('method', 2)  # I'm not sure this works
        #self.videoscale.set_property('add-borders', 0)
        self.pipeline.add(self.videoscale)

        # Output size/format caps are set later by
        # __setup_videoscale_capsfilter.
        self.capsfilter = gst.element_factory_make('capsfilter', 'capsfilter')
        self.pipeline.add(self.capsfilter)

        self.vp8enc = gst.element_factory_make('vp8enc', 'vp8enc')
        self.vp8enc.set_property('quality', self.vp8_quality)
        self.vp8enc.set_property('threads', self.vp8_threads)
        self.vp8enc.set_property('max-latency', 25)
        self.pipeline.add(self.vp8enc)

        # Audio elements
        self.audioqueue = gst.element_factory_make('queue', 'audioqueue')
        self.pipeline.add(self.audioqueue)

        self.audiorate = gst.element_factory_make('audiorate', 'audiorate')
        self.audiorate.set_property('tolerance', 80000000)
        self.pipeline.add(self.audiorate)

        self.audioconvert = gst.element_factory_make('audioconvert', 'audioconvert')
        self.pipeline.add(self.audioconvert)

        self.audiocapsfilter = gst.element_factory_make('capsfilter',
                                                        'audiocapsfilter')
        audiocaps = ['audio/x-raw-float']
        self.audiocapsfilter.set_property(
            'caps',
            gst.caps_from_string(
                ','.join(audiocaps)))
        self.pipeline.add(self.audiocapsfilter)

        self.vorbisenc = gst.element_factory_make('vorbisenc', 'vorbisenc')
        self.vorbisenc.set_property('quality', self.vorbis_quality)
        self.pipeline.add(self.vorbisenc)

        # WebMmux & filesink
        self.webmmux = gst.element_factory_make('webmmux', 'webmmux')
        self.pipeline.add(self.webmmux)

        self.filesink = gst.element_factory_make('filesink', 'filesink')
        self.filesink.set_property('location', self.destination_path)
        self.pipeline.add(self.filesink)

        # Progressreport
        self.progressreport = gst.element_factory_make(
            'progressreport', 'progressreport')
        # Update every second
        self.progressreport.set_property('update-freq', 1)
        self.progressreport.set_property('silent', True)
        self.pipeline.add(self.progressreport)

    def _link_elements(self):
        '''
        Link all the elements

        This code depends on data from the discoverer and is called
        from __discovered
        '''
        _log.debug('linking elements')
        # Link the filesrc element to the decoder. The decoder then emits
        # 'new-decoded-pad' which links decoded src pads to either a video
        # or audio sink
        self.filesrc.link(self.decoder)

        # Link all the video elements in a row to webmmux
        gst.element_link_many(
            self.videoqueue,
            self.videorate,
            self.ffmpegcolorspace,
            self.videoscale,
            self.capsfilter,
            self.vp8enc,
            self.webmmux)

        if self.data.is_audio:
            # Link all the audio elements in a row to webmux
            gst.element_link_many(
                self.audioqueue,
                self.audiorate,
                self.audioconvert,
                self.audiocapsfilter,
                self.vorbisenc,
                self.webmmux)

        gst.element_link_many(
            self.webmmux,
            self.progressreport,
            self.filesink)

        # Setup the message bus and connect _on_message to the pipeline
        self._setup_bus()

    def _on_dynamic_pad(self, dbin, pad, islast):
        '''
        Callback called when ``decodebin2`` has a pad that we can connect to

        Pads with caps ffmpegcolorspace accepts go to the video branch, and
        pads with caps audioconvert accepts go to the audio branch. Anything
        else (e.g. subtitles), and any extra stream whose branch is already
        linked, is left unlinked instead of failing to link.
        '''
        pad_caps = pad.get_caps()

        def accepts(element):
            # True when the element's sink template shares caps with the pad.
            template_caps = element.get_pad_template('sink').get_caps()
            return not template_caps.intersect(pad_caps).is_empty()

        if accepts(self.ffmpegcolorspace):
            # It IS a video src pad.
            target = self.videoqueue.get_pad('sink')
        elif accepts(self.audioconvert):
            # It is an audio src pad.
            target = self.audioqueue.get_pad('sink')
        else:
            _log.info('Ignoring decoded pad with caps {0}'.format(pad_caps))
            return

        if target.is_linked():
            _log.info('Ignoring extra decoded stream {0}: {1} is already '
                      'linked'.format(pad.get_name(), target.get_name()))
            return

        pad.link(target)

    def _setup_bus(self):
        self.bus = self.pipeline.get_bus()
        self.bus.add_signal_watch()
        self.bus.connect('message', self._on_message)

    def __setup_videoscale_capsfilter(self):
        '''
        Sets up the output format (width, height) for the video
        '''
        caps = ['video/x-raw-yuv', 'pixel-aspect-ratio=1/1', 'framerate=30/1']

        if self.data.videoheight > self.data.videowidth:
            # Whoa! We have ourselves a portrait video!
            caps.append('height={0}'.format(
                self.destination_dimensions[1]))
        else:
            # It's a landscape, phew, how normal.
            caps.append('width={0}'.format(
                self.destination_dimensions[0]))

        self.capsfilter.set_property(
            'caps',
            gst.caps_from_string(
                ','.join(caps)))

    def _on_message(self, bus, message):
        '''
        Pipeline bus handler: finishes on EOS, reports progress, and stops
        on errors.
        '''
        _log.debug((bus, message, message.type))

        t = message.type

        if t == gst.MESSAGE_EOS:
            self._discover_dst_and_stop()
            _log.info('Done')

        elif t == gst.MESSAGE_ELEMENT:
            if message.structure.get_name() == 'progress':
                data = dict(message.structure)
                percent = data.get('percent')

                # Update progress state if it has changed
                if self.progress_percentage != percent:
                    self.progress_percentage = percent
                    if self._progress_callback:
                        self._progress_callback(percent)

                    _log.info('{percent}% done...'.format(
                        percent=percent))

                _log.debug(data)

        elif t == gst.MESSAGE_ERROR:
            _log.error('Transcoding error: {0}'.format(message.parse_error()))
            self.__stop()

    def _discover_dst_and_stop(self):
        '''Discover the written file into ``dst_data``, then stop.'''
        self.dst_discoverer = discoverer.Discoverer(self.destination_path)

        self.dst_discoverer.connect('discovered', self.__dst_discovered)

        self.dst_discoverer.discover()

    def __dst_discovered(self, data, is_media):
        self.dst_data = data

        self.__stop()

    def __stop(self):
        _log.debug(self.loop)

        if hasattr(self, 'pipeline'):
            # Stop executing the pipeline
            self.pipeline.set_state(gst.STATE_NULL)

        # This kills the loop, mercifully
        gobject.idle_add(self.__stop_mainloop)

    def __stop_mainloop(self):
        '''
        Wrapper for gobject.MainLoop.quit()

        This wrapper makes us able to see if self.loop.quit has been called
        '''
        _log.info('Terminating MainLoop')

        self.loop.quit()
        # One-shot when used as an idle callback.
        return False
if __name__ == '__main__':
    # Command-line entry point for manual testing of the thumbnailer,
    # transcoder and discoverer.
    os.nice(19)
    logging.basicConfig()
    from optparse import OptionParser

    parser = OptionParser(
        usage='%prog [-v] -a [ video | thumbnail | discover ] SRC [ DEST ]')
    parser.add_option('-a', '--action',
                      dest='action',
                      help='One of "video", "discover" or "thumbnail"')
    parser.add_option('-v',
                      dest='verbose',
                      action='store_true',
                      help='Output debug information')
    parser.add_option('-q',
                      dest='quiet',
                      action='store_true',
                      help='Dear program, please be quiet unless *error*')
    parser.add_option('-w', '--width',
                      type=int,
                      default=180)

    (options, args) = parser.parse_args()

    if options.verbose:
        _log.setLevel(logging.DEBUG)
    else:
        _log.setLevel(logging.INFO)

    if options.quiet:
        _log.setLevel(logging.ERROR)

    _log.debug(args)

    # 'discover' takes only SRC; 'video' and 'thumbnail' need SRC and DEST.
    expected_arg_count = 1 if options.action == 'discover' else 2
    if options.action not in ('video', 'thumbnail', 'discover') \
            or len(args) != expected_arg_count:
        parser.print_help()
        sys.exit(1)

    transcoder = VideoTranscoder()

    if options.action == 'thumbnail':
        args.append(options.width)
        VideoThumbnailerMarkII(*args)
    elif options.action == 'video':
        def cb(data):
            print('I\'m a callback!')
        transcoder.transcode(*args, progress_callback=cb)
    elif options.action == 'discover':
        # Function-call form works on Python 2 and 3 alike.
        print(transcoder.discover(*args))