nvflare.fuel.f3.stream_cell module

class StreamCell(cell: CoreCell)[source]

Bases: object

get_chunk_size()[source]

Gets the default chunk size used by StreamCell.

Byte stream are broken into chunks of this size before sending over Cellnet

register_blob_cb(channel: str, topic: str, blob_cb, *args, **kwargs)[source]

Registers a callback for receiving the blob.

This callback is invoked when the whole blob is received. If streaming fails, the streamer will try again. The failed streaming is ignored.

The callback must have the following signature:

The future’s result is the final BLOB received

Parameters:
  • channel – the channel of the request

  • topic – topic of the request

  • blob_cb – The callback to handle the stream

register_stream_cb(channel: str, topic: str, stream_cb: Callable, *args, **kwargs)[source]

Registers a callback for reading stream.

The stream_cb must have the following signature:

Parameters:
  • channel – the channel of the request

  • topic – topic of the request

  • stream_cb – The callback to handle the stream. This is called when a stream is started. It also provides restart offset for restarted streams. This CB is invoked in a dedicated thread, and it can block

  • *args – positional args to be passed to the callbacks

  • **kwargs – keyword args to be passed to the callbacks

send_blob(channel: str, topic: str, target: str, message: Message, secure=False, optional=False) StreamFuture[source]

Sends a BLOB (Binary Large Object) to the target.

The payload of message is the BLOB. The BLOB must fit in memory on the receiving end.

Parameters:
  • channel – channel for the message

  • topic – topic of the message

  • target – destination cell IDs

  • message – the headers and the blob as payload

  • secure – Send the message with end-end encryption if True

  • optional – Optional message, error maybe suppressed

Returns:

StreamFuture that can be used to check status/progress and get result The future result is the total number of bytes sent

send_stream(channel: str, topic: str, target: str, message: Message, secure=False, optional=False) StreamFuture[source]

Sends a byte-stream over a channel/topic asynchronously. The streaming is performed in a different thread.

The streamer will read from stream and send the data in chunks till the stream reaches EOF.

Parameters:
  • channel – channel for the stream

  • topic – topic for the stream

  • target – destination cell FQCN

  • message – The payload is the stream to send

  • secure – Send the message with end-end encryption if True

  • optional – Optional message, error maybe suppressed

Returns:

A StreamFuture that can be used to check status/progress, or register callbacks. The future result is the number of bytes sent