Skip to content

aeolus.metrics

Air quality analysis functions, regulatory statistics, and AQI calculations.

Analysis Functions

Time-average air quality data with data capture thresholds.

Resamples hourly (or sub-hourly) data to a coarser time resolution, applying a minimum data capture threshold below which the averaged value is set to NaN. This is the foundation for regulatory statistics.

Parameters:

Name Type Description Default
data DataFrame

DataFrame from aeolus.download() with standard 8-column schema.

required
freq str

Target frequency string (pandas offset alias). Common values: "D" (daily), "8h" (8-hourly), "W" (weekly), "ME" (monthly), "YE" (yearly).

'D'
statistic Literal['mean', 'max', 'min', 'median', 'sum', 'std', 'percentile']

Aggregation function to apply.

'mean'
data_thresh float

Minimum fraction of valid observations required (0-1). Periods below this threshold get value=NaN. Set to 0 to disable thresholding.

0.75
percentile float

Percentile value when statistic="percentile" (0-100).

95.0
pollutants list[str] | None

List of measurand names to include. None = all.

None

Returns:

Type Description
DataFrame

DataFrame with columns: site_code, date_time, measurand, value,
units, source_network, data_capture.

Source code in src/aeolus/metrics/stats.py
def time_average(
    data: pd.DataFrame,
    freq: str = "D",
    statistic: Literal[
        "mean", "max", "min", "median", "sum", "std", "percentile"
    ] = "mean",
    data_thresh: float = 0.75,
    percentile: float = 95.0,
    pollutants: list[str] | None = None,
) -> pd.DataFrame:
    """
    Time-average air quality data with data capture thresholds.

    Resamples hourly (or sub-hourly) data to a coarser time resolution,
    applying a minimum data capture threshold below which the averaged
    value is set to NaN. This is the foundation for regulatory statistics.

    Args:
        data: DataFrame from aeolus.download() with standard 8-column schema.
        freq: Target frequency string (pandas offset alias).
              Common values: "D" (daily), "8h" (8-hourly), "W" (weekly),
              "ME" (monthly), "YE" (yearly).
        statistic: Aggregation function to apply.
        data_thresh: Minimum fraction of valid observations required (0-1).
                     Periods below this threshold get value=NaN.
                     Set to 0 to disable thresholding.
        percentile: Percentile value when statistic="percentile" (0-100).
        pollutants: List of measurand names to include. None = all.

    Returns:
        DataFrame with columns: site_code, date_time, measurand, value,
        units, source_network, data_capture.

    Raises:
        ValueError: If ``statistic`` is not one of the supported values.
    """
    validate_data(data)

    # Map statistic name -> aggregation over a pandas Resampler. Validating
    # up front means an invalid statistic raises even when the (filtered)
    # data is empty; previously the check only ran inside the per-group
    # loop, so empty inputs silently returned an empty frame.
    aggregators = {
        "mean": lambda r: r.mean(),
        "max": lambda r: r.max(),
        "min": lambda r: r.min(),
        "median": lambda r: r.median(),
        # min_count=1 keeps all-NaN periods as NaN instead of 0.
        "sum": lambda r: r.sum(min_count=1),
        "std": lambda r: r.std(),
        "percentile": lambda r: r.quantile(percentile / 100.0),
    }
    if statistic not in aggregators:
        raise ValueError(f"Unknown statistic: {statistic}")

    if data.empty:
        return _empty_time_average_df()

    df = data.copy()
    df["date_time"] = pd.to_datetime(df["date_time"])

    if pollutants is not None:
        df = df[df["measurand"].isin(pollutants)]
        if df.empty:
            return _empty_time_average_df()

    results = []

    for (site, measurand), group in df.groupby(
        ["site_code", "measurand"], observed=True
    ):
        g = group.set_index("date_time").sort_index()

        # Infer source data frequency for data capture calculation
        data_freq = _infer_data_frequency(group["date_time"])
        expected = _expected_observations(data_freq, freq)

        # Reuse a single Resampler for both the aggregate and the counts.
        resampled = g["value"].resample(freq)
        agg = aggregators[statistic](resampled)
        counts = resampled.count()

        # Data capture: fraction of expected observations present
        if expected > 0:
            dc = counts / expected
        else:
            dc = pd.Series(1.0, index=counts.index)

        # Cap data capture at 1.0 (can exceed 1 for variable-length periods)
        dc = dc.clip(upper=1.0)

        # Apply threshold: set value to NaN where data capture is insufficient
        if data_thresh > 0:
            agg = agg.where(dc >= data_thresh)

        # Get representative units and source_network from the group
        units_val = group["units"].iloc[0] if "units" in group.columns else ""
        network_val = (
            group["source_network"].iloc[0]
            if "source_network" in group.columns
            else ""
        )

        period_df = pd.DataFrame(
            {
                "site_code": site,
                "date_time": agg.index,
                "measurand": measurand,
                "value": agg.values,
                "units": units_val,
                "source_network": network_val,
                "data_capture": dc.values,
            }
        )

        results.append(period_df)

    if not results:
        return _empty_time_average_df()

    return pd.concat(results, ignore_index=True)

