nvflare.private.stream_runner module
- class HeaderKey[source]
Bases:
object- CHANNEL = 'ObjectStreamer.CHANNEL'
- CTX = 'ObjectStreamer.CTX'
- END_RESULT = 'ObjectStreamer.END_RESULT'
- SEQ = 'ObjectStreamer.SEQ'
- TOPIC = 'ObjectStreamer.TOPIC'
- TX_ID = 'ObjectStreamer.TX_ID'
- class ObjectStreamer(aux_runner: AuxRunner)[source]
Bases:
FLComponentInit FLComponent.
The FLComponent is the base class of all FL Components. (executors, controllers, responders, filters, aggregators, and widgets are all FLComponents)
FLComponents have the capability to handle and fire events and contain various methods for logging.
- register_stream_processing(channel: str, topic: str, factory: ConsumerFactory, stream_done_cb=None, consumed_cb=None, **cb_kwargs)[source]
Register a ConsumerFactory for specified app channel and topic. Once a new streaming request is received for the channel/topic, the registered factory will be used to create a new ObjectConsumer object to handle the stream.
Note: the factory should generate a new ObjectConsumer every time get_consumer() is called. This is because multiple streaming sessions could be going on at the same time. Each streaming session should have its own ObjectConsumer.
- Parameters:
channel – app channel
topic – app topic
factory – the factory to be registered
consumed_cb – the CB is called after a chunk is consumed
stream_done_cb – the CB to be called when a stream is done
Returns: None
- stream(channel: str, topic: str, stream_ctx: dict, targets: List[AuxMsgTarget], producer: ObjectProducer, fl_ctx: FLContext, secure=False, optional=False) Tuple[str, Any][source]
- stream_no_wait(channel: str, topic: str, stream_ctx: dict, targets: List[AuxMsgTarget], producer: ObjectProducer, fl_ctx: FLContext, secure=False, optional=False) Future[source]