data_access_api.services.cohort
===============================

.. py:module:: data_access_api.services.cohort


Attributes
----------

.. autoapisummary::

   data_access_api.services.cohort.COHORT_QUERY_THRESHOLD
   data_access_api.services.cohort.ALLOWED_SCHEMA
   data_access_api.services.cohort.MAX_QUERY_LENGTH
   data_access_api.services.cohort._ALLOWED_QUERY_TYPES


Functions
---------

.. autoapisummary::

   data_access_api.services.cohort._invalid_query
   data_access_api.services.cohort.validate_query
   data_access_api.services.cohort.get_records
   data_access_api.services.cohort.get_counts
   data_access_api.services.cohort.get_null_counts
   data_access_api.services.cohort.get_sex_distribution
   data_access_api.services.cohort.get_age_distribution
   data_access_api.services.cohort.verify_cardinality
   data_access_api.services.cohort.make_other_category
   data_access_api.services.cohort.get_statistics


Module Contents
---------------

.. py:data:: COHORT_QUERY_THRESHOLD
   :value: 10


.. py:data:: ALLOWED_SCHEMA
   :value: 'omop'


.. py:data:: MAX_QUERY_LENGTH
   :value: 10240


.. py:data:: _ALLOWED_QUERY_TYPES
   :type: tuple[type[sqlglot.exp.Expression], Ellipsis]


.. py:function:: _invalid_query(detail: str) -> fastapi.HTTPException


.. py:function:: validate_query(query: str) -> bool

   Validates that an inbound SQL query is structurally safe to run against OMOP.

   Database-layer protections already in place
   -------------------------------------------
   The data-access-api connects as ``data_analyst_reader`` (see
   ``flip-omop-db/files/create_readonly_users.sql``), a Postgres role granted
   only ``CONNECT`` + ``USAGE`` on schema ``omop`` + ``SELECT`` on its tables
   and sequences, with ``INSERT``, ``UPDATE``, ``DELETE``, ``TRUNCATE``, and
   ``CREATE`` explicitly revoked. Any DDL or DML is therefore rejected by
   Postgres itself, so this function does NOT keyword-filter for ``DROP`` /
   ``INSERT`` / ``UPDATE`` / etc.; those are already covered at the DB layer.
   What this function enforces
   ---------------------------
   Because the read-only role can still issue arbitrary ``SELECT`` queries,
   this function checks that:

   1. The query is shorter than ``MAX_QUERY_LENGTH`` (DoS guard).
   2. The query parses as exactly one non-empty statement (defeats query
      stacking, stray semicolons that bypass the count check, and empty
      inputs).
   3. The top-level statement is SELECT-shaped (rejects ``COPY``,
      ``EXPLAIN``, and any DDL/DML the DB would also reject, so the request
      fails fast at the API).
   4. Any schema-qualified table reference targets only the ``omop`` schema
      (blocks enumeration of ``information_schema``, ``pg_catalog``,
      ``pg_class``, etc., which Postgres makes readable to role ``public`` by
      default).
   5. Every ``LIMIT`` and ``OFFSET`` is a literal integer (defeats the blind
      data-extraction technique that abuses
      ``LIMIT CASE WHEN <condition> THEN n ELSE m END`` to make the row count
      a function of a single character value, then reads it back via the
      cohort-size error message).

   :param query: The SQL query string from the caller.
   :returns: ``True`` when the query is structurally safe.
   :raises HTTPException(400): When any of the rules above is violated.


.. py:function:: get_records(query: str | sqlalchemy.sql.elements.TextClause, params: collections.abc.Mapping[str, Any] | None = None) -> pandas.DataFrame

   Executes a SQL query and returns the results.

   :param query: The SQL query to execute. Pass a ``TextClause`` with bind
      parameters when the query is parameterized.
   :type query: str | TextClause
   :param params: Optional mapping of bind parameter names to values for
      parameterized queries.
   :type params: Mapping[str, Any] | None
   :returns: The results of the query as a DataFrame.
   :rtype: pd.DataFrame
   :raises HTTPException: If the query is invalid or if there is an error
      during execution.


.. py:function:: get_counts(df: pandas.DataFrame) -> dict

   Returns counts of non-null values for each column in the DataFrame.

   :param df: The cohort DataFrame.
   :type df: pd.DataFrame
   :returns: ``{"name": "Counts", "results": [{"value": <value>, "count": <count>}, ...]}``.
   :rtype: dict


.. py:function:: get_null_counts(df: pandas.DataFrame) -> dict

   Returns counts of null values for each column in the DataFrame.

   :param df: The cohort DataFrame.
   :type df: pd.DataFrame
   :returns: ``{"name": "Nulls", "results": [{"value": <value>, "count": <count>}, ...]}``.
   :rtype: dict


.. py:function:: get_sex_distribution(df: pandas.DataFrame) -> dict

   Returns the distribution of sexes in the DataFrame.

   Assumes the DataFrame has an ``accession_id`` column and queries the table
   to get the sex distribution.

   :param df: The cohort DataFrame. Must include a ``person_id`` column;
      otherwise an empty result set is returned.
   :type df: pd.DataFrame
   :returns: ``{"name": "Sex Distribution", "results": [{"value": <value>, "count": <count>}, ...]}``.
   :rtype: dict


.. py:function:: get_age_distribution(df: pandas.DataFrame) -> dict

   Returns the distribution of ages in the DataFrame.

   Assumes the DataFrame has an ``accession_id`` column and queries the table
   to get the age distribution.

   :param df: The cohort DataFrame. Must include a ``person_id`` column;
      otherwise an empty result set is returned.
   :type df: pd.DataFrame
   :returns: ``{"name": "Age Distribution", "results": [{"value": "<bucket>", "count": <count>}, ...]}``
      where each value is a ten-year age bucket.
   :rtype: dict


.. py:function:: verify_cardinality(df: pandas.DataFrame, threshold: float = 0.05) -> bool

   Verifies that the number of unique values in each column of the DataFrame
   is not smaller than the threshold. This prevents leaking information about
   individuals in the cohort.

   :param df: The cohort DataFrame to check.
   :type df: pd.DataFrame
   :param threshold: Minimum acceptable proportion of unique values per
      column. Defaults to ``0.05``.
   :type threshold: float
   :returns: ``True`` if every column has enough unique values (either above
      ``COHORT_QUERY_THRESHOLD`` in absolute terms, or above ``threshold`` in
      relative terms). ``False`` if any column falls below both thresholds.
   :rtype: bool


.. py:function:: make_other_category(results: list[dict], min_count: int = COHORT_QUERY_THRESHOLD) -> list[dict]

   Groups entries in the results list with counts less than ``min_count``
   into an "Other" category.

   :param results: List of dictionaries with ``value`` and ``count`` keys.
   :type results: list of dict
   :param min_count: Minimum count an entry needs to avoid being grouped into
      "Other".
   :type min_count: int
   :returns: Updated list with low-count entries grouped into "Other".
   :rtype: list of dict


.. py:function:: get_statistics(df: pandas.DataFrame, query_input: data_access_api.routers.schema.CohortQueryInput, threshold: int) -> data_access_api.routers.schema.StatisticsResponse

   Returns aggregated statistics from the query results.

   - Counts the number of records.
   - Aggregates the number of occurrences of each unique value per column.

   Below-threshold counts are privacy-suppressed by returning a
   ``StatisticsResponse`` with ``record_count=0``, empty ``data`` and
   ``suppressed=True``. The count itself is suppressed, not just the
   per-field breakdown. A genuine zero is suppressed identically to a small
   (``1..threshold-1``) count, so the two are indistinguishable on the wire
   and the response cannot be used to infer that at least one patient matched
   (membership disclosure; see issue #519, security review). The
   ``suppressed`` flag only tells the hub/UI to render a "below-threshold"
   chip instead of a bare 0; it does not reveal which 0s were genuine.

   Suppression is returned as a normal response rather than raised as an
   ``HTTPException`` so that the trust still has something to forward to the
   hub; raising here previously caused trust-api to skip the hub callback and
   leave the per-trust UI status stuck.

   :param df: Query results dataframe.
   :type df: pd.DataFrame
   :param query_input: Input object containing the query and metadata.
   :type query_input: data_access_api.routers.schema.CohortQueryInput
   :param threshold: Minimum number of records required to return results.
   :type threshold: int
   :returns: The aggregated statistics, or a suppressed zero-count empty
      response when the record count is below ``COHORT_QUERY_THRESHOLD``.
   :rtype: StatisticsResponse
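
The suppression behaviour described for ``get_statistics`` can be illustrated with a minimal, standard-library-only sketch. It is not the module's implementation: ``StatisticsSketch`` and ``summarise`` are hypothetical names introduced here, and only the field names ``record_count``, ``data`` and ``suppressed`` are taken from the description above. The real ``StatisticsResponse`` may carry additional fields.

```python
from dataclasses import dataclass, field


@dataclass
class StatisticsSketch:
    """Hypothetical stand-in for StatisticsResponse (illustration only)."""

    record_count: int
    data: list = field(default_factory=list)
    suppressed: bool = False


def summarise(record_count: int, data: list, threshold: int) -> StatisticsSketch:
    """Hypothetical helper: return real statistics only at or above the threshold."""
    if record_count < threshold:
        # A genuine zero and a small non-zero count take the same branch,
        # so the two responses are identical and cannot be told apart.
        return StatisticsSketch(record_count=0, data=[], suppressed=True)
    return StatisticsSketch(record_count=record_count, data=data, suppressed=False)


# Made-up inputs, using a threshold of 10 as in COHORT_QUERY_THRESHOLD.
empty = summarise(0, [], threshold=10)
small = summarise(3, [{"value": "F", "count": 3}], threshold=10)
large = summarise(25, [{"value": "F", "count": 25}], threshold=10)

print(empty == small)      # both are the same suppressed response
print(large.record_count)  # the real count is returned at or above the threshold
```

Because the suppressed result is an ordinary return value rather than an exception, a caller in this sketch can forward it unchanged and branch on ``suppressed`` only for display purposes.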