Source code for nvflare.apis.fl_context

# Copyright (c) 2021-2022, NVIDIA CORPORATION.  All rights reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import threading

from .fl_constant import ReservedKey

_update_lock = threading.Lock()

class FLContext(object):
    """Key/value property store used to pass data between FL components.

    Every property is kept in ``self.props`` as ``{"value": ..., "mask": ...}``
    where ``mask`` is a bit field built from ``MASK_STICKY`` and
    ``MASK_PRIVATE``.
    """

    MASK_STICKY = 1 << 0
    MASK_PRIVATE = 1 << 1

    @classmethod
    def _is_sticky(cls, mask) -> bool:
        """Return True when the sticky bit is set in ``mask``."""
        return (mask & cls.MASK_STICKY) > 0

    @classmethod
    def _is_private(cls, mask) -> bool:
        """Return True when the private bit is set in ``mask``."""
        return (mask & cls.MASK_PRIVATE) > 0

    @classmethod
    def _matching_private(cls, mask, private) -> bool:
        """Return True when the private bit of ``mask`` equals ``private``."""
        return cls._is_private(mask) == private

    @classmethod
    def _to_string(cls, mask) -> str:
        """Render ``mask`` as ``"<private|public>:<sticky|non-sticky>"``."""
        visibility = "private:" if cls._is_private(mask) else "public:"
        stickiness = "sticky" if cls._is_sticky(mask) else "non-sticky"
        return visibility + stickiness

    def __init__(self):
        """Init the FLContext.

        The FLContext is used to pass data between FL Components.
        It can be thought of as a dictionary that stores key/value pairs called props (properties).

        Visibility: private props are only visible to local components,
                    public props are also visible to remote components

        Stickiness: sticky props become available in all future FL Contexts,
                    non-sticky props will only be available in the current FL Context
        """
        self.model = None
        self.props = {}
        self.logger = logging.getLogger(self.__class__.__name__)

    def public_key_exists(self, key):
        """Return True when ``key`` is stored and its mask is not private."""
        return key in self.props and not self._is_private(self.props[key]["mask"])

    def get_all_public_props(self):
        """Return a new ``{key: value}`` dict of every prop whose mask is not private."""
        return {k: v["value"] for k, v in self.props.items() if not self._is_private(v["mask"])}

    def set_prop(self, key: str, value, private=True, sticky=True):
        """Store ``value`` under ``key`` with the requested visibility/stickiness.

        The attributes of an existing prop cannot be changed: if ``key`` is
        already stored with a different mask, the stored prop is left
        untouched and a warning is logged.

        Args:
            key: property name; must be a ``str``.
            value: the value to store.
            private: set the private bit in the mask.
            sticky: set the sticky bit in the mask.

        Returns:
            True when the prop was stored, False when it was rejected because
            of a mask mismatch with the existing prop.

        Raises:
            ValueError: if ``key`` is not a ``str``.
        """
        if not isinstance(key, str):
            raise ValueError("prop key must be str, but got {}".format(type(key)))

        mask = 0
        if private:
            mask |= FLContext.MASK_PRIVATE
        if sticky:
            mask |= FLContext.MASK_STICKY

        with _update_lock:
            existing = self.props.get(key)
            if existing is not None and existing["mask"] != mask:
                existing_mask = existing["mask"]
                self.logger.warning(
                    f"property {key} already exists with attributes "
                    f"{self._to_string(existing_mask)}, cannot change to {self._to_string(mask)}"
                )
                return False

            # New key, or same attributes as before: (re)write the entry.
            self.props[key] = {"value": value, "mask": mask}
        return True

    def get_prop(self, key, default=None):
        """Return the value stored under ``key``, or ``default`` when absent."""
        with _update_lock:
            prop = self.props.get(key)
            if prop is None:
                return default
            return prop["value"]

    def get_prop_detail(self, key):
        """Return ``{"value", "private", "sticky"}`` for ``key``, or None when absent."""
        with _update_lock:
            prop = self.props.get(key)
            if prop is None:
                return None
            mask = prop["mask"]
            return {"value": prop["value"], "private": self._is_private(mask), "sticky": self._is_sticky(mask)}

    def remove_prop(self, key: str):
        """Remove ``key`` from the props if present.

        Non-``str`` keys and keys starting with ``"__"`` are silently ignored.
        """
        if not isinstance(key, str):
            return

        if key.startswith("__"):
            # do not allow removal of reserved props!
            return

        with _update_lock:
            self.props.pop(key, None)

    def __str__(self):
        """Return a space-separated ``"key: <type of value>"`` listing of all props."""
        raw_list = [f'{k}: {type(v["value"])}' for k, v in self.props.items()]
        return " ".join(raw_list)

    # some convenience methods
    def get_engine(self):
        """Return the prop stored under ``ReservedKey.ENGINE``, or None."""
        return self.get_prop(key=ReservedKey.ENGINE, default=None)

    def get_job_id(self):
        """Return the prop stored under ``ReservedKey.RUN_NUM``, or None."""
        return self.get_prop(key=ReservedKey.RUN_NUM, default=None)

    def get_identity_name(self):
        """Return the prop stored under ``ReservedKey.IDENTITY_NAME``, or ``""``."""
        return self.get_prop(key=ReservedKey.IDENTITY_NAME, default="")

    def get_run_abort_signal(self):
        """Return the prop stored under ``ReservedKey.RUN_ABORT_SIGNAL``, or None."""
        return self.get_prop(key=ReservedKey.RUN_ABORT_SIGNAL, default=None)

    def set_peer_context(self, ctx):
        """Store ``ctx`` under ``ReservedKey.PEER_CTX`` as a private, non-sticky prop."""
        self.set_prop(key=ReservedKey.PEER_CTX, value=ctx, private=True, sticky=False)

    def get_peer_context(self):
        """Return the prop stored under ``ReservedKey.PEER_CTX``, or None."""
        return self.get_prop(key=ReservedKey.PEER_CTX, default=None)

    def set_public_props(self, metadata: dict):
        """Replace all public props with the entries of ``metadata``.

        Every existing public prop is dropped, sticky or not; private props
        are kept. Each ``metadata`` entry is then stored through ``set_prop``
        as a public, sticky prop. An entry whose key collides with a
        surviving private prop is rejected by ``set_prop`` (warning logged).

        Args:
            metadata: mapping of prop key to value.
        """
        # remove all public props: only the private bit decides what survives,
        # stickiness plays no part in this filter.
        self.props = {k: v for k, v in self.props.items() if self._is_private(v["mask"])}

        for key, value in metadata.items():
            self.set_prop(key, value, private=False, sticky=True)

    def clone_sticky(self):
        """Return a new FLContext holding a copy of every sticky prop entry.

        The ``{"value", "mask"}`` entry dicts are new objects; the values
        themselves are shared with this context.
        """
        new_fl_ctx = FLContext()
        new_fl_ctx.props = {
            k: {"value": v["value"], "mask": v["mask"]} for k, v in self.props.items() if self._is_sticky(v["mask"])
        }
        return new_fl_ctx

    def sync_sticky(self):
        """Ask the manager stored under ``ReservedKey.MANAGER`` to finalize this context.

        Raises:
            ValueError: if no (truthy) manager prop is set on this context.
        """
        ctx_manager = self.get_prop(key=ReservedKey.MANAGER, default=None)
        if not ctx_manager:
            raise ValueError("FLContextManager does not exist.")
        ctx_manager.finalize_context(self)

    # implement Context Manager protocol
    def __enter__(self):
        """Return this context itself."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Finalize this context through its manager, if it has one.

        Returns None, so an exception raised inside the ``with`` body is not
        suppressed.
        """
        mgr = self.get_prop(key=ReservedKey.MANAGER)
        # A context without a manager is tolerated here: there is simply
        # nothing to finalize.
        if mgr:
            mgr.finalize_context(self)
