Added VideoThumbnailerMarkII
authorJoar Wandborg <git@wandborg.com>
Tue, 2 Oct 2012 14:56:29 +0000 (16:56 +0200)
committerJoar Wandborg <git@wandborg.com>
Tue, 2 Oct 2012 14:56:29 +0000 (16:56 +0200)
- Set video.processing to use VideoThumbnailerMarkII.

mediagoblin/media_types/video/processing.py
mediagoblin/media_types/video/transcoders.py

index abd14eed2fadeb0a289c660b48242e8e6fd93df7..ce47313f55f220e4a377c41e4674339970d00a67 100644 (file)
@@ -104,7 +104,10 @@ def process_video(entry):
 
     with tmp_thumb:
         # Create a thumbnail.jpg that fits in a 180x180 square
-        transcoders.VideoThumbnailer(queued_filename, tmp_thumb.name)
+        transcoders.VideoThumbnailerMarkII(
+                queued_filename,
+                tmp_thumb.name,
+                180)
 
         # Push the thumbnail to public storage
         _log.debug('Saving thumbnail...')
index 25846ffab9a41cdbdab790e6fcbf2e71b37c73b6..26f96b5f227ca42115a01fdf245f6e1e5e403444 100644 (file)
@@ -60,12 +60,6 @@ class VideoThumbnailer:
     STATE_PROCESSING = 2
 
     # The current thumbnailer state
-    state = STATE_NULL
-
-    # This will contain the thumbnailing pipeline
-    thumbnail_pipeline = None
-
-    buffer_probes = {}
 
     def __init__(self, source_path, dest_path):
         '''
@@ -78,6 +72,10 @@ class VideoThumbnailer:
           into the playbin
         - Initialize
         '''
+        # This will contain the thumbnailing pipeline
+        self.state = self.STATE_NULL
+        self.thumbnail_pipeline = None
+        self.buffer_probes = {}
         self.errors = []
 
         self.source_path = source_path
@@ -332,6 +330,298 @@ class VideoThumbnailer:
         self.loop.quit()
 
 
