nvflare.apis.streaming module¶
- class ConsumerFactory[source]¶
Bases:
ABC
- abstract get_consumer(stream_ctx: dict, fl_ctx: FLContext) ObjectConsumer [source]¶
Called to get an ObjectConsumer to process a new stream on the receiving side. This is called only when the first streaming object is received for each stream.
- Parameters:
stream_ctx – the context of the stream
fl_ctx – FLContext object
Returns: an ObjectConsumer
- return_consumer(consumer: ObjectConsumer, stream_ctx: dict, fl_ctx: FLContext)[source]¶
Return the consumer back to the factory after a stream is finished on the receiving side.
- Parameters:
consumer – the consumer to be returned
stream_ctx – context of the stream
fl_ctx – FLContext object
Returns: None
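Since the factory must hand out a fresh consumer for each stream and gets it back when the stream finishes, a minimal sketch might look like the following. Plain Python classes and dicts stand in for the real ConsumerFactory/ObjectConsumer/Shareable types so the sketch is self-contained; the "data", "last", "rc", and "assembled" keys are illustrative assumptions, not part of the nvflare API.

```python
class _ChunkCollector:
    """Minimal stand-in for an ObjectConsumer: accumulates the chunks of one stream."""

    def __init__(self):
        self.chunks = []

    def consume(self, shareable, stream_ctx, fl_ctx=None):
        self.chunks.append(shareable.get("data", b""))
        # Return (continue_streaming, reply): keep going until the last chunk.
        return not shareable.get("last", False), {"rc": "OK"}


class ChunkCollectorFactory:
    """Stand-in for a ConsumerFactory: one fresh consumer per stream."""

    def get_consumer(self, stream_ctx, fl_ctx=None):
        # Always create a new consumer: several streams may be in flight at once.
        return _ChunkCollector()

    def return_consumer(self, consumer, stream_ctx, fl_ctx=None):
        # The stream is finished; stash the assembled payload in the stream
        # context so a stream_done_cb (if any) can pick it up.
        stream_ctx["assembled"] = b"".join(consumer.chunks)
```

Note that `get_consumer` never reuses an instance; per-stream state lives entirely in the consumer object.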
- class ObjectConsumer[source]¶
Bases:
ABC
- abstract consume(shareable: Shareable, stream_ctx: dict, fl_ctx: FLContext) Tuple[bool, Shareable] [source]¶
Consume the received Shareable object in the stream.
- Parameters:
shareable – the Shareable object to be processed
stream_ctx – the stream context data
fl_ctx – the FLContext object
Returns: a tuple of (whether to continue streaming, reply message)
Note: the channel and topic here are defined by the app. They are not the regular message headers (CHANNEL and TOPIC) defined in MessageHeaderKey.
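The boolean in the returned tuple lets the receiving side terminate a stream early. A hedged sketch, again using a plain dict in place of Shareable, with made-up "data", "rc", and "TOO_BIG" values to illustrate a consumer that stops once a size limit is exceeded:

```python
class BoundedConsumer:
    """Accepts stream data until a byte limit is reached, then asks the sender to stop."""

    def __init__(self, size_limit: int):
        self.size_limit = size_limit
        self.received = 0

    def consume(self, shareable, stream_ctx, fl_ctx=None):
        self.received += len(shareable.get("data", b""))
        if self.received > self.size_limit:
            # Returning False tells the sending side to stop streaming;
            # the reply is delivered to the producer's process_replies().
            return False, {"rc": "TOO_BIG"}
        return True, {"rc": "OK"}
```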
- class ObjectProducer[source]¶
Bases:
ABC
- abstract process_replies(replies: Dict[str, Shareable], stream_ctx: dict, fl_ctx: FLContext) Any [source]¶
Called to process replies from receivers of the last Shareable object sent to them.
- Parameters:
replies – replies from receivers. It’s a dict of site_name => reply
stream_ctx – stream context data
fl_ctx – the FLContext object
Returns: Any object or None
If None is returned, streaming continues; otherwise streaming stops and the returned object becomes the final result of the streaming.
- abstract produce(stream_ctx: dict, fl_ctx: FLContext) Tuple[Shareable, float] [source]¶
Called to produce the next Shareable object to be sent. If this method may take a long time, it should check the abort_signal in the fl_ctx frequently and return immediately if aborted. You can get the abort_signal by calling fl_ctx.get_run_abort_signal().
- Parameters:
stream_ctx – stream context data
fl_ctx – The FLContext object
Returns: a tuple of (Shareable object to be sent, timeout for sending this object)
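Putting produce() and process_replies() together, a producer that streams a payload in fixed-size chunks might be sketched as below. Dicts stand in for Shareable, and the "data", "last", "rc", and "sent" keys plus the 5.0-second timeout are illustrative assumptions.

```python
class BytesProducer:
    """Streams a byte payload in fixed-size chunks (stand-in for an ObjectProducer)."""

    def __init__(self, payload: bytes, chunk_size: int = 4):
        self.payload = payload
        self.chunk_size = chunk_size
        self.offset = 0

    def produce(self, stream_ctx, fl_ctx=None):
        chunk = self.payload[self.offset:self.offset + self.chunk_size]
        self.offset += len(chunk)
        last = self.offset >= len(self.payload)
        # (Shareable object to be sent, timeout in seconds for sending it)
        return {"data": chunk, "last": last}, 5.0

    def process_replies(self, replies, stream_ctx, fl_ctx=None):
        # Any non-OK reply aborts the stream with a final error result.
        if any(r.get("rc") != "OK" for r in replies.values()):
            return {"rc": "ERROR"}
        if self.offset >= len(self.payload):
            return {"rc": "OK", "sent": self.offset}
        return None  # None => keep streaming
```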
- class StreamContextKey[source]¶
Bases:
object
- CHANNEL = '__channel__'¶
- RC = '__RC__'¶
- TOPIC = '__topic__'¶
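These keys let code on either side read stream metadata back out of the stream context dict. A small sketch; the literal values mirror the constants shown above, and treating RC as the stream's return code is an assumption for illustration.

```python
# Values mirror the StreamContextKey constants above.
CHANNEL = "__channel__"
TOPIC = "__topic__"
RC = "__RC__"

def describe_stream(stream_ctx: dict) -> str:
    """Render 'channel/topic' of a stream from its context dict."""
    return f"{stream_ctx.get(CHANNEL)}/{stream_ctx.get(TOPIC)}"

def stream_ok(stream_ctx: dict) -> bool:
    """True unless the context carries a non-OK return code (assumed semantics)."""
    return stream_ctx.get(RC, "OK") == "OK"
```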
- class StreamableEngine[source]¶
Bases:
ABC
This class defines requirements for streaming capable engines.
- abstract register_stream_processing(channel: str, topic: str, factory: ConsumerFactory, stream_done_cb=None, **cb_kwargs)[source]¶
Register a ConsumerFactory for the specified app channel and topic. Once a new streaming request is received for the channel/topic, the registered factory will be used to create an ObjectConsumer object to handle the new stream.
Note: the factory should generate a new ObjectConsumer every time get_consumer() is called. This is because multiple streaming sessions could be going on at the same time. Each streaming session should have its own ObjectConsumer.
- Parameters:
channel – app channel
topic – app topic
factory – the factory to be registered
stream_done_cb – the callback to be called when streaming is done on receiving side
Returns: None
- abstract stream_objects(channel: str, topic: str, stream_ctx: dict, targets: List[str], producer: ObjectProducer, fl_ctx: FLContext, optional=False, secure=False)[source]¶
Send a stream of Shareable objects to receivers.
- Parameters:
channel – the channel for this stream
topic – topic of the stream
stream_ctx – context of the stream
targets – receiving sites
producer – the ObjectProducer that produces the stream of Shareable objects
fl_ctx – the FLContext object
optional – whether the stream is optional
secure – whether to use P2P security
Returns: result from the producer’s reply processing
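To show how register_stream_processing and stream_objects tie producer and consumer together, here is a toy, in-process mock of the streaming loop: produce() → deliver to each target's consumer → process_replies(), repeating until process_replies() returns a non-None final result or a consumer asks to stop. All class names and dict keys are illustrative stand-ins, not the real engine or its wire protocol.

```python
class EchoConsumer:
    """Counts received bytes (stand-in for an ObjectConsumer)."""

    def consume(self, shareable, stream_ctx, fl_ctx=None):
        stream_ctx["received"] = stream_ctx.get("received", 0) + len(shareable["data"])
        return True, {"rc": "OK"}


class EchoFactory:
    """Stand-in for a ConsumerFactory."""

    def get_consumer(self, stream_ctx, fl_ctx=None):
        return EchoConsumer()

    def return_consumer(self, consumer, stream_ctx, fl_ctx=None):
        pass


class ListProducer:
    """Sends a list of byte chunks, then ends the stream (stand-in for an ObjectProducer)."""

    def __init__(self, chunks):
        self.chunks = list(chunks)
        self.i = 0

    def produce(self, stream_ctx, fl_ctx=None):
        chunk = self.chunks[self.i]
        self.i += 1
        return {"data": chunk}, 5.0  # (object to send, send timeout)

    def process_replies(self, replies, stream_ctx, fl_ctx=None):
        if any(r.get("rc") != "OK" for r in replies.values()):
            return {"rc": "ERROR"}  # non-None => stop with this final result
        return "DONE" if self.i >= len(self.chunks) else None


class ToyEngine:
    """In-process mock of the streaming loop; not the real StreamableEngine."""

    def __init__(self):
        self._factories = {}

    def register_stream_processing(self, channel, topic, factory, stream_done_cb=None):
        self._factories[(channel, topic)] = (factory, stream_done_cb)

    def stream_objects(self, channel, topic, stream_ctx, targets, producer, fl_ctx=None):
        factory, done_cb = self._factories[(channel, topic)]
        # One consumer per target, created when the stream starts.
        consumers = {t: factory.get_consumer(stream_ctx, fl_ctx) for t in targets}
        try:
            while True:
                shareable, _timeout = producer.produce(stream_ctx, fl_ctx)
                replies, keep_going = {}, True
                for t, consumer in consumers.items():
                    cont, reply = consumer.consume(shareable, stream_ctx, fl_ctx)
                    replies[t] = reply
                    keep_going = keep_going and cont
                result = producer.process_replies(replies, stream_ctx, fl_ctx)
                if result is not None or not keep_going:
                    return result
        finally:
            # Streaming is done: hand every consumer back and fire the callback.
            for consumer in consumers.values():
                factory.return_consumer(consumer, stream_ctx, fl_ctx)
            if done_cb:
                done_cb(stream_ctx)
```

The real engine runs the two sides on different processes and sites; the mock only illustrates the call order and the meaning of the return values.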