nvflare.fuel.f3.streaming.byte_streamer module

class ByteStreamer(cell: CoreCell)

Bases: object

get_chunk_size()
map_lock = <unlocked _thread.lock object>
send(channel: str, topic: str, target: str, headers: dict, stream: Stream, stream_type='byte', secure=False, optional=False) → StreamFuture
sent_stream_counter_pool = <nvflare.fuel.f3.stats_pool.CounterPool object>
sent_stream_size_pool = <nvflare.fuel.f3.stats_pool.HistPool object>
tx_task_map = {}
class TxTask(cell: CoreCell, chunk_size: int, channel: str, topic: str, target: str, headers: dict, stream: Stream, secure: bool, optional: bool)

Bases: StreamTaskSpec

cancel()

Cancel the task.

handle_ack(message: Message)
send_loop()

Read/send loop to transmit the whole stream with flow control
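
The idea of a read/send loop with flow control can be illustrated with a small standalone sketch. This is not NVFlare's implementation: the class WindowedSender, the transmit callback, the ack_timeout parameter, and the constants CHUNK_SIZE and WINDOW_SIZE are all hypothetical names chosen for this example. It assumes a sliding-window scheme in which the sender pauses whenever the number of unacknowledged bytes reaches the window size and resumes when an ACK arrives.

```python
import io
import threading

CHUNK_SIZE = 4    # hypothetical value, kept tiny so the flow is easy to trace
WINDOW_SIZE = 8   # hypothetical limit on unacknowledged bytes in flight


class WindowedSender:
    """Illustrative sender, not NVFlare code: chunked reads plus an ACK window."""

    def __init__(self, stream, transmit, chunk_size=CHUNK_SIZE, window_size=WINDOW_SIZE):
        self.stream = stream              # any object with read(n) -> bytes
        self.transmit = transmit          # callable(offset, data, final)
        self.chunk_size = chunk_size
        self.window_size = window_size
        self.offset = 0                   # total bytes handed to transmit
        self.acked = 0                    # highest offset confirmed by the receiver
        self.cond = threading.Condition()

    def handle_ack(self, acked_offset):
        """Record the receiver's progress and wake a sender that may be waiting."""
        with self.cond:
            self.acked = max(self.acked, acked_offset)
            self.cond.notify_all()

    def send_loop(self, ack_timeout=5.0):
        """Read and send chunks until end of stream; return the bytes sent."""
        while True:
            data = self.stream.read(self.chunk_size)
            final = not data              # an empty read marks end of stream
            with self.cond:
                # Flow control: wait while the window of unacked bytes is full.
                window_open = self.cond.wait_for(
                    lambda: self.offset - self.acked < self.window_size,
                    timeout=ack_timeout,
                )
            if not window_open:
                raise TimeoutError("no ACK received within timeout")
            self.transmit(self.offset, data, final)
            self.offset += len(data)
            if final:
                return self.offset


def run_demo(payload):
    """Loop the sender back to an in-memory receiver that ACKs every chunk."""
    received = []

    def loopback(offset, data, final):
        received.append(data)
        sender.handle_ack(offset + len(data))

    sender = WindowedSender(io.BytesIO(payload), loopback)
    total = sender.send_loop()
    return total, b"".join(received)
```

Calling run_demo(b"hello flow control") sends the payload in 4-byte chunks followed by an empty final chunk. If the receiver never acknowledges anything, send_loop transmits two chunks (8 bytes, filling the window) and then raises TimeoutError once ack_timeout expires.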

send_pending_buffer(final=False)
start_task_thread(task_handler: Callable)
stop(error: StreamError | None = None, notify=True)