Source code for pytta.classes.filter

# -*- coding: utf-8 -*-

import numpy as np
from copy import copy
from scipy import signal as ss
from pytta.classes import SignalObj
from pytta.classes._base import ChannelsList
from pytta.utils import fractional_octave_frequencies, freq_to_band, \
                        normalize_frequencies, freqs_to_center_and_edges


class FilterBase(object):
    """Base class for filters."""

    def __init__(self, order: int, samplingrate: int):
        self.samplingRate = samplingrate
        self.order = order
        return

    def __call__(self, sigobj: SignalObj):
        """
        Filter the signal object.

        For each channel inside the input signalObj, will be generated a new SignalObj with the channel filtered signal.

        Args:
            sigobj: SignalObj

        Return:
            output: List
                A list containing one SignalObj with the filtered data for each channel in the original signalObj.

        """
        return self.filter(sigobj)



[docs]class OctFilter(FilterBase): """ Octave filter. """
[docs] def __init__(self, order: int = None, nthOct: int = None, samplingRate: int = None, minFreq: float = None, maxFreq: float = None, refFreq: float = None, base: int = None) -> None: """ Parameters ---------- order : int, optional DESCRIPTION. The default is None. nthOct : int, optional DESCRIPTION. The default is None. samplingRate : int, optional DESCRIPTION. The default is None. minFreq : float, optional DESCRIPTION. The default is None. maxFreq : float, optional DESCRIPTION. The default is None. refFreq : float, optional DESCRIPTION. The default is None. base : int, optional DESCRIPTION. The default is None. Returns ------- None DESCRIPTION. """ FilterBase.__init__(self, order, samplingRate) self.nthOct = nthOct self.minFreq = minFreq self.minBand = freq_to_band(minFreq, nthOct, refFreq, base) self.maxFreq = maxFreq self.maxBand = freq_to_band(maxFreq, nthOct, refFreq, base) self.refFreq = refFreq self.base = base self.sos = self.get_sos_filters() return
def __enter__(self): return self def __exit__(self, kind, value, traceback): if traceback is None: return else: raise value def __design_sos_butter(self, bandEdges: np.ndarray, order: int = 4, samplingRate: int = 44100) -> np.ndarray: sos = np.zeros((order, 6, len(bandEdges))) for i, edges in enumerate(bandEdges): if edges[1] >= samplingRate//2: edges[1] = samplingRate//2 - 1 sos[:, :, i] = ss.butter(N=order, Wn=np.array([edges[0], edges[1]]), btype='bp', output='sos', fs=samplingRate) return sos def get_sos_filters(self) -> np.ndarray: freqs = fractional_octave_frequencies(self.nthOct, (self.minFreq, self.maxFreq), self.refFreq, self.base) self.center, edges = freqs_to_center_and_edges(freqs) return self.__design_sos_butter(edges, self.order, self.samplingRate) # def filter(self, sigobj): # print(":WARNING: `OctFilter.filter` method will soon be deprecated.") # return self._filter(sigobj)
[docs] def filter(self, sigobj): """ Filter the signal object. For each channel inside the input signalObj, will be generated a new SignalObj with the channel filtered signal. Args: sigobj: SignalObj Return: output: List A list containing one SignalObj with the filtered data for each channel in the original signalObj. """ if self.samplingRate != sigobj.samplingRate: raise ValueError("SignalObj must have same sampling rate of filter to be filtered.") n = self.sos.shape[2] output = [] chl = [] for ch in range(sigobj.numChannels): sobj = sigobj[ch] filtered = np.zeros((sobj.numSamples, n)) for k in range(n): cContigousArray = sobj.timeSignal[:].copy(order='C') filtered[:, k] = ss.sosfilt(self.sos[:, :, k].copy(order='C'), cContigousArray, axis=0).T chl.append(copy(sobj.channels[sobj.channels.mapping[0]])) chl[-1].num = k+1 chl[-1].name = f'Band {k+1}' chl[-1].code = f'B{k+1}' signalDict = {'signalArray': filtered, 'domain': 'time', 'samplingRate': self.samplingRate, 'freqMin': sigobj.freqMin, 'freqMax': sigobj.freqMax, } out = SignalObj(**signalDict) out.channels = ChannelsList(chl) # out.timeSignal = out.timeSignal * out.channels.CFlist() output.append(out) chl.clear() return output
class AntiAliasingFilter(object): def __init__(self, order, band, samplingRate): self.order = order self.band = band self.samplingRate = samplingRate self.__design() return def __design(self): self.sos = np.zeros((self.order, 6)) if self.band[1] > 22050: self.band[1] = 22050 elif self.band[0] < 12.5: self.band[0] = 12.5 self.sos[:, :] = ss.butter(N=self.order, Wn=np.array(self.band), btype='bp', output='sos', fs=self.samplingRate) return def filter(self, signalObj): if self.samplingRate != signalObj.samplingRate: raise ValueError("SignalObj must have same sampling\ rate of filter to be filtered.") output = [] for ch in range(signalObj.numChannels): filtered = np.zeros((signalObj.numSamples)) filtered[:] = ss.sosfilt(self.sos[:, :], signalObj.timeSignal[:, ch], axis=0).T signalDict = {'signalArray': filtered, 'domain': 'time', 'samplingRate': self.samplingRate, 'freqMin': self.band[0], 'freqMax': self.band[1]} output.append(SignalObj(**signalDict)) else: return output #class SPLWeight(object): k1 = 12194 k2 = 20.6 k3 = 107.7 k4 = 737.9 k5 = 158.5 k6 = 0.008304630545453301 k7 = 282.7 k8 = 1160 fc = 1000 def __Ra(freq): Ra = (k1**2)*(freq**4) / ((freq**2 + k2**2) * (((freq**2 + k3**2) * (freq**2 + k4**2))**0.5) * (freq**2 + k1**2)) return Ra def __A(freq): A = round(20*np.log10(__Ra(freq)) - 20*np.log10(__Ra(fc)), 2) return A def __Rb(freq): Rb = (k1**2)*(freq**3) / ((freq**2 + k2**2) * ((freq**2 + k5**2)**0.5) * (freq**2 + k1**2)) return Rb def __B(freq): B = round(20*np.log10(__Rb(freq)) - 20*np.log10(__Rb(fc)), 2) return B def __Rc(freq): Rc = (k1**2)*(freq**2) / ((freq**2 + k2**2) * (freq**2 + k1**2)) return Rc def __C(freq): C = round(20*np.log10(__Rc(freq)) - 20*np.log10(__Rc(fc)), 2) return C def __h(freq): h = ((1037918.48 - freq**2)**2 + 1080768.16*(freq**2))\ / ((9837328 - freq**2)**2 + 11723776*(freq**2)) return h def __Rd(freq): Rd = (freq / k6**2) * (__h(freq) / ((freq**2 + k7**2) * (freq**2 + k8**2)))**0.5 return Rd def __D(freq): D = round(20*np.log10(__Rd(freq)) - 20*np.log10(__Rd(fc)), 2) return D _categories = ['20', '25', '31.5', '40', '50', '63', '80', '100', '125', '160', '200', '250', '315', '400', '500', '630', '800', '1000', '1250', '1600', '2000', '2500', '3150', '4000', '5000', '6300', '8000', '10000', '12500', '16000', '20000']
[docs]def weighting(kind='A', nth=None, freqs=None): """ Level weighting curve. Parameters ---------- kind : TYPE, optional DESCRIPTION. The default is 'A'. nth : TYPE, optional DESCRIPTION. The default is None. freqs : TYPE, optional DESCRIPTION. The default is None. Returns ------- np.ndarray The weighting curve in dB. """ out = [] if freqs is not None: for freq in freqs: out.append(eval('__' + kind + '(' + freq + ')')) elif nth is not None: if nth == 1: for val in _categories[2::3]: out.append(eval('__' + kind + '(' + val + ')')) elif nth == 3: for val in _categories: out.append(eval(kind+'('+val+')')) return np.asarray(out, ndmin=2).T