SampleSplitter
- class acoular.process.SampleSplitter
Bases: InOut
Distribute data from a source to multiple connected objects in a block-wise manner.
The SampleSplitter class manages the distribution of data blocks from a single source object, derived from Generator, to multiple target objects, also derived from Generator. Each connected target object is assigned a dedicated buffer that holds incoming data blocks. These buffers operate in a first-in-first-out (FIFO) manner, so each target receives its blocks in arrival order and can be managed independently of the others. This class is particularly useful when distributing data blocks from a streaming source to multiple downstream processing objects.
The buffer size can be customized per object, and different overflow handling strategies can be configured: raising an error, issuing a warning, or discarding old data.
Notes
Buffers are dynamically created and managed for each registered object.
Buffer overflow behavior can be set individually for each target object.
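The buffering scheme described above can be sketched in plain Python. This is only an illustration under stated assumptions, not acoular's implementation: the class MiniSplitter, its register() method, and the string consumer names are hypothetical, and the real class tracks Generator objects rather than names.

```python
from collections import deque


class MiniSplitter:
    """Toy stand-in for a block splitter (hypothetical, not acoular code)."""

    def __init__(self, source_blocks):
        self._source = iter(source_blocks)
        self.block_buffer = {}  # consumer name -> deque of pending blocks

    def register(self, name, buffer_size=100):
        if name in self.block_buffer:
            raise OSError(f"{name!r} is already registered")
        self.block_buffer[name] = deque(maxlen=buffer_size)

    def result(self, name):
        """Yield blocks for one consumer, refilling all buffers on demand."""
        buffer = self.block_buffer[name]
        while True:
            if not buffer:
                try:
                    block = next(self._source)
                except StopIteration:
                    return
                # Every registered consumer receives the new block.
                for buf in self.block_buffer.values():
                    buf.append(block)
            yield buffer.popleft()


splitter = MiniSplitter(source_blocks=["block0", "block1", "block2"])
splitter.register("fft", buffer_size=1)
splitter.register("power", buffer_size=10)

first = next(splitter.result("fft"))  # "block0"
pending_for_power = len(splitter.block_buffer["power"])  # 1
```

Pulling one block for "fft" also leaves one block waiting for "power", which mirrors the behavior shown in the Examples section.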
Examples
Consider a time-domain signal stream where the FFT spectra and signal power are calculated block-by-block and in parallel using the RFFT, TimePower, and Average objects. The SampleSplitter is responsible for distributing incoming data blocks to the buffers of the RFFT and TimePower objects whenever either object requests data via the result() generator.
For the TimePower object, the buffer size is set to 10 blocks. If the buffer is full, an error is raised, as the buffer overflow treatment is set to 'error'. For the RFFT object, the buffer size is limited to 1 block, and the overflow treatment is set to 'none'. This setup helps reduce latency in FFT calculations, which may take longer than signal power calculations. If new data arrives and the RFFT buffer is full, the SampleSplitter will discard the oldest block, ensuring that the RFFT object always receives the most recent block of data.
>>> import acoular as ac
>>> import numpy as np
>>>
>>> # create a time domain signal source
>>> ts = ac.TimeSamples(data=np.random.rand(1024, 1), sample_freq=51200)
>>>
>>> # create the sample splitter object
>>> ss = ac.SampleSplitter(source=ts)
>>>
>>> # create the FFT spectra and further objects that receive the data
>>> fft = ac.RFFT(source=ss, block_size=64)
>>> pow = ac.TimePower(source=ss)
>>> avg = ac.Average(source=pow, num_per_average=64)
>>>
>>> # register the subsequent processing block objects at the sample splitter
>>> ss.register_object(fft, buffer_size=1, buffer_overflow_treatment='none')
>>> ss.register_object(pow, buffer_size=10, buffer_overflow_treatment='error')
After object registration, the SampleSplitter object is ready to distribute the data to the object buffers. The block buffers can be accessed via the block_buffer attribute of the SampleSplitter object.
>>> ss.block_buffer.values()
dict_values([deque([], maxlen=1), deque([], maxlen=10)])
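The output above shows that each buffer is a deque with a maxlen. As a small standalone illustration of what that bound means (standard-library behavior only, independent of acoular), a deque that is full drops its oldest entry when a new one is appended. The variable names here are illustrative.

```python
from collections import deque

latest_only = deque(maxlen=1)  # like the RFFT buffer in the example
history = deque(maxlen=10)     # like the TimePower buffer in the example

for block_index in range(3):
    latest_only.append(block_index)
    history.append(block_index)

kept_latest = list(latest_only)  # [2]: only the newest block survives
kept_history = list(history)     # [0, 1, 2]: nothing dropped yet
```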
Calling the result method of the FFT object will start the data collection and distribution process.
>>> generator = fft.result(num=1)
>>> fft_res = next(generator)
Although we haven't called the result method of the signal power object, one data block is already available in its buffer.
>>> print(len(ss.block_buffer[pow]))
1
To remove registered objects from the SampleSplitter, use the remove_object() method.
>>> ss.remove_object(pow)
>>> print(len(ss.block_buffer))
1
- block_buffer = Dict(key_trait=Instance(Generator))
A dictionary containing block buffers for registered objects. Keys are the registered objects, and values are deque structures holding data blocks.
- buffer_size = Union( …
The maximum number of blocks each buffer can hold. Can be set globally for all objects or individually using a dictionary.
- buffer_overflow_treatment = Dict( …
Defines behavior when a buffer exceeds its maximum size.
- register_object(*objects_to_register, buffer_size=None, buffer_overflow_treatment=None)
Register one or more target objects to the SampleSplitter object.
This method creates and configures block buffers for the specified target objects, enabling them to receive data blocks from the SampleSplitter. Each registered object is assigned a dedicated buffer with customizable size and overflow behavior.
- Parameters:
  - objects_to_register: Generator or list of Generator
    A single object or a list of objects derived from Generator to be registered as targets for data distribution.
  - buffer_size: int, optional
    The maximum number of data blocks each object's buffer can hold. If not specified, the default buffer size (100 blocks) is used, or a globally defined size if buffer_size is a dictionary.
  - buffer_overflow_treatment: str, optional
    Defines the behavior when a buffer exceeds its maximum size. Options are:
    - 'error': Raises an IOError (an alias of OSError in Python 3) when the buffer overflows.
    - 'warning': Issues a warning and may result in data loss.
    - 'none': Silently discards the oldest data blocks to make room for new ones.
    If not specified, the default behavior is 'error'.
- Raises:
  - OSError
    If any of the specified objects is already registered.
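The three overflow options can be illustrated with a minimal sketch. The helper push_block is hypothetical and is not part of acoular; it only mirrors the documented semantics of 'error', 'warning', and 'none' on a bounded deque, and the exact messages and checks in the library may differ.

```python
import warnings
from collections import deque


def push_block(buffer, block, treatment="error"):
    """Append a block, applying the chosen policy if the buffer is full."""
    is_full = buffer.maxlen is not None and len(buffer) == buffer.maxlen
    if is_full:
        if treatment == "error":
            raise OSError("Buffer overflow: consumer is too slow.")
        if treatment == "warning":
            warnings.warn("Buffer overflow: oldest block dropped.", stacklevel=2)
        elif treatment != "none":
            raise ValueError(f"unknown treatment: {treatment!r}")
    # A full deque with maxlen silently drops its oldest item on append.
    buffer.append(block)


buf = deque(["a", "b"], maxlen=2)
push_block(buf, "c", treatment="none")
remaining = list(buf)  # ['b', 'c']

try:
    push_block(buf, "d", treatment="error")
    raised = False
except OSError:
    raised = True  # the full buffer is left unchanged
```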
- remove_object(*objects_to_remove)
Unregister one or more objects from the SampleSplitter.
This method removes the specified objects and their associated block buffers from the SampleSplitter. If no objects are specified, all currently registered objects are unregistered, effectively clearing all buffers.
- Parameters:
  - objects_to_remove: Generator or list of Generator, optional
    A single object or a list of objects derived from Generator to be removed from the SampleSplitter. If no objects are provided, all registered objects will be removed.
- Raises:
  - KeyError
    If any of the specified objects are not currently registered.
Notes
Once an object is removed, it will no longer receive data from the SampleSplitter.
Removing an object also clears its associated buffer.
- result(num)
Yield data blocks from the buffer to the calling object.
This generator method retrieves blocks of data for the calling object, either from its dedicated block buffer or by processing new data from the source. If the buffer is empty, new data blocks are generated and distributed to all registered objects in a block-wise manner.
- Parameters:
  - num: int
    The size of each block to be yielded, defined as the number of samples per block.
- Yields:
  - numpy.ndarray
    Blocks of data with shape (num, num_channels). The last block may be shorter than num if the source data is exhausted.
- Raises:
  - OSError
    If the calling object is not registered with the SampleSplitter.
  - OSError
    If the block buffer reaches its maximum size and the overflow handling policy is set to 'error'.
Notes
If the block buffer is empty, new data is fetched from the source and distributed to all registered objects.
Buffer overflow behavior is controlled by the buffer_overflow_treatment attribute, which can be set to 'error', 'warning', or 'none'.