| 1 | # processing.py -- various audio processing functions |
| 2 | # Copyright (C) 2008 MUSIC TECHNOLOGY GROUP (MTG) |
| 3 | # UNIVERSITAT POMPEU FABRA |
| 4 | # |
| 5 | # This program is free software: you can redistribute it and/or modify |
| 6 | # it under the terms of the GNU Affero General Public License as |
| 7 | # published by the Free Software Foundation, either version 3 of the |
| 8 | # License, or (at your option) any later version. |
| 9 | # |
| 10 | # This program is distributed in the hope that it will be useful, |
| 11 | # but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 12 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| 13 | # GNU Affero General Public License for more details. |
| 14 | # |
| 15 | # You should have received a copy of the GNU Affero General Public License |
| 16 | # along with this program. If not, see <http://www.gnu.org/licenses/>. |
| 17 | # |
| 18 | # Authors: |
| 19 | # Bram de Jong <bram.dejong at domain.com where domain in gmail> |
| 20 | # 2012, Joar Wandborg <first name at last name dot se> |
| 21 | |
| 22 | from PIL import Image |
| 23 | import math |
| 24 | import numpy |
| 25 | |
try:
    import scikits.audiolab as audiolab
except ImportError:
    # audiolab is optional at import time: only wav2png-related functions
    # (AudioProcessor, get_max_level) need it, so just warn instead of failing.
    # Parenthesized print works identically under Python 2 and parses under
    # Python 3, unlike the bare py2 print statement used before.
    print("WARNING: audiolab is not installed so wav2png will not work")
| 30 | |
| 31 | |
class AudioProcessingException(Exception):
    """Module-specific exception type for audio-processing errors."""
    pass
