nvflare.app_opt.tensor_stream.client module

class TensorClientStreamer(format: str = ExchangeFormat.PYTORCH, tasks: list[str] | None = None, tensor_send_timeout=30.0)[source]

Bases: FLComponent

TensorClientSender handles receiving task data and sending task results tensors from/to server.

It uses a StreamableEngine, TensorReceiver, and TensorSender to manage tensor streaming on the client side. .. attribute:: format

The format of the tensors to send. Default is “pytorch”.

type:

str

tensor_send_timeout

Timeout for tensor entry transfer operations. Default is 30.0 seconds.

Type:

float

engine

The StreamableEngine used for tensor streaming.

Type:

StreamableEngine

sender

The TensorSender used to send tensors to the server.

Type:

TensorSender

receiver

The TensorReceiver used to receive tensors from the server.

Type:

TensorReceiver

initialize(fl_ctx)[source]

Initializes the TensorClientStreamer component.

handle_event(event_type, fl_ctx)[source]

Handles events for the TensorSender component.

send_tensors_to_server(fl_ctx)[source]

Sends tensors to the server before sending the task result.

Initialize the TensorClientStreamer component.

The client automatically receives and applies the minimum get_task_timeout requirement from the server when tensor streaming is active. This prevents fast clients from timing out while waiting for slow clients to finish receiving tensors.

Automatic Timeout Management:
  • Server calculates and sends the required minimum timeout

  • Client automatically adjusts if the received minimum is higher than current timeout

  • Logs clearly indicate when automatic adjustment occurs

  • No manual configuration needed in most cases

Optional Manual Override:

Users can explicitly set get_task_timeout in config_fed_client.json if they need to override the automatic behavior (e.g., for even longer timeouts).

Parameters:
  • format (str) – The format of the tensors to send. Default is ExchangeFormat.PYTORCH.

  • tasks (list[str]) – The list of tasks to send tensors for. Default is None, which means the “train” task.

  • tensor_send_timeout (float) – Timeout for each tensor chunk transfer operation. Default is 30.0 seconds.

handle_event(event_type: str, fl_ctx: FLContext)[source]

Handle events for the TensorSender component.

Parameters:
  • event_type (str) – The type of event to handle.

  • fl_ctx (FLContext) – The FLContext for the current operation.

initialize(fl_ctx: FLContext)[source]

Initialize the TensorClientStreamer component. :param fl_ctx: The FLContext for the current operation. :type fl_ctx: FLContext

send_tensors_to_server(fl_ctx: FLContext)[source]

Sends tensors to the server before sending the task result.

Parameters:

fl_ctx (FLContext) – The FLContext for the current operation.