flip_api.fl_services.services.fl_service
========================================

.. py:module:: flip_api.fl_services.services.fl_service


Exceptions
----------

.. autoapisummary::

   flip_api.fl_services.services.fl_service.UnknownJobTypeError


Functions
---------

.. autoapisummary::

   flip_api.fl_services.services.fl_service.upload_app
   flip_api.fl_services.services.fl_service.get_fl_backend_job_id_by_model_id
   flip_api.fl_services.services.fl_service.add_fl_backend_job_id
   flip_api.fl_services.services.fl_service.submit_job
   flip_api.fl_services.services.fl_service.check_server_status
   flip_api.fl_services.services.fl_service.check_client_status
   flip_api.fl_services.services.fl_service.fetch_server_status
   flip_api.fl_services.services.fl_service.fetch_client_status
   flip_api.fl_services.services.fl_service.is_client_available
   flip_api.fl_services.services.fl_service.validate_client_availability
   flip_api.fl_services.services.fl_service.abort_job
   flip_api.fl_services.services.fl_service.start_training
   flip_api.fl_services.services.fl_service.bundle_nvflare_application
   flip_api.fl_services.services.fl_service.bundle_flower_application
   flip_api.fl_services.services.fl_service.verify_bundle_paths
   flip_api.fl_services.services.fl_service.get_bundle_urls
   flip_api.fl_services.services.fl_service.extract_current_job_data
   flip_api.fl_services.services.fl_service.abort_model_training
   flip_api.fl_services.services.fl_service.add_fl_job
   flip_api.fl_services.services.fl_service.keep_fl_api_session_alive


Module Contents
---------------

.. py:exception:: UnknownJobTypeError

   Bases: :py:obj:`Exception`

   Custom exception for unknown job types in FL


.. py:function:: upload_app(model_id: uuid.UUID, training_details: flip_api.domain.interfaces.fl.IStartTrainingBody, endpoint: str) -> Any

   Upload the application to the FL server.
   It sends a POST request to the FL API service with the model ID and payload containing the project ID, cohort query, local rounds, global rounds, trusts, ignore result error, aggregator, and aggregation weights.

   :param model_id: The ID of the model to upload.
   :type model_id: UUID
   :param training_details: The payload containing the training details.
   :type training_details: IStartTrainingBody
   :param endpoint: The endpoint of the net (FL API service).
   :type endpoint: str
   :returns: The response from the server after uploading the application.
   :rtype: Any


.. py:function:: get_fl_backend_job_id_by_model_id(model_id: uuid.UUID, session: sqlmodel.Session) -> str

   Get the FL backend job ID associated with a given model ID

   :param model_id: The ID of the model
   :type model_id: UUID
   :param session: SQLModel session object
   :type session: Session
   :returns: The FL backend job ID associated with the model ID
   :rtype: str
   :raises ValueError: If the model ID is not found in the database


.. py:function:: add_fl_backend_job_id(fl_job_id: uuid.UUID, fl_backend_job_id: str, session: sqlmodel.Session) -> None

   Add the FL backend job ID to the FLJob entry in the database

   :param fl_job_id: The ID of the FLJob entry
   :type fl_job_id: UUID
   :param fl_backend_job_id: The FL backend job ID to add. Needs to be a string as backend job IDs are strings.
   :type fl_backend_job_id: str
   :param session: SQLModel session object
   :type session: Session
   :raises ValueError: If the FLJob entry is not found


.. py:function:: submit_job(fl_job_id: uuid.UUID, endpoint: str, model_id: uuid.UUID, session: sqlmodel.Session) -> None

   Submits a job to the FL API to kick off training.

   :param fl_job_id: The ID of the FL job that receives the backend job ID once the job is submitted successfully.
   :type fl_job_id: UUID
   :param endpoint: The endpoint of the FL API service.
   :type endpoint: str
   :param model_id: The ID of the model to submit the job for.
   :type model_id: UUID
   :param session: An instance of the database connection.
   :type session: Session
   :raises ValueError: If the backend job ID is not returned in the response.


.. py:function:: check_server_status(endpoint: str) -> flip_api.domain.interfaces.fl.IServerStatus | None

   Fetch the status of the server from the FL API.

   :param endpoint: The endpoint of the server to check the status from.
   :type endpoint: str
   :returns: The server status, or ``None`` when the FL API does not respond.
   :rtype: IServerStatus | None


.. py:function:: check_client_status(endpoint: str) -> list[flip_api.domain.interfaces.fl.IClientStatus] | None

   Fetch the status of all clients from the FL API.

   :param endpoint: The endpoint of the server to check the status from.
   :type endpoint: str
   :returns: A list of client statuses if available, otherwise None.
   :rtype: list[IClientStatus] | None


.. py:function:: fetch_server_status(endpoint: str) -> flip_api.domain.interfaces.fl.IServerStatus | None

   Fetch the status of the server from the FL API.

   :param endpoint: The endpoint of the server to fetch the status from.
   :type endpoint: str
   :returns: The server status if available, otherwise None.
   :rtype: IServerStatus | None


.. py:function:: fetch_client_status(endpoint: str) -> list[flip_api.domain.interfaces.fl.IClientStatus] | None

   Fetch the status of the clients from the FL API.

   :param endpoint: The endpoint of the server to fetch the status from.
   :type endpoint: str
   :returns: A list of client statuses if available, otherwise None.
   :rtype: list[IClientStatus] | None


.. py:function:: is_client_available(client_name: str, client_statuses: list[flip_api.domain.interfaces.fl.IClientStatus]) -> bool

   Check if a specific client is available based on its status.

   :param client_name: The name of the client to check.
   :type client_name: str
   :param client_statuses: A list of client statuses to check against.
   :type client_statuses: list[IClientStatus]
   :returns: True if the client is available, False otherwise.
   :rtype: bool


.. py:function:: validate_client_availability(clients: list[str], endpoint: str, fl_backend: flip_api.domain.schemas.types.FLBackend) -> None

   Validate the availability of clients by checking their status.

   It sends a GET request to the FL API service to check the status of the clients. For NVFLARE, raises ValueError if any client is unavailable. For Flower, logs a warning instead, because Flower's SuperLink handles client selection at runtime.

   :param clients: A list of client names to check the availability of.
   :type clients: list[str]
   :param endpoint: The endpoint of the FL API service.
   :type endpoint: str
   :param fl_backend: The FL backend of the net being validated (``nvflare`` or ``flower``).
   :type fl_backend: FLBackend
   :returns: None
   :raises ValueError: If any client is unavailable (NVFLARE backend only).


.. py:function:: abort_job(endpoint: str, job_id: str) -> dict

   Aborts a job on the FL server.

   :param endpoint: The endpoint of the FL API service.
   :type endpoint: str
   :param job_id: The ID of the job to abort.
   :type job_id: str
   :returns: The response from the server after aborting the job.
   :rtype: dict


.. py:function:: start_training(model_id: uuid.UUID, fl_job_id: uuid.UUID, clients: list[str], endpoint: str, bundle_urls: list[str], session: sqlmodel.Session) -> None

   Start the training process for a given model by uploading the application and submitting the job.

   It first checks if the clients are available, then it bundles the application files, downloads the configuration, and finally uploads the application and submits the job.

   :param model_id: The ID of the model to start training for.
   :type model_id: UUID
   :param fl_job_id: The ID of the FL job that receives the backend job ID once the job is submitted successfully.
   :type fl_job_id: UUID
   :param clients: A list of client names to start training on.
   :type clients: list[str]
   :param endpoint: The endpoint of the FL API service.
   :type endpoint: str
   :param bundle_urls: A list of URLs for the application bundle.
   :type bundle_urls: list[str]
   :param session: An instance of the database connection.
   :type session: Session
   :raises ValueError: If the backend job ID is not returned in the response.


.. py:function:: bundle_nvflare_application(model_id: uuid.UUID, job_type: str = DEFAULT_JOB_TYPE) -> str

   Creates the app folder from the base application files and the uploaded files.

   It copies the base application files and the model files to the destination bucket. It checks if the destination bucket has any files, and if it does, it deletes them. After copying, path-level verification ensures that all expected files are present in the destination bucket.

   Example::

      Base application files in the base bucket:

      s3://base-bucket/standard/
      ├── app_site1/
      │   ├── config/
      │   │   └── config_fed_client.json
      │   │   └── config_fed_server.json
      │   └── custom/
      │       └── flip.py [and other files]
      ├── app_site2/
      │   ├── config/
      │   │   └── config_fed_server.json
      │   │   └── config_fed_client.json
      │   └── custom/
      │       └── flip.py [and other files]

      Model files in the model files bucket:

      s3://model-bucket//
      ├── trainer.py
      ├── validator.py
      ├── config.json
      └── [other user uploaded files]

      Final structure in the destination bucket:

      s3://dest-bucket//
      ├── app_site1/
      │   ├── config/
      │   │   └── config_fed_client.json
      │   │   └── config_fed_server.json
      │   ├── custom/
      │   │   ├── [base application files]
      │   │   ├── trainer.py       ← copied from model files
      │   │   ├── validator.py     ← copied from model files
      │   │   └── config.json      ← copied from model files
      │   │   └── [other user uploaded files]
      ├── app_site2/
      │   ├── config/
      │   │   └── config_fed_server.json
      │   │   └── config_fed_client.json
      │   ├── custom/
      │   │   ├── [base application files]
      │   │   ├── trainer.py       ← copied from model files
      │   │   ├── validator.py     ← copied from model files
      │   │   └── config.json      ← copied from model files
      │   │   └── [other user uploaded files]
      └── meta.json                ← copied only once (not per app)

   :param model_id: model ID, which will give the name to the app folder.
   :type model_id: UUID
   :param job_type: type of job (e.g. 'standard', 'evaluation', etc.). This will cause a specific base application to be selected. Defaults to 'standard'.
   :type job_type: str, optional
   :raises EnvironmentError: If the S3 bucket environment variables are not set.
   :raises FileNotFoundError: If the base or model files are missing.
   :raises FileNotFoundError: If required files for the job type are missing.
   :returns: The destination bucket S3 path where the bundled application is located.
   :rtype: str


.. py:function:: bundle_flower_application(model_id: uuid.UUID, job_type: str = DEFAULT_JOB_TYPE) -> str

   Creates the app folder from the base application files and the uploaded files.

   It copies the base application files and the model files to the destination bucket. It checks if the destination bucket has any files, and if it does, it deletes them.

   Example::

      Base application files in the base bucket:

      s3://base-bucket/standard/
      ├── app/
      │   └── server_app.py
      └── pyproject.toml

      Model files in the model files bucket:

      s3://model-bucket//
      ├── client_app.py
      ├── models.py
      ├── config.json
      └── [other user uploaded files]

      Final structure in the destination bucket:

      s3://dest-bucket//
      ├── app/
      │   ├── server_app.py
      │   ├── client_app.py   ← copied from model files
      │   ├── models.py       ← copied from model files
      │   ├── config.json     ← copied from model files
      │   └── [other user uploaded files]
      └── pyproject.toml      ← copied from base application (not overwritten by model files)

   :param model_id: model ID, which will give the name to the app folder.
   :type model_id: UUID
   :param job_type: type of job (e.g. 'standard', 'evaluation', etc.). This will cause a specific base application to be selected. Defaults to 'standard'.
   :type job_type: str, optional
   :raises EnvironmentError: If the S3 bucket environment variables are not set.
   :raises FileNotFoundError: If the base or model files are missing.
   :raises FileNotFoundError: If required files for the job type are missing.
   :returns: The destination bucket S3 path where the bundled application is located.
   :rtype: str


.. py:function:: verify_bundle_paths(*, s3: flip_api.utils.s3_client.S3Client, base_files: list[str], model_files: list[str], app_folders: set[str], base_bucket_s3_path: str, model_bucket_s3_path: str, dest_bucket_s3_path: str) -> None

   Verifies that all expected destination keys exist after bundling.

   :param s3: S3 client used to list destination objects.
   :type s3: S3Client
   :param base_files: Keys of the base application files in the source bucket.
   :type base_files: list[str]
   :param model_files: Keys of the user-uploaded model files in the source bucket.
   :type model_files: list[str]
   :param app_folders: Application subfolder names that model files get mirrored into.
   :type app_folders: set[str]
   :param base_bucket_s3_path: Root S3 path of the base application bucket.
   :type base_bucket_s3_path: str
   :param model_bucket_s3_path: Root S3 path of the user model bucket.
   :type model_bucket_s3_path: str
   :param dest_bucket_s3_path: Root S3 path of the destination bundle bucket.
   :type dest_bucket_s3_path: str
   :raises RuntimeError: If any expected destination key is missing from the bundle bucket.


.. py:function:: get_bundle_urls(s3_path: str) -> list[str]

   Creates pre-signed URLs for the bundle files in S3 (containing the application files and model files) that the FL API will use for training.

   :param s3_path: The S3 path of the bundle to get the URLs for.
   :type s3_path: str
   :returns: A list of pre-signed URLs for the bundle files.
   :rtype: list[str]
   :raises ClientError: If there is an error listing objects or generating pre-signed URLs.


.. py:function:: extract_current_job_data(net_endpoint: str, fl_backend_job_id: str) -> flip_api.domain.interfaces.fl.IJobMetaData | None

   Extract the currently-running FL job matching ``fl_backend_job_id``.

   :param net_endpoint: The endpoint of the FL API service.
   :type net_endpoint: str
   :param fl_backend_job_id: The FL job ID to look for.
   :type fl_backend_job_id: str
   :returns: The running job's metadata, or ``None`` if no running job matches ``fl_backend_job_id`` (the job is already terminal or never started).
   :rtype: IJobMetaData | None
   :raises ValueError: If the FL server response is not a list, or more than one running job shares the same ID.
   :raises pydantic.ValidationError: If a returned item does not conform to ``IJobMetaData`` (e.g. an unknown status from a non-conforming FL-API adapter). Failing loudly here is intentional.


.. py:function:: abort_model_training(request: fastapi.Request, model_id: uuid.UUID, session: sqlmodel.Session) -> None

   Check if the model is currently running training, and if it is, send an abort request to the FL server.

   :param request: The FastAPI request object
   :type request: Request
   :param model_id: The ID of the model to abort
   :type model_id: UUID
   :param session: SQLModel session object
   :type session: Session
   :raises ValueError: If the FL server is not running, or if ``target`` is invalid.


.. py:function:: add_fl_job(model_id: uuid.UUID, trusts: list[flip_api.db.models.main_models.Trust], session: sqlmodel.Session) -> None

   Insert a new FL job into the database with its trust participants.

   :param model_id: The ID of the model for which the FL job is being created.
   :type model_id: UUID
   :param trusts: Trust rows participating in this job. Stored via the ``fl_job_trust`` link table; the relationship gives ``job.trusts`` direct access to full Trust ORM rows without a manual id-to-name lookup.
   :type trusts: list[Trust]
   :param session: The SQLModel session to use for the database operation.
   :type session: Session
   :raises Exception: If there is an error during the database operation.


.. py:function:: keep_fl_api_session_alive() -> None

   A periodic function to keep the FL API session alive by making a simple request.

   This is useful to prevent the session from going idle or being shut down by the server.

   TODO: This was developed for the NVFLARE backend and might need to be revisited for the Flower backend. See https://github.com/NVIDIA/NVFlare/discussions/3526#discussioncomment-13574644
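
The periodic keep-alive described for ``keep_fl_api_session_alive`` can be pictured as a loop that sends a cheap request on a fixed interval. The following is a minimal sketch only, not the module's implementation: ``run_keep_alive``, ``ping``, ``interval_seconds``, ``stop`` and ``max_pings`` are hypothetical names introduced for illustration, and ``ping`` stands in for whatever simple request the real function makes to the FL API.

```python
import threading
from typing import Callable, Optional


def run_keep_alive(
    ping: Callable[[], None],
    interval_seconds: float,
    stop: threading.Event,
    max_pings: Optional[int] = None,
) -> int:
    """Call ``ping`` on a fixed interval until ``stop`` is set.

    Hypothetical helper (assumption, not part of fl_service).
    Returns the number of ping attempts that were made.
    """
    attempts = 0
    while not stop.is_set():
        if max_pings is not None and attempts >= max_pings:
            break
        attempts += 1
        try:
            ping()  # stand-in for a simple request to the FL API
        except Exception:
            # A failed ping must not end the loop; the next tick retries.
            pass
        # Event.wait returns early once ``stop`` is set, so shutdown is prompt.
        stop.wait(interval_seconds)
    return attempts


# Usage: three attempts with a tiny interval; the second one fails.
calls = []


def fake_ping() -> None:
    calls.append(len(calls))
    if len(calls) == 2:
        raise ConnectionError("simulated FL API timeout")


attempts = run_keep_alive(
    fake_ping, interval_seconds=0.01, stop=threading.Event(), max_pings=3
)
```

Waiting on a ``threading.Event`` rather than calling ``time.sleep`` lets the caller stop the loop between pings without waiting out the full interval, and swallowing ping errors keeps one transient failure from ending the keep-alive.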