nvflare.app_opt.tracking.mlflow.mlflow_receiver module

class MLflowReceiver(tracking_uri: str | None = None, kwargs: dict | None = None, artifact_location: str | None = None, events=None, buffer_flush_time=1)[source]

Bases: AnalyticsReceiver

MLflowReceiver receives log events from clients and delivers them to the MLflow tracking server.

Parameters:
  • tracking_uri (Optional[str], optional) – MLflow tracking server URI. When this is not specified, the metrics are written to the local file system. If the tracking URI is specified, the MLflow tracking server must be started before running the job. Defaults to None.

  • kwargs (Optional[dict], optional) – Keyword arguments:

      • “experiment_name” (str): Specifies the experiment name. If not specified, the default name “FLARE FL Experiment” is used.

      • “run_name” (str): Specifies the run name.

      • “experiment_tags” (dict): Tags used when creating the MLflow experiment. “mlflow.note.content” is a special MLflow tag. When provided, it is displayed as the experiment description in the MLflow UI. You can use Markdown syntax for the description.

      • “run_tags” (dict): Tags used when creating the MLflow run. “mlflow.note.content” is a special MLflow tag. When provided, it is displayed as the run description in the MLflow UI. You can use Markdown syntax for the description.

  • artifact_location (Optional[str], optional) – Relative location of artifacts. Currently only text is supported.

  • events (optional) – The events the receiver listens to. By default, it listens to “fed.analytix_log_stats”.

  • buffer_flush_time (int, optional) – The time in seconds between deliveries of event data to the MLflow tracking server. The data is buffered and then delivered to the MLflow tracking server in batches, and the buffer_flush_time controls the frequency of the sending. By default, the buffer flushes every second. You can reduce the time to a fraction of a second if you prefer less delay. Keep in mind that reducing the buffer_flush_time will potentially cause high traffic to the MLflow tracking server, which in some cases can actually cause more latency.
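
As a minimal sketch of how the parameters above might be supplied, the following builds a component entry of the kind used in an NVFlare job configuration. Only the argument names come from the parameter list; the component id, experiment name, run name, tag values, and the local tracking URI are placeholders chosen for illustration, and “run_tags” is assumed to be a dict of tag names to values in the same way as “experiment_tags”.

```python
import json

# Hypothetical values throughout; only the argument names come from the
# parameter list above.
receiver_component = {
    "id": "mlflow_receiver",  # placeholder component id
    "path": "nvflare.app_opt.tracking.mlflow.mlflow_receiver.MLflowReceiver",
    "args": {
        # Assumed local server; leave tracking_uri out to write to the
        # local file system instead.
        "tracking_uri": "http://localhost:5000",
        "kwargs": {
            "experiment_name": "example-experiment",
            "run_name": "example-run",
            "experiment_tags": {"mlflow.note.content": "## Example experiment"},
            # Assumption: run_tags is a dict, mirroring experiment_tags.
            "run_tags": {"mlflow.note.content": "## Example run"},
        },
        "artifact_location": "artifacts",
        "buffer_flush_time": 1,
    },
}

# Serialize the entry so it can be pasted into a JSON config file.
config_text = json.dumps(receiver_component, indent=2)
print(config_text)
```

If a tracking URI is given as above, the MLflow tracking server needs to be running before the job starts.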

buffer_data(data: AnalyticsData, record_origin: str) → None[source]

Buffer the data to send later.

self.buffer holds one site_buffer per site, and each site_buffer holds one buffer per data_type.

Parameters:
  • data (AnalyticsData) – The analytics data to buffer.

  • record_origin (str) – Origin of the data, or site name.
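
The nested layout described above can be pictured with a small stand-in. This is an illustrative sketch only, not the receiver's actual code: the function name `buffer_record` is hypothetical, and plain tuples stand in for AnalyticsData objects.

```python
from collections import defaultdict

# Illustrative stand-in for the receiver's buffer:
# site name -> data type -> list of buffered records.
buffer = defaultdict(lambda: defaultdict(list))


def buffer_record(data_type: str, record: tuple, record_origin: str) -> None:
    """Append a record to the buffer for its originating site and data type."""
    buffer[record_origin][data_type].append(record)


# Two metrics from one site and one parameter from another.
buffer_record("metric", ("loss", 0.42, 1), "site-1")
buffer_record("metric", ("loss", 0.40, 2), "site-1")
buffer_record("parameter", ("lr", 0.01), "site-2")

# Walk the structure: one site_buffer per site, one list per data type.
for site, site_buffer in buffer.items():
    for data_type, records in site_buffer.items():
        print(site, data_type, len(records))
```

Keeping a separate buffer per site and data type lets each batch be sent with a single call appropriate to that type when the buffer is flushed.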

finalize(fl_ctx: FLContext)[source]

Finalizes the receiver.

Called after EventType.END_RUN.

Parameters:
  • fl_ctx (FLContext) – FL context.

flush_buffer(record_origin)[source]

Flush the buffer and send all the data to the MLflow tracking server.

Parameters:
  • record_origin (str) – Origin of the data, or site name.
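
The interplay between buffering and buffer_flush_time can be sketched as a time-gated batch sender. This is a simplified illustration under stated assumptions, not the receiver's implementation: `TimedBuffer`, `get_time_millis`, and the `send` callable are hypothetical names, and the sketch checks the elapsed time whenever an item is added.

```python
import time


def get_time_millis() -> int:
    """Current wall-clock time in milliseconds (hypothetical helper)."""
    return int(round(time.time() * 1000))


class TimedBuffer:
    """Collects items and hands them off in batches at most once per interval."""

    def __init__(self, flush_time_seconds: float, send):
        self.flush_interval_ms = flush_time_seconds * 1000
        self.send = send  # callable that receives a list of buffered items
        self.items = []
        self.last_flush_ms = get_time_millis()

    def add(self, item, now_ms=None):
        """Buffer an item, flushing if the interval has elapsed."""
        now_ms = get_time_millis() if now_ms is None else now_ms
        self.items.append(item)
        if now_ms - self.last_flush_ms >= self.flush_interval_ms:
            self.flush(now_ms)

    def flush(self, now_ms=None):
        """Send everything buffered so far and restart the interval."""
        if self.items:
            self.send(self.items)
            self.items = []
        self.last_flush_ms = get_time_millis() if now_ms is None else now_ms


sent_batches = []
buf = TimedBuffer(flush_time_seconds=1, send=sent_batches.append)
start = buf.last_flush_ms
buf.add("a", now_ms=start + 100)   # within the interval: stays buffered
buf.add("b", now_ms=start + 1200)  # interval elapsed: both items sent together
buf.add("c", now_ms=start + 1300)  # new interval: buffered again
print(sent_batches)
print(buf.items)
```

A shorter interval means smaller, more frequent batches, which is the trade-off between delay and traffic to the tracking server noted for buffer_flush_time.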

get_artifact_location(relative_path: str)[source]
get_job_id_tag(group_id: str) → str[source]
get_mlflow_client(site_id: str) → MlflowClient[source]
get_run_id(site_id: str) → str[source]
get_run_name(kwargs: dict, default_name: str, site_name: str, run_group_id: str)[source]
get_run_tags(kwargs, run_group_id, run_name: str)[source]
get_target_type(data_type: AnalyticsDataType)[source]
initialize(fl_ctx: FLContext)[source]

Initializes MlflowClient for each site.

An MlflowClient is created for each client site, an experiment is created, and a run is created. If “experiment_name” and “experiment_tags” are provided in the kwargs passed to MLflowReceiver, they are used for the experiment. The run is tagged with “run_tags” plus “job_id” and “run_name”, which are generated automatically. The “run_name” from kwargs is appended after the site name and job ID tag: {site_name}-{job_id_tag}-{run_name}.

Parameters:
  • fl_ctx (FLContext) – The FLContext.
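
The run name pattern {site_name}-{job_id_tag}-{run_name} amounts to a simple join, sketched below. The function `compose_run_name` and the sample values are hypothetical and only illustrate the ordering of the parts; this is not the signature of get_run_name.

```python
def compose_run_name(site_name: str, job_id_tag: str, run_name: str) -> str:
    """Join the parts in the documented order: site name, job ID tag, run name."""
    return f"{site_name}-{job_id_tag}-{run_name}"


# Placeholder values for illustration only.
example = compose_run_name("site-1", "a1b2c3", "example-run")
print(example)
```

Because the site name comes first, runs from the same job sort by site when listed by name.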

mlflow_setup(art_full_path, experiment_name, experiment_tags, sites)[source]

Set up an MlflowClient for each client site and create an experiment and run.

Parameters:
  • art_full_path (str) – Full path to artifacts.

  • experiment_name (str) – Experiment name.

  • experiment_tags (dict) – Experiment tags.

  • sites (List[Client]) – List of client sites.

pop_from_buffer(log_buffer)[source]
save(fl_ctx: FLContext, shareable: Shareable, record_origin: str)[source]

Saves the received data.

Specific implementations of AnalyticsReceiver will implement save in their own way.

Parameters:
  • fl_ctx (FLContext) – FL context.

  • shareable (Shareable) – the received message.

  • record_origin (str) – the sender of this message / record.

class MlflowConstants[source]

Bases: object

EXPERIMENT_NAME = 'experiment_name'
EXPERIMENT_TAG = 'experiment_tag'
RUN_TAG = 'run_tag'
get_current_time_millis()[source]