Commit | Line | Data |
---|---|---|
f1d06e1d JW |
1 | # processing.py -- various audio processing functions |
2 | # Copyright (C) 2008 MUSIC TECHNOLOGY GROUP (MTG) | |
3 | # UNIVERSITAT POMPEU FABRA | |
4 | # | |
5 | # This program is free software: you can redistribute it and/or modify | |
6 | # it under the terms of the GNU Affero General Public License as | |
7 | # published by the Free Software Foundation, either version 3 of the | |
8 | # License, or (at your option) any later version. | |
9 | # | |
10 | # This program is distributed in the hope that it will be useful, | |
11 | # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
12 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
13 | # GNU Affero General Public License for more details. | |
14 | # | |
15 | # You should have received a copy of the GNU Affero General Public License | |
16 | # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
17 | # | |
18 | # Authors: | |
19 | # Bram de Jong <bram.dejong at domain.com where domain in gmail> | |
20 | # 2012, Joar Wandborg <first name at last name dot se> | |
21 | ||
d0e9f843 AL |
# Pillow ("PIL") provides the image backend; fall back to the old
# standalone "Image" module on legacy installations.
try:
    from PIL import Image
except ImportError:
    import Image

import math
import numpy

# audiolab performs the actual audio-file reading; without it the
# spectrogram entry points below cannot open any input.
try:
    import scikits.audiolab as audiolab
except ImportError:
    # parenthesized so this line parses under both Python 2 and Python 3
    print("WARNING: audiolab is not installed so wav2png will not work")
33 | ||
34 | ||
class AudioProcessingException(Exception):
    """Raised when an error is encountered while processing audio data."""
37 | ||
38 | ||
class SpectrogramImage(object):
    """
    Accumulates spectrogram pixel columns and writes them out as an image.

    Image rows are mapped to FFT bins on a log10 frequency scale
    (fft_min..fft_max).  Call draw_spectrum() once per x column, left to
    right, then save() to build and write the final image.
    """
    def __init__(self, image_size, fft_size):
        # image_size is a (width, height) tuple in pixels
        self.image_width, self.image_height = image_size
        self.fft_size = fft_size

        # Color ramp keypoints (RGBA), from transparent black up to white;
        # expanded into a 256-entry palette below.
        # NOTE: under Python 2 these `/` divisions are integer divisions.
        colors = [
            (0, 0, 0, 0),
            (58 / 4, 68 / 4, 65 / 4, 255),
            (80 / 2, 100 / 2, 153 / 2, 255),
            (90, 180, 100, 255),
            (224, 224, 44, 255),
            (255, 60, 30, 255),
            (255, 255, 255, 255)
        ]

        self.palette = interpolate_colors(colors)

        # Generate lookup table for y-coordinate from fft-bin:
        # y_to_bin[y] = (bin index, blend weight in 0..255 toward the next bin)
        self.y_to_bin = []

        fft_min = 100.0
        fft_max = 22050.0  # Hz -- presumably the Nyquist rate of 44.1 kHz audio

        y_min = math.log10(fft_min)
        y_max = math.log10(fft_max)

        for y in range(self.image_height):
            # image row -> frequency, equally spaced in log10 space
            freq = math.pow(
                10.0,
                y_min + y / (self.image_height - 1.0)
                * (y_max - y_min))

            # frequency -> fractional FFT bin index
            fft_bin = freq / fft_max * (self.fft_size / 2 + 1)

            if fft_bin < self.fft_size / 2:
                # fractional part, used to interpolate between adjacent bins
                alpha = fft_bin - int(fft_bin)

                self.y_to_bin.append((int(fft_bin), alpha * 255))

        # this is a bit strange, but using image.load()[x,y] = ... is
        # a lot slower than using image.putdata and then rotating the image
        # so we store all the pixels in an array and then create the image when saving
        self.pixels = []

    def draw_spectrum(self, x, spectrum):
        """Append one column of pixels for `spectrum` (values expected in 0..1).

        `x` is unused: columns are assumed to arrive in left-to-right order.
        """
        # for all frequencies, draw the pixels: blend the two adjacent FFT
        # bins, then look the blended value up in the 256-entry palette
        for index, alpha in self.y_to_bin:
            self.pixels.append(
                self.palette[int((255.0 - alpha) * spectrum[index]
                + alpha * spectrum[index + 1])])

        # if the FFT is too small to fill up the image, fill with black to the top
        for y in range(len(self.y_to_bin), self.image_height):
            self.pixels.append(self.palette[0])

    def save(self, filename, quality=90):
        """Build the image from the accumulated pixels and write it to `filename`.

        quality: passed through to PIL's save() (relevant for JPEG output).
        """
        # Pixels were collected column-major, so the image is created
        # transposed (height x width) and rotated into place before saving.
        self.image = Image.new(
            'RGBA',
            (self.image_height, self.image_width))

        self.image.putdata(self.pixels)
        self.image.transpose(Image.ROTATE_90).save(
            filename,
            quality=quality)
