nvflare.edge.controllers.sage module

class ScatterAndGatherForEdge(num_rounds: int = 5, assessor_id: str = 'assessor', task_name='train', task_check_period: float = 0.5, assess_interval: float = 0.5, update_interval: float = 1.0)[source]

Bases: Controller

ScatterAndGatherForEdge Workflow.

The ScatterAndGatherForEdge workflow is a Federated Averaging (FedAvg) algorithm for hierarchically organized edge devices.
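To illustrate how a weighted average can be computed across a device hierarchy, here is a minimal sketch. It assumes each device reports a flat list of parameters and a sample count; the helper names (`leaf_update`, `merge`, `finalize`) are hypothetical and are not part of the nvflare API. The actual aggregation used by this workflow may differ.

```python
from typing import List, Tuple

# (sum of parameters weighted by sample count, total sample count)
Update = Tuple[List[float], int]


def leaf_update(weights: List[float], num_samples: int) -> Update:
    """Hypothetical helper: turn one device's result into a mergeable update."""
    return [w * num_samples for w in weights], num_samples


def merge(updates: List[Update]) -> Update:
    """Combine updates from children; usable at every level of the hierarchy."""
    total = [0.0] * len(updates[0][0])
    count = 0
    for weighted_sum, n in updates:
        for i, value in enumerate(weighted_sum):
            total[i] += value
        count += n
    return total, count


def finalize(update: Update) -> List[float]:
    """Divide the accumulated sums by the sample count to get the average."""
    weighted_sum, n = update
    return [value / n for value in weighted_sum]


# Two devices report to an intermediate node; a third reports to the root.
a = leaf_update([1.0, 2.0], num_samples=1)
b = leaf_update([3.0, 4.0], num_samples=3)
c = leaf_update([5.0, 6.0], num_samples=4)
root = merge([merge([a, b]), c])
global_weights = finalize(root)
```

Because intermediate nodes forward sums and counts rather than averages, merging level by level gives the same result as averaging all devices at once.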

During the execution of a task, the assessor (specified by assessor_id) is invoked periodically to assess the quality of training results to determine whether the task should be continued.
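The periodic assessment described above can be sketched as a polling loop. This is a simplified illustration, not the controller's actual code: the callables `assess`, `all_children_done`, and `is_aborted`, the string verdicts `"continue"`, `"task_done"`, and `"workflow_done"`, and the order of the checks are all assumptions. The returned strings mirror the TaskDoneReason values listed in this module.

```python
import time
from typing import Callable


def run_task_with_assessment(
    assess: Callable[[], str],
    all_children_done: Callable[[], bool],
    is_aborted: Callable[[], bool],
    assess_interval: float = 0.5,
    task_check_period: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> str:
    """Poll a running task and return the reason it ended (hypothetical helper)."""
    last_assess = clock()
    while True:
        if is_aborted():
            return "aborted"
        if all_children_done():
            return "all_children_done"
        now = clock()
        # Invoke the assessor only once per assess_interval.
        if now - last_assess >= assess_interval:
            last_assess = now
            verdict = assess()
            if verdict == "task_done":
                return "assessed_task_done"
            if verdict == "workflow_done":
                return "assessed_workflow_done"
        # Wait before checking the task status again.
        sleep(task_check_period)
```

The `sleep` and `clock` parameters exist only so the loop can be exercised without real delays.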

Parameters:
  • num_rounds (int, optional) – The total number of training rounds. Defaults to 5.

  • assessor_id (str) – ID of the assessor component. Defaults to “assessor”.

  • task_name (str) – Name of the train task. Defaults to “train”.

  • task_check_period (float, optional) – Interval for checking the status of tasks. Defaults to 0.5.

  • assess_interval (float, optional) – How often to invoke the assessor during task execution. Defaults to 0.5.

  • update_interval (float, optional) – How often children send updates. Defaults to 1.0.

Raises:
  • TypeError – if any input argument does not have the correct type

  • ValueError – if any input argument is out of range
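The following sketch shows one way such argument checks could look. The function `validate_sage_args` is hypothetical, and the accepted ranges (all numeric arguments strictly positive) are an assumption; the source does not state the exact rules the constructor applies.

```python
from numbers import Real


def _check_positive_number(name: str, value) -> None:
    # bool is a subclass of int, so reject it explicitly.
    if isinstance(value, bool) or not isinstance(value, Real):
        raise TypeError(f"{name} must be a number, got {type(value).__name__}")
    if value <= 0:
        raise ValueError(f"{name} must be > 0, got {value}")


def validate_sage_args(
    num_rounds=5,
    assessor_id="assessor",
    task_name="train",
    task_check_period=0.5,
    assess_interval=0.5,
    update_interval=1.0,
) -> None:
    """Hypothetical validation mirroring the documented TypeError/ValueError."""
    if isinstance(num_rounds, bool) or not isinstance(num_rounds, int):
        raise TypeError(f"num_rounds must be int, got {type(num_rounds).__name__}")
    if num_rounds <= 0:
        raise ValueError(f"num_rounds must be > 0, got {num_rounds}")
    for name, value in (("assessor_id", assessor_id), ("task_name", task_name)):
        if not isinstance(value, str):
            raise TypeError(f"{name} must be str, got {type(value).__name__}")
    _check_positive_number("task_check_period", task_check_period)
    _check_positive_number("assess_interval", assess_interval)
    _check_positive_number("update_interval", update_interval)
```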

control_flow(abort_signal: Signal, fl_ctx: FLContext) → None[source]

This is the control logic for the RUN.

NOTE: this is running in a separate thread, and its life is the duration of the RUN.

Parameters:
  • fl_ctx – the FL context

  • abort_signal – the abort signal. If triggered, this method stops waiting and returns to the caller.
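A rough sketch of a round loop that honors an abort signal is shown below. `StandInSignal` is a local stand-in rather than the nvflare Signal class, and `run_rounds` and `run_one_task` are hypothetical names. Which outcomes end the whole run is also an assumption made for illustration.

```python
from typing import Callable, List


class StandInSignal:
    """Local stand-in for an abort signal; not the nvflare Signal class."""

    def __init__(self) -> None:
        self.triggered = False

    def trigger(self) -> None:
        self.triggered = True


def run_rounds(
    num_rounds: int,
    run_one_task: Callable[[int], str],
    abort_signal: StandInSignal,
) -> List[str]:
    """Run up to num_rounds tasks and collect the reason each one ended."""
    reasons: List[str] = []
    for current_round in range(num_rounds):
        # Stop waiting and return to the caller once the signal is triggered.
        if abort_signal.triggered:
            break
        reason = run_one_task(current_round)
        reasons.append(reason)
        # Assumed here: these two outcomes end the run, not just the round.
        if reason in ("aborted", "assessed_workflow_done"):
            break
    return reasons
```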

classmethod get_next_task_seq()[source]

handle_event(event_type: str, fl_ctx: FLContext)[source]

Handles events.

Parameters:
  • event_type (str) – event type fired by workflow.

  • fl_ctx (FLContext) – FLContext information.

next_task_seq = 0

process_result_of_unknown_task(client: Client, task_name, client_task_id, result: Shareable, fl_ctx: FLContext) → None[source]

Process result when no task is found for it.

This is called when a result submission is received from a client but no standing task can be found for it in the task queue.

This could happen when:

  • the client’s submission is too late

  • the task is already completed

  • the Controller lost the task, e.g. the Server was restarted

Parameters:
  • client – the client that the result comes from

  • task_name – the name of the task

  • client_task_id – ID of the task

  • result – the result from the client

  • fl_ctx – the FL context that comes with the client’s submission
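As a simplified illustration of this situation, the sketch below accepts a result only when its task ID is still standing and otherwise logs and discards it. The function `handle_result`, its arguments, and the choice to discard unknown results are assumptions; the source does not say what this controller does with such results.

```python
import logging
from typing import Any, List, Set

logger = logging.getLogger("sage_sketch")


def handle_result(
    task_name: str,
    client_task_id: str,
    result: Any,
    standing_task_ids: Set[str],
    accepted: List[Any],
) -> bool:
    """Return True if the result was accepted, False if it was discarded."""
    if client_task_id in standing_task_ids:
        accepted.append(result)
        return True
    # No standing task matches: late submission, completed task, or lost task.
    logger.warning(
        "ignoring result of unknown task %s (id %s)", task_name, client_task_id
    )
    return False
```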

start_controller(fl_ctx: FLContext) → None[source]

Starts the controller.

This method is called at the beginning of the RUN.

Parameters:
  • fl_ctx – the FL context. You can use this context to access services provided by the framework. For example, you can get Command Register from it and register your admin command modules.

stop_controller(fl_ctx: FLContext)[source]

Stops the controller.

This method is called right before the RUN is ended.

Parameters:
  • fl_ctx – the FL context. You can use this context to access services provided by the framework. For example, you can get Command Register from it and unregister your admin command modules.

class TaskDoneReason(value)[source]

Bases: Enum

An enumeration of the reasons a task can be considered done.

ABORTED = 'aborted'
ALL_CHILDREN_DONE = 'all_children_done'
ASSESSED_TASK_DONE = 'assessed_task_done'
ASSESSED_WORKFLOW_DONE = 'assessed_workflow_done'
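
The members above behave like any standard Python Enum. The sketch below defines a local mirror of the class so it can run without nvflare installed; in real code the enum would be imported from this module instead.

```python
from enum import Enum


class TaskDoneReason(Enum):
    """Local mirror of the members listed above, for illustration only."""

    ABORTED = "aborted"
    ALL_CHILDREN_DONE = "all_children_done"
    ASSESSED_TASK_DONE = "assessed_task_done"
    ASSESSED_WORKFLOW_DONE = "assessed_workflow_done"


# Look up a member by its string value, e.g. when reading it back from a log.
reason = TaskDoneReason("assessed_task_done")

# List every value in definition order.
all_values = [member.value for member in TaskDoneReason]
```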