nvflare.app_opt.tensor_stream.server module
- class TensorServerStreamer(format: str = ExchangeFormat.PYTORCH, tasks: list[str] | None = None, tensor_send_timeout: float = 30.0, wait_send_task_data_all_clients_timeout: float = 300.0)
Bases: FLComponent
Handles sending task data tensors to clients and receiving task results from clients.
It uses a StreamableEngine, a TensorReceiver, and a TensorSender to manage tensor streaming on the server side.
- format
The format of the tensors to send/receive. Default is “pytorch”.
- Type:
str
- tensor_send_timeout
Timeout for tensor entry transfer operations. Default is 30.0 seconds.
- Type:
float
- engine
The StreamableEngine used for tensor streaming.
- Type:
StreamableEngine
- sender
The TensorSender used to send tensors to clients.
- Type:
TensorSender
- receiver
The TensorReceiver used to receive tensors from clients.
- Type:
TensorReceiver
- start_sending_time
Maps each round number to the timestamp at which sending to clients started in that round.
- Type:
dict[int, float]
- seen_tasks
Maps each round number to the set of task IDs seen in that round.
- Type:
dict[int, set[str]]
- num_task_data_sent
Maps each round number to the number of times task data was successfully sent to clients in that round.
- Type:
dict[int, int]
- num_task_skipped
Maps each round number to the number of times task data was skipped (not sent to clients) in that round.
- Type:
dict[int, int]
- data_cleaned
Maps each round number to a flag indicating whether the task data has been cleaned from the FLContext in that round.
- Type:
dict[int, bool]
- lock
A lock to protect shared data structures.
- Type:
Lock
- wait_sending_task_data_all_clients(num_clients, fl_ctx)
Waits until all clients have received the task data tensors or the timeout expires.
- try_to_clean_task_data(num_clients, fl_ctx)
Cleans the task data in the FLContext once all clients have received the tensors.
Initialize the TensorServerStreamer component.
The server automatically communicates the required minimum get_task_timeout to clients to prevent fast clients from timing out while waiting for slow clients to receive tensors.
Background: fast clients finish receiving tensors early and immediately request the next task. The server, however, blocks until all clients have received the tensors (for up to wait_send_task_data_all_clients_timeout seconds). If a fast client's get_task_timeout is shorter than this wait, its request times out and fails.
- Automatic Timeout Management:
The server computes min_timeout = wait_send_task_data_all_clients_timeout + GET_TASK_TIMEOUT_BUFFER (60 s).
The server sends this minimum to clients in its task responses.
Each client increases its get_task_timeout to this minimum if the configured value is smaller.
A log message records each automatic adjustment.
- Optional Manual Configuration:
Users can still set get_task_timeout explicitly in config_fed_client.json to override the automatic behavior if needed, for example with an explicit value of 400.0 seconds:
{
  "get_task_timeout": 400.0
}
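The timeout arithmetic above can be sketched in plain Python. The function names are hypothetical, and the client-side rule (raise the value only when it is below the server's minimum) is an assumption drawn from the description, not NVFlare's actual client code. With the defaults, the required minimum is 300.0 + 60.0 = 360.0 seconds, so the explicit 400.0 shown above already satisfies it.

```python
# Hypothetical sketch of the documented rule:
# min_timeout = wait_send_task_data_all_clients_timeout + GET_TASK_TIMEOUT_BUFFER
GET_TASK_TIMEOUT_BUFFER = 60.0  # value documented on the class


def required_min_get_task_timeout(wait_send_task_data_all_clients_timeout: float = 300.0) -> float:
    """Minimum get_task_timeout the server asks clients to use."""
    return wait_send_task_data_all_clients_timeout + GET_TASK_TIMEOUT_BUFFER


def adjust_client_timeout(configured: float, server_min: float) -> float:
    """Assumed client-side behavior: raise the timeout only if it is too small."""
    if configured < server_min:
        # Stand-in for the log message emitted on automatic adjustment.
        print(f"get_task_timeout {configured}s < required {server_min}s; using {server_min}s")
        return server_min
    return configured


server_min = required_min_get_task_timeout()  # 300.0 + 60.0
print(adjust_client_timeout(60.0, server_min))   # raised to the server minimum
print(adjust_client_timeout(400.0, server_min))  # already large enough, kept as-is
```

Raising (never lowering) the client value keeps any larger, user-chosen timeout intact while guaranteeing that fast clients outlast the server's wait.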
- Parameters:
format (str) – The format of the tensors to send/receive. Default is ExchangeFormat.PYTORCH.
tasks (list[str] | None) – The list of tasks to send tensors for. Default is None, which selects the “train” task.
tensor_send_timeout (float) – Timeout for each tensor chunk transfer operation. Default is 30.0 seconds.
wait_send_task_data_all_clients_timeout (float) – Maximum time to wait for all clients to receive task tensors. Default is 300.0 seconds.
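To illustrate how the tasks parameter might gate streaming, here is a small sketch assuming that tensors are streamed only for task names in the configured list, with None standing for ["train"] as documented. resolve_tasks and should_stream are hypothetical helpers, not part of NVFlare.

```python
from typing import Optional

DEFAULT_TASKS = ["train"]  # documented meaning of tasks=None


def resolve_tasks(tasks: Optional[list[str]]) -> list[str]:
    """Return the task names whose data should be streamed as tensors."""
    return list(DEFAULT_TASKS) if tasks is None else list(tasks)


def should_stream(task_name: str, tasks: Optional[list[str]]) -> bool:
    """Hypothetical check: stream tensors only for configured tasks."""
    return task_name in resolve_tasks(tasks)


print(should_stream("train", None))                      # True
print(should_stream("validate", None))                   # False
print(should_stream("validate", ["train", "validate"]))  # True
```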
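- GET_TASK_TIMEOUT_BUFFER = 60.0
Buffer, in seconds, added to wait_send_task_data_all_clients_timeout to compute the minimum get_task_timeout that clients must use.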
- clean_counters(current_round: int)
Clean the counters for the current round.
- Parameters:
current_round (int) – The current round number.
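The attribute types listed for the class (dict[int, ...]) suggest per-round dictionaries keyed by round number, which clean_counters would then prune for one round. A minimal sketch under that assumption follows; RoundCounters and record_send are hypothetical names, and the real component may track its state differently.

```python
import threading
import time
from collections import defaultdict


class RoundCounters:
    """Hypothetical per-round bookkeeping modeled on the documented attributes."""

    def __init__(self) -> None:
        self.lock = threading.Lock()
        self.start_sending_time: dict[int, float] = {}
        self.seen_tasks: dict[int, set[str]] = defaultdict(set)
        self.num_task_data_sent: dict[int, int] = defaultdict(int)
        self.num_task_skipped: dict[int, int] = defaultdict(int)
        self.data_cleaned: dict[int, bool] = defaultdict(bool)

    def record_send(self, current_round: int, task_id: str, sent: bool) -> None:
        with self.lock:
            # Remember when the first send of this round started.
            self.start_sending_time.setdefault(current_round, time.time())
            self.seen_tasks[current_round].add(task_id)
            if sent:
                self.num_task_data_sent[current_round] += 1
            else:
                self.num_task_skipped[current_round] += 1

    def clean_counters(self, current_round: int) -> None:
        """Drop every entry for current_round so the dicts do not grow forever."""
        with self.lock:
            for table in (self.start_sending_time, self.seen_tasks,
                          self.num_task_data_sent, self.num_task_skipped,
                          self.data_cleaned):
                table.pop(current_round, None)


counters = RoundCounters()
counters.record_send(1, "task-a", sent=True)
counters.record_send(1, "task-b", sent=False)
print(counters.num_task_data_sent[1], counters.num_task_skipped[1])  # 1 1
counters.clean_counters(1)
print(1 in counters.num_task_data_sent)  # False
```

A single lock around all tables keeps the counters consistent when several client-sending threads update the same round concurrently.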
- handle_event(event_type: str, fl_ctx: FLContext)
Handle events for the TensorServerStreamer component.
- Parameters:
event_type (str) – The type of event to handle.
fl_ctx (FLContext) – The FLContext for the current operation.
- initialize(fl_ctx: FLContext)
Initialize the TensorServerStreamer component.
- Parameters:
fl_ctx (FLContext) – The FLContext for the current operation.
- send_tensors_to_client(fl_ctx: FLContext)
Send tensors to the client after task data filtering.
- Parameters:
fl_ctx (FLContext) – The FLContext for the current operation.
- try_to_clean_task_data(num_clients: int, fl_ctx: FLContext)
Clean the task data in the FLContext once all clients have received the tensors.
- Parameters:
num_clients (int) – The number of clients to wait for.
fl_ctx (FLContext) – The FLContext to clean the task data from.
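The description suggests that cleaning depends on all clients having received the tensors and, via the data_cleaned flag, happens at most once per round. The sketch below assumes exactly that: the function signature, the plain dict standing in for the FLContext task data, and the injected wait callable are all hypothetical.

```python
import threading
from typing import Callable


def try_to_clean_task_data(
    task_data: dict,
    current_round: int,
    data_cleaned: dict[int, bool],
    lock: threading.Lock,
    wait_all_clients: Callable[[], None],
) -> bool:
    """Hypothetical: wait for all clients, then free task tensors once per round."""
    wait_all_clients()  # may raise TimeoutError, in which case nothing is cleaned
    with lock:
        if data_cleaned.get(current_round, False):
            return False  # another thread already cleaned this round
        task_data.clear()  # stand-in for removing tensors from the FLContext
        data_cleaned[current_round] = True
        return True


lock = threading.Lock()
cleaned: dict[int, bool] = {}
task_data = {"weights": [0.1, 0.2]}
print(try_to_clean_task_data(task_data, 1, cleaned, lock, lambda: None))  # True
print(try_to_clean_task_data(task_data, 1, cleaned, lock, lambda: None))  # False
```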
- wait_sending_task_data_all_clients(num_clients: int, fl_ctx: FLContext)
Wait until all clients have received the task data tensors or the timeout expires.
- Parameters:
num_clients (int) – The number of clients to wait for.
fl_ctx (FLContext) – The FLContext for the current operation.
- Raises:
TimeoutError – If not all clients have received the tensors within the timeout period.
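One plausible way to implement this wait-or-raise contract is a counter guarded by threading.Condition, whose wait_for(predicate, timeout) returns False when the timeout expires before the predicate holds. SendTracker and mark_sent are hypothetical; the documentation does not say how TensorServerStreamer actually waits, so treat this as a sketch of the contract (return once num_clients sends are done, otherwise raise TimeoutError).

```python
import threading


class SendTracker:
    """Hypothetical tracker counting clients that have finished receiving tensors."""

    def __init__(self, timeout: float = 300.0) -> None:
        # Plays the role of wait_send_task_data_all_clients_timeout.
        self.timeout = timeout
        self.num_sent = 0
        self.cond = threading.Condition()

    def mark_sent(self) -> None:
        """Called by a sender thread after one client has received its tensors."""
        with self.cond:
            self.num_sent += 1
            self.cond.notify_all()

    def wait_sending_task_data_all_clients(self, num_clients: int) -> None:
        """Block until num_clients sends are recorded, or raise TimeoutError."""
        with self.cond:
            done = self.cond.wait_for(lambda: self.num_sent >= num_clients, timeout=self.timeout)
            if not done:
                raise TimeoutError(
                    f"only {self.num_sent}/{num_clients} clients received tensors "
                    f"within {self.timeout}s"
                )


tracker = SendTracker(timeout=2.0)
threads = [threading.Thread(target=tracker.mark_sent) for _ in range(3)]
for t in threads:
    t.start()
tracker.wait_sending_task_data_all_clients(3)  # returns once all three sends are recorded
for t in threads:
    t.join()

slow = SendTracker(timeout=0.1)
slow.mark_sent()
try:
    slow.wait_sending_task_data_all_clients(2)
except TimeoutError as err:
    print(err)  # only one of the two clients finished in time
```

Using notify_all lets every waiting thread re-check the predicate immediately; a polling loop with time.sleep would satisfy the same contract at the cost of extra latency.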