nvflare.fuel.f3.stream_cell module

class StreamCell(cell: CoreCell)[source]

Bases: object

static get_chunk_size()[source]

Gets the default chunk size used by StreamCell.

Byte stream are broken into chunks of this size before sending over Cellnet

register_blob_cb(channel: str, topic: str, blob_cb, *args, **kwargs)[source]

Registers a callback for receiving the blob.

This callback is invoked when the whole blob is received. If streaming fails, the streamer will try again. The failed streaming is ignored.

The callback must have the following signature:

The future’s result is the final BLOB received

Parameters:
  • channel – the channel of the request

  • topic – topic of the request

  • blob_cb – The callback to handle the stream

register_file_cb(channel: str, topic: str, file_cb, *args, **kwargs)[source]

Registers callbacks for file receiving.

The callbacks must have the following signatures:

Parameters:
  • channel – the channel of the request

  • topic – topic of the request

  • file_cb – This CB is called when file transfer starts

register_objects_cb(channel: str, topic: str, object_stream_cb: Callable, object_cb: Callable, *args, **kwargs)[source]

Registers callback for receiving the object.

The callback signature is:

Parameters:
  • channel – the channel of the request

  • topic – topic of the request

  • object_stream_cb – The callback when an object stream is started

  • object_cb – The callback is invoked when each object is received

register_stream_cb(channel: str, topic: str, stream_cb: Callable, *args, **kwargs)[source]

Registers a callback for reading stream.

The stream_cb must have the following signature:

Parameters:
  • channel – the channel of the request

  • topic – topic of the request

  • stream_cb – The callback to handle the stream. This is called when a stream is started. It also provides restart offset for restarted streams. This CB is invoked in a dedicated thread, and it can block

  • *args – positional args to be passed to the callbacks

  • **kwargs – keyword args to be passed to the callbacks

send_blob(channel: str, topic: str, target: str, message: Message, secure=False, optional=False) StreamFuture[source]

Sends a BLOB (Binary Large Object) to the target.

The payload of message is the BLOB. The BLOB must fit in memory on the receiving end.

Parameters:
  • channel – channel for the message

  • topic – topic of the message

  • target – destination cell IDs

  • message – the headers and the blob as payload

  • secure – Send the message with end-end encryption if True

  • optional – Optional message, error maybe suppressed

Returns:

StreamFuture that can be used to check status/progress and get result The future result is the total number of bytes sent

send_file(channel: str, topic: str, target: str, message: Message, secure=False, optional=False) StreamFuture[source]

Sends a file to target using stream API.

Parameters:
  • channel – channel for the message

  • topic – topic for the message

  • target – destination cell FQCN

  • message – the headers and the full path of the file to be sent as payload

  • secure – Send the message with end-end encryption if True

  • optional – Optional message, error maybe suppressed

Returns:

StreamFuture that can be used to check status/progress and get the total bytes sent

send_objects(channel: str, topic: str, target: str, message: Message, secure=False, optional=False) ObjectStreamFuture[source]

Sends a list of objects to the destination.

Each object is sent as BLOB, so it must fit in memory

Parameters:
  • channel – channel for the message

  • topic – topic of the message

  • target – destination cell IDs

  • message – Headers and the payload which is an iterator that provides next object

  • secure – Send the message with end-end encryption if True

  • optional – Optional message, error maybe suppressed

Returns:

ObjectStreamFuture that can be used to check status/progress, or register callbacks

send_stream(channel: str, topic: str, target: str, message: Message, secure=False, optional=False) StreamFuture[source]

Sends a byte-stream over a channel/topic asynchronously. The streaming is performed in a different thread.

The streamer will read from stream and send the data in chunks till the stream reaches EOF.

Parameters:
  • channel – channel for the stream

  • topic – topic for the stream

  • target – destination cell FQCN

  • message – The payload is the stream to send

  • secure – Send the message with end-end encryption if True

  • optional – Optional message, error maybe suppressed

Returns:

A StreamFuture that can be used to check status/progress, or register callbacks. The future result is the number of bytes sent