nvflare.app_opt.tensor_stream.server module

class TensorServerStreamer(format: str = ExchangeFormat.PYTORCH, tasks: list[str] | None = None, tensor_send_timeout: float = 30.0, wait_send_task_data_all_clients_timeout: float = 300.0)[source]

Bases: FLComponent

Handles sending task data tensors to clients and receiving task results from clients.

It uses a StreamableEngine, TensorReceiver, and TensorSender to manage tensor streaming on the server side.

format

The format of the tensors to send/receive. Default is “pytorch”.

Type:

str

tensor_send_timeout

Timeout for tensor entry transfer operations. Default is 30.0 seconds.

Type:

float

engine

The StreamableEngine used for tensor streaming.

Type:

StreamableEngine

sender

The TensorSender used to send tensors to clients.

Type:

TensorSender

receiver

The TensorReceiver used to receive tensors from clients.

Type:

TensorReceiver

start_sending_time

The timestamp when sending to clients started for the current round.

Type:

dict[int, float]

seen_tasks

The set of task IDs seen in the current round.

Type:

dict[int, set[str]]

num_task_data_sent

The number of task data sent to clients successfully for the current round.

Type:

dict[int, int]

num_task_skipped

The number of task data skipped (not sent) to clients for the current round.

Type:

dict[int, int]

data_cleaned

Flag indicating whether the task data has been cleaned from the FLContext for the current round.

Type:

dict[int, bool]

lock

A lock to protect shared data structures.

Type:

Lock

initialize(fl_ctx)[source]

Initializes the TensorServerStreamer component.

handle_event(event_type, fl_ctx)[source]

Handles events for the TensorServerStreamer component.

send_tensors_to_client(fl_ctx)[source]

Sends tensors to the client after task data filtering.

clean_counters(current_round)[source]

Cleans the counters for the current round.

wait_sending_task_data_all_clients(num_clients, fl_ctx)

Waits until all clients have received the tensors or timeout occurs.

try_to_clean_task_data(num_clients, fl_ctx)[source]

Cleans the task data in the FLContext if all clients have received the tensors.

Initialize the TensorServerStreamer component.

The server automatically communicates the required minimum get_task_timeout to clients to prevent fast clients from timing out while waiting for slow clients to receive tensors.

Background: Fast clients finish receiving tensors early and immediately request the next task. However, the server blocks until all clients have received their tensors (up to wait_send_task_data_all_clients_timeout). Without a sufficiently large get_task_timeout, fast clients would time out and fail.

Automatic Timeout Management:
  • Server calculates: min_timeout = wait_send_task_data_all_clients_timeout + 60s buffer

  • Server sends this requirement to clients in task responses

  • Clients automatically adjust their get_task_timeout if it’s too small

  • Transparent logging shows when auto-adjustment occurs
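
The timeout arithmetic above can be sketched as follows. This is a minimal illustration, not the library's implementation: the helper names `required_get_task_timeout` and `adjust_client_timeout` are hypothetical, and only the 60-second buffer and the sum itself come from the description above.

```python
# Illustrative sketch only; these function names are hypothetical, not NVFlare APIs.
GET_TASK_TIMEOUT_BUFFER = 60.0  # seconds, the buffer documented for this class


def required_get_task_timeout(wait_send_task_data_all_clients_timeout: float) -> float:
    """Minimum get_task_timeout the server would communicate to clients."""
    return wait_send_task_data_all_clients_timeout + GET_TASK_TIMEOUT_BUFFER


def adjust_client_timeout(current: float, required: float) -> float:
    """Raise the client's timeout to the required minimum if it is too small."""
    return required if current < required else current


required = required_get_task_timeout(300.0)    # 360.0 with the default settings
print(adjust_client_timeout(120.0, required))  # too small, raised to 360.0
print(adjust_client_timeout(400.0, required))  # already large enough, stays 400.0
```

With the default wait_send_task_data_all_clients_timeout of 300.0 seconds, a client would therefore need a get_task_timeout of at least 360.0 seconds.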

Optional Manual Configuration:

Users can still explicitly set get_task_timeout in config_fed_client.json to override the automatic behavior if needed:

    {
        "get_task_timeout": 400.0  // Explicit override
    }

Parameters:
  • format (str) – The format of the tensors to send/receive. Default is ExchangeFormat.PYTORCH.

  • tasks (list[str]) – The list of tasks to send tensors for. Default is None, which means the “train” task.

  • tensor_send_timeout (float) – Timeout for each tensor chunk transfer operation. Default is 30.0 seconds.

  • wait_send_task_data_all_clients_timeout (float) – Maximum time to wait for all clients to receive task tensors. Default is 300.0 seconds.
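
As an illustration of these parameters, the sketch below builds a component entry of the kind a server job configuration might contain, assuming the usual id/path/args layout of NVFlare component configuration. The `id` value and the argument values are arbitrary choices for this example; the class path is derived from the module and class name documented on this page.

```python
import json

# Hypothetical component entry; the "id" is an arbitrary name chosen for this sketch.
component = {
    "id": "tensor_server_streamer",
    "path": "nvflare.app_opt.tensor_stream.server.TensorServerStreamer",
    "args": {
        "tasks": ["train"],
        "tensor_send_timeout": 30.0,
        "wait_send_task_data_all_clients_timeout": 300.0,
    },
}

# Print the entry as it might appear under "components" in a server config file.
print(json.dumps({"components": [component]}, indent=2))
```

Omitting an argument leaves it at the default listed above.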

GET_TASK_TIMEOUT_BUFFER = 60.0

clean_counters(current_round: int)[source]

Clean the counters for the current round.

Parameters:

current_round (int) – The current round number.
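
The per-round bookkeeping can be pictured with plain dictionaries keyed by round number, as the dict[int, ...] attribute types suggest. The sketch below is an assumption about the general shape only: the class `RoundCounters` and its `record_sent` and `record_skipped` methods are hypothetical and do not reproduce the component's actual code.

```python
import threading


class RoundCounters:
    """Hypothetical per-round counters guarded by a lock (illustration only)."""

    def __init__(self) -> None:
        self.lock = threading.Lock()
        self.num_task_data_sent: dict[int, int] = {}
        self.num_task_skipped: dict[int, int] = {}

    def record_sent(self, current_round: int) -> None:
        # Increment the sent counter for this round, starting from zero.
        with self.lock:
            self.num_task_data_sent[current_round] = self.num_task_data_sent.get(current_round, 0) + 1

    def record_skipped(self, current_round: int) -> None:
        # Increment the skipped counter for this round, starting from zero.
        with self.lock:
            self.num_task_skipped[current_round] = self.num_task_skipped.get(current_round, 0) + 1

    def clean_counters(self, current_round: int) -> None:
        # Drop the entries for the given round; rounds with no entry are ignored.
        with self.lock:
            self.num_task_data_sent.pop(current_round, None)
            self.num_task_skipped.pop(current_round, None)


counters = RoundCounters()
counters.record_sent(1)
counters.record_sent(1)
counters.record_skipped(1)
counters.clean_counters(1)  # round 1 no longer appears in either dictionary
```

Keying by round keeps the state of one round from leaking into the next, and cleaning a round that was never recorded is harmless.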

handle_event(event_type: str, fl_ctx: FLContext)[source]

Handle events for the TensorServerStreamer component.

Parameters:
  • event_type (str) – The type of event to handle.

  • fl_ctx (FLContext) – The FLContext for the current operation.

initialize(fl_ctx: FLContext)[source]

Initialize the TensorServerStreamer component.

Parameters:

fl_ctx (FLContext) – The FLContext for the current operation.

send_tensors_to_client(fl_ctx: FLContext)[source]

Send tensors to the client after task data filtering.

Parameters:

fl_ctx (FLContext) – The FLContext for the current operation.

try_to_clean_task_data(num_clients: int, fl_ctx: FLContext)[source]

Clean the task data in the FLContext.

Parameters:
  • num_clients (int) – The number of clients to wait for.

  • fl_ctx (FLContext) – The FLContext to clean the task data from.

wait_sending_task_data_all_clients(num_clients: int, fl_ctx: FLContext)[source]

Wait until all clients have received the task data tensors or timeout occurs.

Parameters:
  • num_clients (int) – The number of clients to wait for.

  • fl_ctx (FLContext) – The FLContext for the current operation.

Raises:

TimeoutError – If not all clients have received the tensors within the timeout period.
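
The wait-or-timeout behavior described above follows a common polling pattern, sketched below. This is not the component's implementation: `wait_for_all_clients`, the `get_num_done` callback, and the polling interval are hypothetical names and values chosen for the illustration.

```python
import time
from typing import Callable


def wait_for_all_clients(
    num_clients: int,
    get_num_done: Callable[[], int],
    timeout: float = 300.0,
    poll_interval: float = 0.1,
) -> None:
    """Block until get_num_done() reaches num_clients, or raise TimeoutError."""
    deadline = time.monotonic() + timeout
    while get_num_done() < num_clients:
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"only {get_num_done()} of {num_clients} clients finished within {timeout}s"
            )
        time.sleep(poll_interval)


# Both clients are already done, so this returns immediately.
wait_for_all_clients(2, lambda: 2, timeout=1.0)

# Only one of two clients ever finishes, so the short timeout expires.
try:
    wait_for_all_clients(2, lambda: 1, timeout=0.05, poll_interval=0.01)
except TimeoutError as err:
    print(f"timed out: {err}")
```

Using time.monotonic() for the deadline keeps the wait unaffected by system clock adjustments.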