1 #!/usr/bin/env python3.4
3 # This file is part of ABYSS.
5 # ABYSS is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # ABYSS is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with ABYSS. If not, see <http://www.gnu.org/licenses/>.
18 # Copyright (c) 2016 David Testé
21 from os
import listdir
22 from time
import localtime
, strftime
25 from gi
.repository
import Gst
26 from gi
.repository
import GstVideo
28 # Pathname has to be defined
30 AUDIO_DEFAULT
= PATHNAME
+ 'AUDIO_DEFAULT'
31 RAWVIDEO_DEFAULT
= PATHNAME
+ 'RAWVIDEO_DEFAULT'
32 STREAM_DEFAULT
= PATHNAME
+ 'STREAM_DEFAULT'
33 BACKUP_SUFFIX
= '_BACKUP'
34 FAILED_SUFFIX
= '_FAILED_'
36 AUDIO_BACKUP
= AUDIO_DEFAULT
+ BACKUP_SUFFIX
37 RAWVIDEO_BACKUP
= RAWVIDEO_DEFAULT
+ BACKUP_SUFFIX
38 STREAM_BACKUP
= STREAM_DEFAULT
+ BACKUP_SUFFIX
43 AUDIO_INPUT
= 'alsa_input.usb-Burr-Brown_from_TI_USB_Audio_CODEC-00-CODEC.analog-stereo'
46 class New_user_pipeline():
49 def __init__(self
, feed
='main'):
51 self
.user_pipeline
= self
.create_gstreamer_pipeline()
53 def create_video_sources(self
):
54 """Create video inputs from various sources."""
55 self
.videosrc
= Gst
.ElementFactory
.make('rtspsrc', 'videosrc')
56 self
.videosrc
.set_property('location', 'rtsp://192.168.48.2:554')
57 self
.videosrc
.set_property('latency', 0)
58 ## self.videosrc.set_property('debug', True)
59 if self
.feed
== 'backup':
60 self
.videosrc_backup
= Gst
.ElementFactory
.make('v4l2src',
62 device_location
= self
.find_webcam_device()
63 self
.videosrc_backup
.set_property('device', device_location
)
65 def find_webcam_device(self
):
66 """Look out for the USB webcam device."""
67 devices
= [dev
for dev
in listdir('/dev/') if 'video' in dev
]
69 # In case of computer having a built-in webcam
70 if item
!= 'video0' and len(devices
) > 1:
72 # Without built-in webcam
73 elif len(devices
) == 1:
75 print(ERROR
, gettime(), 'No webcam device found.')
77 def find_mixingdesk_device(self
):
78 """Look out for the USB mixing desk device.
79 Product used here: Behringer XENYX Q1002USB.
81 # shell cmd : 'pactl list | grep alsa_input'
82 # AUDIO_INPUT --> const used currently
85 def create_pipeline_callbacks(self
):
86 """Callbacks to connect dynamically created pads."""
87 self
.videosrc
.connect('pad-added', self
.on_pad_added_to_rtspsrc
)
89 def on_pad_added_to_rtspsrc(self
, rtspsrc
, pad
):
90 """Connect the dynamic 'src'pad of an RTSP source."""
91 sinkpad
= self
.queuev_1
.get_static_pad('sink')
94 def create_audio_sources(self
):
95 """Create audio inputs from various sources."""
96 self
.audiosrc
= Gst
.ElementFactory
.make('pulsesrc', 'audiosrc')
97 self
.audiosrc
.set_property('device', AUDIO_INPUT
)
99 def create_audiolevel_plugin(self
):
100 """Create audio level plugin to feed a vu-meter."""
101 self
.audiolevel
= Gst
.ElementFactory
.make('level', 'audiolevel')
102 self
.audiolevel
.set_property('interval', 200000000)
104 def create_filesink(self
):
105 """Create storable output elements."""
106 self
.disksink_rawvideo
= Gst
.ElementFactory
.make('filesink')
107 #[TO DO]: File location has to be defined
108 self
.disksink_rawvideo
.set_property('location', RAWVIDEO_DEFAULT
)
109 self
.disksink_audio
= Gst
.ElementFactory
.make('filesink')
110 self
.disksink_audio
.set_property('location', AUDIO_DEFAULT
)
111 self
.disksink_stream
= Gst
.ElementFactory
.make('filesink')
112 self
.disksink_stream
.set_property('location', STREAM_DEFAULT
)
113 if self
.feed
== 'backup':
114 self
.disksink_rawvideo
.set_property('location', RAWVIDEO_BACKUP
)
115 self
.disksink_audio
.set_property('location', AUDIO_BACKUP
)
116 self
.disksink_stream
.set_property('location', STREAM_BACKUP
)
118 def create_streamsink(self
):
119 """Create streamable output elements."""
121 self
.screensink
= Gst
.ElementFactory
.make('xvimagesink', 'screensink')
122 self
.screensink
.set_property('sync', False)
124 self
.icecastsink_audio
= Gst
.ElementFactory
.make('shout2send', 'icecastsink_audio')
125 self
.icecastsink_audio
.set_property('sync', False)
126 ## Configuration should be written on a file locally to keep safe private addresses
127 self
.icecastsink_audio
.set_property('ip', 'live2.fsf.org')
128 self
.icecastsink_audio
.set_property('port', 80)
129 self
.icecastsink_audio
.set_property('mount', 'testaudio.ogg')
130 self
.icecastsink_audio
.set_property('password', 'thahw3Wiez')
131 self
.icecastsink_stream
= Gst
.ElementFactory
.make('shout2send', 'icecastsink_stream')
132 self
.icecastsink_stream
.set_property('sync', False)
133 self
.icecastsink_stream
.set_property('ip', 'live2.fsf.org')
134 self
.icecastsink_stream
.set_property('port', 80)
135 self
.icecastsink_stream
.set_property('mount', 'teststream.webm')
136 self
.icecastsink_stream
.set_property('password', 'thahw3Wiez')
138 def create_payloader_elements(self
):
141 def create_depayloader_elements(self
):
142 self
.rtpjpegdepay
= Gst
.ElementFactory
.make('rtpjpegdepay', 'rtpjpegdepay')
144 def create_encoder_elements(self
):
146 self
.vorbisenc
= Gst
.ElementFactory
.make('vorbisenc', 'vorbisenc')
148 self
.vp8enc
= Gst
.ElementFactory
.make('vp8enc', 'vp8enc')
149 self
.vp8enc
.set_property('min_quantizer', 1)
150 self
.vp8enc
.set_property('max_quantizer', 13)
151 self
.vp8enc
.set_property('cpu-used', 5)
152 self
.vp8enc
.set_property('deadline', 42000)
153 self
.vp8enc
.set_property('threads', 2)
154 self
.vp8enc
.set_property('sharpness', 7)
156 def create_decoder_elements(self
):
157 self
.jpegdec
= Gst
.ElementFactory
.make('jpegdec', 'jpegdec')
158 self
.jpegdec
.set_property('max-errors', -1)
160 def create_muxer_elements(self
):
161 self
.oggmux
= Gst
.ElementFactory
.make('oggmux', 'oggmux')
162 self
.mkvmux
= Gst
.ElementFactory
.make('matroskamux', 'mkvmux')
163 self
.webmmux
= Gst
.ElementFactory
.make('webmmux', 'webmmux')
164 self
.webmmux
.set_property('streamable', True)
166 def create_demuxer_elements(self
):
169 def create_filtering_elements(self
):
170 self
.scaling
= Gst
.ElementFactory
.make('videoscale', 'scaling')
171 caps
= Gst
.caps_from_string('video/x-raw, width=(int)640, height=(int)360')
172 self
.capsfilter
= Gst
.ElementFactory
.make('capsfilter', 'capsfilter')
173 self
.capsfilter
.set_property('caps', caps
)
175 caps_backup
= Gst
.caps_from_string('video/x-raw, width=(int)640, height=(int)360')
176 self
.capsfilter_backup
= Gst
.ElementFactory
.make('capsfilter', 'capsfilter_backup')
177 self
.capsfilter_backup
.set_property('caps', caps_backup
)
179 def create_tee_elements(self
):
180 """Create tee elements to divide feeds."""
181 self
.tee_rawvideo
= Gst
.ElementFactory
.make('tee', 'tee_rawvideo')
182 self
.tee_videodecoded
= Gst
.ElementFactory
.make('tee', 'tee_videodecoded')
183 self
.tee_streamfull
= Gst
.ElementFactory
.make('tee', 'tee_streamfull')
184 self
.tee_rawaudio
= Gst
.ElementFactory
.make('tee', 'tee_rawaudio')
185 self
.tee_streamaudio
= Gst
.ElementFactory
.make('tee', 'tee_streamaudio')
187 def connect_tee(self
,
192 output_element_3
=None,):
193 """Links input and outputs of a given tee element."""
194 # Find a way to check if the element given are in the pipeline
195 # then pass the result to the 'if' statement.
196 ## argcheck = [True for arg in locals() if arg in 'the_list_of_elements_added']
197 ## print('[DEBUG] ArgList check: ', argcheck)
198 ## if False not in argcheck
200 input_element
.link(tee_element
)
201 tee_element
.link(output_element_1
)
202 tee_element
.link(output_element_2
)
204 tee_element
.link(output_element_3
)
208 'Couldn\'t link the tee. Element(s) probably not in the pipeline ')
210 def create_queues(self
):
212 self
.queuev_1
= Gst
.ElementFactory
.make('queue', 'queuev_1')
213 self
.queuev_2
= Gst
.ElementFactory
.make('queue', 'queuev_2')
214 self
.queuev_3
= Gst
.ElementFactory
.make('queue', 'queuev_3')
215 self
.queuev_4
= Gst
.ElementFactory
.make('queue', 'queuev_4')
216 self
.queuev_5
= Gst
.ElementFactory
.make('queue', 'queuev_5')
217 self
.queuev_6
= Gst
.ElementFactory
.make('queue', 'queuev_6')
219 self
.queuea_1
= Gst
.ElementFactory
.make('queue', 'queuea_1')
220 self
.queuea_2
= Gst
.ElementFactory
.make('queue', 'queuea_2')
221 self
.queuea_3
= Gst
.ElementFactory
.make('queue', 'queuea_3')
222 self
.queuea_4
= Gst
.ElementFactory
.make('queue', 'queuea_4')
223 self
.queuea_4
.set_property('leaky', 2)
224 self
.queuea_5
= Gst
.ElementFactory
.make('queue', 'queuea_5')
225 # For audio+video muxer:
226 self
.queuem_1
= Gst
.ElementFactory
.make('queue', 'queuem_1')
227 self
.queuem_2
= Gst
.ElementFactory
.make('queue', 'queuem_2')
228 self
.queuem_2
.set_property('leaky', 2)
230 def create_pipeline_elements(self
):
231 print(INFO
, gettime(), 'Pipeline creation state: creating elements... ', end
='')
233 self
.create_video_sources()
234 self
.create_audio_sources()
236 self
.create_audiolevel_plugin()
237 self
.create_payloader_elements()
238 self
.create_depayloader_elements()
239 self
.create_encoder_elements()
240 self
.create_decoder_elements()
241 self
.create_muxer_elements()
242 self
.create_filtering_elements()
243 self
.create_tee_elements()
246 self
.create_filesink()
247 self
.create_streamsink()
249 if self
.feed
== 'backup':
252 'Webcam device location: ',
253 self
.videosrc_backup
.get_property('device'))
256 def add_elements_to_pipeline(self
):
257 print(INFO
, gettime(), 'Pipeline creation state: adding elements... ', end
='')
259 self
.streampipe
.add(self
.audiosrc
)
261 self
.streampipe
.add(self
.audiolevel
)
262 self
.streampipe
.add(self
.vorbisenc
)
263 self
.streampipe
.add(self
.vp8enc
)
264 self
.streampipe
.add(self
.mkvmux
)
265 self
.streampipe
.add(self
.oggmux
)
266 self
.streampipe
.add(self
.webmmux
)
267 self
.streampipe
.add(self
.tee_rawaudio
)
268 self
.streampipe
.add(self
.tee_rawvideo
)
269 self
.streampipe
.add(self
.tee_streamaudio
)
270 self
.streampipe
.add(self
.tee_streamfull
)
271 self
.streampipe
.add(self
.queuev_2
)
272 self
.streampipe
.add(self
.queuev_3
)
273 self
.streampipe
.add(self
.queuev_4
)
274 self
.streampipe
.add(self
.queuev_5
)
275 self
.streampipe
.add(self
.queuea_1
)
276 self
.streampipe
.add(self
.queuea_2
)
277 self
.streampipe
.add(self
.queuea_3
)
278 self
.streampipe
.add(self
.queuea_4
)
279 self
.streampipe
.add(self
.queuea_5
)
280 self
.streampipe
.add(self
.queuem_1
)
281 self
.streampipe
.add(self
.queuem_2
)
283 self
.streampipe
.add(self
.screensink
)
284 self
.streampipe
.add(self
.disksink_rawvideo
)
285 self
.streampipe
.add(self
.disksink_audio
)
286 self
.streampipe
.add(self
.disksink_stream
)
287 self
.streampipe
.add(self
.icecastsink_audio
)
288 self
.streampipe
.add(self
.icecastsink_stream
)
289 if self
.feed
== 'main':
291 self
.streampipe
.add(self
.videosrc
)
293 self
.streampipe
.add(self
.rtpjpegdepay
)
294 self
.streampipe
.add(self
.jpegdec
)
295 self
.streampipe
.add(self
.scaling
)
296 self
.streampipe
.add(self
.capsfilter
)
297 self
.streampipe
.add(self
.tee_videodecoded
)
298 self
.streampipe
.add(self
.queuev_1
)
299 elif self
.feed
== 'backup':
301 self
.streampipe
.add(self
.videosrc_backup
)
303 self
.streampipe
.add(self
.capsfilter_backup
)
304 print ('BACKUP OK...', end
='')
307 def link_pipeline_elements(self
):
308 """Link all elements with static pads."""
309 print(INFO
, gettime(), 'Pipeline creation state: linking elements... ', end
='')
311 self
.audiosrc
.link(self
.audiolevel
)
312 self
.audiolevel
.link(self
.queuea_1
)
313 self
.queuea_1
.link(self
.vorbisenc
)
314 self
.connect_tee(self
.tee_rawaudio
,
318 self
.queuea_2
.link(self
.oggmux
)
319 self
.connect_tee(self
.tee_streamaudio
,
323 self
.queuea_3
.link(self
.disksink_audio
)
324 self
.queuea_4
.link(self
.icecastsink_audio
)
325 self
.queuea_5
.link(self
.webmmux
)
327 self
.queuev_2
.link(self
.mkvmux
)
328 self
.mkvmux
.link(self
.queuev_4
)
329 self
.queuev_4
.link(self
.disksink_rawvideo
)
330 self
.queuev_3
.link(self
.screensink
)
331 # Stream (audio+video) feed:
332 self
.vp8enc
.link(self
.queuev_5
)
333 self
.queuev_5
.link(self
.webmmux
)
334 self
.connect_tee(self
.tee_streamfull
,
338 self
.queuem_1
.link(self
.disksink_stream
)
339 self
.queuem_2
.link(self
.icecastsink_stream
)
340 if self
.feed
== 'main':
341 # linking here RTSP feed
342 self
.queuev_1
.link(self
.rtpjpegdepay
)
343 self
.connect_tee(self
.tee_rawvideo
,
347 self
.connect_tee(self
.tee_videodecoded
,
351 # Stream (video) feed:
352 self
.scaling
.link(self
.capsfilter
)
353 self
.capsfilter
.link(self
.vp8enc
)
355 elif self
.feed
== 'backup':
356 # linking here backup feed (WEBCAM)
357 self
.videosrc_backup
.link(self
.capsfilter_backup
)
358 self
.connect_tee(self
.tee_rawvideo
,
359 self
.capsfilter_backup
,
362 output_element_3
=self
.vp8enc
)
363 ## self.capsfilter_backup.link(self.queuev_3)
364 # Stream (video) feed:
365 print('BACKUP OK...', end
='')
368 def create_gstreamer_pipeline(self
):
369 # New empty pipeline:
370 self
.streampipe
= Gst
.Pipeline()
371 self
.create_pipeline_elements()
373 self
.add_elements_to_pipeline()
374 self
.link_pipeline_elements()
375 if self
.feed
== 'main':
376 self
.create_pipeline_callbacks()
379 bus
= self
.streampipe
.get_bus()
380 bus
.add_signal_watch()
381 bus
.enable_sync_message_emission()
382 # Used to get messages that GStreamer emits.
383 bus
.connect("message", self
.on_message
)
385 print(INFO
, gettime(), 'Pipeline creation state: successfully done.')
386 return self
.streampipe
388 def on_message(self
, bus
, message
):
390 ## print("[MESSAGE]", message.get_structure().get_name()) # [DEBUG]
393 if t
== Gst
.MessageType
.EOS
:
394 self
.streampipe
.set_state(Gst
.State
.NULL
)
395 elif t
== Gst
.MessageType
.ERROR
:
396 err
, debug
= message
.parse_error()
397 print (ERROR
, '%s' % err
, debug
)
398 # self.streampipe.set_state(Gst.State.NULL)
400 def stream_play(self
):
401 self
.streampipe
.set_state(Gst
.State
.PLAYING
)
402 if self
.feed
== 'backup':
403 print(WARN
, gettime(), 'Backup pipeline started.')
404 print(INFO
, gettime(), 'PLAYING State resquested')
406 def stream_stop(self
):
407 self
.streampipe
.set_state(Gst
.State
.NULL
)
408 print(INFO
, gettime(), 'STOPPED State resquested')
410 def set_filenames(self
, string
, streamfailed
=False):
411 """Sets filename and location for each sink."""
414 audio
= PATHNAME
+ filename
+ '_AUDIO'
415 rawvideo
= PATHNAME
+ filename
+ '_RAWVIDEO'
416 stream
= PATHNAME
+ filename
+ '_STREAM'
417 print('FEED STATE: ', self
.feed
)
418 if self
.feed
== 'main':
419 if streamfailed
and filename
:
420 audio
= audio
+ FAILED_SUFFIX
+ str(fail_counter
)
421 rawvideo
= rawvideo
+ FAILED_SUFFIX
+ str(fail_counter
)
422 stream
= stream
+ FAILED_SUFFIX
+ str(fail_counter
)
423 rename(AUDIO_DEFAULT
, audio
)
424 rename(RAWVIDEO_DEFAULT
, rawvideo
)
425 rename(STREAM_DEFAULT
, stream
)
428 audio
= AUDIO_DEFAULT
+ FAILED_SUFFIX
+ str(fail_counter
)
429 rawvideo
= RAWVIDEO_DEFAULT
+ FAILED_SUFFIX
+ str(fail_counter
)
430 stream
= STREAM_DEFAULT
+ FAILED_SUFFIX
+ str(fail_counter
)
431 rename(AUDIO_DEFAULT
, audio
)
432 rename(RAWVIDEO_DEFAULT
, rawvideo
)
433 rename(STREAM_DEFAULT
, stream
)
436 rename(AUDIO_DEFAULT
, audio
)
437 rename(RAWVIDEO_DEFAULT
, rawvideo
)
438 rename(STREAM_DEFAULT
, stream
)
439 elif self
.feed
== 'backup':
440 print('INSIDE BACKUP RENAMING')
441 rename(AUDIO_BACKUP
, audio
)
442 rename(RAWVIDEO_BACKUP
, rawvideo
)
443 rename(STREAM_BACKUP
, stream
)
448 def get_gstreamer_bus():
452 return strftime('%y-%m-%d_%H:%M:%S ', localtime())