Calculate annual regulatory air quality statistics.

Produces one row per site/year/pollutant with statistics suitable for LAQM Annual Status Reports: annual mean, maxima, percentiles, data capture, and exceedance counts.

Exceedance columns are pollutant-specific:
  • exceedance_hours_200: hours with concentration > 200 ug/m3 (NO2)
  • exceedance_days_50: days with daily mean > 50 ug/m3 (PM10)
  • exceedance_days_120: days where max 8-hour rolling mean > 120 ug/m3 (O3)

Parameters:

Name Type Description Default
data DataFrame

DataFrame from aeolus.download() with standard schema.

required
year int | list[int] | None

Filter to specific year(s). None = all years in data.

None
pollutant str | list[str] | None

Filter to specific pollutant(s). None = all.

None
data_thresh float

Minimum annual data capture for valid stats (0-1). Below this threshold all stats are NaN.

0.75

Returns:

Type Description
DataFrame

DataFrame with one row per site/year/pollutant.

Source code in src/aeolus/metrics/stats.py
def aq_stats(
    data: pd.DataFrame,
    year: int | list[int] | None = None,
    pollutant: str | list[str] | None = None,
    data_thresh: float = 0.75,
) -> pd.DataFrame:
    """
    Calculate annual regulatory air quality statistics.

    Produces one row per site/year/pollutant with statistics suitable for
    LAQM Annual Status Reports: annual mean, maxima, percentiles,
    data capture, and exceedance counts.

    Exceedance columns are pollutant-specific:
        - exceedance_hours_200: hours with concentration > 200 ug/m3 (NO2)
        - exceedance_days_50: days with daily mean > 50 ug/m3 (PM10)
        - exceedance_days_120: days where max 8-hour rolling mean > 120 ug/m3 (O3)

    Args:
        data: DataFrame from aeolus.download() with standard schema.
        year: Filter to specific year(s). None = all years in data.
        pollutant: Filter to specific pollutant(s). None = all.
        data_thresh: Minimum annual data capture for valid stats (0-1).
                     Below this threshold all stats are NaN.

    Returns:
        DataFrame with one row per site/year/pollutant.
    """
    validate_data(data)

    empty = pd.DataFrame(columns=_AQ_STATS_COLUMNS)
    if data.empty:
        return empty

    df = data.copy()
    df["date_time"] = pd.to_datetime(df["date_time"])
    df["year"] = df["date_time"].dt.year

    # Optional year / pollutant filters (scalars are wrapped into lists).
    if year is not None:
        years = year if isinstance(year, list) else [int(year)]
        df = df[df["year"].isin(years)]
    if pollutant is not None:
        wanted = [pollutant] if isinstance(pollutant, str) else pollutant
        df = df[df["measurand"].isin(wanted)]

    if df.empty:
        return empty

    stat_cols = (
        "annual_mean",
        "max_hourly",
        "max_daily_mean",
        "max_8h_rolling_mean",
        "p95",
        "p99",
        "exceedance_hours_200",
        "exceedance_days_50",
        "exceedance_days_120",
    )

    rows = []

    for (site, yr, meas), grp in df.groupby(
        ["site_code", "year", "measurand"], observed=True
    ):
        hourly = grp.set_index("date_time").sort_index()["value"].dropna()
        if hourly.empty:
            continue

        # Annual data capture assumes an hourly source series.
        capture = len(hourly) / (8784 if calendar.isleap(yr) else 8760)

        record = {
            "site_code": site,
            "year": yr,
            "pollutant": meas,
            "data_capture": capture,
        }

        # Insufficient capture: every statistic reported as NaN.
        if capture < data_thresh:
            record.update(dict.fromkeys(stat_cols, np.nan))
            rows.append(record)
            continue

        # Statistics computed directly from the hourly values.
        record["annual_mean"] = hourly.mean()
        record["max_hourly"] = hourly.max()
        record["p95"] = hourly.quantile(0.95)
        record["p99"] = hourly.quantile(0.99)

        # Daily means, valid only with >= 18 of 24 hours present.
        daily = hourly.resample("D").agg(["mean", "count"])
        complete_days = daily[daily["count"] >= 18]
        record["max_daily_mean"] = (
            np.nan if complete_days.empty else complete_days["mean"].max()
        )

        # 8-hour rolling mean, valid with >= 6 of 8 hourly periods.
        roll8 = hourly.rolling("8h", min_periods=6).mean()
        record["max_8h_rolling_mean"] = (
            np.nan if roll8.empty else roll8.max()
        )

        # Pollutant-specific exceedances; NaN where the metric is irrelevant.
        key = meas.upper()

        record["exceedance_hours_200"] = (
            int((hourly > 200).sum()) if key == "NO2" else np.nan
        )

        if key == "PM10":
            record["exceedance_days_50"] = (
                0
                if complete_days.empty
                else int((complete_days["mean"] > 50).sum())
            )
        else:
            record["exceedance_days_50"] = np.nan

        if key == "O3":
            record["exceedance_days_120"] = int(
                (roll8.resample("D").max() > 120).sum()
            )
        else:
            record["exceedance_days_120"] = np.nan

        rows.append(record)

    if not rows:
        return empty

    out = pd.DataFrame(rows)
    # Guarantee full schema and stable column order.
    for col in _AQ_STATS_COLUMNS:
        if col not in out.columns:
            out[col] = np.nan
    return out[_AQ_STATS_COLUMNS]

