nvflare.apis.controller_spec module

class ClientTask(client: Client, task: Task)

Bases: object

ClientTask records the processing information of a task for a client.

Init ClientTask.

Parameters:
  • client – the client

  • task – the task whose processing information will be recorded

class ControllerSpec

Bases: ABC

broadcast(task: Task, fl_ctx: FLContext, targets: List[Client] | List[str] | None = None, min_responses: int = 0, wait_time_after_min_received: int = 0)

Schedule to broadcast the task to specified targets.

This is a non-blocking call.

The task remains standing until one of the following conditions is met:
  • the task’s timeout is specified (> 0), and the task has been standing for longer than that timeout.

  • the controller has received the specified min_responses results for this task, and all target clients are done.

  • the controller has received the specified min_responses results for this task, and has waited for wait_time_after_min_received.

While the task is standing:
  • Before sending the task to a client, the before_task_sent CB (if specified) is called;

  • When a result is received from a client, the result_received CB (if specified) is called;

After the task is done, the task_done CB (if specified) is called:
  • If the result_received CB is specified, the ‘result’ in each client’s ClientTask is produced by the result_received CB;

  • Otherwise, the ‘result’ contains the original result submitted by the client.

NOTE: if targets is None, the actual broadcast target clients are dynamic, because clients can join or disconnect at any moment. While the task is standing, any client that joins automatically becomes a target for this broadcast.

Parameters:
  • task – the task to be sent

  • fl_ctx – the FL context

  • targets – list of destination clients. If None, all clients are targets, determined dynamically;

  • min_responses – the minimum number of responses expected. If == 0, responses must be received from all clients that the task has been sent to;

  • wait_time_after_min_received – how long (secs) to wait after min_responses responses have been received. If == 0, the task ends immediately after the minimum number of responses is received.
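As a reading aid, the standing/completion rules for broadcast can be expressed as a single predicate. This is a minimal sketch under stated assumptions, not NVFlare’s implementation: the function broadcast_is_done and all of its arguments are hypothetical, times are in seconds, and min_responses == 0 is read as “every client the task was sent to must respond”.

```python
from typing import Optional


def broadcast_is_done(
    elapsed: float,
    timeout: int,
    num_responses: int,
    all_targets_done: bool,
    min_responses: int,
    wait_time_after_min_received: int,
    secs_since_min_received: Optional[float],
) -> bool:
    """Hypothetical model of when a broadcast task stops standing (not NVFlare code)."""
    # Condition 1: a positive timeout has been exceeded.
    if timeout > 0 and elapsed > timeout:
        return True
    # min_responses == 0: every client the task was sent to must respond.
    if min_responses == 0:
        return all_targets_done
    if num_responses < min_responses:
        return False
    # Condition 2: enough responses, and every target client is done.
    if all_targets_done:
        return True
    # Condition 3: enough responses, and the grace period has elapsed
    # (a wait time of 0 ends the task as soon as min_responses is reached).
    return (secs_since_min_received or 0.0) >= wait_time_after_min_received
```

For example, with min_responses=2 and wait_time_after_min_received=10, two responses alone do not end the task; it ends once all targets are done or 10 seconds have passed since the second response.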

broadcast_and_wait(task: Task, fl_ctx: FLContext, targets: List[Client] | List[str] | None = None, min_responses: int = 0, wait_time_after_min_received: int = 0, abort_signal: Signal | None = None)

This is the blocking version of the ‘broadcast’ method.

First, the task is scheduled for broadcast (see the ‘broadcast’ method); it then waits until the task is completed.

Parameters:
  • task – the task to be sent

  • fl_ctx – the FL context

  • targets – list of destination clients. If None, all clients are targets, determined dynamically.

  • min_responses – the minimum number of responses expected. If == 0, responses must be received from all clients that the task has been sent to;

  • wait_time_after_min_received – how long (secs) to wait after min_responses responses have been received. If == 0, the task ends immediately after the minimum number of responses is received;

  • abort_signal – the abort signal. If triggered, this method stops waiting and returns to the caller.
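The blocking variant follows a “schedule, then wait until done or aborted” pattern. The sketch below shows that pattern with the standard library only. ToySignal, toy_broadcast, and toy_broadcast_and_wait are hypothetical names, and the string return values are an assumption for illustration; this page does not document what broadcast_and_wait returns.

```python
import threading
import time


class ToySignal:
    """Hypothetical abort signal built on threading.Event (not NVFlare's Signal class)."""

    def __init__(self) -> None:
        self._event = threading.Event()

    def trigger(self) -> None:
        self._event.set()

    @property
    def triggered(self) -> bool:
        return self._event.is_set()


def toy_broadcast(done: threading.Event, work_secs: float) -> None:
    """Non-blocking 'schedule': the task completes in the background after work_secs."""
    timer = threading.Timer(work_secs, done.set)
    timer.daemon = True  # do not keep the interpreter alive for an abandoned task
    timer.start()


def toy_broadcast_and_wait(work_secs: float, abort_signal: ToySignal, poll_secs: float = 0.01) -> str:
    done = threading.Event()
    toy_broadcast(done, work_secs)  # step 1: schedule the task (returns immediately)
    while not done.is_set():        # step 2: block until the task completes...
        if abort_signal.triggered:  # ...or until the abort signal fires
            return "aborted"
        time.sleep(poll_secs)
    return "completed"
```

Polling keeps the sketch simple; the abort check runs at least once per poll interval, so an abort is noticed within roughly poll_secs.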

broadcast_forever(task: Task, fl_ctx: FLContext, targets: List[Client] | List[str] | None = None)

Schedules a broadcast task that never ends until it times out or is explicitly cancelled.

Each target client gets the task every time it asks for a new task. This is a non-blocking call.

NOTE: you can change the content of the task in the before_task_sent CB.

Parameters:
  • task – the task to be sent

  • fl_ctx – the FL context

  • targets – list of destination clients. If None, all clients are targets, determined dynamically.
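A broadcast-forever task can be pictured as a payload handed out on every request until it is cancelled, with before_task_sent free to customize each copy. This is a simplified, hypothetical model: ToyForeverBroadcast is not an NVFlare class, and its callback takes (client name, payload dict) instead of the documented (client_task, fl_ctx).

```python
from typing import Callable, Dict, List, Optional


class ToyForeverBroadcast:
    """Hypothetical model of a task handed out on every request until cancelled."""

    def __init__(
        self,
        data: Dict,
        targets: Optional[List[str]] = None,
        before_task_sent: Optional[Callable[[str, Dict], None]] = None,
    ) -> None:
        self.data = data
        self.targets = targets  # None: every client that asks is a target
        self.before_task_sent = before_task_sent
        self.cancelled = False

    def on_task_request(self, client: str) -> Optional[Dict]:
        """Return the payload for this request, or None if there is no task for the client."""
        if self.cancelled:
            return None
        if self.targets is not None and client not in self.targets:
            return None
        payload = dict(self.data)  # per-request copy, so the callback can customize it
        if self.before_task_sent is not None:
            self.before_task_sent(client, payload)
        return payload
```

Copying the data per request is a design choice of the sketch; it keeps one client’s customization from leaking into the next client’s payload.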

cancel_all_tasks(completion_status=TaskCompletionStatus.CANCELLED, fl_ctx: FLContext | None = None)

Cancels all standing tasks.

Parameters:
  • completion_status – the TaskCompletionStatus to assign to the cancelled tasks

  • fl_ctx – the FL context

cancel_task(task: Task, completion_status: TaskCompletionStatus = TaskCompletionStatus.CANCELLED, fl_ctx: FLContext | None = None)

Cancels the specified task.

If the task is standing, it is cancelled immediately (and removed from the job queue), and the task_done CB (if specified) is called;

If the task is not standing, this method has no effect.

Parameters:
  • task – the task to be cancelled

  • completion_status – the TaskCompletionStatus of the task

  • fl_ctx – the FL context
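The cancellation rules above (standing task: remove it and fire task_done; otherwise no effect) can be modeled with a dictionary of standing tasks. Everything here is hypothetical: ToyTaskQueue and ToyStatus are illustrative stand-ins, and the boolean return value of cancel_task is a convenience of the sketch, not documented behavior.

```python
from enum import Enum
from typing import Callable, Dict, Optional


class ToyStatus(Enum):
    # Two of the TaskCompletionStatus values listed in this module
    OK = "ok"
    CANCELLED = "cancelled"


DoneCB = Callable[[str, ToyStatus], None]


class ToyTaskQueue:
    """Hypothetical model of cancel_task / cancel_all_tasks semantics (not NVFlare code)."""

    def __init__(self) -> None:
        self.standing: Dict[str, Optional[DoneCB]] = {}

    def schedule(self, name: str, task_done_cb: Optional[DoneCB] = None) -> None:
        self.standing[name] = task_done_cb

    def cancel_task(self, name: str, completion_status: ToyStatus = ToyStatus.CANCELLED) -> bool:
        if name not in self.standing:
            return False  # task is not standing: no effect
        task_done_cb = self.standing.pop(name)  # remove from the queue immediately
        if task_done_cb is not None:
            task_done_cb(name, completion_status)  # then fire task_done, if specified
        return True

    def cancel_all_tasks(self, completion_status: ToyStatus = ToyStatus.CANCELLED) -> None:
        for name in list(self.standing):  # copy the keys: cancel_task mutates the dict
            self.cancel_task(name, completion_status)
```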

abstract control_flow(abort_signal: Signal, fl_ctx: FLContext)

This is the control logic for the RUN.

NOTE: this runs in a separate thread, and its lifetime is the duration of the RUN.

Parameters:
  • abort_signal – the abort signal. If triggered, this method stops waiting and returns to the caller.

  • fl_ctx – the FL context
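The lifecycle described for start_controller, control_flow, and stop_controller can be sketched as follows: start, run control_flow in its own thread for the duration of the RUN, then stop. ToyController and toy_run are hypothetical; a threading.Event stands in for the abort signal, and a real controller would schedule tasks inside the loop.

```python
import threading
from typing import List


class ToyController:
    """Hypothetical skeleton of the start_controller / control_flow / stop_controller lifecycle."""

    def __init__(self, num_rounds: int) -> None:
        self.num_rounds = num_rounds
        self.log: List[str] = []

    def start_controller(self) -> None:
        self.log.append("start")

    def control_flow(self, abort_signal: threading.Event) -> None:
        for r in range(self.num_rounds):
            if abort_signal.is_set():  # check the abort signal before each round
                self.log.append("aborted")
                return
            # A real controller would schedule work here, e.g. via broadcast_and_wait.
            self.log.append(f"round {r}")

    def stop_controller(self) -> None:
        self.log.append("stop")


def toy_run(controller: ToyController, abort_signal: threading.Event) -> None:
    controller.start_controller()  # called at the beginning of the RUN
    # control_flow runs in its own thread; its lifetime is the duration of the RUN
    worker = threading.Thread(target=controller.control_flow, args=(abort_signal,))
    worker.start()
    worker.join()
    controller.stop_controller()  # called right before the RUN ends
```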

get_client_disconnect_time(client_name)

Gets the time at which the client was deemed disconnected.

Parameters:
  • client_name – the name of the client

Returns: the time at which the client was deemed disconnected, or None if the client is not disconnected.

get_num_standing_tasks() → int

Gets the number of tasks that are currently standing.

Returns: the number of standing tasks (the length of the list of standing tasks)

process_result_of_unknown_task(client: Client, task_name: str, client_task_id: str, result: Shareable, fl_ctx: FLContext)

Process result when no task is found for it.

This is called when a result submission is received from a client but no standing task can be found for it in the task queue.

This could happen when:
  • the client’s submission is too late;

  • the task is already completed;

  • the Controller lost the task, e.g. the Server was restarted.

Parameters:
  • client – the client that the result comes from

  • task_name – the name of the task

  • client_task_id – ID of the task

  • result – the result from the client

  • fl_ctx – the FL context that comes with the client’s submission
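The routing decision behind this hook can be modeled as a lookup by task ID: a result for a standing task is recorded against that task, and anything else goes to a fallback handler. ToyResultRouter and its on_unknown callback are hypothetical, and plain dicts stand in for Shareable results.

```python
from typing import Callable, Dict, List, Tuple


class ToyResultRouter:
    """Hypothetical model: results for standing tasks are recorded; others go to a fallback."""

    def __init__(self, on_unknown: Callable[[str, str, str, dict], None]) -> None:
        self.standing: Dict[str, List[Tuple[str, dict]]] = {}  # client_task_id -> results
        self.on_unknown = on_unknown

    def submit(self, client: str, task_name: str, client_task_id: str, result: dict) -> None:
        results = self.standing.get(client_task_id)
        if results is None:
            # Late submission, already-completed task, or a task lost on restart.
            self.on_unknown(client, task_name, client_task_id, result)
        else:
            results.append((client, result))
```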

relay(task: Task, fl_ctx: FLContext, targets: List[Client] | List[str] | None = None, send_order: SendOrder = SendOrder.SEQUENTIAL, task_assignment_timeout: int = 0, task_result_timeout: int = 0, dynamic_targets: bool = True)

Schedules a task to be done sequentially by the clients in the targets list. This is a non-blocking call.

Parameters:
  • task – the task to be performed by each client

  • fl_ctx – the FL context for scheduling the task

  • targets – list of clients. If None, all clients.

  • send_order – how to choose the next client

  • task_assignment_timeout – how long to wait for the expected client to get assigned, before assigning to the next client.

  • task_result_timeout – how long to wait for the result from the assigned client before giving up.

  • dynamic_targets – whether to dynamically grow the target list. If True, the target list is expanded dynamically when a new client joins.
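A sequential relay visits clients in list order, skipping a client that does not pick up the task within task_assignment_timeout. The sketch below models that, and additionally assumes each client’s result becomes the data for the next client (as a result_received CB might arrange, e.g. for cyclic training); that chaining is an assumption, not something this page states. toy_relay and its arguments are hypothetical.

```python
from typing import Callable, Dict, List, Set, Tuple


def toy_relay(
    targets: List[str],
    data: Dict,
    responsive: Set[str],
    work: Callable[[str, Dict], Dict],
) -> Tuple[List[str], Dict]:
    """Hypothetical sequential relay: clients take turns in list order.

    Clients missing from `responsive` model clients that do not pick up the task
    within task_assignment_timeout; the relay skips them. Each client's result is
    used as the data for the next client.
    """
    visited: List[str] = []
    for client in targets:
        if client not in responsive:
            continue  # assignment timed out: move on to the next client
        data = work(client, data)  # the client performs the task on the current data
        visited.append(client)
    return visited, data
```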

relay_and_wait(task: Task, fl_ctx: FLContext, targets: List[Client] | List[str] | None = None, send_order=SendOrder.SEQUENTIAL, task_assignment_timeout: int = 0, task_result_timeout: int = 0, dynamic_targets: bool = True, abort_signal: Signal | None = None)

This is the blocking version of ‘relay’.

send(task: Task, fl_ctx: FLContext, targets: List[Client] | List[str] | None = None, send_order: SendOrder = SendOrder.SEQUENTIAL, task_assignment_timeout: int = 0)

Schedule to send the task to a single target client.

This is a non-blocking call.

In ANY order, the target client is the first target that asks for a task. In SEQUENTIAL order, the controller tries its best to send the task to the first client in the targets list; if it can’t, it tries the next target, and so on.

NOTE: if targets is None, the actual target clients are dynamic, because clients can join or disconnect at any moment. While the task is standing, any client that joins automatically becomes a target for this task.

If send_order is SEQUENTIAL, targets must be a non-empty list of client names.

Parameters:
  • task – the task to be sent

  • fl_ctx – the FL context

  • targets – list of candidate target clients.

  • send_order – how to choose the client to send the task.

  • task_assignment_timeout – in SEQUENTIAL order, this is the wait time for trying a target client, before trying next target.
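The difference between ANY and SEQUENTIAL target selection can be illustrated with a time-ordered list of task requests. This is a simplified, hypothetical model: pick_send_target is not an NVFlare function, and it assumes each target in SEQUENTIAL order gets an assignment window of task_assignment_timeout seconds, in list order, requiring that timeout to be positive.

```python
from enum import Enum
from typing import List, Optional, Tuple


class SendOrder(Enum):
    # Same members and values as the SendOrder enum documented in this module
    ANY = "any"
    SEQUENTIAL = "sequential"


def pick_send_target(
    targets: List[str],
    requests: List[Tuple[float, str]],
    send_order: SendOrder,
    task_assignment_timeout: float,
) -> Optional[str]:
    """Hypothetical model of which client receives a `send` task.

    `requests` holds (seconds since scheduling, client name) task requests in time order.
    SEQUENTIAL gives targets[i] the window [i * timeout, (i + 1) * timeout).
    """
    if send_order is SendOrder.SEQUENTIAL and task_assignment_timeout <= 0:
        raise ValueError("this model needs task_assignment_timeout > 0 for SEQUENTIAL")
    for t, client in requests:
        if client not in targets:
            continue  # non-targets never receive the task
        if send_order is SendOrder.ANY:
            return client  # the first target that asks wins
        idx = int(t // task_assignment_timeout)
        if idx < len(targets) and targets[idx] == client:
            return client  # the currently preferred target asked in time
    return None  # no eligible target asked
```

With targets ["a", "b", "c"] and requests from "c" at 0.5 s and "a" at 1.0 s, ANY picks "c" (first to ask), while SEQUENTIAL with a 2-second window picks "a".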

send_and_wait(task: Task, fl_ctx: FLContext, targets: List[Client] | List[str] | None = None, send_order: SendOrder = SendOrder.SEQUENTIAL, task_assignment_timeout: int = 0, abort_signal: Signal | None = None)

This is the blocking version of the ‘send’ method.

First, the task is scheduled for sending (see the ‘send’ method); it then waits until the task is completed and returns the task completion status and collected result.

Parameters:
  • task – the task to be performed by the selected client

  • fl_ctx – the FL context for scheduling the task

  • targets – list of candidate target clients. If None, all clients.

  • send_order – how to choose the client to send the task to

  • task_assignment_timeout – how long to wait for the expected client to get assigned, before assigning to the next client.

  • abort_signal – the abort signal. If triggered, this method stops waiting and returns to the caller.

abstract start_controller(fl_ctx: FLContext)

Starts the controller.

This method is called at the beginning of the RUN.

Parameters:
  • fl_ctx – the FL context. You can use this context to access services provided by the framework. For example, you can get the Command Register from it and register your admin command modules.

abstract stop_controller(fl_ctx: FLContext)

Stops the controller.

This method is called right before the RUN is ended.

Parameters:
  • fl_ctx – the FL context. You can use this context to access services provided by the framework. For example, you can get the Command Register from it and unregister your admin command modules.

class OperatorConfigKey

Bases: object

OPERATORS = 'operators'
class OperatorMethod

Bases: object

BROADCAST = 'bcast'
RELAY = 'relay'
class SendOrder(value)

Bases: Enum

An enumeration.

ANY = 'any'
SEQUENTIAL = 'sequential'
class Task(name: str, data: Shareable, props: Dict | None = None, timeout: int = 0, before_task_sent_cb=None, after_task_sent_cb=None, result_received_cb=None, task_done_cb=None, operator=None, secure=False)

Bases: object

Init the Task.

A task is a piece of work that is assigned by the Controller to client workers. Depending on how the task is assigned (broadcast, send, or relay), the task will be performed by one or more clients.

Parameters:
  • name (str) – name of the task

  • data (Shareable) – data of the task

  • props – any additional properties of the task

  • timeout – how long this task will last. If == 0, the task never times out (with WFCommServer it never times out; with WFCommClient it times out after max_task_timeout).

  • before_task_sent_cb – If provided, this callback is called before the controller sends the task to a client. It must follow before_task_sent_cb_signature.

  • after_task_sent_cb – If provided, this callback is called after the controller sends the task to a client. It must follow after_task_sent_cb_signature.

  • result_received_cb – If provided, this callback is called when the controller receives a result from a client. It must follow result_received_cb_signature.

  • task_done_cb – If provided, this callback is called when the task is done. It must follow task_done_cb_signature.

  • operator – task operator that describes the operation of the task

  • secure – whether this task should be transmitted securely

get_prop(key)
set_prop(key, value)
class TaskCompletionStatus(value)

Bases: Enum

An enumeration.

ABORTED = 'aborted'
CANCELLED = 'cancelled'
CLIENT_DEAD = 'client_dead'
ERROR = 'error'
IGNORED = 'ignored'
OK = 'ok'
TIMEOUT = 'timeout'
class TaskOperatorKey

Bases: object

AGGREGATOR = 'aggregator'
DATA_FILTERS = 'data_filters'
METHOD = 'method'
MIN_TARGETS = 'min_targets'
NUM_ROUNDS = 'num_rounds'
OP_ID = 'op_id'
PERSISTOR = 'persistor'
RESULT_FILTERS = 'result_filters'
SHAREABLE_GENERATOR = 'shareable_gen'
TARGETS = 'targets'
TASK_ASSIGNMENT_TIMEOUT = 'task_assign_timeout'
TIMEOUT = 'timeout'
WAIT_TIME_AFTER_MIN_RESPS = 'wait_time_after_min_received'
after_task_sent_cb_signature(client_task: ClientTask, fl_ctx: FLContext)

Signature of the after_task_sent CB.

Called after sending a task to a client. Usually used to clean up the FL Context or the Task data.

Parameters:
  • client_task – the client task that has been sent

  • fl_ctx – the FL context that comes with the client’s task request.

before_task_sent_cb_signature(client_task: ClientTask, fl_ctx: FLContext)

Signature of the before_task_sent CB.

Called before sending a task to a client. Usually used to prepare the FL Context, which is created to process the client’s task request. You can also use this CB to alter the data of the task to be sent.

Parameters:
  • client_task – the client task that is about to be sent

  • fl_ctx – the FL context that comes with the client’s task request. Public properties you set in this context will be sent to the client!

result_received_cb_signature(client_task: ClientTask, fl_ctx: FLContext)

Signature of the result_received CB.

Called after a result is received from a client.

Parameters:
  • client_task – the client task that the result is for

  • fl_ctx – the FL context that comes with the client’s result submission

task_done_cb_signature(task: Task, fl_ctx: FLContext)

Signature of the task_done CB.

Called when the task is completed.

Parameters:
  • task – the task that is completed

  • fl_ctx – an instance of FL Context used for this call only.
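The four callback signatures fit together in a fixed order per client: before_task_sent, after_task_sent, then result_received once the client responds, and finally task_done for the whole task. The sketch below models that order. ToyTask, ToyClientTask, and run_lifecycle are hypothetical stand-ins (a dict plays the role of FLContext), but the callbacks are invoked with the keyword names from the documented signatures (client_task/fl_ctx and task/fl_ctx).

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Optional


@dataclass
class ToyTask:
    """Hypothetical stand-in for Task: name, data, and the four optional callbacks."""
    name: str
    data: Dict
    before_task_sent_cb: Optional[Callable[..., None]] = None
    after_task_sent_cb: Optional[Callable[..., None]] = None
    result_received_cb: Optional[Callable[..., None]] = None
    task_done_cb: Optional[Callable[..., None]] = None
    client_tasks: List["ToyClientTask"] = field(default_factory=list)


@dataclass
class ToyClientTask:
    """Hypothetical stand-in for ClientTask: the per-client record of a task."""
    client: str
    task: ToyTask
    result: Optional[Dict] = None


def run_lifecycle(task: ToyTask, clients: List[str], fl_ctx: Dict,
                  respond: Callable[[str, Dict], Dict]) -> None:
    for client in clients:
        ct = ToyClientTask(client, task)
        task.client_tasks.append(ct)
        if task.before_task_sent_cb:
            task.before_task_sent_cb(client_task=ct, fl_ctx=fl_ctx)  # may alter task.data
        sent = dict(task.data)  # what the client receives
        if task.after_task_sent_cb:
            task.after_task_sent_cb(client_task=ct, fl_ctx=fl_ctx)
        ct.result = respond(client, sent)  # the client's raw result
        if task.result_received_cb:
            task.result_received_cb(client_task=ct, fl_ctx=fl_ctx)  # may replace ct.result
    if task.task_done_cb:
        task.task_done_cb(task=task, fl_ctx=fl_ctx)
```

Because result_received_cb may overwrite client_task.result, the results seen by task_done_cb are the processed ones, matching the description of ‘result’ in the broadcast section.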