nvflare.app_common.streamers.object_retriever module

class ObjectRetriever(topic: str | None = None)[source]

Bases: FLComponent, ABC

This is the base class for object retrieval with streaming. The retrieval works as follows: - The requesting site initiates the process by sending a data request to the site that has the data; - The requesting site then waits for the data to be completely received; - Once the data request is received, the data owner site streams the data to the requesting site; - During the streaming process, the requesting site keeps checking for the completion of the streaming until either the data is completely received, or timed out, or aborted.

Init FLComponent.

The FLComponent is the base class of all FL Components. (executors, controllers, responders, filters, aggregators, and widgets are all FLComponents)

FLComponents have the capability to handle and fire events and contain various methods for logging.

abstract do_stream(target: str, request: Shareable, fl_ctx: FLContext, stream_ctx: dict, validation_data: Any) Any[source]

Object sending side. Called to stream data to the requesting side.

Parameters:
  • target – the requesting site to stream to

  • request – the object retrieval request

  • fl_ctx – a FLContext object

  • stream_ctx – stream context data

  • validation_data – the validation data produced by the validate_request method.

Returns: Any object

abstract get_result(stream_ctx: dict) -> (<class 'str'>, typing.Any)[source]

Object requesting side, which is also the stream receiving side. Called to get the result of the streaming.

Parameters:

stream_ctx – StreamContext object

Returns: tuple of (ReturnCode, Result Object)

handle_event(event_type: str, fl_ctx: FLContext)[source]

Handles events.

Parameters:
  • event_type (str) – event type fired by workflow.

  • fl_ctx (FLContext) – FLContext information.

abstract register_stream_processing(channel: str, topic: str, fl_ctx: FLContext, stream_done_cb, **cb_kwargs)[source]

Object requester side, which will receive data stream. This is called to register the status_cb for received stream.

Parameters:
  • channel – stream channel

  • topic – stream topic

  • fl_ctx – FLContext object

  • stream_done_cb – the stream_done callback to be registered

  • **cb_kwargs – kwargs to be passed to the CB

Returns:

retrieve(from_site: str, fl_ctx: ~nvflare.apis.fl_context.FLContext, timeout: float, **obj_attrs) -> (<class 'str'>, typing.Any)[source]

Retrieve an object from a specified site.

Parameters:
  • from_site – the site to retrieve the object from

  • fl_ctx – a FLContext object

  • timeout – max number of seconds to wait for the data

  • **obj_attrs – attributes of the object to be retrieved

Returns: tuple of (ReturnCode, Retrieved Data)

abstract validate_request(request: ~nvflare.apis.shareable.Shareable, fl_ctx: ~nvflare.apis.fl_context.FLContext) -> (<class 'str'>, typing.Any)[source]

Object sending side. Called to validate the received retrieval request.

Parameters:
  • request – the request to be validated

  • fl_ctx – FLContext object

Returns: tuple of (ReturnCode, Validation Data) This method should do as much as possible so that the do_stream method won’t be called if any error is detected (the do_stream method is called in a separate thread). The validation data produced by this method will be passed to the do_stream method.