| 34 | |
| 35 | |
class SpectrogramImage(object):
    """Accumulates spectrogram pixel columns and writes them out as an image.

    Rows are spaced logarithmically in frequency between 100 Hz and 22050 Hz,
    and each row is mapped to an FFT bin plus a blend weight so neighbouring
    bins can be interpolated.
    """
    def __init__(self, image_size, fft_size):
        self.image_width, self.image_height = image_size
        self.fft_size = fft_size

        # Color gradient stops, dark to bright. The dimmed entries are the
        # original stops pre-divided (58/4, 68/4, 65/4) and (80/2, 100/2, 153/2)
        # using integer division, as in the original Python 2 code.
        gradient = [
            (0, 0, 0, 0),          # fully transparent background
            (14, 17, 16, 255),
            (40, 50, 76, 255),
            (90, 180, 100, 255),
            (224, 224, 44, 255),
            (255, 60, 30, 255),
            (255, 255, 255, 255),
        ]
        self.palette = interpolate_colors(gradient)

        # Build a lookup table mapping each image row (y) to an FFT bin index
        # and a 0..255 blend weight toward the next bin.
        freq_min = 100.0
        freq_max = 22050.0  # Hz (Nyquist frequency of 44.1 kHz audio)
        log_min = math.log10(freq_min)
        log_max = math.log10(freq_max)

        self.y_to_bin = []
        for row in range(self.image_height):
            # Logarithmic frequency for this row, freq_min at the bottom.
            frac = row / (self.image_height - 1.0)
            freq = 10.0 ** (log_min + frac * (log_max - log_min))
            bin_pos = freq / freq_max * (self.fft_size // 2 + 1)

            # Rows that would fall on/after the last usable bin are skipped
            # here and painted as background in draw_spectrum().
            if bin_pos >= self.fft_size // 2:
                continue

            weight = bin_pos - int(bin_pos)
            self.y_to_bin.append((int(bin_pos), weight * 255))

        # Writing via image.load()[x, y] is much slower than putdata() plus a
        # rotation, so pixels are buffered here and the image is built on save.
        self.pixels = []

    def draw_spectrum(self, x, spectrum):
        """Append one column of pixels for the given normalized spectrum."""
        for bin_index, weight in self.y_to_bin:
            # Linear blend between the row's bin and the next one.
            level = ((255.0 - weight) * spectrum[bin_index]
                     + weight * spectrum[bin_index + 1])
            self.pixels.append(self.palette[int(level)])

        # Pad any rows beyond the usable FFT range with the background color.
        missing = self.image_height - len(self.y_to_bin)
        self.pixels.extend([self.palette[0]] * missing)

    def save(self, filename, quality=90):
        # Pixels were collected column by column, so build the image with
        # swapped dimensions and rotate it upright when saving.
        self.image = Image.new(
            'RGBA',
            (self.image_height, self.image_width))
        self.image.putdata(self.pixels)
        rotated = self.image.transpose(Image.ROTATE_90)
        rotated.save(filename, quality=quality)
| 100 | |
| 101 | |
class AudioProcessor(object):
    """
    The audio processor processes chunks of audio and calculates the spectral
    centroid and the peak samples in that chunk of audio.
    """
    def __init__(self, input_filename, fft_size, window_function=numpy.hanning):
        max_level = get_max_level(input_filename)

        self.audio_file = audiolab.Sndfile(input_filename, 'r')
        self.fft_size = fft_size
        self.window = window_function(self.fft_size)
        self.spectrum_range = None  # lazily-built bin index array, see spectral_centroid()
        self.lower = 100            # lower frequency bound for the centroid scale (Hz)
        self.higher = 22050         # upper frequency bound for the centroid scale (Hz)
        self.lower_log = math.log10(self.lower)
        self.higher_log = math.log10(self.higher)
        self.clip = lambda val, low, high: min(high, max(low, val))

        # figure out what the maximum value is for an FFT doing the FFT of a DC signal
        fft = numpy.fft.rfft(numpy.ones(fft_size) * self.window)
        max_fft = (numpy.abs(fft)).max()

        # set the scale to normalized audio and normalized FFT
        self.scale = 1.0 / max_level / max_fft if max_level > 0 else 1

    def read(self, start, size, resize_if_less=False):
        """Read `size` samples starting at `start` (may be negative).

        If resize_if_less is True and fewer than `size` samples are
        available, the result is zero-padded (at the start for negative
        `start`, at the end when reading past EOF) to exactly `size`.
        """

        # number of zeros to add to start and end of the buffer
        add_to_start = 0
        add_to_end = 0

        if start < 0:
            # the first FFT window starts centered around zero
            if size + start <= 0:
                return numpy.zeros(size) if resize_if_less else numpy.array([])
            else:
                self.audio_file.seek(0)

                add_to_start = -start  # remember: start is negative!
                to_read = size + start

                if to_read > self.audio_file.nframes:
                    add_to_end = to_read - self.audio_file.nframes
                    to_read = self.audio_file.nframes
        else:
            self.audio_file.seek(start)

            to_read = size
            if start + to_read >= self.audio_file.nframes:
                to_read = self.audio_file.nframes - start
                add_to_end = size - to_read

        try:
            samples = self.audio_file.read_frames(to_read)
        except RuntimeError:
            # this can happen for wave files with broken headers...
            return numpy.zeros(size) if resize_if_less else numpy.zeros(2)

        # convert to mono by selecting left channel only
        if self.audio_file.channels > 1:
            samples = samples[:, 0]

        if resize_if_less and (add_to_start > 0 or add_to_end > 0):
            if add_to_start > 0:
                # BUG FIX: these are 1-D arrays, so they must be joined along
                # axis 0 — axis=1 raises an AxisError on modern numpy.
                samples = numpy.concatenate((numpy.zeros(add_to_start), samples), axis=0)

            if add_to_end > 0:
                samples = numpy.resize(samples, size)
                samples[size - add_to_end:] = 0

        return samples

    def spectral_centroid(self, seek_point, spec_range=110.0):
        """Starting at seek_point read fft_size samples and calculate the
        spectral centroid.

        Returns (spectral_centroid, db_spectrum) where the centroid is scaled
        to [0, 1] on a log-frequency axis and db_spectrum is the spectrum
        scaled from [-spec_range dB .. 0 dB] to [0 .. 1].
        """

        # window is centered on seek_point, hence the half-size offset
        samples = self.read(seek_point - self.fft_size // 2, self.fft_size, True)

        samples *= self.window
        fft = numpy.fft.rfft(samples)
        spectrum = self.scale * numpy.abs(fft)  # normalized abs(FFT) between 0 and 1

        length = numpy.float64(spectrum.shape[0])

        # scale the db spectrum from [- spec_range db ... 0 db] > [0..1]
        # (1e-60 avoids log10(0) for silent bins)
        db_spectrum = ((20 * (numpy.log10(spectrum + 1e-60))).clip(-spec_range, 0.0) + spec_range) / spec_range

        energy = spectrum.sum()
        spectral_centroid = 0

        if energy > 1e-60:
            # calculate the spectral centroid

            if self.spectrum_range is None:
                self.spectrum_range = numpy.arange(length)

            spectral_centroid = (spectrum * self.spectrum_range).sum() / (energy * (length - 1)) * self.audio_file.samplerate * 0.5

            # clip > log10 > scale between 0 and 1
            spectral_centroid = (math.log10(self.clip(spectral_centroid, self.lower, self.higher)) - self.lower_log) / (self.higher_log - self.lower_log)

        return (spectral_centroid, db_spectrum)

    def peaks(self, start_seek, end_seek):
        """Read all samples between start_seek and end_seek, then find the
        minimum and maximum peak in that range. Returns that pair in the order
        they were found. So if min was found first, it returns (min, max) else
        the other way around.
        """

        # larger blocksizes are faster but take more mem...
        # Aha, Watson, a clue, a tradeof!
        block_size = 4096

        max_index = -1
        max_value = -1
        min_index = -1
        min_value = 1

        # clamp the requested range to the file's actual extent
        if start_seek < 0:
            start_seek = 0

        if end_seek > self.audio_file.nframes:
            end_seek = self.audio_file.nframes

        if end_seek <= start_seek:
            samples = self.read(start_seek, 1)
            return (samples[0], samples[0])

        if block_size > end_seek - start_seek:
            block_size = end_seek - start_seek

        for i in range(start_seek, end_seek, block_size):
            samples = self.read(i, block_size)

            local_max_index = numpy.argmax(samples)
            local_max_value = samples[local_max_index]

            if local_max_value > max_value:
                max_value = local_max_value
                max_index = local_max_index

            local_min_index = numpy.argmin(samples)
            local_min_value = samples[local_min_index]

            if local_min_value < min_value:
                min_value = local_min_value
                min_index = local_min_index

        return (min_value, max_value) if min_index < max_index else (max_value, min_value)
| 252 | |
| 253 | |
def create_spectrogram_image(source_filename, output_filename,
                             image_size, fft_size, progress_callback=None):
    """Render a spectrogram of `source_filename` to `output_filename`.

    image_size: (width, height) of the output image in pixels.
    fft_size: FFT window length in samples.
    progress_callback: optional callable invoked with an int percentage
        (0-100) roughly every 10% of the columns, and once with 100 at the end.
    """
    processor = AudioProcessor(source_filename, fft_size, numpy.hamming)
    samples_per_pixel = processor.audio_file.nframes / float(image_size[0])

    spectrogram = SpectrogramImage(image_size, fft_size)

    # BUG FIX: for widths below 10 pixels, image_size[0] // 10 is zero and
    # the modulo below would raise ZeroDivisionError; clamp the step to 1.
    progress_step = max(1, image_size[0] // 10)

    for x in range(image_size[0]):
        if progress_callback and x % progress_step == 0:
            # integer percentage (floor division, as in the original py2 code)
            progress_callback((x * 100) // image_size[0])

        seek_point = int(x * samples_per_pixel)

        (spectral_centroid, db_spectrum) = processor.spectral_centroid(seek_point)

        spectrogram.draw_spectrum(x, db_spectrum)

    if progress_callback:
        progress_callback(100)

    spectrogram.save(output_filename)
| 277 | |
| 278 | |
def interpolate_colors(colors, flat=False, num_colors=256):
    """Build a palette of `num_colors` entries by linearly interpolating
    between the given color stops.

    colors: sequence of (r, g, b) or (r, g, b, a) tuples; only the first
        three channels are used (alpha is dropped, as before).
    flat: if True, return a flat [r, g, b, r, g, b, ...] int list (the
        format PIL's Image.putpalette expects) instead of 3-tuples.
    num_colors: number of palette entries to generate.
    """
    palette = []

    for i in range(num_colors):
        # Fractional position of this entry along the color-stop axis,
        # ranging from 0 to len(colors) - 1 inclusive.
        index = i * (len(colors) - 1) / (num_colors - 1.0)

        # BUG FIX: the previous code used `index - round(index)` as the blend
        # weight, which goes negative past the midpoint between two stops and
        # then multiplied a single stop by (1 - alpha) > 1, producing
        # out-of-range channel values. Use the floor-based fraction instead.
        base = int(index)
        alpha = index - base

        if alpha > 0:
            # blend between stop `base` and the next one
            rgb = tuple(
                int((1.0 - alpha) * colors[base][k] + alpha * colors[base + 1][k])
                for k in range(3))
        else:
            # exactly on a stop: no blend (also avoids reading past the
            # final stop when index == len(colors) - 1)
            rgb = tuple(int(colors[base][k]) for k in range(3))

        if flat:
            palette.extend(rgb)
        else:
            palette.append(rgb)

    return palette
| 323 | |
| 324 | |
def get_max_level(filename):
    """Scan an audio file and return the largest absolute sample value found."""
    chunk_size = 4096
    peak = 0

    sound = audiolab.Sndfile(filename, 'r')
    remaining = sound.nframes

    while remaining:
        chunk = min(chunk_size, remaining)

        try:
            frames = sound.read_frames(chunk)
        except RuntimeError:
            # a broken header can make reads fail mid-file; report what we have
            break

        # convert to mono by selecting left channel only
        if sound.channels > 1:
            frames = frames[:, 0]

        peak = max(peak, numpy.abs(frames).max())
        remaining -= chunk

    sound.close()

    return peak
| 351 | |
if __name__ == '__main__':
    import sys

    # Usage: processing.py <input file> <output file> <width>x<height> <fft size>
    # Previously this mutated sys.argv in place and crashed with an opaque
    # IndexError on a wrong argument count; parse into locals and print usage.
    if len(sys.argv) != 5:
        print("Usage: %s <input file> <output file> <width>x<height> <fft size>" % sys.argv[0])
        sys.exit(1)

    input_file = sys.argv[1]
    output_file = sys.argv[2]
    image_size = tuple(int(v) for v in sys.argv[3].split('x'))
    fft_size = int(sys.argv[4])

    create_spectrogram_image(input_file, output_file, image_size, fft_size)