nvflare.app_common.streamers.object_retriever module
- class ObjectRetriever(topic: str | None = None)
Bases: FLComponent, ABC
This is the base class for object retrieval with streaming. The retrieval works as follows:
- The requesting site initiates the process by sending a data request to the site that has the data;
- The requesting site then waits for the data to be completely received;
- Once the data request is received, the data owner site streams the data to the requesting site;
- During the streaming process, the requesting site keeps checking for completion until the data is completely received, the retrieval times out, or the retrieval is aborted.
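The workflow above can be illustrated with a small standalone simulation. This is only a sketch, not NVFlare code: the names `owner_stream` and `retrieve_sim`, the string return codes, and the use of a thread to stand in for the data owner site are all assumptions made for illustration.

```python
import threading
import time

# Hypothetical stand-ins for return codes; the real values are defined by NVFlare.
OK, TIMEOUT, ABORTED = "OK", "TIMEOUT", "ABORTED"


def owner_stream(chunks, stream_ctx, done):
    """Data owner side: stream chunks into the requester's stream context."""
    for chunk in chunks:
        stream_ctx.setdefault("data", []).append(chunk)
    done.set()  # signal that streaming has finished


def retrieve_sim(chunks, timeout, abort_signal=None, poll_interval=0.01):
    """Requesting side: send the request, then poll until done, timed out, or aborted."""
    stream_ctx = {}
    done = threading.Event()
    # "Sending the request" is modeled by starting the owner's streaming thread.
    threading.Thread(
        target=owner_stream, args=(chunks, stream_ctx, done), daemon=True
    ).start()
    deadline = time.monotonic() + timeout
    while True:
        if done.is_set():
            return OK, b"".join(stream_ctx.get("data", []))
        if abort_signal is not None and abort_signal.is_set():
            return ABORTED, None
        if time.monotonic() >= deadline:
            return TIMEOUT, None
        time.sleep(poll_interval)


rc, data = retrieve_sim([b"ab", b"cd"], timeout=2.0)
```

The polling loop mirrors the last step of the description: the requester returns as soon as one of the three terminal conditions is observed.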
Init FLComponent.
FLComponent is the base class of all FL components (executors, controllers, responders, filters, aggregators, and widgets are all FLComponents).
FLComponents have the capability to handle and fire events and contain various methods for logging.
- abstract do_stream(target: str, request: Shareable, fl_ctx: FLContext, stream_ctx: dict, validation_data: Any) -> Any
Object sending side. Called to stream data to the requesting side.
- Parameters:
target – the requesting site to stream to
request – the object retrieval request
fl_ctx – a FLContext object
stream_ctx – stream context data
validation_data – the validation data produced by the validate_request method.
Returns: Any object
- abstract get_result(stream_ctx: dict) -> (str, Any)
Object requesting side, which is also the stream receiving side. Called to get the result of the streaming.
- Parameters:
stream_ctx – StreamContext object
Returns: tuple of (ReturnCode, Result Object)
- handle_event(event_type: str, fl_ctx: FLContext)
Handles events.
- Parameters:
event_type (str) – event type fired by workflow.
fl_ctx (FLContext) – FLContext information.
- abstract register_stream_processing(channel: str, topic: str, fl_ctx: FLContext, stream_done_cb, **cb_kwargs)
Object requesting side, which receives the data stream. Called to register the stream_done_cb callback for the received stream.
- Parameters:
channel – stream channel
topic – stream topic
fl_ctx – FLContext object
stream_done_cb – the stream_done callback to be registered
**cb_kwargs – kwargs to be passed to the CB
Returns:
- retrieve(from_site: str, fl_ctx: FLContext, timeout: float, **obj_attrs) -> (str, Any)
Retrieve an object from a specified site.
- Parameters:
from_site – the site to retrieve the object from
fl_ctx – a FLContext object
timeout – max number of seconds to wait for the data
**obj_attrs – attributes of the object to be retrieved
Returns: tuple of (ReturnCode, Retrieved Data)
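A caller typically unpacks the returned tuple and checks the return code before using the data. The sketch below shows that pattern against a stub; `StubRetriever`, `fetch_or_raise`, the `name` attribute, and the string return codes are hypothetical stand-ins, not part of NVFlare.

```python
from typing import Any, Tuple

OK = "OK"  # hypothetical stand-in for the framework's success return code


class StubRetriever:
    """Hypothetical stand-in with the same retrieve() shape as documented above."""

    def __init__(self, store):
        self.store = store  # maps (site, object name) -> data

    def retrieve(
        self, from_site: str, fl_ctx: Any, timeout: float, **obj_attrs
    ) -> Tuple[str, Any]:
        key = (from_site, obj_attrs.get("name"))
        if key in self.store:
            return OK, self.store[key]
        return "ERROR", None


def fetch_or_raise(retriever, from_site, fl_ctx, timeout, **obj_attrs):
    """Unpack (return code, data) and fail loudly on any non-OK code."""
    rc, data = retriever.retrieve(
        from_site=from_site, fl_ctx=fl_ctx, timeout=timeout, **obj_attrs
    )
    if rc != OK:
        raise RuntimeError(f"retrieval from {from_site} failed: {rc}")
    return data


retriever = StubRetriever({("site-1", "weights"): b"\x01\x02"})
weights = fetch_or_raise(retriever, "site-1", None, 5.0, name="weights")
```

Here the keyword argument `name` plays the role of `**obj_attrs`; which attributes a real subclass expects depends on that subclass.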
- abstract validate_request(request: Shareable, fl_ctx: FLContext) -> (str, Any)
Object sending side. Called to validate the received retrieval request.
- Parameters:
request – the request to be validated
fl_ctx – FLContext object
Returns: tuple of (ReturnCode, Validation Data)
This method should do as much as possible so that the do_stream method won’t be called if any error is detected (the do_stream method is called in a separate thread). The validation data produced by this method will be passed to the do_stream method.
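The division of labor between validation and streaming can be sketched as follows. This is a standalone illustration under stated assumptions: the free functions, the `handle_request` dispatcher, the dict-based request, and the string return codes are hypothetical and simplified, and do not reproduce the signatures of the abstract methods above.

```python
import threading

OK, BAD_REQUEST = "OK", "BAD_REQUEST"  # hypothetical return codes


def validate_request(request, available):
    """Check everything that can fail up front; return (code, validation data)."""
    name = request.get("name")
    if not name or name not in available:
        return BAD_REQUEST, None
    # The validation data (here, the object to send) is handed on to do_stream.
    return OK, available[name]


def do_stream(target, request, validation_data, sent):
    """Runs in a separate thread; 'streams' by recording what was sent."""
    sent.append((target, validation_data))


def handle_request(target, request, available, sent):
    """Validate first; start the streaming thread only when validation passes."""
    rc, validation_data = validate_request(request, available)
    if rc != OK:
        return rc, None  # do_stream is never called for a bad request
    thread = threading.Thread(
        target=do_stream, args=(target, request, validation_data, sent)
    )
    thread.start()
    return rc, thread


sent = []
rc, thread = handle_request("site-2", {"name": "model"}, {"model": b"123"}, sent)
thread.join()
```

Rejecting bad requests before the thread starts lets the error be reported synchronously to the requester instead of surfacing later from the streaming thread.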