nvflare.app_opt.tensor_stream.producer module

class TensorProducer(tensors: dict[str, Tensor], task_id: str, tensor_send_timeout: float)

Bases: ObjectProducer

Produces stream data (bytes objects) from a dictionary of torch tensors.

Attributes:
  • logger – Logger for logging messages.

  • tensor_send_timeout – Timeout for each entry in the stream.

  • last – Flag indicating whether the last tensor has been sent.

  • tensors – Dictionary of tensors to be sent.

  • tensors_keys – List of keys for the tensors to be sent.

  • start – Starting index for streaming.

  • current – Current index in the streaming process.

  • end – Ending index for streaming.

Methods:
  • produce(stream_ctx, fl_ctx) – Produces the next chunk of tensors to be sent.

  • process_replies(replies, stream_ctx, fl_ctx) – Processes replies from peers after sending tensors.

Initialize the TensorProducer.

Parameters:
  • tensors (dict) – A dictionary of tensors to be sent.

  • task_id (str) – The task ID associated with the tensors.

  • tensor_send_timeout (float) – The timeout for each chunk of tensors to be sent over the stream.

Raises:

ValueError – If no tensors are provided.

log_completion(fl_ctx: FLContext)

process_replies(replies: dict[str, Shareable], stream_ctx: dict, fl_ctx: FLContext) → Any

Process replies from peers after sending tensors.

Parameters:
  • replies (dict[str, Shareable]) – A dictionary of replies from peers.

  • stream_ctx (StreamContext) – The stream context for the current operation. (not used)

  • fl_ctx (FLContext) – The FL context for the current operation. (not used)

Returns:

True if all replies are successful and the last tensor has been sent, False if any reply contains an error, None if more tensors remain to be sent.

Return type:

Any
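
The three-valued return described above can be illustrated with a minimal, standalone sketch. It does not use NVFlare: the function name `summarize_replies`, the `OK` constant, and the representation of each reply as a plain return-code string are assumptions made for illustration only, not the library's actual types or logic.

```python
from __future__ import annotations

from typing import Optional

# Hypothetical stand-in for a "success" return code; not an NVFlare constant.
OK = "OK"


def summarize_replies(replies: dict[str, str], last_sent: bool) -> Optional[bool]:
    """Mirror the documented contract of process_replies on toy data.

    Returns False if any peer reported an error, True if every reply
    succeeded and the last tensor was already sent, and None otherwise
    (meaning more tensors still need to be sent).
    """
    for _peer, code in replies.items():
        if code != OK:
            return False
    return True if last_sent else None
```

A caller that checks only the truthiness of the result would treat None and False alike, so comparisons such as `result is None` are the safer way to tell "keep streaming" apart from "failed".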

produce(stream_ctx: dict, fl_ctx: FLContext) → tuple[Shareable, float]

Produce the next chunk of tensors to be sent.

Serializes the next tensor with safetensors and prepares it for sending.

Parameters:
  • stream_ctx (StreamContext) – The stream context for the current operation.

  • fl_ctx (FLContext) – The FL context for the current operation.

Returns:

A tuple containing the shareable object with the tensor data and the timeout for the entry.

Return type:

tuple[Shareable, float]

Raises:

Warning – If no tensors are found in the FLContext.
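
The one-entry-per-call pattern that produce describes, together with the start, current, end, and last attributes, can be sketched without NVFlare or torch. Everything below is hypothetical: `ToyProducer` is not the library class, JSON stands in for safetensors serialization, plain lists stand in for tensors, and treating `end` as an exclusive index equal to the number of keys is an assumption about the bookkeeping.

```python
import json


class ToyProducer:
    """Toy model of a producer that emits one dictionary entry per call."""

    def __init__(self, tensors: dict, send_timeout: float):
        if not tensors:
            # The documented constructor raises ValueError when no tensors are given.
            raise ValueError("No tensors provided")
        self.tensors = tensors
        self.tensors_keys = list(tensors.keys())
        self.start = 0
        self.current = self.start
        self.end = len(self.tensors_keys)  # assumption: exclusive end index
        self.last = False
        self.send_timeout = send_timeout

    def produce(self) -> tuple[bytes, float]:
        """Serialize the entry at `current`, advance, and return (payload, timeout)."""
        key = self.tensors_keys[self.current]
        # JSON is a stand-in here; the documented class uses safetensors.
        payload = json.dumps({key: self.tensors[key]}).encode("utf-8")
        self.current += 1
        self.last = self.current >= self.end
        return payload, self.send_timeout
```

In this sketch the caller is expected to stop calling `produce` once `last` is set, which is the point at which the documented process_replies can report True.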