Non-parametric trend analysis using Theil-Sen slope and Mann-Kendall test.

Aggregates data to the requested time resolution, optionally removes the seasonal cycle (STL decomposition via statsmodels), then fits a Theil-Sen robust regression line. Statistical significance is assessed with the Mann-Kendall test.

Parameters:

Name Type Description Default
data DataFrame

DataFrame from aeolus.download() with standard schema.

required
pollutant str

Measurand to analyse (e.g. "NO2", "PM2.5").

required
avg_time Literal['month', 'season', 'year']

Aggregation period before fitting: "month", "season", "year".

'month'
deseason bool

Remove seasonal cycle before fitting (requires statsmodels for avg_time in ("month", "season"); ignored for "year").

True
autocor bool

Apply autocorrelation correction to effective sample size.

False
data_thresh float

Minimum fraction of valid periods required per aggregation window (0-1). Set to 0 to disable.

0.75
ci_level float

Confidence level for slope confidence interval.

0.95

Returns:

Type Description
TrendResult | list[TrendResult]

TrendResult for a single-site dataset, or list[TrendResult] for
multi-site data.

Raises:

Type Description
ValueError

If fewer than 6 data points remain after aggregation, or if the requested pollutant is not in the data.

Source code in src/aeolus/metrics/stats.py
def trend(
    data: pd.DataFrame,
    pollutant: str,
    avg_time: Literal["month", "season", "year"] = "month",
    deseason: bool = True,
    autocor: bool = False,
    data_thresh: float = 0.75,
    ci_level: float = 0.95,
) -> TrendResult | list[TrendResult]:
    """
    Non-parametric trend analysis using Theil-Sen slope and Mann-Kendall test.

    Aggregates data to the requested time resolution, optionally removes the
    seasonal cycle (STL decomposition via statsmodels), then fits a Theil-Sen
    robust regression line. Statistical significance is assessed with the
    Mann-Kendall test.

    Args:
        data: DataFrame from aeolus.download() with standard schema.
        pollutant: Measurand to analyse (e.g. "NO2", "PM2.5").
        avg_time: Aggregation period before fitting: "month", "season", "year".
        deseason: Remove seasonal cycle before fitting (requires statsmodels
                  for avg_time in ("month", "season"); ignored for "year").
        autocor: Apply autocorrelation correction to effective sample size.
        data_thresh: Minimum fraction of valid periods required per
                     aggregation window (0-1). Set to 0 to disable.
        ci_level: Confidence level for slope confidence interval.

    Returns:
        TrendResult for a single-site dataset, or list[TrendResult] for
        multi-site data.

    Raises:
        ValueError: If fewer than 6 data points remain after aggregation,
                    or if the requested pollutant is not in the data.
    """
    # Local import keeps scipy off the module-import path.
    from scipy import stats as sp_stats

    validate_data(data)

    df = data[data["measurand"] == pollutant].copy()
    if df.empty:
        raise ValueError(f"No data found for pollutant: {pollutant}")

    df["date_time"] = pd.to_datetime(df["date_time"])

    sites = df["site_code"].unique()
    results = []

    # One independent fit per site. Note: a site with < 6 aggregated points
    # raises and aborts the whole call, including sites already processed.
    for site in sites:
        site_df = df[df["site_code"] == site].set_index("date_time").sort_index()
        values = site_df["value"]

        # Aggregate to requested time resolution.
        # "MS" = month start; "QS-DEC" anchors quarters on December so each
        # bin is a meteorological season (DJF, MAM, JJA, SON); "YS" = year start.
        freq_map = {"month": "MS", "season": "QS-DEC", "year": "YS"}
        freq = freq_map[avg_time]

        agg_mean = values.resample(freq).mean()
        agg_count = values.resample(freq).count()

        # Infer data frequency for threshold calculation
        data_freq = _infer_data_frequency(site_df.reset_index()["date_time"])
        expected = _expected_observations(data_freq, freq)

        # Blank out aggregation windows whose data capture is below threshold.
        if expected > 0 and data_thresh > 0:
            dc = (agg_count / expected).clip(upper=1.0)
            agg_mean = agg_mean.where(dc >= data_thresh)

        # Drop NaN values
        agg_mean = agg_mean.dropna()

        if len(agg_mean) < 6:
            raise ValueError(
                f"Insufficient data for trend analysis at site {site}: "
                f"{len(agg_mean)} points (need >= 6) after {avg_time} aggregation."
            )

        y = agg_mean.values.copy()

        # Deseasonalisation (only meaningful for sub-annual aggregation).
        deseasonalised = False
        if deseason and avg_time in ("month", "season"):
            period = 12 if avg_time == "month" else 4
            # STL needs at least two full seasonal cycles.
            if len(y) >= 2 * period:
                try:
                    from statsmodels.tsa.seasonal import STL

                    stl = STL(
                        pd.Series(y, index=agg_mean.index),
                        period=period,
                        robust=True,
                    )
                    res = stl.fit()
                    # Deseasonalised series = trend + residual components.
                    # NOTE(review): this rebinds y from an ndarray to a pandas
                    # Series; downstream arithmetic (residuals, corrcoef,
                    # nanmean) handles both, but the type is inconsistent.
                    y = res.trend + res.resid
                    deseasonalised = True
                except ImportError:
                    warnings.warn(
                        "statsmodels is not installed. Skipping deseasonalisation. "
                        "Install with: pip install statsmodels",
                        UserWarning,
                        stacklevel=2,
                    )
            else:
                warnings.warn(
                    f"Not enough data for deseasonalisation "
                    f"({len(y)} points, need >= {2 * period}). "
                    f"Skipping deseasonalisation.",
                    UserWarning,
                    stacklevel=2,
                )

        # Convert datetime index to fractional years so the slope is in
        # concentration units per year. 365.25 approximates leap years.
        dates = agg_mean.index
        x_years = (
            dates.year
            + (dates.dayofyear - 1) / 365.25
        ).values.astype(float)

        # Theil-Sen slope estimation (robust to outliers); alpha sets the
        # two-sided confidence interval on the slope.
        alpha = 1 - ci_level
        ts_result = sp_stats.theilslopes(y, x_years, alpha=alpha)
        slope = ts_result.slope
        intercept = ts_result.intercept
        ci_low = ts_result.low_slope
        ci_high = ts_result.high_slope

        # Mann-Kendall test for significance
        tau, p_value = sp_stats.kendalltau(x_years, y)

        # Autocorrelation correction (effective sample size).
        # n_eff = n * (1 - r1) / (1 + r1) is the standard AR(1)
        # effective-sample-size approximation from the lag-1 residual
        # autocorrelation; the p-value is then inflated accordingly.
        if autocor and len(y) > 2:
            residuals = y - (slope * x_years + intercept)
            lag1 = np.corrcoef(residuals[:-1], residuals[1:])[0, 1]
            if not np.isnan(lag1) and abs(lag1) < 1:
                n_eff = len(y) * (1 - lag1) / (1 + lag1)
                n_eff = max(n_eff, 3)
                # Scale p-value approximately
                scale_factor = len(y) / n_eff
                p_value = min(1.0, p_value * scale_factor)

        mean_val = float(np.nanmean(y))
        # Relative slope in % of the period mean per year; NaN if mean is 0.
        slope_pct = (slope / mean_val * 100) if mean_val != 0 else np.nan

        results.append(
            TrendResult(
                slope=float(slope),
                slope_pct=float(slope_pct),
                intercept=float(intercept),
                ci_lower=float(ci_low),
                ci_upper=float(ci_high),
                p_value=float(p_value),
                n_points=len(y),
                avg_time=avg_time,
                deseasonalised=deseasonalised,
                pollutant=pollutant,
                site_code=str(site),
                mean_value=mean_val,
                first_year=int(dates.year.min()),
                last_year=int(dates.year.max()),
            )
        )

    # Single-site input unwraps to a bare TrendResult for convenience.
    if len(results) == 1:
        return results[0]
    return results

