Skip to content

Read I/O

Writing and loading per-read modification calls in mod-BAM format. For the tag layout see Outputs › read_results.bam.

Writing

write_mod_bam

write_mod_bam(
    hierarchical_results: dict[
        str, "ContigModificationResult"
    ],
    native_bam: PathLike,
    ivt_bam: PathLike,
    ref_fasta: PathLike,
    output_path: PathLike,
) -> Path

Write mod-BAM with MM/ML tags from HMM results + original BAM reads.

Compatibility wrapper: loops :func:flush_contig_to_bam over each contig into a tempdir, then calls :func:merge_contig_bams.

Parameters:

Name Type Description Default
hierarchical_results dict[str, 'ContigModificationResult']

Per-contig HMM pipeline output (contains p_mod_hmm arrays).

required
native_bam PathLike

Path to the original native BAM file.

required
ivt_bam PathLike

Path to the original IVT control BAM file.

required
ref_fasta PathLike

Reference FASTA (used for BAM header).

required
output_path PathLike

Destination path for the output BAM file.

required

Returns:

Type Description
Path

Path to the written (sorted, indexed) BAM file.

Source code in baleen/eventalign/_read_bam.py
def write_mod_bam(
    hierarchical_results: dict[str, "ContigModificationResult"],
    native_bam: PathLike,
    ivt_bam: PathLike,
    ref_fasta: PathLike,
    output_path: PathLike,
) -> Path:
    """Write mod-BAM with MM/ML tags from HMM results + original BAM reads.

    Compatibility wrapper: loops :func:`flush_contig_to_bam` over each
    contig into a tempdir, then calls :func:`merge_contig_bams`.

    Parameters
    ----------
    hierarchical_results
        Per-contig HMM pipeline output (contains p_mod_hmm arrays).
    native_bam
        Path to the original native BAM file.
    ivt_bam
        Path to the original IVT control BAM file.
    ref_fasta
        Reference FASTA (used for BAM header).
    output_path
        Destination path for the output BAM file.

    Returns
    -------
    Path
        Path to the written (sorted, indexed) BAM file.
    """
    out = Path(output_path)
    out.parent.mkdir(parents=True, exist_ok=True)

    if not hierarchical_results:
        logger.warning("No hierarchical results; skipping mod-BAM output")
        return out

    _ensure_bam_indexed(native_bam)
    _ensure_bam_indexed(ivt_bam)

    header = _build_header_from_bam(native_bam, ref_fasta)

    logger.info(
        "Building mod-BAM for %d contigs",
        len(hierarchical_results),
    )

    # Lazy import to avoid a circular dependency (_pipeline imports
    # flush_contig_to_bam from this module).
    from baleen.eventalign._pipeline import _sanitize_contig_filename

    with tempfile.TemporaryDirectory(prefix="baleen-modbam-") as tmp:
        tmp_dir = Path(tmp)
        per_contig_paths: list[Path] = []
        for contig in sorted(hierarchical_results.keys()):
            cmr = hierarchical_results[contig]
            slice_path = tmp_dir / f"{_sanitize_contig_filename(contig)}.bam"
            flush_contig_to_bam(cmr, native_bam, ivt_bam, header, slice_path)
            per_contig_paths.append(slice_path)
        merge_contig_bams(per_contig_paths, out)

    logger.info("Wrote mod-BAM to %s", out)
    return out

flush_contig_to_bam

flush_contig_to_bam(
    cmr: "ContigModificationResult",
    native_bam: PathLike,
    ivt_bam: PathLike,
    header: AlignmentHeader,
    out_path: PathLike,
) -> Path

Write a single contig's read-level mod calls to one coordinate-sorted BAM slice.

Uses fetch(contig) to scan only reads from this contig in the input BAMs — requires the input BAMs to be indexed. Native and IVT reads are written interleaved, then pysam.sort orders the slice by coordinate before the atomic rename to out_path.

Parameters:

Name Type Description Default
cmr 'ContigModificationResult'

ContigModificationResult for one contig.

required
native_bam PathLike

Original input BAM paths. Must be indexed.

required
ivt_bam PathLike

Original input BAM paths. Must be indexed.

required
header AlignmentHeader

Output BAM header (built once via :func:_build_header_from_bam).

required
out_path PathLike

Destination path for the per-contig BAM slice (coordinate-sorted).

required

Returns:

Type Description
Path

The written per-contig BAM path.

