process#

General-purpose blockwise processing methods independent of the domain (time or frequency).

Average

Calculate the average across consecutive time samples or frequency snapshots.

Cache

Cache the output of a source in a file to avoid redundant computations.

SampleSplitter

Distribute data from a source to multiple connected objects in a block-wise manner.

SamplesBuffer

Handle buffering of samples from a source.

class acoular.process.LockedGenerator(it)#

Bases: object

Thread-safe wrapper for an iterator or generator.

The LockedGenerator class ensures that calls to the __next__ method of the given iterator or generator are thread-safe, preventing race conditions when accessed by multiple threads simultaneously.

It achieves thread safety by using a lock to serialize access to the underlying iterator or generator.

Parameters:
it : iterator or generator

The iterator or generator to be made thread-safe.

See also

SampleSplitter

Distribute data from a source to several following objects in a block-wise manner.
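
Examples

A minimal sketch of the intended use: two threads drawing blocks from one shared source generator without racing on __next__ (the worker function, block size, and thread count are illustrative assumptions, not part of the API):

>>> import threading
>>> import acoular as ac
>>> import numpy as np
>>>
>>> ts = ac.TimeSamples(data=np.random.rand(1024, 1), sample_freq=51200)
>>> gen = ac.process.LockedGenerator(ts.result(num=128))
>>> blocks = []
>>> def worker():
...     try:
...         while True:
...             blocks.append(next(gen))  # __next__ is serialized by the lock
...     except StopIteration:
...         pass
>>> threads = [threading.Thread(target=worker) for _ in range(2)]
>>> for t in threads:
...     t.start()
>>> for t in threads:
...     t.join()
>>> print(len(blocks))  # 1024 samples / 128 samples per block
8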

class acoular.process.Average#

Bases: InOut

Calculate the average across consecutive time samples or frequency snapshots.

The average operation is performed differently depending on the source type. If the source is a time domain source (e.g. derived from SamplesGenerator), the average is calculated over a certain number of time samples given by num_per_average. If the source is a frequency domain source (e.g. derived from SpectraGenerator), the average is calculated over a certain number of frequency snapshots given by num_per_average.

See also

InOut

Receive data from any source domain and return signals in the same domain.

Examples

To estimate the RMS of a white noise (time-domain) signal, the average of the squared signal can be calculated:

>>> import acoular as ac
>>> import numpy as np
>>>
>>> signal = ac.WNoiseGenerator(sample_freq=51200, num_samples=51200, rms=2.0).signal()
>>> ts = ac.TimeSamples(data=signal[:, np.newaxis], sample_freq=51200)
>>> tp = ac.TimePower(source=ts)
>>> avg = ac.Average(source=tp, num_per_average=512)
>>> mean_squared_value = next(avg.result(num=1))
>>> rms = np.sqrt(mean_squared_value)[0, 0]
>>> print(rms)
1.9985200025816718

Here, each call to the next() function on the generator created by the result() method of the Average object returns num=1 average, taken across a block of 512 consecutive time samples.

If the source is a frequency domain source, the average is calculated over a certain number of frequency snapshots, defined by num_per_average.

>>> fft = ac.RFFT(source=ts, block_size=64)
>>> ps = ac.AutoPowerSpectra(source=fft)
>>> avg = ac.Average(source=ps, num_per_average=16)
>>> mean_power = next(avg.result(num=1))
>>> print(np.sqrt(mean_power.sum()))
2.0024960894399295

Here, the generator created by the result() method of the Average object returns the average across 16 snapshots in the frequency domain.

num_per_average = Int(64, desc='number of samples/snapshots to average over')#

The number of samples (time domain source) or snapshots (frequency domain source) to average over. Default is 64.

sample_freq = Property(depends_on=['source.sample_freq', 'num_per_average'])#

The sampling frequency of the output signal. It is set automatically as (sample_freq / num_per_average).
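
For instance, continuing the time-domain example above (tp has a sampling frequency of 51200 Hz):

>>> avg = ac.Average(source=tp, num_per_average=512)
>>> print(avg.sample_freq)
100.0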

num_samples = Property(depends_on=['source.num_samples', 'num_per_average'])#

The number of samples (time domain) or snapshots (frequency domain) of the output signal. It is set automatically as (num_samples / num_per_average).

digest = Property(depends_on=['source.digest', 'num_per_average'])#

A unique identifier based on the class properties.

result(num)#

Generate averaged output blocks from the source data.

This method implements a Python generator that yields blocks of averaged data from the source. The averaging is performed over num_per_average samples (for time-domain sources) or snapshots (for frequency-domain sources). The size of the blocks yielded is defined by the num parameter.

Parameters:
num : int

The number of averages per yielded block. Each average is computed over num_per_average time samples or frequency snapshots. The last block may contain fewer than num averages if the remaining data is insufficient.

Yields:
numpy.ndarray

A 2D NumPy array of shape (num, num_channels), where num is the number of averages requested and num_channels corresponds to the number of channels in the source, as given by num_channels. Each row of the array is the average over num_per_average samples/snapshots.

Notes

  • The averaging operation depends on the source type: for time-domain sources the average is taken over num_per_average consecutive samples, for frequency-domain sources over num_per_average consecutive snapshots.

  • The generator stops yielding when the source data is exhausted.

  • If the source provides fewer than num * num_per_average samples, the final block may be smaller than the requested num size.
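
Examples

Continuing the time-domain example from the class documentation, requesting num=4 yields blocks of four consecutive averages:

>>> avg = ac.Average(source=tp, num_per_average=512)
>>> next(avg.result(num=4)).shape
(4, 1)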

class acoular.process.Cache#

Bases: InOut

Cache the output of a source in a file to avoid redundant computations.

The Cache class stores the output of a source (derived from Generator) in a cache file within the Acoular cache directory. This enables faster reuse of precomputed data by avoiding time-consuming recalculations. The cache behavior is managed through the Config class by setting the global_caching attribute.

The class intelligently determines whether to use the cached data, update it, or bypass caching based on the global caching configuration and the state of the cache file. The caching mechanism supports scenarios such as:

  • Reading from a complete or incomplete cache.

  • Overwriting an existing cache.

  • Operating in a read-only or no-cache mode.

See also

InOut

Receive data from any source domain and return signals in the same domain.

Examples

Caching the output of an FFT computation:

>>> import acoular as ac
>>> import numpy as np
>>>
>>> ac.config.h5library = 'tables'
>>> data = np.random.rand(1024, 1)
>>> ts = ac.TimeSamples(data=data, sample_freq=51200)
>>> fft = ac.RFFT(source=ts, block_size=1024)
>>> cache = ac.Cache(source=fft)  # cache the output of the FFT in cache file
>>> for block in cache.result(num=1):  # read the cached data block-wise
...     print(block.shape)
[('void_cache.h5', 1)]
(1, 513)
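
Since the FFT output is now stored in the cache file, iterating a second time reads the blocks back from the cache instead of recomputing them (a sketch; console messages may vary with the caching backend):

>>> for block in cache.result(num=1):  # second pass: data come from the cache file
...     print(block.shape)
(1, 513)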

Disabling caching globally:

>>> ac.config.global_caching = 'none'

Changing the cache directory:

>>> ac.config.cache_dir = '/path/to/cache_dir'

digest = Property(depends_on=['source.digest'])#

A unique identifier based on the cache properties.

result(num)#

Generate data blocks from the source, using cache when available.

This method acts as a Python generator that yields blocks of output data from the source, reading from the cache file when possible. The size of the data blocks is determined by the num parameter. The caching mechanism helps prevent redundant calculations by storing and reusing the source’s output.

Parameters:
num : int

The number of time samples or frequency snapshots per block to yield. The final block may be smaller if there is insufficient data.

Yields:
numpy.ndarray

A 2D NumPy array of shape (num, num_channels) representing the output data. Each block is either retrieved from the cache file or generated by the source and cached dynamically during processing.

Notes

  • The behavior of the caching mechanism depends on the global_caching setting:

    • 'none': Bypasses caching and directly retrieves data from the source.

    • 'readonly': Reads data from the cache if available; otherwise, retrieves data from the source without caching.

    • 'overwrite': Replaces any existing cache with newly computed data.

  • If the cache file is incomplete or corrupted, the method may generate new data from the source to update the cache unless the caching mode is 'readonly'.

  • The cache node name is based on the source’s digest attribute.

class acoular.process.SampleSplitter#

Bases: InOut

Distribute data from a source to multiple connected objects in a block-wise manner.

The SampleSplitter class is designed to manage the distribution of data blocks from a single source object, derived from Generator, to multiple target objects, also derived from Generator. Each connected target object is assigned a dedicated buffer to hold incoming data blocks. These buffers operate in a first-in-first-out (FIFO) manner, ensuring efficient and parallelized data handling.

This class is particularly useful when distributing data blocks from a streaming source to multiple downstream processing objects.

Each registered target object maintains its own dedicated block buffer, allowing for independent data management. The buffer size can be customized per object, and different overflow handling strategies can be configured, such as raising an error, issuing a warning, or discarding old data. This ensures efficient parallel data processing, making it well-suited for complex workflows.

Notes

  • Buffers are dynamically created and managed for each registered object.

  • Buffer overflow behavior can be set individually for each target object.

Examples

Consider a time-domain signal stream where the FFT spectra and signal power are calculated block-by-block and in parallel using the RFFT, TimePower, and Average objects. The SampleSplitter is responsible for distributing incoming data blocks to the buffers of the RFFT and TimePower objects whenever either object requests data via the result() generator.

For the TimePower object, the buffer size is set to 10 blocks. If the buffer is full, an error is raised, as the buffer overflow treatment is set to 'error'. For the RFFT object, the buffer size is limited to 1 block, and the overflow treatment is set to 'none'. This setup helps reduce latency in FFT calculations, which may take longer than signal power calculations. If new data arrives and the RFFT buffer is full, the SampleSplitter will discard the oldest block, ensuring that the RFFT object always receives the most recent block of data.

>>> import acoular as ac
>>> import numpy as np
>>>
>>> # create a time domain signal source
>>> ts = ac.TimeSamples(data=np.random.rand(1024, 1), sample_freq=51200)
>>>
>>> # create the sample splitter object
>>> ss = ac.SampleSplitter(source=ts)
>>>
>>> # create the FFT spectra and further objects that receive the data
>>> fft = ac.RFFT(source=ss, block_size=64)
>>> pow = ac.TimePower(source=ss)
>>> avg = ac.Average(source=pow, num_per_average=64)
>>>
>>> # register the subsequent processing block objects at the sample splitter
>>> ss.register_object(fft, buffer_size=1, buffer_overflow_treatment='none')
>>> ss.register_object(pow, buffer_size=10, buffer_overflow_treatment='error')

After object registration, the SampleSplitter object is ready to distribute the data to the object buffers. The block buffers can be accessed via the block_buffer attribute of the SampleSplitter object.

>>> ss.block_buffer.values()
dict_values([deque([], maxlen=1), deque([], maxlen=10)])

Calling the result method of the FFT object will start the data collection and distribution process.

>>> generator = fft.result(num=1)
>>> fft_res = next(generator)

Although we haven’t called the result method of the signal power object, one data block is already available in the buffer.

>>> print(len(ss.block_buffer[pow]))
1

To remove registered objects from the SampleSplitter, use the remove_object() method.

>>> ss.remove_object(pow)
>>> print(len(ss.block_buffer))
1

block_buffer = Dict(key_trait=Instance(Generator))#

A dictionary containing block buffers for registered objects. Keys are the registered objects, and values are deque structures holding data blocks.

buffer_size = Union(...)#

The maximum number of blocks each buffer can hold. Can be set globally for all objects or individually using a dictionary.

buffer_overflow_treatment = Dict(...)#

Defines behavior when a buffer exceeds its maximum size.

register_object(*objects_to_register, buffer_size=None, buffer_overflow_treatment=None)#

Register one or more target objects to the SampleSplitter object.

This method creates and configures block buffers for the specified target objects, enabling them to receive data blocks from the SampleSplitter. Each registered object is assigned a dedicated buffer with customizable size and overflow behavior.

Parameters:
objects_to_register : Generator or list of Generator

A single object or a list of objects derived from Generator to be registered as targets for data distribution.

buffer_size : int, optional

The maximum number of data blocks each object’s buffer can hold. If not specified, the default buffer size (100 blocks) is used, or a globally defined size if buffer_size is a dictionary.

buffer_overflow_treatment : str, optional

Defines the behavior when a buffer exceeds its maximum size. Options are:

  • 'error': Raises an IOError when the buffer overflows.

  • 'warning': Issues a warning and may result in data loss.

  • 'none': Silently discards the oldest data blocks to make room for new ones.

If not specified, the default behavior is 'error'.

Raises:
OSError

If any of the specified objects is already registered.
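
Examples

Registering an object twice raises an error, as described above (a sketch continuing the class example, where fft is still registered):

>>> try:
...     ss.register_object(fft)
... except OSError:
...     print('fft is already registered')
fft is already registered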

remove_object(*objects_to_remove)#

Unregister one or more objects from the SampleSplitter.

This method removes the specified objects and their associated block buffers from the SampleSplitter. If no objects are specified, all currently registered objects are unregistered, effectively clearing all buffers.

Parameters:
objects_to_remove : Generator or list of Generator, optional

A single object or a list of objects derived from Generator to be removed from the SampleSplitter. If no objects are provided, all registered objects will be removed.

Raises:
KeyError

If any of the specified objects are not currently registered.

Notes

  • Once an object is removed, it will no longer receive data from the SampleSplitter.

  • Removing an object also clears its associated buffer.

result(num)#

Yield data blocks from the buffer to the calling object.

This generator method retrieves blocks of data for the calling object, either from its dedicated block buffer or by processing new data from the source. If the buffer is empty, new data blocks are generated and distributed to all registered objects in a block-wise manner.

Parameters:
num : int

The size of each block to be yielded, defined as the number of samples per block.

Yields:
numpy.ndarray

Blocks of data with shape (num, num_channels). The last block may be shorter than num if the source data is exhausted.

Raises:
OSError

If the calling object is not registered with the SampleSplitter.

OSError

If the block buffer reaches its maximum size and the overflow handling policy is set to 'error'.

Notes

  • If the block buffer is empty, new data is fetched from the source and distributed to all registered objects.

  • Buffer overflow behavior is controlled by the buffer_overflow_treatment attribute, which can be set to 'error', 'warning', or 'none'.
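
Examples

An object that was never registered cannot draw data, per the first Raises condition above (a sketch continuing the class example; tp2 is a hypothetical, unregistered consumer):

>>> tp2 = ac.TimePower(source=ss)
>>> try:
...     next(tp2.result(num=1))
... except OSError:
...     print('tp2 is not registered')
tp2 is not registered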

class acoular.process.SamplesBuffer#

Bases: InOut

Handle buffering of samples from a source.

The SamplesBuffer class buffers samples from a source and provides them in blocks of a specified size. It supports various use cases for efficient handling of sample data. Below is an example demonstrating its functionality.

Examples

Suppose we want to draw blocks of 16 samples from the source, while ensuring that the buffer always holds twice that number (32 samples). The following code achieves this behavior:

>>> import acoular as ac
>>> import numpy as np
>>> # create a white noise source with 512 samples
>>> source = ac.TimeSamples(
...     data=ac.WNoiseGenerator(
...         sample_freq=64,
...         num_samples=512,
...     ).signal()[:, np.newaxis],
...     sample_freq=64,
... )
>>> # create a buffer with a size of 32 samples
>>> buffer = ac.process.SamplesBuffer(source=source, length=32)
>>> # get the first block of 16 samples
>>> block = next(buffer.result(num=16))
>>> np.testing.assert_array_equal(block, source.data[:16])

In the example above, the buffer initially collects blocks of the specified size from the source. It then returns the first block of 16 samples. With subsequent calls to the result() method, the buffer refills and returns additional blocks of 16 samples.

In some cases, you may wish to retrieve a different number of samples from the source than you want to return. This can be achieved by setting the source_num attribute. For example, in the BeamformerTimeTraj class, the number of time samples varies based on the expected delay for moving sources, while still adhering to the desired block size for the buffer.

The shift_index_by attribute controls how the buffer updates its index when data is retrieved. If set to 'num', the buffer returns result_num samples but forgets only num samples (the argument of the result() method). If set to 'result_num', the buffer returns and forgets the same number of samples.

>>> buffer = ac.process.SamplesBuffer(source=source, length=32, result_num=20, shift_index_by='num')
>>> block_sizes = []
>>> block_sizes.append(
...     next(buffer.result(num=16)).shape[0]
... )  # this time, the buffer will return 20 samples, but the buffer will only forget the first 16 samples
>>> buffer.result_num = 24
>>> block_sizes.append(
...     next(buffer.result(num=16)).shape[0]
... )  # this time, the buffer will return 24 samples, but the buffer will only forget the first 16 samples
>>> np.testing.assert_array_equal(block_sizes, [20, 24])

length = Int(desc='number of samples that fit in the buffer')#

The number of samples that the buffer can hold.

source_num = Union(...)#

The number of samples per block to obtain from the source. If set to None, the number of samples will be determined by the num argument of the result() method.

result_num = Union(...)#

The number of samples to return from the buffer. If set to None, the number of samples will be determined by the num argument of the result() method.

shift_index_by = Enum(...)#

Index shift value for the buffer.

  • If set to 'result_num', the buffer will return and forget result_num samples.

  • If set to 'num', the buffer will return result_num samples but forget num samples.
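
For comparison with the 'num' example above, a sketch with shift_index_by='result_num', where the returned and forgotten sample counts coincide (reusing the source from the class example):

>>> buffer = ac.process.SamplesBuffer(source=source, length=32, result_num=20, shift_index_by='result_num')
>>> next(buffer.result(num=16)).shape[0]  # returns and forgets 20 samples
20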

level = Property(desc='current filling level of buffer')#

The current filling level of the buffer, i.e., how many samples are currently available.

dtype = Any(desc='data type of the buffer')#

The data type of the elements in the buffer.

increase_buffer(num)#

Increase the size of the buffer by a specified number of samples.

This method expands the buffer by appending additional samples, effectively increasing its capacity. The new samples are initialized to zero. The index of the buffer is adjusted accordingly to accommodate the increase.

Parameters:
num : int

The number of samples by which to increase the buffer size.

read_from_buffer(num)#

Read a specified number of samples from the buffer.

This method retrieves samples from the buffer, ensuring that the requested number of samples is returned. If the buffer contains fewer samples than requested, the method will return all available samples. The index of the buffer is updated based on the shift_index_by setting.

Parameters:
num : int

The number of samples to read from the buffer.

Returns:
numpy.ndarray

A block of samples (array) from the buffer.

Notes

  • If the result_num attribute is set, it determines the number of samples to return.

  • The method ensures the buffer index is adjusted according to the shift_index_by setting. Options are:

    • 'result_num': The index will shift by the number of samples returned.

    • 'num': The index will shift by the number of samples requested (num).

fill_buffer(snum)#

Fill the buffer with samples from the source.

The fill_buffer() method collects samples from the source and writes them to the buffer. It continues to fill the buffer until there are enough samples available, or the source runs out of data. If the buffer reaches its maximum capacity, additional samples are discarded. The buffer will only contain the most recent data, and its index will be updated accordingly.

Parameters:
snum : int

The number of samples to retrieve from the source in each iteration.

Yields:
None

This method is a generator and yields control back after filling the buffer.

Notes

  • The method ensures that the buffer is filled with the required number of samples, adjusting the buffer size if necessary (via the increase_buffer() method) when more space is needed.

  • Once the buffer is filled, it yields control and resumes only when the buffer is ready for more data.

result(num)#

Return blocks of samples from the buffer.

The result() method retrieves blocks of samples from the buffer and yields them to the calling process. The number of samples per block is determined by the num argument, but can also be influenced by other attributes like result_num (if set). If the buffer is not yet filled, it will continue to collect samples from the source until the buffer contains enough data. Once the buffer is full, it will return the requested blocks of samples.

Parameters:
num : int

The number of samples to return in each block. This value specifies the size of the blocks to be yielded from the buffer.

Yields:
numpy.ndarray

A block of samples from the buffer. The size of the block is determined by the num parameter or the result_num attribute, depending on the buffer’s configuration.

Notes

  • If result_num is set, the method will use it to determine the number of samples returned instead of the num parameter.

  • If the buffer is empty or does not have enough samples, it will attempt to fill the buffer by collecting data from the source. If there are not enough samples available from the source, the method will yield whatever samples are left in the buffer.
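
Examples

A sketch showing that the buffering is transparent: draining the buffer to exhaustion returns every sample of the 512-sample source from the class example:

>>> buffer = ac.process.SamplesBuffer(source=source, length=32)
>>> print(sum(block.shape[0] for block in buffer.result(num=16)))
512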