class FLContextManager(object):
    """Creates FLContext objects for a run and collects their sticky props.

    NOTE: The engine may create a new FLContextManager object for each RUN!
    """

    def __init__(self, engine, identity_name: str, job_id: str, public_stickers, private_stickers):
        """Set up the manager and seed its sticker tables.

        Args:
            engine: the engine that created this FLContextManager object
            identity_name (str): identity name
            job_id: the job id
            public_stickers: initial public sticky props; copied in only when
                it is a non-empty dict
            private_stickers: initial private sticky props; copied in only
                when it is a non-empty dict
        """
        self.engine = engine
        self.identity_name = identity_name
        self.job_id = job_id
        self._update_lock = threading.Lock()

        self.public_stickers = {}
        self.private_stickers = {}
        seeds = (
            (self.public_stickers, public_stickers),
            (self.private_stickers, private_stickers),
        )
        for table, initial in seeds:
            # Anything that is not a non-empty dict is ignored.
            if initial and isinstance(initial, dict):
                table.update(initial)

    def new_context(self) -> FLContext:
        """Build a fresh FLContext pre-populated from this manager.

        The manager itself, the engine, the job id and (when truthy) the
        identity name are set first; then every public sticker and every
        private sticker is copied in as a sticky prop.

        Returns: a FLContext object
        """
        fl_ctx = FLContext()
        fl_ctx.set_prop(key=ReservedKey.MANAGER, value=self, private=True, sticky=False)

        # permanent props
        fl_ctx.set_prop(key=ReservedKey.ENGINE, value=self.engine, private=True)
        fl_ctx.set_prop(key=ReservedKey.RUN_NUM, value=self.job_id, private=False)
        if self.identity_name:
            fl_ctx.set_prop(key=ReservedKey.IDENTITY_NAME, value=self.identity_name, private=False)

        with self._update_lock:
            # Public stickers are applied before private ones.
            for is_private, stickers in ((False, self.public_stickers), (True, self.private_stickers)):
                for name, val in stickers.items():
                    fl_ctx.set_prop(key=name, value=val, sticky=True, private=is_private)

        return fl_ctx

    def finalize_context(self, ctx: FLContext):
        """Copy the sticky props of ``ctx`` back into the sticker tables.

        Args:
            ctx: the context to be finalized
        """
        with self._update_lock:
            for name, entry in ctx.props.items():
                if not ctx._is_sticky(entry["mask"]):
                    continue
                if ctx._is_private(entry["mask"]):
                    table = self.private_stickers
                else:
                    table = self.public_stickers
                table[name] = entry["value"]