Source code in baleen/eventalign/_read_bam.py
def flush_contig_to_bam(
    cmr: "ContigModificationResult",
    native_bam: PathLike,
    ivt_bam: PathLike,
    header: pysam.AlignmentHeader,
    out_path: PathLike,
) -> Path:
    """Write a single contig's read-level mod calls to one coordinate-sorted BAM slice.

    Uses ``fetch(contig)`` to scan only reads from this contig in the input
    BAMs — requires the input BAMs to be indexed.  Native and IVT reads are
    written interleaved, then ``pysam.sort`` orders the slice by coordinate
    before the atomic rename to *out_path*.

    Parameters
    ----------
    cmr
        ``ContigModificationResult`` for one contig.
    native_bam, ivt_bam
        Original input BAM paths.  Must be indexed.
    header
        Output BAM header (built once via :func:`_build_header_from_bam`).
    out_path
        Destination path for the per-contig BAM slice (coordinate-sorted).

    Returns
    -------
    Path
        The written per-contig BAM path.
    """
    out = Path(out_path)
    out.parent.mkdir(parents=True, exist_ok=True)

    contig = cmr.contig

    # Build read_positions for just this one contig.
    read_positions: dict[str, list[tuple[str, int, float, bool]]] = defaultdict(list)
    for pos, ps in cmr.position_stats.items():
        for i, name in enumerate(ps.native_read_names):
            p = float(ps.p_mod_hmm[i])
            if not math.isnan(p):
                read_positions[name].append((contig, pos, p, True))
        for j, name in enumerate(ps.ivt_read_names):
            p = float(ps.p_mod_hmm[ps.n_native + j])
            if not math.isnan(p):
                read_positions[name].append((contig, pos, p, False))

    # Write to an unsorted intermediate (native then IVT reads
    # interleave by coordinate), then sort by position before flushing
    # to *out_path* — keeps per-contig slices internally sorted so the
    # downstream ``samtools merge`` produces a globally sorted output
    # without an expensive whole-file resort.
    unsorted_path = out.with_suffix(out.suffix + ".unsorted.tmp")
    sorted_tmp = out.with_suffix(out.suffix + ".tmp")
    success = False
    try:
        seen_reads: set[str] = set()
        with pysam.AlignmentFile(str(unsorted_path), "wb", header=header) as bam_out:
            for bam_path, is_native, rg_label in [
                (native_bam, True, "native"),
                (ivt_bam, False, "ivt"),
            ]:
                with pysam.AlignmentFile(str(bam_path), "rb") as bam_in:
                    try:
                        read_iter: Iterable[pysam.AlignedSegment] = bam_in.fetch(contig)
                    except (ValueError, KeyError):
                        # Contig not present in this input BAM — nothing to fetch.
                        continue
                    _scan_and_write(
                        read_iter, bam_out, read_positions,
                        is_native, rg_label, seen_reads,
                    )
        # Coordinate-sort the slice (single-threaded; the slice is small
        # enough that parallelism here is mostly overhead).
        pysam.sort("-o", str(sorted_tmp), str(unsorted_path))
        os.replace(sorted_tmp, out)
        success = True
    finally:
        unsorted_path.unlink(missing_ok=True)
        if not success:
            sorted_tmp.unlink(missing_ok=True)

    return out

merge_contig_bams

merge_contig_bams(
    per_contig_bams: list[Path],
    output_path: PathLike,
    threads: int = 4,
    batch_size: int = _MERGE_BATCH_SIZE,
) -> Path

Merge a list of per-contig BAM slices into one sorted, indexed BAM.

Uses samtools merge — each per-contig slice has already been coordinate-sorted by :func:flush_contig_to_bam (which calls pysam.sort internally), so a streaming k-way merge produces a globally sorted output without the cost of a whole-file re-sort.

For large transcriptomes (thousands of contigs), opening every input BAM in one samtools merge call hits filesystem-level limits (file-descriptor limits, NFS handle caps, FUSE quirks). We therefore merge in batches of batch_size into intermediate BAMs in a tempdir, then merge those intermediates — repeating until a single merge call covers the remaining inputs. Output is bit-identical to a flat one-shot merge.

Parameters:

Name Type Description Default
per_contig_bams list[Path]

Sorted (alphabetically by contig name) list of per-contig BAMs.

required
output_path PathLike

Destination path for the merged BAM.

required
threads int

Merge threads (-@ flag).

4
batch_size int

Maximum number of inputs per samtools merge invocation. Each level of the tree opens at most this many files at once.

_MERGE_BATCH_SIZE

Returns:

Type Description
Path

Final BAM path (or output_path if input list is empty).

