Source code for edgel3.core

import os
import resampy
import traceback
import sklearn.decomposition
import soundfile as sf
import numpy as np
from numbers import Real
import warnings
import keras
from edgel3.models import load_embedding_model
from edgel3.edgel3_exceptions import EdgeL3Error
from edgel3.edgel3_warnings import EdgeL3Warning

# Sample rate (Hz) that audio is resampled to when `model_type == 'sparse'`.
L3_TARGET_SR = 48000
# Sample rate (Hz) that audio is resampled to for any other model type (i.e. 'sea').
SEA_TARGET_SR = 8000

def _center_audio(audio, frame_len):
    """Prepend half a frame of zeros to the signal.

    The start of `audio` is zero-padded with ``int(frame_len / 2.0)`` samples so that
    the first original sample falls in the middle of the first frame.

    Parameters
    ----------
    audio : np.ndarray
        Audio samples to pad.
    frame_len : int
        Frame (window) length in samples.

    Returns
    -------
    np.ndarray
        Padded copy of `audio`.
    """
    half_frame = int(frame_len / 2.0)
    padding = (half_frame, 0)
    return np.pad(audio, padding, mode='constant', constant_values=0)
def _pad_audio(audio, frame_len, hop_len):
    """Zero-pad the end of the signal so every sample is covered by a frame.

    If the signal is shorter than one frame it is extended to exactly `frame_len`
    samples. Otherwise it is extended so that the part beyond the first frame is a
    whole number of hops.

    Parameters
    ----------
    audio : np.ndarray
        Audio samples to pad.
    frame_len : int
        Frame (window) length in samples.
    hop_len : int
        Hop length in samples.

    Returns
    -------
    np.ndarray
        `audio` itself when no padding is needed, otherwise a zero-padded copy.
    """
    n_samples = audio.size

    if n_samples < frame_len:
        n_missing = frame_len - n_samples
    else:
        # Samples that lie beyond the first full frame.
        overhang = n_samples - frame_len
        n_hops = int(np.ceil(overhang / float(hop_len)))
        n_missing = n_hops * hop_len - overhang

    if n_missing <= 0:
        return audio

    return np.pad(audio, (0, n_missing), mode='constant', constant_values=0)