AQI Functions

Calculate AQI summary statistics for air quality data.

This is the primary function for calculating AQI values from downloaded data. It handles the required averaging periods for each index and provides summary statistics over the requested time periods.

Parameters:

Name Type Description Default
data DataFrame

DataFrame from aeolus.download() with columns: site_code, date_time, measurand, value, units

required
index str

AQI index to use (default: "UK_DAQI") Options: UK_DAQI, US_EPA, CHINA, EU_CAQI_ROADSIDE, EU_CAQI_BACKGROUND, INDIA_NAQI

'UK_DAQI'
freq FreqType

Aggregation frequency: None = entire period (default) "D" = daily "W" = weekly "M" = monthly "Q" = quarterly "Y" = yearly

None
format FormatType

Output format: "long" = one row per site/period/pollutant (default) "wide" = one row per site/period with pollutant columns

'long'
overall_only bool

If True, only return overall AQI (not per-pollutant)

False
warn_low_coverage bool

Warn when data coverage is below min_coverage

True
min_coverage float

Minimum coverage threshold (0-1) for warnings

0.75

Returns:

Type Description
DataFrame

DataFrame with AQI summary statistics.

Long format columns: site_code, period, pollutant, mean, median, p25, p75, min, max, aqi_value, aqi_category, coverage

Wide format columns: site_code, period, {pollutant}_mean, {pollutant}_aqi, ..., overall_aqi, overall_category, dominant_pollutant

overall_only format columns: site_code, period, aqi_value, aqi_category, dominant_pollutant

Example

import aeolus from aeolus import metrics

data = aeolus.download("AURN", ["MY1"], start, end)

Overall summary

summary = metrics.aqi_summary(data, index="UK_DAQI")

Monthly breakdown

monthly = metrics.aqi_summary(data, index="UK_DAQI", freq="M")

Just overall AQI values

simple = metrics.aqi_summary(data, index="UK_DAQI", overall_only=True)

