# Copyright (c) 2023, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import enum
import time
from abc import ABC, abstractmethod
from typing import List, Optional
[docs]class MonitorReturnCode(int, enum.Enum):
JOB_FINISHED = 0
TIMEOUT = 1
ENDED_BY_CB = 2
[docs]class NoConnection(Exception):
pass
[docs]class SessionClosed(Exception):
pass
[docs]class InvalidArgumentError(Exception):
pass
[docs]class InvalidJobDefinition(Exception):
pass
[docs]class JobNotFound(Exception):
pass
[docs]class JobNotRunning(Exception):
pass
[docs]class JobNotDone(Exception):
pass
[docs]class InternalError(Exception):
pass
[docs]class AuthenticationError(Exception):
pass
[docs]class AuthorizationError(Exception):
pass
[docs]class NoClientsAvailable(Exception):
pass
[docs]class ClientsStillRunning(Exception):
pass
[docs]class InvalidTarget(Exception):
pass
[docs]class TargetType:
ALL = "all"
SERVER = "server"
CLIENT = "client"
[docs]class ServerInfo:
def __init__(self, status, start_time):
self.status = status
self.start_time = start_time
def __str__(self) -> str:
return f"status: {self.status}, start_time: {time.asctime(time.localtime(self.start_time))}"
[docs]class ClientInfo:
def __init__(self, name: str, last_connect_time):
self.name = name
self.last_connect_time = last_connect_time
def __str__(self) -> str:
return f"{self.name}(last_connect_time: {time.asctime(time.localtime(self.last_connect_time))})"
[docs]class JobInfo:
def __init__(self, job_id: str, app_name: str):
self.job_id = job_id
self.app_name = app_name
def __str__(self) -> str:
return f"JobInfo:\n job_id: {self.job_id}\n app_name: {self.app_name}"
[docs]class SystemInfo:
def __init__(self, server_info: ServerInfo, client_info: List[ClientInfo], job_info: List[JobInfo]):
self.server_info = server_info
self.client_info = client_info
self.job_info = job_info
def __str__(self) -> str:
client_info_str = "\n".join(map(str, self.client_info))
job_info_str = "\n".join(map(str, self.job_info))
return (
f"SystemInfo\nserver_info:\n{self.server_info}\nclient_info:\n{client_info_str}\njob_info:\n{job_info_str}"
)
[docs]class SessionSpec(ABC):
[docs] @abstractmethod
def submit_job(self, job_definition_path: str) -> str:
"""Submit a predefined job to the NVFLARE system
Args:
job_definition_path: path to the folder that defines a NVFLARE job
Returns: the job id if accepted by the system
If the submission fails, exception will be raised:
"""
pass
[docs] @abstractmethod
def clone_job(self, job_id: str) -> str:
"""Create a new job by cloning a specified job
Args:
job_id: job to be cloned
Returns: ID of the new job
"""
pass
[docs] @abstractmethod
def list_jobs(self, detailed: bool = False, all: bool = False) -> List[dict]:
"""Get the job info from the server
Args:
detailed: True to get the detailed information for each job, False by default
all: True to get jobs submitted by all users (default is to only list jobs submitted by the same user)
Returns: a list of job metadata
"""
pass
[docs] @abstractmethod
def download_job_result(self, job_id: str) -> str:
"""
Download result of the job
Args:
job_id: ID of the job
Returns: folder path to the location of the job result
If the job size is smaller than the maximum size set on the server, the job will download to the download_dir
set in Session through the admin config, and the path to the downloaded result will be returned. If the size
of the job is larger than the maximum size, the location to download the job will be returned.
"""
pass
[docs] @abstractmethod
def abort_job(self, job_id: str):
"""Abort the specified job
Args:
job_id: job to be aborted
Returns: None
If the job is already done, no effect;
If job is not started yet, it will be cancelled and won't be scheduled.
If the job is being executed, it will be aborted
"""
pass
[docs] @abstractmethod
def delete_job(self, job_id: str):
"""Delete the specified job completely from the system
Args:
job_id: job to be deleted
Returns: None
If the job is being executed, the job will be stopped first.
Everything of the job will be deleted from the job store, as well as workspaces on
the FL server and clients.
"""
pass
[docs] @abstractmethod
def get_system_info(self) -> SystemInfo:
"""Get general info of the FLARE system"""
pass
[docs] @abstractmethod
def get_client_job_status(self, client_names: List[str] = None) -> List[dict]:
"""Get job status info of specified FL clients
Args:
client_names: names of the clients to get status info
Returns: A list of jobs running on the clients. Each job is described by a dict of: id, app name and status.
If there are multiple jobs running on one client, the list contains one entry for each job for that client.
If no FL clients are connected or the server failed to communicate to them, this method returns None.
"""
pass
[docs] @abstractmethod
def restart(self, target_type: str, client_names: Optional[List[str]] = None) -> dict:
"""
Restart specified system target(s)
Args:
target_type: what system target (server, client, or all) to restart
client_names: clients to be restarted if target_type is client. If not specified, all clients.
Returns: a dict that contains detailed info about the restart request:
status - the overall status of the result.
server_status - whether the server is restarted successfully - only if target_type is "all" or "server".
client_status - a dict (keyed on client name) that specifies status of each client - only if target_type
is "all" or "client".
"""
pass
[docs] @abstractmethod
def shutdown(self, target_type: TargetType, client_names: Optional[List[str]] = None):
"""Shut down specified system target(s)
Args:
target_type: what system target (server, client, or all) to shut down
client_names: clients to be shut down if target_type is client. If not specified, all clients.
Returns: None
"""
pass
[docs] @abstractmethod
def set_timeout(self, value: float):
"""
Set a session-specific command timeout. This is the amount of time the server will wait for responses
after sending commands to FL clients.
Note that this value is only effective for the current API session.
Args:
value: a positive float number
Returns: None
"""
pass
[docs] @abstractmethod
def unset_timeout(self):
"""
Unset the session-specific command timeout. Once unset, the FL Admin Server's default will be used.
Returns: None
"""
pass
[docs] @abstractmethod
def list_sp(self) -> dict:
"""List available service providers
Returns: a dict that contains information about the primary SP and others
"""
pass
[docs] @abstractmethod
def get_active_sp(self) -> dict:
"""Get the current active service provider (SP).
Returns: a dict that describes the current active SP. If no SP is available currently, the 'name' attribute of
the result is empty.
"""
pass
[docs] @abstractmethod
def get_available_apps_to_upload(self):
"""Get defined FLARE app folders from the upload folder on the machine the FLARE API is running
Returns: a list of app folders
"""
pass
[docs] @abstractmethod
def shutdown_system(self):
"""Shut down the whole NVFLARE system including the overseer, FL server(s), and all FL clients.
Returns: None
Note: the user must be a Project Admin to use this method; otherwise the NOT_AUTHORIZED exception will raise.
"""
pass
[docs] @abstractmethod
def ls_target(self, target: str, options: str = None, path: str = None) -> str:
"""Run the "ls" command on the specified target and return result
Args:
target: the target (server or a client name) the command will run
options: options of the "ls" command
path: the optional file path
Returns: result of "ls" command
"""
pass
[docs] @abstractmethod
def cat_target(self, target: str, options: str = None, file: str = None) -> str:
"""Run the "cat" command on the specified target and return result
Args:
target: the target (server or a client name) the command will run
options: options of the "cat" command
file: the file that the "cat" command will run against
Returns: result of "cat" command
"""
pass
[docs] @abstractmethod
def tail_target(self, target: str, options: str = None, file: str = None) -> str:
"""Run the "tail" command on the specified target and return result
Args:
target: the target (server or a client name) the command will run
options: options of the "tail" command
file: the file that the "tail" command will run against
Returns: result of "tail" command
"""
pass
[docs] @abstractmethod
def tail_target_log(self, target: str, options: str = None) -> str:
"""Run the "tail log.txt" command on the specified target and return result
Args:
target: the target (server or a client name) the command will run
options: options of the "tail" command
Returns: result of "tail" command
"""
pass
[docs] @abstractmethod
def head_target(self, target: str, options: str = None, file: str = None) -> str:
"""Run the "head" command on the specified target and return result
Args:
target: the target (server or a client name) the command will run
options: options of the "head" command
file: the file that the "head" command will run against
Returns: result of "head" command
"""
pass
[docs] @abstractmethod
def head_target_log(self, target: str, options: str = None) -> str:
"""Run the "head log.txt" command on the specified target and return result
Args:
target: the target (server or a client name) the command will run
options: options of the "head" command
Returns: result of "head" command
"""
pass
[docs] @abstractmethod
def grep_target(self, target: str, options: str = None, pattern: str = None, file: str = None) -> str:
"""Run the "grep" command on the specified target and return result
Args:
target: the target (server or a client name) the command will run
options: options of the "grep" command
pattern: the grep pattern
file: the file that the "grep" command will run against
Returns: result of "grep" command
"""
pass
[docs] @abstractmethod
def get_working_directory(self, target: str) -> str:
"""Get the working directory of the specified target
Args:
target: the target (server of a client name)
Returns: current working directory of the specified target
"""
pass
[docs] @abstractmethod
def show_stats(self, job_id: str, target_type: str, targets: Optional[List[str]] = None) -> dict:
"""Show processing stats of specified job on specified targets
Args:
job_id: ID of the job
target_type: type of target (server or client)
targets: list of client names if target type is "client". All clients if not specified.
Returns: a dict that contains job stats on specified targets. The key of the dict is target name. The value is
a dict of stats reported by different system components (ServerRunner or ClientRunner).
"""
pass
[docs] @abstractmethod
def show_errors(self, job_id: str, target_type: str, targets: Optional[List[str]] = None) -> dict:
"""Show processing errors of specified job on specified targets
Args:
job_id: ID of the job
target_type: type of target (server or client)
targets: list of client names if target type is "client". All clients if not specified.
Returns: a dict that contains job errors (if any) on specified targets. The key of the dict is target name.
The value is a dict of errors reported by different system components (ServerRunner or ClientRunner).
"""
pass
[docs] @abstractmethod
def reset_errors(self, job_id: str):
"""Clear errors for all system targets for the specified job
Args:
job_id: ID of the job
Returns: None
"""
pass
[docs] @abstractmethod
def get_connected_client_list(self) -> List[ClientInfo]:
"""Get the list of connected clients
Returns: a list of ClientInfo objects
"""
pass
[docs] @abstractmethod
def monitor_job(
self, job_id: str, timeout: int = 0, poll_interval: float = 2.0, cb=None, *cb_args, **cb_kwargs
) -> MonitorReturnCode:
"""Monitor the job progress until one of the conditions occurs:
- job is done
- timeout
- the status_cb returns False
Args:
job_id: the job to be monitored
timeout: how long to monitor. If 0, never time out.
poll_interval: how often to poll job status
cb: if provided, callback to be called after each poll
Returns: a MonitorReturnCode
Every time the cb is called, it must return a bool indicating whether the monitor
should continue. If False, this method ends.
"""
pass
[docs] @abstractmethod
def close(self):
"""Close the session
Returns:
"""
pass
[docs]def job_monitor_cb_signature(session: SessionSpec, job_id: str, job_mea: dict, *args, **kwargs) -> bool:
"""
Args:
session: the session
job_id: ID of the job being monitored
job_mea: meta info of the job
*args:
**kwargs:
Returns:
"""
pass