flip_api.fl_services.services.fl_service
Exceptions
UnknownJobTypeError | Custom exception for unknown job types in FL
Functions
upload_app | Upload the application to the FL server.
get_fl_backend_job_id_by_model_id | Get the FL backend job ID associated with a given model ID
add_fl_backend_job_id | Add the FL backend job ID to the FLJob entry in the database
submit_job | Submits a job to the FL API that is going to kick off training
check_server_status | Fetch the status of the server from the FL API.
check_client_status | Fetch the status of all clients from the FL API.
fetch_server_status | Fetch the status of the server from the FL API.
fetch_client_status | Fetch the status of the clients from the FL API.
is_client_available | Check if a specific client is available based on its status.
validate_client_availability | Validate the availability of clients by checking their status.
abort_job | Aborts a job on the FL server.
start_training | Start the training process for a given model by uploading the application and submitting the job.
bundle_nvflare_application | Creates the app folder from the base application files and the uploaded files.
bundle_flower_application | Creates the app folder from the base application files and the uploaded files.
verify_bundle_paths | Verifies that all expected destination keys exist after bundling.
get_bundle_urls | Creates pre-signed URLs for the bundle files in S3 (containing the application files and model files) that the FL API will use for training.
extract_current_job_data | Extract the currently-running FL job matching fl_backend_job_id.
abort_model_training | Check if the model is currently running training, and if it is, send an abort request to the FL server.
add_fl_job | Insert a new FL job into the database with its trust participants.
keep_fl_api_session_alive | A periodic function to keep the FL API session alive by making a simple request.
Module Contents
- exception flip_api.fl_services.services.fl_service.UnknownJobTypeError
Bases: Exception
Custom exception for unknown job types in FL
- flip_api.fl_services.services.fl_service.upload_app(model_id: uuid.UUID, training_details: flip_api.domain.interfaces.fl.IStartTrainingBody, endpoint: str) -> Any
Upload the application to the FL server.
It sends a POST request to the FL API service with the model ID and payload containing the project ID, cohort query, local rounds, global rounds, trusts, ignore result error, aggregator, and aggregation weights.
- Parameters:
model_id (UUID) – The ID of the model to upload.
training_details (IStartTrainingBody) – The payload containing the training details.
endpoint (str) – The endpoint of the net (FL API service).
- Returns:
The response from the server after uploading the application.
- Return type:
Any
- flip_api.fl_services.services.fl_service.get_fl_backend_job_id_by_model_id(model_id: uuid.UUID, session: sqlmodel.Session) -> str
Get the FL backend job ID associated with a given model ID
- Parameters:
model_id (UUID) – The ID of the model
session (Session) – SQLModel session object
- Returns:
The FL backend job ID associated with the model ID
- Return type:
str
- Raises:
ValueError – If the model ID is not found in the database
- flip_api.fl_services.services.fl_service.add_fl_backend_job_id(fl_job_id: uuid.UUID, fl_backend_job_id: str, session: sqlmodel.Session) -> None
Add the FL backend job ID to the FLJob entry in the database
- Parameters:
fl_job_id (UUID) – The ID of the FLJob entry
fl_backend_job_id (str) – The FL backend job ID to add. Needs to be a string as backend job IDs are strings.
session (Session) – SQLModel session object
- Raises:
ValueError – If the FLJob entry is not found
- flip_api.fl_services.services.fl_service.submit_job(fl_job_id: uuid.UUID, endpoint: str, model_id: uuid.UUID, session: sqlmodel.Session) -> None
Submits a job to the FL API that is going to kick off training
- Parameters:
fl_job_id (UUID) – The ID of the FL job that receives the backend job ID once the job has been submitted successfully.
endpoint (str) – The endpoint of the FL API service.
model_id (UUID) – The ID of the model to submit the job for.
session (Session) – An instance of the database connection.
- Raises:
ValueError – If the backend job ID is not returned in the response.
- flip_api.fl_services.services.fl_service.check_server_status(endpoint: str) -> flip_api.domain.interfaces.fl.IServerStatus | None
Fetch the status of the server from the FL API.
- Parameters:
endpoint (str) – The endpoint of the server to check the status from.
- Returns:
The server status, or None when the FL API does not respond.
- Return type:
IServerStatus | None
- flip_api.fl_services.services.fl_service.check_client_status(endpoint: str) -> list[flip_api.domain.interfaces.fl.IClientStatus] | None
Fetch the status of all clients from the FL API.
- Parameters:
endpoint (str) – The endpoint of the server to check the status from.
- Returns:
A list of client statuses if available, otherwise None.
- Return type:
list[IClientStatus] | None
- flip_api.fl_services.services.fl_service.fetch_server_status(endpoint: str) -> flip_api.domain.interfaces.fl.IServerStatus | None
Fetch the status of the server from the FL API.
- Parameters:
endpoint (str) – The endpoint of the server to fetch the status from.
- Returns:
The server status if available, otherwise None.
- Return type:
IServerStatus | None
- flip_api.fl_services.services.fl_service.fetch_client_status(endpoint: str) -> list[flip_api.domain.interfaces.fl.IClientStatus] | None
Fetch the status of the clients from the FL API.
- Parameters:
endpoint (str) – The endpoint of the server to fetch the status from.
- Returns:
A list of client statuses if available, otherwise None.
- Return type:
list[IClientStatus] | None
- flip_api.fl_services.services.fl_service.is_client_available(client_name: str, client_statuses: list[flip_api.domain.interfaces.fl.IClientStatus]) -> bool
Check if a specific client is available based on its status.
- Parameters:
client_name (str) – The name of the client to check.
client_statuses (list[IClientStatus]) – A list of client statuses to check against.
- Returns:
True if the client is available, False otherwise.
- Return type:
bool
- flip_api.fl_services.services.fl_service.validate_client_availability(clients: list[str], endpoint: str, fl_backend: flip_api.domain.schemas.types.FLBackend) -> None
Validate the availability of clients by checking their status. It sends a GET request to the FL API service to check the status of the clients. For NVFLARE, raises ValueError if any client is unavailable. For Flower, logs a warning instead — Flower’s SuperLink handles client selection at runtime.
- Parameters:
clients (list[str]) – A list of client names to check the availability of.
endpoint (str) – The endpoint of the FL API service.
fl_backend (FLBackend) – The FL backend of the net being validated (nvflare or flower).
- Returns:
None
- Raises:
ValueError – If any client is unavailable (NVFLARE backend only).
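The backend-dependent behaviour described above can be illustrated with a minimal sketch. It is not the module's implementation: `ClientStatus` is a hypothetical stand-in for `IClientStatus` (whose real fields are not shown here), the `"connected"` status value is an assumption, and the statuses are passed in directly instead of being fetched from the FL API.

```python
import logging
from dataclasses import dataclass

logger = logging.getLogger(__name__)


@dataclass
class ClientStatus:
    """Hypothetical stand-in for IClientStatus; the real fields may differ."""

    name: str
    status: str


def is_available(client_name: str, statuses: list[ClientStatus]) -> bool:
    # Assumption: a client is available when it is listed with status "connected".
    return any(s.name == client_name and s.status == "connected" for s in statuses)


def validate_availability(
    clients: list[str], statuses: list[ClientStatus], fl_backend: str
) -> None:
    unavailable = [c for c in clients if not is_available(c, statuses)]
    if not unavailable:
        return
    if fl_backend == "nvflare":
        # NVFLARE: an unavailable client is a hard error.
        raise ValueError(f"Unavailable clients: {unavailable}")
    # Flower: only warn, since client selection happens at runtime.
    logger.warning("Clients not currently available: %s", unavailable)
```

With these assumptions, `validate_availability(["site-2"], statuses, "flower")` returns normally after logging a warning, while the same call with `"nvflare"` raises `ValueError`.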
- flip_api.fl_services.services.fl_service.abort_job(endpoint: str, job_id: str) -> dict
Aborts a job on the FL server.
- Parameters:
endpoint (str) – The endpoint of the FL API service.
job_id (str) – The ID of the job to abort.
- Returns:
The response from the server after aborting the job.
- Return type:
dict
- flip_api.fl_services.services.fl_service.start_training(model_id: uuid.UUID, fl_job_id: uuid.UUID, clients: list[str], endpoint: str, bundle_urls: list[str], session: sqlmodel.Session) -> None
Start the training process for a given model by uploading the application and submitting the job. It first checks if the clients are available, then it bundles the application files, downloads the configuration, and finally uploads the application and submits the job.
- Parameters:
model_id (UUID) – The ID of the model to start training for.
fl_job_id (UUID) – The ID of the FL job that receives the backend job ID once the job has been submitted successfully.
clients (list[str]) – A list of client names to start training on.
endpoint (str) – The endpoint of the FL API service.
bundle_urls (list[str]) – A list of URLs for the application bundle.
session (Session) – An instance of the database connection.
- Raises:
ValueError – If the backend job ID is not returned in the response.
- flip_api.fl_services.services.fl_service.bundle_nvflare_application(model_id: uuid.UUID, job_type: str = DEFAULT_JOB_TYPE) -> str
Creates the app folder from the base application files and the uploaded files.
It copies the base application files and the model files to the destination bucket. It checks if the destination bucket has any files, and if it does, it deletes them.
After copying, path-level verification ensures that all expected files are present in the destination bucket.
Example:
Base application files in the base bucket:
s3://base-bucket/standard/
├── app_site1/
│   ├── config/
│   │   ├── config_fed_client.json
│   │   └── config_fed_server.json
│   └── custom/
│       └── flip.py [and other files]
├── app_site2/
│   ├── config/
│   │   ├── config_fed_server.json
│   │   └── config_fed_client.json
│   └── custom/
│       └── flip.py [and other files]
Model files in the model files bucket:
s3://model-bucket/<model_id>/
├── trainer.py
├── validator.py
├── config.json
└── [other user uploaded files]
Final structure in the destination bucket:
s3://dest-bucket/<model_id>/
├── app_site1/
│   ├── config/
│   │   ├── config_fed_client.json
│   │   └── config_fed_server.json
│   ├── custom/
│   │   ├── [base application files]
│   │   ├── trainer.py ← copied from model files
│   │   ├── validator.py ← copied from model files
│   │   ├── config.json ← copied from model files
│   │   └── [other user uploaded files]
├── app_site2/
│   ├── config/
│   │   ├── config_fed_server.json
│   │   └── config_fed_client.json
│   ├── custom/
│   │   ├── [base application files]
│   │   ├── trainer.py ← copied from model files
│   │   ├── validator.py ← copied from model files
│   │   ├── config.json ← copied from model files
│   │   └── [other user uploaded files]
└── meta.json ← copied only once (not per app)
- Parameters:
model_id (UUID) – Model ID, which is used as the name of the app folder.
job_type (str, optional) – Type of job (e.g. 'standard', 'evaluation', etc.). This selects the specific base application to use. Defaults to 'standard'.
- Raises:
EnvironmentError – If the S3 bucket environment variables are not set.
FileNotFoundError – If the base or model files are missing.
FileNotFoundError – If required files for the job type are missing.
- Returns:
The destination bucket S3 path where the bundled application is located.
- Return type:
str
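The NVFLARE layout above amounts to a key mapping: every base file keeps its relative path under `<model_id>/`, and every model file is mirrored into the `custom/` folder of each app folder. The sketch below illustrates only that mapping. `plan_nvflare_bundle` is a hypothetical helper, not part of this module; it assumes keys are given relative to their source folders, and it leaves out `meta.json` and the actual S3 copy calls.

```python
from pathlib import PurePosixPath


def plan_nvflare_bundle(
    base_files: list[str], model_files: list[str], model_id: str
) -> list[tuple[str, str]]:
    """Return (source key, destination key) pairs for the bundle.

    base_files are relative to the job-type folder, e.g.
    "app_site1/config/config_fed_client.json"; model_files are relative
    to the model folder, e.g. "trainer.py".
    """
    # App folders are the top-level directories seen in the base files.
    app_folders = sorted(
        {PurePosixPath(key).parts[0] for key in base_files if "/" in key}
    )
    # Base files keep their relative path under <model_id>/.
    copies = [(key, f"{model_id}/{key}") for key in base_files]
    # Each model file is mirrored into every app folder's custom/ directory.
    for name in model_files:
        for app in app_folders:
            copies.append((name, f"{model_id}/{app}/custom/{name}"))
    return copies
```

Computing the plan as plain data before copying makes it straightforward to reuse the same list of destination keys for the verification step that follows bundling.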
- flip_api.fl_services.services.fl_service.bundle_flower_application(model_id: uuid.UUID, job_type: str = DEFAULT_JOB_TYPE) -> str
Creates the app folder from the base application files and the uploaded files.
It copies the base application files and the model files to the destination bucket. It checks if the destination bucket has any files, and if it does, it deletes them.
Example:
Base application files in the base bucket:
s3://base-bucket/standard/
├── app/
│   └── server_app.py
└── pyproject.toml
Model files in the model files bucket:
s3://model-bucket/<model_id>/
├── client_app.py
├── models.py
├── config.json
└── [other user uploaded files]
Final structure in the destination bucket:
s3://dest-bucket/<model_id>/
├── app/
│   ├── server_app.py
│   ├── client_app.py ← copied from model files
│   ├── models.py ← copied from model files
│   ├── config.json ← copied from model files
│   └── [other user uploaded files]
└── pyproject.toml ← copied from base application (not overwritten by model files)
- Parameters:
model_id (UUID) – Model ID, which is used as the name of the app folder.
job_type (str, optional) – Type of job (e.g. 'standard', 'evaluation', etc.). This selects the specific base application to use. Defaults to 'standard'.
- Raises:
EnvironmentError – If the S3 bucket environment variables are not set.
FileNotFoundError – If the base or model files are missing.
FileNotFoundError – If required files for the job type are missing.
- Returns:
The destination bucket S3 path where the bundled application is located.
- Return type:
str
- flip_api.fl_services.services.fl_service.verify_bundle_paths(*, s3: flip_api.utils.s3_client.S3Client, base_files: list[str], model_files: list[str], app_folders: set[str], base_bucket_s3_path: str, model_bucket_s3_path: str, dest_bucket_s3_path: str) -> None
Verifies that all expected destination keys exist after bundling.
- Parameters:
s3 (S3Client) – S3 client used to list destination objects.
base_files (list[str]) – Keys of the base application files in the source bucket.
model_files (list[str]) – Keys of the user-uploaded model files in the source bucket.
app_folders (set[str]) – Application subfolder names that model files get mirrored into.
base_bucket_s3_path (str) – Root S3 path of the base application bucket.
model_bucket_s3_path (str) – Root S3 path of the user model bucket.
dest_bucket_s3_path (str) – Root S3 path of the destination bundle bucket.
- Raises:
RuntimeError – If any expected destination key is missing from the bundle bucket.
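Path-level verification of this kind reduces to a set difference between the keys that should exist and the keys actually listed in the destination. The following is a minimal sketch under that assumption; `verify_keys` is a hypothetical helper that takes both key collections directly, whereas the real function derives them from the S3 client and the bucket paths.

```python
def find_missing_keys(expected: set[str], actual: list[str]) -> list[str]:
    # Keys that should be in the bundle but were not listed in the destination.
    return sorted(expected - set(actual))


def verify_keys(expected: set[str], actual: list[str]) -> None:
    missing = find_missing_keys(expected, actual)
    if missing:
        # Mirrors the documented behaviour: a missing key is a RuntimeError.
        raise RuntimeError(
            f"Bundle is missing {len(missing)} expected key(s): {missing}"
        )
```

Extra keys in the destination are ignored in this sketch; only missing ones are treated as a failure.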
- flip_api.fl_services.services.fl_service.get_bundle_urls(s3_path: str) -> list[str]
Creates pre-signed URLs for the bundle files in S3 (containing the application files and model files) that the FL API will use for training.
- Parameters:
s3_path (str) – The S3 path of the bundle to get the URLs for.
- Returns:
A list of pre-signed URLs for the bundle files.
- Return type:
list[str]
- Raises:
ClientError – If there is an error listing objects or generating pre-signed URLs.
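As an illustration of how pre-signed URLs for a bundle can be produced, the sketch below lists the objects under an S3 path and signs a `get_object` URL for each. It assumes a boto3-style S3 client is passed in (the module's own `S3Client` wrapper may expose a different interface), uses a hypothetical one-hour default expiry, and makes a single `list_objects_v2` call, which returns at most 1000 keys.

```python
from urllib.parse import urlparse


def presign_bundle(s3_client, s3_path: str, expires_in: int = 3600) -> list[str]:
    """Return one pre-signed GET URL per object under s3_path.

    s3_client is assumed to behave like a boto3 S3 client.
    """
    # "s3://bucket/prefix/" -> bucket="bucket", prefix="prefix/"
    parsed = urlparse(s3_path)
    bucket, prefix = parsed.netloc, parsed.path.lstrip("/")

    # A single call is enough for small bundles; larger ones need pagination.
    response = s3_client.list_objects_v2(Bucket=bucket, Prefix=prefix)

    urls = []
    for obj in response.get("Contents", []):
        urls.append(
            s3_client.generate_presigned_url(
                "get_object",
                Params={"Bucket": bucket, "Key": obj["Key"]},
                ExpiresIn=expires_in,
            )
        )
    return urls
```

With a real boto3 client this could be called as `presign_bundle(boto3.client("s3"), "s3://dest-bucket/<model_id>/")`; listing or signing failures would surface as `botocore` `ClientError` exceptions.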
- flip_api.fl_services.services.fl_service.extract_current_job_data(net_endpoint: str, fl_backend_job_id: str) -> flip_api.domain.interfaces.fl.IJobMetaData | None
Extract the currently-running FL job matching fl_backend_job_id.
- Parameters:
net_endpoint (str) – The endpoint of the FL API service.
fl_backend_job_id (str) – The FL job ID to look for.
- Returns:
The running job’s metadata, or None if no running job matches fl_backend_job_id (the job is already terminal or never started).
- Return type:
IJobMetaData | None
- Raises:
ValueError – If the FL server response is not a list, or more than one running job shares the same ID.
pydantic.ValidationError – If a returned item does not conform to IJobMetaData (e.g. an unknown status from a non-conforming FL-API adapter). Failing loudly here is intentional.
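The selection rules described for this function (reject a non-list response, return `None` when nothing matches, reject duplicates) can be sketched as follows. This is only an illustration: jobs are plain dictionaries instead of validated `IJobMetaData` models, and the `job_id` and `status` keys and the `"RUNNING"` value are assumed names, not confirmed by this reference.

```python
from typing import Any, Optional


def pick_running_job(response: Any, fl_backend_job_id: str) -> Optional[dict]:
    if not isinstance(response, list):
        raise ValueError("Expected a list of jobs from the FL server")

    # Assumed field names: "job_id" and "status"; assumed running value: "RUNNING".
    matches = [
        job
        for job in response
        if job.get("job_id") == fl_backend_job_id and job.get("status") == "RUNNING"
    ]
    if len(matches) > 1:
        raise ValueError(f"Multiple running jobs share ID {fl_backend_job_id!r}")

    # No match means the job is already terminal or never started.
    return matches[0] if matches else None
```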
- flip_api.fl_services.services.fl_service.abort_model_training(request: fastapi.Request, model_id: uuid.UUID, session: sqlmodel.Session) -> None
Check if the model is currently running training, and if it is, send an abort request to the FL server.
- Parameters:
request (Request) – The FastAPI request object
model_id (UUID) – The ID of the model to abort
session (Session) – SQLModel session object
- Raises:
ValueError – If the FL server is not running, or if target is invalid.
- flip_api.fl_services.services.fl_service.add_fl_job(model_id: uuid.UUID, trusts: list[flip_api.db.models.main_models.Trust], session: sqlmodel.Session) -> None
Insert a new FL job into the database with its trust participants.
- Parameters:
model_id (UUID) – The ID of the model for which the FL job is being created.
trusts (list[Trust]) – Trust rows participating in this job. Stored via the fl_job_trust link table — the relationship gives job.trusts direct access to full Trust ORM rows without a manual id-to-name lookup.
session (Session) – The SQLModel session to use for the database operation.
- Raises:
Exception – If there is an error during the database operation.
- flip_api.fl_services.services.fl_service.keep_fl_api_session_alive() -> None
A periodic function to keep the FL API session alive by making a simple request. This is useful to prevent the session from going idle or being shut down by the server.
TODO: This was developed for the NVFLARE backend and may need to be revisited for the Flower backend. See https://github.com/NVIDIA/NVFlare/discussions/3526#discussioncomment-13574644