Source code for pytta.classes.measurement

# -*- coding: utf-8 -*-

import os
import json
import zipfile
import numpy as np
import sounddevice as sd
import time
from pytta.classes import _base
from pytta.classes.streaming import Monitor, Streaming
from pytta.classes.signal import SignalObj, ImpulsiveResponse
from pytta import _h5utils as _h5
import traceback


# Measurement class
class _MeasurementBase(_base.PyTTaObj):
    """
    Measurement object class created to define some properties and methods to
    be used by the playback, recording and processing classes. It is a private
    class

    Properties(self): (default), (dtype), meaning;

        * device (system default), (list/int):
            list of input and output devices;

        * inChannels ([1]), (ChannelsList | list[int]):
            list of device's input channel used for recording;

        * outChannels ([1]), (ChannelsList | list[int]):
            list of device's output channel used for playing/reproducing\
            a signalObj;

    Properties inherited (default), (dtype): meaning;

        * samplingRate (44100), (int):
            signal's sampling rate;

        * lengthDomain ('time'), (str):
            signal's length domain. May be 'time' or 'samples';

        * timeLength (seconds), (float):
            signal's time length in seconds for lengthDomain = 'time';

        * fftDegree (fftDegree), (float):
            2**fftDegree signal's number of samples for
            lengthDomain = 'samples';

        * numSamples (samples), (int):
            signal's number of samples

        * freqMin (20), (int):
            minimum frequency bandwidth limit;

        * freqMax (20000), (int):
            maximum frequency bandwidth limit;

        * comment ('No comments.'), (str):
            some commentary about the signal or measurement object;
    """

    def __init__(self,
                 device=None,
                 inChannels=None,
                 outChannels=None,
                 blocking=True,
                 *args,
                 **kwargs):
        super().__init__(*args, **kwargs)
        # device number. For device list use sounddevice.query_devices()
        self.device = device
        self.inChannels = _base.ChannelsList(inChannels)
        self.outChannels = _base.ChannelsList(outChannels)
        self.blocking = blocking
        return

    def __repr__(self):
        return (f'{self.__class__.__name__}('
                # Measurement properties
                f'device={self.device!r}, '
                f'inChannels={self.inChannels!r}, '
                f'outChannels={self.outChannels!r}, '
                f'blocking={self.blocking!r}, '
                # PyTTaObj properties
                f'samplingRate={self.samplingRate!r}, '
                f'freqMin={self.freqMin!r}, '
                f'freqMax={self.freqMax!r}, '
                f'comment={self.comment!r}), '
                f'lengthDomain={self.lengthDomain!r}, '
                f'fftDegree={self.fftDegree!r}, '
                f'timeLength={self.timeLength!r}')

    def _to_dict(self):
        out = {'device': self.device,
               'inChannels': self.inChannels._to_dict(),
               'outChannels': self.outChannels._to_dict()}
        return out

    def _h5_save(self, h5group):
        """
        Saves itself inside a hdf5 group from an already opened file.
        """
        h5group.attrs['device'] = _h5.list_w_int_parser(self.device)
        h5group.attrs['inChannels'] = repr(self.inChannels)
        h5group.attrs['outChannels'] = repr(self.outChannels)
        h5group.attrs['blocking'] = self.blocking
        super()._h5_save(h5group)
        pass

