aiflows.base_flows package
Submodules
aiflows.base_flows.abstract module
- class aiflows.base_flows.abstract.Flow(flow_config: Dict[str, Any], cl: CoLink = None)
Bases: ABC
Abstract class inherited by all Flows.
- Parameters:
flow_config (Dict[str, Any]) – The configuration of the flow
- REQUIRED_KEYS_CONFIG = ['name', 'description']
- SUPPORTS_CACHING = False
- cl: CoLink
- flow_config: Dict[str, Any]
- flow_state: Dict[str, Any]
- classmethod get_config(**overrides)
Returns the default config for the flow, with the overrides applied. The default implementation constructs the default config by recursively merging the configs of the base classes.
- Parameters:
overrides (Dict[str, Any], optional) – The parameters to override in the default config
- Returns:
The default config with the overrides applied
- Return type:
Dict[str, Any]
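The recursive merge described for get_config can be pictured with the sketch below. It does not import aiflows: BaseSketch, ChildSketch, the DEFAULT_CONFIG attribute and the recursive_merge helper are hypothetical names chosen for illustration, and the library may store and merge its defaults differently.

```python
from copy import deepcopy
from typing import Any, Dict


def recursive_merge(base: Dict[str, Any], update: Dict[str, Any]) -> Dict[str, Any]:
    """Return a new dict in which `update` is merged into `base`, key by key."""
    merged = deepcopy(base)
    for key, value in update.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            # Both sides are dicts: merge them instead of replacing the whole entry.
            merged[key] = recursive_merge(merged[key], value)
        else:
            merged[key] = deepcopy(value)
    return merged


class BaseSketch:
    # Hypothetical stand-in for a flow class; DEFAULT_CONFIG is an assumed attribute.
    DEFAULT_CONFIG: Dict[str, Any] = {
        "name": "base",
        "description": "base flow",
        "backend": {"retries": 1},
    }

    @classmethod
    def get_config(cls, **overrides: Any) -> Dict[str, Any]:
        config: Dict[str, Any] = {}
        # Walk from the most general class to the most specific one,
        # so that subclasses refine the defaults of their parents.
        for klass in reversed(cls.__mro__):
            defaults = klass.__dict__.get("DEFAULT_CONFIG")
            if defaults:
                config = recursive_merge(config, defaults)
        # Caller-supplied overrides are applied last.
        return recursive_merge(config, overrides)


class ChildSketch(BaseSketch):
    DEFAULT_CONFIG = {"name": "child", "backend": {"timeout": 30}}


config = ChildSketch.get_config(backend={"retries": 3})
```

In this sketch the nested "backend" entry keeps the child's "timeout" while the override replaces only "retries", which is the practical difference between a recursive merge and a plain dict update.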
- get_flow_state()
Returns the flow state.
- Returns:
The flow state
- Return type:
Dict[str, Any]
- get_instance_id()
- get_interface_description()
Returns the input and output interface description of the flow.
- get_reply(**kw)
- get_reply_future(**kw)
- classmethod instantiate_from_config(config)
Instantiates the flow from the given config.
- Parameters:
config (Dict[str, Any]) – The config to instantiate the flow from
- Returns:
The instantiated flow
- Return type:
aiflows.flow.Flow
- classmethod instantiate_from_default_config(**overrides: Dict[str, Any] | None)
Instantiates the flow from the default config, with the given overrides applied.
- Parameters:
overrides (Dict[str, Any], optional) – The parameters to override in the default config
- Returns:
The instantiated flow
- Return type:
aiflows.flow.Flow
- classmethod instantiate_with_overrides(overrides)
Instantiates the flow with the given overrides.
- Parameters:
overrides (Dict[str, Any], optional) – The parameters to override in the default config
- Returns:
The instantiated flow
- local_proxy_invocations: Dict[str, Any] = {}
- property name
Returns the name of the flow
- Returns:
The name of the flow
- Return type:
str
- package_input_message(data: Dict[str, Any], dst_flow: str = 'unknown', reply_data: Dict[str, Any] = {'mode': 'no_reply'})
Packages the given payload into a FlowMessage.
- Parameters:
data (Dict[str, Any]) – The data dictionary to package
dst_flow (str) – The destination flow
- Returns:
The packaged input message
- Return type:
FlowMessage
- package_output_message(input_message: FlowMessage, response: Dict[str, Any] | FlowMessage)
Packages the given response into a FlowMessage.
- Parameters:
input_message (FlowMessage) – The input message that was used to generate the response
response (Dict[str, Any]) – The response to package
- Returns:
The packaged output message
- Return type:
FlowMessage
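The two packaging methods can be read as a request/reply pair. The sketch below shows that idea without importing aiflows. MessageSketch is a hypothetical stand-in for FlowMessage, and its field names (src_flow, message_id, input_message_id) are assumptions rather than the library's actual attributes.

```python
import uuid
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class MessageSketch:
    # Hypothetical stand-in for FlowMessage; the field names are assumptions.
    data: Dict[str, Any]
    src_flow: str
    dst_flow: str
    reply_data: Dict[str, Any]
    message_id: str = field(default_factory=lambda: uuid.uuid4().hex)
    input_message_id: Optional[str] = None


def package_input(
    data: Dict[str, Any],
    src_flow: str,
    dst_flow: str = "unknown",
    reply_data: Optional[Dict[str, Any]] = None,
) -> MessageSketch:
    # Build the default per call so that callers never share one mutable dict.
    if reply_data is None:
        reply_data = {"mode": "no_reply"}
    return MessageSketch(
        data=dict(data), src_flow=src_flow, dst_flow=dst_flow, reply_data=reply_data
    )


def package_output(input_message: MessageSketch, response: Dict[str, Any]) -> MessageSketch:
    # The reply travels back to the sender and records which input it answers.
    return MessageSketch(
        data=dict(response),
        src_flow=input_message.dst_flow,
        dst_flow=input_message.src_flow,
        reply_data={"mode": "no_reply"},
        input_message_id=input_message.message_id,
    )


request = package_input({"question": "2 + 2"}, src_flow="Launcher", dst_flow="Calculator")
reply = package_output(request, {"answer": 4})
```

Linking the reply to the request through an identifier is one common way to let a caller match responses to the messages that triggered them; whether FlowMessage does this in the same way is not stated in this reference.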
- reset(full_reset: bool, recursive: bool, src_flow: Flow | str | None = 'Launcher')
Reset the flow state. If recursive is True, reset all subflows as well.
- Parameters:
full_reset – If True, remove all data in flow_state. If False, keep the data in flow_state.
recursive – If True, reset all subflows as well.
src_flow – The flow that initiated the reset
- Returns:
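How full_reset and recursive interact can be illustrated with a small stand-in class. ResettableSketch and its subflows list are hypothetical; the sketch only mirrors the behaviour described above and is not the library's implementation.

```python
from typing import Any, Dict, List, Optional


class ResettableSketch:
    # Hypothetical stand-in for a flow; it only models flow_state and subflows.
    def __init__(self, name: str, subflows: Optional[List["ResettableSketch"]] = None):
        self.name = name
        self.subflows: List["ResettableSketch"] = list(subflows or [])
        self.flow_state: Dict[str, Any] = {}

    def reset(self, full_reset: bool, recursive: bool, src_flow: str = "Launcher") -> None:
        # src_flow is only passed along here; it names the flow that initiated the reset.
        if full_reset:
            # A full reset drops everything stored in flow_state.
            self.flow_state = {}
        if recursive:
            # Propagate the same kind of reset to every subflow.
            for subflow in self.subflows:
                subflow.reset(full_reset=full_reset, recursive=True, src_flow=self.name)


child = ResettableSketch("child")
parent = ResettableSketch("parent", [child])
parent.flow_state["turn"] = 3
child.flow_state["cache"] = "x"

# Non-recursive full reset: the parent is cleared, the child keeps its state.
parent.reset(full_reset=True, recursive=False)
state_after_shallow_reset = dict(child.flow_state)

# Recursive full reset: the child is cleared as well.
parent.reset(full_reset=True, recursive=True)
```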
- run(input_message: FlowMessage) → None
Runs the flow on the given input data. (Not implemented in the base class)
- Parameters:
input_message (FlowMessage) – The input message to run the flow on
- send_message(**kw)
- set_colink(cl, recursive=True)
Sets the colink object for the flow and all its subflows.
- Parameters:
cl (CL.CoLink) – The colink object to set
recursive (bool) – Whether to set the colink for all subflows as well
- set_up_flow_state()
Sets up the flow state. This method is called when the flow is instantiated, and when the flow is reset.
- classmethod type()
aiflows.base_flows.atomic module
- class aiflows.base_flows.atomic.AtomicFlow(**kwargs)
Bases: Flow, ABC
AtomicFlow is the minimal execution unit in the Flow framework. It is an encapsulation of a single functionality that takes an input message and returns an output message.
- Parameters:
**kwargs – Arguments to be passed to the Flow constructor
- classmethod type()
Returns the type of the flow.
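As a rough picture of "a single functionality that takes an input and returns an output", the sketch below defines a tiny atomic unit on top of a stand-in base class. FlowSketch and ReverseTextFlow are hypothetical, and for brevity run takes and returns plain dictionaries, whereas the real Flow.run receives a FlowMessage and returns None.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, List


class FlowSketch(ABC):
    # Hypothetical stand-in for the abstract Flow base class.
    REQUIRED_KEYS_CONFIG: List[str] = ["name", "description"]

    def __init__(self, flow_config: Dict[str, Any]):
        missing = [key for key in self.REQUIRED_KEYS_CONFIG if key not in flow_config]
        if missing:
            raise ValueError(f"missing required config keys: {missing}")
        self.flow_config = flow_config

    @property
    def name(self) -> str:
        return self.flow_config["name"]

    @abstractmethod
    def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Left abstract here, as in the base class described above."""


class ReverseTextFlow(FlowSketch):
    # One self-contained piece of functionality: reverse the "text" field.
    def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        return {"reversed": input_data["text"][::-1]}


flow = ReverseTextFlow({"name": "reverse", "description": "Reverses a string"})
result = flow.run({"text": "abc"})
```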
aiflows.base_flows.composite module
- class aiflows.base_flows.composite.CompositeFlow(flow_config: Dict[str, Any], subflows: List[Flow])
Bases: Flow, ABC
This class implements a composite flow, i.e. a flow that consists of multiple subflows. It is the parent class of BranchingFlow, SequentialFlow and CircularFlow. Note that the run method of a CompositeFlow is not implemented.
- Parameters:
flow_config (Dict[str, Any]) – The configuration of the flow. It must usually contain the following keys:
  - “subflows_config” (Dict[str, Any]): A dictionary of subflow configurations. The keys are the names of the subflows and the values are the configurations of the subflows. This is necessary when instantiating the flow from a config file.
  - The parameters required by the constructor of the parent class Flow.
subflows (List[Flow]) – A list of subflows. This is necessary when instantiating the flow programmatically.
- REQUIRED_KEYS_CONFIG = ['subflows_config']
- classmethod instantiate_from_config(config)
Instantiates the flow from a config file.
- Parameters:
config – The configuration of the flow. It must usually contain the following keys:
  - “subflows_config” (Dict[str, Any]): A dictionary of subflow configurations. The keys are the names of the subflows and the values are the configurations of the subflows. This is necessary when instantiating the flow from a config file.
  - The parameters required by the constructor of the parent class Flow.
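The shape of a config with a "subflows_config" key, and one way it could be turned into subflow instances, is sketched below. LeafSketch and CompositeSketch are hypothetical stand-ins, and the real CompositeFlow may resolve subflow classes and names differently (for example through a target specified in each subflow's config).

```python
from typing import Any, Dict, List


class LeafSketch:
    # Hypothetical stand-in for a subflow class.
    def __init__(self, flow_config: Dict[str, Any]):
        self.flow_config = flow_config

    @classmethod
    def instantiate_from_config(cls, config: Dict[str, Any]) -> "LeafSketch":
        return cls(config)


class CompositeSketch:
    # Hypothetical stand-in for CompositeFlow.
    REQUIRED_KEYS_CONFIG: List[str] = ["subflows_config"]

    def __init__(self, flow_config: Dict[str, Any], subflows: List[LeafSketch]):
        self.flow_config = flow_config
        self.subflows = subflows

    @classmethod
    def instantiate_from_config(cls, config: Dict[str, Any]) -> "CompositeSketch":
        missing = [key for key in cls.REQUIRED_KEYS_CONFIG if key not in config]
        if missing:
            raise ValueError(f"missing required config keys: {missing}")
        # The keys of subflows_config are the subflow names; the values are their configs.
        subflows = [
            LeafSketch.instantiate_from_config({"name": name, **sub_config})
            for name, sub_config in config["subflows_config"].items()
        ]
        return cls(config, subflows)


config = {
    "name": "pipeline",
    "description": "Two illustrative subflows",
    "subflows_config": {
        "first": {"description": "first step"},
        "second": {"description": "second step"},
    },
}
composite = CompositeSketch.instantiate_from_config(config)
subflow_names = [subflow.flow_config["name"] for subflow in composite.subflows]
```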
- classmethod type()
Returns the type of the flow as a string.
Module contents
Contains basic flow classes that can be used to build more complex flows.
AtomicFlow is the minimal execution unit in the Flow framework. CompositeFlow is a flow that contains subflows and defines how they are executed.