nvflare.fuel.f3.stream_cell module

class StreamCell(cell: CoreCell)

Bases: object

static get_chunk_size()

Get the default chunk size used by StreamCell. Byte streams are broken into chunks of this size before being sent over Cellnet.
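The chunking described above can be sketched as follows. This is a minimal illustration, not StreamCell's implementation: `split_into_chunks` is a hypothetical helper, and the 1 MiB chunk size is an assumed value. The real default should be obtained from `StreamCell.get_chunk_size()`.

```python
from typing import Iterator

# Assumed value for illustration only; call StreamCell.get_chunk_size() for the real default.
ASSUMED_CHUNK_SIZE = 1024 * 1024


def split_into_chunks(data: bytes, chunk_size: int = ASSUMED_CHUNK_SIZE) -> Iterator[bytes]:
    """Yield consecutive slices of `data`, each at most `chunk_size` bytes long."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    for offset in range(0, len(data), chunk_size):
        yield data[offset:offset + chunk_size]


chunks = list(split_into_chunks(b"x" * 2_500_000))
chunk_lengths = [len(c) for c in chunks]
```

Only the last chunk may be shorter than the chunk size; concatenating the chunks in order reproduces the original bytes.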

register_blob_cb(channel: str, topic: str, blob_cb, *args, **kwargs)

Register a callback for receiving the blob. This callback is invoked when the whole blob is received. If streaming fails, the streamer will try again and the failed attempt is ignored.

The callback must have the following signature:

blob_cb(future: StreamFuture, *args, **kwargs)

The future’s result is the final BLOB received.

Parameters:
  • channel – the channel of the request

  • topic – topic of the request

  • blob_cb – The callback to handle the stream
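A blob callback matching the signature above might look like the sketch below. It assumes the future exposes a `result()` method, as the description of the future's result suggests. A standard-library `Future` stands in for `StreamFuture`, and the `label` argument is a hypothetical extra positional argument of the kind passed through `*args`.

```python
from concurrent.futures import Future

received = {}


def blob_cb(future, label, *args, **kwargs):
    """Called once the whole blob has arrived; records its size under `label`."""
    blob = future.result()  # the final BLOB received
    received[label] = len(blob)


# Stand-in for a StreamFuture (assumption): a standard-library Future holding the blob.
fake_future = Future()
fake_future.set_result(b"\x00" * 16)
blob_cb(fake_future, "model-weights")
```

With a real cell, such a callback would be passed as the `blob_cb` argument of `register_blob_cb`, followed by any extra arguments.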

register_file_cb(channel: str, topic: str, file_cb, *args, **kwargs)

Register a callback for receiving files. The callback must have the following signature:

file_cb(future: StreamFuture, file_name: str, *args, **kwargs) -> str

The future represents the file-receiving task, and its result is the final file path. The callback returns the full path where the file will be written.

Parameters:
  • channel – the channel of the request

  • topic – topic of the request

  • file_cb – This CB is called when file transfer starts
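As a rough sketch of a file callback, the function below chooses the destination path for an incoming file. The `dest_dir` argument is a hypothetical extra positional argument of the kind passed through `*args`, and stripping the directory part of the sender-supplied name is a precaution added here, not something the API is documented to require.

```python
import os
import tempfile


def file_cb(future, file_name, dest_dir, *args, **kwargs) -> str:
    """Return the full path where the incoming file should be written."""
    # Keep only the base name so a sender-supplied path cannot escape dest_dir.
    safe_name = os.path.basename(file_name)
    return os.path.join(dest_dir, safe_name)


dest_dir = tempfile.mkdtemp()
# The future is not used by this sketch, so None stands in for it.
target_path = file_cb(None, "../reports/metrics.csv", dest_dir)
```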

register_objects_cb(channel: str, topic: str, object_stream_cb: Callable, object_cb: Callable, *args, **kwargs)

Register callbacks for receiving objects. The callback signatures are:

objects_stream_cb(future: ObjectStreamFuture, resume: bool, *args, **kwargs) -> int

  • future – represents the streaming of all objects. An object CB can be registered with the future to receive each object.

  • resume – True if this is a restarted stream

This CB returns the index to restart from if this is a restarted stream.

object_cb(obj_sid: str, index: int, message: Message, *args, **kwargs)

  • obj_sid – the object stream ID

  • index – the index of the object

  • message – the message whose headers and payload make up the object

resume_cb(stream_id: str, *args, **kwargs) -> int

The object callback is invoked when each object is received. The index starts from 0. The callback must have the following signature:

objects_cb(future: ObjectStreamFuture, index: int, object: Any, headers: Optional[dict], *args, **kwargs)

Parameters:
  • channel – the channel of the request

  • topic – topic of the request

  • object_stream_cb – The callback when an object stream is started

  • object_cb – The callback is invoked when each object is received
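The interplay between the stream callback and the per-object callback can be sketched as below. This is an illustration under stated assumptions, not the library's behavior: the in-memory `received_objects` list is a hypothetical store, the stand-in messages expose the object as `.payload`, and skipping out-of-order indices is a choice made for this example.

```python
from types import SimpleNamespace

received_objects = []


def objects_stream_cb(future, resume, *args, **kwargs) -> int:
    """Called when an object stream starts; returns the index to restart from."""
    if not resume:
        received_objects.clear()  # fresh stream: start again at index 0
    return len(received_objects)


def object_cb(obj_sid, index, message, *args, **kwargs):
    """Called for each object; ignores any index that is not the next expected one."""
    if index == len(received_objects):
        received_objects.append(message.payload)


# Simulated delivery with stand-in messages (assumption: payload is exposed as `.payload`).
start = objects_stream_cb(None, resume=False)
object_cb("sid-1", 0, SimpleNamespace(payload="a"))
object_cb("sid-1", 1, SimpleNamespace(payload="b"))
restart_index = objects_stream_cb(None, resume=True)
object_cb("sid-1", 2, SimpleNamespace(payload="c"))
```

After the simulated restart, the stream callback reports index 2, so the third object is appended without duplicating the first two.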

register_stream_cb(channel: str, topic: str, stream_cb: Callable, *args, **kwargs)

Register a callback for reading a stream. The stream_cb must have the following signature:

stream_cb(future: StreamFuture, stream: Stream, resume: bool, *args, **kwargs) -> int

  • future – represents the ongoing streaming. It is done when streaming is complete.

  • stream – the stream from which to read the received data

  • resume – True if this is a restarted stream

It returns the offset to resume from if this is a restarted stream.

The resume_cb returns the offset to resume from:

resume_cb(stream_id: str, *args, **kwargs) -> int

If None, the stream is not resumable.

Parameters:
  • channel – the channel of the request

  • topic – topic of the request

  • stream_cb – The callback to handle the stream. It is called when a stream is started, and it also provides the restart offset for restarted streams. This CB is invoked in a dedicated thread, so it can block.

  • *args – positional args to be passed to the callbacks

  • **kwargs – keyword args to be passed to the callbacks

send_blob(channel: str, topic: str, target: str, message: Message, secure=False, optional=False) -> StreamFuture

Send a BLOB (Binary Large Object) to the target. The payload of message is the BLOB. The BLOB must fit in memory on the receiving end.

Parameters:
  • channel – channel for the message

  • topic – topic of the message

  • target – destination cell IDs

  • message – the headers and the blob as payload

  • secure – Send the message with end-to-end encryption if True

  • optional – Optional message; errors may be suppressed

Returns: StreamFuture that can be used to check status/progress and get result

The future result is the total number of bytes sent

send_file(channel: str, topic: str, target: str, message: Message, secure=False, optional=False) -> StreamFuture

Send a file to the target using the stream API.

Parameters:
  • channel – channel for the message

  • topic – topic for the message

  • target – destination cell FQCN

  • message – the headers and the full path of the file to be sent as payload

  • secure – Send the message with end-to-end encryption if True

  • optional – Optional message; errors may be suppressed

Returns: StreamFuture that can be used to check status/progress and get the total bytes sent

send_objects(channel: str, topic: str, target: str, message: Message, secure=False, optional=False) -> ObjectStreamFuture

Send a list of objects to the destination. Each object is sent as a BLOB, so it must fit in memory.

Parameters:
  • channel – channel for the message

  • topic – topic of the message

  • target – destination cell IDs

  • message – the headers and the payload, which is an iterator that provides the next object

  • secure – Send the message with end-to-end encryption if True

  • optional – Optional message; errors may be suppressed

Returns: ObjectStreamFuture that can be used to check status/progress, or register callbacks

send_stream(channel: str, topic: str, target: str, message: Message, secure=False, optional=False) -> StreamFuture

Send a byte stream over a channel/topic asynchronously. The streaming is performed in a different thread. The streamer reads from the stream and sends the data in chunks until the stream reaches EOF.

Parameters:
  • channel – channel for the stream

  • topic – topic for the stream

  • target – destination cell FQCN

  • message – The payload is the stream to send

  • secure – Send the message with end-to-end encryption if True

  • optional – Optional message; errors may be suppressed

Returns: StreamFuture that can be used to check status/progress, or register callbacks.

The future result is the number of bytes sent
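The read-until-EOF behavior described for send_stream can be illustrated with a small loop. This is a sketch under stated assumptions rather than the streamer's actual code: `stream_in_chunks` and `send_chunk` are hypothetical names, an `io.BytesIO` object stands in for the stream, and the loop runs synchronously, whereas the real streaming happens in a separate thread.

```python
import io
from typing import BinaryIO, Callable


def stream_in_chunks(stream: BinaryIO, send_chunk: Callable[[bytes], None], chunk_size: int) -> int:
    """Read `stream` until EOF, pass each chunk to `send_chunk`, and return the bytes sent."""
    total = 0
    while True:
        chunk = stream.read(chunk_size)
        if not chunk:  # an empty read signals EOF
            break
        send_chunk(chunk)
        total += len(chunk)
    return total


sent = []
total_bytes = stream_in_chunks(io.BytesIO(b"abcdefghij"), sent.append, chunk_size=4)
```

The returned total mirrors the documented future result, which is the number of bytes sent.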