# Measurement Properties
    @property
    def device(self):
        return self._device

    @device.setter
    def device(self, newDevice):
        self._device = newDevice
        return

    @property
    def numInChannels(self):
        return len(self.inChannels)

    @property
    def numOutChannels(self):
        return len(self.outChannels)

    def calib_pressure(self, chIndex,
                       refPrms=1.00, refFreq=1000):
        """
        calibPressure method: use informed SignalObj, with a calibration
        acoustic pressure signal, and the reference RMS acoustic pressure to
        calculate the Correction Factor.

            >>> SignalObj.calibPressure(chIndex,refSignalObj,refPrms,refFreq)

        Parameters:
        -------------

            * chIndex (), (int):
                channel index for calibration. Starts in 0;

            * refSignalObj (), (SignalObj):
                SignalObj with the calibration recorded signal;

            * refPrms (1.00), (float):
                the reference pressure provided by the acoustic calibrator;

            * refFreq (1000), (int):
                the reference sine frequency provided by the acoustic
                calibrator;
        """

        refSignalObj = RecMeasure(lengthDomain='time',
                                  timeLength=5,
                                  samplingRate=self.samplingRate,
                                  inChannels=chIndex,
                                  device=self.device,
                                  freqMin=self.freqMin,
                                  freqMax=self.freqMax).run()
        if chIndex-1 in range(len(self.inChannels)):
            self.inChannels[chIndex-1].calib_press(refSignalObj,
                                                   refPrms, refFreq)
            self.inChannels[chIndex-1].calibCheck = True
        else:
            raise IndexError('chIndex not in list of channel numbers')
        return


class Measurement(_MeasurementBase):
    """
    Measurement object class created to define some properties and methods to
    be used by the playback, recording and processing classes. It is a private
    class

    Properties(self): (default), (dtype), meaning;

        * device (system default), (list/int):
            list of input and output devices;

        * inChannels ([1]), (ChannelsList | list[int]):
            list of device's input channel used for recording;

        * outChannels ([1]), (ChannelsList | list[int]):
            list of device's output channel used for playing/reproducing\
            a signalObj;

    Properties inherited (default), (dtype): meaning;

        * samplingRate (44100), (int):
            signal's sampling rate;

        * lengthDomain ('time'), (str):
            signal's length domain. May be 'time' or 'samples';

        * timeLength (seconds), (float):
            signal's time length in seconds for lengthDomain = 'time';

        * fftDegree (fftDegree), (float):
            2**fftDegree signal's number of samples for
            lengthDomain = 'samples';

        * numSamples (samples), (int):
            signal's number of samples

        * freqMin (20), (int):
            minimum frequency bandwidth limit;

        * freqMax (20000), (int):
            maximum frequency bandwidth limit;

        * comment ('No comments.'), (str):
            some commentary about the signal or measurement object;
    """

    def __init__(self,
                 device=None,
                 inChannels=None,
                 outChannels=None,
                 blocking=True,
                 *args,
                 **kwargs):
        super().__init__(device, inChannels, outChannels, blocking, *args, **kwargs)
        return

    def play(self, monitor = Monitor(5512)):
        """
        Play SignalObj.

        Returns
        -------
        None.

        """
        with Streaming('O',
                       self.samplingRate,
                       self.device,
                       'float32',
                       0,
                       None,
                       self.outChannels,
                       self.excitation,
                       None,
                       self.numSamples,
                       monitor) as strm:
            strm.play()
        return

    def record(self, monitor = Monitor(5512)):
        with Streaming('I',
                       self.samplingRate,
                       self.device,
                       'float32',
                       0,
                       self.inChannels,
                       None,
                       None,
                       None,
                       self.numSamples,
                       monitor) as strm:
            rec = strm.record()
        return SignalObj(rec, 'time', self.samplingRate)

    def playrec(self, monitor = Monitor(5512)):
        with Streaming('IO',
                       self.samplingRate,
                       self.device,
                       'float32',
                       0,
                       self.inChannels,
                       self.outChannels,
                       self.excitation,
                       None,
                       self.numSamples,
                       monitor) as strm:
            rec = strm.playrec()
        return SignalObj(rec, 'time', self.samplingRate)


