nvflare.apis.streaming module

class ConsumerFactory[source]

Bases: ABC

abstract get_consumer(stream_ctx: dict, fl_ctx: FLContext) ObjectConsumer[source]

Called to get an ObjectConsumer to process a new stream on the receiving side. This is called only when the 1st streaming object is received for each stream.

Parameters:
  • stream_ctx – the context of the stream

  • fl_ctx – FLContext object

Returns: an ObjectConsumer

return_consumer(consumer: ObjectConsumer, stream_ctx: dict, fl_ctx: FLContext)[source]

Return the consumer back to the factory after a stream is finished on the receiving side.

Parameters:
  • consumer – the consumer to be returned

  • stream_ctx – context of the stream

  • fl_ctx – FLContext object

Returns: None

class ObjectConsumer[source]

Bases: ABC

abstract consume(shareable: Shareable, stream_ctx: dict, fl_ctx: FLContext) Tuple[bool, Shareable][source]

Consume the received Shareable object in the stream.

Parameters:
  • stream_ctx – the stream context data.

  • shareable – the Shareable object to be processed

  • fl_ctx – the FLContext object

Returns: a tuple of (whether to continue streaming, reply message)

Note: the channel and topic here are defined by the app. They are not the regular message headers (CHANNEL and TOPIC) defined in MessageHeaderKey.

finalize(stream_ctx: dict, fl_ctx: FLContext)[source]

Called to finalize the generator.

Parameters:
  • stream_ctx – stream context

  • fl_ctx – the FLContext object

Returns: None

This method is guaranteed to be called at the end of streaming.

class ObjectProducer[source]

Bases: ABC

abstract process_replies(replies: Dict[str, Shareable], stream_ctx: dict, fl_ctx: FLContext) Any[source]

Called to process replies from receivers of the last Shareable object sent to them.

Parameters:
  • replies – replies from receivers. It’s dict of site_name => reply

  • stream_ctx – stream context data

  • fl_ctx – the FLContext object

Returns: Any object or None

If None is returned, the streaming will continue; otherwise the streaming stops and the returned object is returned as the final result of the streaming.

abstract produce(stream_ctx: dict, fl_ctx: FLContext) Tuple[Shareable, float][source]

Called to produce the next Shareable object to be sent. If this method needs to take long time, it should check the abort_signal in the fl_ctx frequently. If aborted it should return immediately. You can get the abort_signal by calling fl_ctx.get_run_abort_signal().

Parameters:
  • stream_ctx – stream context data

  • fl_ctx – The FLContext object

Returns: a tuple of (Shareable object to be sent, timeout for sending this object)

class StreamContextKey[source]

Bases: object

CHANNEL = '__channel__'
RC = '__RC__'
TOPIC = '__topic__'
class StreamableEngine[source]

Bases: ABC

This class defines requirements for streaming capable engines.

abstract register_stream_processing(channel: str, topic: str, factory: ConsumerFactory, stream_done_cb=None, **cb_kwargs)[source]

Register a ConsumerFactory for specified app channel and topic. Once a new streaming request is received for the channel/topic, the registered factory will be used to create an ObjectConsumer object to handle the new stream.

Note: the factory should generate a new ObjectConsumer every time get_consumer() is called. This is because multiple streaming sessions could be going on at the same time. Each streaming session should have its own ObjectConsumer.

Parameters:
  • channel – app channel

  • topic – app topic

  • factory – the factory to be registered

  • stream_done_cb – the callback to be called when streaming is done on receiving side

Returns: None

abstract shutdown_streamer()[source]

Shutdown the engine’s streamer.

Returns: None

abstract stream_objects(channel: str, topic: str, stream_ctx: dict, targets: List[str], producer: ObjectProducer, fl_ctx: FLContext, optional=False, secure=False)[source]

Send a stream of Shareable objects to receivers.

Parameters:
  • channel – the channel for this stream

  • topic – topic of the stream

  • stream_ctx – context of the stream

  • targets – receiving sites

  • producer – the ObjectProducer that can produces the stream of Shareable objects

  • fl_ctx – the FLContext object

  • optional – whether the stream is optional

  • secure – whether to use P2P security

Returns: result from the generator’s reply processing

stream_done_cb_signature(stream_ctx: dict, fl_ctx: FLContext, **kwargs)[source]

This is the signature of stream_done_cb.

Parameters:
  • stream_ctx – context of the stream

  • fl_ctx – FLContext object

  • **kwargs – the kwargs specified when registering the stream_done_cb.

Returns: None