nvflare.fuel.f3.streaming.byte_streamer module

class ByteStreamer(cell: CoreCell)[source]

Bases: object

get_chunk_size()[source]
map_lock = <unlocked _thread.lock object>
send(channel: str, topic: str, target: str, headers: dict, stream: Stream, stream_type='byte', secure=False, optional=False, reliable: bool | None = None) StreamFuture[source]
sent_stream_counter_pool = <nvflare.fuel.f3.stats_pool.CounterPool object>
sent_stream_size_pool = <nvflare.fuel.f3.stats_pool.HistPool object>
tx_task_map = {}
class ReliableRetryScheduler[source]

Bases: object

register(task)[source]
shutdown()[source]
unregister(task)[source]
wakeup()[source]
class TxTask(cell: CoreCell, chunk_size: int, channel: str, topic: str, target: str, headers: dict, stream: Stream, reliable: bool | None, secure: bool, optional: bool)[source]

Bases: StreamTaskSpec

cancel()[source]

Cancel the task

Returns:

handle_ack(message: Message)[source]
remove_task()[source]
retry_task() float | None[source]
send_loop()[source]

Read/send loop to transmit the whole stream with flow control

send_pending_buffer(final=False)[source]
start_task_thread(task_handler: Callable)[source]
stop(error: StreamError | None = None, notify=True)[source]