Source code in src/aeolus/metrics/__init__.py
def aqi_summary(
    data: pd.DataFrame,
    index: str = "UK_DAQI",
    freq: FreqType = None,
    format: FormatType = "long",
    overall_only: bool = False,
    warn_low_coverage: bool = True,
    min_coverage: float = 0.75,
) -> pd.DataFrame:
    """
    Calculate AQI summary statistics for air quality data.

    This is the primary function for calculating AQI values from downloaded data.
    It handles the required averaging periods for each index and provides
    summary statistics over the requested time periods.

    Args:
        data: DataFrame from aeolus.download() with columns:
              site_code, date_time, measurand, value, units
        index: AQI index to use (default: "UK_DAQI")
               Options: UK_DAQI, US_EPA, CHINA, EU_CAQI_ROADSIDE,
                        EU_CAQI_BACKGROUND, INDIA_NAQI
        freq: Aggregation frequency:
              None = entire period (default)
              "D" = daily
              "W" = weekly
              "M" = monthly
              "Q" = quarterly
              "Y" = yearly
        format: Output format:
                "long" = one row per site/period/pollutant (default)
                "wide" = one row per site/period with pollutant columns
        overall_only: If True, only return overall AQI (not per-pollutant)
        warn_low_coverage: Warn when data coverage is below min_coverage
        min_coverage: Minimum coverage threshold (0-1) for warnings

    Returns:
        DataFrame with AQI summary statistics.

        Long format columns:
            site_code, period, pollutant, mean, median, p25, p75, min, max,
            aqi_value, aqi_category, coverage

        Wide format columns:
            site_code, period, {pollutant}_mean, {pollutant}_aqi, ...,
            overall_aqi, overall_category, dominant_pollutant

        overall_only format columns:
            site_code, period, aqi_value, aqi_category, dominant_pollutant

    Example:
        >>> import aeolus
        >>> from aeolus import metrics
        >>>
        >>> data = aeolus.download("AURN", ["MY1"], start, end)
        >>>
        >>> # Overall summary
        >>> summary = metrics.aqi_summary(data, index="UK_DAQI")
        >>>
        >>> # Monthly breakdown
        >>> monthly = metrics.aqi_summary(data, index="UK_DAQI", freq="M")
        >>>
        >>> # Just overall AQI values
        >>> simple = metrics.aqi_summary(data, index="UK_DAQI", overall_only=True)
    """
    # NOTE(review): function-local import; sibling functions in this file
    # use warnings without a local import, so this is likely redundant with
    # a module-level import — harmless either way.
    import warnings

    # Validate input
    validate_data(data)

    # Get index info; get_index returns None for an unrecognised index name.
    index_info = get_index(index)
    if index_info is None:
        available = list_indices()
        raise ValueError(f"Unknown index '{index}'. Available: {available}")

    # Import the appropriate index module
    index_module = _get_index_module(index)

    # Intersect the pollutants present in the data with those the chosen
    # index supports; only the overlap can be scored.
    available_pollutants = get_available_pollutants(data)
    supported_pollutants = set(index_info["pollutants"])
    usable_pollutants = available_pollutants & supported_pollutants

    if not usable_pollutants:
        raise ValueError(
            f"No supported pollutants found in data. "
            f"Index {index} requires: {supported_pollutants}. "
            f"Data contains: {available_pollutants}"
        )

    # Warn about missing pollutants (partial AQI is still computed).
    missing = supported_pollutants - available_pollutants
    if missing:
        warnings.warn(
            f"Data is missing pollutants for complete {index} calculation: {missing}. "
            f"AQI will be calculated using available pollutants only.",
            UserWarning,
            stacklevel=2,
        )

    # Prepare data: normalise pollutant names, keep only usable ones.
    df = data.copy()
    df["date_time"] = pd.to_datetime(df["date_time"])
    df["pollutant_std"] = df["measurand"].apply(standardise_pollutant)
    df = df[df["pollutant_std"].isin(usable_pollutants)]

    # Determine period grouping: a single "all" bucket, or calendar periods.
    if freq is None:
        df["period"] = "all"
    else:
        df["period"] = df["date_time"].dt.to_period(freq).astype(str)

    # Group and calculate statistics
    results = []

    for (site, period, pollutant), group in df.groupby(
        ["site_code", "period", "pollutant_std"], observed=True
    ):
        # Convert units to µg/m³ (vectorized)
        values_ugm3 = ensure_ugm3_array(
            group["value"].values,
            pollutant,
            group["units"],
        )

        # Filter out NaN values
        valid_mask = ~pd.isna(values_ugm3)
        values_series = pd.Series(values_ugm3[valid_mask])

        if len(values_series) == 0:
            continue

        # Calculate statistics
        stats = {
            "site_code": site,
            "period": period,
            "pollutant": pollutant,
            "mean": values_series.mean(),
            "median": values_series.median(),
            "p25": values_series.quantile(0.25),
            "p75": values_series.quantile(0.75),
            "min": values_series.min(),
            "max": values_series.max(),
        }

        # Calculate coverage
        if freq is not None:
            # Presumably the nominal hour count for one period of `freq`
            # — assumes hourly source data; TODO confirm in _get_expected_hours.
            expected_hours = _get_expected_hours(freq)
            stats["coverage"] = len(values_series) / expected_hours
        else:
            # For "all" period, calculate based on date range.
            # NOTE(review): this uses the date span of the WHOLE filtered
            # dataframe, not this group's own span, so a site/pollutant that
            # reported for a shorter window gets a lower coverage — confirm
            # this is intended.
            date_range = df["date_time"].max() - df["date_time"].min()
            expected_hours = max(1, date_range.total_seconds() / 3600)
            stats["coverage"] = len(values_series) / expected_hours

        # Calculate AQI using the appropriate statistic for the index
        # (helper chooses which of the summary stats feeds the index).
        aqi_result = _calculate_pollutant_aqi(
            index_module,
            index,
            pollutant,
            stats,
        )

        stats["aqi_value"] = aqi_result.value
        stats["aqi_category"] = aqi_result.category

        results.append(stats)

    if not results:
        return pd.DataFrame()

    result_df = pd.DataFrame(results)

    # Check coverage and warn (single aggregate warning, not one per row).
    if warn_low_coverage and "coverage" in result_df.columns:
        low_coverage = result_df[result_df["coverage"] < min_coverage]
        if not low_coverage.empty:
            periods = low_coverage["period"].unique()
            warnings.warn(
                f"{len(periods)} period(s) have coverage below {min_coverage:.0%}. "
                f"Results may be unreliable. Suppress with warn_low_coverage=False.",
                UserWarning,
                stacklevel=2,
            )

    # Add overall AQI per site/period (appends rows whose pollutant
    # column is the sentinel "_overall" — see the filter below).
    result_df = _add_overall_aqi(result_df)

    # Format output
    if overall_only:
        # Keep only overall rows
        overall_df = result_df[result_df["pollutant"] == "_overall"].copy()
        overall_df = overall_df.rename(columns={"pollutant": "dominant_pollutant"})
        # Get actual dominant pollutant (replaces the "_overall" sentinel).
        overall_df["dominant_pollutant"] = overall_df.apply(
            lambda row: _get_dominant_pollutant(
                result_df, row["site_code"], row["period"]
            ),
            axis=1,
        )
        return overall_df[
            ["site_code", "period", "aqi_value", "aqi_category", "dominant_pollutant"]
        ]

    if format == "wide":
        return _to_wide_format(result_df)

    return result_df

