aiflows.flow_launchers package

Submodules

aiflows.flow_launchers.abstract module

class aiflows.flow_launchers.abstract.BaseLauncher

Bases: ABC

A base class for creating a model launcher.

predict(batch: Iterable[Dict]) List[Dict]

Runs inference for the data provided in the batch. It returns a list of dictionaries containing the predictions. (Not Implemented for BaseLauncher)

Parameters:

batch (Iterable[Dict]) – An iterable of dictionaries containing the data for each sample to run inference on.

Returns:

A list of dictionaries containing the predictions.

Return type:

List[Dict]
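As a sketch of the contract predict is expected to fulfil, a minimal hypothetical subclass might look like the following (BaseLauncher is re-declared here as a stand-in so the snippet is self-contained; EchoLauncher is illustrative, not part of aiflows):

```python
from abc import ABC, abstractmethod
from typing import Dict, Iterable, List


class BaseLauncher(ABC):
    """Stand-in for aiflows.flow_launchers.abstract.BaseLauncher (sketch only)."""

    @abstractmethod
    def predict(self, batch: Iterable[Dict]) -> List[Dict]:
        ...


class EchoLauncher(BaseLauncher):
    """Hypothetical subclass: copies each sample's "input" field into "prediction"."""

    def predict(self, batch: Iterable[Dict]) -> List[Dict]:
        # One output dictionary per input sample, with the prediction added.
        return [{**sample, "prediction": sample["input"]} for sample in batch]


result = EchoLauncher().predict([{"id": 0, "input": "hello"}])
```

The key point is the shape of the contract: an iterable of per-sample dictionaries in, a list of per-sample dictionaries (with predictions attached) out.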

predict_dataloader(dataloader: Iterable, path_to_cache: str | None = None)

Runs inference for the data provided in the dataloader. (Not Implemented for BaseLauncher)

Parameters:
  • dataloader (Iterable) – An iterable of dictionaries containing the data for each sample to run inference on.

  • path_to_cache (Optional[str], optional) – A path to a cache file containing existing predictions to use as a starting point.

classmethod write_batch_output(batch: List[Dict], path_to_output_file: str, keys_to_write: List[str])

Class method that writes the output of a batch to a file.

Parameters:
  • batch (List[Dict]) – A list of dictionaries containing the predictions.

  • path_to_output_file (str) – The path to the output file.

  • keys_to_write (List[str]) – A list of keys to write to file.
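A plausible sketch of this behavior, assuming a JSON-lines output format (the actual file format used by aiflows may differ):

```python
import json
import tempfile
from typing import Dict, List


def write_batch_output(batch: List[Dict], path_to_output_file: str,
                       keys_to_write: List[str]) -> None:
    # Append one JSON line per sample, keeping only the requested keys.
    # (The JSON-lines format is an assumption, not necessarily what aiflows uses.)
    with open(path_to_output_file, "a") as f:
        for sample in batch:
            f.write(json.dumps({k: sample[k] for k in keys_to_write}) + "\n")


path = tempfile.NamedTemporaryFile(suffix=".jsonl", delete=False).name
write_batch_output([{"id": 0, "prediction": "yes", "debug": "trace"}], path,
                   keys_to_write=["id", "prediction"])
with open(path) as f:
    written = [json.loads(line) for line in f]
```

Filtering by keys_to_write keeps the output file small by dropping per-sample fields (debug traces, intermediate state) that are not needed downstream.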

class aiflows.flow_launchers.abstract.MultiThreadedAPILauncher(**kwargs)

Bases: BaseLauncher, ABC

A class for creating a multi-threaded launcher that queries an API, distributing requests across multiple API keys.

Parameters:
  • debug (bool, optional) – A boolean indicating whether to print debug information (if true, it will not run the multithreading).

  • output_dir (str, optional) – The directory to write the output files to.

  • n_workers (int, optional) – The number of workers to use in the multithreading.

  • wait_time_per_key (int, optional) – The number of seconds to wait before making another request with the same API key.

  • single_threaded (bool, optional) – A boolean indicating whether to run in single-threaded mode (disabling the multithreading).

predict_dataloader(dataloader: Iterable[dict], flows_with_interfaces: List[Dict[str, Any]]) None

Runs inference for the data provided in the dataloader. It writes the results to output files in the directory specified by the output_dir attribute.

Parameters:
  • dataloader – An iterable of dictionaries containing the data for each sample to run inference on.

  • flows_with_interfaces (List[Dict[str, Any]]) – A list of dictionaries, each containing a flow instance and an input and output interface.
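The dispatch pattern this class implements can be sketched as follows; every name here (the keys, the round-robin assignment, the tagging) is illustrative rather than the actual aiflows implementation:

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import cycle
from typing import Dict, List

api_keys = ["KEY_A", "KEY_B"]  # hypothetical keys; one worker per key here


def predict_with_key(args):
    batch, api_key = args
    # A real launcher would call the remote API with api_key (respecting
    # wait_time_per_key); this sketch just tags each sample with its key.
    return [{**sample, "served_by": api_key} for sample in batch]


batches: List[List[Dict]] = [[{"id": 0}], [{"id": 1}], [{"id": 2}]]
with ThreadPoolExecutor(max_workers=len(api_keys)) as pool:
    results = list(pool.map(predict_with_key, zip(batches, cycle(api_keys))))
```

Because ThreadPoolExecutor.map preserves input order, the outputs line up with the input batches even though requests run concurrently.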

aiflows.flow_launchers.flow_API_launcher module

class aiflows.flow_launchers.flow_API_launcher.FlowLauncher(n_independent_samples: int, fault_tolerant_mode: bool, n_batch_retries: int, wait_time_between_retries: int, **kwargs)

Bases: MultiThreadedAPILauncher

Flow Launcher class for running inference on a flow. Inference can be run with the flow launcher in two ways:
  • Using the launch class method: takes a flow and runs inference on the given data without multithreading and without needing to instantiate the class.

  • Using the predict_dataloader method: runs inference on the given dataloader (requires instantiating the class). The predict_dataloader method can run inference in both single-threaded and multi-threaded modes (see the MultiThreadedAPILauncher class for more details).

Parameters:
  • n_independent_samples (int) – the number of times to independently repeat the same inference for a given sample

  • fault_tolerant_mode (bool) – whether to tolerate errors during the inference for a given sample by retrying, instead of crashing

  • n_batch_retries (int) – the number of times to retry the batch if an error occurs (only used if fault_tolerant_mode is True)

  • wait_time_between_retries (int) – the number of seconds to wait before retrying the batch (only used if fault_tolerant_mode is True)

  • **kwargs – Additional keyword arguments to instantiate the MultiThreadedAPILauncher class.

classmethod launch(flow_with_interfaces: Dict[str, Any], data: Dict | List[Dict], path_to_output_file: str | None = None) Tuple[List[dict]]

Class method that takes a flow and runs inference on the given data (no multithreading), without needing to instantiate the class.

Parameters:
  • flow_with_interfaces (Dict[str, Any]) – A dictionary containing the flow to run inference with and the input and output interfaces to use.

  • data (Union[Dict, List[Dict]]) – The data to run inference on.

  • path_to_output_file (Optional[str], optional) – A path to a file to write the outputs to.

Returns:

A tuple containing the full outputs and the human-readable outputs.

Return type:

Tuple[List[dict]]
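The documented calling convention can be sketched with a toy stand-in flow; launch here is a simplified re-implementation for illustration, and the "flow" key name and the human-readable projection are assumptions, not the aiflows internals:

```python
from typing import Any, Dict, List, Tuple, Union


def launch(flow_with_interfaces: Dict[str, Any],
           data: Union[Dict, List[Dict]]) -> Tuple[List[dict], List[dict]]:
    # Simplified sketch: normalize data to a list, run the flow on each
    # sample, and return (full outputs, human-readable outputs).
    flow = flow_with_interfaces["flow"]  # key name is an assumption
    samples = [data] if isinstance(data, dict) else data
    full_outputs = [flow(sample) for sample in samples]
    human_readable = [{"answer": out["answer"]} for out in full_outputs]
    return full_outputs, human_readable


toy_flow = lambda sample: {"answer": sample["question"].upper(), "raw": sample}
full, readable = launch({"flow": toy_flow}, {"question": "hi"})
```

Accepting either a single dict or a list of dicts makes one-off calls and batch runs share the same entry point.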

predict(batch: List[dict])

Runs inference for the given batch (possibly in a multithreaded fashion). This method is called by the predict_dataloader method of the MultiThreadedAPILauncher class.

Parameters:

batch (List[dict]) – The batch to run inference for.

Returns:

The batch with the inference outputs added to it.

Return type:

List[dict]

classmethod predict_batch(flow: Flow, batch: List[dict], input_interface: Interface | None = None, output_interface: Interface | None = None, path_to_output_file: str | None = None, keys_to_write: List[str] | None = None, n_independent_samples: int = 1, fault_tolerant_mode: bool = False, n_batch_retries: int = 1, wait_time_between_retries: int = 1)

Class method that runs inference on the given batch for a given flow.

Parameters:
  • flow (Flow) – The flow to run inference with.

  • batch (List[dict]) – The batch to run inference for.

  • input_interface (Optional[Interface]) – The input interface of the flow. Default: None

  • output_interface (Optional[Interface]) – The output interface of the flow. Default: None

  • path_to_output_file (Optional[str]) – A path to a file to write the outputs to. Default: None

  • keys_to_write (Optional[List[str]]) – A list of keys to write to file. Default: None

  • n_independent_samples (Optional[int]) – the number of times to independently repeat the same inference for a given sample. Default: 1

  • fault_tolerant_mode (Optional[bool]) – whether to tolerate errors during the inference by retrying instead of crashing. Default: False

  • n_batch_retries (Optional[int]) – the number of times to retry the batch if an error occurs (only used if fault_tolerant_mode is True). Default: 1

  • wait_time_between_retries (Optional[int]) – the number of seconds to wait before retrying the batch (only used if fault_tolerant_mode is True). Default: 1

Returns:

The batch with the inference outputs added to it.

Return type:

List[dict]
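The n_independent_samples behavior can be sketched as follows; the function name and the toy stochastic flow are illustrative, not the aiflows implementation:

```python
from typing import Callable, Dict, List


def repeat_inference(flow: Callable[[Dict], Dict], sample: Dict,
                     n_independent_samples: int = 1) -> List[Dict]:
    # Run the same inference n_independent_samples times and keep every
    # output, e.g. to measure variance of a non-deterministic flow.
    return [flow(sample) for _ in range(n_independent_samples)]


draws = iter(range(10))
stochastic_flow = lambda sample: {"id": sample["id"], "draw": next(draws)}
outputs = repeat_inference(stochastic_flow, {"id": 7}, n_independent_samples=3)
```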

static predict_sample(flow: Flow, sample: Dict, input_interface: Interface | None = None, output_interface: Interface | None = None, fault_tolerant_mode: bool = False, n_batch_retries: int = 1, wait_time_between_retries: int = 1) Tuple[Dict]

Static method that runs inference on a single sample with a given flow.

Parameters:
  • flow (Flow) – The flow to run inference with.

  • sample (Dict) – The sample to run inference on.

  • input_interface (Optional[Interface]) – The input interface of the flow. Default: None

  • output_interface (Optional[Interface]) – The output interface of the flow. Default: None

  • fault_tolerant_mode (Optional[bool]) – whether to tolerate errors during the inference for the sample by retrying instead of crashing. Default: False

  • n_batch_retries (Optional[int]) – the number of times to retry the batch if an error occurs (only used if fault_tolerant_mode is True). Default: 1

  • wait_time_between_retries (Optional[int]) – the number of seconds to wait before retrying the batch (only used if fault_tolerant_mode is True). Default: 1

Returns:

A tuple containing the output message, the output data, and the error (if any).

Return type:

Tuple[Dict]
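The fault-tolerant retry behavior described above can be sketched as follows; names and structure are illustrative, and the (output, error) return mirrors the documented tuple:

```python
import time
from typing import Callable, Dict, Optional, Tuple


def predict_with_retries(run: Callable[[Dict], Dict], sample: Dict,
                         n_batch_retries: int = 1,
                         wait_time_between_retries: float = 0.0
                         ) -> Tuple[Optional[Dict], Optional[Exception]]:
    # Try the inference up to n_batch_retries times, sleeping between
    # attempts; return (output, error) so the caller can inspect failures.
    error: Optional[Exception] = None
    for attempt in range(n_batch_retries):
        try:
            return run(sample), None
        except Exception as exc:
            error = exc
            if attempt < n_batch_retries - 1:
                time.sleep(wait_time_between_retries)
    return None, error


attempts = []


def flaky(sample):
    # Hypothetical flow that fails twice before succeeding.
    attempts.append(1)
    if len(attempts) < 3:
        raise RuntimeError("transient API error")
    return {"answer": 42}


output, error = predict_with_retries(flaky, {}, n_batch_retries=3)
```

Returning the error instead of raising lets a batch run continue past individual failed samples while still recording what went wrong.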

Module contents