process#
General purpose blockwise processing methods independent of the domain (time or frequency).
Average: Calculate the average across consecutive time samples or frequency snapshots.
Cache: Cache the output of a source in a file to avoid redundant computations.
SampleSplitter: Distribute data from a source to multiple connected objects in a block-wise manner.
SamplesBuffer: Handle buffering of samples from a source.
- class acoular.process.LockedGenerator(it)#
Bases: object
Thread-safe wrapper for an iterator or generator.
The LockedGenerator class ensures that calls to the __next__ method of the given iterator or generator are thread-safe, preventing race conditions when accessed by multiple threads simultaneously. It achieves thread safety by using a lock to serialize access to the underlying iterator or generator.
- Parameters:
- ititerator or generator
The iterator or generator to be made thread-safe.
See also
SampleSplitter: Distribute data from a source to several following objects in a block-wise manner.
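The locking pattern can be sketched with a stdlib-only stand-in (LockedIterator is a hypothetical name used here for illustration, not part of Acoular):

```python
import threading

class LockedIterator:
    """Minimal sketch of the pattern LockedGenerator implements:
    a lock serializes access to the wrapped iterator's __next__."""

    def __init__(self, it):
        self._it = iter(it)
        self._lock = threading.Lock()

    def __iter__(self):
        return self

    def __next__(self):
        with self._lock:  # only one thread may advance the iterator at a time
            return next(self._it)

# two threads draining the same generator never see the same item twice
gen = LockedIterator(i for i in range(1000))
seen = []

def worker():
    for item in gen:
        seen.append(item)

threads = [threading.Thread(target=worker) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Without the lock, two threads calling __next__ concurrently on a plain generator raise a ValueError ("generator already executing") or skip items; serializing access avoids both.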
- class acoular.process.Average#
Bases: InOut
Calculate the average across consecutive time samples or frequency snapshots.
The average operation is performed differently depending on the source type. If the source is a time domain source (e.g. derived from SamplesGenerator), the average is calculated over a certain number of time samples given by num_per_average. If the source is a frequency domain source (e.g. derived from SpectraGenerator), the average is calculated over a certain number of frequency snapshots given by num_per_average.
See also
InOut: Receive data from any source domain and return signals in the same domain.
Examples
To estimate the RMS of a white noise (time-domain) signal, the average of the squared signal can be calculated:
>>> import acoular as ac
>>> import numpy as np
>>>
>>> signal = ac.WNoiseGenerator(sample_freq=51200, num_samples=51200, rms=2.0).signal()
>>> ts = ac.TimeSamples(data=signal[:, np.newaxis], sample_freq=51200)
>>> tp = ac.TimePower(source=ts)
>>> avg = ac.Average(source=tp, num_per_average=512)
>>> mean_squared_value = next(avg.result(num=1))
>>> rms = np.sqrt(mean_squared_value)[0, 0]
>>> print(rms)
1.9985200025816718
Here, each evaluation of the generator created by the result() method of the Average object via the next() function returns one average (num=1) over 512 consecutive time samples.
If the source is a frequency domain source, the average is calculated over a certain number of frequency snapshots, defined by num_per_average.
>>> fft = ac.RFFT(source=ts, block_size=64)
>>> ps = ac.AutoPowerSpectra(source=fft)
>>> avg = ac.Average(source=ps, num_per_average=16)
>>> mean_power = next(avg.result(num=1))
>>> print(np.sqrt(mean_power.sum()))
2.0024960894399295
Here, the generator created by the result() method of the Average object returns the average across 16 snapshots in the frequency domain.
- num_per_average = Int(64, desc='number of samples/snapshots to average over')#
The number of samples (time domain source) or snapshots (frequency domain source) to average over. Default is 64.
- sample_freq = Property(depends_on=['source.sample_freq', 'num_per_average'])#
The sampling frequency of the output signal. It is set automatically as (sample_freq / num_per_average).
- num_samples = Property(depends_on=['source.num_samples', 'num_per_average'])#
The number of samples (time domain) or snapshots (frequency domain) of the output signal. It is set automatically as (num_samples / num_per_average).
- digest = Property(depends_on=['source.digest', 'num_per_average'])#
A unique identifier based on the class properties.
- result(num)#
Generate averaged output blocks from the source data.
This method implements a Python generator that yields blocks of averaged data from the source. The averaging is performed over num_per_average samples (for time-domain sources) or snapshots (for frequency-domain sources). The size of the yielded blocks is defined by the num parameter.
- Parameters:
- num : int
The number of averaged blocks to yield at a time. Each block contains the average over num_per_average time samples or frequency snapshots. The last block may be shorter than the specified size if the remaining data is insufficient.
- Yields:
numpy.ndarray
A 2D NumPy array of shape (num, num_channels), where num is the number of averaged blocks requested and num_channels corresponds to the number of channels in the source. Each entry in the array is the average over num_per_average samples/snapshots.
Notes
- The averaging operation depends on the source type:
For time-domain sources (e.g. derived from SamplesGenerator), the average is calculated over num_per_average time samples.
For frequency-domain sources (e.g. derived from SpectraGenerator), the average is calculated over num_per_average frequency snapshots.
The generator will stop yielding when the source data is exhausted.
If the source provides fewer than num * num_per_average samples, the final block may be smaller than the requested num size.
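The yield behavior described in these notes can be reproduced with a short NumPy sketch (average_blocks is a hypothetical helper for illustration, not the Acoular implementation):

```python
import numpy as np

def average_blocks(data, num_per_average, num):
    """Hypothetical sketch of the blockwise averaging described above.
    `data` has shape (num_samples, num_channels)."""
    # number of complete averages the source can provide,
    # mirroring the derived num_samples / num_per_average attribute
    n_avg = data.shape[0] // num_per_average
    averages = data[: n_avg * num_per_average].reshape(
        n_avg, num_per_average, -1
    ).mean(axis=1)
    # yield `num` averages per block; the final block may be shorter
    for start in range(0, n_avg, num):
        yield averages[start : start + num]

data = np.arange(10.0).reshape(10, 1)  # 10 samples, 1 channel
blocks = list(average_blocks(data, num_per_average=2, num=3))
# 5 averages in total -> one block of 3 averages and a shorter final block of 2
```

With 10 samples and num_per_average=2 there are 5 averages; requesting num=3 per block yields one full block of 3 and a final block of 2, matching the partial-last-block rule above.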
- class acoular.process.Cache#
Bases: InOut
Cache the output of a source in a file to avoid redundant computations.
The Cache class stores the output of a source (derived from Generator) in a cache file within the Acoular cache directory. This enables faster reuse of precomputed data by avoiding time-consuming recalculations. The cache behavior is managed through the Config class by setting the global_caching attribute.
The class determines whether to use the cached data, update it, or bypass caching based on the global caching configuration and the state of the cache file. The caching mechanism supports scenarios such as:
Reading from a complete or incomplete cache.
Overwriting an existing cache.
Operating in a read-only or no-cache mode.
See also
InOut: Receive data from any source domain and return signals in the same domain.
Examples
Caching the output of an FFT computation:
>>> import acoular as ac
>>> import numpy as np
>>>
>>> ac.config.h5library = 'tables'
>>> data = np.random.rand(1024, 1)
>>> ts = ac.TimeSamples(data=data, sample_freq=51200)
>>> fft = ac.RFFT(source=ts, block_size=1024)
>>> cache = ac.Cache(source=fft)  # cache the output of the FFT in a cache file
>>> for block in cache.result(num=1):  # read the cached data block-wise
...     print(block.shape)
[('void_cache.h5', 1)]
(1, 513)
Disabling caching globally:
>>> ac.config.global_caching = 'none'
Changing the cache directory:
>>> ac.config.cache_dir = '/path/to/cache_dir'
- digest = Property(depends_on=['source.digest'])#
A unique identifier based on the cache properties.
- result(num)#
Generate data blocks from the source, using cache when available.
This method acts as a Python generator that yields blocks of output data from the source, reading from the cache file when possible. The size of the data blocks is determined by the num parameter. The caching mechanism prevents redundant calculations by storing and reusing the source's output.
- Parameters:
- num : int
The number of time samples or frequency snapshots per block to yield. The final block may be smaller if there is insufficient data.
- Yields:
numpy.ndarray
A 2D NumPy array of shape (num, num_channels) representing the output data. Each block is either retrieved from the cache file or generated by the source and cached dynamically during processing.
Notes
The behavior of the caching mechanism depends on the global_caching setting:
'none': Bypasses caching and directly retrieves data from the source.
'readonly': Reads data from the cache if available; otherwise, retrieves data from the source without caching.
'overwrite': Replaces any existing cache with newly computed data.
If the cache file is incomplete or corrupted, the method may generate new data from the source to update the cache unless the caching mode is 'readonly'.
The cache node name is based on the source's digest attribute.
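The mode-dependent decision logic in these notes can be sketched with a stdlib-only stand-in (cached_result and its file-based cache are hypothetical, not the HDF5-backed Acoular implementation):

```python
import os
import pickle
import tempfile

def cached_result(compute, cache_path, mode):
    """Hypothetical sketch of the caching modes described above.
    `compute` is a zero-argument callable standing in for the source."""
    if mode == 'none':
        return compute()                # bypass caching entirely
    if mode == 'readonly' and not os.path.exists(cache_path):
        return compute()                # no cache present: compute, never write
    if mode != 'overwrite' and os.path.exists(cache_path):
        with open(cache_path, 'rb') as f:
            return pickle.load(f)       # reuse the existing cache
    result = compute()                  # (re)compute and store
    with open(cache_path, 'wb') as f:
        pickle.dump(result, f)
    return result

path = os.path.join(tempfile.mkdtemp(), 'demo.pkl')
first = cached_result(lambda: [1, 2, 3], path, mode='overwrite')  # computed, stored
second = cached_result(lambda: [9, 9, 9], path, mode='readonly')  # served from cache
```

The second call never invokes its callable: in 'readonly' mode an existing cache is always preferred, which is why second equals the originally cached data rather than [9, 9, 9].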
- class acoular.process.SampleSplitter#
Bases: InOut
Distribute data from a source to multiple connected objects in a block-wise manner.
The SampleSplitter class manages the distribution of data blocks from a single source object, derived from Generator, to multiple target objects, also derived from Generator. Each connected target object is assigned a dedicated buffer to hold incoming data blocks. These buffers operate in a first-in-first-out (FIFO) manner, ensuring efficient and parallelized data handling.
This class is particularly useful when distributing data blocks from a streaming source to multiple downstream processing objects.
Each registered target object maintains its own dedicated block buffer, allowing for independent data management. The buffer size can be customized per object, and different overflow handling strategies can be configured, such as raising an error, issuing a warning, or discarding old data. This ensures efficient parallel data processing, making it well-suited for complex workflows.
Notes
Buffers are dynamically created and managed for each registered object.
Buffer overflow behavior can be set individually for each target object.
Examples
Consider a time-domain signal stream where the FFT spectra and signal power are calculated block-by-block and in parallel using the RFFT, TimePower, and Average objects. The SampleSplitter distributes incoming data blocks to the buffers of the RFFT and TimePower objects whenever either object requests data via its result() generator.
For the TimePower object, the buffer size is set to 10 blocks. If the buffer is full, an error is raised, as the buffer overflow treatment is set to 'error'. For the RFFT object, the buffer size is limited to 1 block, and the overflow treatment is set to 'none'. This setup helps reduce latency in FFT calculations, which may take longer than signal power calculations. If new data arrives and the RFFT buffer is full, the SampleSplitter discards the oldest block, ensuring that the RFFT object always receives the most recent block of data.
>>> import acoular as ac
>>> import numpy as np
>>>
>>> # create a time domain signal source
>>> ts = ac.TimeSamples(data=np.random.rand(1024, 1), sample_freq=51200)
>>>
>>> # create the sample splitter object
>>> ss = ac.SampleSplitter(source=ts)
>>>
>>> # create the FFT spectra and further objects that receive the data
>>> fft = ac.RFFT(source=ss, block_size=64)
>>> pow = ac.TimePower(source=ss)
>>> avg = ac.Average(source=pow, num_per_average=64)
>>>
>>> # register the subsequent processing block objects at the sample splitter
>>> ss.register_object(fft, buffer_size=1, buffer_overflow_treatment='none')
>>> ss.register_object(pow, buffer_size=10, buffer_overflow_treatment='error')
After object registration, the SampleSplitter object is ready to distribute the data to the object buffers. The block buffers can be accessed via the block_buffer attribute of the SampleSplitter object.
>>> ss.block_buffer.values()
dict_values([deque([], maxlen=1), deque([], maxlen=10)])
Calling the result() method of the FFT object starts the data collection and distribution process.
>>> generator = fft.result(num=1)
>>> fft_res = next(generator)
Although we haven't called the result() method of the signal power object yet, one data block is already available in its buffer.
>>> print(len(ss.block_buffer[pow]))
1
To remove registered objects from the SampleSplitter, use the remove_object() method.
>>> ss.remove_object(pow)
>>> print(len(ss.block_buffer))
1
- block_buffer = Dict(key_trait=Instance(Generator))#
A dictionary containing block buffers for registered objects. Keys are the registered objects, and values are deque structures holding data blocks.
- buffer_size = Union( …#
The maximum number of blocks each buffer can hold. Can be set globally for all objects or individually using a dictionary.
- buffer_overflow_treatment = Dict( …#
Defines behavior when a buffer exceeds its maximum size.
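Since the block buffers are plain deques (as the block_buffer example above shows), the overflow treatments can be sketched with collections.deque; push_block is a hypothetical helper, not Acoular code:

```python
from collections import deque

def push_block(buffer, block, treatment):
    """Hypothetical helper illustrating the overflow treatments above."""
    if len(buffer) == buffer.maxlen and treatment == 'error':
        raise OSError('block buffer overflow')
    # with treatment 'none', appending to a full deque silently
    # discards the oldest block (deque maxlen semantics)
    buffer.append(block)

buf = deque(maxlen=2)
for block in ('b0', 'b1', 'b2'):
    push_block(buf, block, treatment='none')
# 'b0' was discarded; only the two most recent blocks remain

strict = deque(maxlen=1)
push_block(strict, 'b0', treatment='error')
try:
    push_block(strict, 'b1', treatment='error')
    overflow_raised = False
except OSError:
    overflow_raised = True
```

This is why 'none' keeps a slow consumer supplied with the freshest data, while 'error' guarantees no block is ever lost silently.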
- register_object(*objects_to_register, buffer_size=None, buffer_overflow_treatment=None)#
Register one or more target objects to the SampleSplitter object.
This method creates and configures block buffers for the specified target objects, enabling them to receive data blocks from the SampleSplitter. Each registered object is assigned a dedicated buffer with customizable size and overflow behavior.
- Parameters:
- objects_to_register : Generator or list of Generator
A single object or a list of objects derived from Generator to be registered as targets for data distribution.
- buffer_size : int, optional
The maximum number of data blocks each object's buffer can hold. If not specified, the default buffer size (100 blocks) is used, or a globally defined size if buffer_size is a dictionary.
- buffer_overflow_treatment : str, optional
Defines the behavior when a buffer exceeds its maximum size. Options are:
'error': Raises an IOError when the buffer overflows.
'warning': Issues a warning and may result in data loss.
'none': Silently discards the oldest data blocks to make room for new ones.
If not specified, the default behavior is 'error'.
- Raises:
OSError
If any of the specified objects is already registered.
- remove_object(*objects_to_remove)#
Unregister one or more objects from the SampleSplitter.
This method removes the specified objects and their associated block buffers from the SampleSplitter. If no objects are specified, all currently registered objects are unregistered, effectively clearing all buffers.
- Parameters:
- objects_to_remove : Generator or list of Generator, optional
A single object or a list of objects derived from Generator to be removed from the SampleSplitter. If no objects are provided, all registered objects will be removed.
- Raises:
KeyError
If any of the specified objects are not currently registered.
Notes
Once an object is removed, it will no longer receive data from the SampleSplitter.
Removing an object also clears its associated buffer.
- result(num)#
Yield data blocks from the buffer to the calling object.
This generator method retrieves blocks of data for the calling object, either from its dedicated block buffer or by processing new data from the source. If the buffer is empty, new data blocks are generated and distributed to all registered objects in a block-wise manner.
- Parameters:
- num : int
The number of samples per block to be yielded.
- Yields:
numpy.ndarray
Blocks of data with shape (num, num_channels). The last block may be shorter than num if the source data is exhausted.
- Raises:
OSError
If the calling object is not registered with the SampleSplitter.
OSError
If the block buffer reaches its maximum size and the overflow handling policy is set to 'error'.
Notes
If the block buffer is empty, new data is fetched from the source and distributed to all registered objects.
Buffer overflow behavior is controlled by the buffer_overflow_treatment attribute, which can be set to 'error', 'warning', or 'none'.
- class acoular.process.SamplesBuffer#
Bases: InOut
Handle buffering of samples from a source.
The SamplesBuffer class buffers samples from a source and provides them in blocks of a specified size. It supports various use cases for efficient handling of sample data. Below is an example demonstrating its functionality.
Examples
Suppose we want to draw blocks of 16 samples from the source, while ensuring that the buffer always holds twice that number (32 samples). The following code achieves this behavior:
>>> import acoular as ac
>>> import numpy as np
>>> # create a white noise source with 512 samples
>>> source = ac.TimeSamples(
...     data=ac.WNoiseGenerator(
...         sample_freq=64,
...         num_samples=512,
...     ).signal()[:, np.newaxis],
...     sample_freq=64,
... )
>>> # create a buffer with a size of 32 samples
>>> buffer = ac.process.SamplesBuffer(source=source, length=32)
>>> # get the first block of 16 samples
>>> block = next(buffer.result(num=16))
>>> np.testing.assert_array_equal(block, source.data[:16])
In the example above, the buffer initially collects blocks of the specified size from the source. It then returns the first block of 16 samples. With subsequent calls to the result() method, the buffer refills and returns additional blocks of 16 samples.
In some cases, you may wish to retrieve a different number of samples from the source than you want to return. This can be achieved by setting the source_num attribute. For example, in the BeamformerTimeTraj class, the number of time samples varies based on the expected delay for moving sources, while still adhering to the desired block size for the buffer.
The shift_index_by attribute controls how the buffer updates its index when retrieving data. If set to 'num', the buffer returns result_num samples but forgets num samples from the buffer. If set to 'result_num', the buffer returns and forgets the same number of samples.
>>> buffer = ac.process.SamplesBuffer(source=source, length=32, result_num=20, shift_index_by='num')
>>> block_sizes = []
>>> block_sizes.append(
...     next(buffer.result(num=16)).shape[0]
... )  # the buffer returns 20 samples, but forgets only the first 16 samples
>>> buffer.result_num = 24
>>> block_sizes.append(
...     next(buffer.result(num=16)).shape[0]
... )  # the buffer returns 24 samples, but forgets only the first 16 samples
>>> np.testing.assert_array_equal(block_sizes, [20, 24])
- length = Int(desc='number of samples that fit in the buffer')#
The number of samples that the buffer can hold.
- source_num = Union( …#
The number of samples per block to obtain from the source. If set to None, the number of samples will be determined by the num argument of the result() method.
- result_num = Union( …#
The number of samples to return from the buffer. If set to None, the number of samples will be determined by the num argument of the result() method.
- shift_index_by = Enum( …#
Index shift value for the buffer.
If set to 'result_num', the buffer will return and forget result_num samples.
If set to 'num', the buffer will return result_num samples but forget num samples.
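The overlap that shift_index_by='num' produces can be sketched with a plain list as the buffer (read is a hypothetical helper, not the Acoular implementation):

```python
def read(buffer, num, result_num, shift_index_by):
    """Hypothetical helper mirroring the index-shift rule above:
    always return `result_num` samples, but advance the read position
    by either `result_num` or only `num`."""
    out = list(buffer[:result_num])
    shift = result_num if shift_index_by == 'result_num' else num
    del buffer[:shift]  # forget only the shifted-over samples
    return out

buf = list(range(64))
first = read(buf, num=16, result_num=20, shift_index_by='num')
# 20 samples returned, but only 16 forgotten: the next read starts at
# sample 16 and overlaps the previous block by 4 samples
second = read(buf, num=16, result_num=20, shift_index_by='num')
```

Each call returns 20 samples while the read position advances by only 16, so consecutive blocks share a 4-sample overlap; with shift_index_by='result_num' the blocks would instead be disjoint.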
- level = Property(desc='current filling level of buffer')#
The current filling level of the buffer, i.e., how many samples are currently available.
- dtype = Any(desc='data type of the buffer')#
The data type of the elements in the buffer.
- increase_buffer(num)#
Increase the size of the buffer by a specified number of samples.
This method expands the buffer by appending additional samples, effectively increasing its capacity. The new samples are initialized to zero. The index of the buffer is adjusted accordingly to accommodate the increase.
- Parameters:
- num : int
The number of samples by which to increase the buffer size.
- read_from_buffer(num)#
Read a specified number of samples from the buffer.
This method retrieves samples from the buffer, ensuring that the requested number of samples is returned. If the buffer contains fewer samples than requested, the method will return all available samples. The index of the buffer is updated based on the shift_index_by setting.
- Parameters:
- num : int
The number of samples to read from the buffer.
- Returns:
numpy.ndarray
A block of samples from the buffer.
Notes
If the result_num attribute is set, it determines the number of samples to return.
The method ensures the buffer index is adjusted according to the shift_index_by setting. Options are:
'result_num': The index will shift by the number of samples returned.
'num': The index will shift by the number of samples requested (num).
- fill_buffer(snum)#
Fill the buffer with samples from the source.
The fill_buffer() method collects samples from the source and writes them to the buffer. It continues to fill the buffer until enough samples are available or the source runs out of data. If the buffer reaches its maximum capacity, additional samples are discarded. The buffer will only contain the most recent data, and its index is updated accordingly.
- Parameters:
- snum : int
The number of samples to retrieve from the source in each iteration.
- Yields:
None
This method is a generator and yields control back after filling the buffer.
Notes
The method ensures that the buffer is filled with the required number of samples, adjusting the buffer size if necessary (via the increase_buffer() method) when more space is needed.
Once the buffer is filled, it yields control and resumes only when the buffer is ready for more data.
- result(num)#
Return blocks of samples from the buffer.
The result() method retrieves blocks of samples from the buffer and yields them to the calling process. The number of samples per block is determined by the num argument, but can also be influenced by the result_num attribute (if set). If the buffer is not yet filled, it continues to collect samples from the source until it contains enough data. Once the buffer is full, it returns the requested blocks of samples.
- Parameters:
- num : int
The number of samples to return in each block.
- Yields:
numpy.ndarray
A block of samples from the buffer. The size of the block is determined by the num parameter or the result_num attribute, depending on the buffer's configuration.
Notes
If result_num is set, the method uses it to determine the number of samples returned instead of the num parameter.
If the buffer is empty or does not have enough samples, it attempts to fill the buffer by collecting data from the source. If fewer samples are available from the source than requested, the method yields whatever samples remain in the buffer.