nvflare.client.flare_agent module
- exception AgentClosed
Bases:
FlareAgentException
- exception CallStateError
Bases:
FlareAgentException
- class FlareAgent(pipe: Pipe, read_interval=0.1, heartbeat_interval=5.0, heartbeat_timeout=30.0, resend_interval=2.0, max_resends=None, submit_result_timeout=30.0, metric_pipe=None, task_channel_name: str = 'task', metric_channel_name: str = 'metric', close_pipe: bool = True, close_metric_pipe: bool = True)
Bases:
object
Constructor of Flare Agent.
The agent is responsible for communicating with the FLARE Client Job cell (CJ) to get tasks and submit task results.
- Parameters:
pipe (Pipe) – pipe for task communication.
read_interval (float) – how often to read from the pipe. Defaults to 0.1.
heartbeat_interval (float) – how often to send a heartbeat to the peer. Defaults to 5.0.
heartbeat_timeout (float) – how long to wait for a heartbeat from the peer before treating the peer as dead. 0 means do not check for heartbeats. Defaults to 30.0.
resend_interval (float) – how often to resend a message if sending fails. None means no resend. If the pipe does not support resending, no resend is attempted. Defaults to 2.0.
max_resends (int, optional) – maximum number of resends. None means no limit. Defaults to None.
submit_result_timeout (float) – when submitting a task result, how long to wait for a response from the CJ. Defaults to 30.0.
metric_pipe (Pipe, optional) – pipe for metric communication. Defaults to None.
task_channel_name (str) – channel name for tasks. Defaults to 'task'.
metric_channel_name (str) – channel name for metrics. Defaults to 'metric'.
close_pipe (bool) – whether to close the task pipe when the agent is stopped. Defaults to True. Usually set to False for FilePipe and to True for CellPipe.
close_metric_pipe (bool) – whether to close the metric pipe when the agent is stopped. Defaults to True. Usually set to False for FilePipe and to True for CellPipe.
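The heartbeat and resend parameters above combine a few special values (0 disables the heartbeat check, None disables resending or removes the resend limit). The sketch below encodes only those documented rules as two small predicates. The helper names `peer_is_dead` and `may_resend` are hypothetical and are not part of NVFlare; the real agent's timing loop is not shown, and the "pipe does not support resending" case is not modeled.

```python
from typing import Optional


def peer_is_dead(last_heartbeat: float, now: float, heartbeat_timeout: float) -> bool:
    """Decide whether the peer should be treated as dead (hypothetical helper)."""
    if heartbeat_timeout == 0:
        # 0 means "do not check for heartbeat": the peer is never declared dead.
        return False
    # Dead once the silence since the last heartbeat exceeds the timeout.
    return now - last_heartbeat > heartbeat_timeout


def may_resend(resends_so_far: int, resend_interval: Optional[float], max_resends: Optional[int]) -> bool:
    """Decide whether another resend is allowed (hypothetical helper)."""
    if resend_interval is None:
        # None means no resend at all.
        return False
    if max_resends is None:
        # None means no limit on the number of resends.
        return True
    return resends_so_far < max_resends


# Using the documented defaults: heartbeat_timeout=30.0, resend_interval=2.0, max_resends=None.
print(peer_is_dead(last_heartbeat=100.0, now=135.0, heartbeat_timeout=30.0))  # True
print(may_resend(resends_so_far=50, resend_interval=2.0, max_resends=None))  # True
```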
- get_task(timeout: float | None = None) -> Task | None
Get a task from FLARE. This is a blocking call.
- Parameters:
timeout (float, optional) – If specified, the call blocks for at most this amount of time. If not specified, the call blocks until a task is received or the agent is closed.
- Returns:
None if no task is available before the timeout; otherwise, a Task object.
- Raises:
AgentClosed – If the agent has been closed before the timeout.
CallStateError – If the call is made at the wrong time (see the note below).
AgentAbortException – If the other endpoint of the pipe requests to abort.
AgentEndException – If the other endpoint has ended.
AgentPeerGoneException – If the other endpoint is gone.
Note: the application must make this call only right after it has started, or after the result of the previous task has been submitted.
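A typical application drives get_task in a loop: a None return means the timeout expired with no task, and AgentClosed means the loop should end. The sketch below shows that loop against a hypothetical in-memory `StubAgent` (not an NVFlare class) and a locally defined `AgentClosed` stand-in, so it runs without NVFlare installed. A loop against the real agent would also need to handle the abort, end, and peer-gone exceptions listed above.

```python
from typing import Any, Callable, List, Optional


class AgentClosed(Exception):
    """Local stand-in for the documented AgentClosed exception."""


class StubAgent:
    """Hypothetical in-memory agent mimicking the documented get_task/submit_result contract."""

    def __init__(self, events: List[Optional[Any]]) -> None:
        # Each event is a task, or None to simulate "no task before timeout".
        self._events = list(events)
        self.results: List[tuple] = []

    def get_task(self, timeout: Optional[float] = None) -> Optional[Any]:
        if not self._events:
            # Once the events run out, behave like a closed agent.
            raise AgentClosed("agent has been closed")
        return self._events.pop(0)

    def submit_result(self, result: Any, rc: str = "OK") -> bool:
        self.results.append((result, rc))
        return True


def run_task_loop(agent: StubAgent, process: Callable[[Any], Any], timeout: float = 1.0) -> int:
    """Fetch, process and submit tasks until the agent is closed; return the number handled."""
    handled = 0
    while True:
        try:
            task = agent.get_task(timeout=timeout)
        except AgentClosed:
            break  # the agent was closed: leave the loop
        if task is None:
            continue  # timed out without a task: poll again
        # Exactly one submit_result per received task, as the note requires.
        agent.submit_result(process(task))
        handled += 1
    return handled


agent = StubAgent([1, None, 2])
print(run_task_loop(agent, lambda x: x * 10))  # 2
print(agent.results)  # [(10, 'OK'), (20, 'OK')]
```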
- log(record: DXO) -> bool
Logs a metric record.
- Parameters:
record (DXO) – A metric record.
- Returns:
Whether the metric record was submitted successfully.
Convert the Shareable object received from the TaskExchanger to an app-friendly format.
Subclasses can override this method to convert the Shareable to their own app-friendly task data. By default, it is converted to a DXO object.
- Parameters:
shareable – the Shareable object received from the TaskExchanger.
- Returns:
task data.
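The override pattern described above can be sketched as a base class with a default conversion hook and a subclass that replaces it. This listing does not show the method's actual name, so `convert_incoming` below is a hypothetical placeholder, and a plain dict stands in for the Shareable; the sketch only illustrates the subclassing idea, not NVFlare's implementation.

```python
from typing import Any, Dict


class BaseAgentSketch:
    """Hypothetical base class; convert_incoming is an illustrative hook name."""

    def convert_incoming(self, shareable: Dict[str, Any]) -> Any:
        # Default: return the payload as-is (plays the role of the DXO default).
        return shareable["data"]


class WeightsAgentSketch(BaseAgentSketch):
    """Subclass overriding the hook to produce its own app-friendly task data."""

    def convert_incoming(self, shareable: Dict[str, Any]) -> Any:
        # App-friendly form: named weights plus the round number (0 if absent).
        return {"weights": shareable["data"], "round": shareable.get("round", 0)}


incoming = {"data": [0.1, 0.2], "round": 3}
print(BaseAgentSketch().convert_incoming(incoming))  # [0.1, 0.2]
print(WeightsAgentSketch().convert_incoming(incoming))  # {'weights': [0.1, 0.2], 'round': 3}
```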
- start()
Start the agent.
This method must be called to enable CJ/Agent communication.
Returns: None
- stop()
Stop the agent.
After this call, there is no further communication between the CJ and the agent.
Returns: None
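Because start() enables CJ/agent communication and stop() ends it, a caller usually wants stop() to run even when task processing fails. One way to guarantee that is a context manager wrapping the pair, sketched below with a hypothetical `StubLifecycleAgent` in place of the real agent; `running_agent` is likewise an illustrative helper, not an NVFlare API.

```python
from contextlib import contextmanager
from typing import Iterator


class StubLifecycleAgent:
    """Hypothetical stand-in exposing start()/stop() like the documented agent."""

    def __init__(self) -> None:
        self.running = False

    def start(self) -> None:
        self.running = True  # CJ/agent communication enabled

    def stop(self) -> None:
        self.running = False  # no further CJ/agent communication


@contextmanager
def running_agent(agent: StubLifecycleAgent) -> Iterator[StubLifecycleAgent]:
    """Start the agent and guarantee stop() runs, even if the body raises."""
    agent.start()
    try:
        yield agent
    finally:
        agent.stop()


agent = StubLifecycleAgent()
with running_agent(agent) as a:
    print(a.running)  # True
print(agent.running)  # False
```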
- submit_result(result, rc='OK') -> bool
Submit the result of the current task.
This is a blocking call. The agent keeps trying to send the result to the FLARE site until it is sent successfully, the task is aborted, or the agent is closed.
- Parameters:
result – result to be submitted
rc – return code
- Returns:
Whether the result was submitted successfully.
- Raises:
CallStateError – If the call is made at the wrong time (see the notes below).
Notes: the application must make this call only after the received task has been processed. The call can be made only once per task, regardless of whether the submission succeeds.
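The ordering rules for get_task and submit_result form a small state machine: a task is either pending (received but not yet answered) or not. The sketch below tracks that single flag and raises a locally defined `CallStateError` stand-in on out-of-order calls. `CallOrderTracker` and its method names are hypothetical; this is an illustration of the documented rules, not how NVFlare checks them internally.

```python
class CallStateError(Exception):
    """Local stand-in for the documented CallStateError."""


class CallOrderTracker:
    """Hypothetical tracker enforcing the documented get_task/submit_result ordering."""

    def __init__(self) -> None:
        self._task_pending = False  # True between receiving a task and submitting its result

    def check_get_task(self) -> None:
        # get_task is only allowed at startup or after the previous result was submitted.
        if self._task_pending:
            raise CallStateError("result of the current task has not been submitted")

    def task_received(self) -> None:
        self._task_pending = True

    def mark_submitted(self) -> None:
        # submit_result is allowed once per task, whether or not the submission succeeds.
        if not self._task_pending:
            raise CallStateError("no task is pending a result")
        self._task_pending = False


tracker = CallOrderTracker()
tracker.check_get_task()  # OK at startup
tracker.task_received()
tracker.mark_submitted()  # first submission for this task
try:
    tracker.mark_submitted()  # second submission for the same task
except CallStateError as e:
    print("rejected:", e)  # rejected: no task is pending a result
```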
Convert the result object to Shareable object before sending back to the TaskExchanger.
Subclasses can override this method to convert their app-friendly result type to a Shareable. By default, the result is expected to be a DXO object.
- Parameters:
result – the result object to convert to a Shareable. If None, an empty Shareable containing only the rc is created.
rc – the return code.
- Returns:
A Shareable object
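The None case above is worth spelling out: with no result, the outgoing message still carries the return code. The sketch below models that rule with a plain dict standing in for a Shareable. The function name `result_to_message` is hypothetical (the listing omits the real method name), and "ERROR" is only a placeholder return code, not a documented value.

```python
from typing import Any, Dict, Optional


def result_to_message(result: Optional[Any], rc: str = "OK") -> Dict[str, Any]:
    """Hypothetical stand-in for the result-conversion hook; a dict plays the role of a Shareable."""
    message: Dict[str, Any] = {"rc": rc}
    if result is None:
        # No result: an otherwise empty message that carries only the return code.
        return message
    # The docs expect a DXO by default; this sketch accepts any payload.
    message["data"] = result
    return message


print(result_to_message({"loss": 0.25}))  # {'rc': 'OK', 'data': {'loss': 0.25}}
print(result_to_message(None, rc="ERROR"))  # {'rc': 'ERROR'}
```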
- class FlareAgentWithCellPipe(agent_id: str, site_name: str, root_url: str, secure_mode: bool, workspace_dir: str, read_interval=0.1, heartbeat_interval=5.0, heartbeat_timeout=30.0, resend_interval=2.0, max_resends=None, submit_result_timeout=30.0, has_metrics=False)
Bases:
FlareAgent
Constructor of Flare Agent with Cell Pipe. This is a convenience class.
- Parameters:
agent_id (str) – unique ID that guarantees the uniqueness of the cell's FQCN.
site_name (str) – name of the FLARE site
root_url (str) – the root url of the cellnet that the pipe’s cell will join
secure_mode (bool) – whether connection to the root is secure (TLS)
workspace_dir (str) – the directory that contains the startup files for joining the cellnet. Required only in secure mode.
read_interval (float) – how often to read from the pipe.
heartbeat_interval (float) – how often to send a heartbeat to the peer.
heartbeat_timeout (float) – how long to wait for a heartbeat from the peer before treating the peer as gone. 0 means do not check for heartbeats.
resend_interval (float) – how often to resend a message if sending fails. None means no resend. If the pipe does not support resending, no resend is attempted.
max_resends (int, optional) – maximum number of resends. None means no limit.
submit_result_timeout (float) – when submitting a task result, how long to wait for a response from the CJ.
has_metrics (bool) – whether the agent also has a metric pipe.