aiflows.utils package¶
Submodules¶
aiflows.utils.coflows_utils module¶
- class aiflows.utils.coflows_utils.FlowFuture(cl, message_path)¶
Bases:
object
A future object that represents a response from a flow. One can use this object to read the response from the flow.
- get_data()¶
Blocking read of the future; returns a dictionary of the data.
- get_message()¶
Blocking read of the future; returns a message.
- set_output_interface(ouput_interface: Callable)¶
Set the output interface for the future.
- try_get_data()¶
Non-blocking read, returns None if there is no data yet.
- try_get_message()¶
Non-blocking read, returns None if there is no response yet.
- aiflows.utils.coflows_utils.dispatch_response(cl, output_message, reply_data)¶
Dispatches a response message to the appropriate flow.
- Parameters:
cl (CoLink) – The colink object
output_message (FlowMessage) – The output message
reply_data – The metadata describing how to reply
- aiflows.utils.coflows_utils.push_to_flow(cl: CoLink, target_user_id: str, target_flow_id: str, message: FlowMessage)¶
Pushes a message to a flow via colink.
- Parameters:
cl (CoLink) – The colink object
target_user_id (str) – The user id of the flow we’re pushing to
target_flow_id (str) – The flow id of the flow we’re pushing to
message (FlowMessage) – The message to push
aiflows.utils.colink_utils module¶
- aiflows.utils.colink_utils.delete_entries_on_path(cl: CoLink, path)¶
Deletes all entries on the given path.
- Parameters:
cl (CoLink) – colink object
path (str) – path to delete
- aiflows.utils.colink_utils.print_flow_instances(cl: CoLink, print_values=False)¶
Prints the flow instances of the given colink object.
- Parameters:
cl (CoLink) – colink object
print_values (bool) – whether to print the values of the keys (default is False)
- aiflows.utils.colink_utils.print_served_flows(cl: CoLink, print_values=False)¶
Prints the served flows of the given colink object.
- Parameters:
cl (CoLink) – colink object
print_values (bool) – whether to print the values of the keys (default is False)
- aiflows.utils.colink_utils.recursive_print_keys(cl: CoLink, path, print_values=False, indent=0)¶
Recursively prints the keys in the given path.
- Parameters:
cl (CoLink) – colink object
path (str) – path to print
print_values (bool) – whether to print the values of the keys (default is False)
indent (int) – indentation level
- aiflows.utils.colink_utils.start_colink_server() CoLink ¶
Starts a colink server and returns a colink object with a generated user.
- aiflows.utils.colink_utils.start_colink_server_with_users(num_users: int = 1) List[CoLink] ¶
Starts a colink server and returns a list of colink objects with generated users.
- Parameters:
num_users (int) – number of users to create
- Returns:
list of colink objects (one for each user)
aiflows.utils.constants module¶
aiflows.utils.general_helpers module¶
- aiflows.utils.general_helpers.create_unique_id(existing_ids: List[str] = None)¶
Creates a unique id.
- Parameters:
existing_ids (List[str], optional) – A list of existing ids to check against, defaults to None
- Returns:
A unique id
- Return type:
str
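A minimal sketch of the behavior described above (generate an id that does not collide with any of the existing ids); the helper name and retry logic are illustrative, the real implementation may differ:

```python
import uuid

def create_unique_id(existing_ids=None):
    """Generate an id that is not already present in existing_ids."""
    existing = set(existing_ids or [])
    new_id = str(uuid.uuid4())
    while new_id in existing:  # retry on the (extremely unlikely) collision
        new_id = str(uuid.uuid4())
    return new_id

uid = create_unique_id(existing_ids=["abc"])
print(uid != "abc")  # → True
```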
- aiflows.utils.general_helpers.encode_from_buffer(buffer)¶
Encodes a buffer (typically an image from a video) to base64.
- aiflows.utils.general_helpers.encode_image(image_path)¶
Encodes an image to base64.
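Both helpers above boil down to base64-encoding raw bytes (a file's contents or a video frame buffer). A self-contained sketch of that encoding step, with an illustrative function name:

```python
import base64

def encode_bytes_to_base64(raw: bytes) -> str:
    # base64-encode raw bytes (e.g. an image file's contents) into an ASCII string
    return base64.b64encode(raw).decode("utf-8")

encoded = encode_bytes_to_base64(b"\x89PNG fake image bytes")
print(base64.b64decode(encoded) == b"\x89PNG fake image bytes")  # → True
```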
- aiflows.utils.general_helpers.exception_handler(e)¶
Handles an exception.
- Parameters:
e (Exception) – The exception to handle
- aiflows.utils.general_helpers.find_replace_in_dict(cfg, key_to_find, new_value, current_path='')¶
Recursively searches a dictionary for keys equal to key_to_find and replaces their values with new_value. Note 1: every matching key is replaced, however deeply it is nested in the dictionary. Note 2: we recommend using this function only in the Quick Start tutorial, not in production code.
- Parameters:
cfg (Dict[str, Any]) – The dictionary to search in
key_to_find (str) – The key to find
new_value (Any) – The new value to set
current_path (str, optional) – The current path, defaults to “”
- Returns:
The updated dictionary
- Return type:
Dict[str, Any]
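A minimal sketch of the replace-at-any-depth behavior described above (illustrative reimplementation, not the library's code):

```python
def find_replace_in_dict(cfg, key_to_find, new_value):
    # Recursively replace the value of every key == key_to_find, at any nesting depth.
    for k, v in cfg.items():
        if k == key_to_find:
            cfg[k] = new_value
        elif isinstance(v, dict):
            find_replace_in_dict(v, key_to_find, new_value)
    return cfg

cfg = {"backend": {"api_infos": "???"}, "api_infos": "???"}
print(find_replace_in_dict(cfg, "api_infos", "KEY"))
# → {'backend': {'api_infos': 'KEY'}, 'api_infos': 'KEY'}
```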
- aiflows.utils.general_helpers.flatten_dict(d, parent_key='', sep='.')¶
Flattens a dictionary.
- Parameters:
d (Dict[str, Any]) – The dictionary to flatten
parent_key (str, optional) – The parent key to use, defaults to ‘’
sep (str, optional) – The separator to use, defaults to ‘.’
- Returns:
The flattened dictionary
- Return type:
Dict[str, Any]
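The flattening joins nested keys with the separator. A self-contained sketch of that behavior, assuming standard '.'-joined keys:

```python
def flatten_dict(d, parent_key="", sep="."):
    # Collapse nested dicts into one level, joining keys with sep.
    items = {}
    for k, v in d.items():
        new_key = f"{parent_key}{sep}{k}" if parent_key else k
        if isinstance(v, dict):
            items.update(flatten_dict(v, new_key, sep=sep))
        else:
            items[new_key] = v
    return items

print(flatten_dict({"a": {"b": 1, "c": {"d": 2}}}))
# → {'a.b': 1, 'a.c.d': 2}
```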
- aiflows.utils.general_helpers.get_current_datetime_ns()¶
Returns the current datetime in nanoseconds.
- Returns:
The current datetime in nanoseconds
- Return type:
int
- aiflows.utils.general_helpers.get_predictions_dir_path(output_dir, create_if_not_exists=True)¶
Returns the path to the predictions folder.
- Parameters:
output_dir (str) – The output directory
create_if_not_exists (bool, optional) – Whether to create the folder if it does not exist, defaults to True
- Returns:
The path to the predictions folder
- Return type:
str
- aiflows.utils.general_helpers.log_suggest_help()¶
Logs a message suggesting to get help or provide feedback on GitHub.
- aiflows.utils.general_helpers.nested_keys_pop(data_dict: dict, nested_key: str) Any ¶
Pop a nested key in a dictionary.
- Parameters:
data_dict (dict) – The dictionary to pop from.
nested_key (str) – The nested key to pop, in the format “key1.key2.key3”.
- Returns:
The value of the popped key.
- Return type:
Any
- aiflows.utils.general_helpers.nested_keys_search(search_dict, nested_key) Tuple[Any, bool] ¶
Searches for a nested key in a dictionary using a composite key string.
- Parameters:
search_dict (dict) – The dictionary to search in.
nested_key (str) – The composite key string to search for.
- Returns:
A tuple containing the value of the nested key and a boolean indicating if the key was found.
- Return type:
Tuple[Any, bool]
- aiflows.utils.general_helpers.nested_keys_update(data_dict: dict, nested_key: str, value: Any) None ¶
Update the value of a nested key in a dictionary.
- Parameters:
data_dict (dict) – The dictionary to update.
nested_key (str) – The nested key to update, in the format “key1.key2.key3”.
value (Any) – The new value to set for the nested key.
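The three nested_keys_* helpers above all walk a "key1.key2.key3" composite key through nested dicts. A self-contained sketch of the documented behavior (illustrative reimplementations):

```python
def nested_keys_search(search_dict, nested_key):
    # Walk "key1.key2.key3" through nested dicts; return (value, found).
    node = search_dict
    for part in nested_key.split("."):
        if isinstance(node, dict) and part in node:
            node = node[part]
        else:
            return None, False
    return node, True

def nested_keys_update(data_dict, nested_key, value):
    # Set the value at the composite key, creating intermediate dicts as needed.
    parts = nested_key.split(".")
    node = data_dict
    for part in parts[:-1]:
        node = node.setdefault(part, {})
    node[parts[-1]] = value

def nested_keys_pop(data_dict, nested_key):
    # Remove and return the value at the composite key.
    parts = nested_key.split(".")
    node = data_dict
    for part in parts[:-1]:
        node = node[part]
    return node.pop(parts[-1])

d = {"a": {"b": {"c": 1}}}
print(nested_keys_search(d, "a.b.c"))  # → (1, True)
nested_keys_update(d, "a.b.c", 2)
print(nested_keys_pop(d, "a.b.c"))     # → 2
```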
- aiflows.utils.general_helpers.process_config_leafs(config: Dict | List, leaf_processor: Callable[[Tuple[Any, Any]], Any])¶
Processes the leafs of a config dictionary or list.
- Parameters:
config (Union[Dict, List]) – The config to process
leaf_processor (Callable[[Tuple[Any, Any]], Any]) – The leaf processor to use
- aiflows.utils.general_helpers.quick_load(cfg, item, key='api_infos')¶
Recursively loads the config item in a dictionary with key.
- Parameters:
cfg (Dict[str, Any]) – The dictionary to update
item (Dict[str, Any]) – The item to set
key (str, optional) – The key to use, defaults to ‘api_infos’
Example:
cfg = {
    'backend': {
        'api_infos': '???',
        'model_name': {'openai': 'gpt-4', 'azure': 'azure/gpt-4'}
    },
    'Executor': {
        'subflows_config': {
            'backend': {
                'api_infos': '???',
                'model_name': {'openai': 'gpt-4', 'azure': 'azure/gpt-4'}
            }
        }
    }
}
api_information = [ApiInfo(backend_used="openai", api_key=os.getenv("OPENAI_API_KEY"))]
quick_load(cfg, api_information)
returns:
cfg = {
    'backend': {
        'api_infos': [ApiInfo(backend_used="openai", api_key=os.getenv("OPENAI_API_KEY"))],
        'model_name': {'openai': 'gpt-4', 'azure': 'azure/gpt-4'}
    },
    'Executor': {
        'subflows_config': {
            'backend': {
                'api_infos': [ApiInfo(backend_used="openai", api_key=os.getenv("OPENAI_API_KEY"))],
                'model_name': {'openai': 'gpt-4', 'azure': 'azure/gpt-4'}
            }
        }
    }
}
- aiflows.utils.general_helpers.quick_load_api_keys(cfg, api_information: List[ApiInfo], key='api_infos')¶
Recursively loads api_information into a dictionary, in any field whose key matches the parameter key.
- Parameters:
cfg (Dict[str, Any]) – The dictionary to update
api_information (List[ApiInfo]) – The api information to set
key (str, optional) – The key to use, defaults to ‘api_infos’
- aiflows.utils.general_helpers.read_gzipped_jsonlines(path_to_file)¶
Reads a gzipped jsonlines file and returns a list of dictionaries.
- Parameters:
path_to_file (str) – The path to the gzipped jsonlines file
- Returns:
A list of dictionaries
- Return type:
List[Dict[str, Any]]
- aiflows.utils.general_helpers.read_jsonlines(path_to_file)¶
Reads a jsonlines file and returns a list of dictionaries.
- Parameters:
path_to_file (str) – The path to the jsonlines file
- Returns:
A list of dictionaries
- Return type:
List[Dict[str, Any]]
- aiflows.utils.general_helpers.read_outputs(outputs_dir)¶
Reads the outputs from a jsonlines file.
- Parameters:
outputs_dir (str) – The directory containing the output files
- Returns:
The outputs
- Return type:
List[Dict[str, Any]]
- aiflows.utils.general_helpers.read_yaml_file(path_to_file, resolve=True)¶
Reads a yaml file.
- Parameters:
path_to_file (str) – The path to the yaml file
resolve (bool, optional) – Whether to resolve the config, defaults to True
- Returns:
The config
- Return type:
Dict[str, Any]
- aiflows.utils.general_helpers.recursive_dictionary_update(d, u)¶
Performs a recursive update of the values in dictionary d with the values of dictionary u.
- Parameters:
d (Dict[str, Any]) – The dictionary to update
u (Dict[str, Any]) – The dictionary to update with
- Returns:
The updated dictionary
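A minimal sketch of the recursive merge described above (nested dicts are merged key-by-key, other values are overwritten); an illustrative reimplementation:

```python
def recursive_dictionary_update(d, u):
    # Merge u into d: nested dicts are merged recursively, other values overwritten.
    for k, v in u.items():
        if isinstance(v, dict) and isinstance(d.get(k), dict):
            recursive_dictionary_update(d[k], v)
        else:
            d[k] = v
    return d

base = {"model": {"name": "gpt-4", "temp": 0.3}, "seed": 1}
print(recursive_dictionary_update(base, {"model": {"temp": 0.7}}))
# → {'model': {'name': 'gpt-4', 'temp': 0.7}, 'seed': 1}
```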
- aiflows.utils.general_helpers.try_except_decorator(f)¶
A decorator that wraps the passed-in function in order to handle exceptions and log a message suggesting to get help or provide feedback on GitHub.
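A sketch of that decorator pattern; the log message and re-raise behavior here are assumptions (the real decorator delegates to exception_handler and log_suggest_help):

```python
import functools

def try_except_decorator(f):
    # Run f; on any exception, log a help suggestion and re-raise.
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except Exception:
            print("An error occurred. Please open an issue on GitHub.")
            raise
    return wrapper

@try_except_decorator
def boom():
    raise ValueError("oops")

try:
    boom()
except ValueError as e:
    print(e)  # → oops
```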
- aiflows.utils.general_helpers.unflatten_dict(d, sep='.')¶
Unflattens a dictionary.
- Parameters:
d (Dict[str, Any]) – The dictionary to unflatten
sep (str, optional) – The separator to use, defaults to ‘.’
- Returns:
The unflattened dictionary
- Return type:
Dict[str, Any]
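A self-contained sketch of the inverse of the flattening: split each '.'-joined key back into nested dicts (illustrative reimplementation):

```python
def unflatten_dict(d, sep="."):
    # Rebuild nested dicts from "a.b.c"-style flat keys.
    result = {}
    for flat_key, value in d.items():
        parts = flat_key.split(sep)
        node = result
        for part in parts[:-1]:
            node = node.setdefault(part, {})
        node[parts[-1]] = value
    return result

print(unflatten_dict({"a.b": 1, "a.c.d": 2}))
# → {'a': {'b': 1, 'c': {'d': 2}}}
```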
- aiflows.utils.general_helpers.validate_flow_config(cls, flow_config)¶
Validates the flow config.
- Parameters:
cls (class) – The class to validate the flow config for
flow_config (Dict[str, Any]) – The flow config to validate
- Raises:
ValueError – If the flow config is invalid
- aiflows.utils.general_helpers.write_gzipped_jsonlines(path_to_file, data, mode='w')¶
Writes a list of dictionaries to a gzipped jsonlines file.
- Parameters:
path_to_file (str) – The path to the gzipped jsonlines file
data (List[Dict[str, Any]]) – The data to write
mode (str, optional) – The mode to use, defaults to “w”
- aiflows.utils.general_helpers.write_jsonlines(path_to_file, data, mode='w')¶
Writes a list of dictionaries to a jsonlines file.
- Parameters:
path_to_file (str) – The path to the jsonlines file
data (List[Dict[str, Any]]) – The data to write
mode (str, optional) – The mode to use, defaults to “w”
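The jsonlines format used by the read/write helpers above is simply one JSON object per line. A self-contained round-trip sketch (illustrative reimplementations):

```python
import json
import os
import tempfile

def write_jsonlines(path, data, mode="w"):
    # One JSON object per line.
    with open(path, mode) as f:
        for record in data:
            f.write(json.dumps(record) + "\n")

def read_jsonlines(path):
    with open(path) as f:
        return [json.loads(line) for line in f]

path = os.path.join(tempfile.mkdtemp(), "out.jsonl")
write_jsonlines(path, [{"id": 1}, {"id": 2}])
print(read_jsonlines(path))  # → [{'id': 1}, {'id': 2}]
```

The gzipped variants follow the same scheme with gzip.open in place of open.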
- aiflows.utils.general_helpers.write_outputs(path_to_output_file, summary, mode)¶
Writes the summary to a jsonlines file.
- Parameters:
path_to_output_file (str) – The path to the output file
summary (List[Dict[str, Any]]) – The summary to write
mode (str) – The mode to use
aiflows.utils.io_utils module¶
- aiflows.utils.io_utils.coflows_deserialize(encoded_data: bytes, use_pickle=False) Any ¶
Deserializes the given data.
- Parameters:
encoded_data (bytes) – data to deserialize
use_pickle (bool) – whether to use pickle for deserialization (default is False)
- aiflows.utils.io_utils.coflows_serialize(data: Any, use_pickle=False) bytes ¶
Serializes the given data.
- Parameters:
data (Any) – data to serialize
use_pickle (bool) – whether to use pickle for serialization (default is False)
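A sketch of the serialize/deserialize pair described above, assuming JSON for plain data and pickle when use_pickle is set (the library's actual wire format may differ):

```python
import json
import pickle

def coflows_serialize(data, use_pickle=False) -> bytes:
    # JSON for plain data; pickle for objects JSON cannot represent.
    if use_pickle:
        return pickle.dumps(data)
    return json.dumps(data).encode("utf-8")

def coflows_deserialize(encoded_data, use_pickle=False):
    if use_pickle:
        return pickle.loads(encoded_data)
    return json.loads(encoded_data.decode("utf-8"))

blob = coflows_serialize({"x": 1})
print(coflows_deserialize(blob))  # → {'x': 1}
```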
- aiflows.utils.io_utils.load_pickle(pickle_path: str)¶
Loads data from a pickle file.
- Parameters:
pickle_path (str) – The path to the pickle file
- Returns:
The data loaded from the pickle file
- Return type:
Any
- aiflows.utils.io_utils.recursive_json_serialize(obj)¶
Recursively serializes an object to json.
- Parameters:
obj (Any) – The object to serialize
- Returns:
The serialized object
- Return type:
Any
aiflows.utils.logging module¶
Logging utilities.
- aiflows.utils.logging.add_handler(handler: Handler) None ¶
Adds a handler to the Flows’s root logger.
- aiflows.utils.logging.auto_set_dir(action=None, name=None)¶
Uses logger.set_logger_dir() to set the log directory to “./.aiflows/logs/{scriptname}:{name}”, where “scriptname” is the name of the main python file currently running.
- Parameters:
action (str, optional) – an action in [“k”, “d”, “q”] to be performed when the directory already exists. By default, the user will be asked. “d”: delete the directory; note that the deletion may fail when the directory is used by tensorboard. “k”: keep the directory; useful when you resume from a previous training and want the directory to look as if the training was not interrupted. Note that this option does not load old models or any other old state; it simply does nothing.
name (str, optional) – The name of the directory
- aiflows.utils.logging.disable_default_handler() None ¶
Disable the default handler of the Flows’s root logger.
- aiflows.utils.logging.disable_propagation() None ¶
Disable propagation of the library log outputs. Note that log propagation is disabled by default.
- aiflows.utils.logging.enable_default_handler() None ¶
Enable the default handler of the Flows’s root logger.
- aiflows.utils.logging.enable_explicit_format() None ¶
Enable explicit formatting for every Flows’s logger. The explicit formatter is as follows:
[LEVELNAME|FILENAME|LINE NUMBER] TIME >> MESSAGE
All handlers currently bound to the root logger are affected by this method.
- aiflows.utils.logging.enable_propagation() None ¶
Enable propagation of the library log outputs. Please disable the Flows’s default handler to prevent double logging if the root logger has been configured.
- aiflows.utils.logging.get_log_levels_dict()¶
Return a dictionary of all available log levels.
- aiflows.utils.logging.get_logger(name: str | None = None) Logger ¶
Return a logger with the specified name. This function is not supposed to be directly accessed unless you are writing a custom aiflows module.
- Parameters:
name (str, optional) – The name of the logger to return
- Returns:
The logger
- aiflows.utils.logging.get_logger_dir()¶
- Returns:
The logger directory, or None if not set. The directory is used for general logging, tensorboard events, checkpoints, etc.
- Return type:
str
- aiflows.utils.logging.get_verbosity() int ¶
Return the current level for the Flows’s root logger as an int.
- Returns:
The logging level
- Return type:
int
Note
Flows has the following logging levels:
50: aiflows.logging.CRITICAL or aiflows.logging.FATAL
40: aiflows.logging.ERROR
30: aiflows.logging.WARNING or aiflows.logging.WARN
20: aiflows.logging.INFO
10: aiflows.logging.DEBUG
- aiflows.utils.logging.remove_handler(handler: Handler) None ¶
Removes the given handler from the Flows’s root logger.
- aiflows.utils.logging.reset_format() None ¶
Resets the formatting for Flows’s loggers. All handlers currently bound to the root logger are affected by this method.
- aiflows.utils.logging.set_dir(dirname, action=None)¶
Set the directory for global logging.
- Parameters:
dirname (str) – log directory
action (str, optional) – an action in [“k”, “d”, “q”] to be performed when the directory already exists. By default, the user will be asked. “d”: delete the directory; note that the deletion may fail when the directory is used by tensorboard. “k”: keep the directory; useful when you resume from a previous training and want the directory to look as if the training was not interrupted. Note that this option does not load old models or any other old state; it simply does nothing.
- aiflows.utils.logging.set_verbosity(verbosity: int) None ¶
Set the verbosity level for the Flows’s root logger.
- Parameters:
verbosity (int) – Logging level. For example, it can be one of the following:
aiflows.logging.CRITICAL or aiflows.logging.FATAL
aiflows.logging.ERROR
aiflows.logging.WARNING or aiflows.logging.WARN
aiflows.logging.INFO
aiflows.logging.DEBUG
- aiflows.utils.logging.set_verbosity_debug()¶
Set the verbosity to the DEBUG level.
- aiflows.utils.logging.set_verbosity_error()¶
Set the verbosity to the ERROR level.
- aiflows.utils.logging.set_verbosity_info()¶
Set the verbosity to the INFO level.
- aiflows.utils.logging.set_verbosity_warning()¶
Set the verbosity to the WARNING level.
- aiflows.utils.logging.warning_advice(self, *args, **kwargs)¶
This method is identical to logger.warning(), but if the env var FLOWS_NO_ADVISORY_WARNINGS=1 is set, this warning will not be printed.
- Parameters:
self – The logger object
*args – The arguments to pass to the warning method
**kwargs – The keyword arguments to pass to the warning method
- aiflows.utils.logging.warning_once(self, *args, **kwargs)¶
This method is identical to logger.warning(), but will emit the warning with the same message only once.
Note
The cache is for the function arguments, so 2 different callers using the same arguments will hit the cache. The assumption here is that all warning messages are unique across the code. If they aren’t, we need to switch to another type of cache that includes the caller frame information in the hashing function.
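The note above suggests the once-only behavior is a cache keyed on the arguments. One way this can be implemented, using functools.lru_cache (an assumption; the real implementation may differ):

```python
import functools
import logging

logger = logging.getLogger("aiflows_sketch")

@functools.lru_cache(None)
def warning_once(message):
    # lru_cache keys on the arguments, so each distinct message is logged once;
    # repeated calls with the same message hit the cache and emit nothing.
    logger.warning(message)

warning_once("deprecated option")  # logged
warning_once("deprecated option")  # cache hit: not logged again
```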
aiflows.utils.rich_utils module¶
- aiflows.utils.rich_utils.print_config_tree(cfg: DictConfig, print_order: Sequence[str] = [], resolve: bool = False, save_to_file: bool = False) None ¶
Prints content of DictConfig using Rich library and its tree structure.
- Parameters:
cfg (DictConfig) – Configuration composed by Hydra.
print_order (Sequence[str], optional) – Determines in what order config components are printed, defaults to []
resolve (bool, optional) – Whether to resolve reference fields of DictConfig, defaults to False
save_to_file (bool, optional) – Whether to also save the printed config tree to a file, defaults to False
aiflows.utils.serving module¶
- exception aiflows.utils.serving.FlowInstanceException(flow_endpoint, user_id, message='')¶
Bases:
Exception
- aiflows.utils.serving.delete_all_flow_endpoints(cl: CoLink)¶
Deletes all flow endpoints. This includes deleting all flow instances.
- Parameters:
cl (CoLink) – colink object
- aiflows.utils.serving.delete_flow_endpoint(cl: CoLink, flow_endpoint: str)¶
Deletes all colink entries at given flow_endpoint. This includes deleting all instances of this flow.
- Parameters:
cl (CoLink) – colink object
flow_endpoint (str) – endpoint of the flow
- aiflows.utils.serving.delete_flow_instance(cl: CoLink, flow_id: str)¶
Deletes all colink entries associated with flow instance.
- Parameters:
cl (CoLink) – colink object
flow_id (str) – id of the flow instance
- Returns:
dict with metadata about specified local flow instance
- Return type:
Dict[str, Any]
- aiflows.utils.serving.get_flow_instance(cl: CoLink, flow_endpoint: str, user_id: str = 'local', config_overrides: Dict[str, Any] = {}, initial_state: Dict[str, Any] = None, dispatch_point_override: str = None) AtomicFlow ¶
Returns an atomic flow wrapper around an instance of the specified flow.
- Parameters:
cl (CoLink) – colink object
flow_endpoint (str) – endpoint of the flow
user_id (str) – id of the colink user serving the flow (“local” for locally served flows)
config_overrides (Dict[str, Any]) – dictionary with config overrides for the flow instance
initial_state (Dict[str, Any]) – initial state of the flow instance
dispatch_point_override (str) – overrides dispatch point for this instance (only relevant for locally served flows)
- Returns:
atomic flow wrapper around flow instance
- Return type:
aiflows.base_flows.AtomicFlow
- aiflows.utils.serving.is_flow_served(cl: CoLink, flow_endpoint: str) bool ¶
Returns True if the flow is being served at the given endpoint.
- Parameters:
cl (CoLink) – colink object
flow_endpoint (str) – endpoint of the flow
- Returns:
True if the flow is being served at the given endpoint
- Return type:
bool
- aiflows.utils.serving.mount(cl: CoLink, client_id: str, flow_endpoint: str, config_overrides: Dict[str, Any] = None, initial_state: Dict[str, Any] = None, dispatch_point_override: str = None) AtomicFlow ¶
Mounts a new instance at the specified flow endpoint by creating necessary entries in CoLink storage.
- Parameters:
cl (CoLink) – colink object
client_id (str) – id of colink user making the request (also known as user_id)
flow_endpoint (str) – endpoint of the flow
config_overrides (Dict[str, Any]) – dictionary with config overrides for the flow instance
initial_state (Dict[str, Any]) – initial state of the flow instance
dispatch_point_override (str) – overrides dispatch point for this instance
- Returns:
proxy flow object
- Return type:
aiflows.base_flows.AtomicFlow
- aiflows.utils.serving.recursive_delete_flow_instance(cl: CoLink, flow_id: str)¶
- aiflows.utils.serving.recursive_serve_flow(cl: CoLink, flow_class_name: str, flow_endpoint: str, dispatch_point: str = 'coflows_dispatch', parallel_dispatch: bool = False, singleton: bool = False) bool ¶
- aiflows.utils.serving.serve_flow(cl: CoLink, flow_class_name: str, flow_endpoint: str, dispatch_point: str = 'coflows_dispatch', parallel_dispatch: bool = False, singleton: bool = False) bool ¶
Serves the flow specified by flow_class_name at endpoint specified by flow_endpoint. After serving, users can get an instance of the served flow via the get_flow_instance operation.
- Parameters:
cl (CoLink) – colink object
flow_class_name (str) – name of the flow class (e.g. “aiflows.base_flows.AtomicFlow” or “flow_modules.my_flow.MyFlow”)
flow_endpoint (str) – endpoint of the flow (the name under which the flow will be served). Users will use this endpoint to get instances of the flow.
dispatch_point (str) – dispatch point for the flow
parallel_dispatch (bool) – whether to use parallel dispatch for the flow. If True, multiple calls to the same flow instance can be done simultaneously (the flow is stateless).
singleton (bool) – whether to serve the flow as a singleton. If True, only one instance of the flow can be mounted. Users will all get the same instance.
- Returns:
True if the flow was successfully served; False if the flow is already served or an error occurred.
- Return type:
bool
- aiflows.utils.serving.start_colink_component(component_name: str, args)¶
- aiflows.utils.serving.unserve_flow(cl: CoLink, flow_endpoint: str)¶
Unserves the flow: users will no longer be able to get instances from this flow_endpoint. All live instances created on this flow_endpoint remain alive.
- Parameters:
cl (CoLink) – colink object
flow_endpoint (str) – endpoint of the flow