aiflows.workers package

Submodules

aiflows.workers.dispatch_worker module

aiflows.workers.dispatch_worker.create_flow(config: Dict[str, Any], config_overrides: Dict[str, Any] = None, state: Dict[str, Any] = None)

Creates a flow from the given config and state.

Parameters:
  • config (Dict[str, Any]) – flow config

  • config_overrides (Dict[str, Any]) – flow config overrides

  • state (Dict[str, Any]) – flow state
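
The reference above does not say how `config_overrides` is combined with `config`. The sketch below shows one plausible reading, a recursive merge in which override values win. The helper `merge_overrides` and the example keys are hypothetical and are not part of aiflows.

```python
from copy import deepcopy
from typing import Any, Dict, Optional


def merge_overrides(
    config: Dict[str, Any],
    config_overrides: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Hypothetical helper (not an aiflows function).

    Returns a copy of `config` with `config_overrides` applied recursively.
    The input dictionaries are left unmodified.
    """
    merged = deepcopy(config)
    for key, value in (config_overrides or {}).items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            # Both sides are dicts: merge them key by key.
            merged[key] = merge_overrides(merged[key], value)
        else:
            # Otherwise the override replaces the original value.
            merged[key] = deepcopy(value)
    return merged


# Illustrative config; the keys are assumptions, not a real flow schema.
base_config = {"name": "demo_flow", "backend": {"model": "small", "temperature": 0.2}}
overrides = {"backend": {"temperature": 0.7}}
effective_config = merge_overrides(base_config, overrides)
```

Under this assumption, only the overridden leaf (`temperature`) changes, and passing `None` for the overrides yields an unchanged copy of the config.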

aiflows.workers.dispatch_worker.dispatch_task_handler(cl: CoLink, param: bytes, participants: List[Participant])

Dispatches a task to the appropriate flow and runs that flow on the message.

Parameters:
  • cl (CoLink) – The colink object

  • param (bytes) – The serialized dispatch task

  • participants (List[colink.Participant]) – The participants involved in the task

aiflows.workers.dispatch_worker.parse_args()

aiflows.workers.dispatch_worker.run_dispatch_worker_thread(cl, dispatch_point='coflows_dispatch', flow_modules_base_path='', api_infos: List[ApiInfo] = None)

Runs a dispatch worker in a separate thread.

Parameters:
  • cl (CoLink) – colink object

  • dispatch_point (str) – dispatch point to which the worker will subscribe

  • flow_modules_base_path (str) – path to directory that contains the flow_modules directory

  • api_infos (List[ApiInfo]) – API info that the worker should inject into flows when loading them from colink storage. The API info remains local to the worker and is not stored in colink storage.
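
The `api_infos` behaviour described above (injected at load time, never written back to storage) can be pictured with the following sketch. It makes several assumptions: `StandInApiInfo` is a stand-in dataclass rather than the real `ApiInfo`, the `"api_infos"` config key is hypothetical, and `inject_api_infos` is not an aiflows function.

```python
from copy import deepcopy
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass
class StandInApiInfo:
    """Stand-in for ApiInfo; the field names are assumptions."""
    backend_used: str
    api_key: str


def inject_api_infos(
    stored_config: Dict[str, Any],
    api_infos: List[StandInApiInfo],
) -> Dict[str, Any]:
    """Hypothetical helper: build a runtime config that carries the API info.

    The stored config is copied first, so the credentials never end up in
    the object that represents what is kept in storage.
    """
    runtime_config = deepcopy(stored_config)
    runtime_config["api_infos"] = list(api_infos)  # hypothetical key
    return runtime_config


# The stored config stands for what would be read from colink storage.
stored_config = {"name": "demo_flow"}
local_api_infos = [StandInApiInfo(backend_used="example", api_key="local-secret")]
runtime_config = inject_api_infos(stored_config, local_api_infos)
```

The point of the copy is that the worker-local credentials live only in the runtime object, while the stored representation stays free of them.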

aiflows.workers.dispatch_worker.run_dispatch_worker_threads(cl, num_workers, dispatch_point='coflows_dispatch', flow_modules_base_path='', api_infos: List[ApiInfo] = None)

Runs multiple dispatch workers in separate threads.

Parameters:
  • cl (CoLink) – colink object

  • num_workers (int) – number of workers to run

  • dispatch_point (str) – dispatch point to which the workers will subscribe

  • flow_modules_base_path (str) – path to directory that contains the flow_modules directory

  • api_infos (List[ApiInfo]) – API info that the workers should inject into flows when loading them from colink storage. The API info remains local to the workers and is not stored in colink storage.
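
As a rough illustration of the "several workers in separate threads" pattern, the sketch below starts `num_workers` threads that all consume from one shared queue. It uses only the Python standard library. `stand_in_worker` and `run_stand_in_worker_threads` are hypothetical stand-ins, the queue plays the role of a dispatch point, and none of this reflects how aiflows or colink implement workers internally.

```python
import queue
import threading
from typing import List


def stand_in_worker(worker_id: int, tasks: queue.Queue, results: List[str],
                    lock: threading.Lock) -> None:
    """Hypothetical worker loop: take tasks until a None sentinel arrives."""
    while True:
        task = tasks.get()
        if task is None:  # sentinel: stop this worker
            break
        with lock:
            results.append(f"worker-{worker_id}:{task}")


def run_stand_in_worker_threads(num_workers: int, tasks: queue.Queue,
                                results: List[str]) -> List[threading.Thread]:
    """Start num_workers threads that all consume from the same queue."""
    lock = threading.Lock()
    threads = [
        threading.Thread(target=stand_in_worker, args=(i, tasks, results, lock),
                         daemon=True)
        for i in range(num_workers)
    ]
    for thread in threads:
        thread.start()
    return threads


tasks: queue.Queue = queue.Queue()
results: List[str] = []
num_workers = 3
threads = run_stand_in_worker_threads(num_workers, tasks, results)

for n in range(5):
    tasks.put(f"task-{n}")
for _ in range(num_workers):
    tasks.put(None)  # one sentinel per worker
for thread in threads:
    thread.join()
```

Each task is handled by exactly one worker, but which worker picks up which task depends on thread scheduling, so the order of `results` is not fixed.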

aiflows.workers.get_instance_worker module

aiflows.workers.get_instance_worker.get_instances_initiator_handler(cl: CoLink, param: bytes, participants: List[Participant])

Initiator handler for get_instances, which fetches flow instances from participants via colink.

Parameters:
  • cl (CoLink) – colink object

  • param (bytes) – parameters

  • participants (List[CL.Participant]) – list of participants involved in task

aiflows.workers.get_instance_worker.get_instances_receiver_handler(cl: CoLink, param: bytes, participants: List[Participant])

Receiver handler for get_instances, which fetches flow instances from participants via colink.

Parameters:
  • cl (CoLink) – colink object

  • param (bytes) – parameters

  • participants (List[CL.Participant]) – list of participants involved in task

aiflows.workers.get_instance_worker.parse_args()

aiflows.workers.get_instance_worker.run_get_instance_worker_thread(cl)

Runs the get_instance worker in a thread.

Parameters:
  • cl (CoLink) – colink object

Module contents