aiflows.flow_launchers package¶
Submodules¶
aiflows.flow_launchers.abstract module¶
- class aiflows.flow_launchers.abstract.BaseLauncher¶
Bases: ABC
A base class for creating a model launcher.
- predict(batch: Iterable[Dict]) List[Dict] ¶
Runs inference for the data provided in the batch. It returns a list of dictionaries containing the predictions. (Not Implemented for BaseLauncher)
- Parameters:
batch (Iterable[Dict]) – An iterable of dictionaries containing the data for each sample to run inference on.
- Returns:
A list of dictionaries containing the predictions.
- Return type:
List[Dict]
- predict_dataloader(dataloader: Iterable, path_to_cache: str | None = None)¶
Runs inference for the data provided in the dataloader. (Not Implemented for BaseLauncher)
- Parameters:
dataloader (Iterable) – An iterable of dictionaries containing the data for each sample to run inference on.
path_to_cache (Optional[str], optional) – A path to a cache file containing existing predictions to use as a starting point.
- classmethod write_batch_output(batch: List[Dict], path_to_output_file: str, keys_to_write: List[str])¶
Class method that writes the output of a batch to a file.
- Parameters:
batch (List[Dict]) – A list of dictionaries containing the predictions.
path_to_output_file (str) – The path to the output file.
keys_to_write (List[str]) – A list of keys to write to file.
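The on-disk format is not documented here; a minimal sketch of such a helper, assuming JSON-lines output (an assumption, not the library's confirmed format):

```python
import json
from typing import Dict, List


def write_batch_output(batch: List[Dict], path_to_output_file: str, keys_to_write: List[str]) -> None:
    """Append the selected keys of each prediction as one JSON object per line."""
    with open(path_to_output_file, "a") as f:
        for sample in batch:
            # Keep only the requested keys so the output file stays compact.
            row = {k: sample[k] for k in keys_to_write if k in sample}
            f.write(json.dumps(row) + "\n")
```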
- class aiflows.flow_launchers.abstract.MultiThreadedAPILauncher(**kwargs)¶
Bases: BaseLauncher, ABC
A class for creating a multi-threaded model to query API that can make requests using multiple API keys.
- Parameters:
debug (bool, optional) – A boolean indicating whether to print debug information (if true, it will not run the multithreading).
output_dir (str, optional) – The directory to write the output files to.
n_workers (int, optional) – The number of workers to use in the multithreading.
wait_time_per_key (int, optional) – The number of seconds to wait before making another request with the same API key.
single_threaded (bool, optional) – A boolean indicating whether to run inference in a single thread instead of using the multithreading.
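The pattern these parameters describe — fanning batches out over a thread pool, with a plain loop as the single-threaded fallback — can be sketched generically (hypothetical helper names, not the library's API):

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, Iterable, List


def run_batches(
    predict: Callable[[List[Dict]], List[Dict]],
    batches: Iterable[List[Dict]],
    n_workers: int = 4,
    single_threaded: bool = False,
) -> List[List[Dict]]:
    """Apply `predict` to every batch, optionally in parallel."""
    batches = list(batches)
    if single_threaded:
        # Debug / single-threaded mode: a plain loop is easier to step through.
        return [predict(b) for b in batches]
    with ThreadPoolExecutor(max_workers=n_workers) as pool:
        # map() preserves the input order of the batches.
        return list(pool.map(predict, batches))
```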
- predict_dataloader(dataloader: Iterable[dict], flows_with_interfaces: List[Dict[str, Any]]) None ¶
Runs inference for the data provided in the dataloader. It writes the results to output files selected from the output_dir attributes.
- Parameters:
dataloader – An iterable of dictionaries containing the data for each sample to run inference on.
flows_with_interfaces (List[Dict]) – A list of dictionaries, each containing a flow instance and its input and output interfaces.
aiflows.flow_launchers.flow_API_launcher module¶
- class aiflows.flow_launchers.flow_API_launcher.FlowLauncher(n_independent_samples: int, fault_tolerant_mode: bool, n_batch_retries: int, wait_time_between_retries: int, **kwargs)¶
Bases: MultiThreadedAPILauncher
Flow Launcher class for running inference on a flow. One can run the inference with the flow launcher in multiple ways:
- Using the launch class method: takes a flow and runs inference on the given data (no multithreading; no need to instantiate the class).
- Using the predict_dataloader method: runs inference on the given dataloader (requires instantiating the class). The predict_dataloader method can run inference in both single-threaded and multi-threaded modes (see the MultiThreadedAPILauncher class for more details).
- Parameters:
n_independent_samples (int) – the number of times to independently repeat the same inference for a given sample
fault_tolerant_mode (bool) – whether to retry rather than crash if an error occurs during the inference for a given sample
n_batch_retries (int) – the number of times to retry the batch if an error occurs (only used if fault_tolerant_mode is True)
wait_time_between_retries (int) – the number of seconds to wait before retrying the batch (only used if fault_tolerant_mode is True)
**kwargs – Additional keyword arguments to instantiate the MultiThreadedAPILauncher class.
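How these arguments interact can be sketched with a generic retry loop (a sketch of the described behavior, not the library's implementation):

```python
import time
from typing import Callable, Dict, List


def predict_with_retries(
    run_batch: Callable[[List[Dict]], List[Dict]],
    batch: List[Dict],
    fault_tolerant_mode: bool = False,
    n_batch_retries: int = 1,
    wait_time_between_retries: int = 1,
) -> List[Dict]:
    """Run `run_batch`, retrying on failure when fault_tolerant_mode is on."""
    if not fault_tolerant_mode:
        # Without fault tolerance, any error propagates (the run crashes).
        return run_batch(batch)
    last_error = None
    for _ in range(n_batch_retries):
        try:
            return run_batch(batch)
        except Exception as e:
            last_error = e
            time.sleep(wait_time_between_retries)
    raise last_error  # all retries exhausted
```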
- classmethod launch(flow_with_interfaces: Dict[str, Any], data: Dict | List[Dict], path_to_output_file: str | None = None) Tuple[List[dict]] ¶
Class method that takes a flow and runs inference on the given data (no multithreading) and no need to instantiate the class.
- Parameters:
flow_with_interfaces (Dict[str, Any]) – A dictionary containing the flow to run inference with and the input and output interfaces to use.
data (Union[Dict, List[Dict]]) – The data to run inference on.
path_to_output_file (Optional[str], optional) – A path to a file to write the outputs to.
- Returns:
A tuple containing the full outputs and the human-readable outputs.
- Return type:
Tuple[List[dict]]
- predict(batch: List[dict])¶
Runs inference for the given batch (possibly in a multithreaded fashion). This method is called by the predict_dataloader method of the MultiThreadedAPILauncher class.
- Parameters:
batch (List[dict]) – The batch to run inference for.
- Returns:
The batch with the inference outputs added to it.
- Return type:
List[dict]
- classmethod predict_batch(flow: Flow, batch: List[dict], input_interface: Interface | None = None, output_interface: Interface | None = None, path_to_output_file: str | None = None, keys_to_write: List[str] | None = None, n_independent_samples: int = 1, fault_tolerant_mode: bool = False, n_batch_retries: int = 1, wait_time_between_retries: int = 1)¶
Class method that runs inference on the given batch for a given flow.
- Parameters:
flow (Flow) – The flow to run inference with.
batch (List[dict]) – The batch to run inference for.
input_interface (Optional[Interface]) – The input interface of the flow. Default: None
output_interface (Optional[Interface]) – The output interface of the flow. Default: None
path_to_output_file (Optional[str]) – A path to a file to write the outputs to. Default: None
keys_to_write (Optional[List[str]]) – A list of keys to write to file. Default: None
n_independent_samples (Optional[int]) – the number of times to independently repeat the same inference for a given sample. Default: 1
- Returns:
The batch with the inference outputs added to it.
- Return type:
List[dict]
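The effect of n_independent_samples can be sketched as repeating inference per sample and collecting every output (hypothetical helper and key names; the actual output layout is an assumption):

```python
from typing import Callable, Dict, List


def repeat_inference(
    infer: Callable[[Dict], Dict],
    batch: List[Dict],
    n_independent_samples: int = 1,
) -> List[Dict]:
    """Run `infer` n_independent_samples times per sample; keep all outputs."""
    for sample in batch:
        # Each repetition is an independent call, e.g. to average over
        # stochastic model outputs.
        sample["inference_outputs"] = [
            infer(sample) for _ in range(n_independent_samples)
        ]
    return batch
```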
- static predict_sample(flow: Flow, sample: Dict, input_interface: Interface | None = None, output_interface: Interface | None = None, fault_tolerant_mode: bool = False, n_batch_retries: int = 1, wait_time_between_retries: int = 1) Tuple[Dict] ¶
Static method that runs inference on a single sample with a given flow.
- Parameters:
flow (Flow) – The flow to run inference with.
sample (Dict) – The sample to run inference on.
input_interface (Optional[Interface]) – The input interface of the flow. Default: None
output_interface (Optional[Interface]) – The output interface of the flow. Default: None
fault_tolerant_mode (bool) – whether to retry rather than crash if an error occurs during the inference for a given sample. Default: False
n_batch_retries (int) – the number of times to retry the batch if an error occurs (only used if fault_tolerant_mode is True). Default: 1
wait_time_between_retries (int) – the number of seconds to wait before retrying the batch (only used if fault_tolerant_mode is True). Default: 1
- Returns:
A tuple containing the output message, the output data, and the error (if any).
- Return type:
Tuple[Dict]