Source code in baleen/eventalign/_read_bam.py
def merge_contig_bams(
    per_contig_bams: list[Path],
    output_path: PathLike,
    threads: int = 4,
    batch_size: int = _MERGE_BATCH_SIZE,
) -> Path:
    """Merge a list of per-contig BAM slices into one sorted, indexed BAM.

    Uses ``samtools merge`` — each per-contig slice has already been
    coordinate-sorted by :func:`flush_contig_to_bam` (which calls
    ``pysam.sort`` internally), so a streaming k-way merge produces a
    globally sorted output without the cost of a whole-file re-sort.

    For large transcriptomes (thousands of contigs), opening every input
    BAM in one ``samtools merge`` call hits filesystem-level limits
    (file-descriptor limits, NFS handle caps, FUSE quirks).  We therefore
    merge in **batches of ``batch_size``** into intermediate BAMs in a
    tempdir, then merge those intermediates — repeating until a single
    merge call covers the remaining inputs.  Output is bit-identical to
    a flat one-shot merge.

    Parameters
    ----------
    per_contig_bams
        Sorted (alphabetically by contig name) list of per-contig BAMs.
    output_path
        Destination path for the merged BAM.
    threads
        Merge threads (``-@`` flag).
    batch_size
        Maximum number of inputs per ``samtools merge`` invocation.
        Each level of the tree opens at most this many files at once.

    Returns
    -------
    Path
        Final BAM path (or *output_path* if input list is empty).
    """
    out = Path(output_path)
    if not per_contig_bams:
        logger.warning("No per-contig BAMs to merge; %s not written.", out)
        return out

    if batch_size < 2:
        raise ValueError(f"batch_size must be >= 2, got {batch_size}")

    out.parent.mkdir(parents=True, exist_ok=True)
    tmp = out.with_suffix(out.suffix + ".tmp")
    success = False
    cleanup_dir: Path | None = None
    try:
        if len(per_contig_bams) <= batch_size:
            # Fast path: one-shot merge.
            _samtools_merge(per_contig_bams, tmp, threads)
        else:
            # Multi-level merge.  Each level reduces the input count by
            # ~``batch_size``×; for 2924 inputs at batch=100 this is
            # one intermediate level (30 files) + one final merge.
            cleanup_dir = Path(
                tempfile.mkdtemp(prefix="baleen-merge-", dir=out.parent)
            )
            current = list(per_contig_bams)
            level = 0
            while len(current) > batch_size:
                logger.info(
                    "Chunked merge level %d: %d inputs → %d batches "
                    "of %d (size %d)",
                    level,
                    len(current),
                    (len(current) + batch_size - 1) // batch_size,
                    batch_size,
                    batch_size,
                )
                next_level: list[Path] = []
                for batch_idx in range(
                    0, len(current), batch_size
                ):
                    batch = current[batch_idx : batch_idx + batch_size]
                    chunk_out = (
                        cleanup_dir
                        / f"L{level}_B{batch_idx // batch_size:05d}.bam"
                    )
                    _samtools_merge(batch, chunk_out, threads)
                    next_level.append(chunk_out)
                # Drop previous-level intermediates eagerly to keep the
                # tempdir bounded.  Don't unlink the original input
                # ``per_contig_bams`` (level 0); the caller owns those.
                if level > 0:
                    for p in current:
                        p.unlink(missing_ok=True)
                current = next_level
                level += 1
            # Final merge: ``current`` has ≤ batch_size entries.
            logger.info(
                "Chunked merge final level %d: %d inputs",
                level,
                len(current),
            )
            _samtools_merge(current, tmp, threads)

        os.replace(tmp, out)
        pysam.index(str(out))
        success = True
    finally:
        if not success:
            tmp.unlink(missing_ok=True)
            if out.exists():
                out.unlink()
        if cleanup_dir is not None:
            shutil.rmtree(cleanup_dir, ignore_errors=True)

    return out

Loading

load_read_results

load_read_results(
    bam_path: PathLike,
    contig: str | None = None,
    start: int | None = None,
    end: int | None = None,
) -> "pd.DataFrame"

Load read-level results into a DataFrame.

Parameters:

Name Type Description Default
bam_path PathLike

Path to the mod-BAM file.

required
contig str | None

Filter to this contig (optional).

None
start int | None

Filter to this region within contig (0-based, optional).

None
end int | None

Filter to this region within contig (0-based, optional).

None

Returns:

Type Description
DataFrame

Columns: contig, position, read_name, is_native, p_mod_hmm.

