Commit | Line | Data |
---|---|---|
f1d06e1d JW |
1 | # processing.py -- various audio processing functions |
2 | # Copyright (C) 2008 MUSIC TECHNOLOGY GROUP (MTG) | |
3 | # UNIVERSITAT POMPEU FABRA | |
4 | # | |
5 | # This program is free software: you can redistribute it and/or modify | |
6 | # it under the terms of the GNU Affero General Public License as | |
7 | # published by the Free Software Foundation, either version 3 of the | |
8 | # License, or (at your option) any later version. | |
9 | # | |
10 | # This program is distributed in the hope that it will be useful, | |
11 | # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
12 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
13 | # GNU Affero General Public License for more details. | |
14 | # | |
15 | # You should have received a copy of the GNU Affero General Public License | |
16 | # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
17 | # | |
18 | # Authors: | |
19 | # Bram de Jong <bram.dejong at domain.com where domain in gmail> | |
20 | # 2012, Joar Wandborg <first name at last name dot se> | |
21 | ||
22 | from PIL import Image | |
23 | import math | |
24 | import numpy | |
25 | ||
try:
    import scikits.audiolab as audiolab
except ImportError:
    # audiolab is only needed by the processing code below; warn but keep
    # the module importable.  print(...) with a single argument behaves
    # identically under Python 2 and is valid Python 3 syntax, unlike the
    # bare print statement used before.
    print("WARNING: audiolab is not installed so wav2png will not work")
30 | ||
31 | ||
32 | class AudioProcessingException(Exception): | |
33 | pass | |
34 | ||
35 | ||
class SpectrogramImage(object):
    """Builds a spectrogram image column by column and writes it to disk.

    Image rows are mapped to FFT bins on a logarithmic frequency scale
    between 100 Hz and 22050 Hz, and spectrum magnitudes are mapped to
    colors through a 256-entry interpolated palette.
    """
    def __init__(self, image_size, fft_size):
        # image_size: (width, height) of the output image in pixels
        # fft_size: number of samples per FFT window
        self.image_width, self.image_height = image_size
        self.fft_size = fft_size

        # Color anchors: transparent black -> dark teal -> blue -> green ->
        # yellow -> red -> white.  The divisions are integer division under
        # Python 2 (which this module targets), keeping channels integral.
        colors = [
            (0, 0, 0, 0),
            (58 / 4, 68 / 4, 65 / 4, 255),
            (80 / 2, 100 / 2, 153 / 2, 255),
            (90, 180, 100, 255),
            (224, 224, 44, 255),
            (255, 60, 30, 255),
            (255, 255, 255, 255)
        ]

        self.palette = interpolate_colors(colors)

        # Generate lookup table for y-coordinate from fft-bin
        self.y_to_bin = []

        # Frequency range displayed in the image (log scale).
        fft_min = 100.0
        fft_max = 22050.0  # Hz -- Nyquist frequency for 44.1 kHz audio

        y_min = math.log10(fft_min)
        y_max = math.log10(fft_max)

        # For every image row, compute the (fractional) FFT bin it samples:
        # store the integer bin index and a 0..255 blend weight toward the
        # next bin, so draw_spectrum() can interpolate between bins.
        for y in range(self.image_height):
            freq = math.pow(
                10.0,
                y_min + y / (self.image_height - 1.0)
                * (y_max - y_min))

            fft_bin = freq / fft_max * (self.fft_size / 2 + 1)

            if fft_bin < self.fft_size / 2:
                alpha = fft_bin - int(fft_bin)

                self.y_to_bin.append((int(fft_bin), alpha * 255))

        # this is a bit strange, but using image.load()[x,y] = ... is
        # a lot slower than using image.putdata and then rotating the image
        # so we store all the pixels in an array and then create the image when saving
        self.pixels = []

    def draw_spectrum(self, x, spectrum):
        """Append one column of pixels for the given spectrum.

        Note: x is unused -- columns are stored in call order, so this must
        be invoked once per column, left to right.
        """
        # for all frequencies, draw the pixels
        for index, alpha in self.y_to_bin:
            self.pixels.append(
                self.palette[int((255.0 - alpha) * spectrum[index]
                            + alpha * spectrum[index + 1])])

        # if the FFT is too small to fill up the image, fill with black to the top
        for y in range(len(self.y_to_bin), self.image_height):
            self.pixels.append(self.palette[0])

    def save(self, filename, quality=90):
        """Create the image from the collected pixel columns and save it.

        The image is created transposed (height x width) and rotated 90
        degrees on save, because a single putdata() call on the flat pixel
        list is far faster than per-pixel writes.
        """
        self.image = Image.new(
            'RGBA',
            (self.image_height, self.image_width))

        self.image.putdata(self.pixels)
        self.image.transpose(Image.ROTATE_90).save(
            filename,
            quality=quality)
