data_access_api.services.cohort

Attributes

COHORT_QUERY_THRESHOLD

ALLOWED_SCHEMA

MAX_QUERY_LENGTH

_ALLOWED_QUERY_TYPES

Functions

_invalid_query(→ fastapi.HTTPException)

validate_query(→ bool)

Validates that an inbound SQL query is structurally safe to run against OMOP.

get_records(→ pandas.DataFrame)

Executes a SQL query and returns results.

get_counts(→ dict)

Returns counts of non-null values for each column in the DataFrame.

get_null_counts(→ dict)

Returns counts of null values for each column in the DataFrame.

get_sex_distribution(→ dict)

Returns the distribution of sexes in the DataFrame.

get_age_distribution(→ dict)

Returns the distribution of ages in the DataFrame.

verify_cardinality(→ bool)

Verifies that the number of unique values in each column of the DataFrame is not smaller than the threshold.

make_other_category(→ list[dict])

Groups entries in the results list with counts less than min_count into an "Other" category.

get_statistics(...)

Returns aggregated statistics from the query results.

Module Contents

data_access_api.services.cohort.COHORT_QUERY_THRESHOLD = 10
data_access_api.services.cohort.ALLOWED_SCHEMA = 'omop'
data_access_api.services.cohort.MAX_QUERY_LENGTH = 10240
data_access_api.services.cohort._ALLOWED_QUERY_TYPES: tuple[type[sqlglot.exp.Expression], ...]
data_access_api.services.cohort._invalid_query(detail: str) → fastapi.HTTPException
data_access_api.services.cohort.validate_query(query: str) → bool

Validates that an inbound SQL query is structurally safe to run against OMOP.

Database-layer protections already in place

The data-access-api connects as data_analyst_reader (see flip-omop-db/files/create_readonly_users.sql), a Postgres role granted only CONNECT, USAGE on schema omop, and SELECT on its tables and sequences, with INSERT, UPDATE, DELETE, TRUNCATE, and CREATE explicitly REVOKEd. Postgres itself therefore rejects any DDL or DML, so this function does NOT keyword-filter for DROP / INSERT / UPDATE / etc.; those are already covered at the DB layer.

What this function enforces

Because the read-only role can still issue arbitrary SELECT queries, this function checks that:

  1. The query is shorter than MAX_QUERY_LENGTH (DoS guard).

  2. The query parses as exactly one non-empty statement (defeats query stacking, stray semicolons that bypass the count check, and empty inputs).

  3. The top-level statement is SELECT-shaped (rejects COPY, EXPLAIN, and any DDL/DML the DB would also reject — fail fast at the API).

  4. Any schema-qualified table reference targets only the omop schema (blocks enumeration of information_schema, pg_catalog, pg_class etc., which Postgres makes readable to role public by default).

  5. Every LIMIT and OFFSET is a literal integer (defeats the blind data-extraction technique that abuses LIMIT CASE WHEN <predicate> THEN n ELSE m END to make the row count a function of a single character value, then reads it back via the cohort-size error message).

param query:

The SQL query string from the caller.

returns:

True when the query is structurally safe.

raises HTTPException(400):

When any of the rules above is violated.

data_access_api.services.cohort.get_records(query: str | sqlalchemy.sql.elements.TextClause, params: collections.abc.Mapping[str, Any] | None = None) → pandas.DataFrame

Executes a SQL query and returns results.

Parameters:
  • query (str | TextClause) – The SQL query to execute. Pass a TextClause with bind parameters when the query is parameterized.

  • params (Mapping[str, Any] | None) – Optional mapping of bind parameter names to values for parameterized queries.

Returns:

The results of the query as a DataFrame.

Return type:

pd.DataFrame

Raises:

HTTPException – If the query is invalid or if there is an error during execution.
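
The example below illustrates the kind of (query, params) pair that get_records accepts: a TextClause with a named bind parameter and a mapping that supplies its value. It does not call get_records itself. An in-memory SQLite database and a toy person table stand in for the real Postgres connection, and pandas.read_sql_query is used here only as an assumed way of turning the result into a DataFrame.

```python
import pandas as pd
from sqlalchemy import create_engine, text

# Assumption: in-memory SQLite stands in for the real OMOP Postgres database.
engine = create_engine("sqlite://")

# A parameterized query: the value is bound by name, never interpolated into the SQL.
query = text(
    "SELECT person_id FROM person "
    "WHERE year_of_birth >= :min_year ORDER BY person_id"
)
params = {"min_year": 1980}

with engine.begin() as conn:
    # Toy table and rows, invented for this example.
    conn.execute(text("CREATE TABLE person (person_id INTEGER, year_of_birth INTEGER)"))
    conn.execute(
        text("INSERT INTO person VALUES (:person_id, :year_of_birth)"),
        [
            {"person_id": 1, "year_of_birth": 1975},
            {"person_id": 2, "year_of_birth": 1982},
            {"person_id": 3, "year_of_birth": 1990},
        ],
    )
    # Execute the TextClause with its bind parameters and load the rows.
    df = pd.read_sql_query(query, conn, params=params)

print(df)
```

Passing values through params keeps caller-supplied data out of the SQL text, which is the reason to prefer a TextClause over string formatting for parameterized queries.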

data_access_api.services.cohort.get_counts(df: pandas.DataFrame) → dict

Returns counts of non-null values for each column in the DataFrame.

Parameters:

df (pd.DataFrame) – The cohort DataFrame.

Returns:

{"name": "Counts", "results": [{"value": <column>, "count": <int>}, ...]}.

Return type:

dict

data_access_api.services.cohort.get_null_counts(df: pandas.DataFrame) → dict

Returns counts of null values for each column in the DataFrame.

Parameters:

df (pd.DataFrame) – The cohort DataFrame.

Returns:

{"name": "Nulls", "results": [{"value": <column>, "count": <int>}, ...]}.

Return type:

dict
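
The payload shapes documented for get_counts and get_null_counts can be reproduced with plain pandas, as in the sketch below. The helper names counts_payload and nulls_payload and the sample DataFrame are hypothetical; only the output shape is taken from the descriptions above.

```python
import pandas as pd


def counts_payload(df: pd.DataFrame) -> dict:
    """Non-null count per column, in the documented "Counts" shape."""
    return {
        "name": "Counts",
        "results": [
            {"value": column, "count": int(n)} for column, n in df.count().items()
        ],
    }


def nulls_payload(df: pd.DataFrame) -> dict:
    """Null count per column, in the documented "Nulls" shape."""
    return {
        "name": "Nulls",
        "results": [
            {"value": column, "count": int(n)}
            for column, n in df.isna().sum().items()
        ],
    }


# Sample data invented for the example: one missing gender_concept_id.
sample = pd.DataFrame(
    {"person_id": [1, 2, 3], "gender_concept_id": [8507, None, 8532]}
)
print(counts_payload(sample))
print(nulls_payload(sample))
```

For any column, the two counts add up to the number of rows in the DataFrame.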

data_access_api.services.cohort.get_sex_distribution(df: pandas.DataFrame) → dict

Returns the distribution of sexes in the DataFrame.

Assumes the DataFrame has an accession_id column and queries the table to get the sex distribution.

Parameters:

df (pd.DataFrame) – The cohort DataFrame. Must include a person_id column; otherwise an empty result set is returned.

Returns:

{"name": "Sex Distribution", "results": [{"value": <sex>, "count": <int>}, ...]}.

Return type:

dict

data_access_api.services.cohort.get_age_distribution(df: pandas.DataFrame) → dict

Returns the distribution of ages in the DataFrame.

Assumes the DataFrame has an accession_id column and queries the table to get the age distribution.

Parameters:

df (pd.DataFrame) – The cohort DataFrame. Must include a person_id column; otherwise an empty result set is returned.

Returns:

{"name": "Age Distribution", "results": [{"value": "<decade>", "count": <int>}, ...]} where each value is a ten-year age bucket.

Return type:

dict
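
As an illustration of ten-year bucketing, the sketch below groups a list of ages by decade and emits the documented payload shape. The "30-39" label format, the helper name age_distribution, and the idea that ages arrive as a plain list are assumptions; the documentation does not specify how the real function derives ages or formats each decade.

```python
from collections import Counter


def age_distribution(ages: list[int]) -> dict:
    """Bucket ages into ten-year bands and count the members of each band."""
    # Map each age to the first year of its decade, e.g. 34 -> 30.
    buckets = Counter((age // 10) * 10 for age in ages)
    return {
        "name": "Age Distribution",
        "results": [
            # Hypothetical label format: "<start>-<start + 9>".
            {"value": f"{start}-{start + 9}", "count": count}
            for start, count in sorted(buckets.items())
        ],
    }


ages = [34, 36, 41, 29, 30]
print(age_distribution(ages))
```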

data_access_api.services.cohort.verify_cardinality(df: pandas.DataFrame, threshold: float = 0.05) → bool

Verifies that the number of unique values in each column of the DataFrame is not smaller than the threshold.

This is to prevent leaking information about individuals in the cohort.

Parameters:
  • df (pd.DataFrame) – The cohort DataFrame to check.

  • threshold (float) – Minimum acceptable proportion of unique values per column. Defaults to 0.05.

Returns:

True if every column has enough unique values (either above COHORT_QUERY_THRESHOLD in absolute terms, or above threshold in relative terms). False if any column falls below both thresholds.

Return type:

bool
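
The two-threshold rule described above can be sketched as follows: a column fails only when its unique-value count is below the absolute threshold and its unique-value proportion is below the relative one. The function name has_enough_unique_values, the treatment of boundary values, the exclusion of nulls from the unique count, and the handling of an empty DataFrame are assumptions of this sketch.

```python
import pandas as pd

COHORT_QUERY_THRESHOLD = 10


def has_enough_unique_values(df: pd.DataFrame, threshold: float = 0.05) -> bool:
    """Return False if any column falls below both the absolute and relative limits."""
    total = len(df)
    for column in df.columns:
        # nunique() ignores nulls by default (an assumption about the real check).
        unique = df[column].nunique()
        relative = unique / total if total else 0.0
        if unique < COHORT_QUERY_THRESHOLD and relative < threshold:
            return False
    return True


# 1000 rows: person_id is fully unique, flag has only 2 distinct values (0.2%).
wide = pd.DataFrame({"person_id": range(1000), "flag": [0, 1] * 500})
# 20 rows: flag has 2 distinct values, which is 10% of the rows.
small = pd.DataFrame({"flag": [0, 1] * 10})

print(has_enough_unique_values(wide))
print(has_enough_unique_values(small))
```

In this sketch the flag column fails for the 1000-row frame but passes for the 20-row frame, because two distinct values exceed the relative threshold there even though they remain below the absolute one.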

data_access_api.services.cohort.make_other_category(results: list[dict], min_count: int = COHORT_QUERY_THRESHOLD) → list[dict]

Groups entries in the results list with counts less than min_count into an “Other” category.

Parameters:
  • results (list of dict) – List of dictionaries with ‘value’ and ‘count’ keys.

  • min_count (int) – Minimum count threshold to avoid grouping into “Other”.

Returns:

Updated list with low-count entries grouped into “Other”.

Return type:

list of dict
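
A minimal sketch of the grouping described above, written against plain lists of dicts. The name group_small_counts, the position of the "Other" entry at the end of the list, and the decision to omit "Other" when nothing falls below min_count are assumptions, not documented behaviour.

```python
def group_small_counts(results: list[dict], min_count: int = 10) -> list[dict]:
    """Merge entries whose count is below min_count into a single "Other" entry."""
    kept = [entry for entry in results if entry["count"] >= min_count]
    other_total = sum(
        entry["count"] for entry in results if entry["count"] < min_count
    )
    # Assumption: only add "Other" when at least one entry was merged.
    if other_total:
        kept.append({"value": "Other", "count": other_total})
    return kept


results = [
    {"value": "FEMALE", "count": 120},
    {"value": "MALE", "count": 98},
    {"value": "UNKNOWN", "count": 4},
    {"value": "AMBIGUOUS", "count": 3},
]
print(group_small_counts(results))
```

Note that the merged "Other" total (7 here) can itself be smaller than min_count; this sketch does not treat that case specially.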

data_access_api.services.cohort.get_statistics(df: pandas.DataFrame, query_input: data_access_api.routers.schema.CohortQueryInput, threshold: int) → data_access_api.routers.schema.StatisticsResponse

Returns aggregated statistics from the query results.

  • Counts the number of records.

  • Aggregates the number of occurrences of each unique value per column.

If the number of records is less than the threshold, an empty response is returned.

Parameters:
  • df (pd.DataFrame) – Query results dataframe.

  • query_input (data_access_api.routers.schema.CohortQueryInput) – Input object containing the query and metadata.

  • threshold (int) – Minimum number of records required to return results.

Returns:

A response object containing the aggregated statistics.

Return type:

StatisticsResponse

Raises:

HTTPException – If the request cannot be processed.
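
The threshold gate and per-column aggregation described above can be sketched in plain Python. Everything about the return value here is hypothetical: the real function returns a StatisticsResponse whose fields are not documented on this page, so the sketch uses an ordinary dict, takes rows as a list of dicts rather than a DataFrame, and represents the empty response as an empty dict.

```python
from collections import Counter


def summarise(rows: list[dict], threshold: int) -> dict:
    """Aggregate value counts per column, or return nothing for small cohorts."""
    # Too few records: return an empty response so small cohorts disclose nothing.
    if len(rows) < threshold:
        return {}

    value_counts: dict[str, Counter] = {}
    for row in rows:
        for column, value in row.items():
            value_counts.setdefault(column, Counter())[value] += 1

    return {
        "record_count": len(rows),
        "value_counts": {col: dict(counts) for col, counts in value_counts.items()},
    }


rows = [
    {"sex": "F", "site": "A"},
    {"sex": "M", "site": "A"},
    {"sex": "F", "site": "B"},
]
print(summarise(rows, threshold=5))
print(summarise(rows, threshold=2))
```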