Calculate AQI values as a time series with proper rolling averages.

This function calculates the appropriate rolling averages required by each index (e.g., 8-hour for O3, 24-hour for PM2.5) and returns AQI values for each timestamp where sufficient data is available.

Parameters:

Name Type Description Default
data DataFrame

DataFrame from aeolus.download()

required
index str

AQI index to use (default: "UK_DAQI")

'UK_DAQI'
include_rolling bool

Include rolling average columns in output

True

Returns:

Type Description
DataFrame

DataFrame with columns: site_code, date_time, pollutant, value, rolling_avg, aqi_value, aqi_category

DataFrame

NaN values indicate insufficient data for the rolling calculation.

Example

ts = metrics.aqi_timeseries(data, index="UK_DAQI")

Plot hourly AQI

ts.pivot(index="date_time", columns="pollutant", values="aqi_value").plot()

Source code in src/aeolus/metrics/__init__.py
def aqi_timeseries(
    data: pd.DataFrame,
    index: str = "UK_DAQI",
    include_rolling: bool = True,
) -> pd.DataFrame:
    """
    Calculate AQI values as a time series with proper rolling averages.

    Applies each pollutant's index-mandated rolling window (e.g. 8-hour
    for O3, 24-hour for PM2.5) and returns an AQI value for every
    timestamp with enough data to complete the rolling calculation.

    Args:
        data: DataFrame from aeolus.download()
        index: AQI index to use (default: "UK_DAQI")
        include_rolling: Include rolling average columns in output

    Returns:
        DataFrame with columns:
            site_code, date_time, pollutant, value, rolling_avg,
            aqi_value, aqi_category

        NaN values indicate insufficient data for the rolling calculation.

    Example:
        >>> ts = metrics.aqi_timeseries(data, index="UK_DAQI")
        >>> # Plot hourly AQI
        >>> ts.pivot(index="date_time", columns="pollutant", values="aqi_value").plot()
    """
    validate_data(data)

    index_info = get_index(index)
    if index_info is None:
        raise ValueError(f"Unknown index '{index}'. Available: {list_indices()}")

    index_module = _get_index_module(index)

    frame = data.copy()
    frame["date_time"] = pd.to_datetime(frame["date_time"])
    frame["pollutant_std"] = frame["measurand"].apply(standardise_pollutant)
    frame = frame.sort_values(["site_code", "pollutant_std", "date_time"])

    # Prefer the index module's vectorised API when it exposes one.
    use_vectorised = hasattr(index_module, "calculate_array")

    pieces = []

    for (site, poll), grp in frame.groupby(
        ["site_code", "pollutant_std"], observed=True
    ):
        # Skip unrecognised pollutants and those the index cannot score.
        if poll is None or poll not in index_info["pollutants"]:
            continue

        # Index-specific averaging window, in hours.
        window_hours = _period_to_hours(
            _get_averaging_period(index_module, poll)
        )

        grp = grp.set_index("date_time").sort_index()

        # Convert concentrations to µg/m³ in one vectorised pass.
        grp["value_ugm3"] = ensure_ugm3_array(
            grp["value"].values,
            poll,
            grp["units"],
        )

        # Rolling mean over the window, requiring 75% of the hours present.
        grp["rolling_avg"] = (
            grp["value_ugm3"]
            .rolling(
                window=f"{window_hours}h",
                min_periods=int(window_hours * 0.75),
            )
            .mean()
        )

        rolled = grp["rolling_avg"].values

        if use_vectorised:
            # Fast path: whole-array AQI calculation.
            aqi_vals, aqi_cats = index_module.calculate_array(rolled, poll)
        else:
            # Fallback: per-value scalar calculation, None where the
            # rolling mean is missing or the index raises.
            aqi_vals = []
            aqi_cats = []
            for val in rolled:
                if pd.isna(val):
                    aqi_vals.append(None)
                    aqi_cats.append(None)
                    continue
                try:
                    res = index_module.calculate(val, poll)
                except Exception:
                    aqi_vals.append(None)
                    aqi_cats.append(None)
                else:
                    aqi_vals.append(res.value)
                    aqi_cats.append(res.category)

        piece = pd.DataFrame(
            {
                "site_code": site,
                "date_time": grp.index,
                "pollutant": poll,
                "value": grp["value_ugm3"].values,
                "aqi_value": aqi_vals,
                "aqi_category": aqi_cats,
            }
        )

        if include_rolling:
            piece["rolling_avg"] = rolled

        pieces.append(piece)

    if not pieces:
        return pd.DataFrame()

    return pd.concat(pieces, ignore_index=True)