100 | ||
101 | ||
class AudioProcessor(object):
    """
    Processes chunks of an audio file and calculates the spectral centroid
    and the peak samples in each chunk.

    Frames are read through scikits.audiolab; multi-channel audio is
    reduced to mono by keeping the left channel only.
    """
    def __init__(self, input_filename, fft_size, window_function=numpy.hanning):
        # input_filename: path of the audio file to open
        # fft_size: number of samples per FFT window
        # window_function: callable returning a window array of given length
        max_level = get_max_level(input_filename)

        self.audio_file = audiolab.Sndfile(input_filename, 'r')
        self.fft_size = fft_size
        self.window = window_function(self.fft_size)
        self.spectrum_range = None  # lazily built bin-index array, see spectral_centroid()
        self.lower = 100            # lower frequency bound (Hz) for centroid scaling
        self.higher = 22050         # upper frequency bound (Hz) for centroid scaling
        self.lower_log = math.log10(self.lower)
        self.higher_log = math.log10(self.higher)
        self.clip = lambda val, low, high: min(high, max(low, val))

        # figure out what the maximum value is for an FFT doing the FFT of a DC signal
        fft = numpy.fft.rfft(numpy.ones(fft_size) * self.window)
        max_fft = (numpy.abs(fft)).max()

        # set the scale to normalized audio and normalized FFT
        self.scale = 1.0 / max_level / max_fft if max_level > 0 else 1

    def read(self, start, size, resize_if_less=False):
        """ read size samples starting at start, if resize_if_less is True and less than size
        samples are read, resize the array to size and fill with zeros """

        # number of zeros to add to start and end of the buffer
        add_to_start = 0
        add_to_end = 0

        if start < 0:
            # the first FFT window starts centered around zero
            if size + start <= 0:
                # window lies entirely before the file: nothing to read
                return numpy.zeros(size) if resize_if_less else numpy.array([])
            else:
                self.audio_file.seek(0)

                add_to_start = - start  # remember: start is negative!
                to_read = size + start

                if to_read > self.audio_file.nframes:
                    add_to_end = to_read - self.audio_file.nframes
                    to_read = self.audio_file.nframes
        else:
            self.audio_file.seek(start)

            to_read = size
            if start + to_read >= self.audio_file.nframes:
                # window extends past end of file: read what remains
                to_read = self.audio_file.nframes - start
                add_to_end = size - to_read

        try:
            samples = self.audio_file.read_frames(to_read)
        except RuntimeError:
            # this can happen for wave files with broken headers...
            return numpy.zeros(size) if resize_if_less else numpy.zeros(2)

        # convert to mono by selecting left channel only
        if self.audio_file.channels > 1:
            samples = samples[:,0]

        if resize_if_less and (add_to_start > 0 or add_to_end > 0):
            if add_to_start > 0:
                # NOTE(review): samples is 1-D at this point; modern NumPy
                # raises for axis=1 on 1-D inputs and this likely needs
                # axis=0.  Left unchanged to preserve behavior on the NumPy
                # version this was written for -- confirm before upgrading.
                samples = numpy.concatenate((numpy.zeros(add_to_start), samples), axis=1)

            if add_to_end > 0:
                # pad the tail with zeros up to the requested size
                samples = numpy.resize(samples, size)
                samples[size - add_to_end:] = 0

        return samples

    def spectral_centroid(self, seek_point, spec_range=110.0):
        """ starting at seek_point read fft_size samples, and calculate the spectral centroid """

        # FFT window centered on seek_point; read() zero-pads at file edges
        samples = self.read(seek_point - self.fft_size/2, self.fft_size, True)

        samples *= self.window
        fft = numpy.fft.rfft(samples)
        spectrum = self.scale * numpy.abs(fft)  # normalized abs(FFT) between 0 and 1

        length = numpy.float64(spectrum.shape[0])

        # scale the db spectrum from [- spec_range db ... 0 db] > [0..1]
        # (the 1e-60 floor avoids log10(0))
        db_spectrum = ((20*(numpy.log10(spectrum + 1e-60))).clip(-spec_range, 0.0) + spec_range)/spec_range

        energy = spectrum.sum()
        spectral_centroid = 0

        if energy > 1e-60:
            # calculate the spectral centroid

            if self.spectrum_range == None:
                self.spectrum_range = numpy.arange(length)

            # magnitude-weighted mean bin, converted to Hz
            # (bins span 0 .. samplerate/2)
            spectral_centroid = (spectrum * self.spectrum_range).sum() / (energy * (length - 1)) * self.audio_file.samplerate * 0.5

            # clip > log10 > scale between 0 and 1
            spectral_centroid = (math.log10(self.clip(spectral_centroid, self.lower, self.higher)) - self.lower_log) / (self.higher_log - self.lower_log)

        return (spectral_centroid, db_spectrum)


    def peaks(self, start_seek, end_seek):
        """ read all samples between start_seek and end_seek, then find the minimum and maximum peak
        in that range. Returns that pair in the order they were found. So if min was found first,
        it returns (min, max) else the other way around. """

        # larger blocksizes are faster but take more mem...
        # Aha, Watson, a clue, a tradeof!
        block_size = 4096

        max_index = -1
        max_value = -1
        min_index = -1
        min_value = 1

        # clamp the requested range to the file bounds
        if start_seek < 0:
            start_seek = 0

        if end_seek > self.audio_file.nframes:
            end_seek = self.audio_file.nframes

        # degenerate range: return a single sample twice
        if end_seek <= start_seek:
            samples = self.read(start_seek, 1)
            return (samples[0], samples[0])

        if block_size > end_seek - start_seek:
            block_size = end_seek - start_seek

        for i in range(start_seek, end_seek, block_size):
            samples = self.read(i, block_size)

            local_max_index = numpy.argmax(samples)
            local_max_value = samples[local_max_index]

            if local_max_value > max_value:
                max_value = local_max_value
                max_index = local_max_index

            local_min_index = numpy.argmin(samples)
            local_min_value = samples[local_min_index]

            if local_min_value < min_value:
                min_value = local_min_value
                min_index = local_min_index

        # NOTE(review): min_index/max_index are offsets within their own
        # blocks, not absolute positions, so the ordering test below can be
        # wrong when min and max come from different blocks -- verify
        # whether callers rely on the documented "order found" semantics.
        return (min_value, max_value) if min_index < max_index else (max_value, min_value)
