Skip to content

Aggregation

Per-site aggregation of per-read modification probabilities, and TSV writers.

SiteResult dataclass

SiteResult(
    contig: str,
    position: int,
    kmer: str,
    mod_ratio: float,
    ci_low: float,
    ci_high: float,
    pvalue: float,
    padj: float,
    effect_size: float,
    n_native: int,
    n_ivt: int,
    mean_p_mod: float,
    stoichiometry: float,
)

Per-site aggregated modification call.

mod_ratio instance-attribute

mod_ratio: float

MAP estimate of modification stoichiometry (Beta-Binomial).

ci_low instance-attribute

ci_low: float

2.5th percentile of Beta posterior.

ci_high instance-attribute

ci_high: float

97.5th percentile of Beta posterior.

pvalue instance-attribute

pvalue: float

One-sided Fisher's exact test p-value (native > IVT).

padj instance-attribute

padj: float

Benjamini-Hochberg FDR-adjusted p-value.

effect_size instance-attribute

effect_size: float

median(native p_mod_hmm) - median(IVT p_mod_hmm).

mean_p_mod instance-attribute

mean_p_mod: float

Mean of native p_mod_hmm values.

stoichiometry instance-attribute

stoichiometry: float

Fraction of native reads with p_mod_hmm > 0.5.

aggregate_all

aggregate_all(
    results: dict[str, ContigModificationResult],
    *,
    score_field: str = "p_mod_hmm",
    mod_threshold: float = 0.9
) -> list[SiteResult]

Aggregate all contigs and apply per-transcript FDR correction.

Parameters:

Name Type Description Default
results dict[str, ContigModificationResult]

{contig_name: ContigModificationResult}

required
score_field str

Which per-read score to aggregate.

'p_mod_hmm'
mod_threshold float

Per-read probability threshold for counting a read as modified.

0.9

Returns:

Type Description
list[SiteResult]

All sites across all contigs, sorted by contig then position, with padj set via per-transcript Benjamini-Hochberg.

Source code in baleen/eventalign/_aggregation.py
def aggregate_all(
    results: dict[str, ContigModificationResult],
    *,
    score_field: str = "p_mod_hmm",
    mod_threshold: float = 0.9,
) -> list[SiteResult]:
    """Aggregate all contigs and apply per-transcript FDR correction.

    Parameters
    ----------
    results
        ``{contig_name: ContigModificationResult}``
    score_field
        Which per-read score to aggregate.
    mod_threshold
        Per-read probability threshold for counting a read as modified.

    Returns
    -------
    list[SiteResult]
        All sites across all contigs, sorted by contig then position,
        with ``padj`` set via per-transcript Benjamini-Hochberg.
    """
    all_sites: list[SiteResult] = []
    for contig in sorted(results.keys()):
        contig_sites = aggregate_contig(
            results[contig], score_field=score_field,
            mod_threshold=mod_threshold,
        )
        if contig_sites:
            # Apply BH FDR correction per transcript
            pvalues = np.array([s.pvalue for s in contig_sites], dtype=np.float64)
            padj = _benjamini_hochberg(pvalues)
            for site, adj in zip(contig_sites, padj):
                site.padj = float(adj)
            all_sites.extend(contig_sites)

    return all_sites

aggregate_contig

aggregate_contig(
    cmr: ContigModificationResult,
    *,
    score_field: str = "p_mod_hmm",
    mod_threshold: float = 0.9
) -> list[SiteResult]

Aggregate per-read results into site-level calls for one contig.

P-values are not FDR-corrected here; use :func:aggregate_all for multi-contig FDR correction, or apply :func:_benjamini_hochberg manually.

Parameters:

Name Type Description Default
cmr ContigModificationResult

Output of compute_sequential_modification_probabilities.

required
score_field str

Which per-read score to aggregate. Default "p_mod_hmm".

'p_mod_hmm'

Returns:

Type Description
list[SiteResult]

One entry per position, sorted by position. padj is set equal to pvalue (no FDR correction applied).

Source code in baleen/eventalign/_aggregation.py
def aggregate_contig(
    cmr: ContigModificationResult,
    *,
    score_field: str = "p_mod_hmm",
    mod_threshold: float = 0.9,
) -> list[SiteResult]:
    """Aggregate per-read results into site-level calls for one contig.

    P-values are *not* FDR-corrected here; use :func:`aggregate_all`
    for multi-contig FDR correction, or apply :func:`_benjamini_hochberg`
    manually.

    Parameters
    ----------
    cmr
        Output of ``compute_sequential_modification_probabilities``.
    score_field
        Which per-read score to aggregate.  Default ``"p_mod_hmm"``.

    Returns
    -------
    list[SiteResult]
        One entry per position, sorted by position.  ``padj`` is set
        equal to ``pvalue`` (no FDR correction applied).
    """
    results: list[SiteResult] = []

    for pos in sorted(cmr.position_stats.keys()):
        ps = cmr.position_stats[pos]
        scores = getattr(ps, score_field)

        native_scores = scores[: ps.n_native]
        ivt_scores = scores[ps.n_native :]

        # Skip positions with no valid native scores
        valid_native = native_scores[~np.isnan(native_scores)]
        valid_ivt = ivt_scores[~np.isnan(ivt_scores)]
        if len(valid_native) == 0:
            continue

        # Threshold-based aggregation (native reads only)
        mod_ratio, ci_low, ci_high = _threshold_aggregate(valid_native, mod_threshold)

        # Fisher's exact test on binary calls
        pvalue = _fisher_pvalue(valid_native, valid_ivt, mod_threshold)

        # Effect size (NaN if no IVT reads — avoids systematic upward bias)
        if len(valid_ivt) > 0:
            effect_size = float(np.median(valid_native)) - float(np.median(valid_ivt))
        else:
            effect_size = float('nan')

        # Stoichiometry: fraction of native reads confidently modified
        hmm_valid = valid_native
        stoichiometry = float(np.mean(hmm_valid > 0.5)) if len(hmm_valid) > 0 else 0.0

        results.append(
            SiteResult(
                contig=cmr.contig,
                position=pos,
                kmer=ps.reference_kmer,
                mod_ratio=mod_ratio,
                ci_low=ci_low,
                ci_high=ci_high,
                pvalue=pvalue,
                padj=pvalue,  # placeholder; corrected by aggregate_all
                effect_size=effect_size,
                n_native=ps.n_native,
                n_ivt=ps.n_ivt,
                mean_p_mod=float(np.mean(valid_native)),
                stoichiometry=stoichiometry,
            )
        )

    return results

