nvflare.app_opt.tensor_stream.producer module
- class TensorProducer(tensors: dict[str, Tensor], task_id: str, tensor_send_timeout: float)
Bases: ObjectProducer
TensorProducer produces stream data bytes objects from a map of torch tensors.
- logger
Logger for logging messages.
- tensor_send_timeout
Timeout for each entry in the stream.
- last
Flag indicating if the last tensor has been sent.
- tensors
Dictionary of tensors to be sent.
- tensors_keys
List of keys for the tensors to be sent.
- start
Starting index for streaming.
- current
Current index in the streaming process.
- end
Ending index for streaming.
- process_replies(replies, stream_ctx, fl_ctx)
Processes replies from peers after sending tensors.
Initialize the TensorProducer.
- Parameters:
tensors (dict) – A dictionary of tensors to be sent.
task_id (str) – The task ID associated with the tensors.
tensor_send_timeout (float) – The timeout for each chunk of tensors to be sent over the stream.
- Raises:
ValueError – If no tensors are provided.
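The attributes and constructor contract described above can be sketched as a small stand-in class. This is only an illustration of the documented bookkeeping (key list, start/current/end indices, the `last` flag, and the `ValueError` on empty input); `ProducerState` is a hypothetical name, not part of NVFlare, and the real `TensorProducer` may initialize its fields differently.

```python
class ProducerState:
    """Hypothetical stand-in for the documented attributes; not NVFlare code."""

    def __init__(self, tensors: dict, task_id: str, tensor_send_timeout: float):
        # The documentation states that an empty tensor map raises ValueError.
        if not tensors:
            raise ValueError("No tensors provided for streaming.")
        self.tensors = tensors
        self.task_id = task_id
        self.tensor_send_timeout = tensor_send_timeout
        # Assumption: keys are streamed in dictionary insertion order.
        self.tensors_keys = list(tensors.keys())
        self.start = 0
        self.current = self.start
        self.end = len(self.tensors_keys)
        self.last = False
```

With two entries, the sketch starts at index 0 and ends at index 2, and `last` stays `False` until streaming reaches the end.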
- process_replies(replies: dict[str, Shareable], stream_ctx: dict, fl_ctx: FLContext) → Any
Process replies from peers after sending tensors.
- Parameters:
replies (dict[str, Shareable]) – A dictionary of replies from peers.
stream_ctx (StreamContext) – The stream context for the current operation. (not used)
fl_ctx (FLContext) – The FL context for the current operation. (not used)
- Returns:
True if all replies are successful and the last tensor has been sent, False if there was an error in any reply, or None if more tensors need to be sent.
- Return type:
Any
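The three-way return value described above can be sketched as follows. This is a simplified illustration, not the NVFlare implementation: replies are modeled as plain return-code strings rather than `Shareable` objects, and `summarize_replies` and `OK` are hypothetical names introduced only for this example.

```python
from typing import Optional

OK = "OK"  # hypothetical stand-in for a successful return code


def summarize_replies(replies: dict[str, str], last_sent: bool) -> Optional[bool]:
    """Map peer replies to the documented True / False / None outcome."""
    for rc in replies.values():
        if rc != OK:
            # Any failed reply ends the stream with an error result.
            return False
    # All replies succeeded: finish only if the last tensor has gone out,
    # otherwise return None to signal that more tensors remain.
    return True if last_sent else None
```

Returning `None` rather than `False` for the "keep going" case lets the caller distinguish an unfinished stream from a failed one.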
- produce(stream_ctx: dict, fl_ctx: FLContext) → tuple[Shareable, float]
Produce the next chunk of tensors to be sent.
It serializes the next tensor with safetensors and prepares it for sending.
- Parameters:
stream_ctx (StreamContext) – The stream context for the current operation.
fl_ctx (FLContext) – The FL context for the current operation.
- Returns:
A tuple containing the shareable object with the tensor data and the timeout for the entry.
- Return type:
tuple[Shareable, float]
- Raises:
Warning – If no tensors are found in the FLContext.
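The produce step can be pictured as emitting one serialized entry per call, together with the per-entry timeout. The sketch below is a dependency-free approximation under stated assumptions: `OneAtATimeProducer` is a hypothetical class, plain lists stand in for torch tensors, JSON stands in for safetensors serialization, and raw bytes stand in for the `Shareable` wrapper.

```python
import json


class OneAtATimeProducer:
    """Hypothetical sketch: emit one serialized entry per produce() call."""

    def __init__(self, entries: dict, timeout: float):
        if not entries:
            raise ValueError("No entries provided.")
        self.entries = entries
        self.keys = list(entries)
        self.current = 0
        self.end = len(self.keys)
        self.timeout = timeout
        self.last = False

    def produce(self) -> tuple[bytes, float]:
        key = self.keys[self.current]
        # Stand-in serialization; the documented producer uses safetensors here.
        payload = json.dumps({key: self.entries[key]}).encode("utf-8")
        self.current += 1
        # Mark the stream as finished once the final key has been emitted.
        self.last = self.current >= self.end
        return payload, self.timeout
```

A caller would invoke `produce()` repeatedly, checking `last` after each call to decide whether the stream is complete.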