252 | ||
253 | ||
def create_spectrogram_image(source_filename, output_filename,
                             image_size, fft_size, progress_callback=None):
    """Render a spectrogram of an audio file to an image file.

    :param source_filename: path of the audio file to analyze
    :param output_filename: path the image is written to
    :param image_size: (width, height) of the output image in pixels
    :param fft_size: number of samples per FFT window
    :param progress_callback: optional callable receiving progress 0..100
    """
    processor = AudioProcessor(source_filename, fft_size, numpy.hamming)
    # one image column per pixel of width
    samples_per_pixel = processor.audio_file.nframes / float(image_size[0])

    spectrogram = SpectrogramImage(image_size, fft_size)

    # Report progress roughly every 10% of columns.  max(1, ...) guards
    # against a ZeroDivisionError in the modulo below when the image is
    # narrower than 10 pixels (image_size[0] / 10 would be 0).
    progress_step = max(1, image_size[0] / 10)

    for x in range(image_size[0]):
        if progress_callback and x % progress_step == 0:
            progress_callback((x * 100) / image_size[0])

        seek_point = int(x * samples_per_pixel)

        (spectral_centroid, db_spectrum) = processor.spectral_centroid(seek_point)

        spectrogram.draw_spectrum(x, db_spectrum)

    if progress_callback:
        progress_callback(100)

    spectrogram.save(output_filename)
277 | ||
278 | ||
def interpolate_colors(colors, flat=False, num_colors=256):
    """Linearly interpolate a list of colors into a palette.

    :param colors: list of (r, g, b[, a]) tuples; only r, g, b are used
    :param flat: if True return a flat [r, g, b, r, g, b, ...] list,
        otherwise a list of (r, g, b) tuples
    :param num_colors: number of palette entries to generate
    :returns: palette of ``num_colors`` interpolated colors
    """
    palette = []

    for i in range(num_colors):
        # fractional position of this entry along the color anchor list,
        # e.g. with 7 anchors and 256 entries: 0.0 .. 6.0
        index = i * (len(colors) - 1) / (num_colors - 1.0)

        # Integer anchor at/below the position and the blend weight toward
        # the next anchor.  Using int() (floor for non-negative values)
        # keeps alpha in [0, 1); the previous round()-based computation
        # produced negative weights, collapsing the upper half of every
        # segment onto the lower anchor color.
        base = int(index)
        alpha = index - base

        if alpha > 0:
            # guard against float slop pushing base + 1 past the last anchor
            nxt = colors[min(base + 1, len(colors) - 1)]
            rgb = tuple(
                int((1.0 - alpha) * colors[base][k] + alpha * nxt[k])
                for k in range(3))
        else:
            rgb = tuple(int(colors[base][k]) for k in range(3))

        if flat:
            palette.extend(rgb)
        else:
            palette.append(rgb)

    return palette
323 | ||
324 | ||
def get_max_level(filename):
    """Return the maximum absolute sample value found in the audio file.

    Scans the file in fixed-size chunks; multi-channel audio is reduced to
    the left channel.  Stops early (keeping the peak found so far) if a
    broken header makes reading fail.
    """
    CHUNK_FRAMES = 4096
    peak = 0
    sound = audiolab.Sndfile(filename, 'r')
    remaining = sound.nframes

    while remaining:
        frames_to_read = min(CHUNK_FRAMES, remaining)

        try:
            frames = sound.read_frames(frames_to_read)
        except RuntimeError:
            # this can happen with a broken header
            break

        # convert to mono by selecting left channel only
        if sound.channels > 1:
            frames = frames[:,0]

        peak = max(peak, numpy.abs(frames).max())

        remaining -= frames_to_read

    sound.close()

    return peak
351 | ||
if __name__ == '__main__':
    import sys

    # Usage: processing.py <input audio> <output image> <WIDTHxHEIGHT> <fft size>
    args = list(sys.argv[1:])
    # "600x200" -> (600, 200)
    args[2] = tuple(int(part) for part in args[2].split('x'))
    args[3] = int(args[3])

    create_spectrogram_image(*args)