Note
Go to the end to download the full example code.
Parallel processing chains – SampleSplitter buffer handling.¶
This example shows the different behaviours of the SampleSplitter class when the maximum size of a block buffer is reached for one of the objects obtaining data.
- Three different settings are available for the buffer overflow behaviour:
none: no warning, no error
warning: a warning appears
error: an error is raised
import threading
from time import sleep
import acoular as ac
import numpy as np
Set up the data source. For convenience, we use synthetic white noise with a length of 1 s.
# Sampling frequency; also handed to TimeSamples as sample_freq.
fs = 16000
# fs * 1 normally distributed random samples (1 s of data at rate fs), turned into a
# 2-D array of shape (fs, 1) by indexing with np.newaxis.
# NOTE(review): presumably the second axis is the channel axis (one channel) - confirm.
ts = ac.TimeSamples(data=np.random.randn(fs * 1)[:, np.newaxis], sample_freq=fs)
Connect SampleSplitter to data source. We limit the buffer size to 5 blocks.
# SampleSplitter fed by ts; buffer_size=5 limits a block buffer to 5 blocks.
ss = ac.SampleSplitter(source=ts, buffer_size=5)
Create two objects to process the data
# Two TimePower objects that both use the same SampleSplitter as their source.
tp1 = ac.TimePower(source=ss)
tp2 = ac.TimePower(source=ss)
# Register both objects at the SampleSplitter. The inspection helper in this
# example reports one block buffer per object ('tp1', 'tp2').
ss.register_object(tp1, tp2)
Define some useful functions for inspecting and for reading data from the SampleSplitter buffers. One function prints the number of blocks currently held in each buffer; two further functions read data with different pauses to simulate different processing speeds (fast, slow).
def print_number_of_blocks_in_block_buffers():
    """Print how many data blocks each SampleSplitter buffer currently holds.

    Reads the module-level ``ss`` and reports the length of the first two
    entries of ``ss.block_buffer`` under the fixed labels 'tp1' and 'tp2'.
    NOTE(review): presumably the buffers are stored in registration order,
    so the labels match the objects - confirm.
    """
    labels = ('tp1', 'tp2')
    fill_levels = {}
    # Snapshot the buffers first, then pair each with its display label.
    for label, buf in zip(labels, list(ss.block_buffer.values())):
        fill_levels[label] = len(buf)
    print(f"num blocks in buffers: {fill_levels}")
def get_data_fast(obj):
    """Consume ``obj.result(2048)`` with a short pause (0.01 s) per block.

    For every item yielded, a status line is printed (the label 'tp1' is
    fixed, independent of ``obj``), the current buffer fill levels are
    printed, and the thread sleeps for 0.01 seconds.

    Parameters
    ----------
    obj
        Object providing a ``result(num)`` generator method.
    """
    pause = 0.01  # seconds; simulates a fast consumer
    blocks = obj.result(2048)
    for _ in blocks:
        print('tp1 calls sample splitter')
        print_number_of_blocks_in_block_buffers()
        sleep(pause)
def get_data_slow(obj):
    """Consume ``obj.result(2048)`` with a long pause (0.1 s) per block.

    For every item yielded, a status line is printed (the label 'tp2' is
    fixed, independent of ``obj``), the current buffer fill levels are
    printed, and the thread sleeps for 0.1 seconds.

    Parameters
    ----------
    obj
        Object providing a ``result(num)`` generator method.
    """
    pause = 0.1  # seconds; simulates a slow consumer
    blocks = obj.result(2048)
    for _ in blocks:
        print('tp2 calls sample splitter')
        print_number_of_blocks_in_block_buffers()
        sleep(pause)
Prepare and start processing in threads (no warning or error when block buffer is full)
# Run both readers concurrently with overflow treatment 'none'
# (no warning and no error when a block buffer is full).
print("buffer overflow behaviour == 'none'")
for consumer in (tp1, tp2):
    ss.buffer_overflow_treatment[consumer] = 'none'
worker1 = threading.Thread(target=get_data_fast, args=(tp1,))
worker2 = threading.Thread(target=get_data_slow, args=(tp2,))
print('start threads')
# Start both workers before waiting on either of them.
for worker in (worker1, worker2):
    worker.start()
for worker in (worker1, worker2):
    worker.join()
