Source code for nvflare.fuel.utils.json_scanner

# Copyright (c) 2021, NVIDIA CORPORATION.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import copy
import logging
from abc import ABC, abstractmethod

from nvflare.fuel.common.excepts import ComponentNotAuthorized, ConfigError
from nvflare.fuel.utils.config_factory import ConfigFactory
from nvflare.security.logging import secure_format_exception, secure_log_traceback


[docs]class Node(object): def __init__(self, element): """A JSON element with additional data. Args: element: element to create Node object for """ self.parent = None self.element = element self.level = 0 self.key = "" self.position = 0 self.paths = [] self.processor = None self.exit_cb = None # node_exit_cb_signature(node: Node) self.props = {}
[docs] def path(self): if len(self.paths) <= 0: return "" return ".".join(self.paths)
[docs] def parent_element(self): if self.parent: return self.parent.element else: return None
def _child_node(node: Node, key, pos, element) -> Node: child = Node(element) child.processor = node.processor child.level = node.level + 1 child.position = pos child.parent = node child.paths = copy.copy(node.paths) child.key = key if pos > 0: child.key = "#{}".format(pos) child.paths.append(child.key) return child
[docs]class JsonObjectProcessor(ABC): """JsonObjectProcessor is used to process JSON elements by the scan_json() function."""
[docs] @abstractmethod def process_element(self, node: Node): """This method is called by the scan() function for each JSON element scanned. Args: node: the node representing the JSON element """ pass
[docs]class JsonScanner(object): def __init__(self, json_data: dict, location=None): """Scanner for processing JSON data. Args: json_data: dictionary containing json data to scan location: location to provide in error messages """ if not isinstance(json_data, dict): raise ValueError("json_data must be dict") self.location = location self.data = json_data self.logger = logging.getLogger("JsonScanner") def _do_scan(self, node: Node): try: node.processor.process_element(node) except ComponentNotAuthorized as e: secure_log_traceback(self.logger) if self.location: raise ComponentNotAuthorized( "Error processing {} in JSON element {}: path: {}, exception: {}".format( self.location, node.element, node.path(), secure_format_exception(e) ) ) else: raise ComponentNotAuthorized( "Error in JSON element: {}, path: {}, exception: {}".format( node.element, node.path(), secure_format_exception(e) ) ) except Exception as e: secure_log_traceback(self.logger) config = ConfigFactory.load_config(self.location[0]) elmt_str = config.to_str(node.element) location = config.get_location() raise ConfigError(self.get_process_err_msg(e, elmt_str, location, node)) element = node.element if isinstance(element, dict): # need to make a copy of the element dict in case the processor modifies the dict iter_dict = copy.copy(element) for k, v in iter_dict.items(): self._do_scan(_child_node(node, k, 0, v)) elif isinstance(element, list): for i in range(len(element)): self._do_scan(_child_node(node, node.key, i + 1, element[i])) if node.exit_cb is not None: try: node.exit_cb(node) except Exception as e: raise ConfigError(self.get_post_proces_err_msg(e, node))
[docs] def get_process_err_msg(self, e, elmt, location, node): location_msg = f" processing '{location}' " if location else "" msg = "Error{}in element '{}': path: '{}', exception: '{}'".format( location_msg, elmt, node.path(), secure_format_exception(e) ) return msg
[docs] def get_post_proces_err_msg(self, e, node): location = f" {self.location} in " if self.location else "" msg = "Error post-processing{}JSON element: {}, exception: {}".format( location, node.path(), secure_format_exception(e) ) return msg
[docs] def scan(self, processor: JsonObjectProcessor): if not isinstance(processor, JsonObjectProcessor): raise ValueError(f"processor must be JsonObjectProcessor, but got type {type(processor)}") node = Node(self.data) node.processor = processor self._do_scan(node)