nvflare.app_opt.tensor_stream.server module

class TensorServerStreamer(format: str = 'pytorch', tasks: list[str] | None = None, tensor_send_timeout: float = 30.0, wait_send_task_data_all_clients_timeout: float = 300.0)

Bases: FLComponent

Handles sending task data tensors to clients and receiving task results from clients.

It uses a StreamableEngine, TensorReceiver, and TensorSender to manage tensor streaming on the server side.

format

The format of the tensors to send/receive. Default is “pytorch”.

Type:

str

tensor_send_timeout

Timeout for tensor entry transfer operations. Default is 30.0 seconds.

Type:

float

engine

The StreamableEngine used for tensor streaming.

Type:

StreamableEngine

sender

The TensorSender used to send tensors to clients.

Type:

TensorSender

receiver

The TensorReceiver used to receive tensors from clients.

Type:

TensorReceiver

start_sending_time

Maps each round number to the timestamp at which sending to clients started in that round.

Type:

dict[int, float]

seen_tasks

Maps each round number to the set of task IDs seen in that round.

Type:

dict[int, set[str]]

num_task_data_sent

Maps each round number to the number of task data payloads successfully sent to clients in that round.

Type:

dict[int, int]

num_task_skipped

Maps each round number to the number of task data payloads skipped (not sent to clients) in that round.

Type:

dict[int, int]

data_cleaned

Maps each round number to a flag indicating whether the task data has been cleaned from the FLContext in that round.

Type:

dict[int, bool]

lock

A lock to protect shared data structures.

Type:

Lock
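The per-round attributes above are all dictionaries keyed by round number and guarded by a single lock. The following is a minimal, hypothetical sketch of that layout, assuming keys are round numbers; `RoundBookkeeping` and `record` are illustrative names, not part of NVFlare, and this is not the library's implementation.

```python
import threading
import time
from collections import defaultdict


class RoundBookkeeping:
    """Hypothetical model of the per-round attributes (not NVFlare code)."""

    def __init__(self) -> None:
        self.start_sending_time: dict[int, float] = {}
        self.seen_tasks: dict[int, set[str]] = defaultdict(set)
        self.num_task_data_sent: dict[int, int] = defaultdict(int)
        self.num_task_skipped: dict[int, int] = defaultdict(int)
        self.data_cleaned: dict[int, bool] = defaultdict(bool)
        self.lock = threading.Lock()

    def record(self, current_round: int, task_id: str, sent: bool) -> None:
        # Every shared dictionary is updated under one lock because several
        # client requests may be handled concurrently.
        with self.lock:
            # Remember when the first send of this round started.
            self.start_sending_time.setdefault(current_round, time.time())
            self.seen_tasks[current_round].add(task_id)
            if sent:
                self.num_task_data_sent[current_round] += 1
            else:
                self.num_task_skipped[current_round] += 1
```

Keying by round lets counters for an old round be dropped without touching the round in progress.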

initialize(fl_ctx)

Initializes the TensorServerStreamer component.

handle_event(event_type, fl_ctx)

Handles events for the TensorServerStreamer component.

send_tensors_to_client(fl_ctx)

Sends tensors to the client after task data filtering.

clean_counters(current_round)

Cleans the counters for the current round.

wait_sending_task_data_all_clients(num_clients, fl_ctx)

Waits until all clients have received the tensors or timeout occurs.

try_to_clean_task_data(num_clients, fl_ctx)

Cleans the task data in the FLContext if all clients have received the tensors.

Initialize the TensorServerStreamer component.

Parameters:
  • format (str) – The format of the tensors to send/receive. Default is “pytorch”.

  • tasks (list[str]) – The list of tasks to send tensors for. Default is None, which means the “train” task.

  • tensor_send_timeout (float) – Timeout for tensor entry transfer operations. Default is 30.0 seconds.

  • wait_send_task_data_all_clients_timeout (float) – Timeout for sending task data tensors to all clients. Default is 300.0 seconds.
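As a hedged illustration, the constructor arguments above could appear in a server-side job configuration. This sketch assumes the usual FLARE component layout (`"id"`, `"path"`, `"args"` under a top-level `"components"` list in `config_fed_server.json`); the id `tensor_server_streamer` is made up, and the argument values are simply the signature defaults.

```python
import json

# Hypothetical component entry; the "id" value is illustrative only.
tensor_streamer_component = {
    "id": "tensor_server_streamer",
    "path": "nvflare.app_opt.tensor_stream.server.TensorServerStreamer",
    "args": {
        "format": "pytorch",
        "tasks": ["train"],  # same effect as the documented default of None
        "tensor_send_timeout": 30.0,
        "wait_send_task_data_all_clients_timeout": 300.0,
    },
}

server_config = {"components": [tensor_streamer_component]}
print(json.dumps(server_config, indent=2))
```

Because every value here equals its default, an entry with an empty `"args"` object would presumably behave the same; list only the arguments you want to change.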

clean_counters(current_round: int)

Clean the counters for the current round.

Parameters:

current_round (int) – The current round number.
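Cleaning the counters for one round amounts to removing that round's key from each per-round dictionary under the shared lock. The following is a minimal sketch with a hypothetical standalone `clean_counters` helper that takes the dictionaries explicitly; it is not the method's actual code.

```python
import threading


def clean_counters(
    current_round: int,
    counters: dict[str, dict[int, object]],
    lock: threading.Lock,
) -> None:
    """Hypothetical helper: drop every per-round entry for ``current_round``."""
    with lock:
        for per_round in counters.values():
            # pop() with a default tolerates rounds that never recorded anything.
            per_round.pop(current_round, None)
```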

handle_event(event_type: str, fl_ctx: FLContext)

Handle events for the TensorServerStreamer component.

Parameters:
  • event_type (str) – The type of event to handle.

  • fl_ctx (FLContext) – The FLContext for the current operation.
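An FLComponent's `handle_event` typically routes the event types it cares about to internal methods and ignores the rest. The following is a minimal sketch of that pattern. The event names are placeholders, not FLARE's real event constants, and a plain `dict` stands in for `FLContext`. Mapping run start to `initialize` is an assumption; mapping task data filtering to `send_tensors_to_client` follows that method's description.

```python
from typing import Callable


class EventDispatcher:
    """Hypothetical sketch of routing event types to handler methods."""

    # Placeholder event names; real FLARE event constants differ.
    START_RUN = "start_run"
    AFTER_TASK_DATA_FILTER = "after_task_data_filter"

    def __init__(self) -> None:
        self.calls: list[str] = []
        self._handlers: dict[str, Callable[[dict], None]] = {
            self.START_RUN: self.initialize,
            self.AFTER_TASK_DATA_FILTER: self.send_tensors_to_client,
        }

    def initialize(self, fl_ctx: dict) -> None:
        self.calls.append("initialize")

    def send_tensors_to_client(self, fl_ctx: dict) -> None:
        self.calls.append("send_tensors_to_client")

    def handle_event(self, event_type: str, fl_ctx: dict) -> None:
        # Events this component does not handle are silently ignored.
        handler = self._handlers.get(event_type)
        if handler is not None:
            handler(fl_ctx)
```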

initialize(fl_ctx: FLContext)

Initialize the TensorServerStreamer component.

Parameters:

fl_ctx (FLContext) – The FLContext for the current operation.

send_tensors_to_client(fl_ctx: FLContext)

Send tensors to the client after task data filtering.

Parameters:

fl_ctx (FLContext) – The FLContext for the current operation.
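Whether task data is streamed depends on the configured `tasks` list, which defaults to the “train” task when `tasks` is None. The following is a minimal sketch of that selection, plus a tally matching the sent and skipped counters. Both helper names are hypothetical, and the sketch assumes that a task outside the list counts as skipped.

```python
from typing import Iterable, Optional

# The constructor documents tasks=None as meaning the "train" task.
DEFAULT_TASKS = ["train"]


def select_for_streaming(task_name: str, tasks: Optional[list[str]] = None) -> bool:
    """Return True if tensors for ``task_name`` should be streamed (hypothetical)."""
    allowed = DEFAULT_TASKS if tasks is None else tasks
    return task_name in allowed


def count_sent_and_skipped(
    task_names: Iterable[str], tasks: Optional[list[str]] = None
) -> tuple[int, int]:
    """Tally how many task data payloads would be sent versus skipped."""
    sent = skipped = 0
    for name in task_names:
        if select_for_streaming(name, tasks):
            sent += 1
        else:
            skipped += 1
    return sent, skipped
```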

try_to_clean_task_data(num_clients: int, fl_ctx: FLContext)

Clean the task data in the FLContext once all clients have received the tensors.

Parameters:
  • num_clients (int) – The number of clients to wait for.

  • fl_ctx (FLContext) – The FLContext to clean the task data from.
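The cleanup rule can be expressed as a guarded check: clear the shared task data only once the sent count for the round reaches `num_clients`, and do it at most once per round. The following is a minimal sketch using a hypothetical standalone function with explicit arguments; a plain `dict` stands in for the task data held in the FLContext. The real method takes only `num_clients` and `fl_ctx`.

```python
import threading
from typing import Any


def try_to_clean_task_data(
    current_round: int,
    num_clients: int,
    task_data: dict[str, Any],
    num_task_data_sent: dict[int, int],
    data_cleaned: dict[int, bool],
    lock: threading.Lock,
) -> bool:
    """Hypothetical: clear the task data once every client has received it."""
    with lock:
        if data_cleaned.get(current_round, False):
            return False  # already cleaned for this round
        if num_task_data_sent.get(current_round, 0) < num_clients:
            return False  # some clients still need the tensors
        task_data.clear()
        data_cleaned[current_round] = True
        return True
```

The `data_cleaned` flag makes repeated calls harmless once the data is gone.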

wait_sending_task_data_all_clients(num_clients: int, fl_ctx: FLContext)

Wait until all clients have received the task data tensors or timeout occurs.

Parameters:
  • num_clients (int) – The number of clients to wait for.

  • fl_ctx (FLContext) – The FLContext for the current operation.

Raises:

TimeoutError – If not all clients have received the tensors within the timeout period.
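Waiting for all clients with a timeout maps naturally onto a condition variable. Each completed send notifies waiters, and the waiter raises `TimeoutError` if the count is not reached in time. The following is a minimal sketch under that assumption; `SendTracker`, `mark_sent`, and `wait_all` are hypothetical names, not NVFlare's implementation.

```python
import threading


class SendTracker:
    """Hypothetical tracker that lets a caller wait for N completed sends."""

    def __init__(self) -> None:
        self._cond = threading.Condition()
        self._sent = 0

    def mark_sent(self) -> None:
        # Called once per client whose task data tensors were delivered.
        with self._cond:
            self._sent += 1
            self._cond.notify_all()

    def wait_all(self, num_clients: int, timeout: float) -> None:
        with self._cond:
            # wait_for re-checks the predicate after each notification and
            # returns False if the timeout elapses before it becomes true.
            done = self._cond.wait_for(lambda: self._sent >= num_clients, timeout=timeout)
            sent = self._sent
        if not done:
            raise TimeoutError(
                f"only {sent} of {num_clients} clients received the tensors before the timeout"
            )
```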