nvflare.apis.impl.wf_comm_server module¶
- class WFCommServer(task_check_period=0.2)[source]¶
Bases: FLComponent, WFCommSpec
Manage life cycles of tasks and their destinations.
- Parameters:
task_check_period (float, optional) – interval for checking status of tasks. Defaults to 0.2.
- broadcast(task: Task, fl_ctx: FLContext, targets: List[Client] | List[str] | None = None, min_responses: int = 1, wait_time_after_min_received: int = 0)[source]¶
Schedule a broadcast task. This is a non-blocking call.
The task is added to a task list. Clients request tasks, and the controller dispatches the task to eligible clients.
- Parameters:
task (Task) – the task to be scheduled
fl_ctx (FLContext) – FLContext associated with this task
targets (Union[List[Client], List[str], None], optional) – the list of eligible clients or client names or None (all clients). Defaults to None.
min_responses (int, optional) – the number of client submissions required before this task can be marked as completed. Defaults to 1.
wait_time_after_min_received (int, optional) – a grace period for late clients to contribute their submissions. 0 means no grace period. Submissions from late clients received within the grace period are still collected as valid. Defaults to 0.
- Raises:
ValueError – when min_responses is greater than the number of targets, since such a task, if it were scheduled, could never complete.
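The ValueError condition above can be illustrated with a minimal sketch. The helper name `check_min_responses` is hypothetical and is not part of NVFlare; the sketch also assumes that the check is skipped when targets is None, because the number of "all clients" is not known to a standalone function.

```python
from typing import List, Optional


def check_min_responses(targets: Optional[List[str]], min_responses: int) -> None:
    """Hypothetical helper mirroring the documented ValueError condition."""
    if targets is None:
        # Assumption: None means "all clients", whose count is unknown here,
        # so this sketch does not validate that case.
        return
    if min_responses > len(targets):
        # A task that needs more responses than it has targets could never finish.
        raise ValueError(
            f"min_responses ({min_responses}) exceeds number of targets ({len(targets)})"
        )
```

Rejecting the task at scheduling time is preferable to letting it stand forever in the task list.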
- broadcast_and_wait(task: Task, fl_ctx: FLContext, targets: List[Client] | List[str] | None = None, min_responses: int = 1, wait_time_after_min_received: int = 0, abort_signal: Signal | None = None)[source]¶
Schedule a broadcast task. This is a blocking call.
The task is added to a task list. Clients request tasks, and the controller dispatches the task to eligible clients.
- Parameters:
task (Task) – the task to be scheduled
fl_ctx (FLContext) – FLContext associated with this task
targets (Union[List[Client], List[str], None], optional) – the list of eligible clients or client names or None (all clients). Defaults to None.
min_responses (int, optional) – the number of client submissions required before this task can be marked as completed. Defaults to 1.
wait_time_after_min_received (int, optional) – a grace period for late clients to contribute their submissions. 0 means no grace period. Submissions from late clients received within the grace period are still collected as valid. Defaults to 0.
abort_signal (Optional[Signal], optional) – a signal that tells this blocking call to stop waiting and return. Defaults to None.
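How min_responses and wait_time_after_min_received might interact can be sketched as a small predicate. This is one plausible reading of the parameter descriptions, not the NVFlare implementation: the function name and its arguments are hypothetical, and the real controller may also apply other exit conditions (for example a task timeout) that are ignored here.

```python
from typing import Optional


def broadcast_is_done(
    num_responses: int,
    min_responses: int,
    min_reached_at: Optional[float],
    now: float,
    wait_time_after_min_received: float,
) -> bool:
    """Hypothetical completion rule for a broadcast task.

    min_reached_at is the time at which the min_responses-th submission arrived,
    or None if that has not happened yet.
    """
    if min_reached_at is None or num_responses < min_responses:
        # Not enough submissions yet: keep waiting.
        return False
    # Enough submissions: finish once the grace period for late clients has elapsed.
    return now - min_reached_at >= wait_time_after_min_received
```

With a grace period of 0 the task completes as soon as the minimum is reached; with a positive value, late submissions arriving inside the window would still be counted.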
- broadcast_forever(task: Task, fl_ctx: FLContext, targets: List[Client] | List[str] | None = None)[source]¶
Schedule a broadcast task. This is a non-blocking call.
The task is added to a task list. Clients request tasks, and the controller dispatches the task to eligible clients.
This broadcast will not end.
- Parameters:
task (Task) – the task to be scheduled
fl_ctx (FLContext) – FLContext associated with this task
targets (Union[List[Client], List[str], None], optional) – the list of eligible clients or client names or None (all clients). Defaults to None.
- cancel_all_tasks(completion_status=TaskCompletionStatus.CANCELLED, fl_ctx: FLContext | None = None)[source]¶
Cancel all standing tasks in this controller.
- Parameters:
completion_status (str, optional) – the completion status for this cancellation. Defaults to TaskCompletionStatus.CANCELLED.
fl_ctx (Optional[FLContext], optional) – FLContext associated with this cancellation. Defaults to None.
- cancel_task(task: Task, completion_status=TaskCompletionStatus.CANCELLED, fl_ctx: FLContext | None = None)[source]¶
Cancel the specified task.
Changes the completion_status of the task, which tells the task monitor to clean up this task.
Note
The task is only marked as completed here; the cleanup is left to the task monitor. This avoids a potential deadlock on task_lock.
- Parameters:
task (Task) – the task to be cancelled
completion_status (str, optional) – the completion status for this cancellation. Defaults to TaskCompletionStatus.CANCELLED.
fl_ctx (Optional[FLContext], optional) – FLContext associated with this cancellation. Defaults to None.
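The "mark now, clean up later" pattern described in the note can be sketched in plain Python. The classes `SketchTask` and `SketchMonitor` are hypothetical stand-ins and do not reproduce NVFlare's Task or its monitor thread; only the name task_lock is taken from the note above.

```python
import threading
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class SketchTask:  # hypothetical stand-in, not nvflare's Task
    name: str
    completion_status: Optional[str] = None


class SketchMonitor:  # hypothetical stand-in for the controller and its task monitor
    def __init__(self) -> None:
        self.task_lock = threading.Lock()
        self.tasks: List[SketchTask] = []

    def schedule(self, task: SketchTask) -> None:
        with self.task_lock:
            self.tasks.append(task)

    def cancel_task(self, task: SketchTask, completion_status: str = "cancelled") -> None:
        # Only mark the task. The lock is not taken and the list is not modified,
        # so calling this while task_lock is already held cannot deadlock.
        task.completion_status = completion_status

    def check_tasks(self) -> List[SketchTask]:
        # Meant to run periodically in the monitor thread: remove marked tasks.
        with self.task_lock:
            done = [t for t in self.tasks if t.completion_status is not None]
            self.tasks = [t for t in self.tasks if t.completion_status is None]
        return done
```

Because the cancellation path never acquires the lock, it stays safe even when it is triggered from code that already holds it; the cost is that a cancelled task remains in the list until the next monitor pass.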
- client_is_active(client_name: str, reason: str, fl_ctx: FLContext)[source]¶
Called by the Engine to notify the controller that the client is active.
- Parameters:
client_name – name of the client that is active
reason – why client is considered active
fl_ctx – the FLContext
- finalize_run(fl_ctx: FLContext)[source]¶
Do cleanup of the coordinator implementation.
Attention
Controller subclasses should not override finalize_run.
- Parameters:
fl_ctx (FLContext) – FLContext associated with this action
- get_client_disconnect_time(client_name: str)[source]¶
Get the time at which the client was deemed disconnected.
- Parameters:
client_name – name of the client
- Returns:
the time at which the client was deemed disconnected, or None if the client is not disconnected
- get_num_standing_tasks() int [source]¶
Get the number of tasks that are currently standing.
- Returns:
length of the list of standing tasks
- Return type:
int
- handle_event(event_type: str, fl_ctx: FLContext)[source]¶
Called when events are fired.
- Parameters:
event_type (str) – the type of the fired event; can be any AppEventType or EventType value
fl_ctx (FLContext) – FLContext information with current event type
- handle_exception(task_id: str, fl_ctx: FLContext) None [source]¶
Called to cancel a task whose client_task caused an exception at an upper level.
- Parameters:
task_id (str) – the id of the failing client_task
fl_ctx (FLContext) – FLContext associated with this client_task
- initialize_run(fl_ctx: FLContext)[source]¶
Called by runners to initialize controller with information in fl_ctx.
Attention
Controller subclasses must not override this method.
- Parameters:
fl_ctx (FLContext) – FLContext information
- process_dead_client_report(client_name: str, fl_ctx: FLContext)[source]¶
Called by the Engine to process a dead-client report.
- Parameters:
client_name – name of the client for which the dead-client report was received
fl_ctx – the FLContext
- process_submission(client: Client, task_name: str, task_id: str, result: Shareable, fl_ctx: FLContext)[source]¶
Called to process a submission from one client.
Note
This method is called by a separate thread.
- Parameters:
client (Client) – the client that submitted this task
task_name (str) – the task name associated with this submission
task_id (str) – the id associated with the client_task
result (Shareable) – the actual submitted data from the client
fl_ctx (FLContext) – the FLContext associated with this submission
- Raises:
TypeError – when client is not an instance of Client
TypeError – when fl_ctx is not an instance of FLContext
TypeError – when result is not an instance of Shareable
ValueError – when task_name is not found in the client_task
- process_task_check(task_id: str, fl_ctx: FLContext)[source]¶
Called by the Engine to check whether a specified task still exists.
- Parameters:
task_id – the id of the task
fl_ctx – the FLContext
- Returns:
the ClientTask object if it exists; None otherwise
- process_task_request(client: Client, fl_ctx: FLContext) Tuple[str, str, Shareable] [source]¶
Called by runner when a client asks for a task.
Note
This is called in a separate thread.
- Parameters:
client (Client) – The record of one client requesting tasks
fl_ctx (FLContext) – The FLContext associated with this request
- Raises:
TypeError – when client is not an instance of Client
TypeError – when fl_ctx is not an instance of FLContext
TypeError – when any standing task contains an invalid client_task
- Returns:
task_name, an id for the client_task, and the data for this request
- Return type:
Tuple[str, str, Shareable]
- relay(task: Task, fl_ctx: FLContext, targets: List[Client] | List[str] | None = None, send_order: SendOrder = SendOrder.SEQUENTIAL, task_assignment_timeout: int = 0, task_result_timeout: int = 0, dynamic_targets: bool = True)[source]¶
Schedule a single task to targets in one-after-another style. This is a non-blocking call.
The task is added to a task list. Clients request tasks, and the controller dispatches the task to eligible clients based on the send_order.
- Parameters:
task (Task) – the task to be scheduled
fl_ctx (FLContext) – FLContext associated with this task
targets (Union[List[Client], List[str], None], optional) – the list of eligible clients or client names or None (all clients). Defaults to None.
send_order (SendOrder, optional) – the order for clients to become eligible. SEQUENTIAL means the order in targets is enforced. ANY means any clients that are inside the targets and haven’t received the task are eligible. Defaults to SendOrder.SEQUENTIAL.
task_assignment_timeout (int, optional) – how long to wait for one client to pick the task. Defaults to 0.
task_result_timeout (int, optional) – how long to wait for the currently working client to return its result. Defaults to 0.
dynamic_targets (bool, optional) – whether clients not in targets may join at the end of the targets list. Defaults to True.
- Raises:
ValueError – when task_assignment_timeout is greater than task’s timeout
ValueError – when task_result_timeout is greater than task’s timeout
TypeError – when send_order is not a member of SendOrder
TypeError – when dynamic_targets is not a boolean variable
ValueError – when targets is None or an empty list but dynamic_targets is False
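The two send_order values described above can be illustrated with a small eligibility check. This is a sketch under stated assumptions: the `Order` enum and `is_eligible` function are hypothetical stand-ins for SendOrder and the controller's internal logic, clients are represented by names only, and timeouts and dynamic_targets are ignored.

```python
from enum import Enum
from typing import List, Set


class Order(Enum):  # hypothetical stand-in for SendOrder
    SEQUENTIAL = "sequential"
    ANY = "any"


def is_eligible(client: str, targets: List[str], served: Set[str], order: Order) -> bool:
    """Return True if client may receive the task now (hypothetical rule).

    served holds the names of clients that have already received the task.
    """
    if client not in targets or client in served:
        return False
    if order is Order.ANY:
        # Any target that has not received the task yet is eligible.
        return True
    # SEQUENTIAL: only the first target that has not been served yet is eligible.
    pending = [t for t in targets if t not in served]
    return pending[0] == client
```

Under SEQUENTIAL, a request from "site-2" is turned away until "site-1" has been served; under ANY, whichever remaining target asks first gets the task.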
- relay_and_wait(task: Task, fl_ctx: FLContext, targets: List[Client] | List[str] | None = None, send_order=SendOrder.SEQUENTIAL, task_assignment_timeout: int = 0, task_result_timeout: int = 0, dynamic_targets: bool = True, abort_signal: Signal | None = None)[source]¶
Schedule a single task to targets in one-after-another style. This is a blocking call.
The task is added to a task list. Clients request tasks, and the controller dispatches the task to eligible clients based on the send_order.
- Parameters:
task (Task) – the task to be scheduled
fl_ctx (FLContext) – FLContext associated with this task
targets (Union[List[Client], List[str], None], optional) – the list of eligible clients or client names or None (all clients). Defaults to None.
send_order (SendOrder, optional) – the order for clients to become eligible. SEQUENTIAL means the order in targets is enforced. ANY means any client in targets that has not yet received the task is eligible. Defaults to SendOrder.SEQUENTIAL.
task_assignment_timeout (int, optional) – how long to wait for one client to pick the task. Defaults to 0.
task_result_timeout (int, optional) – how long to wait for the currently working client to return its result. Defaults to 0.
dynamic_targets (bool, optional) – whether clients not in targets may join at the end of the targets list. Defaults to True.
abort_signal (Optional[Signal], optional) – a signal that tells this blocking call to stop waiting and return. Defaults to None.
- send(task: Task, fl_ctx: FLContext, targets: List[Client] | List[str] | None = None, send_order: SendOrder = SendOrder.SEQUENTIAL, task_assignment_timeout: int = 0)[source]¶
Schedule a single task to targets. This is a non-blocking call.
The task is added to a task list. Clients request tasks, and the controller dispatches the task to eligible clients based on the send_order.
- Parameters:
task (Task) – the task to be scheduled
fl_ctx (FLContext) – FLContext associated with this task
targets (Union[List[Client], List[str], None], optional) – the list of eligible clients or client names or None (all clients). Defaults to None.
send_order (SendOrder, optional) – the order for clients to become eligible. SEQUENTIAL means the order in targets is enforced. ANY means any client in targets that has not yet received the task is eligible. Defaults to SendOrder.SEQUENTIAL.
task_assignment_timeout (int, optional) – how long to wait for one client to pick the task. Defaults to 0.
- Raises:
ValueError – when task_assignment_timeout is greater than task’s timeout
TypeError – when send_order is not a member of SendOrder
ValueError – when targets is None or an empty list
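The two ValueError conditions listed for send can be sketched as a standalone check. The helper `check_send_args` is hypothetical and not part of NVFlare. The sketch assumes that a timeout of 0 means "not set", so the comparison is applied only when both timeouts are non-zero; the source does not confirm this.

```python
from typing import List, Optional


def check_send_args(
    targets: Optional[List[str]],
    task_timeout: int,
    task_assignment_timeout: int,
) -> None:
    """Hypothetical validation mirroring the documented ValueError cases of send."""
    if not targets:
        # Covers both None and an empty list.
        raise ValueError("targets must be a non-empty list")
    # Assumption: 0 means "no timeout", so only compare when both values are set.
    if task_timeout and task_assignment_timeout and task_assignment_timeout > task_timeout:
        raise ValueError(
            f"task_assignment_timeout ({task_assignment_timeout}) exceeds "
            f"the task timeout ({task_timeout})"
        )
```

Unlike broadcast, where None means all clients, send is documented to reject a missing or empty target list outright.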
- send_and_wait(task: Task, fl_ctx: FLContext, targets: List[Client] | List[str] | None = None, send_order: SendOrder = SendOrder.SEQUENTIAL, task_assignment_timeout: int = 0, abort_signal: Signal | None = None)[source]¶
Schedule a single task to targets. This is a blocking call.
The task is added to a task list. Clients request tasks, and the controller dispatches the task to eligible clients based on the send_order.
- Parameters:
task (Task) – the task to be scheduled
fl_ctx (FLContext) – FLContext associated with this task
targets (Union[List[Client], List[str], None], optional) – the list of eligible clients or client names or None (all clients). Defaults to None.
send_order (SendOrder, optional) – the order for clients to become eligible. SEQUENTIAL means the order in targets is enforced. ANY means any client in targets that has not yet received the task is eligible. Defaults to SendOrder.SEQUENTIAL.
task_assignment_timeout (int, optional) – how long to wait for one client to pick the task. Defaults to 0.
abort_signal (Optional[Signal], optional) – a signal that tells this blocking call to stop waiting and return. Defaults to None.