Add full back-up pipeline.
[libre-streamer.git] / stream_2016 / gstconf.py
1 #!/usr/bin/env python3.4
2
3 # This file is part of Libre-Streamer.
4 #
5 # Libre-Streamer is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # Libre-Streamer is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with Libre-Streamer. If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Copyright (c) 2016 David Testé
19
20 from os import rename
21 from os import listdir
22 from time import localtime, strftime
23
24 import gi
25 from gi.repository import Gst
26 from gi.repository import GstVideo
27
# Directory prefix prepended to every recording file name.
# It has to be defined before the streamer is used; it is concatenated
# as-is, so a non-empty value needs its own trailing separator.
PATHNAME = ''
BACKUP_SUFFIX = '_BACKUP'

# Temporary file names written while the main feed is recording.
AUDIO_DEFAULT = ''.join((PATHNAME, 'AUDIO_DEFAULT'))
RAWVIDEO_DEFAULT = ''.join((PATHNAME, 'RAWVIDEO_DEFAULT'))
STREAM_DEFAULT = ''.join((PATHNAME, 'STREAM_DEFAULT'))

# Temporary file names associated with the backup feed.
AUDIO_BACKUP = ''.join((AUDIO_DEFAULT, BACKUP_SUFFIX))
RAWVIDEO_BACKUP = ''.join((RAWVIDEO_DEFAULT, BACKUP_SUFFIX))
STREAM_BACKUP = ''.join((STREAM_DEFAULT, BACKUP_SUFFIX))