# RecMeasure class
[docs]class RecMeasure(_MeasurementBase): """ Recording object Properties: ------------ * lengthDomain ('time'), (str): signal's length domain. May be 'time' or 'samples'; * timeLength (seconds), (float): signal's time length in seconds for lengthDomain = 'time'; * fftDegree (fftDegree), (float): 2**fftDegree signal's number of samples for\ lengthDomain = 'samples'; * device (system default), (list/int): list of input and output devices; * inChannels ([1]), (list/int): list of device's input channel used for recording; * samplingRate (44100), (int): signal's sampling rate; * numSamples (samples), (int): signal's number of samples * freqMin (20), (float): minimum frequency bandwidth limit; * freqMax (20000), (float): maximum frequency bandwidth limit; * comment ('No comments.'), (str): some commentary about the signal or measurement object; Methods: --------- * run(): starts recording using the inch and device information, during timeLen seconds; """ def __init__(self, lengthDomain=None, fftDegree=None, timeLength=None, *args, **kwargs): super().__init__(*args, **kwargs) self.lengthDomain = lengthDomain if self.lengthDomain == 'samples': self.fftDegree = fftDegree elif self.lengthDomain == 'time': self.timeLength = timeLength else: self._timeLength = None self._fftDegree = None self._outChannels = None return def __repr__(self): return (f'{self.__class__.__name__}(' # RecMeasure properties f'lengthDomain={self.lengthDomain!r}, ' f'fftDegree={self.fftDegree!r}, ' f'timeLength={self.timeLength!r}, ' # Measurement properties f'device={self.device!r}, ' f'inChannels={self.inChannels!r}, ' f'blocking={self.blocking!r}, ' # PyTTaObj properties f'samplingRate={self.samplingRate!r}, ' f'freqMin={self.freqMin!r}, ' f'freqMax={self.freqMax!r}, ' f'comment={self.comment!r})') def _to_dict(self): sup = super()._to_dict() sup['fftDegree'] = self.fftDegree return sup def pytta_save(self, dirname=time.ctime(time.time())): dic = self._to_dict() name = dirname + '.pytta' with zipfile.ZipFile(name, 'w') as zdir: with open('RecMeasure.json', 'w') as f: json.dump(dic, f, indent=4) zdir.write('RecMeasure.json') return name def _h5_save(self, h5group, setClass=True): """ Saves itself inside a hdf5 group from an already openned file via pytta._h5_save(...). Use setClass=True if the attribute 'class' must be seted to RecMeasure. >>> RecMeasure._h5_save(h5group, setClass=True) """ if setClass is True: h5group.attrs['class'] = 'RecMeasure' h5group.attrs['lengthDomain'] = _h5.none_parser(self.lengthDomain) h5group.attrs['fftDegree'] = _h5.none_parser(self.fftDegree) h5group.attrs['timeLength'] = _h5.none_parser(self.timeLength) super()._h5_save(h5group) pass # Rec Properties @property def timeLength(self): return self._timeLength @timeLength.setter def timeLength(self, newLength): self._timeLength = np.round(newLength, 2) self._numSamples = int(self.timeLength * self.samplingRate) self._fftDegree = np.round(np.log2(self.numSamples), 2) return @property def fftDegree(self): return self._fftDegree @fftDegree.setter def fftDegree(self, newDegree): self._fftDegree = np.round(newDegree, 2) self._numSamples = 2**self.fftDegree self._timeLength = np.round(self.numSamples / self.samplingRate, 2) return # Rec Methods
[docs] def run(self): """ Run method: starts recording during Tmax seconds Outputs a signalObj with the recording content """ # Code snippet to guarantee that generated object name is # the declared at global scope # for frame, line in traceback.walk_stack(None): for framenline in traceback.walk_stack(None): # varnames = frame.f_code.co_varnames varnames = framenline[0].f_code.co_varnames if varnames == (): break # creation_file, creation_line, creation_function, \ # creation_text = \ extracted_text = \ traceback.extract_stack(framenline[0], 1)[0] # traceback.extract_stack(frame, 1)[0] # creation_name = creation_text.split("=")[0].strip() creation_name = extracted_text[3].split("=")[0].strip() # Record recording = sd.rec(frames=self.numSamples, samplerate=self.samplingRate, mapping=self.inChannels.mapping, blocking=self.blocking, device=self.device, latency='low', dtype='float32') recording = np.squeeze(recording) recording = SignalObj(signalArray=recording*self.inChannels.CFlist(), domain='time', samplingRate=self.samplingRate) recording.channels = self.inChannels recording.timeStamp = time.ctime(time.time()) recording.freqMin, recording.freqMax\ = self.freqMin, self.freqMax recording.comment = 'SignalObj from a Rec measurement' recording.creation_name = creation_name _print_max_level(recording, kind='input') return recording
# PlayRecMeasure class
[docs]class PlayRecMeasure(_MeasurementBase): """ Playback and Record object Properties: ------------ * excitation (SignalObj), (SignalObj): signal information used to reproduce (playback); * outputAmplification (0), (float): Gain in dB applied to the output channels. * device (system default), (list/int): list of input and output devices; * inChannels ([1]), (list/int): list of device's input channel used for recording; * outChannels ([1]), (list/int): list of device's output channel used for playing or reproducing a signalObj; * samplingRate (44100), (int): signal's sampling rate; * lengthDomain ('time'), (str): signal's length domain. May be 'time' or 'samples'; * timeLength (seconds), (float): signal's time length in seconds for lengthDomain = 'time'; * fftDegree (fftDegree), (float): 2**fftDegree signal's number of samples for\ lengthDomain = 'samples'; * numSamples (samples), (int): signal's number of samples * freqMin (20), (int): minimum frequency bandwidth limit; * freqMax (20000), (int): maximum frequency bandwidth limit; * comment ('No comments.'), (str): some commentary about the signal or measurement object; Methods: meaning; * run(): starts playing the excitation signal and recording during the excitation timeLen duration; """ def __init__(self, excitation=None, outputAmplification=0, *args, **kwargs): if excitation is None: self._excitation = None super().__init__(*args, **kwargs) else: self.excitation = excitation if 'freqMin' in kwargs: kwargs.pop('freqMin') if 'freqMax' in kwargs: kwargs.pop('freqMax') super().__init__(*args, samplingRate=excitation.samplingRate, freqMin=excitation.freqMin, freqMax=excitation.freqMax, fftDegree=excitation.fftDegree, timeLength=excitation.timeLength, lengthDomain=excitation.lengthDomain, numSamples=excitation.numSamples, **kwargs) self.outChannel = excitation.channels self.outputAmplification = outputAmplification return def __repr__(self): return (f'{self.__class__.__name__}(' # PlayRecMeasure properties f'excitation={self.excitation!r}, ' f'outputAmplification={self.outputAmplification!r}, ' # Measurement properties f'device={self.device!r}, ' f'inChannels={self.inChannels!r}, ' f'outChannels={self.outChannels!r}, ' f'blocking={self.blocking!r}, ' # PyTTaObj properties f'samplingRate={self.samplingRate!r}, ' f'freqMin={self.freqMin!r}, ' f'freqMax={self.freqMax!r}, ' f'comment={self.comment!r})') # PlayRec Methods
[docs] def run(self): """ Starts reproducing the excitation signal and recording at the same time Outputs a signalObj with the recording content """ # Code snippet to guarantee that generated object name is # the declared at global scope # for frame, line in traceback.walk_stack(None): for framenline in traceback.walk_stack(None): # varnames = frame.f_code.co_varnames varnames = framenline[0].f_code.co_varnames if varnames == (): break # creation_file, creation_line, creation_function, \ # creation_text = \ extracted_text = \ traceback.extract_stack(framenline[0], 1)[0] # traceback.extract_stack(framenline, 1)[0] # creation_name = creation_text.split("=")[0].strip() creation_name = extracted_text[3].split("=")[0].strip() timeStamp = time.ctime(time.time()) recording = sd.playrec(self.excitation.timeSignal* self.outputLinearGain, samplerate=self.samplingRate, input_mapping=self.inChannels.mapping, output_mapping=self.outChannels.mapping, device=self.device, blocking=self.blocking, latency='low', dtype='float32') recording = np.squeeze(recording) recording = SignalObj(signalArray=recording*self.inChannels.CFlist(), domain='time', samplingRate=self.samplingRate, freqMin=self.freqMin, freqMax=self.freqMax) recording.channels = self.inChannels recording.timeStamp = timeStamp recording.comment = 'SignalObj from a PlayRec measurement' recording.creation_name = creation_name _print_max_level(self.excitation, kind='output', gain=self.outputLinearGain, mapping=self.outChannels.mapping) _print_max_level(recording, kind='input') return recording
def _to_dict(self): sup = super()._to_dict() sup['excitationAddress'] = self.excitation._to_dict() return sup def pytta_save(self, dirname=time.ctime(time.time())): dic = self._to_dict() name = dirname + '.pytta' with zipfile.ZipFile(name, 'w') as zdir: excit = self.excitation.pytta_save('excitation') dic['excitationAddress'] = excit zdir.write(excit) os.remove(excit) with open('PlayRecMeasure.json', 'w') as f: json.dump(dic, f, indent=4) zdir.write('PlayRecMeasure.json') os.remove('PlayRecMeasure.json') return name def _h5_save(self, h5group, setClass=True): """ Saves itself inside a hdf5 group from an already openned file via pytta._h5_save(...). Use setClass=True if the attribute 'class' must be seted to PlayRecMeasure. >>> PlayRecMeasure._h5_save(h5group, setClass=True) """ if setClass is True: h5group.attrs['class'] = 'PlayRecMeasure' self.excitation._h5_save(h5group.create_group('excitation')) h5group.attrs['outputAmplification'] = self.outputAmplification super()._h5_save(h5group) pass # PlayRec Properties @property def excitation(self): return self._excitation @excitation.setter def excitation(self, newSignalObj): self._excitation = newSignalObj return @property def outputAmplification(self): return self._outputAmplification @outputAmplification.setter def outputAmplification(self, newOutputGain): self._outputAmplification = newOutputGain self.outputLinearGain = 10**(self._outputAmplification/20) return
# @property # def samplingRate(self): # return self.excitation._samplingRate # # @property # def fftDegree(self): # return self.excitation._fftDegree # # @property # def timeLength(self): # return self.excitation._timeLength # # @property # def numSamples(self): # return self.excitation._numSamples # # @property # def freqMin(self): # return self.excitation._freqMin # # @property # def freqMax(self): # return self.excitation._freqMax # FRFMeasure class
[docs]class FRFMeasure(PlayRecMeasure): """ Transferfunction object Properties: ------------ * excitation (SignalObj), (SignalObj): signal information used to reproduce (playback); * device (system default), (list | int): list of input and output devices; * inChannels ([1]), (list | int): list of device's input channel used for recording; * outChannels ([1]), (list | int): list of device's output channel used for playing or reproducing a signalObj; * samplingRate (44100), (int): signal's sampling rate; * lengthDomain ('time'), (str): signal's length domain. May be 'time' or 'samples'; * timeLength (seconds), (float): signal's time length in seconds for lengthDomain = 'time'; * fftDegree (fftDegree), (float): 2**fftDegree signal's number of samples for lengthDomain = 'samples'; * numSamples (samples), (int): signal's number of samples * freqMin (20), (int): minimum frequency bandwidth limit; * freqMax (20000), (int): maximum frequency bandwidth limit; * comment ('No comments.'), (str): some commentary about the signal or measurement object; Methods: --------- * run(): starts playing the excitation signal and recording during the excitation timeLen duration; """ def __init__(self, # Coordinate and orientation management being done trough # ChannelObj at in/out ChannelsList # coordinates={'points': [], # 'reference': 'south-west-floor corner', # 'unit': 'm'}, method='linear', winType=None, winSize=None, overlap=None, regularization=True, *args, **kwargs): super().__init__(*args, **kwargs) # self.coordinates = coordinates self.method = method self.winType = winType self.winSize = winSize self.overlap = overlap self.regularization = regularization return def __repr__(self): return (f'{self.__class__.__name__}(' # FRFMeasure properties f'method={self.method!r}, ' f'winType={self.winType!r}, ' f'winSize={self.winSize!r}, ' f'overlap={self.overlap!r}, ' # PlayRecMeasure properties f'excitation={self.excitation!r}, ' f'outputAmplification={self.outputAmplification!r}, ' # Measurement properties f'device={self.device!r}, ' f'inChannels={self.inChannels!r}, ' f'outChannels={self.outChannels!r}, ' f'blocking={self.blocking!r}, ' # PyTTaObj properties f'samplingRate={self.samplingRate!r}, ' f'freqMin={self.freqMin!r}, ' f'freqMax={self.freqMax!r}, ' f'comment={self.comment!r})') def pytta_save(self, dirname=time.ctime(time.time())): dic = self._to_dict() name = dirname + '.pytta' with zipfile.ZipFile(name, 'w') as zdir: excit = self.excitation.pytta_save('excitation') dic['excitationAddress'] = excit zdir.write(excit) os.remove(excit) with open('FRFMeasure.json', 'w') as f: json.dump(dic, f, indent=4) zdir.write('FRFMeasure.json') os.remove('FRFMeasure.json') return name def _h5_save(self, h5group, setClass=True): """ Saves itself inside a hdf5 group from an already openned file via pytta._h5_save(...). Use setClass=True if the attribute 'class' must be seted to FRFMeasure. >>> FRFMeasure._h5_save(h5group, setClass=True) """ if setClass is True: h5group.attrs['class'] = 'FRFMeasure' h5group.attrs['method'] = _h5.none_parser(self.method) h5group.attrs['winType'] = _h5.none_parser(self.winType) h5group.attrs['winSize'] = _h5.none_parser(self.winSize) h5group.attrs['overlap'] = _h5.none_parser(self.overlap) super()._h5_save(h5group, setClass=False) pass
[docs] def run(self): """ Starts reproducing the excitation signal and recording at the same time Outputs the transferfunction ImpulsiveResponse """ # Code snippet to guarantee that generated object name is # the declared at global scope # for frame, line in traceback.walk_stack(None): for framenline in traceback.walk_stack(None): # varnames = frame.f_code.co_varnames varnames = framenline[0].f_code.co_varnames if varnames == (): break # creation_file, creation_line, creation_function, \ # creation_text = \ extracted_text = \ traceback.extract_stack(framenline[0], 1)[0] # traceback.extract_stack(frame, 1)[0] # creation_name = creation_text.split("=")[0].strip() creation_name = extracted_text[3].split("=")[0].strip() recording = super().run() transferfunction = ImpulsiveResponse(self.excitation, recording, self.method, self.winType, self.winSize, self.overlap, self.regularization) transferfunction.timeStamp = recording.timeStamp transferfunction.creation_name = creation_name return transferfunction
# Sub functions def _print_max_level(sigObj, kind, gain=1, mapping=None): for chIndex in range(sigObj.numChannels): chNum = sigObj.channels.mapping[chIndex] if mapping is not None: chNumMap = mapping[chIndex] else: chNumMap = chNum # Calculating the final level with a linear gain applied linearRmsAmplitude = 10**(sigObj.max_level()[chIndex]/20) finalLevel = 20*np.log10(linearRmsAmplitude*gain) print('max {} level (excitation) on channel [{}]: ' .format(kind, chNumMap) + '{:.2f} {} - ref.: {} [{}]' .format(finalLevel, sigObj.channels[chNum].dBName, sigObj.channels[chNum].dBRef, sigObj.channels[chNum].unit)) if finalLevel >= 0: print('\x1b[0;30;43mATTENTION! CLIPPING OCCURRED\x1b[0m') return