+class VideoThumbnailerMarkII(object):
+    '''
+    Creates a thumbnail from a video file. Rewrite of VideoThumbnailer.
+
+    Large parts of the functionality and overall architectue contained within
+    this object is taken from Participatory Culture Foundation's
+    `gst_extractor.Extractor` object last seen at
+    https://github.com/pculture/miro/blob/master/tv/lib/frontends/widgets/gst/gst_extractor.py
+    in the `miro` codebase.
+
+    The `miro` codebase and the gst_extractor.py are licensed under the GNU
+    General Public License v2 or later.
+    '''
+    STATE_NULL = 0
+    STATE_HALTING = 1
+    STATE_PROCESSING = 2
+    STATE_PROCESSING_THUMBNAIL = 3
+
+    def __init__(self, source_path, dest_path, width=None, height=None,
+            position_callback=None):
+        self.state = self.STATE_NULL
+
+        self.has_reached_playbin_pause = False
+
+        self.thumbnail_pipeline = None
+
+        self.permission_to_take_picture = False
+
+        self.buffer_probes = {}
+
+        self.errors = []
+
+        self.source_path = os.path.abspath(source_path)
+        self.dest_path = os.path.abspath(dest_path)
+
+        self.width = width
+        self.height = height
+        self.position_callback = position_callback \
+                or self.wadsworth_position_callback
+
+        self.mainloop = gobject.MainLoop()
+
+        self.playbin = gst.element_factory_make('playbin')
+
+        self.videosink = gst.element_factory_make('fakesink', 'videosink')
+        self.audiosink = gst.element_factory_make('fakesink', 'audiosink')
+
+        self.playbin.set_property('video-sink', self.videosink)
+        self.playbin.set_property('audio-sink', self.audiosink)
+
+        self.playbin_message_bus = self.playbin.get_bus()
+
+        self.playbin_message_bus.add_signal_watch()
+        self.playbin_bus_watch_id = self.playbin_message_bus.connect(
+                'message',
+                self.on_playbin_message)
+
+        self.playbin.set_property(
+                'uri',
+                'file:{0}'.format(
+                    urllib.pathname2url(self.source_path)))
+
+        self.playbin.set_state(gst.STATE_PAUSED)
+
+        try:
+            self.run()
+        except Exception as exc:
+            _log.critical(
+                    'Exception "{0}" caught, disconnecting and re-raising'\
+                            .format(exc))
+            self.disconnect()
+            raise
+
+    def wadsworth_position_callback(self, duration, gst):
+        return self.duration / 100 * 30
+
+    def run(self):
+        self.mainloop.run()
+
+    def on_playbin_message(self, message_bus, message):
+        _log.debug('playbin message: {0}'.format(message))
+
+        if message.type == gst.MESSAGE_ERROR:
+            _log.error('playbin error: {0}'.format(message))
+            gobject.idle_add(self.on_playbin_error)
+
+        if message.type == gst.MESSAGE_STATE_CHANGED:
+            prev_state, cur_state, pending_state = \
+                    message.parse_state_changed()
+
+            _log.debug('playbin state changed: \nprev: {0}\ncur: {1}\n \
+pending: {2}'.format(
+    prev_state,
+    cur_state,
+    pending_state))
+
+            if cur_state == gst.STATE_PAUSED:
+                if message.src == self.playbin:
+                    _log.info('playbin ready')
+                    gobject.idle_add(self.on_playbin_paused)
+
+    def on_playbin_paused(self):
+        if self.has_reached_playbin_pause:
+            _log.warn('Has already reached logic for playbin pause. Aborting \
+without doing anything this time.')
+            return False
+
+        self.has_reached_playbin_pause = True
+
+        current_video = self.playbin.get_property('current-video')
+
+        if not current_video:
+            _log.critical('thumbnail could not get any video data \
+from playbin')
+
+        self.duration = self.get_duration(self.playbin)
+        self.permission_to_take_picture = True
+        self.buffer_probes = {}
+
+        pipeline = ''.join([
+            'filesrc location="%s" ! decodebin ! ' % self.source_path,
+            'ffmpegcolorspace ! videoscale ! ',
+            'video/x-raw-rgb,depth=24,bpp=24,pixel-aspect-ratio=1/1',
+            ',width={0}'.format(self.width) if self.width else '',
+            ',height={0}'.format(self.height) if self.height else '',
+            ' ! ',
+            'fakesink signal-handoffs=True'])
+
+        _log.debug('thumbnail_pipeline: {0}'.format(pipeline))
+
+        self.thumbnail_pipeline = gst.parse_launch(pipeline)
+        self.thumbnail_message_bus = self.thumbnail_pipeline.get_bus()
+        self.thumbnail_message_bus.add_signal_watch()
+        self.thumbnail_bus_watch_id = self.thumbnail_message_bus.connect(
+                'message',
+                self.on_thumbnail_message)
+
+        self.thumbnail_pipeline.set_state(gst.STATE_PAUSED)
+
+        gobject.timeout_add(3000, self.on_gobject_timeout)
+
+        return False
+
+    def on_thumbnail_message(self, message_bus, message):
+        _log.debug('thumbnail message: {0}'.format(message))
+
+        if message.type == gst.MESSAGE_ERROR:
+            _log.error('thumbnail error: {0}'.format(message))
+            gobject.idle_add(self.on_thumbnail_error)
+
+        if message.type == gst.MESSAGE_STATE_CHANGED:
+            prev_state, cur_state, pending_state = \
+                    message.parse_state_changed()
+
+            _log.debug('thumbnail state changed: \nprev: {0}\ncur: {1}\n \
+pending: {2}'.format(
+    prev_state,
+    cur_state,
+    pending_state))
+
+            if cur_state == gst.STATE_PAUSED and\
+                    not self.state == self.STATE_PROCESSING_THUMBNAIL:
+                self.state = self.STATE_PROCESSING_THUMBNAIL
+
+                # Find the fakesink sink pad and attach the on_buffer_probe
+                # handler to it.
+                for sink in self.thumbnail_pipeline.sinks():
+                    sink_name = sink.get_name()
+                    sink_factory_name = sink.get_factory().get_name()
+
+                    if sink_factory_name == 'fakesink':
+                        sink_pad = sink.get_pad('sink')
+
+                        self.buffer_probes[sink_name] = sink_pad\
+                                .add_buffer_probe(
+                                        self.on_pad_buffer_probe,
+                                        sink_name)
+
+                        _log.info('Attached buffer probes: {0}'.format(
+                            self.buffer_probes))
+
+                        break
+
+                seek_amount = self.position_callback(self.duration, gst)
+
+                seek_result = self.thumbnail_pipeline.seek(
+                        1.0,
+                        gst.FORMAT_TIME,
+                        gst.SEEK_FLAG_FLUSH | gst.SEEK_FLAG_ACCURATE,
+                        gst.SEEK_TYPE_SET,
+                        seek_amount,
+                        gst.SEEK_TYPE_NONE,
+                        0)
+
+                if not seek_result:
+                    _log.critical('Could not seek.')
+
+            elif self.state == self.STATE_PROCESSING_THUMBNAIL:
+                _log.debug('Already processing thumbnail')
+
+    def on_pad_buffer_probe(self, *args):
+        _log.debug('buffer probe handler: {0}'.format(args))
+        gobject.idle_add(lambda: self.take_snapshot(*args))
+
+    def take_snapshot(self, pad, buff, name):
+        if self.state == self.STATE_HALTING:
+            _log.debug('Pipeline is halting, will not take snapshot')
+            return False
+
+        _log.info('Taking snapshot! ({0})'.format(
+            (pad, buff, name)))
+        try:
+            caps = buff.caps
+            if caps is None:
+                _log.error('No buffer caps present /take_snapshot')
+                self.disconnect()
+
+            _log.debug('caps: {0}'.format(caps))
+
+            filters = caps[0]
+            width = filters['width']
+            height = filters['height']
+
+            im = Image.new('RGB', (width, height))
+
+            data = pixbuf_to_pilbuf(buff.data)
+
+            im.putdata(data)
+
+            im.save(self.dest_path)
+
+            _log.info('Saved snapshot!')
+
+            self.disconnect()
+
+        except gst.QueryError as exc:
+            _log.error('take_snapshot - QueryError: {0}'.format(exc))
+
+        return False
+
+    def on_thumbnail_error(self):
+        _log.error('Thumbnailing failed.')
+        self.disconnect()
+
+    def disconnect(self):
+        self.state = self.STATE_HALTING
+
+        if self.playbin is not None:
+            self.playbin.set_state(gst.STATE_NULL)
+
+            for sink in self.playbin.sinks():
+                sink_name = sink.get_name()
+                sink_factory_name = sink.get_factory().get_name()
+
+                if sink_factory_name == 'fakesink':
+                    sink_pad = sink.get_pad('sink')
+                    sink_pad.remove_buffer_probe(self.buffer_probes[sink_name])
+                    del self.buffer_probes[sink_name]
+
+            self.playbin = None
+
+        if self.thumbnail_pipeline is not None:
+            self.thumbnail_pipeline.set_state(gst.STATE_NULL)
+            self.thumbnail_pipeline = None
+
+        if self.playbin_message_bus is not None:
+            self.playbin_message_bus.disconnect(self.playbin_bus_watch_id)
+            self.playbin_message_bus = None
+
+        self.halt()
+
+    def halt(self):
+        gobject.idle_add(self.mainloop.quit)
+
+    def on_gobject_timeout(self):
+        _log.critical('Reached gobject timeout')
+        self.disconnect()
+
+    def get_duration(self, pipeline, attempt=1):
+        if attempt == 5:
+            _log.critical('Pipeline duration query retry limit reached.')
+            return 0
+
+        try:
+            return pipeline.query_duration(gst.FORMAT_TIME)[0]
+        except gst.QueryError as exc:
+            _log.error('Could not get duration on attempt {0}: {1}'.format(
+                attempt,
+                exc))
+            return self.get_duration(pipeline, attempt + 1)
+
+
 class VideoTranscoder:
     '''
     Video transcoder
@@ -709,7 +999,7 @@ if __name__ == '__main__':
     transcoder = VideoTranscoder()
 
     if options.action == 'thumbnail':
-        VideoThumbnailer(*args)
+        VideoThumbnailerMarkII(*args)
     elif options.action == 'video':
         def cb(data):
             print('I\'m a callback!')