aiflows.base_flows package

Submodules

aiflows.base_flows.abstract module

class aiflows.base_flows.abstract.Flow(flow_config: Dict[str, Any], cl: CoLink = None)

Bases: ABC

Abstract class inherited by all Flows.

Parameters:

flow_config (Dict[str, Any]) – The configuration of the flow

REQUIRED_KEYS_CONFIG = ['name', 'description']
SUPPORTS_CACHING = False
cl: CoLink
flow_config: Dict[str, Any]
flow_state: Dict[str, Any]
classmethod get_config(**overrides)

Returns the default config for the flow, with the overrides applied. The default implementation constructs the default config by recursively merging the configs of the base classes.

Parameters:

overrides (Dict[str, Any], optional) – The parameters to override in the default config

Returns:

The default config with the overrides applied

Return type:

Dict[str, Any]
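The recursive merge described above can be illustrated with a small, self-contained sketch. It does not use aiflows itself: the classes SketchBase and SketchChild, the DEFAULT_CONFIG attribute, and the recursive_merge helper are hypothetical names introduced only for illustration, and how the library actually stores its defaults is not shown on this page.

```python
from copy import deepcopy
from typing import Any, Dict


def recursive_merge(base: Dict[str, Any], update: Dict[str, Any]) -> Dict[str, Any]:
    """Return a new dict with `update` merged into `base`, descending into nested dicts."""
    merged = deepcopy(base)
    for key, value in update.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = recursive_merge(merged[key], value)
        else:
            merged[key] = deepcopy(value)
    return merged


class SketchBase:
    # Hypothetical stand-in for a flow class; not part of aiflows.
    DEFAULT_CONFIG: Dict[str, Any] = {
        "name": "base",
        "description": "base flow",
        "backend": {"retries": 1},
    }

    @classmethod
    def get_config(cls, **overrides: Any) -> Dict[str, Any]:
        config: Dict[str, Any] = {}
        # Walk the class hierarchy from the most generic class to the most
        # specific one, so that subclasses override their parents.
        for klass in reversed(cls.__mro__):
            own_defaults = klass.__dict__.get("DEFAULT_CONFIG")
            if own_defaults:
                config = recursive_merge(config, own_defaults)
        # Overrides are applied last and therefore win over every default.
        return recursive_merge(config, overrides)


class SketchChild(SketchBase):
    DEFAULT_CONFIG = {"name": "child", "backend": {"timeout": 30}}


config = SketchChild.get_config(backend={"retries": 3})
```

In this sketch the child keeps the parent's description, replaces the name, and ends up with both backend keys, with the override taking precedence for retries.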

get_flow_state()

Returns the flow state.

Returns:

The flow state

Return type:

Dict[str, Any]

get_instance_id()
get_interface_description()

Returns the input and output interface description of the flow.

get_reply(**kw)
get_reply_future(**kw)
classmethod instantiate_from_config(config)

Instantiates the flow from the given config.

Parameters:

config (Dict[str, Any]) – The config to instantiate the flow from

Returns:

The instantiated flow

Return type:

aiflows.flow.Flow

classmethod instantiate_from_default_config(**overrides: Dict[str, Any] | None)

Instantiates the flow from the default config, with the given overrides applied.

Parameters:

overrides (Dict[str, Any], optional) – The parameters to override in the default config

Returns:

The instantiated flow

Return type:

aiflows.flow.Flow

classmethod instantiate_with_overrides(overrides)

Instantiates the flow with the given overrides.

Parameters:

overrides (Dict[str, Any], optional) – The parameters to override in the default config

Returns:

The instantiated flow

local_proxy_invocations: Dict[str, Any] = {}
property name

Returns the name of the flow.

Returns:

The name of the flow

Return type:

str

package_input_message(data: Dict[str, Any], dst_flow: str = 'unknown', reply_data: Dict[str, Any] = {'mode': 'no_reply'})

Packages the given payload into a FlowMessage.

Parameters:
  • data (Dict[str, Any]) – The data dictionary to package

  • dst_flow (str) – The destination flow

Returns:

The packaged input message

Return type:

FlowMessage

package_output_message(input_message: FlowMessage, response: Dict[str, Any] | FlowMessage)

Packages the given response into a FlowMessage.

Parameters:
  • input_message (FlowMessage) – The input message that was used to generate the response

  • response (Dict[str, Any]) – The response to package

Returns:

The packaged output message

Return type:

FlowMessage

reset(full_reset: bool, recursive: bool, src_flow: Flow | str | None = 'Launcher')

Reset the flow state. If recursive is True, reset all subflows as well.

Parameters:
  • full_reset – If True, remove all data in flow_state. If False, keep the data in flow_state.

  • recursive – If True, reset all subflows as well.

  • src_flow – The flow that initiated the reset
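The interaction between full_reset and recursive can be sketched with a toy class. SketchFlow is a hypothetical stand-in and not the aiflows Flow class; it omits src_flow, and it assumes that set_up_flow_state only fills in missing keys, so that a non-full reset keeps existing data.

```python
from typing import Any, Dict, Optional


class SketchFlow:
    # Hypothetical stand-in used only to illustrate the reset semantics.
    def __init__(self, name: str, subflows: Optional[Dict[str, "SketchFlow"]] = None) -> None:
        self.name = name
        self.subflows = subflows or {}
        self.flow_state: Dict[str, Any] = {}
        self.set_up_flow_state()

    def set_up_flow_state(self) -> None:
        # Assumption: set-up only adds missing keys and never overwrites data.
        self.flow_state.setdefault("history", [])

    def reset(self, full_reset: bool, recursive: bool) -> None:
        if full_reset:
            # A full reset discards everything stored in flow_state.
            self.flow_state = {}
        self.set_up_flow_state()
        if recursive:
            for subflow in self.subflows.values():
                subflow.reset(full_reset=full_reset, recursive=True)


child = SketchFlow("child")
parent = SketchFlow("parent", subflows={"child": child})
parent.flow_state["answer"] = 42
child.flow_state["answer"] = 7

# Non-full reset: both flows keep their data.
parent.reset(full_reset=False, recursive=True)
after_soft_reset = (dict(parent.flow_state), dict(child.flow_state))

# Full, non-recursive reset: only the parent is cleared.
parent.reset(full_reset=True, recursive=False)
after_parent_only = (dict(parent.flow_state), dict(child.flow_state))

# Full, recursive reset: the child is cleared as well.
parent.reset(full_reset=True, recursive=True)
after_full_recursive = (dict(parent.flow_state), dict(child.flow_state))
```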


run(input_message: FlowMessage) → None

Runs the flow on the given input data. (Not implemented in the base class)

Parameters:

input_message (FlowMessage) – The input message to run the flow on

send_message(**kw)

Sets the colink object for the flow and all its subflows.

Parameters:
  • cl (CL.CoLink) – The colink object to set

  • recursive (bool) – Whether to set the colink for all subflows as well

set_up_flow_state()

Sets up the flow state. This method is called when the flow is instantiated, and when the flow is reset.

classmethod type()

aiflows.base_flows.atomic module

class aiflows.base_flows.atomic.AtomicFlow(**kwargs)

Bases: Flow, ABC

AtomicFlow is the minimal execution unit in the Flow framework. It is an encapsulation of a single functionality that takes an input message and returns an output message.

Parameters:

**kwargs – Arguments to be passed to the Flow constructor

classmethod type()

Returns the type of the flow.

aiflows.base_flows.composite module

class aiflows.base_flows.composite.CompositeFlow(flow_config: Dict[str, Any], subflows: List[Flow])

Bases: Flow, ABC

This class implements a composite flow, that is, a flow that consists of multiple subflows. It is the parent class for BranchingFlow, SequentialFlow and CircularFlow. Note that the run method of a CompositeFlow is not implemented.

Parameters:
  • flow_config (Dict[str, Any]) – The configuration of the flow. It must usually contain the following keys:
      - “subflows_config” (Dict[str, Any]): A dictionary of subflow configurations. The keys are the names of the subflows and the values are the configurations of the subflows. This is necessary when instantiating the flow from a config file.
      - The parameters required by the constructor of the parent class Flow.

  • subflows (List[Flow]) – A list of subflows. This is necessary when instantiating the flow programmatically.
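As a rough illustration of the config shape described above, the sketch below builds a composite config and checks it against the required keys listed on this page. The subflow names, the descriptions, and the missing_keys helper are hypothetical; that each subflow config also needs the Flow keys is an assumption drawn from Flow.REQUIRED_KEYS_CONFIG, not a documented validation step.

```python
from typing import Any, Dict, List

# Required keys as listed on this page for Flow and CompositeFlow.
FLOW_REQUIRED_KEYS = ["name", "description"]
COMPOSITE_REQUIRED_KEYS = ["subflows_config"]


def missing_keys(config: Dict[str, Any], required: List[str]) -> List[str]:
    """Return the required keys that are absent from `config` (hypothetical helper)."""
    return [key for key in required if key not in config]


composite_config: Dict[str, Any] = {
    "name": "question_answering",
    "description": "Answers a question using two subflows.",
    "subflows_config": {
        # Keys are subflow names, values are the subflow configurations.
        "retriever": {"name": "retriever", "description": "Finds relevant passages."},
        "answerer": {"name": "answerer", "description": "Writes the final answer."},
    },
}

# Check the composite itself, then each subflow config (assumed to be a Flow config).
problems = missing_keys(composite_config, FLOW_REQUIRED_KEYS + COMPOSITE_REQUIRED_KEYS)
for subflow_name, subflow_config in composite_config["subflows_config"].items():
    problems += [
        f"{subflow_name}.{key}"
        for key in missing_keys(subflow_config, FLOW_REQUIRED_KEYS)
    ]
```

An empty problems list means the config carries every key this page lists as required; it says nothing about any further checks the library may perform.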

REQUIRED_KEYS_CONFIG = ['subflows_config']
classmethod instantiate_from_config(config)

Instantiates the flow from a config file.

Parameters:

config – The configuration of the flow. It must usually contain the following keys:
  - “subflows_config” (Dict[str, Any]): A dictionary of subflow configurations. The keys are the names of the subflows and the values are the configurations of the subflows. This is necessary when instantiating the flow from a config file.
  - The parameters required by the constructor of the parent class Flow.

subflows: Dict[str, Flow]
classmethod type()

Returns the type of the flow as a string.

Module contents

Contains basic flow classes that can be used to build more complex flows.

AtomicFlow is the minimal execution unit in the Flow framework. CompositeFlow is a flow that contains subflows and defines how they are executed.