size should be a tuple
[mediagoblin.git] / mediagoblin / media_types / audio / spectrogram.py
CommitLineData
f1d06e1d
JW
1# processing.py -- various audio processing functions
2# Copyright (C) 2008 MUSIC TECHNOLOGY GROUP (MTG)
3# UNIVERSITAT POMPEU FABRA
4#
5# This program is free software: you can redistribute it and/or modify
6# it under the terms of the GNU Affero General Public License as
7# published by the Free Software Foundation, either version 3 of the
8# License, or (at your option) any later version.
9#
10# This program is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13# GNU Affero General Public License for more details.
14#
15# You should have received a copy of the GNU Affero General Public License
16# along with this program. If not, see <http://www.gnu.org/licenses/>.
17#
18# Authors:
19# Bram de Jong <bram.dejong at domain.com where domain in gmail>
20# 2012, Joar Wandborg <first name at last name dot se>
21
d0e9f843
AL
22try:
23 from PIL import Image
24except ImportError:
25 import Image
f1d06e1d
JW
26import math
27import numpy
28
try:
    import scikits.audiolab as audiolab
except ImportError:
    # Parenthesized single-argument print works under both Python 2 and
    # Python 3; the bare statement form made the whole module fail to
    # parse on Python 3.
    print("WARNING: audiolab is not installed so wav2png will not work")
34
class AudioProcessingException(Exception):
    """Raised when decoding or analysing an audio file fails."""