def get_embedding(audio, sr, model=None, model_type='sparse', emb_dim=128, retrain_type='ft',
                  sparsity=95.45, center=True, hop_size=0.1, verbose=1):
    """Compute the EdgeL3 embedding of an audio signal.

    The signal is downmixed to mono, resampled to the rate selected by `model_type`,
    optionally centered, zero-padded, split into one-second frames and passed through
    the embedding model.

    Parameters
    ----------
    audio : np.ndarray [shape=(N,) or (N,C)]
        Audio samples. 2D input is averaged over axis 1 to obtain a mono signal.
    sr : int
        Sampling rate of `audio`. If it differs from the target rate (48kHz for
        `sparse`, 8kHz for `sea`) the audio is resampled.
    model : keras.models.Model or None
        Loaded model object. If None, a model is loaded with `load_embedding_model`
        using `model_type`, `emb_dim`, `retrain_type` and `sparsity`. Note that all
        arguments are validated, and `model_type` still selects the target sampling
        rate, even when a model is provided.
    model_type : {'sea', 'sparse'}
        Type of smaller version of L3 model. If `sea` is selected, the audio model is a
        UST specialized (SEA) model. `sparse` gives a sparse L3 model with the desired
        `sparsity`.
    emb_dim : {512, 256, 128, 64}
        Desired embedding dimension of the UST specialized embedding approximated (SEA)
        models. Not used for `sparse` models.
    retrain_type : {'ft', 'kd'}
        Type of retraining for the sparsified weights of L3 audio model. `ft` chooses
        the fine-tuning method and `kd` returns knowledge distilled model.
    sparsity : {95.45, 53.5, 63.5, 72.3, 87.0}
        The desired sparsity of audio model.
    center : boolean
        If True, pads beginning of signal so timestamps correspond to center of window.
    hop_size : float
        Hop size in seconds. Must correspond to at least one sample at the target
        sampling rate.
    verbose : 0 or 1
        Keras verbosity.

    Returns
    -------
    embedding : np.ndarray [shape=(T, D)]
        Array of embeddings for each window.
    timestamps : np.ndarray [shape=(T,)]
        Array of timestamps corresponding to each embedding in the output
        (frame index multiplied by `hop_size`).

    Raises
    ------
    EdgeL3Error
        If the audio is empty or has more than two dimensions, or if any argument
        has an invalid value.

    Warns
    -----
    EdgeL3Warning
        If the audio is all zeros or shorter than one window (1 second).
    """
    if audio.size == 0:
        raise EdgeL3Error('Got empty audio')

    # Warn user if audio is all zero
    if np.all(audio == 0):
        warnings.warn('Provided audio is all zeros', EdgeL3Warning)

    if not isinstance(sr, Real) or sr <= 0:
        raise EdgeL3Error('Invalid sample rate {}'.format(sr))

    if model is not None and not isinstance(model, keras.models.Model):
        raise EdgeL3Error('Invalid model provided. Must be of type keras.models.Model'
                          ' but got {}'.format(str(type(model))))

    if model_type not in ('sea', 'sparse'):
        raise EdgeL3Error('Invalid EdgeL3 model type {}'.format(model_type))

    if emb_dim not in (512, 256, 128, 64):
        raise EdgeL3Error('Invalid embedding dimension value {}'.format(emb_dim))

    if retrain_type not in ('ft', 'kd'):
        raise EdgeL3Error('Invalid re-training type {}'.format(retrain_type))

    if not isinstance(sparsity, Real) or sparsity <= 0:
        raise EdgeL3Error('Invalid sparsity value {}'.format(sparsity))

    if sparsity not in (53.5, 63.5, 72.3, 87.0, 95.45):
        raise EdgeL3Error('Invalid sparsity value {}'.format(sparsity))

    if not isinstance(hop_size, Real) or hop_size <= 0:
        raise EdgeL3Error('Invalid hop size {}'.format(hop_size))

    if verbose not in (0, 1):
        raise EdgeL3Error('Invalid verbosity level {}'.format(verbose))

    if center not in (True, False):
        raise EdgeL3Error('Invalid center value {}'.format(center))

    TARGET_SR = L3_TARGET_SR if model_type == 'sparse' else SEA_TARGET_SR

    # The window is one second long, so the frame length equals the sampling rate.
    frame_len = TARGET_SR
    hop_len = int(hop_size * TARGET_SR)
    if hop_len < 1:
        # A hop shorter than one sample would cause a division by zero when padding
        # and framing; reject it before any expensive work is done.
        raise EdgeL3Error('Invalid hop size {}'.format(hop_size))

    # Check audio array dimension
    if audio.ndim > 2:
        raise EdgeL3Error('Audio array can only be be 1D or 2D')
    elif audio.ndim == 2:
        # Downmix if multichannel
        audio = np.mean(audio, axis=1)

    # Resample if necessary
    if sr != TARGET_SR:
        audio = resampy.resample(audio, sr_orig=sr, sr_new=TARGET_SR, filter='kaiser_best')

    # Get embedding model
    if model is None:
        model = load_embedding_model(
            model_type,
            emb_dim=emb_dim,
            retrain_type=retrain_type,
            sparsity=sparsity
        )

    audio_len = audio.size
    if audio_len < frame_len:
        warnings.warn('Duration of provided audio is shorter than window size (1 second). '
                      'Audio will be padded.', EdgeL3Warning)

    if center:
        # Center audio
        audio = _center_audio(audio, frame_len)

    # Pad if necessary to ensure that we process all samples
    audio = _pad_audio(audio, frame_len, hop_len)

    # The strides below assume consecutive samples are `itemsize` bytes apart, which
    # only holds for contiguous memory. A caller may pass a strided view (for example
    # one channel of a multichannel array), so make the buffer contiguous first.
    audio = np.ascontiguousarray(audio)

    # Split audio into frames, copied from librosa.util.frame
    n_frames = 1 + (len(audio) - frame_len) // hop_len
    x = np.lib.stride_tricks.as_strided(
        audio,
        shape=(frame_len, n_frames),
        strides=(audio.itemsize, hop_len * audio.itemsize)
    ).T

    # Add a channel dimension
    x = x.reshape((x.shape[0], 1, x.shape[-1]))

    # Get embedding and timestamps
    embedding = model.predict(x, verbose=verbose)
    ts = np.arange(embedding.shape[0]) * hop_size

    return embedding, ts