103 | ||
104 | ||
class AudioProcessor(object):
    """
    Processes chunks of audio and calculates the spectral centroid and the
    peak samples in each chunk of audio.
    """
    def __init__(self, input_filename, fft_size, window_function=numpy.hanning):
        """
        input_filename: path to an audio file readable by audiolab
        fft_size: number of samples per FFT window
        window_function: callable returning a window array of a given length
        """
        max_level = get_max_level(input_filename)

        self.audio_file = audiolab.Sndfile(input_filename, 'r')
        self.fft_size = fft_size
        self.window = window_function(self.fft_size)
        self.spectrum_range = None
        self.lower = 100        # Hz, lower bound used when scaling the centroid
        self.higher = 22050     # Hz, upper bound
        self.lower_log = math.log10(self.lower)
        self.higher_log = math.log10(self.higher)
        self.clip = lambda val, low, high: min(high, max(low, val))

        # figure out what the maximum value is for an FFT doing the FFT of a DC signal
        fft = numpy.fft.rfft(numpy.ones(fft_size) * self.window)
        max_fft = (numpy.abs(fft)).max()

        # set the scale to normalized audio and normalized FFT
        self.scale = 1.0 / max_level / max_fft if max_level > 0 else 1

    def read(self, start, size, resize_if_less=False):
        """ read size samples starting at start, if resize_if_less is True and less than size
        samples are read, resize the array to size and fill with zeros """

        # number of zeros to add to start and end of the buffer
        add_to_start = 0
        add_to_end = 0

        if start < 0:
            # the first FFT window starts centered around zero
            if size + start <= 0:
                return numpy.zeros(size) if resize_if_less else numpy.array([])
            else:
                self.audio_file.seek(0)

                add_to_start = -start  # remember: start is negative!
                to_read = size + start

                if to_read > self.audio_file.nframes:
                    add_to_end = to_read - self.audio_file.nframes
                    to_read = self.audio_file.nframes
        else:
            self.audio_file.seek(start)

            to_read = size
            if start + to_read >= self.audio_file.nframes:
                to_read = self.audio_file.nframes - start
                add_to_end = size - to_read

        try:
            samples = self.audio_file.read_frames(to_read)
        except RuntimeError:
            # this can happen for wave files with broken headers...
            return numpy.zeros(size) if resize_if_less else numpy.zeros(2)

        # convert to mono by selecting left channel only
        if self.audio_file.channels > 1:
            samples = samples[:, 0]

        if resize_if_less and (add_to_start > 0 or add_to_end > 0):
            if add_to_start > 0:
                # pad with zeros at the front (fix: the old axis=1 argument
                # is invalid for 1-D arrays and raises in modern numpy)
                samples = numpy.concatenate((numpy.zeros(add_to_start), samples))

            if add_to_end > 0:
                samples = numpy.resize(samples, size)
                samples[size - add_to_end:] = 0

        return samples

    def spectral_centroid(self, seek_point, spec_range=110.0):
        """ read fft_size samples centered on seek_point and calculate the
        spectral centroid; returns (centroid scaled to 0..1, db_spectrum) """

        samples = self.read(seek_point - self.fft_size // 2, self.fft_size, True)

        samples *= self.window
        fft = numpy.fft.rfft(samples)
        spectrum = self.scale * numpy.abs(fft)  # normalized abs(FFT) between 0 and 1

        length = numpy.float64(spectrum.shape[0])

        # scale the db spectrum from [- spec_range db ... 0 db] > [0..1]
        db_spectrum = ((20 * (numpy.log10(spectrum + 1e-60))).clip(-spec_range, 0.0) + spec_range) / spec_range

        energy = spectrum.sum()
        spectral_centroid = 0

        if energy > 1e-60:
            # calculate the spectral centroid

            if self.spectrum_range is None:
                self.spectrum_range = numpy.arange(length)

            spectral_centroid = (spectrum * self.spectrum_range).sum() / (energy * (length - 1)) * self.audio_file.samplerate * 0.5

            # clip > log10 > scale between 0 and 1
            spectral_centroid = (math.log10(self.clip(spectral_centroid, self.lower, self.higher)) - self.lower_log) / (self.higher_log - self.lower_log)

        return (spectral_centroid, db_spectrum)

    def peaks(self, start_seek, end_seek):
        """ read all samples between start_seek and end_seek, then find the minimum and maximum peak
        in that range. Returns that pair in the order they were found. So if min was found first,
        it returns (min, max) else the other way around. """

        # larger blocksizes are faster but take more mem...
        # Aha, Watson, a clue, a tradeof!
        block_size = 4096

        max_index = -1
        max_value = -1
        min_index = -1
        min_value = 1

        if start_seek < 0:
            start_seek = 0

        if end_seek > self.audio_file.nframes:
            end_seek = self.audio_file.nframes

        if end_seek <= start_seek:
            samples = self.read(start_seek, 1)
            return (samples[0], samples[0])

        if block_size > end_seek - start_seek:
            block_size = end_seek - start_seek

        for i in range(start_seek, end_seek, block_size):
            samples = self.read(i, block_size)

            local_max_index = numpy.argmax(samples)
            local_max_value = samples[local_max_index]

            if local_max_value > max_value:
                max_value = local_max_value
                # fix: offset by the block start so indices from different
                # blocks compare correctly ("order found" below)
                max_index = i + local_max_index

            local_min_index = numpy.argmin(samples)
            local_min_value = samples[local_min_index]

            if local_min_value < min_value:
                min_value = local_min_value
                min_index = i + local_min_index

        return (min_value, max_value) if min_index < max_index else (max_value, min_value)
255 | ||
256 | ||
def create_spectrogram_image(source_filename, output_filename,
        image_size, fft_size, progress_callback=None):
    """
    Render a spectrogram of `source_filename` into the image file
    `output_filename`.

    image_size: (width, height) of the output image in pixels
    fft_size: FFT window length in samples
    progress_callback: optional callable taking a percentage (0..100)
    """
    processor = AudioProcessor(source_filename, fft_size, numpy.hamming)
    samples_per_pixel = processor.audio_file.nframes / float(image_size[0])

    spectrogram = SpectrogramImage(image_size, fft_size)

    # Report progress roughly every 10% of the columns. The max(1, ...)
    # guards images narrower than 10 pixels: the original integer division
    # produced a step of 0 and made the modulo below raise ZeroDivisionError.
    progress_step = max(1, image_size[0] // 10)

    for x in range(image_size[0]):
        if progress_callback and x % progress_step == 0:
            progress_callback((x * 100) // image_size[0])

        seek_point = int(x * samples_per_pixel)

        (spectral_centroid, db_spectrum) = processor.spectral_centroid(seek_point)

        spectrogram.draw_spectrum(x, db_spectrum)

    if progress_callback:
        progress_callback(100)

    spectrogram.save(output_filename)
280 | ||
281 | ||
def interpolate_colors(colors, flat=False, num_colors=256):
    """
    Linearly interpolate the given list of RGB(A) color tuples into a
    palette of `num_colors` colors. Only the R, G and B channels are used.

    Returns a list of (r, g, b) int tuples, or a flat list
    [r, g, b, r, g, b, ...] when `flat` is True.
    """
    palette = []

    for i in range(num_colors):
        # fractional position of this palette entry along `colors`
        index = i * (len(colors) - 1) / (num_colors - 1.0)

        # BUG FIX: the blend weight must be the fractional part relative to
        # the *floor* of index (int()), not round(). round() produced a
        # negative alpha for fractional parts > 0.5, scaling channel values
        # past their endpoints (e.g. above 255) and breaking monotonicity.
        base = int(index)
        alpha = index - base

        if alpha > 0:
            # blend between the two neighbouring keypoint colors
            rgb = tuple(
                int((1.0 - alpha) * colors[base][k] + alpha * colors[base + 1][k])
                for k in range(3))
        else:
            # exactly on a keypoint; no neighbour needed (and base may be
            # the last keypoint, so colors[base + 1] must not be touched)
            rgb = tuple(int(colors[base][k]) for k in range(3))

        if flat:
            palette.extend(rgb)
        else:
            palette.append(rgb)

    return palette
326 | ||
327 | ||
def get_max_level(filename):
    """
    Return the maximum absolute sample value found in `filename`,
    reading the file in blocks (left channel only for multi-channel files).
    Returns 0 for an empty file or one with a broken header.
    """
    max_value = 0
    buffer_size = 4096
    audio_file = audiolab.Sndfile(filename, 'r')

    try:
        n_samples_left = audio_file.nframes

        while n_samples_left:
            to_read = min(buffer_size, n_samples_left)

            try:
                samples = audio_file.read_frames(to_read)
            except RuntimeError:
                # this can happen with a broken header
                break

            # convert to mono by selecting left channel only
            if audio_file.channels > 1:
                samples = samples[:, 0]

            max_value = max(max_value, numpy.abs(samples).max())

            n_samples_left -= to_read
    finally:
        # make sure the handle is released even if reading raises
        audio_file.close()

    return max_value
354 | ||
if __name__ == '__main__':
    import sys

    # command line: processing.py <input file> <output file> <WxH> <fft size>
    sys.argv[3] = tuple(int(dim) for dim in sys.argv[3].split('x'))
    sys.argv[4] = int(sys.argv[4])

    create_spectrogram_image(*sys.argv[1:])