Source code for nvflare.fuel.sec.audit

# Copyright (c) 2021, NVIDIA CORPORATION.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import uuid
from datetime import datetime

EXCLUDED_ACTIONS = {
    "scheduler.check_resource",
    "_check_session",
    "_commands",
    "__aux_command__",
}


[docs]class Auditor(object): def __init__(self, audit_file_name: str): """Manages the audit file to log events. Args: audit_file_name (str): the location to save audit log file """ assert isinstance(audit_file_name, str), "audit_file_name must be str" if os.path.exists(audit_file_name): assert os.path.isfile(audit_file_name), "audit_file_name is not a valid file" # create/open the file self.audit_file = open(audit_file_name, "a")
[docs] def add_event(self, user: str, action: str, ref: str = "", msg: str = "") -> str: if action in EXCLUDED_ACTIONS: return "" # server might already shut down, the audit_file could be None if self.audit_file is None: return "" event_id = uuid.uuid4() parts = [ f"[E:{event_id}]", f"[R:{ref}]" if ref else "", f"[T:{datetime.now()}]", f"[U:{user}]", f"[A:{action}]", msg if msg else "", ] line = "".join(parts) self.audit_file.write(line + "\n") self.audit_file.flush() return str(event_id)
[docs] def add_job_event( self, job_id: str, scope_name: str = "", task_name: str = "", task_id: str = "", ref: str = "", msg: str = "" ) -> str: event_id = uuid.uuid4() parts = [ f"[E:{event_id}]", f"[R:{ref}]" if ref else "", f"[T:{datetime.now()}]", f"[S:{scope_name}]" if scope_name else "", f"[J:{job_id}]", f"[A:{task_name}#{task_id}]" if task_name else "", msg if msg else "", ] line = "".join(parts) self.audit_file.write(line + "\n") self.audit_file.flush() return str(event_id)
[docs] def close(self): if self.audit_file is not None: self.audit_file.close() self.audit_file = None
[docs]class AuditService(object): """Service for interacting with Auditor to add events to log.""" the_auditor = None
[docs] @staticmethod def initialize(audit_file_name: str): if not AuditService.the_auditor: AuditService.the_auditor = Auditor(audit_file_name) return AuditService.the_auditor
[docs] @staticmethod def get_auditor(): return AuditService.the_auditor
[docs] @staticmethod def add_event(user: str, action: str, ref: str = "", msg: str = "") -> str: if not AuditService.the_auditor: return "" return AuditService.the_auditor.add_event(user, action, ref, msg)
[docs] @staticmethod def add_job_event( job_id: str, scope_name: str = "", task_name: str = "", task_id: str = "", ref: str = "", msg: str = "" ) -> str: if not AuditService.the_auditor: return "" return AuditService.the_auditor.add_job_event( scope_name=scope_name, job_id=job_id, task_name=task_name, task_id=task_id, ref=ref, msg=msg )
[docs] @staticmethod def close(): if AuditService.the_auditor: AuditService.the_auditor.close()