From ba4fea24ed231612d0fa4d31694e7712f2386fb4 Mon Sep 17 00:00:00 2001 From: =?utf8?q?David=20Test=C3=A9?= Date: Mon, 7 Mar 2016 22:52:51 +0100 Subject: [PATCH] Add full back-up pipeline. Add backup feed filename formatting. Modify connect_tee() to receive a third argument as keyword. Clean add_elements_to_pipeline() and link_pipeline_elements(). --- stream_2016/gstconf.py | 245 +++++++++++++++++++++++------------------ 1 file changed, 136 insertions(+), 109 deletions(-) diff --git a/stream_2016/gstconf.py b/stream_2016/gstconf.py index 254da74..9e69df6 100755 --- a/stream_2016/gstconf.py +++ b/stream_2016/gstconf.py @@ -19,6 +19,7 @@ from os import rename from os import listdir +from time import localtime, strftime import gi from gi.repository import Gst @@ -29,6 +30,13 @@ PATHNAME = '' AUDIO_DEFAULT = PATHNAME + 'AUDIO_DEFAULT' RAWVIDEO_DEFAULT = PATHNAME + 'RAWVIDEO_DEFAULT' STREAM_DEFAULT = PATHNAME + 'STREAM_DEFAULT' +BACKUP_SUFFIX = '_BACKUP' +AUDIO_BACKUP = AUDIO_DEFAULT + BACKUP_SUFFIX +RAWVIDEO_BACKUP = RAWVIDEO_DEFAULT + BACKUP_SUFFIX +STREAM_BACKUP = STREAM_DEFAULT + BACKUP_SUFFIX +ERROR = '[ERROR] ' +INFO = '[INFO] ' +WARN = '[WARN] ' class New_user_pipeline(): @@ -36,21 +44,21 @@ class New_user_pipeline(): def __init__(self, feed='main'): self.feed = feed - if self.feed == 'main': - self.user_pipeline = self.create_gstreamer_pipeline() - elif self.feed == 'backup': - self.user_pipeline = self.create_gstreamer_pipeline(feed='backup') +## if self.feed == 'main': + self.user_pipeline = self.create_gstreamer_pipeline() +## elif self.feed == 'backup': +## self.user_pipeline = self.create_gstreamer_pipeline(feed='backup') def create_video_sources(self): """Create video inputs from various sources.""" self.videosrc = Gst.ElementFactory.make('rtspsrc', 'videosrc') self.videosrc.set_property('location', 'rtsp://192.168.48.2:554') self.videosrc.set_property('latency', 0) +## self.videosrc.set_property('debug', True) if self.feed == 'backup': self.videosrc_backup = Gst.ElementFactory.make('v4l2src', 'videosrc_backup') device_location = self.find_webcam_device() - print ('[INFO] Webcam device location: ', device_location) self.videosrc_backup.set_property('device', device_location) def find_webcam_device(self): @@ -63,7 +71,7 @@ class New_user_pipeline(): # Without built-in webcam elif len(devices) == 1: return '/dev/video0' - print('[ERROR] No webcam device found.') + print(ERROR, gettime(), 'No webcam device found.') def create_pipeline_callbacks(self): """Callbacks to connect dynamically created pads.""" @@ -101,15 +109,17 @@ class New_user_pipeline(): self.screensink.set_property('sync', False) # To icecast server: self.icecastsink_audio = Gst.ElementFactory.make('shout2send', 'icecastsink_audio') + self.icecastsink_audio.set_property('sync', False) ## Configuration should be written on a file locally to keep safe private addresses self.icecastsink_audio.set_property('ip', 'live2.fsf.org') self.icecastsink_audio.set_property('port', 80) - self.icecastsink_audio.set_property('mount', 'testaudio.ogv') + self.icecastsink_audio.set_property('mount', 'testaudio.ogg') self.icecastsink_audio.set_property('password', 'thahw3Wiez') self.icecastsink_stream = Gst.ElementFactory.make('shout2send', 'icecastsink_stream') + self.icecastsink_stream.set_property('sync', False) self.icecastsink_stream.set_property('ip', 'live2.fsf.org') self.icecastsink_stream.set_property('port', 80) - self.icecastsink_stream.set_property('mount', 'teststream.ogv') + self.icecastsink_stream.set_property('mount', 'teststream.webm') self.icecastsink_stream.set_property('password', 'thahw3Wiez') def create_payloader_elements(self): @@ -165,7 +175,8 @@ class New_user_pipeline(): tee_element, input_element, output_element_1, - output_element_2,): + output_element_2, + output_element_3=None,): """Links input and outputs of a given tee element.""" # Find a way to check if the element given are in the pipeline # then pass the result to the 'if' statement. @@ -176,8 +187,12 @@ class New_user_pipeline(): input_element.link(tee_element) tee_element.link(output_element_1) tee_element.link(output_element_2) + if output_element_3: + tee_element.link(output_element_3) else: - print('[ERROR] Couldn\'t link the tee. Element(s) probably not in the pipeline ') + print(ERROR, + gettime(), + 'Couldn\'t link the tee. Element(s) probably not in the pipeline ') def create_queues(self): # For video feed: @@ -192,13 +207,15 @@ class New_user_pipeline(): self.queuea_2 = Gst.ElementFactory.make('queue', 'queuea_2') self.queuea_3 = Gst.ElementFactory.make('queue', 'queuea_3') self.queuea_4 = Gst.ElementFactory.make('queue', 'queuea_4') + self.queuea_4.set_property('leaky', 2) self.queuea_5 = Gst.ElementFactory.make('queue', 'queuea_5') # For audio+video muxer: self.queuem_1 = Gst.ElementFactory.make('queue', 'queuem_1') self.queuem_2 = Gst.ElementFactory.make('queue', 'queuem_2') + self.queuem_2.set_property('leaky', 2) def create_pipeline_elements(self): - print('Pipeline creation state: creating elements... ', end='') + print(INFO, gettime(), 'Pipeline creation state: creating elements... ', end='') # Inputs elements: self.create_video_sources() self.create_audio_sources() @@ -216,128 +233,134 @@ class New_user_pipeline(): self.create_filesink() self.create_streamsink() print('created') + if self.feed == 'backup': + print (INFO, + gettime(), + 'Webcam device location: ', + self.videosrc_backup.get_property('device')) - def add_elements_to_pipeline(self, feed='main'): - print('Pipeline creation state: adding elements... ', end='') - - if feed == 'main': - # Add here the elments associated with the RTSP feed + def add_elements_to_pipeline(self): + print(INFO, gettime(), 'Pipeline creation state: adding elements... ', end='') + # Inputs elements: + self.streampipe.add(self.audiosrc) + # Middle elements: + self.streampipe.add(self.audiolevel) + self.streampipe.add(self.vorbisenc) + self.streampipe.add(self.vp8enc) + self.streampipe.add(self.mkvmux) + self.streampipe.add(self.oggmux) + self.streampipe.add(self.webmmux) + self.streampipe.add(self.tee_rawaudio) + self.streampipe.add(self.tee_rawvideo) + self.streampipe.add(self.tee_streamaudio) + self.streampipe.add(self.tee_streamfull) + self.streampipe.add(self.queuev_2) + self.streampipe.add(self.queuev_3) + self.streampipe.add(self.queuev_4) + self.streampipe.add(self.queuev_5) + self.streampipe.add(self.queuea_1) + self.streampipe.add(self.queuea_2) + self.streampipe.add(self.queuea_3) + self.streampipe.add(self.queuea_4) + self.streampipe.add(self.queuea_5) + self.streampipe.add(self.queuem_1) + self.streampipe.add(self.queuem_2) + # Outputs elements: + self.streampipe.add(self.screensink) + self.streampipe.add(self.disksink_rawvideo) + self.streampipe.add(self.disksink_audio) + self.streampipe.add(self.disksink_stream) + self.streampipe.add(self.icecastsink_audio) + self.streampipe.add(self.icecastsink_stream) + if self.feed == 'main': # Inputs elements: self.streampipe.add(self.videosrc) - self.streampipe.add(self.audiosrc) # Middle elements: - self.streampipe.add(self.audiolevel) self.streampipe.add(self.rtpjpegdepay) self.streampipe.add(self.jpegdec) - self.streampipe.add(self.tee_rawvideo) - self.streampipe.add(self.mkvmux) - self.streampipe.add(self.vorbisenc) - self.streampipe.add(self.oggmux) self.streampipe.add(self.scaling) self.streampipe.add(self.capsfilter) - self.streampipe.add(self.vp8enc) - self.streampipe.add(self.webmmux) - self.streampipe.add(self.tee_rawaudio) - self.streampipe.add(self.tee_streamaudio) self.streampipe.add(self.tee_videodecoded) - self.streampipe.add(self.tee_streamfull) self.streampipe.add(self.queuev_1) - self.streampipe.add(self.queuev_2) - self.streampipe.add(self.queuev_3) - self.streampipe.add(self.queuev_4) - self.streampipe.add(self.queuev_5) - self.streampipe.add(self.queuev_6) - self.streampipe.add(self.queuea_1) - self.streampipe.add(self.queuea_2) - self.streampipe.add(self.queuea_3) - self.streampipe.add(self.queuea_4) - self.streampipe.add(self.queuea_5) - self.streampipe.add(self.queuem_1) - self.streampipe.add(self.queuem_2) - # Outputs elements: - self.streampipe.add(self.screensink) - self.streampipe.add(self.disksink_rawvideo) - self.streampipe.add(self.disksink_audio) - self.streampipe.add(self.disksink_stream) - self.streampipe.add(self.icecastsink_audio) - self.streampipe.add(self.icecastsink_stream) - elif feed == 'backup': - # Add here the elments associated with the WEBCAM feed + elif self.feed == 'backup': + # Inputs elements: self.streampipe.add(self.videosrc_backup) + # Middle elements: self.streampipe.add(self.capsfilter_backup) - self.streampipe.add(self.queuev_1) - self.streampipe.add(self.screensink) print ('BACKUP OK...', end='') print('added') - def link_pipeline_elements(self, feed='main'): + def link_pipeline_elements(self): """Link all elements with static pads.""" - print('Pipeline creation state: linking elements... ', end='') - - if feed == 'main': + print(INFO, gettime(), 'Pipeline creation state: linking elements... ', end='') + # Audio feed: + self.audiosrc.link(self.audiolevel) + self.audiolevel.link(self.queuea_1) + self.queuea_1.link(self.vorbisenc) + self.connect_tee(self.tee_rawaudio, + self.vorbisenc, + self.queuea_2, + self.queuea_5,) + self.queuea_2.link(self.oggmux) + self.connect_tee(self.tee_streamaudio, + self.oggmux, + self.queuea_3, + self.queuea_4,) + self.queuea_3.link(self.disksink_audio) + self.queuea_4.link(self.icecastsink_audio) + self.queuea_5.link(self.webmmux) + # Video feed: + self.queuev_2.link(self.mkvmux) + self.mkvmux.link(self.queuev_4) + self.queuev_4.link(self.disksink_rawvideo) + self.queuev_3.link(self.screensink) + # Stream (audio+video) feed: + self.vp8enc.link(self.queuev_5) + self.queuev_5.link(self.webmmux) + self.connect_tee(self.tee_streamfull, + self.webmmux, + self.queuem_1, + self.queuem_2,) + self.queuem_1.link(self.disksink_stream) + self.queuem_2.link(self.icecastsink_stream) + if self.feed == 'main': # linking here RTSP feed - # Video feed: self.queuev_1.link(self.rtpjpegdepay) self.connect_tee(self.tee_rawvideo, self.rtpjpegdepay, self.queuev_2, self.jpegdec,) - self.queuev_2.link(self.mkvmux) - self.mkvmux.link(self.queuev_4) - self.queuev_4.link(self.disksink_rawvideo) self.connect_tee(self.tee_videodecoded, self.jpegdec, self.queuev_3, self.scaling,) - self.queuev_3.link(self.screensink) - # Audio feed: - self.audiosrc.link(self.audiolevel) - self.audiolevel.link(self.queuea_1) - self.queuea_1.link(self.vorbisenc) - self.connect_tee(self.tee_rawaudio, - self.vorbisenc, - self.queuea_2, - self.queuea_5,) - self.queuea_2.link(self.oggmux) - self.connect_tee(self.tee_streamaudio, - self.oggmux, - self.queuea_3, - self.queuea_4,) - self.queuea_3.link(self.disksink_audio) - self.queuea_4.link(self.icecastsink_audio) - self.queuea_5.link(self.webmmux) - # Stream (audio+video) feed: + # Stream (video) feed: self.scaling.link(self.capsfilter) self.capsfilter.link(self.vp8enc) - self.vp8enc.link(self.queuev_6) - self.queuev_6.link(self.webmmux) - self.connect_tee(self.tee_streamfull, - self.webmmux, - self.queuem_1, - self.queuem_2,) - self.queuem_1.link(self.disksink_stream) - self.queuem_2.link(self.icecastsink_stream) - elif feed == 'backup': + + elif self.feed == 'backup': # linking here backup feed (WEBCAM) self.videosrc_backup.link(self.capsfilter_backup) - self.capsfilter_backup.link(self.queuev_1) - self.queuev_1.link(self.screensink) + self.connect_tee(self.tee_rawvideo, + self.capsfilter_backup, + self.queuev_2, + self.queuev_3, + output_element_3=self.vp8enc) +## self.capsfilter_backup.link(self.queuev_3) + # Stream (video) feed: print('BACKUP OK...', end='') print('linked') - def create_gstreamer_pipeline(self, feed='main'): + def create_gstreamer_pipeline(self): # New empty pipeline: self.streampipe = Gst.Pipeline() self.create_pipeline_elements() # Setting-up: - if feed == 'main': - self.add_elements_to_pipeline() - self.link_pipeline_elements() + self.add_elements_to_pipeline() + self.link_pipeline_elements() + if self.feed == 'main': self.create_pipeline_callbacks() - elif feed == 'backup': - self.add_elements_to_pipeline(feed='backup') - self.link_pipeline_elements(feed='backup') global bus bus = self.streampipe.get_bus() @@ -346,13 +369,9 @@ class New_user_pipeline(): # Used to get messages that GStreamer emits. bus.connect("message", self.on_message) - print('Pipeline creation state: successfully done.') + print(INFO, gettime(), 'Pipeline creation state: successfully done.') return self.streampipe - def create_gstreamer_backup_pipeline(self): - """Creates a backup pipeline based on a webcam feed.""" - print("CREATE HERE A BACKUP PIPELINE") - def on_message(self, bus, message): # ## print("[MESSAGE]", message.get_structure().get_name()) # [DEBUG] @@ -362,18 +381,19 @@ class New_user_pipeline(): self.streampipe.set_state(Gst.State.NULL) elif t == Gst.MessageType.ERROR: err, debug = message.parse_error() - print ("Error: %s" % err, debug) - self.streampipe.set_state(Gst.State.NULL) -## self.create_gstreamer_backup_pipeline() -## return 'ERROR' + print (ERROR, '%s' % err, debug) +# self.streampipe.set_state(Gst.State.NULL) def stream_play(self): self.streampipe.set_state(Gst.State.PLAYING) - print('[INFO] PLAYING State resquested') + if self.feed == 'backup': + print(WARN, gettime(), 'Backup pipeline started.') + print(INFO, gettime(), 'PLAYING State resquested') def stream_stop(self): self.streampipe.set_state(Gst.State.NULL) - + print(INFO, gettime(), 'STOPPED State resquested') + def get_stream_state(self): print(self.streampipe.get_state(self)) ##[FIXME] return self.streampipe.get_state() @@ -384,10 +404,17 @@ class New_user_pipeline(): audio = PATHNAME + filename + '_AUDIO' rawvideo = PATHNAME + filename + '_RAWVIDEO' stream = PATHNAME + filename + '_STREAM' - rename(AUDIO_DEFAULT, audio) - rename(RAWVIDEO_DEFAULT, rawvideo) - rename(STREAM_DEFAULT, stream) + if self.feed == 'main': + rename(AUDIO_DEFAULT, audio) + rename(RAWVIDEO_DEFAULT, rawvideo) + rename(STREAM_DEFAULT, stream) + elif self.feed == 'backup': + rename(AUDIO_BACKUP, audio) + rename(RAWVIDEO_BACKUP, rawvideo) + rename(STREAM_BACKUP, stream) def get_gstreamer_bus(): return bus - \ No newline at end of file + +def gettime(): + return strftime('%y-%m-%d_%H:%M:%S ', localtime()) \ No newline at end of file -- 2.25.1