# Prefixes for the messages printed on the console.
ERROR, INFO, WARN = '[ERROR] ', '[INFO] ', '[WARN] '
40
41
class New_user_pipeline():
    """GStreamer pipeline that records, displays and streams one feed.

    Two feeds are supported:

    * ``'main'``: RTP/JPEG video read from an RTSP source.
    * ``'backup'``: raw video read from a USB webcam through v4l2.

    Both feeds take their audio from PulseAudio.  Audio, raw video and the
    muxed WebM stream are written to disk; audio and the WebM stream are
    also sent to an Icecast server, and the video is shown on the local
    screen.

    The pipeline is built as soon as the object is created; use
    ``stream_play()`` and ``stream_stop()`` to control it.
    """

    def __init__(self, feed='main'):
        """Build the whole pipeline for `feed` ('main' or 'backup')."""
        self.feed = feed
        self.user_pipeline = self.create_gstreamer_pipeline()

    def create_video_sources(self):
        """Create video inputs from various sources."""
        self.videosrc = Gst.ElementFactory.make('rtspsrc', 'videosrc')
        self.videosrc.set_property('location', 'rtsp://192.168.48.2:554')
        self.videosrc.set_property('latency', 0)
        if self.feed == 'backup':
            self.videosrc_backup = Gst.ElementFactory.make('v4l2src',
                                                           'videosrc_backup')
            device_location = self.find_webcam_device()
            # Keep the element's default device when nothing was found
            # instead of overwriting it with None.
            if device_location is not None:
                self.videosrc_backup.set_property('device', device_location)

    @staticmethod
    def _select_webcam(devices):
        """Pick the webcam path out of the ``/dev`` entry names `devices`.

        With a single video device that device is used.  With several,
        ``video0`` is assumed to be a built-in webcam and the first other
        device (in sorted order, so the choice is reproducible) is used.
        Returns None when `devices` is empty.
        """
        names = sorted(devices)
        if not names:
            return None
        if len(names) == 1:
            return '/dev/' + names[0]
        external = [name for name in names if name != 'video0']
        return '/dev/' + external[0] if external else None

    def find_webcam_device(self):
        """Look out for the USB webcam device.

        Returns the device path (e.g. ``'/dev/video1'``), or None after
        printing an error when no video device exists.
        """
        devices = [dev for dev in listdir('/dev/') if 'video' in dev]
        location = self._select_webcam(devices)
        if location is None:
            print(ERROR, gettime(), 'No webcam device found.')
        return location

    def create_pipeline_callbacks(self):
        """Callbacks to connect dynamically created pads."""
        self.videosrc.connect('pad-added', self.on_pad_added_to_rtspsrc)

    def on_pad_added_to_rtspsrc(self, rtspsrc, pad):
        """Connect the dynamic 'src' pad of an RTSP source."""
        sinkpad = self.queuev_1.get_static_pad('sink')
        # The source may expose more than one pad; only the first one can
        # be attached to the video queue.
        if sinkpad.is_linked():
            return
        if pad.link(sinkpad) != Gst.PadLinkReturn.OK:
            print(ERROR, gettime(), 'Couldn\'t link the RTSP source pad.')

    def create_audio_sources(self):
        """Create audio inputs from various sources."""
        self.audiosrc = Gst.ElementFactory.make('pulsesrc', 'audiosrc')

    def create_audiolevel_plugin(self):
        """Create audio level plugin to feed a vu-meter."""
        self.audiolevel = Gst.ElementFactory.make('level', 'audiolevel')
        # Interval between two level messages, in nanoseconds (200 ms).
        self.audiolevel.set_property('interval', 200000000)

    def create_filesink(self):
        """Create storable output elements.

        The backup feed records to the ``*_BACKUP`` files so that it never
        shares a file with the main feed and so that ``set_filenames()``
        finds the files it renames for that feed.
        """
        if self.feed == 'backup':
            rawvideo, audio, stream = (RAWVIDEO_BACKUP,
                                       AUDIO_BACKUP,
                                       STREAM_BACKUP)
        else:
            rawvideo, audio, stream = (RAWVIDEO_DEFAULT,
                                       AUDIO_DEFAULT,
                                       STREAM_DEFAULT)
        #[TO DO]: File location has to be defined
        self.disksink_rawvideo = Gst.ElementFactory.make('filesink')
        self.disksink_rawvideo.set_property('location', rawvideo)
        self.disksink_audio = Gst.ElementFactory.make('filesink')
        self.disksink_audio.set_property('location', audio)
        self.disksink_stream = Gst.ElementFactory.make('filesink')
        self.disksink_stream.set_property('location', stream)

    def _create_icecast_sink(self, name, mount):
        """Return a 'shout2send' element named `name` sending to `mount`."""
        # NOTE(review): server address and password are hard-coded in the
        # source; they should be read from a local, private configuration
        # file instead.
        sink = Gst.ElementFactory.make('shout2send', name)
        sink.set_property('sync', False)
        sink.set_property('ip', 'live2.fsf.org')
        sink.set_property('port', 80)
        sink.set_property('mount', mount)
        sink.set_property('password', 'thahw3Wiez')
        return sink

    def create_streamsink(self):
        """Create streamable output elements."""
        # To local screen:
        self.screensink = Gst.ElementFactory.make('xvimagesink', 'screensink')
        self.screensink.set_property('sync', False)
        # To icecast server:
        self.icecastsink_audio = self._create_icecast_sink(
            'icecastsink_audio', 'testaudio.ogg')
        self.icecastsink_stream = self._create_icecast_sink(
            'icecastsink_stream', 'teststream.webm')

    def create_payloader_elements(self):
        """Placeholder: no payloader is needed yet."""
        pass

    def create_depayloader_elements(self):
        """Create the element extracting JPEG frames from RTP packets."""
        self.rtpjpegdepay = Gst.ElementFactory.make('rtpjpegdepay',
                                                    'rtpjpegdepay')

    def create_encoder_elements(self):
        """Create the Vorbis (audio) and VP8 (video) encoders."""
        # Audio encoders:
        self.vorbisenc = Gst.ElementFactory.make('vorbisenc', 'vorbisenc')
        # Video encoders:
        self.vp8enc = Gst.ElementFactory.make('vp8enc', 'vp8enc')
        self.vp8enc.set_property('min_quantizer', 1)
        self.vp8enc.set_property('max_quantizer', 13)
        self.vp8enc.set_property('cpu-used', 5)
        self.vp8enc.set_property('deadline', 42000)
        self.vp8enc.set_property('threads', 2)
        self.vp8enc.set_property('sharpness', 7)

    def create_decoder_elements(self):
        """Create the JPEG decoder used by the main feed."""
        self.jpegdec = Gst.ElementFactory.make('jpegdec', 'jpegdec')
        self.jpegdec.set_property('max-errors', -1)

    def create_muxer_elements(self):
        """Create the Ogg, Matroska and WebM muxers."""
        self.oggmux = Gst.ElementFactory.make('oggmux', 'oggmux')
        self.mkvmux = Gst.ElementFactory.make('matroskamux', 'mkvmux')
        self.webmmux = Gst.ElementFactory.make('webmmux', 'webmmux')
        self.webmmux.set_property('streamable', True)

    def create_demuxer_elements(self):
        """Placeholder: no demuxer is needed yet."""
        pass

    def create_filtering_elements(self):
        """Create the scaler and the caps filters (640x360 raw video)."""
        self.scaling = Gst.ElementFactory.make('videoscale', 'scaling')
        caps = Gst.caps_from_string(
            'video/x-raw, width=(int)640, height=(int)360')
        self.capsfilter = Gst.ElementFactory.make('capsfilter', 'capsfilter')
        self.capsfilter.set_property('caps', caps)

        caps_backup = Gst.caps_from_string(
            'video/x-raw, width=(int)640, height=(int)360')
        self.capsfilter_backup = Gst.ElementFactory.make('capsfilter',
                                                         'capsfilter_backup')
        self.capsfilter_backup.set_property('caps', caps_backup)

    def create_tee_elements(self):
        """Create tee elements to divide feeds."""
        for name in ('tee_rawvideo',
                     'tee_videodecoded',
                     'tee_streamfull',
                     'tee_rawaudio',
                     'tee_streamaudio'):
            setattr(self, name, Gst.ElementFactory.make('tee', name))

    def connect_tee(self,
                    tee_element,
                    input_element,
                    output_element_1,
                    output_element_2,
                    output_element_3=None,):
        """Links input and outputs of a given tee element.

        Every link is attempted even when an earlier one fails.  An error
        is printed when at least one link could not be made (typically
        because an element was not added to the pipeline).

        Returns True when all the links succeeded, False otherwise.
        """
        outputs = [output_element_1, output_element_2]
        if output_element_3:
            outputs.append(output_element_3)
        results = [input_element.link(tee_element)]
        results.extend([tee_element.link(output) for output in outputs])
        linked = all(results)
        if not linked:
            print(ERROR,
                  gettime(),
                  'Couldn\'t link the tee. Element(s) probably not in the pipeline ')
        return linked

    def create_queues(self):
        """Create the queues decoupling the branches of the pipeline."""
        queue_names = (
            # For video feed:
            'queuev_1', 'queuev_2', 'queuev_3',
            'queuev_4', 'queuev_5', 'queuev_6',
            # For audio feed:
            'queuea_1', 'queuea_2', 'queuea_3', 'queuea_4', 'queuea_5',
            # For audio+video muxer:
            'queuem_1', 'queuem_2',
        )
        for name in queue_names:
            setattr(self, name, Gst.ElementFactory.make('queue', name))
        # The queues in front of the icecast sinks are leaky
        # (2 = downstream: old buffers are dropped when the queue is full).
        self.queuea_4.set_property('leaky', 2)
        self.queuem_2.set_property('leaky', 2)

    def create_pipeline_elements(self):
        """Create every element used by the pipeline."""
        print(INFO, gettime(), 'Pipeline creation state: creating elements... ', end='')
        # Inputs elements:
        self.create_video_sources()
        self.create_audio_sources()
        # Middle elements:
        self.create_audiolevel_plugin()
        self.create_payloader_elements()
        self.create_depayloader_elements()
        self.create_encoder_elements()
        self.create_decoder_elements()
        self.create_muxer_elements()
        self.create_filtering_elements()
        self.create_tee_elements()
        self.create_queues()
        # Output elements:
        self.create_filesink()
        self.create_streamsink()
        print('created')
        if self.feed == 'backup':
            print (INFO,
                   gettime(),
                   'Webcam device location: ',
                   self.videosrc_backup.get_property('device'))

    def add_elements_to_pipeline(self):
        """Add the elements needed by the current feed to the pipeline."""
        print(INFO, gettime(), 'Pipeline creation state: adding elements... ', end='')
        common_elements = (
            # Inputs elements:
            self.audiosrc,
            # Middle elements:
            self.audiolevel,
            self.vorbisenc,
            self.vp8enc,
            self.mkvmux,
            self.oggmux,
            self.webmmux,
            self.tee_rawaudio,
            self.tee_rawvideo,
            self.tee_streamaudio,
            self.tee_streamfull,
            self.queuev_2,
            self.queuev_3,
            self.queuev_4,
            self.queuev_5,
            self.queuea_1,
            self.queuea_2,
            self.queuea_3,
            self.queuea_4,
            self.queuea_5,
            self.queuem_1,
            self.queuem_2,
            # Outputs elements:
            self.screensink,
            self.disksink_rawvideo,
            self.disksink_audio,
            self.disksink_stream,
            self.icecastsink_audio,
            self.icecastsink_stream,
        )
        for element in common_elements:
            self.streampipe.add(element)
        if self.feed == 'main':
            main_elements = (
                # Inputs elements:
                self.videosrc,
                # Middle elements:
                self.rtpjpegdepay,
                self.jpegdec,
                self.scaling,
                self.capsfilter,
                self.tee_videodecoded,
                self.queuev_1,
            )
            for element in main_elements:
                self.streampipe.add(element)
        elif self.feed == 'backup':
            # Inputs elements:
            self.streampipe.add(self.videosrc_backup)
            # Middle elements:
            self.streampipe.add(self.capsfilter_backup)
            print ('BACKUP OK...', end='')
        print('added')

    def link_pipeline_elements(self):
        """Link all elements with static pads.

        The RTSP source of the main feed has a dynamic pad; it is linked
        later by ``on_pad_added_to_rtspsrc()``.
        """
        print(INFO, gettime(), 'Pipeline creation state: linking elements... ', end='')
        # Audio feed:
        self.audiosrc.link(self.audiolevel)
        self.audiolevel.link(self.queuea_1)
        self.queuea_1.link(self.vorbisenc)
        self.connect_tee(self.tee_rawaudio,
                         self.vorbisenc,
                         self.queuea_2,
                         self.queuea_5,)
        self.queuea_2.link(self.oggmux)
        self.connect_tee(self.tee_streamaudio,
                         self.oggmux,
                         self.queuea_3,
                         self.queuea_4,)
        self.queuea_3.link(self.disksink_audio)
        self.queuea_4.link(self.icecastsink_audio)
        self.queuea_5.link(self.webmmux)
        # Video feed:
        self.queuev_2.link(self.mkvmux)
        self.mkvmux.link(self.queuev_4)
        self.queuev_4.link(self.disksink_rawvideo)
        self.queuev_3.link(self.screensink)
        # Stream (audio+video) feed:
        self.vp8enc.link(self.queuev_5)
        self.queuev_5.link(self.webmmux)
        self.connect_tee(self.tee_streamfull,
                         self.webmmux,
                         self.queuem_1,
                         self.queuem_2,)
        self.queuem_1.link(self.disksink_stream)
        self.queuem_2.link(self.icecastsink_stream)
        if self.feed == 'main':
            # linking here RTSP feed
            self.queuev_1.link(self.rtpjpegdepay)
            self.connect_tee(self.tee_rawvideo,
                             self.rtpjpegdepay,
                             self.queuev_2,
                             self.jpegdec,)
            self.connect_tee(self.tee_videodecoded,
                             self.jpegdec,
                             self.queuev_3,
                             self.scaling,)
            # Stream (video) feed:
            self.scaling.link(self.capsfilter)
            self.capsfilter.link(self.vp8enc)
        elif self.feed == 'backup':
            # linking here backup feed (WEBCAM)
            self.videosrc_backup.link(self.capsfilter_backup)
            self.connect_tee(self.tee_rawvideo,
                             self.capsfilter_backup,
                             self.queuev_2,
                             self.queuev_3,
                             output_element_3=self.vp8enc)
            print('BACKUP OK...', end='')
        print('linked')

    def create_gstreamer_pipeline(self):
        """Create, populate and link the pipeline, then watch its bus.

        The bus is also published in the module-level ``bus`` variable
        read by ``get_gstreamer_bus()``.

        Returns the new ``Gst.Pipeline``.
        """
        # New empty pipeline:
        self.streampipe = Gst.Pipeline()
        self.create_pipeline_elements()
        # Setting-up:
        self.add_elements_to_pipeline()
        self.link_pipeline_elements()
        if self.feed == 'main':
            self.create_pipeline_callbacks()

        global bus
        bus = self.streampipe.get_bus()
        bus.add_signal_watch()
        bus.enable_sync_message_emission()
        # Used to get messages that GStreamer emits.
        bus.connect("message", self.on_message)

        print(INFO, gettime(), 'Pipeline creation state: successfully done.')
        return self.streampipe

    def on_message(self, bus, message):
        """Handle bus messages: stop on end-of-stream, report errors."""
        t = message.type
        if t == Gst.MessageType.EOS:
            self.streampipe.set_state(Gst.State.NULL)
        elif t == Gst.MessageType.ERROR:
            err, debug = message.parse_error()
            # The pipeline is deliberately left running after an error.
            print (ERROR, '%s' % err, debug)

    def stream_play(self):
        """Request the PLAYING state."""
        self.streampipe.set_state(Gst.State.PLAYING)
        if self.feed == 'backup':
            print(WARN, gettime(), 'Backup pipeline started.')
        print(INFO, gettime(), 'PLAYING State resquested')

    def stream_stop(self):
        """Request the NULL (stopped) state."""
        self.streampipe.set_state(Gst.State.NULL)
        print(INFO, gettime(), 'STOPPED State resquested')

    def get_stream_state(self):
        """Print and return the state of the pipeline.

        The state is polled with a zero timeout, so the call never blocks
        on a pending asynchronous state change.

        Returns what ``Gst.Element.get_state()`` returns: the result of
        the last state change, the current state and the pending state.
        """
        state = self.streampipe.get_state(0)
        print(state)
        return state

    def set_filenames(self, string):
        """Sets filename and location for each sink.

        Renames the temporary files of the current feed to
        ``PATHNAME + string`` followed by ``_AUDIO``, ``_RAWVIDEO`` and
        ``_STREAM``.  Nothing is renamed for an unknown feed.  Errors
        raised by ``os.rename`` (e.g. a missing file) are propagated.
        """
        if self.feed == 'main':
            sources = (AUDIO_DEFAULT, RAWVIDEO_DEFAULT, STREAM_DEFAULT)
        elif self.feed == 'backup':
            sources = (AUDIO_BACKUP, RAWVIDEO_BACKUP, STREAM_BACKUP)
        else:
            return
        suffixes = ('_AUDIO', '_RAWVIDEO', '_STREAM')
        for source, suffix in zip(sources, suffixes):
            rename(source, PATHNAME + string + suffix)
415
def get_gstreamer_bus():
    """Return the bus of the most recently created pipeline.

    The module-level ``bus`` variable is assigned by
    ``New_user_pipeline.create_gstreamer_pipeline()``; calling this
    function before any pipeline exists raises ``NameError``.
    """
    global bus
    return bus
418
def gettime():
    """Return the current local time as a ``YY-MM-DD_HH:MM:SS `` stamp.

    The trailing space is part of the stamp.
    """
    now = localtime()
    return strftime('%y-%m-%d_%H:%M:%S ', now)