Source code in baleen/eventalign/_read_bam.py
def load_read_results(
    bam_path: PathLike,
    contig: str | None = None,
    start: int | None = None,
    end: int | None = None,
) -> "pd.DataFrame":
    """Load read-level results into a DataFrame.

    Parameters
    ----------
    bam_path
        Path to the mod-BAM file.
    contig
        Filter to this contig (optional).
    start, end
        Filter to this region within *contig* (0-based, optional).

    Returns
    -------
    pd.DataFrame
        Columns: contig, position, read_name, is_native, p_mod_hmm.
    """
    records = list(load_read_results_iter(bam_path, contig, start, end))
    if not records:
        return pd.DataFrame(
            columns=["contig", "position", "read_name", "is_native", "p_mod_hmm"],
        )
    return pd.DataFrame.from_records(records)

load_read_results_iter

load_read_results_iter(
    bam_path: PathLike,
    contig: str | None = None,
    start: int | None = None,
    end: int | None = None,
) -> Iterator[dict[str, Any]]

Iterate read-level results as dicts from a mod-BAM file.

Parses MM:Z / ML:B:C tags to reconstruct per-position modification probabilities. Falls back to legacy MP:f tag format if MM is absent.

Parameters:

Name Type Description Default
bam_path PathLike

Path to the mod-BAM file.

required
contig str | None

Optional region filter (0-based coordinates).

None
start str | None

Optional region filter (0-based coordinates).

None
end str | None

Optional region filter (0-based coordinates).

None

Yields:

Type Description
dict

Keys: contig, position, read_name, is_native, p_mod_hmm.

Source code in baleen/eventalign/_read_bam.py
def load_read_results_iter(
    bam_path: PathLike,
    contig: str | None = None,
    start: int | None = None,
    end: int | None = None,
) -> Iterator[dict[str, Any]]:
    """Iterate read-level results as dicts from a mod-BAM file.

    Parses MM:Z / ML:B:C tags to reconstruct per-position modification
    probabilities.  Falls back to legacy MP:f tag format if MM is absent.

    Parameters
    ----------
    bam_path
        Path to the mod-BAM file.
    contig, start, end
        Optional region filter (0-based coordinates).

    Yields
    ------
    dict
        Keys: contig, position, read_name, is_native, p_mod_hmm.
    """
    with pysam.AlignmentFile(str(bam_path), "rb") as bam:
        if contig is not None:
            iterator = bam.fetch(contig, start, end)
        else:
            iterator = bam.fetch()

        for read in iterator:
            if read.is_unmapped:
                continue

            read_name = read.query_name
            ref_name = read.reference_name

            # Determine read group
            try:
                rg = read.get_tag("RG")
            except KeyError:
                rg = "native"
            is_native = rg == "native"

            # Legacy format: single MP:f tag per record (one record per position)
            try:
                mp_tag = read.get_tag("MP")
                # Legacy record: one position per record
                try:
                    kmer = read.get_tag("KM")
                except KeyError:
                    kmer = None
                result = {
                    "contig": ref_name,
                    "position": read.reference_start + 1,
                    "read_name": read_name,
                    "is_native": is_native,
                    "p_mod_hmm": float(mp_tag),
                }
                if kmer is not None:
                    result["kmer"] = kmer
                yield result
                continue
            except KeyError:
                pass

            # Mod-BAM format: MM:Z + ML:B:C tags
            try:
                mm_str = read.get_tag("MM")
                ml_vals = read.get_tag("ML")
            except KeyError:
                continue  # No modification data on this read

            # Parse MM tag to get query positions
            query_positions = _parse_mm_tag(mm_str)
            if len(query_positions) != len(ml_vals):
                logger.warning(
                    "MM/ML length mismatch for read %s: %d vs %d",
                    read_name, len(query_positions), len(ml_vals),
                )
                continue

            # Map query positions back to reference positions
            # aligned_pairs: (query_pos, ref_pos)
            query_to_ref: dict[int, int] = {}
            for qry, ref in read.get_aligned_pairs():
                if qry is not None and ref is not None:
                    query_to_ref[qry] = ref

            for qpos, ml_val in zip(query_positions, ml_vals):
                ref_pos = query_to_ref.get(qpos)
                if ref_pos is None:
                    continue  # insertion position, skip

                genomic_pos = ref_pos + 1  # 0-based -> 1-based
                p_mod = ml_val / 255.0

                # Apply region filter for mod-BAM (the read spans the region
                # but individual positions may be outside it)
                if start is not None and ref_pos < start:
                    continue
                if end is not None and ref_pos >= end:
                    continue

                yield {
                    "contig": ref_name,
                    "position": genomic_pos,
                    "read_name": read_name,
                    "is_native": is_native,
                    "p_mod_hmm": p_mod,
                }