nvflare.fuel.f3.stream_cell module
- class StreamCell(cell: CoreCell)
Bases: object
- static get_chunk_size()
Gets the default chunk size used by StreamCell.
Byte streams are broken into chunks of this size before being sent over Cellnet.
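The chunking described above can be sketched in plain Python. This is only an illustration: `CHUNK_SIZE` and `split_into_chunks` are hypothetical names, and the tiny size is chosen for readability. In real code the size would come from `StreamCell.get_chunk_size()`.

```python
from typing import Iterator

# Hypothetical value for illustration only; the real default is whatever
# StreamCell.get_chunk_size() returns and is certainly much larger.
CHUNK_SIZE = 4


def split_into_chunks(data: bytes, chunk_size: int = CHUNK_SIZE) -> Iterator[bytes]:
    """Yield consecutive slices of data, each at most chunk_size bytes long."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    for offset in range(0, len(data), chunk_size):
        yield data[offset:offset + chunk_size]


chunks = list(split_into_chunks(b"0123456789"))
print(chunks)  # [b'0123', b'4567', b'89']
```

The last chunk is simply shorter when the data length is not a multiple of the chunk size.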
- register_blob_cb(channel: str, topic: str, blob_cb, *args, **kwargs)
Registers a callback for receiving a BLOB.
This callback is invoked once the whole BLOB has been received. If streaming fails, the streamer retries and the failed attempt is ignored.
The callback must have the following signature:
The future’s result is the final BLOB received
- Parameters:
channel – the channel of the request
topic – topic of the request
blob_cb – The callback to handle the stream
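A BLOB callback might look like the sketch below. The callback signature is not shown on this page, so the shape `blob_cb(future, *args, **kwargs)` is an assumption. `concurrent.futures.Future` is used as a stand-in for the stream future so the example runs without NVFlare installed.

```python
from concurrent.futures import Future

received = []


def blob_cb(future: Future, *args, **kwargs) -> None:
    """Assumed callback shape: the future's result is the complete BLOB."""
    blob = future.result()  # the whole BLOB has already arrived at this point
    received.append((len(blob), kwargs.get("tag")))


# Stand-in for what the receiving side does once the BLOB has arrived.
fake_future = Future()
fake_future.set_result(b"hello blob")
blob_cb(fake_future, tag="demo")
print(received)  # [(10, 'demo')]
```

With a real cell, the registration would presumably be `stream_cell.register_blob_cb(channel, topic, blob_cb, tag="demo")`, with the extra keyword argument forwarded to the callback.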
- register_file_cb(channel: str, topic: str, file_cb, *args, **kwargs)
Registers callbacks for file receiving.
The callbacks must have the following signatures:
- Parameters:
channel – the channel of the request
topic – topic of the request
file_cb – This CB is called when file transfer starts
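The sketch below shows one plausible file callback. The signature is not shown on this page, so everything about the shape is an assumption: that the callback receives a future and the sender's file name, and that it returns the local path to write to. The `download_dir` keyword is likewise hypothetical.

```python
import os
import tempfile
from concurrent.futures import Future


def file_cb(future: Future, file_name: str, *args, **kwargs) -> str:
    """Assumed shape: called when a transfer starts, returns the path to write to."""
    download_dir = kwargs.get("download_dir", tempfile.gettempdir())
    # Keep only the base name so the sender cannot pick an arbitrary directory.
    return os.path.join(download_dir, os.path.basename(file_name))


target_path = file_cb(Future(), "/remote/data/model.bin", download_dir="/tmp/incoming")
```

Reducing the incoming name to its base name is a design choice of this sketch, not something the API is documented to require.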
- register_objects_cb(channel: str, topic: str, object_stream_cb: Callable, object_cb: Callable, *args, **kwargs)
Registers callbacks for receiving objects.
The callback signature is:
- Parameters:
channel – the channel of the request
topic – topic of the request
object_stream_cb – The callback when an object stream is started
object_cb – The callback invoked when each object is received
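The division of labor between the two callbacks can be illustrated as follows. The real signatures are not shown on this page, so the parameters used here (`stream_id`, `index`, `obj`) are hypothetical and chosen only to show one callback firing at stream start and the other once per object.

```python
from typing import Any, Dict, List

started: List[str] = []
objects: Dict[int, Any] = {}


def object_stream_cb(stream_id: str, *args, **kwargs) -> None:
    """Hypothetical shape: called once when an object stream starts."""
    started.append(stream_id)


def object_cb(stream_id: str, index: int, obj: Any, *args, **kwargs) -> None:
    """Hypothetical shape: called once for each object received."""
    objects[index] = obj


# Simulate a stream of three objects arriving in order.
object_stream_cb("stream-1")
for i, item in enumerate(["a", "b", "c"]):
    object_cb("stream-1", i, item)
```

Keying the received objects by index keeps the result well-defined even if a restarted stream delivers some object twice.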
- register_stream_cb(channel: str, topic: str, stream_cb: Callable, *args, **kwargs)
Registers a callback for reading a stream.
The stream_cb must have the following signature:
- Parameters:
channel – the channel of the request
topic – topic of the request
stream_cb – The callback to handle the stream. It is called when a stream is started and also provides the restart offset for restarted streams. This CB is invoked in a dedicated thread, so it can block.
*args – positional args to be passed to the callbacks
**kwargs – keyword args to be passed to the callbacks
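Because the stream callback runs in its own thread and may block, it can drain the stream with a plain read loop. The sketch below assumes a callback shape of `stream_cb(future, stream, *args, **kwargs)` and a file-like `read(size)` method on the stream. Neither is confirmed by this page. `io.BytesIO`, `concurrent.futures.Future`, and the `read_size` keyword are stand-ins.

```python
import io
from concurrent.futures import Future


def stream_cb(future: Future, stream: io.BufferedIOBase, *args, **kwargs) -> int:
    """Hypothetical shape: drain the stream in a blocking loop, return the byte count."""
    read_size = kwargs.get("read_size", 4)
    total = 0
    while True:
        buf = stream.read(read_size)
        if not buf:  # an empty read marks EOF
            break
        total += len(buf)
    return total


total_bytes = stream_cb(Future(), io.BytesIO(b"0123456789"), read_size=4)
print(total_bytes)  # 10
```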
- send_blob(channel: str, topic: str, target: str, message: Message, secure=False, optional=False) -> StreamFuture
Sends a BLOB (Binary Large Object) to the target.
The payload of message is the BLOB. The BLOB must fit in memory on the receiving end.
- Parameters:
channel – channel for the message
topic – topic of the message
target – destination cell IDs
message – the headers and the blob as payload
secure – Send the message with end-to-end encryption if True
optional – Optional message; errors may be suppressed
- Returns:
StreamFuture that can be used to check status/progress and get the result. The future result is the total number of bytes sent.
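The call shape and the returned future can be exercised with a stand-in, as sketched below. `FakeStreamCell` and `send_and_wait` are hypothetical and mimic only the `send_blob` parameters listed above. Raw bytes take the place of a real `Message`, and `concurrent.futures.Future` takes the place of `StreamFuture`. The channel, topic, and target strings are made up.

```python
from concurrent.futures import Future


class FakeStreamCell:
    """Stand-in that mimics only the send_blob call shape documented above."""

    def send_blob(self, channel, topic, target, message, secure=False, optional=False):
        future = Future()
        future.set_result(len(message))  # pretend every byte was sent
        return future


def send_and_wait(stream_cell, channel, topic, target, message):
    """Send a BLOB and block until the future reports the number of bytes sent."""
    future = stream_cell.send_blob(channel, topic, target, message, secure=False, optional=False)
    return future.result()


sent = send_and_wait(FakeStreamCell(), "demo_channel", "demo_topic", "site-1", b"payload")
print(sent)  # 7
```

Blocking on the result is the simplest pattern. Code that must stay responsive would poll the future's status or progress instead.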
- send_file(channel: str, topic: str, target: str, message: Message, secure=False, optional=False) -> StreamFuture
Sends a file to target using stream API.
- Parameters:
channel – channel for the message
topic – topic for the message
target – destination cell FQCN
message – the headers and the full path of the file to be sent as payload
secure – Send the message with end-to-end encryption if True
optional – Optional message; errors may be suppressed
- Returns:
StreamFuture that can be used to check status/progress and get the total bytes sent
- send_objects(channel: str, topic: str, target: str, message: Message, secure=False, optional=False) -> ObjectStreamFuture
Sends a list of objects to the destination.
Each object is sent as a BLOB, so it must fit in memory.
- Parameters:
channel – channel for the message
topic – topic of the message
target – destination cell IDs
message – the headers and the payload, which is an iterator that provides the next object
secure – Send the message with end-to-end encryption if True
optional – Optional message; errors may be suppressed
- Returns:
ObjectStreamFuture that can be used to check status/progress, or register callbacks
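Since the payload is an iterator, objects can be produced lazily so that only the current one has to be held in memory. The generator below is a hypothetical illustration of that idea. Whether `send_objects` accepts a plain generator or requires a dedicated iterator type is not stated on this page.

```python
from typing import Iterator


def iter_shards(num_shards: int, shard_size: int) -> Iterator[bytes]:
    """Build each shard on demand instead of materializing the whole list."""
    for i in range(num_shards):
        # Each shard is filled with its own index so the shards are distinguishable.
        yield bytes([i % 256]) * shard_size


payload = iter_shards(3, 2)
first = next(payload)    # objects are produced one at a time
remaining = list(payload)
```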
- send_stream(channel: str, topic: str, target: str, message: Message, secure=False, optional=False) -> StreamFuture
Sends a byte-stream over a channel/topic asynchronously. The streaming is performed in a different thread.
The streamer reads from the stream and sends the data in chunks until the stream reaches EOF.
- Parameters:
channel – channel for the stream
topic – topic for the stream
target – destination cell FQCN
message – The payload is the stream to send
secure – Send the message with end-to-end encryption if True
optional – Optional message; errors may be suppressed
- Returns:
A StreamFuture that can be used to check status/progress, or register callbacks. The future result is the number of bytes sent.
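The asynchronous behavior described for `send_stream` (a worker thread reads until EOF, and the future resolves to the byte count) can be sketched with the standard library alone. This is not the NVFlare implementation. `stream_in_background` and `send_chunk` are hypothetical names, and `concurrent.futures.Future` stands in for `StreamFuture`.

```python
import io
import threading
from concurrent.futures import Future


def stream_in_background(stream, send_chunk, chunk_size=4) -> Future:
    """Read stream in a worker thread, hand each chunk to send_chunk,
    and resolve the returned future with the total number of bytes sent."""
    future = Future()

    def worker():
        total = 0
        try:
            while True:
                chunk = stream.read(chunk_size)
                if not chunk:  # EOF reached
                    break
                send_chunk(chunk)
                total += len(chunk)
            future.set_result(total)
        except Exception as exc:  # surface failures through the future
            future.set_exception(exc)

    threading.Thread(target=worker, daemon=True).start()
    return future


sent_chunks = []
result = stream_in_background(io.BytesIO(b"0123456789"), sent_chunks.append).result(timeout=5)
print(result)  # 10
```

The caller gets the future back immediately and can either wait on it, as here, or attach a completion callback with `add_done_callback`.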