Acoular 25.04 documentation

SampleSplitter

«  Cache   ::   process   ::   TimeAverage  »

SampleSplitter

class acoular.process.SampleSplitter

Bases: InOut

Distribute data from a source to multiple connected objects in a block-wise manner.

The SampleSplitter class is designed to manage the distribution of data blocks from a single source object, derived from Generator, to multiple target objects, also derived from Generator. Each connected target object is assigned a dedicated buffer to hold incoming data blocks. These buffers operate in a first-in-first-out (FIFO) manner, ensuring efficient and parallelized data handling.

This class is particularly useful when distributing data blocks from a streaming source to multiple downstream processing objects.

Each registered target object maintains its own dedicated block buffer, allowing for independent data management. The buffer size can be customized per object, and different overflow handling strategies can be configured, such as raising an error, issuing a warning, or discarding old data. This ensures efficient parallel data processing, making it well-suited for complex workflows.

Notes

  • Buffers are dynamically created and managed for each registered object.

  • Buffer overflow behavior can be set individually for each target object.

Examples

Consider a time-domain signal stream where the FFT spectra and signal power are calculated block-by-block and in parallel using the RFFT, TimePower, and Average objects. The SampleSplitter is responsible for distributing incoming data blocks to the buffers of the RFFT and TimePower objects whenever either object requests data via the result() generator.

For the TimePower object, the buffer size is set to 10 blocks. If the buffer is full, an error is raised, as the buffer overflow treatment is set to 'error'. For the RFFT object, the buffer size is limited to 1 block, and the overflow treatment is set to 'none'. This setup helps reduce latency in FFT calculations, which may take longer than signal power calculations. If new data arrives and the RFFT buffer is full, the SampleSplitter will discard the oldest block, ensuring that the RFFT object always receives the most recent block of data.

>>> import acoular as ac
>>> import numpy as np
>>>
>>> # create a time domain signal source
>>> ts = ac.TimeSamples(data=np.random.rand(1024, 1), sample_freq=51200)
>>>
>>> # create the sample splitter object
>>> ss = ac.SampleSplitter(source=ts)
>>>
>>> # create the FFT spectra and further objects that receive the data
>>> fft = ac.RFFT(source=ss, block_size=64)
>>> pow = ac.TimePower(source=ss)
>>> avg = ac.Average(source=pow, num_per_average=64)
>>>
>>> # register the subsequent processing block objects at the sample splitter
>>> ss.register_object(fft, buffer_size=1, buffer_overflow_treatment='none')
>>> ss.register_object(pow, buffer_size=10, buffer_overflow_treatment='error')

After object registration, the SampleSplitter object is ready to distribute the data to the object buffers. The block buffers can be accessed via the block_buffer attribute of the SampleSplitter object.

>>> ss.block_buffer.values()
dict_values([deque([], maxlen=1), deque([], maxlen=10)])

Calling the result method of the FFT object will start the data collection and distribution process.

>>> generator = fft.result(num=1)
>>> fft_res = next(generator)

Although we haven’t called the result method of the signal power object, one data block is already available in the buffer.

>>> print(len(ss.block_buffer[pow]))
1

To remove registered objects from the SampleSplitter, use the remove_object() method.

>>> ss.remove_object(pow)
>>> print(len(ss.block_buffer))
1
block_buffer = Dict(key_trait=Instance(Generator))

A dictionary containing block buffers for registered objects. Keys are the registered objects, and values are deque structures holding data blocks.

buffer_size = Union(

The maximum number of blocks each buffer can hold. Can be set globally for all objects or individually using a dictionary.

buffer_overflow_treatment = Dict(

Defines behavior when a buffer exceeds its maximum size.

register_object(*objects_to_register, buffer_size=None, buffer_overflow_treatment=None)

Register one or more target objects to the SampleSplitter object.

This method creates and configures block buffers for the specified target objects, enabling them to receive data blocks from the SampleSplitter. Each registered object is assigned a dedicated buffer with customizable size and overflow behavior.

Parameters:
objects_to_registerGenerator or list of Generator

A single object or a list of objects derived from Generator to be registered as targets for data distribution.

buffer_sizeint, optional

The maximum number of data blocks each object’s buffer can hold. If not specified, the default buffer size (100 blocks) is used, or a globally defined size if buffer_size is a dictionary.

buffer_overflow_treatmentstr, optional

Defines the behavior when a buffer exceeds its maximum size. Options are:

  • 'error': Raises an IOError when the buffer overflows.

  • 'warning': Issues a warning and may result in data loss.

  • 'none': Silently discards the oldest data blocks to make room for new ones. If not specified, the default behavior is 'error'.

Raises:
OSError

If any of the specified objects is already registered.

remove_object(*objects_to_remove)

Unregister one or more objects from the SampleSplitter.

This method removes the specified objects and their associated block buffers from the SampleSplitter. If no objects are specified, all currently registered objects are unregistered, effectively clearing all buffers.

Parameters:
objects_to_removeGenerator or list of Generator, optional

A single object or a list of objects derived from Generator to be removed from the SampleSplitter. If no objects are provided, all registered objects will be removed.

Raises:
KeyError

If any of the specified objects are not currently registered.

Notes

  • Once an object is removed, it will no longer receive data from the SampleSplitter.

  • Removing an object also clears its associated buffer.

result(num)

Yield data blocks from the buffer to the calling object.

This generator method retrieves blocks of data for the calling object, either from its dedicated block buffer or by processing new data from the source. If the buffer is empty, new data blocks are generated and distributed to all registered objects in a block-wise manner.

Parameters:
numint

The size of each block to be yielded, defined as the number of samples per block.

Yields:
numpy.ndarray

Blocks of data with shape (num, num_channels). The last block may be shorter than num if the source data is exhausted.

Raises:
OSError

If the calling object is not registered with the SampleSplitter.

OSError

If the block buffer reaches its maximum size and the overflow handling policy is set to 'error'.

Notes

  • If the block buffer is empty, new data is fetched from the source and distributed to all registered objects.

  • Buffer overflow behavior is controlled by the buffer_overflow_treatment attribute, which can be set to 'error', 'warning', or 'none'.

«  Cache   ::   process   ::   TimeAverage  »