def process_file(filepath, output_dir=None, suffix=None, model=None, model_type='sparse',
                 emb_dim=128, sparsity=95.45, center=True, hop_size=0.1, verbose=True,
                 retrain_type='ft'):
    """Compute the L3 embedding of an audio file and save it to disk.

    The audio is read with `soundfile`, embedded with `get_embedding`, and written
    with `np.savez` as an ``.npz`` archive holding the arrays ``embedding`` and
    ``timestamps``.

    Parameters
    ----------
    filepath : str
        Path to WAV file to be processed.
    output_dir : str or None
        Path to directory for saving output files. If None, output files will be
        saved to the directory containing the input file.
    suffix : str or None
        String to be appended to the output filename, i.e.
        <base filename>_<suffix>.npz. If None, then no suffix will be added, i.e.
        <base filename>.npz.
    model : keras.models.Model or None
        Loaded model object. If None is provided, UST specialized L3 or sparse L3 is
        loaded according to the ``model_type``. ``model_type`` is forwarded to
        `get_embedding` either way.
    model_type : {'sea', 'sparse'}
        Type of smaller version of L3 model. If `sea` is selected, the audio model is a
        UST specialized (SEA) model. `sparse` gives a sparse L3 model with the desired
        `sparsity`.
    emb_dim : {512, 256, 128, 64}
        Desired embedding dimension of the UST specialized embedding approximated (SEA)
        models. Not used for `sparse` models.
    sparsity : {95.45, 53.5, 63.5, 72.3, 87.0}
        The desired sparsity of audio model.
    center : boolean
        If True, pads beginning of signal so timestamps correspond to center of window.
    hop_size : float
        Hop size in seconds.
    verbose : bool or {0, 1}
        Any truthy value enables Keras verbosity (passed on as 1), otherwise 0.
    retrain_type : {'ft', 'kd'}
        Type of retraining for the sparsified weights of L3 audio model, forwarded to
        `get_embedding`. Defaults to `ft`.

    Returns
    -------
    None
        The result is written to disk; nothing is returned.

    Raises
    ------
    EdgeL3Error
        If `filepath` does not exist or cannot be read as audio, or if
        `get_embedding` rejects the audio or one of the arguments.
    """
    if not os.path.exists(filepath):
        raise EdgeL3Error('File "{}" could not be found.'.format(filepath))

    try:
        audio, sr = sf.read(filepath)
    except Exception:
        # Wrap any reader failure in the package's own error type, keeping the
        # original traceback text in the message.
        raise EdgeL3Error('Could not open file "{}":\n{}'.format(filepath,
                                                                 traceback.format_exc()))

    if not suffix:
        suffix = ""

    output_path = get_output_path(filepath, suffix + ".npz", output_dir=output_dir)

    embedding, ts = get_embedding(
        audio,
        sr,
        model=model,
        model_type=model_type,
        emb_dim=emb_dim,
        retrain_type=retrain_type,
        sparsity=sparsity,
        center=center,
        hop_size=hop_size,
        verbose=1 if verbose else 0
    )

    np.savez(output_path, embedding=embedding, timestamps=ts)
    # Sanity check that the archive was actually written.
    assert os.path.exists(output_path)
def get_output_path(filepath, suffix, output_dir=None):
    """Build the output file path for a processed audio file.

    The extension of the input filename is stripped and `suffix` is attached to the
    remaining base name. A suffix beginning with ``'.'`` is treated as an extension
    and appended directly; any other non-empty suffix is joined with an underscore.

    Parameters
    ----------
    filepath : str
        Path to audio file to be processed.
    suffix : str
        String to append to filename (including extension). An empty suffix (or None)
        leaves the base filename unchanged.
    output_dir : str or None
        Path to directory where file will be saved. If None, will use directory of
        given filepath.

    Returns
    -------
    output_path : str
        Path to output file.
    """
    base_filename = os.path.splitext(os.path.basename(filepath))[0]

    if not output_dir:
        output_dir = os.path.dirname(filepath)

    # Treat a missing suffix as empty instead of failing on `suffix[0]`.
    suffix = suffix or ""

    if not suffix or suffix.startswith('.'):
        output_filename = base_filename + suffix
    else:
        output_filename = "{}_{}".format(base_filename, suffix)

    return os.path.join(output_dir, output_filename)