nvflare.private.stream_runner module

class HeaderKey[source]

Bases: object

CHANNEL = 'ObjectStreamer.CHANNEL'
CTX = 'ObjectStreamer.CTX'
END_RESULT = 'ObjectStreamer.END_RESULT'
SEQ = 'ObjectStreamer.SEQ'
TOPIC = 'ObjectStreamer.TOPIC'
TX_ID = 'ObjectStreamer.TX_ID'
class ObjectStreamer(aux_runner: AuxRunner)[source]

Bases: FLComponent

Init FLComponent.

The FLComponent is the base class of all FL Components. (executors, controllers, responders, filters, aggregators, and widgets are all FLComponents)

FLComponents have the capability to handle and fire events and contain various methods for logging.

debug(req: Shareable, msg: str)[source]
error(req: Shareable, msg: str)[source]
info(req: Shareable, msg: str)[source]
register_stream_processing(channel: str, topic: str, factory: ConsumerFactory, stream_done_cb=None, consumed_cb=None, **cb_kwargs)[source]

Register a ConsumerFactory for specified app channel and topic. Once a new streaming request is received for the channel/topic, the registered factory will be used to create a new ObjectConsumer object to handle the stream.

Note: the factory should generate a new ObjectConsumer every time get_consumer() is called. This is because multiple streaming sessions could be going on at the same time. Each streaming session should have its own ObjectConsumer.

Parameters:
  • channel – app channel

  • topic – app topic

  • factory – the factory to be registered

  • consumed_cb – the CB is called after a chunk is consumed

  • stream_done_cb – the CB to be called when a stream is done

Returns: None

shutdown()[source]
stream(channel: str, topic: str, stream_ctx: dict, targets: List[AuxMsgTarget], producer: ObjectProducer, fl_ctx: FLContext, secure=False, optional=False) Tuple[str, Any][source]
stream_no_wait(channel: str, topic: str, stream_ctx: dict, targets: List[AuxMsgTarget], producer: ObjectProducer, fl_ctx: FLContext, secure=False, optional=False) Future[source]