nvflare.app_opt.tensor_stream.server module¶
- class TensorServerStreamer(format: str = 'pytorch', tasks: list[str] | None = None, tensor_send_timeout: float = 30.0, wait_send_task_data_all_clients_timeout: float = 300.0)[source]¶
Bases:
FLComponent
Handles sending task data tensors to clients and receiving task results from clients.
It uses a StreamableEngine, TensorReceiver, and TensorSender to manage tensor streaming on the server side.
- format¶
The format of the tensors to send/receive. Default is “pytorch”.
- Type:
str
- tensor_send_timeout¶
Timeout for tensor entry transfer operations. Default is 30.0 seconds.
- Type:
float
- engine¶
The StreamableEngine used for tensor streaming.
- Type:
StreamableEngine
- sender¶
The TensorSender used to send tensors to clients.
- Type:
TensorSender
- receiver¶
The TensorReceiver used to receive tensors from clients.
- Type:
TensorReceiver
- start_sending_time¶
The timestamp when sending to clients started for the current round.
- Type:
dict[int, float]
- seen_tasks¶
The set of task IDs seen in the current round.
- Type:
dict[int, set[str]]
- num_task_data_sent¶
The number of task data items successfully sent to clients for the current round.
- Type:
dict[int, int]
- num_task_skipped¶
The number of task data items skipped (not sent to clients) for the current round.
- Type:
dict[int, int]
- data_cleaned¶
Flag indicating whether the task data has been cleaned from the FLContext for the current round.
- Type:
dict[int, bool]
- lock¶
A lock to protect shared data structures.
- Type:
Lock
- wait_sending_task_data_all_clients(num_clients, fl_ctx)¶
Waits until all clients have received the tensors or a timeout occurs.
- try_to_clean_task_data(num_clients, fl_ctx)[source]¶
Cleans the task data in the FLContext if all clients have received the tensors.
Initialize the TensorServerStreamer component.
- Parameters:
format (str) – The format of the tensors to send/receive. Default is “pytorch”.
tasks (list[str] | None) – The list of tasks to send tensors for. Default is None, which selects only the “train” task.
tensor_send_timeout (float) – Timeout for tensor entry transfer operations. Default is 30.0 seconds.
wait_send_task_data_all_clients_timeout (float) – Timeout for sending task data tensors to all clients. Default is 300.0 seconds.
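To make the constructor arguments concrete, here is a minimal sketch that builds a component entry for a server-side job configuration. It assumes the usual NVFlare `id`/`path`/`args` component layout; the `id` value `tensor_server_streamer` is a hypothetical label, while the argument names and defaults are copied from the class signature above.

```python
import json

# Component entry for the server config (assumed id/path/args layout).
# The "id" is a hypothetical label; the args mirror the documented signature.
tensor_streamer_component = {
    "id": "tensor_server_streamer",
    "path": "nvflare.app_opt.tensor_stream.server.TensorServerStreamer",
    "args": {
        "format": "pytorch",
        "tasks": ["train"],
        "tensor_send_timeout": 30.0,
        "wait_send_task_data_all_clients_timeout": 300.0,
    },
}

# Serialize the entry as it might appear under a "components" list.
config_text = json.dumps({"components": [tensor_streamer_component]}, indent=2)
print(config_text)
```

Omitting an argument from `args` leaves it at the default shown in the signature.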
- clean_counters(current_round: int)[source]¶
Clean the counters for the current round.
- Parameters:
current_round (int) – The current round number.
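The attribute types above (`dict[int, int]`, `dict[int, set[str]]`) suggest that the counters are keyed by round number. The following is a simplified sketch of that bookkeeping pattern, not the NVFlare implementation: `RoundCounters` and `record_sent` are hypothetical names, and only the attribute names and `clean_counters` come from this page.

```python
import threading
from dataclasses import dataclass, field


@dataclass
class RoundCounters:
    """Hypothetical stand-in for per-round bookkeeping; not the NVFlare class."""

    num_task_data_sent: dict[int, int] = field(default_factory=dict)
    num_task_skipped: dict[int, int] = field(default_factory=dict)
    seen_tasks: dict[int, set[str]] = field(default_factory=dict)
    lock: threading.Lock = field(default_factory=threading.Lock)

    def record_sent(self, current_round: int, task_id: str) -> None:
        # Update the shared tables under the lock, as several threads may report.
        with self.lock:
            self.seen_tasks.setdefault(current_round, set()).add(task_id)
            sent = self.num_task_data_sent.get(current_round, 0)
            self.num_task_data_sent[current_round] = sent + 1

    def clean_counters(self, current_round: int) -> None:
        # Drop every entry for the given round; other rounds are left untouched.
        with self.lock:
            for table in (self.num_task_data_sent, self.num_task_skipped, self.seen_tasks):
                table.pop(current_round, None)


counters = RoundCounters()
counters.record_sent(0, "task-a")
counters.record_sent(0, "task-b")
counters.record_sent(1, "task-c")
counters.clean_counters(0)
```

Keying by round lets state for one round be discarded without disturbing another round that is still in progress.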
- handle_event(event_type: str, fl_ctx: FLContext)[source]¶
Handle events for the TensorServerStreamer component.
- Parameters:
event_type (str) – The type of event to handle.
fl_ctx (FLContext) – The FLContext for the current operation.
- initialize(fl_ctx: FLContext)[source]¶
Initialize the TensorServerStreamer component.
- Parameters:
fl_ctx (FLContext) – The FLContext for the current operation.
- send_tensors_to_client(fl_ctx: FLContext)[source]¶
Send tensors to the client after task data filtering.
- Parameters:
fl_ctx (FLContext) – The FLContext for the current operation.
- try_to_clean_task_data(num_clients: int, fl_ctx: FLContext)[source]¶
Clean the task data in the FLContext.
- Parameters:
num_clients (int) – The number of clients to wait for.
fl_ctx (FLContext) – The FLContext to clean the task data from.
- wait_sending_task_data_all_clients(num_clients: int, fl_ctx: FLContext)[source]¶
Wait until all clients have received the task data tensors or a timeout occurs.
- Parameters:
num_clients (int) – The number of clients to wait for.
fl_ctx (FLContext) – The FLContext for the current operation.
- Raises:
TimeoutError – If not all clients have received the tensors within the timeout period.
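The wait-or-timeout behavior described above can be sketched as a polling loop. This is an illustration under stated assumptions, not the library's code: `wait_until_all_sent`, `poll_interval`, and the helper functions are hypothetical, and the real method may use a different waiting mechanism.

```python
import threading
import time


def wait_until_all_sent(get_done_count, num_clients, timeout, poll_interval=0.01):
    """Poll until get_done_count() reaches num_clients, else raise TimeoutError."""
    deadline = time.monotonic() + timeout
    while get_done_count() < num_clients:
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"only {get_done_count()}/{num_clients} clients done after {timeout}s"
            )
        time.sleep(poll_interval)


lock = threading.Lock()
done = {"count": 0}


def mark_done():
    # Simulates one client finishing its tensor download.
    with lock:
        done["count"] += 1


def read_done():
    with lock:
        return done["count"]


# Three simulated clients finish at staggered times; the wait succeeds.
workers = [threading.Timer(0.02 * (i + 1), mark_done) for i in range(3)]
for worker in workers:
    worker.start()
wait_until_all_sent(read_done, num_clients=3, timeout=2.0)
for worker in workers:
    worker.join()

# A fourth client never reports, so the second wait times out.
timed_out = False
try:
    wait_until_all_sent(read_done, num_clients=4, timeout=0.05)
except TimeoutError:
    timed_out = True
```

Using `time.monotonic()` for the deadline keeps the timeout unaffected by system clock adjustments.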