nvflare.app_common.executors.task_exchanger module

class TaskExchanger(pipe_id: str, read_interval: float = 0.5, heartbeat_interval: float = 5.0, heartbeat_timeout: float | None = 60.0, resend_interval: float = 2.0, max_resends: int | None = None, peer_read_timeout: float | None = 60.0, task_wait_time: float | None = None, result_poll_interval: float = 0.5, pipe_channel_name='task')

Bases: Executor

Constructor of TaskExchanger.

Parameters:
  • pipe_id (str) – component id of pipe.

  • read_interval (float) – how often to read from pipe.

  • heartbeat_interval (float) – how often to send heartbeat to peer.

  • heartbeat_timeout (float, optional) – how long to wait for a heartbeat from the peer before treating the peer as dead. 0 means do not check for heartbeats.

  • resend_interval (float) – how often to resend a message if sending fails. None means no resend. If the pipe does not support resending, messages are never resent.

  • max_resends (int, optional) – maximum number of resends. None means no limit. Defaults to None.

  • peer_read_timeout (float, optional) – how long to wait for the peer to accept a sent message.

  • task_wait_time (float, optional) – how long to wait for a task to complete. None means waiting forever. Defaults to None.

  • result_poll_interval (float) – how often to poll task result. Defaults to 0.5.

  • pipe_channel_name – the channel name for sending task requests. Defaults to “task”.
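
The constructor parameters above correspond to the `args` of an executor entry in a client configuration. The following is a minimal sketch, assuming the usual `path`/`args` component layout; the task name `train` and the pipe id `pipe` are hypothetical, and the pipe component itself (not shown) would have to be defined elsewhere under the same id. The values are simply the defaults from the signature.

```python
import json

# Hypothetical id; it must match the id of a pipe component that is
# defined elsewhere in the same client configuration.
PIPE_ID = "pipe"

executor_entry = {
    "tasks": ["train"],  # hypothetical task name
    "executor": {
        # Module path and class name as given on this page.
        "path": "nvflare.app_common.executors.task_exchanger.TaskExchanger",
        "args": {
            "pipe_id": PIPE_ID,
            "read_interval": 0.5,
            "heartbeat_interval": 5.0,
            "heartbeat_timeout": 60.0,   # 0 would disable the heartbeat check
            "resend_interval": 2.0,
            "max_resends": None,         # None: no limit on resends
            "peer_read_timeout": 60.0,
            "task_wait_time": None,      # None: wait forever for the task
            "result_poll_interval": 0.5,
            "pipe_channel_name": "task",
        },
    },
}

# Serialize to JSON text; Python None becomes JSON null.
config_text = json.dumps({"executors": [executor_entry]}, indent=2)
print(config_text)
```

Only `pipe_id` has no default, so in practice most of these entries can be omitted.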

ask_peer_to_end(fl_ctx: FLContext) → bool

check_input_shareable(task_name: str, shareable: Shareable, fl_ctx: FLContext) → bool

Checks input shareable before execute.

Returns:

True, if input shareable looks good; False, otherwise.

check_output_shareable(task_name: str, shareable: Shareable, fl_ctx: FLContext) → bool

Checks output shareable after execute.

Returns:

True, if output shareable looks good; False, otherwise.

execute(task_name: str, shareable: Shareable, fl_ctx: FLContext, abort_signal: Signal) → Shareable

The TaskExchanger always sends the Shareable to the peer, and expects to receive a Shareable object from the peer. The peer can convert the Shareable object to whatever format is best for its application (e.g. a DXO or FLModel object). Similarly, when submitting a result, the peer must convert its result object to a Shareable object before sending it back to the TaskExchanger.

This “late-binding” strategy (binding the Shareable object to an application-friendly object only on the peer side) keeps the TaskExchanger generic, so it can be reused for any application (e.g. Shareable based, DXO based, or based on any custom data).
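
The late-binding idea can be sketched from the peer's point of view. This is an illustration only, not NVFlare code: a plain dict stands in for the Shareable, and the `AppModel` class, the `weights` and `round` keys, and the helper names are all hypothetical.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class AppModel:
    """Hypothetical application-friendly object used only by the peer."""
    weights: List[float]
    round_num: int


def from_shareable(shareable: Dict) -> AppModel:
    # Peer side: bind the generic payload to the application object.
    return AppModel(weights=list(shareable["weights"]),
                    round_num=shareable["round"])


def to_shareable(model: AppModel) -> Dict:
    # Peer side: convert the result back to the generic payload
    # before it is returned to the exchanger.
    return {"weights": model.weights, "round": model.round_num}


def peer_handle_task(shareable: Dict) -> Dict:
    model = from_shareable(shareable)
    # Placeholder for real work: scale every weight by 0.5.
    model.weights = [w * 0.5 for w in model.weights]
    return to_shareable(model)


task = {"weights": [2.0, 4.0], "round": 3}  # stand-in for a Shareable
result = peer_handle_task(task)
print(result)
```

The exchanger side never needs to know about `AppModel`; both conversions live entirely in the peer, which is what lets the same executor serve different application formats.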

get_pipe()

Gets pipe.

get_pipe_channel_name()

Gets pipe_channel_name.

handle_event(event_type: str, fl_ctx: FLContext)

Handles events.

Parameters:
  • event_type (str) – event type fired by workflow.

  • fl_ctx (FLContext) – FLContext information.

pause_pipe_handler()

Stops pipe_handler heartbeat.

peer_is_up_or_dead() → bool

resume_pipe_handler()

Resumes pipe_handler heartbeat.