nvflare.client.flare_agent module

exception AgentClosed

Bases: FlareAgentException

exception CallStateError

Bases: FlareAgentException

class FlareAgent(pipe: Pipe, read_interval=0.1, heartbeat_interval=5.0, heartbeat_timeout=30.0, resend_interval=2.0, max_resends=None, submit_result_timeout=30.0, metric_pipe=None, task_channel_name: str = 'task', metric_channel_name: str = 'metric', close_pipe: bool = True, close_metric_pipe: bool = True)

Bases: object

Constructor of Flare Agent.

The agent is responsible for communicating with the Flare Client Job cell (CJ) to get tasks and submit task results.

Parameters:
  • pipe (Pipe) – pipe for task communication.

  • read_interval (float) – how often to read from the pipe. Defaults to 0.1.

  • heartbeat_interval (float) – how often to send a heartbeat to the peer. Defaults to 5.0.

  • heartbeat_timeout (float) – how long to wait for a heartbeat from the peer before treating the peer as dead. A value of 0 disables heartbeat checking. Defaults to 30.0.

  • resend_interval (float) – how often to resend a message if sending fails. None means no resend. If the pipe does not support resending, no resend is attempted. Defaults to 2.0.

  • max_resends (int, optional) – maximum number of resends. None means no limit. Defaults to None.

  • submit_result_timeout (float) – how long to wait for a response from the CJ when submitting a task result. Defaults to 30.0.

  • metric_pipe (Pipe, optional) – pipe for metric communication. Defaults to None.

  • task_channel_name (str) – channel name for tasks. Defaults to 'task'.

  • metric_channel_name (str) – channel name for metrics. Defaults to 'metric'.

  • close_pipe (bool) – whether to close the task pipe when the agent is stopped. Defaults to True. Typically set to False for FilePipe and True for CellPipe.

  • close_metric_pipe (bool) – whether to close the metric pipe when the agent is stopped. Defaults to True. Typically set to False for FilePipe and True for CellPipe.
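The heartbeat and resend parameters interact in ways that are easy to misread. The two predicates below are a minimal sketch of the documented semantics, not FlareAgent internals; the helper names `peer_is_gone` and `may_resend`, the `pipe_supports_resend` flag, and the strict `>` comparison at the timeout boundary are all assumptions made for illustration.

```python
from typing import Optional


def peer_is_gone(last_heartbeat: float, now: float, heartbeat_timeout: float = 30.0) -> bool:
    """Hypothetical liveness check mirroring the documented heartbeat_timeout rule."""
    # A timeout of 0 disables heartbeat checking entirely.
    if heartbeat_timeout == 0:
        return False
    # Assumption: the peer counts as gone once the silence strictly exceeds the timeout.
    return now - last_heartbeat > heartbeat_timeout


def may_resend(
    resends_so_far: int,
    resend_interval: Optional[float] = 2.0,
    max_resends: Optional[int] = None,
    pipe_supports_resend: bool = True,
) -> bool:
    """Hypothetical check for whether another resend attempt is allowed."""
    # resend_interval=None, or a pipe without resend support, means no resend at all.
    if resend_interval is None or not pipe_supports_resend:
        return False
    # max_resends=None means there is no upper limit on resend attempts.
    if max_resends is None:
        return True
    return resends_so_far < max_resends
```

With the defaults (heartbeat_timeout=30.0, resend_interval=2.0, max_resends=None), a silent peer is dropped after 30 seconds, while a failing send is retried indefinitely.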

get_task(timeout: float | None = None) → Task | None

Get a task from FLARE. This is a blocking call.

Parameters:

timeout (float, optional) – If specified, the call blocks for at most the specified amount of time. If not specified, the call blocks until a task is received or the agent is closed.

Returns:

None if no task is available before the timeout; otherwise, a Task object.

Raises:
  • AgentClosed – If the agent has been closed before the timeout.

  • CallStateError – If the call is made out of order (see the note below).

  • AgentAbortException – If the other endpoint of the pipe requests to abort.

  • AgentEndException – If the other endpoint has ended.

  • AgentPeerGoneException – If the other endpoint is gone.

Note: The application must make this call only when it has just started or after the previous task’s result has been submitted.
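These rules, together with start() and stop(), lead to a simple serve loop. The following is a minimal sketch, not the library's code: `FakeAgent` is a hypothetical in-memory stand-in exposing the same method names as FlareAgent so the loop can run without a FLARE deployment, `AgentClosed` and `Task` are redefined locally as stand-ins, and the doubling step is a placeholder for real work. A loop against the real agent would likely also need to handle AgentAbortException, AgentEndException and AgentPeerGoneException.

```python
from collections import deque
from dataclasses import dataclass
from typing import Any, Deque, Iterable, List, Optional, Tuple


class AgentClosed(Exception):
    """Local stand-in for the AgentClosed exception documented above."""


@dataclass
class Task:
    # Mirrors the documented Task(task_name, task_id, data) constructor.
    task_name: str
    task_id: str
    data: Any


class FakeAgent:
    """Hypothetical stand-in with FlareAgent's method names (not the real class)."""

    def __init__(self, script: Iterable[Optional[Task]]):
        # Each entry is a Task, or None to simulate get_task timing out.
        self._script: Deque[Optional[Task]] = deque(script)
        self.results: List[Tuple[Any, str]] = []
        self.started = False
        self.stopped = False

    def start(self) -> None:
        self.started = True

    def stop(self) -> None:
        self.stopped = True

    def get_task(self, timeout: Optional[float] = None) -> Optional[Task]:
        # Once the script is exhausted, behave as if the agent had been closed.
        if not self._script:
            raise AgentClosed("no more tasks")
        return self._script.popleft()

    def submit_result(self, result: Any, rc: str = "OK") -> bool:
        self.results.append((result, rc))
        return True


def serve(agent) -> int:
    """Process tasks until the agent is closed; return how many were handled."""
    handled = 0
    agent.start()  # must be called before any CJ/agent communication
    try:
        while True:
            try:
                task = agent.get_task(timeout=5.0)
            except AgentClosed:
                break
            if task is None:
                continue  # timed out with no task; poll again
            result = [x * 2 for x in task.data]  # placeholder for real work
            # One submit_result per task, before the next get_task call.
            agent.submit_result(result, rc="OK")
            handled += 1
    finally:
        agent.stop()  # no further communication after this point
    return handled


agent = FakeAgent([Task("train", "t1", [1, 2]), None, Task("train", "t2", [3])])
print(serve(agent))  # → 2
```

Calling stop() in a finally clause keeps the shutdown path the same whether the loop ends because the agent was closed or because the task processing raised.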

log(record: DXO) → bool

Logs a metric record.

Parameters:

record (DXO) – A metric record.

Returns:

Whether the metric record was submitted successfully.

shareable_to_task_data(shareable: Shareable) → Any

Convert the Shareable object received from the TaskExchanger to an app-friendly format.

Subclasses can override this method to convert the Shareable to their own app-friendly task data. By default, it is converted to a DXO object.

Parameters:

shareable – the Shareable object received from the TaskExchanger.

Returns:

task data.

start()

Start the agent.

This method must be called to enable CJ/Agent communication.

Returns: None

stop()

Stop the agent.

After this is called, there will be no more communications between CJ and agent.

Returns: None

submit_result(result, rc='OK') → bool

Submit the result of the current task.

This is a blocking call. The agent keeps trying to send the result to the FLARE site until it is sent successfully, the task is aborted, or the agent is closed.

Parameters:
  • result – the result to be submitted.

  • rc – the return code. Defaults to 'OK'.

Returns:

Whether the result was submitted successfully.

Raises:

CallStateError – If the submit_result call is not made properly.

Notes: The application must make this call only after the received task has been processed. The call can be made only once per task, regardless of whether the submission succeeds.
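The ordering constraints in the get_task and submit_result notes amount to a small state machine with a single "task pending" flag. Below is a minimal sketch, assuming a hypothetical `CallOrderGuard` helper and a local stand-in for CallStateError; it illustrates the documented contract and is not how FlareAgent is implemented internally.

```python
class CallStateError(Exception):
    """Local stand-in for the CallStateError exception documented above."""


class CallOrderGuard:
    """Hypothetical tracker for the documented get_task/submit_result ordering."""

    def __init__(self) -> None:
        # True between receiving a task and submitting its result.
        self._task_pending = False

    def on_get_task(self) -> None:
        # get_task is allowed only at startup or after the previous result was submitted.
        if self._task_pending:
            raise CallStateError("get_task called before submitting the previous result")

    def on_task_received(self) -> None:
        # Only an actual Task marks work as pending; a timeout (None) does not.
        self._task_pending = True

    def on_submit_result(self) -> None:
        # submit_result is allowed exactly once per received task.
        if not self._task_pending:
            raise CallStateError("submit_result called without a pending task")
        # Cleared before sending, so a failed submission cannot be retried.
        self._task_pending = False
```

Keeping on_get_task and on_task_received separate reflects that get_task may return None on timeout, in which case the application may simply call get_task again.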

task_result_to_shareable(result: Any, rc) → Shareable

Convert the result object to a Shareable object before sending it back to the TaskExchanger.

Subclasses can override this method to convert their app-friendly result type to a Shareable. By default, the result is expected to be a DXO object.

Parameters:
  • result – the result object to be converted to Shareable. If None, an empty Shareable object will be created with the rc only.

  • rc – the return code.

Returns:

A Shareable object
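The two conversion hooks, shareable_to_task_data and task_result_to_shareable, are meant to be overridden as a pair. Below is a minimal sketch of that pattern, assuming hypothetical stand-ins throughout: a plain dict plays the role of Shareable, `BaseAgentSketch` replaces FlareAgent and stores the return code under an "rc" key, and `ListAgent` and `PARAM_NAMES` are illustrative names. The real default converts to and from DXO objects instead.

```python
from typing import Any, Dict, List

# Hypothetical stand-in: a plain dict plays the role of Shareable in this sketch.
Shareable = Dict[str, Any]


class BaseAgentSketch:
    """Hypothetical base class exposing the two conversion hooks."""

    def shareable_to_task_data(self, shareable: Shareable) -> Any:
        return shareable.get("payload")

    def task_result_to_shareable(self, result: Any, rc: str) -> Shareable:
        shareable: Shareable = {"rc": rc}
        # With result=None, the Shareable carries only the return code.
        if result is not None:
            shareable["payload"] = result
        return shareable


class ListAgent(BaseAgentSketch):
    """Exposes task data as a flat list of floats in a fixed parameter order."""

    PARAM_NAMES = ("w0", "w1")  # hypothetical fixed parameter order

    def shareable_to_task_data(self, shareable: Shareable) -> List[float]:
        weights = super().shareable_to_task_data(shareable)
        return [float(weights[name]) for name in self.PARAM_NAMES]

    def task_result_to_shareable(self, result: Any, rc: str) -> Shareable:
        if result is not None:
            # Map the list back to named parameters for the FLARE side.
            result = dict(zip(self.PARAM_NAMES, result))
        return super().task_result_to_shareable(result, rc)
```

Overriding both hooks together keeps the round trip symmetric: whatever shape the application receives is the shape it hands back.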

exception FlareAgentException

Bases: Exception

class FlareAgentWithCellPipe(agent_id: str, site_name: str, root_url: str, secure_mode: bool, workspace_dir: str, read_interval=0.1, heartbeat_interval=5.0, heartbeat_timeout=30.0, resend_interval=2.0, max_resends=None, submit_result_timeout=30.0, has_metrics=False)

Bases: FlareAgent

Constructor of Flare Agent with Cell Pipe. This is a convenience class.

Parameters:
  • agent_id (str) – unique ID that guarantees the uniqueness of the cell’s FQCN.

  • site_name (str) – name of the FLARE site

  • root_url (str) – the root URL of the cellnet that the pipe’s cell will join.

  • secure_mode (bool) – whether connection to the root is secure (TLS)

  • workspace_dir (str) – the directory that contains the startup files for joining the cellnet. Required only in secure mode.

  • read_interval (float) – how often to read from the pipe.

  • heartbeat_interval (float) – how often to send a heartbeat to the peer.

  • heartbeat_timeout (float) – how long to wait for a heartbeat from the peer before treating the peer as gone. A value of 0 disables heartbeat checking.

  • resend_interval (float) – how often to resend a message if sending fails. None means no resend. If the pipe does not support resending, no resend is attempted.

  • max_resends (int, optional) – maximum number of resends. None means no limit.

  • submit_result_timeout (float) – how long to wait for a response from the CJ when submitting a task result.

  • has_metrics (bool) – whether the agent has a metric pipe.

class Task(task_name: str, task_id: str, data)

Bases: object