protocols#

This module contains the definitions of the Protocol classes that describe the interfaces OM’s classes must implement.

class OmDataSourceProtocol(*, data_source_name, parameters, additional_info)#

Protocol for OM’s Data Source classes.

Data Sources are classes that perform all the operations needed to retrieve data from a single specific sensor or detector. A Data Source class can refer to any type of detector, from a simple diode or wave digitizer, to a big x-ray or optical detector.

This Protocol class describes the interface that every Data Source class in OM must implement.

A Data Source class must be initialized with the full set of OM’s configuration parameters, from which it extracts information about the sensor or detector. An identifying name for the sensor must also be provided.

Parameters:
  • data_source_name (str) – A name that identifies the current data source. It is used, for example, in communications with the user or for the retrieval of a sensor’s initialization parameters.

  • parameters (DataSourceParameters) – An object storing OM’s configuration parameters.

  • additional_info (dict[str, Any])

initialize_data_source()#

Data source initialization.

This method prepares OM to retrieve data from the sensor or detector, reading all the necessary configuration parameters and retrieving any additional required external data.

Return type:

None

get_data(*, event)#

Data Retrieval.

This function retrieves all the data generated by the sensor or detector for the provided data event.

Parameters:

event (dict[str, Any]) – A dictionary storing the event data.

Returns:

Data from the sensor.

Return type:

Any
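
As an illustration of the interface described above, the following is a minimal sketch of a class with the same three entry points. The class name `ConstantValueDataSource`, the use of a plain dictionary in place of OM's parameter object, and the `value` entry are assumptions made for this example and are not part of OM.

```python
from typing import Any


class ConstantValueDataSource:
    """Hypothetical Data Source that returns a fixed value for every event."""

    def __init__(
        self,
        *,
        data_source_name: str,
        parameters: dict[str, Any],  # Assumption: a plain dict stands in for OM's parameter object.
        additional_info: dict[str, Any],
    ) -> None:
        self._data_source_name = data_source_name
        self._parameters = parameters
        self._additional_info = additional_info
        self._value: Any = None

    def initialize_data_source(self) -> None:
        # Read the configuration entries that belong to this sensor.
        self._value = self._parameters[self._data_source_name]["value"]

    def get_data(self, *, event: dict[str, Any]) -> Any:
        # A real class would inspect the event; this sketch ignores it.
        return self._value


source = ConstantValueDataSource(
    data_source_name="diode",
    parameters={"diode": {"value": 42.0}},
    additional_info={},
)
source.initialize_data_source()
reading = source.get_data(event={})
```

Note that initialization is kept separate from construction: the object can be created cheaply, and the potentially expensive setup only happens when `initialize_data_source` is called.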

class OmDataEventHandlerProtocol(*, source, parameters)#

Protocol class for OM’s Data Event Handler classes.

Data Event Handlers are classes that deal with data events and their sources. They have methods to initialize data event sources, retrieve events from them, open and close events, and examine the events’ content.

This Protocol class describes the interface that every Data Event Handler class in OM must implement.

A Data Event Handler class must be initialized with a string describing its data event source, and with a set of Data Source class instances that instruct the Data Event Handler on how to retrieve data from the events.

Parameters:
  • source (str) – A string describing the data event source.

  • data_sources

    A dictionary containing a set of Data Source class instances.

    • Each dictionary key must define the name of a data source.

    • The corresponding dictionary value must store the instance of the [Data Source class][om.protocols.data_retrieval_layer.OmDataSourceProtocol] that describes the data source.

  • parameters (DataRetrievalLayerParameters) – An object storing OM’s configuration parameters.

designated_collector_rank()#
Return type:

Literal[‘first’, ‘last’]

skip_rank_finalization()#
Return type:

bool

initialize_event_handling_on_collecting_node(*, node_rank, node_pool_size)#

Initializes event handling on the collecting node.

This function is called on the collecting node when OM starts, and initializes the event handling on the node.

Parameters:
  • node_rank (int) – The rank, in the OM pool, of the node calling the function.

  • node_pool_size (int) – The total number of nodes in the OM pool, including all the processing nodes and the collecting node.

Return type:

None

initialize_event_handling_on_processing_node(*, node_rank, node_pool_size)#

Initializes event handling on a processing node.

This function is called on a processing node when OM starts. It configures the node to start retrieving and processing data events, and initializes all the relevant Data Sources.

Parameters:
  • node_rank (int) – The rank, in the OM pool, of the processing node calling the function.

  • node_pool_size (int) – The total number of nodes in the OM pool, including all the processing nodes and the collecting node.

Return type:

None

event_generator(*, node_rank, node_pool_size)#

Retrieves events from the source.

This function retrieves a series of data events from a source. OM calls this function on each processing node to start retrieving events. The function, which is a generator, returns an iterator over the events that the calling node must process.

Parameters:
  • node_rank (int) – The rank, in the OM pool, of the processing node calling the function.

  • node_pool_size (int) – The total number of nodes in the OM pool, including all the processing nodes and the collecting node.

Yields:

A dictionary storing the data for the current event.

Return type:

Generator[dict[str, Any], None, None]
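
One possible way for a generator of this kind to split events among the processing nodes is sketched below. The in-memory list of events, the `event_id` key, the round-robin scheme, and the assumption that rank 0 is the collecting node are all hypothetical choices made for the example; an actual Data Event Handler may distribute events differently.

```python
from typing import Any, Generator


def event_generator(
    *, node_rank: int, node_pool_size: int
) -> Generator[dict[str, Any], None, None]:
    # Hypothetical source: ten numbered events held in memory.
    all_event_ids = [f"event_{index}" for index in range(10)]
    # Assumption: rank 0 is the collecting node, so the processing nodes
    # have ranks 1 to node_pool_size - 1 and share the events round-robin.
    num_processing_nodes = node_pool_size - 1
    for event_id in all_event_ids[node_rank - 1 :: num_processing_nodes]:
        yield {"event_id": event_id}


events_for_node_1 = [
    event["event_id"] for event in event_generator(node_rank=1, node_pool_size=3)
]
events_for_node_2 = [
    event["event_id"] for event in event_generator(node_rank=2, node_pool_size=3)
]
```

With a pool of three nodes, the two processing nodes in this sketch receive disjoint halves of the event list.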

extract_data(*, event)#

Extracts data from a frame stored in an event.

This function extracts data from a data event. It works by calling, one after the other, the get_data function of each Data Source associated with the event, passing the event itself as input each time. Each function call returns the data extracted from the Data Source. All the retrieved data items are finally aggregated and returned.

Parameters:

event (dict[str, Any]) – A dictionary storing the event data.

Returns:

A dictionary storing the extracted data.

  • Each dictionary key identifies a Data Source in the event for which data has been retrieved.

  • The corresponding dictionary value stores the data that could be extracted from the Data Source for the provided event.

Return type:

dict[str, Any]
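
The aggregation described above can be sketched as follows. The classes `FixedValueSource` and `MinimalEventHandler` are hypothetical stand-ins written for this example, and the assumption that the handler keeps its Data Sources in a dictionary attribute is ours.

```python
from typing import Any


class FixedValueSource:
    """Hypothetical stand-in for a Data Source class."""

    def __init__(self, value: Any) -> None:
        self._value = value

    def get_data(self, *, event: dict[str, Any]) -> Any:
        return self._value


class MinimalEventHandler:
    """Hypothetical handler that only implements extract_data."""

    def __init__(self, *, data_sources: dict[str, FixedValueSource]) -> None:
        # Assumption: the Data Sources are stored by name in a dictionary.
        self._data_sources = data_sources

    def extract_data(self, *, event: dict[str, Any]) -> dict[str, Any]:
        data: dict[str, Any] = {}
        # Call get_data on each Data Source in turn, passing the same event.
        for source_name, source in self._data_sources.items():
            data[source_name] = source.get_data(event=event)
        return data


handler = MinimalEventHandler(
    data_sources={
        "timestamp": FixedValueSource(1234.5),
        "beam_energy": FixedValueSource(9.3),
    }
)
extracted = handler.extract_data(event={})
```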

initialize_event_data_retrieval()#

Initializes frame data retrieval.

This function initializes the retrieval of data for a single standalone data event from a data event source, with all its related information. The way this function operates is in contrast with the way OM usually works. OM usually retrieves a series of events in sequence, one after the other. This function retrieves a single event, separated from all others.

This function can be called on any type of node in OM, and even outside of an OnDA Monitor class instance. It prepares the system to retrieve the event data, initializes the relevant Data Sources, etc.

After this function has been called, data for single events can be retrieved by invoking the [retrieve_event_data][om.protocols.data_retrieval_layer.OmDataEventHandlerProtocol.retrieve_event_data] function.

Return type:

None

retrieve_event_data(event_id)#

Retrieves all data attached to the requested data event.

This function retrieves all the information associated with the data event specified by the provided identifier. The data is returned in the form of a dictionary.

Before this function can be called, frame data retrieval must be initialized by calling the [initialize_event_data_retrieval][om.protocols.data_retrieval_layer.OmDataEventHandlerProtocol.initialize_event_data_retrieval] function.

Parameters:

event_id (str) – A string that uniquely identifies a data event.

Returns:

All data related to the requested data event.

Return type:

dict[str, Any]
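
The required call order (initialization first, then retrieval by identifier) is illustrated by the sketch below. `InMemoryEventHandler`, the event identifier format, and the choice to raise an exception when retrieval is attempted before initialization are assumptions made for the example.

```python
from typing import Any


class InMemoryEventHandler:
    """Hypothetical Data Event Handler that serves events from a dictionary."""

    def __init__(self, *, stored_events: dict[str, dict[str, Any]]) -> None:
        self._stored_events = stored_events
        self._ready = False

    def initialize_event_data_retrieval(self) -> None:
        # A real class would open files or connections here.
        self._ready = True

    def retrieve_event_data(self, event_id: str) -> dict[str, Any]:
        # Assumption: calling this before initialization is treated as an error.
        if not self._ready:
            raise RuntimeError("Call initialize_event_data_retrieval first.")
        return self._stored_events[event_id]


handler = InMemoryEventHandler(stored_events={"run1//event7": {"timestamp": 1.5}})
handler.initialize_event_data_retrieval()
event_data = handler.retrieve_event_data("run1//event7")
```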

class OmProcessingProtocol(*, parameters)#

Protocol for OM’s Processing classes.

Processing classes implement scientific data processing pipelines in OM. A Processing class defines how each individual retrieved data event is analyzed on the processing nodes and how multiple events are aggregated on the collecting node. A Processing class also determines which actions OM performs at the beginning and at the end of the data processing.

This Protocol class describes the interface that every Processing class in OM must implement.

Parameters:

parameters (MonitorParameters) – An object storing OM’s configuration parameters.

initialize_processing_node(*, node_rank, node_pool_size)#

Initializes an OM processing node.

This function is invoked on each processing node when OM starts. It performs all the operations needed to prepare the node to retrieve and process data events (recovering additional needed external data, initializing the algorithms with all required parameters, etc.)

Parameters:
  • node_rank (int) – The OM rank of the current node in the OM node pool. The rank is an integer that unambiguously identifies the node in the pool.

  • node_pool_size (int) – The total number of nodes in the OM pool, including all the processing nodes and the collecting node.

Return type:

None

initialize_collecting_node(*, node_rank, node_pool_size)#

Initializes an OM collecting node.

This function is invoked on the collecting node when OM starts. It performs all the operations needed to prepare the collecting node to aggregate events received from the processing nodes (creating memory buffers, initializing the collecting algorithm, etc.)

Parameters:
  • node_rank (int) – The OM rank of the current node in the OM node pool. The rank is an integer that unambiguously identifies the node in the pool.

  • node_pool_size (int) – The total number of nodes in the OM pool, including all the processing nodes and the collecting node.

Return type:

None

process_data(*, node_rank, node_pool_size, data)#

Processes a single data event.

This function is invoked on each processing node for every retrieved data event. It receives the data event as input and returns processed data. The output of this function is transferred by OM to the collecting node.

Parameters:
  • node_rank (int) – The OM rank of the current node in the OM node pool. The rank is an integer that unambiguously identifies the node in the pool.

  • node_pool_size (int) – The total number of nodes in the OM pool, including all the processing nodes and the collecting node.

  • data (dict[str, Any]) –

    A dictionary containing the data retrieved by OM for the data event being processed.

    • The dictionary keys must be the names of the Data Sources for which OM retrieves data. The keys in this dictionary must match the Data Source names listed in the required_data entry of OM’s om configuration parameter group.

    • The corresponding dictionary values must store the data that OM retrieved for each of the Data Sources.

Returns:

A tuple with two entries, with the first entry being a dictionary storing the processed data that should be sent to the collecting node, and the second being the OM rank number of the node that processed the information.

Return type:

tuple[dict[str, Any], int]
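
A minimal sketch of a function with this signature and return shape follows. The Data Source names `detector_data` and `timestamp`, and the choice of reducing each event to its maximum intensity, are assumptions made for the example.

```python
from typing import Any


def process_data(
    *, node_rank: int, node_pool_size: int, data: dict[str, Any]
) -> tuple[dict[str, Any], int]:
    # Hypothetical analysis: reduce the detector frame to a single number.
    processed_data = {
        "timestamp": data["timestamp"],
        "max_intensity": max(data["detector_data"]),
    }
    # The second tuple entry tells the collecting node who did the work.
    return (processed_data, node_rank)


result = process_data(
    node_rank=2,
    node_pool_size=4,
    data={"timestamp": 10.0, "detector_data": [3, 17, 5]},
)
```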

wait_for_data(*, node_rank, node_pool_size)#

Performs operations on the collecting node when no data is received.

This function is called on the collecting node continuously, when the node is not receiving data from any processing node (when data is received, the [collect_data][om.protocols.processing_layer.OmProcessingProtocol.collect_data] function is invoked instead). This function can be used to perform operations that need to be carried out even when the data stream is not active (reacting to external commands and requests, updating graphical interfaces, etc.)

Parameters:
  • node_rank (int) – The OM rank of the current node, which is an integer that unambiguously identifies the current node in the OM node pool.

  • node_pool_size (int) – The total number of nodes in the OM pool, including all the processing nodes and the collecting node.

Return type:

None

collect_data(*, node_rank, node_pool_size, processed_data)#

Collects processed data from a processing node.

This function is invoked on the collecting node every time data is received from a processing node (when data is not being received, the collecting node continuously calls the [wait_for_data][om.protocols.processing_layer.OmProcessingProtocol.wait_for_data] function instead). The function accepts as input the data received from the processing node (the tuple returned by the [process_data][om.protocols.processing_layer.OmProcessingProtocol.process_data] method of this class), and performs calculations that must be carried out on aggregated data (computing cumulative statistics, preparing data for external programs or visualization, etc.)

The function usually does not return any value, but can optionally return a nested dictionary (a dictionary whose values are other dictionaries). When this happens, the data in the dictionary is provided as feedback data to the processing nodes. The nested dictionary must have the following format:

  • The keys of the outer dictionary must match the OM rank numbers of the processing nodes which receive the feedback data. A key value of 0 can be used to send feedback data to all the processing nodes at the same time.

  • The value corresponding to each key of the outer dictionary must in turn be a dictionary that stores the feedback data that is sent to the node defined by the key.

  • On each processing node, the feedback data dictionary, when received, is merged with the data argument of the [process_data][om.protocols.processing_layer.OmProcessingProtocol.process_data] function the next time the function is called.

Parameters:
  • node_rank (int) – The OM rank of the current node, which is an integer that unambiguously identifies the current node in the OM node pool.

  • node_pool_size (int) – The total number of nodes in the OM pool, including all the processing nodes and the collecting node.

  • processed_data (tuple[dict, int]) – A tuple whose first entry is a dictionary storing the data received from a processing node, and whose second entry is the OM rank number of the node that processed the information.

Returns:

Usually nothing. Optionally, a nested dictionary that can be used to send feedback data to the processing nodes.

Return type:

dict[str, dict[str, Any]] | None
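
The optional feedback mechanism can be sketched as follows. The class `RunningMeanCollector`, the `max_intensity` entry, and the policy of sending feedback after every second event are assumptions made for the example. The sketch uses the integer 0 as the outer key to address all processing nodes, following the description of the nested dictionary; whether keys should be integers or strings in practice is not settled by this sketch.

```python
from typing import Any, Optional


class RunningMeanCollector:
    """Hypothetical collecting-side logic that tracks a running mean."""

    def __init__(self) -> None:
        self._num_events = 0
        self._total = 0.0

    def collect_data(
        self,
        *,
        node_rank: int,
        node_pool_size: int,
        processed_data: tuple[dict[str, Any], int],
    ) -> Optional[dict[Any, dict[str, Any]]]:
        received_data, _source_rank = processed_data
        self._num_events += 1
        self._total += received_data["max_intensity"]
        if self._num_events % 2 == 0:
            # Key 0 sends the same feedback to every processing node.
            return {0: {"running_mean": self._total / self._num_events}}
        return None


collector = RunningMeanCollector()
first = collector.collect_data(
    node_rank=0, node_pool_size=3, processed_data=({"max_intensity": 10.0}, 1)
)
second = collector.collect_data(
    node_rank=0, node_pool_size=3, processed_data=({"max_intensity": 20.0}, 2)
)
```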

end_processing_on_processing_node(*, node_rank, node_pool_size)#

Executes end-of-processing actions on a processing node.

This function is called on each processing node at the end of the data processing, immediately before OM stops. It performs clean up and shut down operations (closing communication sockets, computing final statistics, etc.). This function usually does not return any value, but can optionally return a dictionary. If this happens, the dictionary is transferred to the collecting node before the processing node shuts down.

Parameters:
  • node_rank (int) – The OM rank of the current node, which is an integer that unambiguously identifies the current node in the OM node pool.

  • node_pool_size (int) – The total number of nodes in the OM pool, including all the processing nodes and the collecting node.

Returns:

Usually nothing. Optionally, a dictionary storing information that must be sent to the collecting node.

Return type:

dict[str, Any] | None

end_processing_on_collecting_node(*, node_rank, node_pool_size)#

Executes end-of-processing actions on the collecting node.

This function is called on the collecting node at the end of the data processing, immediately before OM stops. It often performs clean up and shut down operations (closing communication sockets, computing final statistics, etc.).

Parameters:
  • node_rank (int) – The OM rank of the current node, which is an integer that unambiguously identifies the current node in the OM node pool.

  • node_pool_size (int) – The total number of nodes in the OM pool, including all the processing nodes and the collecting node.

Return type:

None

class OmParallelizationProtocol(*, data_retrieval_layer, processing_layer, parameters)#

Protocol for OM’s Parallelization classes.

Parallelization classes orchestrate OM’s processing and collecting nodes, and take care of the communication between them.

  • When OM starts, a Parallelization class instance initializes several processing nodes, plus a single collecting node. The class then associates an instance of a Data Retrieval class (see [OmDataRetrievalProtocol][om.protocols.data_retrieval_layer.OmDataRetrievalProtocol]) and an instance of a Processing class (see [OmProcessingProtocol][om.protocols.processing_layer.OmProcessingProtocol]) with each node.

  • Each processing node retrieves an event from a data event source by calling the relevant Data Retrieval class methods. It then invokes the appropriate Processing class methods on the event. Finally, it transfers the processed data to the collecting node. The node then retrieves another event, and the cycle continues until there are no more data events or OM shuts down.

  • Every time it receives data from a processing node, the collecting node invokes the relevant Processing class methods to aggregate the received data.

  • When all events from the source have been processed, all nodes perform some final clean-up tasks by calling the appropriate methods of the Processing class. All nodes then shut down.

This Protocol class describes the interface that every Parallelization class in OM must implement.
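
The order of calls in the cycle described above can be imitated in a single process, as in the sketch below. This is not a Parallelization class and involves no inter-node communication: `ToyProcessing`, `run_in_single_process`, and the convention that rank 0 collects while rank 1 processes are all assumptions made to show the sequence of method calls.

```python
from typing import Any


class ToyProcessing:
    """Hypothetical Processing class that doubles and sums the values it sees."""

    def __init__(self) -> None:
        self.total = 0
        self.log: list[str] = []

    def process_data(self, *, node_rank: int, node_pool_size: int, data: dict[str, Any]):
        return ({"doubled": data["value"] * 2}, node_rank)

    def collect_data(self, *, node_rank: int, node_pool_size: int, processed_data):
        received_data, _source_rank = processed_data
        self.total += received_data["doubled"]

    def end_processing_on_processing_node(self, *, node_rank: int, node_pool_size: int):
        self.log.append(f"processing node {node_rank} done")

    def end_processing_on_collecting_node(self, *, node_rank: int, node_pool_size: int):
        self.log.append("collecting node done")


def run_in_single_process(*, events: list[dict[str, Any]], processing: ToyProcessing) -> None:
    pool_size = 2  # Assumption: one collecting node (rank 0), one processing node (rank 1).
    for event in events:
        # Processing-node side: analyze the event.
        processed = processing.process_data(node_rank=1, node_pool_size=pool_size, data=event)
        # Collecting-node side: aggregate what was "received".
        processing.collect_data(node_rank=0, node_pool_size=pool_size, processed_data=processed)
    # No more events: every node runs its clean-up method.
    processing.end_processing_on_processing_node(node_rank=1, node_pool_size=pool_size)
    processing.end_processing_on_collecting_node(node_rank=0, node_pool_size=pool_size)


toy = ToyProcessing()
run_in_single_process(events=[{"value": 1}, {"value": 2}, {"value": 3}], processing=toy)
```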

start()#

Starts OM.

This function begins operations on the processing and collecting nodes.

When this function is called on a processing node, the processing node starts retrieving data events and processing them. When instead this function is called on the collecting node, the node starts receiving data from the processing nodes and aggregating it.

Return type:

None

shutdown(*, msg='Reason not provided.')#

Shuts down OM.

This function stops the processing and collecting nodes.

When this function is called on a processing node, the processing node communicates to the collecting node that it is shutting down, then shuts down. When instead this function is called on the collecting node, the collecting node tells every processing node to shut down, waits for all the nodes to confirm that they have stopped operating, then shuts itself down.

Parameters:

msg (str) – Reason for shutting down. Defaults to “Reason not provided.”.

Return type:

None

class OmPeakDetectionProtocol(parameters)#
Parameters:

parameters (dict[str, Any])

find_peaks(*, data)#
Parameters:

data (ndarray[Any, dtype[int64 | float64]])

Return type:

PeakList