nvflare.apis.impl.controller module
- class Controller(task_check_period=0.5)
Bases: Responder, ControllerSpec, ABC
Manage life cycles of tasks and their destinations.
- Parameters
task_check_period (float, optional) – interval for checking status of tasks. Defaults to 0.5.
- abort_all_tasks(fl_ctx: FLContext)
Ask clients to abort the execution of all tasks.
NOTE: the server should send a notification to all clients, regardless of whether the server has any standing tasks.
- Parameters
fl_ctx (FLContext) – FLContext associated with this action
- abort_task(task, fl_ctx: FLContext)
Ask all clients to abort the execution of the specified task.
- Parameters
task (str) – the task to be aborted
fl_ctx (FLContext) – FLContext associated with this action
- broadcast(task: Task, fl_ctx: FLContext, targets: Optional[Union[List[Client], List[str]]] = None, min_responses: int = 1, wait_time_after_min_received: int = 0)
Schedule a broadcast task. This is a non-blocking call.
The task is added to a task list. Clients request tasks, and the controller dispatches the task to eligible clients.
- Parameters
task (Task) – the task to be scheduled
fl_ctx (FLContext) – FLContext associated with this task
targets (Union[List[Client], List[str], None], optional) – the list of eligible clients or client names or None (all clients). Defaults to None.
min_responses (int, optional) – the minimum number of client submissions required before the task is marked as completed. Defaults to 1.
wait_time_after_min_received (int, optional) – a grace period for late clients to contribute their submissions. 0 means no grace period. Submissions from late clients within the grace period are still collected as valid. Defaults to 0.
- Raises
ValueError – when min_responses is greater than the length of targets, because such a task, if scheduled, could never complete.
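The completion rule and the ValueError described above can be sketched in plain Python. This is a minimal illustration, not NVFlare's implementation: `check_min_responses` and `broadcast_is_done` are hypothetical helpers, client names stand in for Client objects, and skipping the check when targets is None is an assumption.

```python
from typing import List, Optional


def check_min_responses(min_responses: int, targets: Optional[List[str]]) -> None:
    """Reject a broadcast that could never collect enough responses."""
    # Assumption: targets=None means "all clients", so there is no list to compare against.
    if targets is not None and min_responses > len(targets):
        raise ValueError(
            f"min_responses ({min_responses}) exceeds the number of targets ({len(targets)})"
        )


def broadcast_is_done(
    num_responses: int,
    min_responses: int,
    seconds_since_min_received: float,
    wait_time_after_min_received: int,
) -> bool:
    """Return True once enough clients responded and the grace period has elapsed."""
    if num_responses < min_responses:
        return False
    # With a grace period of 0 the task completes as soon as the minimum is reached.
    return seconds_since_min_received >= wait_time_after_min_received
```

With `wait_time_after_min_received=10`, a task that has just reached `min_responses` stays open for ten more seconds so late submissions can still be counted.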
- broadcast_and_wait(task: Task, fl_ctx: FLContext, targets: Optional[Union[List[Client], List[str]]] = None, min_responses: int = 1, wait_time_after_min_received: int = 0, abort_signal: Optional[Signal] = None)
Schedule a broadcast task. This is a blocking call.
The task is added to a task list. Clients request tasks, and the controller dispatches the task to eligible clients.
- Parameters
task (Task) – the task to be scheduled
fl_ctx (FLContext) – FLContext associated with this task
targets (Union[List[Client], List[str], None], optional) – the list of eligible clients or client names or None (all clients). Defaults to None.
min_responses (int, optional) – the minimum number of client submissions required before the task is marked as completed. Defaults to 1.
wait_time_after_min_received (int, optional) – a grace period for late clients to contribute their submissions. 0 means no grace period. Submissions from late clients within the grace period are still collected as valid. Defaults to 0.
abort_signal (Optional[Signal], optional) – as this is a blocking call, this abort_signal informs this method to return. Defaults to None.
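The difference between the blocking and non-blocking variants can be pictured as a polling loop that returns when the task finishes or when the abort signal fires. The sketch below is an assumption-laden stand-in, not NVFlare code: `StubSignal`, `wait_until_done`, and `run_demo` are hypothetical names, and only a boolean `triggered` flag is modeled.

```python
import threading
import time
from typing import Callable, Optional


class StubSignal:
    """Stand-in for an abort signal; only a boolean 'triggered' flag is modeled."""

    def __init__(self) -> None:
        self.triggered = False

    def trigger(self) -> None:
        self.triggered = True


def wait_until_done(
    is_done: Callable[[], bool],
    abort_signal: Optional[StubSignal] = None,
    check_period: float = 0.5,
) -> str:
    """Block until is_done() is true or the abort signal is triggered."""
    while True:
        if abort_signal is not None and abort_signal.triggered:
            return "aborted"
        if is_done():
            return "completed"
        # Sleep between checks, in the spirit of task_check_period.
        time.sleep(check_period)


def run_demo() -> str:
    """Trigger the signal from another thread while the caller is blocked."""
    signal = StubSignal()
    timer = threading.Timer(0.05, signal.trigger)
    timer.start()
    outcome = wait_until_done(lambda: False, abort_signal=signal, check_period=0.01)
    timer.join()
    return outcome
```

Without an abort signal, a blocking call like this has no way to return early if the task never completes, which is why the `*_and_wait` methods accept one.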
- broadcast_forever(task: Task, fl_ctx: FLContext, targets: Optional[Union[List[Client], List[str]]] = None)
Schedule a broadcast task. This is a non-blocking call.
The task is added to a task list. Clients request tasks, and the controller dispatches the task to eligible clients. This broadcast never ends on its own.
- Parameters
task (Task) – the task to be scheduled
fl_ctx (FLContext) – FLContext associated with this task
targets (Union[List[Client], List[str], None], optional) – the list of eligible clients or client names or None (all clients). Defaults to None.
- cancel_all_tasks(completion_status=TaskCompletionStatus.CANCELLED, fl_ctx: Optional[FLContext] = None)
Cancel all standing tasks in this controller.
- Parameters
completion_status (str, optional) – the completion status for this cancellation. Defaults to TaskCompletionStatus.CANCELLED.
fl_ctx (Optional[FLContext], optional) – FLContext associated with this cancellation. Defaults to None.
- cancel_task(task: Task, completion_status=TaskCompletionStatus.CANCELLED, fl_ctx: Optional[FLContext] = None)
Cancel the specified task.
Change the task's completion_status, which informs the task monitor to clean up this task.
NOTE: the task is only marked as completed; cleanup is left to the task monitor. This avoids a potential deadlock on task_lock.
- Parameters
task (Task) – the task to be cancelled
completion_status (str, optional) – the completion status for this cancellation. Defaults to TaskCompletionStatus.CANCELLED.
fl_ctx (Optional[FLContext], optional) – FLContext associated with this cancellation. Defaults to None.
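The mark-then-clean-up pattern described for cancel_task can be sketched as follows. This is an illustration under stated assumptions, not the library's code: `StubTask`, `StubController`, `schedule`, and `monitor_once` are hypothetical, and the string "cancelled" stands in for TaskCompletionStatus.CANCELLED.

```python
import threading
from typing import List, Optional


class StubTask:
    """Stand-in for a task; only the name and completion status are modeled."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.completion_status: Optional[str] = None


class StubController:
    """Keeps a list of standing tasks guarded by a lock."""

    def __init__(self) -> None:
        self._task_lock = threading.Lock()
        self._tasks: List[StubTask] = []

    def schedule(self, task: StubTask) -> None:
        with self._task_lock:
            self._tasks.append(task)

    def cancel_task(self, task: StubTask, completion_status: str = "cancelled") -> None:
        # Only mark the task. No lock is taken here, so a caller that already
        # holds _task_lock cannot deadlock by cancelling.
        task.completion_status = completion_status

    def monitor_once(self) -> List[str]:
        """One pass of the monitor: remove marked tasks while holding the lock."""
        with self._task_lock:
            done = [t for t in self._tasks if t.completion_status is not None]
            self._tasks = [t for t in self._tasks if t.completion_status is None]
        return [t.name for t in done]

    def get_num_standing_tasks(self) -> int:
        with self._task_lock:
            return len(self._tasks)
```

In this sketch a cancelled task still counts as standing until the next monitor pass removes it, which mirrors the note that cleanup is deferred to the task monitor.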
- finalize_run(fl_ctx: FLContext)
Do cleanup of the coordinator implementation.
NOTE: Controller subclasses should not override finalize_run.
- Parameters
fl_ctx (FLContext) – FLContext associated with this action
- get_num_standing_tasks() → int
Get the number of tasks that are currently standing.
- Returns
length of the list of standing tasks
- Return type
int
- handle_event(event_type: str, fl_ctx: FLContext)
Called when events are fired.
- Parameters
event_type (str) – all event types, including AppEventType and EventType
fl_ctx (FLContext) – FLContext information with current event type
- handle_exception(task_id: str, fl_ctx: FLContext) → None
Called to cancel a task whose client_task caused an exception at the upper level.
- Parameters
task_id (str) – an id to the failing client_task
fl_ctx (FLContext) – FLContext associated with this client_task
- initialize_run(fl_ctx: FLContext)
Called by runners to initialize the controller with information in fl_ctx.
Note: Controller subclasses must not override this method.
- Parameters
fl_ctx (FLContext) – FLContext information
- process_submission(client: Client, task_name: str, task_id: str, result: Shareable, fl_ctx: FLContext)
Called to process a submission from one client.
Note: this method is called by a separate thread.
- Parameters
client (Client) – the client that submitted this task
task_name (str) – the task name associated with this submission
task_id (str) – the id associated with the client_task
result (Shareable) – the actual submitted data from the client
fl_ctx (FLContext) – the FLContext associated with this submission
- Raises
TypeError – when client is not an instance of Client
TypeError – when fl_ctx is not an instance of FLContext
TypeError – when result is not an instance of Shareable
ValueError – when task_name is not found in the client_task
- process_task_request(client: Client, fl_ctx: FLContext) → Tuple[str, str, Shareable]
Called by runner when a client asks for a task.
Note: this is called in a separate thread.
- Parameters
client (Client) – The record of one client requesting tasks
fl_ctx (FLContext) – The FLContext associated with this request
- Raises
TypeError – when client is not an instance of Client
TypeError – when fl_ctx is not an instance of FLContext
TypeError – when any standing task contains an invalid client_task
- Returns
task_name, an id for the client_task, and the data for this request
- Return type
Tuple[str, str, Shareable]
- relay(task: Task, fl_ctx: FLContext, targets: Optional[Union[List[Client], List[str]]] = None, send_order: SendOrder = SendOrder.SEQUENTIAL, task_assignment_timeout: int = 0, task_result_timeout: int = 0, dynamic_targets: bool = True)
Schedule a single task to targets, one after another. This is a non-blocking call.
The task is added to a task list. Clients request tasks, and the controller dispatches the task to eligible clients based on the send_order.
- Parameters
task (Task) – the task to be scheduled
fl_ctx (FLContext) – FLContext associated with this task
targets (Union[List[Client], List[str], None], optional) – the list of eligible clients or client names or None (all clients). Defaults to None.
send_order (SendOrder, optional) – the order for clients to become eligible. SEQUENTIAL means the order in targets is enforced. ANY means any clients that are inside the targets and haven’t received the task are eligible. Defaults to SendOrder.SEQUENTIAL.
task_assignment_timeout (int, optional) – how long to wait for one client to pick the task. Defaults to 0.
task_result_timeout (int, optional) – how long to wait for current working client to reply its result. Defaults to 0.
dynamic_targets (bool, optional) – allow clients not in targets to join at the end of targets list. Defaults to True.
- Raises
ValueError – when task_assignment_timeout is greater than task’s timeout
ValueError – when task_result_timeout is greater than task’s timeout
TypeError – when send_order is not defined in SendOrder
TypeError – when dynamic_targets is not a boolean variable
ValueError – when targets is None or an empty list but dynamic_targets is False
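How send_order and dynamic_targets shape eligibility can be sketched with stand-ins. Nothing below is NVFlare's implementation: `StubSendOrder`, `eligible_clients`, and `admit_client` are hypothetical, client names stand in for Client objects, and timeouts are not modeled.

```python
from enum import Enum
from typing import List, Set


class StubSendOrder(Enum):
    """Stand-in for SendOrder with the two values named above."""

    ANY = "any"
    SEQUENTIAL = "sequential"


def eligible_clients(
    targets: List[str], already_sent: Set[str], send_order: StubSendOrder
) -> List[str]:
    """Return the client names that may receive the task next."""
    if not isinstance(send_order, StubSendOrder):
        raise TypeError(f"send_order must be a StubSendOrder, got {type(send_order)}")
    pending = [name for name in targets if name not in already_sent]
    if send_order is StubSendOrder.SEQUENTIAL:
        # The order in targets is enforced: only the first pending client is eligible.
        return pending[:1]
    # ANY: every target that has not received the task yet is eligible.
    return pending


def admit_client(targets: List[str], client_name: str, dynamic_targets: bool) -> bool:
    """Append an unknown client to the end of targets when dynamic_targets allows it."""
    if client_name in targets:
        return True
    if not dynamic_targets:
        return False
    targets.append(client_name)
    return True
```

Under SEQUENTIAL, a client that asks for the task out of turn simply gets nothing in this sketch until the clients ahead of it have been served.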
- relay_and_wait(task: Task, fl_ctx: FLContext, targets: Optional[Union[List[Client], List[str]]] = None, send_order=SendOrder.SEQUENTIAL, task_assignment_timeout: int = 0, task_result_timeout: int = 0, dynamic_targets: bool = True, abort_signal: Optional[Signal] = None)
Schedule a single task to targets, one after another. This is a blocking call.
The task is added to a task list. Clients request tasks, and the controller dispatches the task to eligible clients based on the send_order.
- Parameters
task (Task) – the task to be scheduled
fl_ctx (FLContext) – FLContext associated with this task
targets (Union[List[Client], List[str], None], optional) – the list of eligible clients or client names or None (all clients). Defaults to None.
send_order (SendOrder, optional) – the order for clients to become eligible. SEQUENTIAL means the order in targets is enforced. ANY means clients in targets that haven't received the task are eligible for it. Defaults to SendOrder.SEQUENTIAL.
task_assignment_timeout (int, optional) – how long to wait for one client to pick the task. Defaults to 0.
task_result_timeout (int, optional) – how long to wait for current working client to reply its result. Defaults to 0.
dynamic_targets (bool, optional) – allow clients not in targets to join at the end of targets list. Defaults to True.
abort_signal (Optional[Signal], optional) – as this is a blocking call, this abort_signal informs this method to return. Defaults to None.
- send(task: Task, fl_ctx: FLContext, targets: Optional[Union[List[Client], List[str]]] = None, send_order: SendOrder = SendOrder.SEQUENTIAL, task_assignment_timeout: int = 0)
Schedule a single task to targets. This is a non-blocking call.
The task is added to a task list. Clients request tasks, and the controller dispatches the task to eligible clients based on the send_order.
- Parameters
task (Task) – the task to be scheduled
fl_ctx (FLContext) – FLContext associated with this task
targets (Union[List[Client], List[str], None], optional) – the list of eligible clients or client names or None (all clients). Defaults to None.
send_order (SendOrder, optional) – the order for clients to become eligible. SEQUENTIAL means the order in targets is enforced. ANY means clients in targets that haven't received the task are eligible for it. Defaults to SendOrder.SEQUENTIAL.
task_assignment_timeout (int, optional) – how long to wait for one client to pick the task. Defaults to 0.
- Raises
ValueError – when task_assignment_timeout is greater than task’s timeout
TypeError – when send_order is not defined in SendOrder
ValueError – when targets is None or an empty list
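The three error conditions listed for send can be sketched as a validation helper. This is a hypothetical illustration rather than NVFlare's code: `StubSendOrder` and `validate_send_args` are invented names, and treating a timeout of 0 as "no limit" (so the comparison is skipped) is an assumption.

```python
from enum import Enum
from typing import List, Optional


class StubSendOrder(Enum):
    """Stand-in for SendOrder."""

    ANY = "any"
    SEQUENTIAL = "sequential"


def validate_send_args(
    targets: Optional[List[str]],
    send_order: StubSendOrder,
    task_assignment_timeout: int,
    task_timeout: int,
) -> None:
    """Raise the errors listed above for send; return None when arguments are acceptable."""
    if not isinstance(send_order, StubSendOrder):
        raise TypeError(f"send_order must be a StubSendOrder, got {type(send_order)}")
    if not targets:
        # Unlike broadcast, send needs an explicit, non-empty list of targets.
        raise ValueError("targets must be a non-empty list")
    # Assumption: 0 means "no limit", so only compare when both timeouts are set.
    if task_timeout and task_assignment_timeout and task_assignment_timeout > task_timeout:
        raise ValueError(
            f"task_assignment_timeout ({task_assignment_timeout}) "
            f"exceeds the task timeout ({task_timeout})"
        )
```

Checking arguments before the task enters the task list keeps an unschedulable task from ever becoming a standing task.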
- send_and_wait(task: Task, fl_ctx: FLContext, targets: Optional[Union[List[Client], List[str]]] = None, send_order: SendOrder = SendOrder.SEQUENTIAL, task_assignment_timeout: int = 0, abort_signal: Optional[Signal] = None)
Schedule a single task to targets. This is a blocking call.
The task is added to a task list. Clients request tasks, and the controller dispatches the task to eligible clients based on the send_order.
- Parameters
task (Task) – the task to be scheduled
fl_ctx (FLContext) – FLContext associated with this task
targets (Union[List[Client], List[str], None], optional) – the list of eligible clients or client names or None (all clients). Defaults to None.
send_order (SendOrder, optional) – the order for clients to become eligible. SEQUENTIAL means the order in targets is enforced. ANY means clients in targets that haven't received the task are eligible for it. Defaults to SendOrder.SEQUENTIAL.
task_assignment_timeout (int, optional) – how long to wait for one client to pick the task. Defaults to 0.
abort_signal (Optional[Signal], optional) – as this is a blocking call, this abort_signal informs this method to return. Defaults to None.