print('threads finished')
buffer overflow behaviour == 'none'
start threads
tp1 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 1}
tp2 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 0}
tp1 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 1}
tp1 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 2}
tp1 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 3}
tp1 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 4}
tp1 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 5}
tp1 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 5}
tp1 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 5}
tp2 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 4}
tp2 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 3}
tp2 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 2}
tp2 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 1}
tp2 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 0}
threads finished
Prepare and start processing in threads (only warning when block buffer is full)
# Run both readers concurrently with overflow treatment 'warning'
# (a warning appears when a block buffer is full).
print("buffer overflow behaviour == 'warning'")
for consumer in (tp1, tp2):
    ss.buffer_overflow_treatment[consumer] = 'warning'
worker1 = threading.Thread(target=get_data_fast, args=(tp1,))
worker2 = threading.Thread(target=get_data_slow, args=(tp2,))
print('start threads')
# Start both workers before waiting on either of them.
for worker in (worker1, worker2):
    worker.start()
for worker in (worker1, worker2):
    worker.join()
print('threads finished')
buffer overflow behaviour == 'warning'
start threads
tp1 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 1}
tp2 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 0}
tp1 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 1}
tp1 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 2}
tp1 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 3}
tp1 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 4}
tp1 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 5}
/home/runner/work/acoular/acoular/acoular/tprocess.py:2232: UserWarning: overfilled buffer for object: <acoular.tprocess.TimePower object at 0x7f3a6c18fb30> data will get lost
warn('overfilled buffer for object: %s data will get lost' % obj, UserWarning, stacklevel=1)
tp1 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 5}
tp1 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 5}
tp2 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 4}
tp2 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 3}
tp2 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 2}
tp2 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 1}
tp2 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 0}
threads finished
Prepare and start processing in threads (raise error when block buffer is full)
# Run both readers concurrently with overflow treatment 'error'
# (an error is raised when a block buffer is full). The exception is raised
# inside the worker threads, so join() still returns and the final message
# is printed.
print("buffer overflow behaviour == 'error'")
for consumer in (tp1, tp2):
    ss.buffer_overflow_treatment[consumer] = 'error'
worker1 = threading.Thread(target=get_data_fast, args=(tp1,))
worker2 = threading.Thread(target=get_data_slow, args=(tp2,))
print('start threads')
# Start both workers before waiting on either of them.
for worker in (worker1, worker2):
    worker.start()
for worker in (worker1, worker2):
    worker.join()
print('threads finished')
buffer overflow behaviour == 'error'
start threads
tp1 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 1}
tp2 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 0}
tp1 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 1}
tp1 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 2}
tp1 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 3}
tp1 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 4}
tp1 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 5}
Exception in thread Thread-9 (get_data_fast):
Traceback (most recent call last):
File "/opt/hostedtoolcache/Python/3.12.4/x64/lib/python3.12/threading.py", line 1073, in _bootstrap_inner
Exception in thread Thread-10 (get_data_slow):
Traceback (most recent call last):
File "/opt/hostedtoolcache/Python/3.12.4/x64/lib/python3.12/threading.py", line 1073, in _bootstrap_inner
self.run()
File "/opt/hostedtoolcache/Python/3.12.4/x64/lib/python3.12/threading.py", line 1010, in run
self._target(*self._args, **self._kwargs)
File "/home/runner/work/acoular/acoular/examples/io_and_signal_processing_examples/example_sample_splitter_bufferhandling.py", line 71, in get_data_slow
self.run()
File "/opt/hostedtoolcache/Python/3.12.4/x64/lib/python3.12/threading.py", line 1010, in run
self._target(*self._args, **self._kwargs)
File "/home/runner/work/acoular/acoular/examples/io_and_signal_processing_examples/example_sample_splitter_bufferhandling.py", line 63, in get_data_fast
for _ in obj.result(2048): #
File "/home/runner/work/acoular/acoular/acoular/tprocess.py", line 1417, in result
for i in obj.result(2048): #
File "/home/runner/work/acoular/acoular/acoular/tprocess.py", line 1417, in result
for temp in self.source.result(num):
File "/home/runner/work/acoular/acoular/acoular/tprocess.py", line 2299, in result
for temp in self.source.result(num):
File "/home/runner/work/acoular/acoular/acoular/tprocess.py", line 2299, in result
raise OSError(msg)
OSError: Maximum size of block buffer is reached!
raise OSError(msg)
OSError: Maximum size of block buffer is reached!
threads finished
Total running time of the script: (0 minutes 1.358 seconds)