nvflare.fuel.hci.client.api module

class AdminAPI(user_name: str, admin_config: dict, cmd_modules: List | None = None, debug: bool = False, auto_login_max_tries: int = 15, event_handlers=None, study: str = 'default')

Bases: AdminAPISpec, StreamableEngine

API to keep certs, keys and connection information and to execute admin commands through do_command.

Parameters:
  • cmd_modules – command modules to load and register. Note that FileTransferModule is initialized here with upload_dir and download_dir if cmd_modules is None.

  • user_name – username to authenticate with the FL server

  • debug – Whether to print debug messages, which can help with diagnosing problems. False by default.

  • auto_login_max_tries – maximum number of tries to auto-login.

check_command(command: str) -> CommandInfo

Checks the specified command and returns its processing info.

Parameters:

command – command to be checked

Returns: command processing info

close()
connect(timeout=None)
debug(msg)
do_command(command: str, props=None)

A convenience method to call commands using a string.

Parameters:
  • command (str) – command

  • props – additional props

Returns:

An object containing status and details (or the direct response from the server, which originally was just time and data).

download_file(source_fqcn: str, ref_id: str, file_name: str)
fire_event(event_type: str, ctx: EventContext)
fire_session_event(event_type: str, msg: str = '')
get_cell()
is_ready() -> bool

Whether the API is ready for executing commands.
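
Since is_ready() only reports the current state, a caller may want to poll it before issuing commands. The following is a minimal sketch under stated assumptions: wait_until_ready and FakeAPI are hypothetical names introduced for illustration, and the only thing assumed about the real object is that it exposes is_ready() returning a bool.

```python
import time


def wait_until_ready(api, timeout: float = 10.0, interval: float = 0.1) -> bool:
    """Poll api.is_ready() until it returns True or the timeout elapses.

    Hypothetical helper: `api` is any object with an is_ready() -> bool method.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if api.is_ready():
            return True
        time.sleep(interval)
    # One final check after the deadline has passed.
    return api.is_ready()


class FakeAPI:
    """Hypothetical stand-in that reports ready after a fixed number of polls."""

    def __init__(self, ready_after: int):
        self._polls_left = ready_after

    def is_ready(self) -> bool:
        self._polls_left -= 1
        return self._polls_left <= 0


class NeverReady:
    """Hypothetical stand-in that never becomes ready."""

    def is_ready(self) -> bool:
        return False
```

With a real AdminAPI instance, the same helper could be called after connect() and before do_command(); whether that is needed depends on how the session is set up.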

login()
logout()

Send logout command to server.

new_context()
register_command(cmd_entry)
register_stream_processing(channel: str, topic: str, factory: ConsumerFactory, stream_done_cb=None, consumed_cb=None, **cb_kwargs)

Register a ConsumerFactory for specified app channel and topic. Once a new streaming request is received for the channel/topic, the registered factory will be used to create an ObjectConsumer object to handle the new stream.

Note: the factory should generate a new ObjectConsumer every time get_consumer() is called. This is because multiple streaming sessions could be going on at the same time. Each streaming session should have its own ObjectConsumer.

Parameters:
  • channel – app channel

  • topic – app topic

  • factory – the factory to be registered

  • stream_done_cb – the callback to be called when streaming is done on receiving side

  • consumed_cb – the callback to be called after a chunk is processed

Returns: None
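
The note about returning a new ObjectConsumer from every get_consumer() call can be illustrated with a small sketch. The classes below are simplified, hypothetical stand-ins defined locally so the example runs on its own; they are not the real ConsumerFactory or ObjectConsumer base classes, and the real get_consumer() may take arguments that are omitted here.

```python
class SketchConsumer:
    """Hypothetical stand-in for an ObjectConsumer: holds per-stream state."""

    def __init__(self):
        self.chunks = []

    def consume(self, chunk):
        # State is kept on the instance, so it belongs to one stream only.
        self.chunks.append(chunk)


class PerStreamFactory:
    """Hypothetical stand-in for a ConsumerFactory."""

    def get_consumer(self, *args, **kwargs) -> SketchConsumer:
        # Return a fresh consumer on every call; never cache and reuse one.
        return SketchConsumer()


factory = PerStreamFactory()

# Two streaming sessions running at the same time get separate consumers.
consumer_a = factory.get_consumer()
consumer_b = factory.get_consumer()

consumer_a.consume("a1")
consumer_b.consume("b1")
consumer_a.consume("a2")
```

If the factory returned a single shared consumer instead, chunks from both sessions would be mixed in one list, which is the situation the note warns against.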

server_execute(command, reply_processor=None, cmd_entry=None, cmd_ctx=None, props=None, headers=None)

Executes a command on server side.

Parameters:
  • command – The command to be executed.

  • reply_processor – processor to process reply from server

  • cmd_ctx – command context

set_command_timeout(timeout: float)
shutdown_streamer()

Shut down the engine’s streamer.

Returns: None

stream_objects(channel: str, topic: str, stream_ctx: dict, targets: List[str], producer: ObjectProducer, fl_ctx: FLContext, optional=False, secure=False)

Send a stream of Shareable objects to receivers.

Parameters:
  • channel – the channel for this stream

  • topic – topic of the stream

  • stream_ctx – context of the stream

  • targets – receiving sites

  • producer – the ObjectProducer that produces the stream of Shareable objects

  • fl_ctx – the FLContext object

  • optional – whether the stream is optional

  • secure – whether to use P2P security

Returns: result from the generator’s reply processing

unset_command_timeout()
upload_file(file_name: str, conn: Connection)
class FileWaiter(tx_id)

Bases: Event

get_stream_ctx()
class ResultKey

Bases: object

AUTH_CODE = 'auth_code'
DETAILS = 'details'
META = 'meta'
STATUS = 'status'
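
A minimal sketch of how these constants might be used, assuming they serve as keys of a dict-like result such as the one returned by do_command. That link is an assumption, not something this page states. ResultKey is redefined locally so the example runs standalone, summarize is a hypothetical helper, and the "SUCCESS" status value is a placeholder rather than a documented value.

```python
class ResultKey:
    """Local copy of the constants listed above, so the sketch runs standalone."""

    AUTH_CODE = "auth_code"
    DETAILS = "details"
    META = "meta"
    STATUS = "status"


def summarize(result: dict) -> str:
    """Hypothetical helper: build a one-line summary from a result dict."""
    status = result.get(ResultKey.STATUS, "unknown")
    details = result.get(ResultKey.DETAILS, "")
    return f"{status}: {details}" if details else str(status)


# Placeholder result; real status values and detail payloads may differ.
sample = {ResultKey.STATUS: "SUCCESS", ResultKey.DETAILS: "ok"}
summary = summarize(sample)
```

Using the constants instead of string literals keeps lookups consistent if a result also carries the 'meta' or 'auth_code' entries.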