Check air quality data against WHO guidelines.

Parameters:

Name Type Description Default
data DataFrame

DataFrame from aeolus.download()

required
target Literal['AQG', 'IT-1', 'IT-2', 'IT-3', 'IT-4']

WHO target level to check against: "AQG" = Air Quality Guideline (strictest, default) "IT-4" = Interim Target 4 "IT-3" = Interim Target 3 "IT-2" = Interim Target 2 "IT-1" = Interim Target 1 (least strict)

'AQG'
averaging_period str | None

Override default averaging period (default varies by pollutant)

None

Returns:

Type Description
DataFrame

DataFrame with columns: site_code, pollutant, mean_concentration, guideline_value, meets_guideline, exceedance_ratio, message

Example

compliance = metrics.aqi_check_who(data) print(compliance[["pollutant", "meets_guideline", "exceedance_ratio"]])

Check against less strict interim target

compliance_it1 = metrics.aqi_check_who(data, target="IT-1")

Source code in src/aeolus/metrics/__init__.py
def aqi_check_who(
    data: pd.DataFrame,
    target: Literal["AQG", "IT-1", "IT-2", "IT-3", "IT-4"] = "AQG",
    averaging_period: str | None = None,
) -> pd.DataFrame:
    """
    Check air quality data against WHO guidelines.

    Args:
        data: DataFrame from aeolus.download()
        target: WHO target level to check against:
                "AQG" = Air Quality Guideline (strictest, default)
                "IT-4" = Interim Target 4
                "IT-3" = Interim Target 3
                "IT-2" = Interim Target 2
                "IT-1" = Interim Target 1 (least strict)
        averaging_period: Override default averaging period
                         (default varies by pollutant)

    Returns:
        DataFrame with columns:
            site_code, pollutant, mean_concentration, guideline_value,
            meets_guideline, exceedance_ratio, message

    Example:
        >>> compliance = metrics.aqi_check_who(data)
        >>> print(compliance[["pollutant", "meets_guideline", "exceedance_ratio"]])
        >>>
        >>> # Check against less strict interim target
        >>> compliance_it1 = metrics.aqi_check_who(data, target="IT-1")
    """
    from .indices import who

    validate_data(data)

    df = data.copy()
    df["date_time"] = pd.to_datetime(df["date_time"])
    df["pollutant_std"] = df["measurand"].apply(standardise_pollutant)

    results = []

    for (site, pollutant), group in df.groupby(
        ["site_code", "pollutant_std"], observed=True
    ):
        # Skip pollutants WHO publishes no guideline for (or that failed
        # standardisation).
        if pollutant is None or pollutant not in who.GUIDELINES:
            continue

        # Convert values to the units WHO guidelines use: µg/m³ for most
        # pollutants, mg/m³ for CO. Iterating zipped columns avoids the
        # per-row Series construction overhead of DataFrame.iterrows().
        values = []
        for value, units in zip(group["value"], group["units"]):
            if pd.isna(value):
                continue
            if pollutant == "CO":
                # str() guards against non-string units (e.g. NaN), which
                # would otherwise raise AttributeError on .lower().
                if str(units).lower() in ("mg/m3", "mg/m³"):
                    # Already in mg/m³ — keep as-is
                    values.append(value)
                else:
                    # Convert from µg/m³ to mg/m³
                    values.append(value / 1000)
            else:
                values.append(ensure_ugm3(value, pollutant, units, warn=False))

        if not values:
            continue

        mean_conc = sum(values) / len(values)

        try:
            result = who.check_guideline(
                mean_conc,
                pollutant,
                averaging_period,
                target,
            )

            results.append(
                {
                    "site_code": site,
                    "pollutant": pollutant,
                    "mean_concentration": mean_conc,
                    "guideline_value": result.guideline_value,
                    "meets_guideline": result.meets_guideline,
                    "exceedance_ratio": result.exceedance_ratio,
                    "message": result.message,
                }
            )
        except ValueError:
            # Target not available for this pollutant/period — omit row
            continue

    return pd.DataFrame(results)

