From e4a1b6d22aea7c241118e778ac960d26e4b47572 Mon Sep 17 00:00:00 2001 From: Joar Wandborg Date: Tue, 2 Oct 2012 16:56:29 +0200 Subject: [PATCH] Added VideoThumbnailerMarkII - Set video.processing to use VideoThumbnailerMarkII. --- mediagoblin/media_types/video/processing.py | 5 +- mediagoblin/media_types/video/transcoders.py | 304 ++++++++++++++++++- 2 files changed, 301 insertions(+), 8 deletions(-) diff --git a/mediagoblin/media_types/video/processing.py b/mediagoblin/media_types/video/processing.py index abd14eed..ce47313f 100644 --- a/mediagoblin/media_types/video/processing.py +++ b/mediagoblin/media_types/video/processing.py @@ -104,7 +104,10 @@ def process_video(entry): with tmp_thumb: # Create a thumbnail.jpg that fits in a 180x180 square - transcoders.VideoThumbnailer(queued_filename, tmp_thumb.name) + transcoders.VideoThumbnailerMarkII( + queued_filename, + tmp_thumb.name, + 180) # Push the thumbnail to public storage _log.debug('Saving thumbnail...') diff --git a/mediagoblin/media_types/video/transcoders.py b/mediagoblin/media_types/video/transcoders.py index 25846ffa..26f96b5f 100644 --- a/mediagoblin/media_types/video/transcoders.py +++ b/mediagoblin/media_types/video/transcoders.py @@ -60,12 +60,6 @@ class VideoThumbnailer: STATE_PROCESSING = 2 # The current thumbnailer state - state = STATE_NULL - - # This will contain the thumbnailing pipeline - thumbnail_pipeline = None - - buffer_probes = {} def __init__(self, source_path, dest_path): ''' @@ -78,6 +72,10 @@ class VideoThumbnailer: into the playbin - Initialize ''' + # This will contain the thumbnailing pipeline + self.state = self.STATE_NULL + self.thumbnail_pipeline = None + self.buffer_probes = {} self.errors = [] self.source_path = source_path @@ -332,6 +330,298 @@ class VideoThumbnailer: self.loop.quit() +class VideoThumbnailerMarkII(object): + ''' + Creates a thumbnail from a video file. Rewrite of VideoThumbnailer. + + Large parts of the functionality and overall architectue contained within + this object is taken from Participatory Culture Foundation's + `gst_extractor.Extractor` object last seen at + https://github.com/pculture/miro/blob/master/tv/lib/frontends/widgets/gst/gst_extractor.py + in the `miro` codebase. + + The `miro` codebase and the gst_extractor.py are licensed under the GNU + General Public License v2 or later. + ''' + STATE_NULL = 0 + STATE_HALTING = 1 + STATE_PROCESSING = 2 + STATE_PROCESSING_THUMBNAIL = 3 + + def __init__(self, source_path, dest_path, width=None, height=None, + position_callback=None): + self.state = self.STATE_NULL + + self.has_reached_playbin_pause = False + + self.thumbnail_pipeline = None + + self.permission_to_take_picture = False + + self.buffer_probes = {} + + self.errors = [] + + self.source_path = os.path.abspath(source_path) + self.dest_path = os.path.abspath(dest_path) + + self.width = width + self.height = height + self.position_callback = position_callback \ + or self.wadsworth_position_callback + + self.mainloop = gobject.MainLoop() + + self.playbin = gst.element_factory_make('playbin') + + self.videosink = gst.element_factory_make('fakesink', 'videosink') + self.audiosink = gst.element_factory_make('fakesink', 'audiosink') + + self.playbin.set_property('video-sink', self.videosink) + self.playbin.set_property('audio-sink', self.audiosink) + + self.playbin_message_bus = self.playbin.get_bus() + + self.playbin_message_bus.add_signal_watch() + self.playbin_bus_watch_id = self.playbin_message_bus.connect( + 'message', + self.on_playbin_message) + + self.playbin.set_property( + 'uri', + 'file:{0}'.format( + urllib.pathname2url(self.source_path))) + + self.playbin.set_state(gst.STATE_PAUSED) + + try: + self.run() + except Exception as exc: + _log.critical( + 'Exception "{0}" caught, disconnecting and re-raising'\ + .format(exc)) + self.disconnect() + raise + + def wadsworth_position_callback(self, duration, gst): + return self.duration / 100 * 30 + + def run(self): + self.mainloop.run() + + def on_playbin_message(self, message_bus, message): + _log.debug('playbin message: {0}'.format(message)) + + if message.type == gst.MESSAGE_ERROR: + _log.error('playbin error: {0}'.format(message)) + gobject.idle_add(self.on_playbin_error) + + if message.type == gst.MESSAGE_STATE_CHANGED: + prev_state, cur_state, pending_state = \ + message.parse_state_changed() + + _log.debug('playbin state changed: \nprev: {0}\ncur: {1}\n \ +pending: {2}'.format( + prev_state, + cur_state, + pending_state)) + + if cur_state == gst.STATE_PAUSED: + if message.src == self.playbin: + _log.info('playbin ready') + gobject.idle_add(self.on_playbin_paused) + + def on_playbin_paused(self): + if self.has_reached_playbin_pause: + _log.warn('Has already reached logic for playbin pause. Aborting \ +without doing anything this time.') + return False + + self.has_reached_playbin_pause = True + + current_video = self.playbin.get_property('current-video') + + if not current_video: + _log.critical('thumbnail could not get any video data \ +from playbin') + + self.duration = self.get_duration(self.playbin) + self.permission_to_take_picture = True + self.buffer_probes = {} + + pipeline = ''.join([ + 'filesrc location="%s" ! decodebin ! ' % self.source_path, + 'ffmpegcolorspace ! videoscale ! ', + 'video/x-raw-rgb,depth=24,bpp=24,pixel-aspect-ratio=1/1', + ',width={0}'.format(self.width) if self.width else '', + ',height={0}'.format(self.height) if self.height else '', + ' ! ', + 'fakesink signal-handoffs=True']) + + _log.debug('thumbnail_pipeline: {0}'.format(pipeline)) + + self.thumbnail_pipeline = gst.parse_launch(pipeline) + self.thumbnail_message_bus = self.thumbnail_pipeline.get_bus() + self.thumbnail_message_bus.add_signal_watch() + self.thumbnail_bus_watch_id = self.thumbnail_message_bus.connect( + 'message', + self.on_thumbnail_message) + + self.thumbnail_pipeline.set_state(gst.STATE_PAUSED) + + gobject.timeout_add(3000, self.on_gobject_timeout) + + return False + + def on_thumbnail_message(self, message_bus, message): + _log.debug('thumbnail message: {0}'.format(message)) + + if message.type == gst.MESSAGE_ERROR: + _log.error('thumbnail error: {0}'.format(message)) + gobject.idle_add(self.on_thumbnail_error) + + if message.type == gst.MESSAGE_STATE_CHANGED: + prev_state, cur_state, pending_state = \ + message.parse_state_changed() + + _log.debug('thumbnail state changed: \nprev: {0}\ncur: {1}\n \ +pending: {2}'.format( + prev_state, + cur_state, + pending_state)) + + if cur_state == gst.STATE_PAUSED and\ + not self.state == self.STATE_PROCESSING_THUMBNAIL: + self.state = self.STATE_PROCESSING_THUMBNAIL + + # Find the fakesink sink pad and attach the on_buffer_probe + # handler to it. + for sink in self.thumbnail_pipeline.sinks(): + sink_name = sink.get_name() + sink_factory_name = sink.get_factory().get_name() + + if sink_factory_name == 'fakesink': + sink_pad = sink.get_pad('sink') + + self.buffer_probes[sink_name] = sink_pad\ + .add_buffer_probe( + self.on_pad_buffer_probe, + sink_name) + + _log.info('Attached buffer probes: {0}'.format( + self.buffer_probes)) + + break + + seek_amount = self.position_callback(self.duration, gst) + + seek_result = self.thumbnail_pipeline.seek( + 1.0, + gst.FORMAT_TIME, + gst.SEEK_FLAG_FLUSH | gst.SEEK_FLAG_ACCURATE, + gst.SEEK_TYPE_SET, + seek_amount, + gst.SEEK_TYPE_NONE, + 0) + + if not seek_result: + _log.critical('Could not seek.') + + elif self.state == self.STATE_PROCESSING_THUMBNAIL: + _log.debug('Already processing thumbnail') + + def on_pad_buffer_probe(self, *args): + _log.debug('buffer probe handler: {0}'.format(args)) + gobject.idle_add(lambda: self.take_snapshot(*args)) + + def take_snapshot(self, pad, buff, name): + if self.state == self.STATE_HALTING: + _log.debug('Pipeline is halting, will not take snapshot') + return False + + _log.info('Taking snapshot! ({0})'.format( + (pad, buff, name))) + try: + caps = buff.caps + if caps is None: + _log.error('No buffer caps present /take_snapshot') + self.disconnect() + + _log.debug('caps: {0}'.format(caps)) + + filters = caps[0] + width = filters['width'] + height = filters['height'] + + im = Image.new('RGB', (width, height)) + + data = pixbuf_to_pilbuf(buff.data) + + im.putdata(data) + + im.save(self.dest_path) + + _log.info('Saved snapshot!') + + self.disconnect() + + except gst.QueryError as exc: + _log.error('take_snapshot - QueryError: {0}'.format(exc)) + + return False + + def on_thumbnail_error(self): + _log.error('Thumbnailing failed.') + self.disconnect() + + def disconnect(self): + self.state = self.STATE_HALTING + + if self.playbin is not None: + self.playbin.set_state(gst.STATE_NULL) + + for sink in self.playbin.sinks(): + sink_name = sink.get_name() + sink_factory_name = sink.get_factory().get_name() + + if sink_factory_name == 'fakesink': + sink_pad = sink.get_pad('sink') + sink_pad.remove_buffer_probe(self.buffer_probes[sink_name]) + del self.buffer_probes[sink_name] + + self.playbin = None + + if self.thumbnail_pipeline is not None: + self.thumbnail_pipeline.set_state(gst.STATE_NULL) + self.thumbnail_pipeline = None + + if self.playbin_message_bus is not None: + self.playbin_message_bus.disconnect(self.playbin_bus_watch_id) + self.playbin_message_bus = None + + self.halt() + + def halt(self): + gobject.idle_add(self.mainloop.quit) + + def on_gobject_timeout(self): + _log.critical('Reached gobject timeout') + self.disconnect() + + def get_duration(self, pipeline, attempt=1): + if attempt == 5: + _log.critical('Pipeline duration query retry limit reached.') + return 0 + + try: + return pipeline.query_duration(gst.FORMAT_TIME)[0] + except gst.QueryError as exc: + _log.error('Could not get duration on attempt {0}: {1}'.format( + attempt, + exc)) + return self.get_duration(pipeline, attempt + 1) + + class VideoTranscoder: ''' Video transcoder @@ -709,7 +999,7 @@ if __name__ == '__main__': transcoder = VideoTranscoder() if options.action == 'thumbnail': - VideoThumbnailer(*args) + VideoThumbnailerMarkII(*args) elif options.action == 'video': def cb(data): print('I\'m a callback!') -- 2.25.1