37
38
class SpectrogramImage(object):
    """Builds a log-frequency spectrogram image column by column.

    The final image is ``image_width`` columns wide (one column per call
    to :meth:`draw_spectrum`) and ``image_height`` rows tall.  Rows are
    mapped logarithmically onto FFT bins between 100 Hz and 22050 Hz and
    colored through a 256-entry interpolated palette.
    """

    def __init__(self, image_size, fft_size):
        # image_size must be a (width, height) tuple
        self.image_width, self.image_height = image_size
        self.fft_size = fft_size

        # Color ramp: transparent black -> dark blue-grey -> blue ->
        # green -> yellow -> red -> white.  `//` keeps the historical
        # Python 2 floor-division values under Python 3 as well.
        colors = [
            (0, 0, 0, 0),
            (58 // 4, 68 // 4, 65 // 4, 255),
            (80 // 2, 100 // 2, 153 // 2, 255),
            (90, 180, 100, 255),
            (224, 224, 44, 255),
            (255, 60, 30, 255),
            (255, 255, 255, 255)
        ]

        self.palette = interpolate_colors(colors)

        # Generate lookup table for y-coordinate from fft-bin
        self.y_to_bin = []

        fft_min = 100.0
        fft_max = 22050.0  # Hz, upper edge of the displayed band

        y_min = math.log10(fft_min)
        y_max = math.log10(fft_max)

        for y in range(self.image_height):
            # Logarithmically spaced frequency for this row, then the
            # matching (fractional) FFT bin index.
            freq = math.pow(
                10.0,
                y_min + y / (self.image_height - 1.0)
                * (y_max - y_min))

            fft_bin = freq / fft_max * (self.fft_size // 2 + 1)

            if fft_bin < self.fft_size // 2:
                # Keep the fractional part so draw_spectrum can blend
                # between the two neighbouring bins.
                alpha = fft_bin - int(fft_bin)

                self.y_to_bin.append((int(fft_bin), alpha * 255))

        # this is a bit strange, but using image.load()[x,y] = ... is
        # a lot slower than using image.putadata and then rotating the image
        # so we store all the pixels in an array and then create the image when saving
        self.pixels = []

    def draw_spectrum(self, x, spectrum):
        """Append one column of pixels for ``spectrum``.

        ``x`` is unused; columns are appended in call order and laid out
        when the image is saved.  ``spectrum`` values are expected in
        [0, 1] so the blended index stays inside the 256-entry palette.
        """
        # for all frequencies, draw the pixels
        for index, alpha in self.y_to_bin:
            self.pixels.append(
                self.palette[int((255.0 - alpha) * spectrum[index]
                    + alpha * spectrum[index + 1])])

        # if the FFT is too small to fill up the image, fill with black to the top
        for y in range(len(self.y_to_bin), self.image_height):
            self.pixels.append(self.palette[0])

    def save(self, filename, quality=90):
        """Write the collected pixels to ``filename``.

        Pixels were collected column-major, so a transposed
        (height x width) image is filled with putdata and then rotated
        90 degrees into its final orientation.
        """
        self.image = Image.new(
            'RGBA',
            (self.image_height, self.image_width))

        self.image.putdata(self.pixels)
        self.image.transpose(Image.ROTATE_90).save(
            filename,
            quality=quality)
103
104
class AudioProcessor(object):
    """
    Processes chunks of audio and calculates the spectral centroid and the
    peak samples in each chunk of audio.
    """
    def __init__(self, input_filename, fft_size, window_function=numpy.hanning):
        """
        input_filename -- path to a sound file readable by audiolab
        fft_size -- number of samples per FFT frame
        window_function -- callable returning a window array of a given length
        """
        max_level = get_max_level(input_filename)

        self.audio_file = audiolab.Sndfile(input_filename, 'r')
        self.fft_size = fft_size
        self.window = window_function(self.fft_size)
        self.spectrum_range = None
        self.lower = 100      # Hz, lower bound used to scale the centroid
        self.higher = 22050   # Hz, upper bound used to scale the centroid
        self.lower_log = math.log10(self.lower)
        self.higher_log = math.log10(self.higher)
        self.clip = lambda val, low, high: min(high, max(low, val))

        # figure out what the maximum value is for an FFT doing the FFT of a DC signal
        fft = numpy.fft.rfft(numpy.ones(fft_size) * self.window)
        max_fft = (numpy.abs(fft)).max()

        # set the scale to normalized audio and normalized FFT
        self.scale = 1.0 / max_level / max_fft if max_level > 0 else 1

    def read(self, start, size, resize_if_less=False):
        """ read size samples starting at start, if resize_if_less is True and less than size
        samples are read, resize the array to size and fill with zeros """

        # number of zeros to add to start and end of the buffer
        add_to_start = 0
        add_to_end = 0

        if start < 0:
            # the first FFT window starts centered around zero
            if size + start <= 0:
                return numpy.zeros(size) if resize_if_less else numpy.array([])
            else:
                self.audio_file.seek(0)

                add_to_start = - start  # remember: start is negative!
                to_read = size + start

                if to_read > self.audio_file.nframes:
                    add_to_end = to_read - self.audio_file.nframes
                    to_read = self.audio_file.nframes
        else:
            self.audio_file.seek(start)

            to_read = size
            if start + to_read >= self.audio_file.nframes:
                to_read = self.audio_file.nframes - start
                add_to_end = size - to_read

        try:
            samples = self.audio_file.read_frames(to_read)
        except RuntimeError:
            # this can happen for wave files with broken headers...
            return numpy.zeros(size) if resize_if_less else numpy.zeros(2)

        # convert to mono by selecting left channel only
        if self.audio_file.channels > 1:
            samples = samples[:, 0]

        if resize_if_less and (add_to_start > 0 or add_to_end > 0):
            if add_to_start > 0:
                # samples is 1-D (mono) at this point, so the zero
                # padding is prepended along axis 0; the previous
                # axis=1 raises an error in modern numpy for 1-D input.
                samples = numpy.concatenate(
                    (numpy.zeros(add_to_start), samples), axis=0)

            if add_to_end > 0:
                samples = numpy.resize(samples, size)
                samples[size - add_to_end:] = 0

        return samples

    def spectral_centroid(self, seek_point, spec_range=110.0):
        """ starting at seek_point read fft_size samples, and calculate the spectral centroid """

        # `//` keeps the window offset an integer under Python 3 as well
        samples = self.read(seek_point - self.fft_size // 2, self.fft_size, True)

        samples *= self.window
        fft = numpy.fft.rfft(samples)
        spectrum = self.scale * numpy.abs(fft)  # normalized abs(FFT) between 0 and 1

        length = numpy.float64(spectrum.shape[0])

        # scale the db spectrum from [- spec_range db ... 0 db] > [0..1]
        db_spectrum = ((20*(numpy.log10(spectrum + 1e-60))).clip(-spec_range, 0.0) + spec_range)/spec_range

        energy = spectrum.sum()
        spectral_centroid = 0

        if energy > 1e-60:
            # calculate the spectral centroid

            if self.spectrum_range is None:
                self.spectrum_range = numpy.arange(length)

            spectral_centroid = (spectrum * self.spectrum_range).sum() / (energy * (length - 1)) * self.audio_file.samplerate * 0.5

            # clip > log10 > scale between 0 and 1
            spectral_centroid = (math.log10(self.clip(spectral_centroid, self.lower, self.higher)) - self.lower_log) / (self.higher_log - self.lower_log)

        return (spectral_centroid, db_spectrum)

    def peaks(self, start_seek, end_seek):
        """ read all samples between start_seek and end_seek, then find the minimum and maximum peak
        in that range. Returns that pair in the order they were found. So if min was found first,
        it returns (min, max) else the other way around. """

        # larger blocksizes are faster but take more mem...
        # Aha, Watson, a clue, a tradeof!
        block_size = 4096

        max_index = -1
        max_value = -1
        min_index = -1
        min_value = 1

        if start_seek < 0:
            start_seek = 0

        if end_seek > self.audio_file.nframes:
            end_seek = self.audio_file.nframes

        if end_seek <= start_seek:
            samples = self.read(start_seek, 1)
            return (samples[0], samples[0])

        if block_size > end_seek - start_seek:
            block_size = end_seek - start_seek

        for i in range(start_seek, end_seek, block_size):
            samples = self.read(i, block_size)

            local_max_index = numpy.argmax(samples)
            local_max_value = samples[local_max_index]

            if local_max_value > max_value:
                max_value = local_max_value
                max_index = local_max_index

            local_min_index = numpy.argmin(samples)
            local_min_value = samples[local_min_index]

            if local_min_value < min_value:
                min_value = local_min_value
                min_index = local_min_index

        # NOTE(review): min_index/max_index are block-local, so the
        # "found first" ordering is only exact within a single block;
        # preserved as-is to avoid changing established output.
        return (min_value, max_value) if min_index < max_index else (max_value, min_value)
255
256
def create_spectrogram_image(source_filename, output_filename,
                             image_size, fft_size, progress_callback=None):
    """Render a spectrogram of ``source_filename`` into ``output_filename``.

    source_filename -- path to a sound file readable by audiolab
    output_filename -- path of the image file to write
    image_size -- (width, height) tuple of the output image
    fft_size -- number of samples per FFT frame
    progress_callback -- optional callable invoked with an int percentage
    """
    processor = AudioProcessor(source_filename, fft_size, numpy.hamming)
    samples_per_pixel = processor.audio_file.nframes / float(image_size[0])

    spectrogram = SpectrogramImage(image_size, fft_size)

    # Report progress roughly every 10% of columns.  max(1, ...) avoids a
    # modulo-by-zero for images narrower than 10 pixels, and `//` keeps the
    # step (and the percentage below) integers under Python 3 as well.
    progress_step = max(1, image_size[0] // 10)

    for x in range(image_size[0]):
        if progress_callback and x % progress_step == 0:
            progress_callback((x * 100) // image_size[0])

        seek_point = int(x * samples_per_pixel)

        (spectral_centroid, db_spectrum) = processor.spectral_centroid(seek_point)

        spectrogram.draw_spectrum(x, db_spectrum)

    if progress_callback:
        progress_callback(100)

    spectrogram.save(output_filename)
280
281
def interpolate_colors(colors, flat=False, num_colors=256):
    """Expand a short list of (r, g, b[, a]) colors into a smooth palette.

    colors -- list of color tuples; only the first three channels are used
    flat -- when True return a flat [r, g, b, r, g, b, ...] list instead
            of a list of (r, g, b) tuples
    num_colors -- number of entries in the resulting palette

    Returns a palette obtained by linear interpolation between
    neighbouring input colors.
    """
    palette = []

    for i in range(num_colors):
        # Fractional position of this palette entry within the input
        # color list, ranging 0.0 .. len(colors) - 1.
        index = i * (len(colors) - 1) / (num_colors - 1.0)

        # Truncate to get the lower neighbour; the fractional remainder
        # (alpha, always in [0, 1)) blends toward the upper neighbour.
        # The previous `round()` version produced negative alphas past
        # each midpoint and scaled single channels above 255.
        base = int(index)
        alpha = index - base

        channels = list('rgb')
        values = dict()

        for k, v in zip(range(len(channels)), channels):
            if alpha > 0:
                values[v] = (
                    (1.0 - alpha) * colors[base][k]
                    + alpha * colors[base + 1][k])
            else:
                # Entry sits exactly on an input color.
                values[v] = (1.0 - alpha) * colors[base][k]

        if flat:
            palette.extend(
                tuple(int(values[i]) for i in channels))
        else:
            palette.append(
                tuple(int(values[i]) for i in channels))

    return palette
326
327
def get_max_level(filename):
    """Return the peak absolute sample value of the sound file at
    ``filename``, scanning it in fixed-size chunks.  Stops early (and
    returns the peak seen so far) if the file's header is broken."""
    chunk_size = 4096
    peak = 0

    audio_file = audiolab.Sndfile(filename, 'r')
    remaining = audio_file.nframes

    while remaining:
        count = min(chunk_size, remaining)

        try:
            frames = audio_file.read_frames(count)
        except RuntimeError:
            # broken header: give up on the rest of the file
            break

        # mono conversion: keep only the left channel
        if audio_file.channels > 1:
            frames = frames[:, 0]

        peak = max(peak, numpy.abs(frames).max())
        remaining -= count

    audio_file.close()

    return peak
354
if __name__ == '__main__':
    import sys
    # Usage: spectrogram.py <input> <output> <WIDTHxHEIGHT> <fft_size>
    # Convert the size argument ("WxH") to a tuple and fft_size to int.
    sys.argv[3] = tuple(int(dim) for dim in sys.argv[3].split('x'))
    sys.argv[4] = int(sys.argv[4])

    create_spectrogram_image(*sys.argv[1:])