STATE_PROCESSING = 2
# The current thumbnailer state
- state = STATE_NULL
-
- # This will contain the thumbnailing pipeline
- thumbnail_pipeline = None
-
- buffer_probes = {}
def __init__(self, source_path, dest_path):
'''
into the playbin
- Initialize
'''
+ # This will contain the thumbnailing pipeline
+ self.state = self.STATE_NULL
+ self.thumbnail_pipeline = None
+ self.buffer_probes = {}
self.errors = []
self.source_path = source_path
self.loop.quit()
+class VideoThumbnailerMarkII(object):
+ '''
+ Creates a thumbnail from a video file. Rewrite of VideoThumbnailer.
+
+ Large parts of the functionality and overall architectue contained within
+ this object is taken from Participatory Culture Foundation's
+ `gst_extractor.Extractor` object last seen at
+ https://github.com/pculture/miro/blob/master/tv/lib/frontends/widgets/gst/gst_extractor.py
+ in the `miro` codebase.
+
+ The `miro` codebase and the gst_extractor.py are licensed under the GNU
+ General Public License v2 or later.
+ '''
+ STATE_NULL = 0
+ STATE_HALTING = 1
+ STATE_PROCESSING = 2
+ STATE_PROCESSING_THUMBNAIL = 3
+
+ def __init__(self, source_path, dest_path, width=None, height=None,
+ position_callback=None):
+ self.state = self.STATE_NULL
+
+ self.has_reached_playbin_pause = False
+
+ self.thumbnail_pipeline = None
+
+ self.permission_to_take_picture = False
+
+ self.buffer_probes = {}
+
+ self.errors = []
+
+ self.source_path = os.path.abspath(source_path)
+ self.dest_path = os.path.abspath(dest_path)
+
+ self.width = width
+ self.height = height
+ self.position_callback = position_callback \
+ or self.wadsworth_position_callback
+
+ self.mainloop = gobject.MainLoop()
+
+ self.playbin = gst.element_factory_make('playbin')
+
+ self.videosink = gst.element_factory_make('fakesink', 'videosink')
+ self.audiosink = gst.element_factory_make('fakesink', 'audiosink')
+
+ self.playbin.set_property('video-sink', self.videosink)
+ self.playbin.set_property('audio-sink', self.audiosink)
+
+ self.playbin_message_bus = self.playbin.get_bus()
+
+ self.playbin_message_bus.add_signal_watch()
+ self.playbin_bus_watch_id = self.playbin_message_bus.connect(
+ 'message',
+ self.on_playbin_message)
+
+ self.playbin.set_property(
+ 'uri',
+ 'file:{0}'.format(
+ urllib.pathname2url(self.source_path)))
+
+ self.playbin.set_state(gst.STATE_PAUSED)
+
+ try:
+ self.run()
+ except Exception as exc:
+ _log.critical(
+ 'Exception "{0}" caught, disconnecting and re-raising'\
+ .format(exc))
+ self.disconnect()
+ raise
+
+ def wadsworth_position_callback(self, duration, gst):
+ return self.duration / 100 * 30
+
+ def run(self):
+ self.mainloop.run()
+
+ def on_playbin_message(self, message_bus, message):
+ _log.debug('playbin message: {0}'.format(message))
+
+ if message.type == gst.MESSAGE_ERROR:
+ _log.error('playbin error: {0}'.format(message))
+ gobject.idle_add(self.on_playbin_error)
+
+ if message.type == gst.MESSAGE_STATE_CHANGED:
+ prev_state, cur_state, pending_state = \
+ message.parse_state_changed()
+
+ _log.debug('playbin state changed: \nprev: {0}\ncur: {1}\n \
+pending: {2}'.format(
+ prev_state,
+ cur_state,
+ pending_state))
+
+ if cur_state == gst.STATE_PAUSED:
+ if message.src == self.playbin:
+ _log.info('playbin ready')
+ gobject.idle_add(self.on_playbin_paused)
+
+ def on_playbin_paused(self):
+ if self.has_reached_playbin_pause:
+ _log.warn('Has already reached logic for playbin pause. Aborting \
+without doing anything this time.')
+ return False
+
+ self.has_reached_playbin_pause = True
+
+ current_video = self.playbin.get_property('current-video')
+
+ if not current_video:
+ _log.critical('thumbnail could not get any video data \
+from playbin')
+
+ self.duration = self.get_duration(self.playbin)
+ self.permission_to_take_picture = True
+ self.buffer_probes = {}
+
+ pipeline = ''.join([
+ 'filesrc location="%s" ! decodebin ! ' % self.source_path,
+ 'ffmpegcolorspace ! videoscale ! ',
+ 'video/x-raw-rgb,depth=24,bpp=24,pixel-aspect-ratio=1/1',
+ ',width={0}'.format(self.width) if self.width else '',
+ ',height={0}'.format(self.height) if self.height else '',
+ ' ! ',
+ 'fakesink signal-handoffs=True'])
+
+ _log.debug('thumbnail_pipeline: {0}'.format(pipeline))
+
+ self.thumbnail_pipeline = gst.parse_launch(pipeline)
+ self.thumbnail_message_bus = self.thumbnail_pipeline.get_bus()
+ self.thumbnail_message_bus.add_signal_watch()
+ self.thumbnail_bus_watch_id = self.thumbnail_message_bus.connect(
+ 'message',
+ self.on_thumbnail_message)
+
+ self.thumbnail_pipeline.set_state(gst.STATE_PAUSED)
+
+ gobject.timeout_add(3000, self.on_gobject_timeout)
+
+ return False
+
+ def on_thumbnail_message(self, message_bus, message):
+ _log.debug('thumbnail message: {0}'.format(message))
+
+ if message.type == gst.MESSAGE_ERROR:
+ _log.error('thumbnail error: {0}'.format(message))
+ gobject.idle_add(self.on_thumbnail_error)
+
+ if message.type == gst.MESSAGE_STATE_CHANGED:
+ prev_state, cur_state, pending_state = \
+ message.parse_state_changed()
+
+ _log.debug('thumbnail state changed: \nprev: {0}\ncur: {1}\n \
+pending: {2}'.format(
+ prev_state,
+ cur_state,
+ pending_state))
+
+ if cur_state == gst.STATE_PAUSED and\
+ not self.state == self.STATE_PROCESSING_THUMBNAIL:
+ self.state = self.STATE_PROCESSING_THUMBNAIL
+
+ # Find the fakesink sink pad and attach the on_buffer_probe
+ # handler to it.
+ for sink in self.thumbnail_pipeline.sinks():
+ sink_name = sink.get_name()
+ sink_factory_name = sink.get_factory().get_name()
+
+ if sink_factory_name == 'fakesink':
+ sink_pad = sink.get_pad('sink')
+
+ self.buffer_probes[sink_name] = sink_pad\
+ .add_buffer_probe(
+ self.on_pad_buffer_probe,
+ sink_name)
+
+ _log.info('Attached buffer probes: {0}'.format(
+ self.buffer_probes))
+
+ break
+
+ seek_amount = self.position_callback(self.duration, gst)
+
+ seek_result = self.thumbnail_pipeline.seek(
+ 1.0,
+ gst.FORMAT_TIME,
+ gst.SEEK_FLAG_FLUSH | gst.SEEK_FLAG_ACCURATE,
+ gst.SEEK_TYPE_SET,
+ seek_amount,
+ gst.SEEK_TYPE_NONE,
+ 0)
+
+ if not seek_result:
+ _log.critical('Could not seek.')
+
+ elif self.state == self.STATE_PROCESSING_THUMBNAIL:
+ _log.debug('Already processing thumbnail')
+
+ def on_pad_buffer_probe(self, *args):
+ _log.debug('buffer probe handler: {0}'.format(args))
+ gobject.idle_add(lambda: self.take_snapshot(*args))
+
+ def take_snapshot(self, pad, buff, name):
+ if self.state == self.STATE_HALTING:
+ _log.debug('Pipeline is halting, will not take snapshot')
+ return False
+
+ _log.info('Taking snapshot! ({0})'.format(
+ (pad, buff, name)))
+ try:
+ caps = buff.caps
+ if caps is None:
+ _log.error('No buffer caps present /take_snapshot')
+ self.disconnect()
+
+ _log.debug('caps: {0}'.format(caps))
+
+ filters = caps[0]
+ width = filters['width']
+ height = filters['height']
+
+ im = Image.new('RGB', (width, height))
+
+ data = pixbuf_to_pilbuf(buff.data)
+
+ im.putdata(data)
+
+ im.save(self.dest_path)
+
+ _log.info('Saved snapshot!')
+
+ self.disconnect()
+
+ except gst.QueryError as exc:
+ _log.error('take_snapshot - QueryError: {0}'.format(exc))
+
+ return False
+
+ def on_thumbnail_error(self):
+ _log.error('Thumbnailing failed.')
+ self.disconnect()
+
+ def disconnect(self):
+ self.state = self.STATE_HALTING
+
+ if self.playbin is not None:
+ self.playbin.set_state(gst.STATE_NULL)
+
+ for sink in self.playbin.sinks():
+ sink_name = sink.get_name()
+ sink_factory_name = sink.get_factory().get_name()
+
+ if sink_factory_name == 'fakesink':
+ sink_pad = sink.get_pad('sink')
+ sink_pad.remove_buffer_probe(self.buffer_probes[sink_name])
+ del self.buffer_probes[sink_name]
+
+ self.playbin = None
+
+ if self.thumbnail_pipeline is not None:
+ self.thumbnail_pipeline.set_state(gst.STATE_NULL)
+ self.thumbnail_pipeline = None
+
+ if self.playbin_message_bus is not None:
+ self.playbin_message_bus.disconnect(self.playbin_bus_watch_id)
+ self.playbin_message_bus = None
+
+ self.halt()
+
+ def halt(self):
+ gobject.idle_add(self.mainloop.quit)
+
+ def on_gobject_timeout(self):
+ _log.critical('Reached gobject timeout')
+ self.disconnect()
+
+ def get_duration(self, pipeline, attempt=1):
+ if attempt == 5:
+ _log.critical('Pipeline duration query retry limit reached.')
+ return 0
+
+ try:
+ return pipeline.query_duration(gst.FORMAT_TIME)[0]
+ except gst.QueryError as exc:
+ _log.error('Could not get duration on attempt {0}: {1}'.format(
+ attempt,
+ exc))
+ return self.get_duration(pipeline, attempt + 1)
+
+
class VideoTranscoder:
'''
Video transcoder
transcoder = VideoTranscoder()
if options.action == 'thumbnail':
- VideoThumbnailer(*args)
+ VideoThumbnailerMarkII(*args)
elif options.action == 'video':
def cb(data):
print('I\'m a callback!')