aiflows.utils package

Submodules

aiflows.utils.coflows_utils module

class aiflows.utils.coflows_utils.FlowFuture(cl, message_path)

Bases: object

A future object that represents a response from a flow. One can use this object to read the response from the flow.

get_data()

Blocking read of the future; returns the response data as a dictionary.

get_message()

Blocking read of the future; returns the response as a message.

set_output_interface(output_interface: Callable)

Set the output interface for the future.

try_get_data()
try_get_message()

Non-blocking read, returns None if there is no response yet.
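The difference between the blocking get_* methods and the non-blocking try_get_* methods follows the usual future pattern. A minimal stdlib sketch of these semantics (the real FlowFuture reads responses from CoLink storage; this toy class is an illustration, not the actual implementation):

```python
import queue


class ToyFuture:
    """Toy stand-in illustrating blocking vs. non-blocking reads."""

    def __init__(self):
        self._q = queue.Queue(maxsize=1)

    def set_result(self, data):
        self._q.put(data)

    def get_data(self):
        # Blocking read: waits until a response arrives.
        return self._q.get()

    def try_get_data(self):
        # Non-blocking read: returns None if there is no response yet.
        try:
            return self._q.get_nowait()
        except queue.Empty:
            return None


fut = ToyFuture()
assert fut.try_get_data() is None   # no response yet
fut.set_result({"answer": 42})
assert fut.get_data() == {"answer": 42}
```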

aiflows.utils.coflows_utils.dispatch_response(cl, output_message, reply_data)

Dispatches a response message to the appropriate flow.

Parameters:
  • cl (CL.Colink) – The colink object

  • output_message (FlowMessage) – The output message

  • reply_data – Metadata describing how to reply

aiflows.utils.coflows_utils.push_to_flow(cl: CoLink, target_user_id: str, target_flow_id: str, message: FlowMessage)

Pushes a message to a flow via CoLink.

Parameters:
  • cl (CL.colink) – The colink object

  • target_user_id (str) – The user id of the flow we’re pushing to

  • target_flow_id (str) – The flow id of the flow we’re pushing to

  • message (FlowMessage) – The message to push

aiflows.utils.constants module

aiflows.utils.general_helpers module

aiflows.utils.general_helpers.create_unique_id(existing_ids: List[str] = None)

Creates a unique id.

Parameters:

existing_ids (List[str], optional) – A list of existing ids to check against, defaults to None

Returns:

A unique id

Return type:

str
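A hypothetical re-implementation of this helper, assuming UUID-based id generation (the actual id format may differ); it only shows how existing_ids is used to guarantee uniqueness:

```python
import uuid


def create_unique_id_sketch(existing_ids=None):
    # Draw UUID4 hex strings until one is not already taken.
    # The real helper may generate ids differently.
    existing = set(existing_ids or [])
    new_id = uuid.uuid4().hex
    while new_id in existing:
        new_id = uuid.uuid4().hex
    return new_id


taken = [create_unique_id_sketch() for _ in range(3)]
fresh = create_unique_id_sketch(existing_ids=taken)
assert fresh not in taken
```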

aiflows.utils.general_helpers.encode_from_buffer(buffer)

Encodes a buffer (typically an image from a video) to base64.

aiflows.utils.general_helpers.encode_image(image_path)

Encodes an image to base64.

aiflows.utils.general_helpers.exception_handler(e)

Handles an exception.

Parameters:

e (Exception) – The exception to handle

aiflows.utils.general_helpers.find_replace_in_dict(cfg, key_to_find, new_value, current_path='')

Recursively searches a dictionary for keys equal to key_to_find and replaces their values with new_value. Note 1: every matching key is replaced, no matter how deeply it is nested. Note 2: this function is recommended only for the Quick Start tutorial, not for production code.

Parameters:
  • cfg (Dict[str, Any]) – The dictionary to search in

  • key_to_find (str) – The key to find

  • new_value (Any) – The new value to set

  • current_path (str, optional) – The current path, defaults to “”

Returns:

The updated dictionary

Return type:

Dict[str, Any]
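A minimal sketch of the recursive replacement this function performs (assuming in-place mutation of nested dicts; the real helper also tracks current_path):

```python
def find_replace_in_dict_sketch(cfg, key_to_find, new_value):
    # Replace the value of every matching key, at any nesting depth.
    for key, value in cfg.items():
        if key == key_to_find:
            cfg[key] = new_value
        elif isinstance(value, dict):
            find_replace_in_dict_sketch(value, key_to_find, new_value)
    return cfg


cfg = {"api_key": "???", "backend": {"api_key": "???", "model": "gpt-4"}}
find_replace_in_dict_sketch(cfg, "api_key", "sk-...")
assert cfg == {"api_key": "sk-...", "backend": {"api_key": "sk-...", "model": "gpt-4"}}
```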

aiflows.utils.general_helpers.flatten_dict(d, parent_key='', sep='.')

Flattens a dictionary.

Parameters:
  • d (Dict[str, Any]) – The dictionary to flatten

  • parent_key (str, optional) – The parent key to use, defaults to ‘’

  • sep (str, optional) – The separator to use, defaults to ‘.’

Returns:

The flattened dictionary

Return type:

Dict[str, Any]
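The flattening scheme can be sketched with a few lines of stdlib Python (assuming standard dotted-key flattening, consistent with the sep parameter above):

```python
def flatten_dict_sketch(d, parent_key="", sep="."):
    # Join nested keys with sep, e.g. {"a": {"b": 1}} -> {"a.b": 1}.
    items = {}
    for key, value in d.items():
        new_key = f"{parent_key}{sep}{key}" if parent_key else key
        if isinstance(value, dict):
            items.update(flatten_dict_sketch(value, new_key, sep))
        else:
            items[new_key] = value
    return items


nested = {"a": {"b": 1, "c": {"d": 2}}, "e": 3}
assert flatten_dict_sketch(nested) == {"a.b": 1, "a.c.d": 2, "e": 3}
```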

aiflows.utils.general_helpers.get_current_datetime_ns()

Returns the current datetime in nanoseconds.

Returns:

The current datetime in nanoseconds

Return type:

int

aiflows.utils.general_helpers.get_predictions_dir_path(output_dir, create_if_not_exists=True)

Returns the path to the predictions folder.

Parameters:
  • output_dir (str) – The output directory

  • create_if_not_exists (bool, optional) – Whether to create the folder if it does not exist, defaults to True

Returns:

The path to the predictions folder

Return type:

str

aiflows.utils.general_helpers.log_suggest_help()

Logs a message suggesting to get help or provide feedback on GitHub.

aiflows.utils.general_helpers.nested_keys_pop(data_dict: dict, nested_key: str) Any

Pop a nested key in a dictionary.

Parameters:
  • data_dict (dict) – The dictionary to pop from.

  • nested_key (str) – The nested key to pop, in the format “key1.key2.key3”.

Returns:

The value of the popped key.

Return type:

Any

aiflows.utils.general_helpers.nested_keys_search(search_dict: dict, nested_key: str) Tuple[Any, bool]

Searches for a nested key in a dictionary using a composite key string.

Parameters:
  • search_dict (dict) – The dictionary to search in.

  • nested_key (str) – The composite key string to search for.

Returns:

A tuple containing the value of the nested key and a boolean indicating if the key was found.

Return type:

Tuple[Any, bool]

aiflows.utils.general_helpers.nested_keys_update(data_dict: dict, nested_key: str, value: Any) None

Update the value of a nested key in a dictionary.

Parameters:
  • data_dict (dict) – The dictionary to update.

  • nested_key (str) – The nested key to update, in the format “key1.key2.key3”.

  • value (Any) – The new value to set for the nested key.
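The three nested_keys_* helpers all interpret a composite “key1.key2.key3” string the same way. A stdlib sketch of their semantics (illustrative re-implementations, not the library code):

```python
def nested_keys_search_sketch(search_dict, nested_key):
    # Returns (value, found) for a composite "key1.key2.key3" key.
    current = search_dict
    for part in nested_key.split("."):
        if isinstance(current, dict) and part in current:
            current = current[part]
        else:
            return None, False
    return current, True


def nested_keys_update_sketch(data_dict, nested_key, value):
    # Walk to the parent dict, creating levels as needed, then set the leaf.
    *parents, last = nested_key.split(".")
    current = data_dict
    for part in parents:
        current = current.setdefault(part, {})
    current[last] = value


def nested_keys_pop_sketch(data_dict, nested_key):
    # Walk to the parent dict, then pop and return the leaf value.
    *parents, last = nested_key.split(".")
    current = data_dict
    for part in parents:
        current = current[part]
    return current.pop(last)


d = {"a": {"b": {"c": 1}}}
assert nested_keys_search_sketch(d, "a.b.c") == (1, True)
nested_keys_update_sketch(d, "a.b.x", 2)
assert nested_keys_pop_sketch(d, "a.b.c") == 1
assert d == {"a": {"b": {"x": 2}}}
```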

aiflows.utils.general_helpers.process_config_leafs(config: Dict | List, leaf_processor: Callable[[Tuple[Any, Any]], Any])

Processes the leafs of a config dictionary or list.

Parameters:
  • config (Union[Dict, List]) – The config to process

  • leaf_processor (Callable[[Tuple[Any, Any]], Any]) – The leaf processor to use

aiflows.utils.general_helpers.quick_load(cfg, item, key='api_infos')

Recursively loads the config item in a dictionary with key.

Parameters:
  • cfg (Dict[str, Any]) – The dictionary to update

  • item (Dict[str, Any]) – The item to set

  • key (str, optional) – The key to use, defaults to ‘api_infos’

Example:

cfg = {
    'backend': {
        'api_infos': '???',
        'model_name': {'openai': 'gpt-4', 'azure': 'azure/gpt-4'}
    },
    'Executor': {
        'subflows_config': {
            'backend': {
                'api_infos': '???',
                'model_name': {'openai': 'gpt-4', 'azure': 'azure/gpt-4'}
            }
        }
    }
}
api_information = [ApiInfo(backend_used="openai", api_key=os.getenv("OPENAI_API_KEY"))]
quick_load(cfg, api_information)

After the call, every 'api_infos' entry in cfg, at both nesting levels, is replaced by api_information.

aiflows.utils.general_helpers.quick_load_api_keys(cfg, api_information: List[ApiInfo], key='api_infos')

Recursively sets api_information as the value of every field whose key equals the key parameter.

Parameters:
  • cfg (Dict[str, Any]) – The dictionary to update

  • api_information (List[ApiInfo]) – The api information to set

  • key (str, optional) – The key to use, defaults to ‘api_infos’

aiflows.utils.general_helpers.read_gzipped_jsonlines(path_to_file)

Reads a gzipped jsonlines file and returns a list of dictionaries.

Parameters:

path_to_file (str) – The path to the gzipped jsonlines file

Returns:

A list of dictionaries

Return type:

List[Dict[str, Any]]

aiflows.utils.general_helpers.read_jsonlines(path_to_file)

Reads a jsonlines file and returns a list of dictionaries.

Parameters:

path_to_file (str) – The path to the jsonlines file

Returns:

A list of dictionaries

Return type:

List[Dict[str, Any]]
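The jsonlines format used by the read_jsonlines/write_jsonlines pair is one JSON object per line. A self-contained stdlib sketch (the library may delegate to a dedicated jsonlines package; this shows the format, not the exact implementation):

```python
import json
import os
import tempfile


def write_jsonlines_sketch(path, data, mode="w"):
    # One JSON object per line; mode "a" appends to an existing file.
    with open(path, mode) as f:
        for record in data:
            f.write(json.dumps(record) + "\n")


def read_jsonlines_sketch(path):
    # Parse each line back into a dictionary.
    with open(path) as f:
        return [json.loads(line) for line in f]


records = [{"id": 1}, {"id": 2}]
path = os.path.join(tempfile.mkdtemp(), "demo.jsonl")
write_jsonlines_sketch(path, records)
assert read_jsonlines_sketch(path) == records
```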

aiflows.utils.general_helpers.read_outputs(outputs_dir)

Reads the outputs from a jsonlines file.

Parameters:

outputs_dir (str) – The directory containing the output files

Returns:

The outputs

Return type:

List[Dict[str, Any]]

aiflows.utils.general_helpers.read_yaml_file(path_to_file, resolve=True)

Reads a yaml file.

Parameters:
  • path_to_file (str) – The path to the yaml file

  • resolve (bool, optional) – Whether to resolve the config, defaults to True

Returns:

The config

Return type:

Dict[str, Any]

aiflows.utils.general_helpers.recursive_dictionary_update(d, u)

Performs a recursive update of the values in dictionary d with the values of dictionary u

Parameters:
  • d (Dict[str, Any]) – The dictionary to update

  • u (Dict[str, Any]) – The dictionary to update with

Returns:

The updated dictionary
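The recursive merge can be sketched as follows (assuming nested dicts are merged rather than replaced wholesale, which is what distinguishes this from dict.update):

```python
def recursive_dictionary_update_sketch(d, u):
    # Values from u override d; nested dicts are merged instead of replaced.
    for key, value in u.items():
        if isinstance(value, dict) and isinstance(d.get(key), dict):
            recursive_dictionary_update_sketch(d[key], value)
        else:
            d[key] = value
    return d


base = {"model": {"name": "gpt-4", "temp": 0.7}, "seed": 1}
override = {"model": {"temp": 0.0}}
merged = recursive_dictionary_update_sketch(base, override)
assert merged == {"model": {"name": "gpt-4", "temp": 0.0}, "seed": 1}
```

Contrast with base.update(override), which would have dropped "name" entirely.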

aiflows.utils.general_helpers.try_except_decorator(f)

A decorator that wraps the passed in function in order to handle exceptions and log a message suggesting to get help or provide feedback on github.

aiflows.utils.general_helpers.unflatten_dict(d, sep='.')

Unflattens a dictionary.

Parameters:
  • d (Dict[str, Any]) – The dictionary to unflatten

  • sep (str, optional) – The separator to use, defaults to ‘.’

Returns:

The unflattened dictionary

Return type:

Dict[str, Any]
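The inverse of flatten_dict can be sketched in a few lines (assuming dotted composite keys, matching the sep parameter above):

```python
def unflatten_dict_sketch(d, sep="."):
    # Rebuild nesting from composite keys, e.g. {"a.b": 1} -> {"a": {"b": 1}}.
    result = {}
    for composite_key, value in d.items():
        *parents, last = composite_key.split(sep)
        current = result
        for part in parents:
            current = current.setdefault(part, {})
        current[last] = value
    return result


flat = {"a.b": 1, "a.c.d": 2, "e": 3}
assert unflatten_dict_sketch(flat) == {"a": {"b": 1, "c": {"d": 2}}, "e": 3}
```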

aiflows.utils.general_helpers.validate_flow_config(cls, flow_config)

Validates the flow config.

Parameters:
  • cls (class) – The class to validate the flow config for

  • flow_config (Dict[str, Any]) – The flow config to validate

Raises:

ValueError – If the flow config is invalid

aiflows.utils.general_helpers.write_gzipped_jsonlines(path_to_file, data, mode='w')

Writes a list of dictionaries to a gzipped jsonlines file.

Parameters:
  • path_to_file (str) – The path to the gzipped jsonlines file

  • data (List[Dict[str, Any]]) – The data to write

  • mode (str, optional) – The mode to use, defaults to “w”

aiflows.utils.general_helpers.write_jsonlines(path_to_file, data, mode='w')

Writes a list of dictionaries to a jsonlines file.

Parameters:
  • path_to_file (str) – The path to the jsonlines file

  • data (List[Dict[str, Any]]) – The data to write

  • mode (str, optional) – The mode to use, defaults to “w”

aiflows.utils.general_helpers.write_outputs(path_to_output_file, summary, mode)

Writes the summary to a jsonlines file.

Parameters:
  • path_to_output_file (str) – The path to the output file

  • summary (List[Dict[str, Any]]) – The summary to write

  • mode (str) – The mode to use

aiflows.utils.io_utils module

aiflows.utils.io_utils.coflows_deserialize(encoded_data: bytes, use_pickle=False) Any

Deserializes the given data.

Parameters:
  • encoded_data (bytes) – data to deserialize

  • use_pickle (bool) – whether to use pickle for deserialization (default is False)

aiflows.utils.io_utils.coflows_serialize(data: Any, use_pickle=False) bytes

Serializes the given data.

Parameters:
  • data (Any) – data to serialize

  • use_pickle (bool) – whether to use pickle for serialization (default is False)
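The serialize/deserialize pair can be illustrated with a stdlib sketch. The actual wire format is an assumption here (JSON-encoded bytes by default, pickle when use_pickle=True, as the parameter descriptions suggest):

```python
import json
import pickle


def coflows_serialize_sketch(data, use_pickle=False):
    # JSON for interoperable data; pickle for arbitrary Python objects.
    if use_pickle:
        return pickle.dumps(data)
    return json.dumps(data).encode("utf-8")


def coflows_deserialize_sketch(encoded_data, use_pickle=False):
    if use_pickle:
        return pickle.loads(encoded_data)
    return json.loads(encoded_data.decode("utf-8"))


payload = {"msg": "hello"}
for use_pickle in (False, True):
    blob = coflows_serialize_sketch(payload, use_pickle)
    assert isinstance(blob, bytes)
    assert coflows_deserialize_sketch(blob, use_pickle) == payload
```

Note the trade-off: pickle round-trips arbitrary Python objects (tuples, custom classes) but is Python-specific and unsafe on untrusted input; JSON is portable but limited to JSON-representable types.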

aiflows.utils.io_utils.load_pickle(pickle_path: str)

Loads data from a pickle file.

Parameters:

pickle_path (str) – The path to the pickle file

Returns:

The data loaded from the pickle file

Return type:

Any

aiflows.utils.io_utils.recursive_json_serialize(obj)

Recursively serializes an object to json.

Parameters:

obj (Any) – The object to serialize

Returns:

The serialized object

Return type:

Any

aiflows.utils.logging module

Logging utilities.

aiflows.utils.logging.add_handler(handler: Handler) None

Adds a handler to the Flows’s root logger.

aiflows.utils.logging.auto_set_dir(action=None, name=None)

Uses set_dir() to set the log directory to “./.aiflows/logs/{scriptname}:{name}”, where “scriptname” is the name of the main python file currently running.

Parameters:
  • action (str, optional) – an action in [“k”, “d”, “q”] to be performed when the directory already exists; asks the user by default.
    - “d”: delete the directory. Note that the deletion may fail when the directory is used by tensorboard.
    - “k”: keep the directory. This is useful when you resume from a previous training and want the directory to look as if the training was not interrupted. Note that this option does not load old models or any other old states for you. It simply does nothing.

  • name (str, optional) – The name of the directory

aiflows.utils.logging.disable_default_handler() None

Disable the default handler of the Flows’s root logger.

aiflows.utils.logging.disable_propagation() None

Disable propagation of the library log outputs. Note that log propagation is disabled by default.

aiflows.utils.logging.enable_default_handler() None

Enable the default handler of the Flows’s root logger.

aiflows.utils.logging.enable_explicit_format() None

Enable explicit formatting for every Flows’s logger. The explicit formatter is as follows:

[LEVELNAME|FILENAME|LINE NUMBER] TIME >> MESSAGE

All handlers currently bound to the root logger are affected by this method.

aiflows.utils.logging.enable_propagation() None

Enable propagation of the library log outputs. Please disable the Flows’s default handler to prevent double logging if the root logger has been configured.

aiflows.utils.logging.get_log_levels_dict()

Return a dictionary of all available log levels.

aiflows.utils.logging.get_logger(name: str | None = None) Logger

Return a logger with the specified name. This function is not supposed to be directly accessed unless you are writing a custom aiflows module.

Parameters:

name (str, optional) – The name of the logger to return

Returns:

The logger

aiflows.utils.logging.get_logger_dir()
Returns:

The logger directory, or None if not set. The directory is used for general logging, tensorboard events, checkpoints, etc.

Return type:

str

aiflows.utils.logging.get_verbosity() int

Return the current level for the Flows’s root logger as an int.

Returns:

The logging level

Return type:

int

Note

Flows has following logging levels:

  • 50: aiflows.logging.CRITICAL or aiflows.logging.FATAL

  • 40: aiflows.logging.ERROR

  • 30: aiflows.logging.WARNING or aiflows.logging.WARN

  • 20: aiflows.logging.INFO

  • 10: aiflows.logging.DEBUG
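These numeric values coincide with the standard library’s logging levels, so stdlib constants can be used interchangeably when working with verbosity:

```python
import logging

# The Flows levels map one-to-one onto stdlib logging constants.
levels = {
    "CRITICAL": 50, "FATAL": 50,
    "ERROR": 40,
    "WARNING": 30, "WARN": 30,
    "INFO": 20,
    "DEBUG": 10,
}
for name, value in levels.items():
    assert getattr(logging, name) == value
```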

aiflows.utils.logging.remove_handler(handler: Handler) None

Removes the given handler from the Flows’s root logger.

aiflows.utils.logging.reset_format() None

Resets the formatting for Flows’s loggers. All handlers currently bound to the root logger are affected by this method.

aiflows.utils.logging.set_dir(dirname, action=None)

Set the directory for global logging.

Parameters:
  • dirname (str) – log directory

  • action (str, optional) – an action in [“k”, “d”, “q”] to be performed when the directory already exists; asks the user by default.
    - “d”: delete the directory. Note that the deletion may fail when the directory is used by tensorboard.
    - “k”: keep the directory. This is useful when you resume from a previous training and want the directory to look as if the training was not interrupted. Note that this option does not load old models or any other old states for you. It simply does nothing.

aiflows.utils.logging.set_verbosity(verbosity: int) None

Set the verbosity level for the Flows’s root logger.

Parameters:

verbosity (int) – Logging level. For example, it can be one of the following:
  • aiflows.logging.CRITICAL or aiflows.logging.FATAL
  • aiflows.logging.ERROR
  • aiflows.logging.WARNING or aiflows.logging.WARN
  • aiflows.logging.INFO
  • aiflows.logging.DEBUG

aiflows.utils.logging.set_verbosity_debug()

Set the verbosity to the DEBUG level.

aiflows.utils.logging.set_verbosity_error()

Set the verbosity to the ERROR level.

aiflows.utils.logging.set_verbosity_info()

Set the verbosity to the INFO level.

aiflows.utils.logging.set_verbosity_warning()

Set the verbosity to the WARNING level.

aiflows.utils.logging.warning_advice(self, *args, **kwargs)

This method is identical to logger.warning(), but if the env var FLOWS_NO_ADVISORY_WARNINGS=1 is set, this warning will not be printed.

Parameters:
  • self – The logger object

  • *args – The arguments to pass to the warning method

  • **kwargs – The keyword arguments to pass to the warning method

aiflows.utils.logging.warning_once(self, *args, **kwargs)

This method is identical to logger.warning(), but will emit the warning with the same message only once

Note

The cache is for the function arguments, so 2 different callers using the same arguments will hit the cache. The assumption here is that all warning messages are unique across the code. If they aren’t then need to switch to another type of cache that includes the caller frame information in the hashing function.
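The argument-keyed cache described above can be sketched with functools.lru_cache (an illustrative pattern; the library's actual caching mechanism may differ):

```python
import functools
import logging


@functools.lru_cache(maxsize=None)
def warning_once_sketch(logger, message):
    # lru_cache keys on the arguments, so the same (logger, message)
    # pair only reaches logger.warning the first time.
    logger.warning(message)


class ListHandler(logging.Handler):
    """Collects emitted messages so we can observe the dedup behavior."""

    def __init__(self):
        super().__init__()
        self.records = []

    def emit(self, record):
        self.records.append(record.getMessage())


logger = logging.getLogger("demo")
handler = ListHandler()
logger.addHandler(handler)

warning_once_sketch(logger, "deprecated flag")
warning_once_sketch(logger, "deprecated flag")  # cache hit: not emitted again
assert handler.records == ["deprecated flag"]
```

As the note warns, two different call sites using the same message share a cache entry, so only the first one emits.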

aiflows.utils.rich_utils module

aiflows.utils.rich_utils.print_config_tree(cfg: DictConfig, print_order: Sequence[str] = [], resolve: bool = False, save_to_file: bool = False) None

Prints content of DictConfig using Rich library and its tree structure.

Parameters:
  • cfg (DictConfig) – Configuration composed by Hydra.

  • print_order (Sequence[str], optional) – Determines in what order config components are printed, defaults to []

  • resolve (bool, optional) – Whether to resolve reference fields of DictConfig, defaults to False

  • save_to_file (bool, optional) – Whether to also save the printed config tree to a file, defaults to False

aiflows.utils.serving module

exception aiflows.utils.serving.FlowInstanceException(flow_endpoint, user_id, message='')

Bases: Exception

aiflows.utils.serving.delete_all_flow_endpoints(cl: CoLink)

Deletes all flow endpoints. This includes deleting all flow instances.

Parameters:

cl (CoLink) – colink object

aiflows.utils.serving.delete_flow_endpoint(cl: CoLink, flow_endpoint: str)

Deletes all colink entries at given flow_endpoint. This includes deleting all instances of this flow.

Parameters:
  • cl (CoLink) – colink object

  • flow_endpoint (str) – endpoint of the flow

aiflows.utils.serving.delete_flow_instance(cl: CoLink, flow_id: str)

Deletes all colink entries associated with flow instance.

Parameters:
  • cl (CoLink) – colink object

  • flow_id (str) – id of the flow instance

Returns:

dict with metadata about specified local flow instance

Return type:

Dict[str, Any]

aiflows.utils.serving.get_flow_instance(cl: CoLink, flow_endpoint: str, user_id: str = 'local', config_overrides: Dict[str, Any] = {}, initial_state: Dict[str, Any] = None, dispatch_point_override: str = None) AtomicFlow

Returns an atomic flow wrapper around an instance of the specified flow.

Parameters:
  • cl (CoLink) – colink object

  • flow_endpoint (str) – endpoint of the flow

  • user_id (str) – id of the colink user serving the flow, defaults to ‘local’

  • config_overrides (Dict[str, Any]) – dictionary with config overrides for the flow instance

  • initial_state (Dict[str, Any]) – initial state of the flow instance

  • dispatch_point_override (str) – overrides dispatch point for this instance (only relevant for locally served flows)

Returns:

atomic flow wrapper around flow instance

Return type:

aiflows.base_flows.AtomicFlow

aiflows.utils.serving.is_flow_served(cl: CoLink, flow_endpoint: str) bool

Returns True if the flow is being served at the given endpoint.

Parameters:
  • cl (CoLink) – colink object

  • flow_endpoint (str) – endpoint of the flow

Returns:

True if the flow is being served at the given endpoint

Return type:

bool

aiflows.utils.serving.mount(cl: CoLink, client_id: str, flow_endpoint: str, config_overrides: Dict[str, Any] = None, initial_state: Dict[str, Any] = None, dispatch_point_override: str = None) AtomicFlow

Mounts a new instance at the specified flow endpoint by creating necessary entries in CoLink storage.

Parameters:
  • cl (CoLink) – colink object

  • client_id (str) – id of colink user making the request (also known as user_id)

  • flow_endpoint (str) – endpoint of the flow

  • config_overrides (Dict[str, Any]) – dictionary with config overrides for the flow instance

  • initial_state (Dict[str, Any]) – initial state of the flow instance

  • dispatch_point_override (Dict[str, Any]) – overrides dispatch point for this instance

Returns:

proxy flow object

Return type:

aiflows.base_flows.AtomicFlow

aiflows.utils.serving.recursive_delete_flow_instance(cl: CoLink, flow_id: str)
aiflows.utils.serving.recursive_serve_flow(cl: CoLink, flow_class_name: str, flow_endpoint: str, dispatch_point: str = 'coflows_dispatch', parallel_dispatch: bool = False, singleton: bool = False) bool
aiflows.utils.serving.serve_flow(cl: CoLink, flow_class_name: str, flow_endpoint: str, dispatch_point: str = 'coflows_dispatch', parallel_dispatch: bool = False, singleton: bool = False) bool

Serves the flow specified by flow_class_name at endpoint specified by flow_endpoint. After serving, users can get an instance of the served flow via the get_flow_instance operation.

Parameters:
  • cl (CoLink) – colink object

  • flow_class_name (str) – name of the flow class (e.g. “aiflows.base_flows.AtomicFlow” or “flow_modules.my_flow.MyFlow”)

  • flow_endpoint (str) – endpoint of the flow (the name under which the flow will be served). Users will use this endpoint to get instances of the flow.

  • dispatch_point (str) – dispatch point for the flow

  • parallel_dispatch (bool) – whether to use parallel dispatch for the flow. If True, multiple calls to the same flow instance can be done simultaneously (the flow is stateless).

  • singleton (bool) – whether to serve the flow as a singleton. If True, only one instance of the flow can be mounted. Users will all get the same instance.

Returns:

True if the flow was successfully served; False if the flow is already served or an error occurred.

Return type:

bool

aiflows.utils.serving.unserve_flow(cl: CoLink, flow_endpoint: str)

Unserves a flow: users will no longer be able to get instances from this flow_endpoint. All live instances created on this flow_endpoint remain alive.

Parameters:
  • cl (CoLink) – colink object

  • flow_endpoint (str) – endpoint of the flow

Module contents