nvflare.fuel.f3.streaming.byte_receiver module

class ByteReceiver(cell: CoreCell)[source]

Bases: object

received_stream_counter_pool = <nvflare.fuel.f3.stats_pool.CounterPool object>
received_stream_size_pool = <nvflare.fuel.f3.stats_pool.HistPool object>
register_callback(channel: str, topic: str, stream_cb: Callable, *args, **kwargs)[source]
class RxStream(task: RxTask)[source]

Bases: Stream

A stream used to read the data received by a streaming task (RxTask)

Constructor for stream

Parameters:
  • size – The total size of the stream, or 0 if unknown

  • headers – Optional headers to be passed to the receiver

close()[source]

Close the stream

read(size: int) → bytes[source]

Read and return up to size bytes. The call may return fewer bytes than requested, but never more. An empty bytes object is returned when the stream reaches the end.

Parameters:

size – The maximum number of bytes to return; fewer may be returned

Returns:

Binary data. An empty result means the stream is depleted (EOS)
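The read contract above (at most size bytes per call, empty result at EOS) implies that callers should loop until an empty result comes back. The following is a minimal sketch of such a loop. The helper name read_all and the chunk size are illustrative and not part of NVFlare, and io.BytesIO stands in for an RxStream because it follows the same read(size) contract.

```python
import io


def read_all(stream, chunk_size=4096):
    """Drain any object whose read(size) returns an empty result at EOS.

    read_all is a hypothetical helper, not an NVFlare function.
    """
    chunks = []
    while True:
        data = stream.read(chunk_size)
        if not data:  # empty result signals end of stream
            break
        # A single call may return fewer bytes than requested, so keep looping.
        chunks.append(bytes(data))
    return b"".join(chunks)


# io.BytesIO is a stand-in for RxStream in this sketch.
payload = read_all(io.BytesIO(b"hello streaming world"), chunk_size=8)
```

A short read is not an error and does not indicate EOS; only an empty result does.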

class RxTask(sid: int, origin: str, cell: CoreCell)[source]

Bases: object

Receiving task for ByteStream

classmethod find_or_create_task(message: Message, cell: CoreCell) → RxTask | None[source]
map_lock = <unlocked _thread.lock object>
process_chunk(message: Message) → bool[source]

Returns True if a new stream is created

read(size: int) → bytes | bytearray | memoryview | list[source]
rx_task_map = {}
stop(error: StreamError | None = None, notify=True)[source]
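The class-level rx_task_map and map_lock attributes, together with find_or_create_task, suggest a find-or-create registry guarded by a lock. The sketch below shows that general pattern only. DemoTask, its method names, and keying the map by sid are assumptions made for illustration; this is not the NVFlare implementation.

```python
import threading


class DemoTask:
    """Hypothetical stand-in for RxTask; illustrates the pattern only."""

    task_map = {}                 # shared registry, assumed to be keyed by stream id
    map_lock = threading.Lock()   # guards every access to task_map

    def __init__(self, sid, origin):
        self.sid = sid
        self.origin = origin

    @classmethod
    def find_or_create(cls, sid, origin):
        # Look up and insert inside one critical section so that two
        # threads cannot both create a task for the same sid.
        with cls.map_lock:
            task = cls.task_map.get(sid)
            if task is None:
                task = cls(sid, origin)
                cls.task_map[sid] = task
            return task

    def stop(self):
        # Remove the task from the registry; pop() tolerates a repeated stop.
        with self.map_lock:
            self.task_map.pop(self.sid, None)


first = DemoTask.find_or_create(1, "site-1")
second = DemoTask.find_or_create(1, "site-1")
```

In this sketch, both calls return the same object, and stop() removes the entry so that a later call with the same sid creates a fresh task.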