List all available AQI indices.

Returns:

Type Description
list[str]

List of index keys (e.g., ["UK_DAQI", "US_EPA", "CHINA", ...])

Example

metrics.list_indices() ['UK_DAQI', 'US_EPA', 'CHINA', 'WHO', 'EU_CAQI_ROADSIDE', 'EU_CAQI_BACKGROUND', 'INDIA_NAQI']

Source code in src/aeolus/metrics/__init__.py
def list_indices() -> list[str]:
    """
    Return the keys of every supported AQI index.

    Returns:
        List of index keys (e.g., ["UK_DAQI", "US_EPA", "CHINA", ...])

    Example:
        >>> metrics.list_indices()
        ['UK_DAQI', 'US_EPA', 'CHINA', 'WHO', 'EU_CAQI_ROADSIDE',
         'EU_CAQI_BACKGROUND', 'INDIA_NAQI']
    """
    # Thin public wrapper around the internal registry lookup.
    available = _list_indices()
    return available

Get detailed information about an AQI index.

Parameters:

Name Type Description Default
index str

Index key (e.g., "UK_DAQI")

required

Returns:

Type Description
IndexInfo | None

IndexInfo dict with name, country, scale, pollutants, description, url

IndexInfo | None

or None if index not found

Example

info = metrics.get_index_info("UK_DAQI") print(info["name"]) 'UK Daily Air Quality Index' print(info["pollutants"]) ['O3', 'NO2', 'SO2', 'PM2.5', 'PM10']

Source code in src/aeolus/metrics/__init__.py
def get_index_info(index: str) -> IndexInfo | None:
    """
    Look up detailed metadata for a single AQI index.

    Args:
        index: Index key (e.g., "UK_DAQI")

    Returns:
        IndexInfo dict with name, country, scale, pollutants, description, url
        or None if index not found

    Example:
        >>> info = metrics.get_index_info("UK_DAQI")
        >>> print(info["name"])
        'UK Daily Air Quality Index'
        >>> print(info["pollutants"])
        ['O3', 'NO2', 'SO2', 'PM2.5', 'PM10']
    """
    # Delegate to the registry accessor; it returns None for unknown keys.
    info = get_index(index)
    return info

Supported Indices

Index Country Scale Description
UK_DAQI UK 1-10 UK Daily Air Quality Index
US_EPA USA 0-500 US EPA Air Quality Index (with NowCast)
CHINA China 0-500 China Air Quality Index
EU_CAQI_ROADSIDE EU 1-6 European AQI for traffic stations
EU_CAQI_BACKGROUND EU 1-6 European AQI for background stations
INDIA_NAQI India 0-500 India National Air Quality Index
WHO Global - WHO Air Quality Guidelines checker

Usage Examples

AQI Summary

import aeolus
from aeolus import metrics
from datetime import datetime

# Download data
data = aeolus.download(
    sources="AURN",
    sites=["MY1"],
    start_date=datetime(2024, 1, 1),
    end_date=datetime(2024, 1, 31)
)

# Calculate UK DAQI summary
summary = metrics.aqi_summary(data, index="UK_DAQI")

# Monthly breakdown ("ME" = month-end, the modern pandas offset alias)
monthly = metrics.aqi_summary(data, index="UK_DAQI", freq="ME")

# Just overall AQI values
simple = metrics.aqi_summary(data, index="UK_DAQI", overall_only=True)

AQI Time Series

# Get hourly AQI values with rolling averages
ts = metrics.aqi_timeseries(data, index="UK_DAQI")

# Plot
ts.pivot(index="date_time", columns="pollutant", values="aqi_value").plot()

WHO Guideline Compliance

# Check against WHO Air Quality Guidelines
compliance = metrics.aqi_check_who(data)
print(compliance[["pollutant", "meets_guideline", "exceedance_ratio"]])

# Check against less strict interim targets
compliance_it1 = metrics.aqi_check_who(data, target="IT-1")

List Available Indices

# See all available indices
indices = metrics.list_indices()
print(indices)
# ['UK_DAQI', 'US_EPA', 'CHINA', 'WHO', 'EU_CAQI_ROADSIDE', ...]

# Get details about a specific index
info = metrics.get_index_info("UK_DAQI")
print(info["pollutants"])  # ['O3', 'NO2', 'SO2', 'PM2.5', 'PM10']