write_site_tsv

write_site_tsv(
    sites: list[SiteResult], path: str | Path
) -> Path

Write site-level results to a TSV file (header + rows).

Parameters:

Name Type Description Default
sites list[SiteResult]

Output of :func:aggregate_all or :func:aggregate_contig.

required
path str | Path

Output file path.

required

Returns:

Type Description
Path

The written file path.

Source code in baleen/eventalign/_aggregation.py
def write_site_tsv(
    sites: list[SiteResult],
    path: str | Path,
) -> Path:
    """Write site-level results to a TSV file (header + rows).

    Parameters
    ----------
    sites
        Output of :func:`aggregate_all` or :func:`aggregate_contig`.
    path
        Output file path.

    Returns
    -------
    Path
        The written file path.
    """
    out = Path(path)
    out.parent.mkdir(parents=True, exist_ok=True)

    with out.open("w", newline="") as f:
        write_site_tsv_header(f)
        write_site_tsv_rows(f, sites)

    logger.info("Wrote %d site results to %s", len(sites), out)
    return out

write_site_tsv_header

write_site_tsv_header(file: IO[str]) -> None

Write the TSV column header (single row) to file.

Source code in baleen/eventalign/_aggregation.py
def write_site_tsv_header(file: IO[str]) -> None:
    """Write the TSV column header (single row) to *file*."""
    writer = csv.writer(file, delimiter="\t")
    writer.writerow(_TSV_COLUMNS)

write_site_tsv_rows

write_site_tsv_rows(
    file: IO[str], sites: list[SiteResult]
) -> None

Write sites as TSV data rows (no header) to file.

Source code in baleen/eventalign/_aggregation.py
def write_site_tsv_rows(file: IO[str], sites: list[SiteResult]) -> None:
    """Write *sites* as TSV data rows (no header) to *file*."""
    writer = csv.writer(file, delimiter="\t")
    for site in sites:
        writer.writerow([
            site.contig,
            site.position,
            site.kmer,
            f"{site.mod_ratio:.6f}",
            f"{site.ci_low:.6f}",
            f"{site.ci_high:.6f}",
            f"{site.pvalue:.6e}",
            f"{site.padj:.6e}",
            f"{site.effect_size:.6f}",
            site.n_native,
            site.n_ivt,
            f"{site.mean_p_mod:.6f}",
            f"{site.stoichiometry:.6f}",
        ])

merge_contig_tsvs

merge_contig_tsvs(
    per_contig_tsvs: list[Path], output_path: str | Path
) -> Path

Concat per-contig TSV slices (rows-only) into one TSV with a header.

The caller is responsible for sorting per_contig_tsvs in the desired output order (typically alphabetic by contig name).

Parameters:

Name Type Description Default
per_contig_tsvs list[Path]

List of per-contig TSV paths. Each file contains data rows only (no header) — produced via :func:write_site_tsv_rows.

required
output_path str | Path

Final TSV path.

required

Returns:

Type Description
Path

The written final TSV path.

Source code in baleen/eventalign/_aggregation.py
def merge_contig_tsvs(
    per_contig_tsvs: list[Path],
    output_path: str | Path,
) -> Path:
    """Concat per-contig TSV slices (rows-only) into one TSV with a header.

    The caller is responsible for sorting *per_contig_tsvs* in the desired
    output order (typically alphabetic by contig name).

    Parameters
    ----------
    per_contig_tsvs
        List of per-contig TSV paths.  Each file contains data rows
        only (no header) — produced via :func:`write_site_tsv_rows`.
    output_path
        Final TSV path.

    Returns
    -------
    Path
        The written final TSV path.
    """
    out = Path(output_path)
    out.parent.mkdir(parents=True, exist_ok=True)
    tmp = out.with_suffix(out.suffix + ".tmp")
    success = False
    try:
        with tmp.open("w", newline="") as fout:
            write_site_tsv_header(fout)
            for p in per_contig_tsvs:
                src = Path(p)
                if not src.exists():
                    continue
                with src.open("r") as fin:
                    shutil.copyfileobj(fin, fout)
        os.replace(tmp, out)
        success = True
    finally:
        if not success:
            tmp.unlink(missing_ok=True)

    return out