flip_api.fl_services.services.fl_service

Exceptions

UnknownJobTypeError

Custom exception for unknown job types in FL

Functions

upload_app(→ Any)

Upload the application to the FL server.

get_fl_backend_job_id_by_model_id(→ str)

Get the FL backend job ID associated with a given model ID

add_fl_backend_job_id(→ None)

Add the FL backend job ID to the FLJob entry in the database

submit_job(→ None)

Submits a job to the FL API that is going to kick off training

check_server_status(...)

Fetch the status of the server from the FL API.

check_client_status(...)

Fetch the status of all clients from the FL API.

fetch_server_status(...)

Fetch the status of the server from the FL API.

fetch_client_status(...)

Fetch the status of the clients from the FL API.

is_client_available(→ bool)

Check if a specific client is available based on its status.

validate_client_availability(→ None)

Validate the availability of clients by checking their status.

abort_job(→ dict)

Aborts a job on the FL server.

start_training(→ None)

Start the training process for a given model by uploading the application and submitting the job.

bundle_nvflare_application(→ str)

Creates the app folder from the base application files and the uploaded files.

bundle_flower_application(→ str)

Creates the app folder from the base application files and the uploaded files.

verify_bundle_paths(→ None)

Verifies that all expected destination keys exist after bundling.

get_bundle_urls(→ list[str])

Creates pre-signed URLs for the bundle files in S3 (containing the application files and model files) that the FL API will use for training.

extract_current_job_data(...)

Extract the currently-running FL job matching fl_backend_job_id.

abort_model_training(→ None)

Check if the model is currently running training, and if it is, send an abort request to the FL server.

add_fl_job(→ None)

Insert a new FL job into the database.

keep_fl_api_session_alive(→ None)

A periodic function to keep the FL API session alive by making a simple request.

Module Contents

exception flip_api.fl_services.services.fl_service.UnknownJobTypeError

Bases: Exception

Custom exception for unknown job types in FL

flip_api.fl_services.services.fl_service.upload_app(model_id: uuid.UUID, training_details: flip_api.domain.interfaces.fl.IStartTrainingBody, endpoint: str) Any

Upload the application to the FL server.

It sends a POST request to the FL API service with the model ID and payload containing the project ID, cohort query, local rounds, global rounds, trusts, ignore result error, aggregator, and aggregation weights.

Parameters:
  • model_id (UUID) – The ID of the model to upload.

  • training_details (IStartTrainingBody) – The payload containing the training details.

  • endpoint (str) – The endpoint of the net (FL API service).

Returns:

The response from the server after uploading the application.

Return type:

Any

flip_api.fl_services.services.fl_service.get_fl_backend_job_id_by_model_id(model_id: uuid.UUID, session: sqlmodel.Session) str

Get the FL backend job ID associated with a given model ID

Parameters:
  • model_id (UUID) – The ID of the model

  • session (Session) – SQLModel session object

Returns:

The FL backend job ID associated with the model ID

Return type:

str

Raises:

ValueError – If the model ID is not found in the database

flip_api.fl_services.services.fl_service.add_fl_backend_job_id(fl_job_id: uuid.UUID, fl_backend_job_id: str, session: sqlmodel.Session) None

Add the FL backend job ID to the FLJob entry in the database

Parameters:
  • fl_job_id (UUID) – The ID of the FLJob entry

  • fl_backend_job_id (str) – The FL backend job ID to add. Needs to be a string as backend job IDs are strings.

  • session (Session) – SQLModel session object

Raises:

ValueError – If the FLJob entry is not found

flip_api.fl_services.services.fl_service.submit_job(fl_job_id: uuid.UUID, endpoint: str, model_id: uuid.UUID, session: sqlmodel.Session) None

Submits a job to the FL API that is going to kick off training

Parameters:
  • fl_job_id (UUID) – The ID of the FL job to which the backend job ID is added after a successful job submission.

  • endpoint (str) – The endpoint of the FL API service.

  • model_id (UUID) – The ID of the model to submit the job for.

  • session (Session) – An instance of the database connection.

Raises:

ValueError – If the backend job ID is not returned in the response.

flip_api.fl_services.services.fl_service.check_server_status(endpoint: str) flip_api.domain.interfaces.fl.IServerStatus | None

Fetch the status of the server from the FL API.

Parameters:

endpoint (str) – The endpoint of the server to check the status from.

Returns:

The server status, or None if the FL API did not respond.

Return type:

IServerStatus | None

flip_api.fl_services.services.fl_service.check_client_status(endpoint: str) list[flip_api.domain.interfaces.fl.IClientStatus] | None

Fetch the status of all clients from the FL API.

Parameters:

endpoint (str) – The endpoint of the server to check the status from.

Returns:

A list of client statuses if available, otherwise None.

Return type:

list[IClientStatus] | None

flip_api.fl_services.services.fl_service.fetch_server_status(endpoint: str) flip_api.domain.interfaces.fl.IServerStatus | None

Fetch the status of the server from the FL API.

Parameters:

endpoint (str) – The endpoint of the server to fetch the status from.

Returns:

The server status if available, otherwise None.

Return type:

IServerStatus | None

flip_api.fl_services.services.fl_service.fetch_client_status(endpoint: str) list[flip_api.domain.interfaces.fl.IClientStatus] | None

Fetch the status of the clients from the FL API.

Parameters:

endpoint (str) – The endpoint of the server to fetch the status from.

Returns:

A list of client statuses if available, otherwise None.

Return type:

list[IClientStatus] | None

flip_api.fl_services.services.fl_service.is_client_available(client_name: str, client_statuses: list[flip_api.domain.interfaces.fl.IClientStatus]) bool

Check if a specific client is available based on its status.

Parameters:
  • client_name (str) – The name of the client to check.

  • client_statuses (list[IClientStatus]) – A list of client statuses to check against.

Returns:

True if the client is available, False otherwise.

Return type:

bool
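
The availability check described above can be sketched as a lookup over the status list. This is a minimal illustration, not the module's implementation: `ClientStatus` is a hypothetical stand-in for `IClientStatus` (whose real fields are not shown here), and the status strings in `AVAILABLE_STATUSES` are assumptions.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ClientStatus:
    """Hypothetical stand-in for IClientStatus; the real fields may differ."""
    name: str
    status: str


# Assumption: these strings mean "reachable". Real values depend on the backend.
AVAILABLE_STATUSES = {"connected", "no jobs"}


def is_client_available(client_name: str, client_statuses: list[ClientStatus]) -> bool:
    """Return True if an entry for client_name reports an available state."""
    return any(
        s.name == client_name and s.status.lower() in AVAILABLE_STATUSES
        for s in client_statuses
    )


statuses = [ClientStatus("site-1", "connected"), ClientStatus("site-2", "disconnected")]
print(is_client_available("site-1", statuses))
print(is_client_available("site-3", statuses))
```

A client that does not appear in the list at all is treated as unavailable in this sketch.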

flip_api.fl_services.services.fl_service.validate_client_availability(clients: list[str], endpoint: str) None

Validate the availability of clients by checking their status. It sends a GET request to the FL API service to check the status of the clients. For NVFLARE, raises ValueError if any client is unavailable. For Flower, logs a warning instead — Flower’s SuperLink handles client selection at runtime.

Parameters:
  • clients (list[str]) – A list of client names to check the availability of.

  • endpoint (str) – The endpoint of the FL API service.

Returns:

None

Raises:

ValueError – If any client is unavailable (NVFLARE backend only).
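
The backend-dependent behaviour can be illustrated with a small sketch. The `Backend` enum and the `validate_clients` helper are hypothetical names introduced for illustration; how the real module determines the active backend is not shown in this page. Unlike the documented function, the sketch returns the list of unavailable clients so the outcome is easy to inspect.

```python
import logging
from enum import Enum

logger = logging.getLogger(__name__)


class Backend(Enum):
    """Hypothetical backend selector, for illustration only."""
    NVFLARE = "nvflare"
    FLOWER = "flower"


def validate_clients(requested: list[str], available: set[str], backend: Backend) -> list[str]:
    """Return the unavailable clients; raise for NVFLARE, only warn for Flower."""
    missing = [c for c in requested if c not in available]
    if not missing:
        return []
    if backend is Backend.NVFLARE:
        # NVFLARE needs every requested client up front.
        raise ValueError(f"Unavailable clients: {', '.join(missing)}")
    # Flower's SuperLink selects clients at runtime, so a warning is enough.
    logger.warning("Unavailable clients: %s", ", ".join(missing))
    return missing
```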

flip_api.fl_services.services.fl_service.abort_job(endpoint: str, job_id: str) dict

Aborts a job on the FL server.

Parameters:
  • endpoint (str) – The endpoint of the FL API service.

  • job_id (str) – The ID of the job to abort.

Returns:

The response from the server after aborting the job.

Return type:

dict

flip_api.fl_services.services.fl_service.start_training(model_id: uuid.UUID, fl_job_id: uuid.UUID, clients: list[str], endpoint: str, bundle_urls: list[str], session: sqlmodel.Session) None

Start the training process for a given model by uploading the application and submitting the job. It first checks if the clients are available, then it bundles the application files, downloads the configuration, and finally uploads the application and submits the job.

Parameters:
  • model_id (UUID) – The ID of the model to start training for.

  • fl_job_id (UUID) – The ID of the FL job to which the backend job ID is added after a successful job submission.

  • clients (list[str]) – A list of client names to start training on.

  • endpoint (str) – The endpoint of the FL API service.

  • bundle_urls (list[str]) – A list of URLs for the application bundle.

  • session (Session) – An instance of the database connection.

Raises:

ValueError – If the backend job ID is not returned in the response.

flip_api.fl_services.services.fl_service.bundle_nvflare_application(model_id: uuid.UUID, job_type: flip_api.domain.interfaces.fl.JobTypes = JobTypes.standard) str

Creates the app folder from the base application files and the uploaded files.

It copies the base application files and the model files to the destination bucket. It checks if the destination bucket has any files, and if it does, it deletes them.

After copying, path-level verification ensures that all expected files are present in the destination bucket.

Example:

Base application files in the base bucket:

s3://base-bucket/standard/
├── app_site1/
│   ├── config/
│   │   ├── config_fed_client.json
│   │   └── config_fed_server.json
│   └── custom/
│       └── flip.py [and other files]
├── app_site2/
│   ├── config/
│   │   ├── config_fed_server.json
│   │   └── config_fed_client.json
│   └── custom/
│       └── flip.py [and other files]

Model files in the model files bucket:

s3://model-bucket/<model_id>/
├── trainer.py
├── validator.py
├── config.json
└── [other user uploaded files]

Final structure in the destination bucket:

s3://dest-bucket/<model_id>/
├── app_site1/
│   ├── config/
│   │   ├── config_fed_client.json
│   │   └── config_fed_server.json
│   └── custom/
│       ├── [base application files]
│       ├── trainer.py      ← copied from model files
│       ├── validator.py    ← copied from model files
│       ├── config.json     ← copied from model files
│       └── [other user uploaded files]
├── app_site2/
│   ├── config/
│   │   ├── config_fed_server.json
│   │   └── config_fed_client.json
│   └── custom/
│       ├── [base application files]
│       ├── trainer.py      ← copied from model files
│       ├── validator.py    ← copied from model files
│       ├── config.json     ← copied from model files
│       └── [other user uploaded files]
└── meta.json               ← copied only once (not per app)

Parameters:
  • model_id (UUID) – model ID, which will give the name to the app folder.

  • job_type (JobTypes, optional) – Type of job (e.g. ‘standard’, ‘evaluation’, etc.). This will cause a specific base application to be selected. Defaults to 'standard'.

Raises:
  • EnvironmentError – If the S3 bucket environment variables are not set.

  • FileNotFoundError – If the base or model files are missing.

  • FileNotFoundError – If required files for the job type are missing.

Returns:

The destination bucket S3 path where the bundled application is located.

Return type:

str
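
The copy layout in the example above can be expressed as a mapping from destination key to source key. The following is a sketch under stated assumptions, not the module's code: `plan_nvflare_bundle` is a hypothetical helper, keys are relative to each bucket's root prefix, and every top-level folder of the base application is assumed to be an app folder.

```python
from pathlib import PurePosixPath


def plan_nvflare_bundle(base_files: list[str], model_files: list[str]) -> dict[str, str]:
    """Map destination key -> source key for an NVFLARE-style bundle.

    Assumption: top-level folders (e.g. app_site1) are app folders, and
    top-level files such as meta.json are copied once.
    """
    # Base files keep their relative path in the destination.
    plan = {key: key for key in base_files}
    app_folders = sorted(
        {PurePosixPath(key).parts[0] for key in base_files if len(PurePosixPath(key).parts) > 1}
    )
    for folder in app_folders:
        for model_file in model_files:
            # Each model file is mirrored into <app folder>/custom/.
            plan[f"{folder}/custom/{model_file}"] = model_file
    return plan


plan = plan_nvflare_bundle(
    base_files=[
        "app_site1/config/config_fed_client.json",
        "app_site1/custom/flip.py",
        "app_site2/config/config_fed_client.json",
        "app_site2/custom/flip.py",
        "meta.json",
    ],
    model_files=["trainer.py", "validator.py", "config.json"],
)
for dest, src in sorted(plan.items()):
    print(f"{dest} <- {src}")
```

Building the plan before copying makes it straightforward to check afterwards that every planned destination key actually exists.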

flip_api.fl_services.services.fl_service.bundle_flower_application(model_id: uuid.UUID, job_type: flip_api.domain.interfaces.fl.JobTypes = JobTypes.standard) str

Creates the app folder from the base application files and the uploaded files.

It copies the base application files and the model files to the destination bucket. It checks if the destination bucket has any files, and if it does, it deletes them.

Example:

Base application files in the base bucket:

s3://base-bucket/standard/
├── app/
│   └── server_app.py
└── pyproject.toml

Model files in the model files bucket:

s3://model-bucket/<model_id>/
├── client_app.py
├── models.py
├── config.json
└── [other user uploaded files]

Final structure in the destination bucket:

s3://dest-bucket/<model_id>/
├── app/
│   ├── server_app.py
│   ├── client_app.py    ← copied from model files
│   ├── models.py        ← copied from model files
│   ├── config.json      ← copied from model files
│   └── [other user uploaded files]
└── pyproject.toml       ← copied from base application (not overwritten by model files)

Parameters:
  • model_id (UUID) – model ID, which will give the name to the app folder.

  • job_type (JobTypes, optional) – Type of job (e.g. ‘standard’, ‘evaluation’, etc.). This will cause a specific base application to be selected. Defaults to 'standard'.

Raises:
  • EnvironmentError – If the S3 bucket environment variables are not set.

  • FileNotFoundError – If the base or model files are missing.

  • FileNotFoundError – If required files for the job type are missing.

Returns:

The destination bucket S3 path where the bundled application is located.

Return type:

str
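
For the Flower layout, model files land in a single `app/` folder next to the base application's files. The sketch below records where each destination key comes from. `plan_flower_bundle` is a hypothetical helper, and the rule that a base file wins when a model file would land on the same key is an assumption drawn from the "not overwritten by model files" note in the example.

```python
def plan_flower_bundle(
    base_files: list[str], model_files: list[str], app_folder: str = "app"
) -> dict[str, tuple[str, str]]:
    """Map destination key -> (source, source key) for a Flower-style bundle."""
    plan = {key: ("base", key) for key in base_files}
    for model_file in model_files:
        dest = f"{app_folder}/{model_file}"
        # Assumption: an existing base file is kept if the keys collide.
        plan.setdefault(dest, ("model", model_file))
    return plan


flower_plan = plan_flower_bundle(
    base_files=["app/server_app.py", "pyproject.toml"],
    model_files=["client_app.py", "models.py", "config.json", "server_app.py"],
)
for dest, (source, key) in sorted(flower_plan.items()):
    print(f"{dest} <- {source}:{key}")
```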

flip_api.fl_services.services.fl_service.verify_bundle_paths(*, s3: flip_api.utils.s3_client.S3Client, base_files: list[str], model_files: list[str], app_folders: set[str], base_bucket_s3_path: str, model_bucket_s3_path: str, dest_bucket_s3_path: str) None

Verifies that all expected destination keys exist after bundling.

Parameters:
  • s3 (S3Client) – S3 client used to list destination objects.

  • base_files (list[str]) – Keys of the base application files in the source bucket.

  • model_files (list[str]) – Keys of the user-uploaded model files in the source bucket.

  • app_folders (set[str]) – Application subfolder names that model files get mirrored into.

  • base_bucket_s3_path (str) – Root S3 path of the base application bucket.

  • model_bucket_s3_path (str) – Root S3 path of the user model bucket.

  • dest_bucket_s3_path (str) – Root S3 path of the destination bundle bucket.

Raises:

RuntimeError – If any expected destination key is missing from the bundle bucket.
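
The core of this verification is a set difference between the keys that should exist and the keys actually listed in the destination. A minimal sketch, assuming the expected keys have already been computed and the destination listing is available as plain strings (both helper names are hypothetical):

```python
def find_missing_keys(expected_keys: set[str], listed_keys: list[str]) -> list[str]:
    """Return expected destination keys that are absent from the listing, sorted."""
    return sorted(expected_keys - set(listed_keys))


def verify_bundle(expected_keys: set[str], listed_keys: list[str]) -> None:
    """Raise RuntimeError if any expected destination key is missing."""
    missing = find_missing_keys(expected_keys, listed_keys)
    if missing:
        raise RuntimeError(f"Bundle incomplete, {len(missing)} key(s) missing: {missing}")


expected = {"app/server_app.py", "app/client_app.py", "pyproject.toml"}
verify_bundle(expected, ["app/server_app.py", "app/client_app.py", "pyproject.toml"])
print(find_missing_keys(expected, ["app/server_app.py"]))
```

Extra objects in the destination are ignored here; only missing keys count as a failure.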

flip_api.fl_services.services.fl_service.get_bundle_urls(s3_path: str) list[str]

Creates pre-signed URLs for the bundle files in S3 (containing the application files and model files) that the FL API will use for training.

Parameters:

s3_path (str) – The S3 path of the bundle to get the URLs for.

Returns:

A list of pre-signed URLs for the bundle files.

Return type:

list[str]

Raises:

ClientError – If there is an error listing objects or generating pre-signed URLs.
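
One way to produce such URLs is to list the objects under the bundle prefix and pre-sign a GET for each. The sketch below assumes a boto3-style S3 client is passed in (`list_objects_v2` and `generate_presigned_url` are boto3 client methods); the module's own `S3Client` wrapper may expose a different interface, and `presign_bundle` and its `expires_in` default are illustrative choices.

```python
from typing import Any
from urllib.parse import urlparse


def presign_bundle(s3_client: Any, s3_path: str, expires_in: int = 3600) -> list[str]:
    """Return one pre-signed GET URL per object found under s3_path."""
    parsed = urlparse(s3_path)  # "s3://bucket/prefix" -> netloc="bucket", path="/prefix"
    bucket, prefix = parsed.netloc, parsed.path.lstrip("/")
    urls: list[str] = []
    token = None
    while True:
        kwargs = {"Bucket": bucket, "Prefix": prefix}
        if token:
            kwargs["ContinuationToken"] = token
        page = s3_client.list_objects_v2(**kwargs)
        for obj in page.get("Contents", []):
            urls.append(
                s3_client.generate_presigned_url(
                    "get_object",
                    Params={"Bucket": bucket, "Key": obj["Key"]},
                    ExpiresIn=expires_in,
                )
            )
        # list_objects_v2 returns at most one page; follow the token if truncated.
        if not page.get("IsTruncated"):
            return urls
        token = page.get("NextContinuationToken")
```

With boto3 this would be called as `presign_bundle(boto3.client("s3"), "s3://dest-bucket/<model_id>")`. Errors from either call surface as `botocore.exceptions.ClientError`.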

flip_api.fl_services.services.fl_service.extract_current_job_data(net_endpoint: str, fl_backend_job_id: str) flip_api.domain.interfaces.fl.IJobMetaData | None

Extract the currently-running FL job matching fl_backend_job_id.

Parameters:
  • net_endpoint (str) – The endpoint of the FL API service.

  • fl_backend_job_id (str) – The FL job ID to look for.

Returns:

The running job’s metadata, or None if no running job matches fl_backend_job_id (the job is already terminal or never started).

Return type:

IJobMetaData | None

Raises:
  • ValueError – If the FL server response is not a list, or more than one running job shares the same ID.

  • pydantic.ValidationError – If a returned item does not conform to IJobMetaData (e.g. an unknown status from a non-conforming FL-API adapter) — failing loudly here is intentional.
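
The selection rules above (response must be a list, at most one running match, None when nothing matches) can be sketched as follows. `JobMeta` is a hypothetical stand-in for `IJobMetaData` built with a dataclass rather than pydantic, and the `job_id` and `status` keys and the `"RUNNING"` value are assumptions about the response shape.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class JobMeta:
    """Hypothetical stand-in for IJobMetaData; the real fields may differ."""
    job_id: str
    status: str


def find_running_job(response: Any, fl_backend_job_id: str) -> JobMeta | None:
    """Return the single running job with the given ID, or None if there is none."""
    if not isinstance(response, list):
        raise ValueError("Expected a list of jobs from the FL server")
    jobs = [JobMeta(job_id=item["job_id"], status=item["status"]) for item in response]
    running = [j for j in jobs if j.job_id == fl_backend_job_id and j.status == "RUNNING"]
    if len(running) > 1:
        raise ValueError(f"Multiple running jobs share ID {fl_backend_job_id}")
    return running[0] if running else None


jobs = [
    {"job_id": "abc", "status": "RUNNING"},
    {"job_id": "def", "status": "FINISHED"},
]
print(find_running_job(jobs, "abc"))
print(find_running_job(jobs, "def"))
```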

flip_api.fl_services.services.fl_service.abort_model_training(request: fastapi.Request, model_id: uuid.UUID, session: sqlmodel.Session) None

Check if the model is currently running training, and if it is, send an abort request to the FL server.

Parameters:
  • request (Request) – The FastAPI request object

  • model_id (UUID) – The ID of the model to abort

  • session (Session) – SQLModel session object

Raises:

ValueError – If the FL server is not running, or if target is invalid.

flip_api.fl_services.services.fl_service.add_fl_job(model_id: uuid.UUID, clients: list[str], session: sqlmodel.Session) None

Insert a new FL job into the database.

Parameters:
  • model_id (UUID) – The ID of the model for which the FL job is being created.

  • clients (list[str]) – A list of client names associated with the FL job.

  • session (Session) – The SQLModel session to use for the database operation.

Raises:

Exception – If there is an error during the database operation.

flip_api.fl_services.services.fl_service.keep_fl_api_session_alive() None

A periodic function to keep the FL API session alive by making a simple request. This is useful to prevent the session from going idle or being shut down by the server.

TODO This was developed for the NVFLARE backend and might need to be revisited for the Flower backend. See https://github.com/NVIDIA/NVFlare/discussions/3526#discussioncomment-13574644
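
A periodic keep-alive of this kind can be sketched with asyncio. This is an illustration only: the documented function takes no arguments and its scheduling mechanism is not described here, so the `ping` callable, `interval_s`, and `max_pings` parameters are all hypothetical.

```python
from __future__ import annotations

import asyncio
import logging
from typing import Callable

logger = logging.getLogger(__name__)


async def keep_alive(ping: Callable[[], None], interval_s: float, max_pings: int | None = None) -> int:
    """Call ping() every interval_s seconds; return the number of attempts made.

    max_pings=None runs until the task is cancelled.
    """
    attempts = 0
    while max_pings is None or attempts < max_pings:
        try:
            ping()  # e.g. a cheap status request against the FL API
        except Exception:
            # A failed ping is logged but must not stop the loop.
            logger.exception("Keep-alive request failed")
        attempts += 1
        await asyncio.sleep(interval_s)
    return attempts


calls: list[int] = []
print(asyncio.run(keep_alive(lambda: calls.append(1), interval_s=0.0, max_pings=3)))
```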