Skip to content

Modification Probabilities

Per-read modification scoring. Three algorithms share an EM-based calibration; compute_sequential_modification_probabilities drives the hierarchical V1→V2→V3 pipeline.

compute_sequential_modification_probabilities

compute_sequential_modification_probabilities(
    contig_result: ContigResult,
    *,
    shrinkage_window: int = 15,
    kappa_high: float = 0.5,
    kappa_medium: float = 2.0,
    kappa_low: float = 5.0,
    mixture_max_iter: int = 100,
    mixture_pi_threshold: float = 0.05,
    mixture_separation: float = 0.8,
    hmm_min_positions: int = 3,
    hmm_p_stay_per_base: float = 0.92,
    run_hmm: bool = True,
    hmm_params: HMMParams | None = None,
    emission_source: str = "p_mod_knn",
    consistent_fallback: bool = True,
    knn_weighted: bool = False,
    legacy_scoring: bool = False,
    show_progress: bool = True
) -> ContigModificationResult

Run the full V1→V2→V3 hierarchical pipeline on a contig.

Parameters:

Name Type Description Default
contig_result ContigResult

Output of the eventalign pipeline for one contig.

required
shrinkage_window int

Half-window in positions (not bases) for local shrinkage.

15
kappa_high float

Shrinkage strengths for high/medium/low coverage positions.

0.5
kappa_medium float

Shrinkage strengths for high/medium/low coverage positions.

0.5
kappa_low float

Shrinkage strengths for high/medium/low coverage positions.

0.5
mixture_max_iter int

Max EM iterations for V2 mixture.

100
mixture_pi_threshold float

Null-gate threshold on mixing proportion.

0.05
mixture_separation float

Minimum effect-size separation for V2 gate.

0.8
hmm_min_positions int

Minimum trajectory length for HMM.

3
hmm_p_stay_per_base float

Per-base state persistence for HMM transitions.

0.92
run_hmm bool

If False, skip V3 entirely.

True
hmm_params HMMParams | None

Trained HMM parameters. When provided, overrides hmm_p_stay_per_base and uses learned emission transforms and initial state probabilities.

None
emission_source str

Which per-read score field to use as HMM emissions. Also gates whether V1 (empirical-Bayes null + shrinkage) and V2 (anchored mixture EM) actually run:

  • "p_mod_knn" (default): run only kNN + Beta-EM → HMM. V1/V2 are skipped entirely; their fields on PositionStats are populated with zero/default placeholders.
  • "p_mod_raw": run V1 → V2 → HMM (V2 posteriors are the HMM emissions; kNN is still computed and stored).
'p_mod_knn'
consistent_fallback bool

If True (default, Fix A), set the short-trajectory fallback to emission_source instead of V2 p_mod_raw. Set to False to reproduce old behavior.

True
knn_weighted bool

If True (Fix B), use distance-weighted kNN scoring instead of unweighted counting.

False

Returns:

Type Description
ContigModificationResult
Source code in baleen/eventalign/_hierarchical.py
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
def compute_sequential_modification_probabilities(
    contig_result: ContigResult,
    *,
    shrinkage_window: int = 15,
    kappa_high: float = 0.5,
    kappa_medium: float = 2.0,
    kappa_low: float = 5.0,
    mixture_max_iter: int = 100,
    mixture_pi_threshold: float = 0.05,
    mixture_separation: float = 0.8,
    hmm_min_positions: int = 3,
    hmm_p_stay_per_base: float = 0.92,
    run_hmm: bool = True,
    hmm_params: HMMParams | None = None,
    emission_source: str = "p_mod_knn",
    consistent_fallback: bool = True,
    knn_weighted: bool = False,
    legacy_scoring: bool = False,
    show_progress: bool = True,
) -> ContigModificationResult:
    """Run the full V1→V2→V3 hierarchical pipeline on a contig.

    Parameters
    ----------
    contig_result : ContigResult
        Output of the eventalign pipeline for one contig.
    shrinkage_window : int
        Half-window in positions (not bases) for local shrinkage.
    kappa_high, kappa_medium, kappa_low : float
        Shrinkage strengths for high/medium/low coverage positions.
    mixture_max_iter : int
        Max EM iterations for V2 mixture.
    mixture_pi_threshold : float
        Null-gate threshold on mixing proportion.
    mixture_separation : float
        Minimum effect-size separation for V2 gate.
    hmm_min_positions : int
        Minimum trajectory length for HMM.
    hmm_p_stay_per_base : float
        Per-base state persistence for HMM transitions.
    run_hmm : bool
        If False, skip V3 entirely.
    hmm_params : HMMParams | None
        Trained HMM parameters.  When provided, overrides
        ``hmm_p_stay_per_base`` and uses learned emission transforms
        and initial state probabilities.
    emission_source : str
        Which per-read score field to use as HMM emissions. Also gates
        whether V1 (empirical-Bayes null + shrinkage) and V2 (anchored
        mixture EM) actually run:

        - ``"p_mod_knn"`` (default): run only kNN + Beta-EM → HMM.
          V1/V2 are skipped entirely; their fields on ``PositionStats``
          are populated with zero/default placeholders.
        - ``"p_mod_raw"``: run V1 → V2 → HMM (V2 posteriors are the
          HMM emissions; kNN is still computed and stored).
    consistent_fallback : bool
        If True (default, Fix A), set the short-trajectory fallback to
        *emission_source* instead of V2 ``p_mod_raw``.  Set to False
        to reproduce old behavior.
    knn_weighted : bool
        If True (Fix B), use distance-weighted kNN scoring instead of
        unweighted counting.

    Returns
    -------
    ContigModificationResult
    """
    sorted_positions = sorted(contig_result.positions.keys())

    if len(sorted_positions) == 0:
        return ContigModificationResult(
            contig=contig_result.contig,
            position_stats={},
            native_trajectories=[],
            ivt_trajectories=[],
            global_mu=0.0,
            global_sigma=1.0,
        )

    # V1/V2 only run when explicitly needed (p_mod_raw emission source).
    # Default pipeline uses p_mod_knn and skips V1/V2 entirely.
    needs_v1_v2 = (emission_source == "p_mod_raw")

    contig_short = contig_result.contig
    if len(contig_short) > 20:
        contig_short = contig_short[:17] + "..."
    n_pos = len(sorted_positions)

    # Progress bar: 3 passes (V1a + V2b + kNN) when V1/V2 active, else
    # kNN only.
    _n_stages = 3 if needs_v1_v2 else 1
    _mod_pbar = tqdm(
        total=n_pos * _n_stages,
        desc=f"  {contig_short} mod-calling",
        unit="step",
        leave=False,
        disable=not show_progress,
        bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}] {postfix}",
    )

    position_stats: dict[int, PositionStats] = {}

    if needs_v1_v2:
        # ── V1a: Extract scores and fit robust IVT null per position ──
        raw_params: dict[int, tuple[float, float]] = {}
        coverages: dict[int, int] = {}
        all_scores: dict[int, NDArray[np.float64]] = {}

        _mod_pbar.set_postfix_str("V1: null fitting")
        for pos in sorted_positions:
            pr = contig_result.positions[pos]
            scores = _extract_ivt_distances(
                pr.distance_matrix, pr.n_native_reads, pr.n_ivt_reads
            )
            all_scores[pos] = scores

            ivt_scores = scores[pr.n_native_reads:]
            mu_raw, sigma_raw = _fit_robust_ivt_null(ivt_scores)
            raw_params[pos] = (mu_raw, sigma_raw)
            coverages[pos] = pr.n_ivt_reads
            _mod_pbar.update(1)

        # ── V1b: Hierarchical shrinkage ───────────────────────────────
        shrunk_params = _shrink_parameters(
            sorted_positions,
            raw_params,
            coverages,
            window=shrinkage_window,
            kappa_high=kappa_high,
            kappa_medium=kappa_medium,
            kappa_low=kappa_low,
        )

        # Global IVT prior (for reporting)
        all_mus = [raw_params[p][0] for p in sorted_positions if coverages[p] >= 1]
        global_mu = float(np.median(all_mus)) if all_mus else 0.0
        all_sigmas = [raw_params[p][1] for p in sorted_positions if coverages[p] >= 1]
        global_sigma = float(np.median(all_sigmas)) if all_sigmas else 1.0

        # ── V1c: Z-scores and p-values ────────────────────────────────
        for pos in sorted_positions:
            pr = contig_result.positions[pos]
            scores = all_scores[pos]
            mu_s, sigma_s = shrunk_params[pos]
            mu_r, sigma_r = raw_params[pos]

            z = (scores - mu_s) / max(sigma_s, _MIN_SIGMA)
            # One-sided: P(Z ≥ z) — high z means far from IVT null
            p_null_vals = 1.0 - _norm_dist.cdf(z)

            n_total = pr.n_native_reads + pr.n_ivt_reads
            position_stats[pos] = PositionStats(
                position=pos,
                reference_kmer=pr.reference_kmer,
                coverage_class=_classify_coverage(pr.n_ivt_reads),
                n_ivt=pr.n_ivt_reads,
                n_native=pr.n_native_reads,
                native_read_names=pr.native_read_names,
                ivt_read_names=pr.ivt_read_names,
                mu_raw=mu_r,
                sigma_raw=sigma_r,
                mu_shrunk=mu_s,
                sigma_shrunk=sigma_s,
                scores=scores,
                z_scores=z,
                p_null=np.asarray(p_null_vals, dtype=np.float64),
                p_mod_raw=np.zeros(n_total, dtype=np.float64),
                mixture_pi=0.0,
                mixture_null_gate=True,
                gate_weight=0.0,
                p_mod_knn=np.full(n_total, np.nan, dtype=np.float64),
                p_mod_hmm=np.full(n_total, np.nan, dtype=np.float64),
            )

        # ── V2a: Contig-pooled EM for global alternative parameters ──
        global_mu1, global_sigma1 = _contig_pooled_mixture_em(
            position_stats, sorted_positions,
        )

        # ── V2b: Per-position anchored mixture with soft gating ──────
        _mod_pbar.set_postfix_str("V2: mixture EM")
        for pos in sorted_positions:
            ps = position_stats[pos]
            z_native = ps.z_scores[: ps.n_native]
            z_ivt = ps.z_scores[ps.n_native :]

            p_mod_all, pi, null_gate, gw = _anchored_mixture_em(
                z_native,
                z_ivt,
                ps.z_scores,
                max_iter=mixture_max_iter,
                pi_threshold=mixture_pi_threshold,
                separation_threshold=mixture_separation,
                global_mu1=global_mu1,
                global_sigma1=global_sigma1,
                legacy_scoring=legacy_scoring,
            )
            ps.p_mod_raw[:] = p_mod_all
            ps.mixture_pi = pi
            ps.mixture_null_gate = null_gate
            ps.gate_weight = gw

            # Default HMM to V2 result (will be overwritten if HMM runs)
            ps.p_mod_hmm[:] = p_mod_all
            _mod_pbar.update(1)
    else:
        # V1/V2 skipped — construct PositionStats with zero/default
        # fields so downstream code and consumers that still touch them
        # (e.g. legacy ablation tooling) see a well-formed object.
        global_mu, global_sigma = 0.0, 1.0
        for pos in sorted_positions:
            pr = contig_result.positions[pos]
            n_total = pr.n_native_reads + pr.n_ivt_reads
            position_stats[pos] = PositionStats(
                position=pos,
                reference_kmer=pr.reference_kmer,
                coverage_class=_classify_coverage(pr.n_ivt_reads),
                n_ivt=pr.n_ivt_reads,
                n_native=pr.n_native_reads,
                native_read_names=pr.native_read_names,
                ivt_read_names=pr.ivt_read_names,
                mu_raw=0.0,
                sigma_raw=1.0,
                mu_shrunk=0.0,
                sigma_shrunk=1.0,
                scores=np.zeros(n_total, dtype=np.float64),
                z_scores=np.zeros(n_total, dtype=np.float64),
                p_null=np.ones(n_total, dtype=np.float64),
                p_mod_raw=np.zeros(n_total, dtype=np.float64),
                mixture_pi=0.0,
                mixture_null_gate=True,
                gate_weight=0.0,
                p_mod_knn=np.full(n_total, np.nan, dtype=np.float64),
                p_mod_hmm=np.full(n_total, np.nan, dtype=np.float64),
            )

    # ── kNN IVT-purity scores ─────────────────────────────────────────────

    from baleen.eventalign._probability import _score_knn_ivt_purity, _calibrate_beta

    _mod_pbar.set_postfix_str("kNN scoring")

    for pos in sorted_positions:
        pr = contig_result.positions[pos]
        ps = position_stats[pos]
        n_total = pr.n_native_reads + pr.n_ivt_reads

        if n_total < 3:
            _mod_pbar.update(1)
            continue

        raw_knn = _score_knn_ivt_purity(
            pr.distance_matrix, pr.n_native_reads, pr.n_ivt_reads,
            weighted=knn_weighted,
        )

        ivt_mask = np.zeros(n_total, dtype=bool)
        ivt_mask[pr.n_native_reads:] = True

        cal = _calibrate_beta(raw_knn, ivt_mask, ~ivt_mask)
        ps.p_mod_knn[:] = cal.probabilities
        _mod_pbar.update(1)

    # Fix A: update short-trajectory fallback to match emission_source.
    # Without this, short-trajectory reads keep V2 p_mod_raw while long
    # trajectories are scored against kNN — a calibration mismatch.
    if consistent_fallback:
        for pos in sorted_positions:
            ps = position_stats[pos]
            ps.p_mod_hmm[:] = getattr(ps, emission_source)[:]

    _mod_pbar.set_postfix_str("V3: HMM smoothing")
    _mod_pbar.close()

    # ── V3: HMM smoothing ────────────────────────────────────────────────

    native_trajs, ivt_trajs = _build_read_trajectories(
        contig_result, sorted_positions,
    )

    if run_hmm:
        # Default to 2-state unsupervised HMM when no params provided.
        # The 2-state model (Unmodified/Modified) is more robust for
        # detecting isolated modifications than the 3-state model, which
        # requires transitions through a Flank state and uses Beta PDF
        # emissions that collapse at boundary values.
        effective_hmm_params = hmm_params
        if effective_hmm_params is None:
            from baleen.eventalign._hmm_training import create_unsupervised_params
            effective_hmm_params = create_unsupervised_params(n_states=2)

        _run_hmm_on_trajectories(
            native_trajs,
            position_stats,
            min_positions=hmm_min_positions,
            p_stay_per_base=hmm_p_stay_per_base,
            hmm_params=effective_hmm_params,
            emission_source=emission_source,
        )
        _run_hmm_on_trajectories(
            ivt_trajs,
            position_stats,
            min_positions=hmm_min_positions,
            p_stay_per_base=hmm_p_stay_per_base,
            hmm_params=effective_hmm_params,
            emission_source=emission_source,
        )

    return ContigModificationResult(
        contig=contig_result.contig,
        position_stats=position_stats,
        native_trajectories=native_trajs,
        ivt_trajectories=ivt_trajs,
        global_mu=global_mu,
        global_sigma=global_sigma,
    )

compute_modification_probabilities

compute_modification_probabilities(
    distance_matrix: NDArray[float64],
    n_native: int,
    n_ivt: int,
    position: int = -1,
    *,
    algorithms: Optional[Sequence[AlgorithmName]] = None,
    knn_k: Optional[int] = None,
    mds_components: int = 2,
    max_iter: int = 100,
    pi_threshold: float = 0.05
) -> dict[AlgorithmName, ModificationProbabilities]

Run one or more algorithms on a distance matrix.

Parameters:

Name Type Description Default
distance_matrix shape(n_native + n_ivt, n_native + n_ivt)

Symmetric DTW distance matrix. Rows/columns ordered: native reads first, then IVT reads.

required
n_native int

Number of native and IVT reads.

required
n_ivt int

Number of native and IVT reads.

required
position int

Genomic position (for labelling only).

-1
algorithms sequence of AlgorithmName

Which algorithms to run. Defaults to all three.

None
knn_k int

k for kNN algorithm. None = auto.

None
mds_components int

Embedding dimensionality for MDS+GMM.

2
max_iter int

Maximum EM iterations.

100
pi_threshold float

Null-gate threshold on mixing proportion.

0.05

Returns:

Type Description
dict mapping AlgorithmName → ModificationProbabilities
Source code in baleen/eventalign/_probability.py
def compute_modification_probabilities(
    distance_matrix: NDArray[np.float64],
    n_native: int,
    n_ivt: int,
    position: int = -1,
    *,
    algorithms: Optional[Sequence[AlgorithmName]] = None,
    knn_k: Optional[int] = None,
    mds_components: int = 2,
    max_iter: int = 100,
    pi_threshold: float = 0.05,
) -> dict[AlgorithmName, ModificationProbabilities]:
    """Run one or more algorithms on a distance matrix.

    Parameters
    ----------
    distance_matrix : shape (n_native + n_ivt, n_native + n_ivt)
        Symmetric DTW distance matrix.  Rows/columns ordered:
        native reads first, then IVT reads.
    n_native, n_ivt : int
        Number of native and IVT reads.
    position : int
        Genomic position (for labelling only).
    algorithms : sequence of AlgorithmName, optional
        Which algorithms to run.  Defaults to all three.
    knn_k : int, optional
        k for kNN algorithm.  ``None`` = auto.
    mds_components : int
        Embedding dimensionality for MDS+GMM.
    max_iter : int
        Maximum EM iterations.
    pi_threshold : float
        Null-gate threshold on mixing proportion.

    Returns
    -------
    dict mapping AlgorithmName → ModificationProbabilities
    """
    if algorithms is None:
        algorithms = ALL_ALGORITHMS

    n_total = n_native + n_ivt
    if distance_matrix.shape != (n_total, n_total):
        raise ValueError(
            f"distance_matrix shape {distance_matrix.shape} does not match "
            f"n_native={n_native} + n_ivt={n_ivt} = {n_total}"
        )

    results: dict[AlgorithmName, ModificationProbabilities] = {}

    for alg in algorithms:
        if alg == AlgorithmName.DISTANCE_TO_IVT:
            mp = distance_to_ivt(
                distance_matrix, n_native, n_ivt,
                max_iter=max_iter, pi_threshold=pi_threshold,
            )
        elif alg == AlgorithmName.KNN_IVT_PURITY:
            mp = knn_ivt_purity(
                distance_matrix, n_native, n_ivt,
                k=knn_k, max_iter=max_iter, pi_threshold=pi_threshold,
            )
        elif alg == AlgorithmName.MDS_GMM:
            mp = mds_gmm(
                distance_matrix, n_native, n_ivt,
                n_components=mds_components, max_iter=max_iter,
                pi_threshold=pi_threshold,
            )
        else:
            raise ValueError(f"Unknown algorithm: {alg}")

        mp.position = position
        results[alg] = mp

    return results

ModificationProbabilities dataclass

ModificationProbabilities(
    algorithm: AlgorithmName,
    position: int,
    probabilities: NDArray[float64],
    n_native: int,
    n_ivt: int,
    scores: NDArray[float64],
    null_gate_active: bool,
    mixing_proportion: float,
)

Per-read modification probabilities from a single algorithm.

probabilities instance-attribute

probabilities: NDArray[float64]

Shape (n_native + n_ivt,) — native reads first, then IVT.

scores instance-attribute

scores: NDArray[float64]

Raw scalar scores (before calibration). Same ordering as probabilities.

null_gate_active instance-attribute

null_gate_active: bool

True if the position was classified as unmodified (all probs set to 0).

mixing_proportion instance-attribute

mixing_proportion: float

Fitted mixing proportion π for the alternative component.

AlgorithmName

Bases: str, Enum

Scoring algorithms

distance_to_ivt

distance_to_ivt(
    distance_matrix: NDArray[float64],
    n_native: int,
    n_ivt: int,
    *,
    max_iter: int = 100,
    pi_threshold: float = 0.05
) -> ModificationProbabilities

Algorithm 1: Robust Distance-to-IVT Baseline.

Each read is scored by its median DTW distance to IVT controls, then calibrated via a Normal null + Normal alternative EM mixture.

Source code in baleen/eventalign/_probability.py
def distance_to_ivt(
    distance_matrix: NDArray[np.float64],
    n_native: int,
    n_ivt: int,
    *,
    max_iter: int = 100,
    pi_threshold: float = 0.05,
) -> ModificationProbabilities:
    """Algorithm 1: Robust Distance-to-IVT Baseline.

    Each read is scored by its median DTW distance to IVT controls,
    then calibrated via a Normal null + Normal alternative EM mixture.
    """
    n_total = n_native + n_ivt
    scores = _score_distance_to_ivt(distance_matrix, n_native, n_ivt)

    ivt_mask = np.zeros(n_total, dtype=bool)
    ivt_mask[n_native:] = True
    native_mask = ~ivt_mask

    cal = _calibrate_normal(scores, ivt_mask, native_mask,
                            max_iter=max_iter, pi_threshold=pi_threshold)

    return ModificationProbabilities(
        algorithm=AlgorithmName.DISTANCE_TO_IVT,
        position=-1,  # caller fills in
        probabilities=cal.probabilities,
        n_native=n_native,
        n_ivt=n_ivt,
        scores=scores,
        null_gate_active=cal.null_gate_active,
        mixing_proportion=cal.pi,
    )

knn_ivt_purity

knn_ivt_purity(
    distance_matrix: NDArray[float64],
    n_native: int,
    n_ivt: int,
    *,
    k: Optional[int] = None,
    lof_weight: float = 0.3,
    max_iter: int = 100,
    pi_threshold: float = 0.05
) -> ModificationProbabilities

Algorithm 3: kNN IVT-Purity Score with LOF blending.

Each read is scored by how few of its k nearest neighbors are IVT controls, blended with a Local Outlier Factor (LOF) anomaly score, then calibrated via a Beta null + Beta alternative EM mixture.

Parameters:

Name Type Description Default
lof_weight float

Weight for LOF score in the blend (0 = pure kNN, 1 = pure LOF). Default 0.3 gives 70% kNN purity + 30% LOF anomaly signal.

0.3
Source code in baleen/eventalign/_probability.py
def knn_ivt_purity(
    distance_matrix: NDArray[np.float64],
    n_native: int,
    n_ivt: int,
    *,
    k: Optional[int] = None,
    lof_weight: float = 0.3,
    max_iter: int = 100,
    pi_threshold: float = 0.05,
) -> ModificationProbabilities:
    """Algorithm 3: kNN IVT-Purity Score with LOF blending.

    Each read is scored by how few of its k nearest neighbors are IVT
    controls, blended with a Local Outlier Factor (LOF) anomaly score,
    then calibrated via a Beta null + Beta alternative EM mixture.

    Parameters
    ----------
    lof_weight : float
        Weight for LOF score in the blend (0 = pure kNN, 1 = pure LOF).
        Default 0.3 gives 70% kNN purity + 30% LOF anomaly signal.
    """
    n_total = n_native + n_ivt
    knn_scores = _score_knn_ivt_purity(distance_matrix, n_native, n_ivt, k=k)

    # Blend with LOF anomaly score for density-aware detection
    if lof_weight > 0:
        lof_scores = _score_lof(distance_matrix, n_native, n_ivt, k=k)
        scores = (1.0 - lof_weight) * knn_scores + lof_weight * lof_scores
        # Clip to [0, 1] for Beta calibration
        scores = np.clip(scores, _EPS, 1.0 - _EPS)
    else:
        scores = knn_scores

    ivt_mask = np.zeros(n_total, dtype=bool)
    ivt_mask[n_native:] = True
    native_mask = ~ivt_mask

    cal = _calibrate_beta(scores, ivt_mask, native_mask,
                          max_iter=max_iter, pi_threshold=pi_threshold)

    return ModificationProbabilities(
        algorithm=AlgorithmName.KNN_IVT_PURITY,
        position=-1,
        probabilities=cal.probabilities,
        n_native=n_native,
        n_ivt=n_ivt,
        scores=scores,
        null_gate_active=cal.null_gate_active,
        mixing_proportion=cal.pi,
    )

mds_gmm

mds_gmm(
    distance_matrix: NDArray[float64],
    n_native: int,
    n_ivt: int,
    *,
    n_components: int = 2,
    max_iter: int = 100,
    pi_threshold: float = 0.05
) -> ModificationProbabilities

Algorithm 5: MDS + Anchored Gaussian Mixture.

Embed the full distance matrix into low-dimensional space via classical MDS, then fit an IVT-anchored null Gaussian and a native alternative Gaussian using EM.

Source code in baleen/eventalign/_probability.py
def mds_gmm(
    distance_matrix: NDArray[np.float64],
    n_native: int,
    n_ivt: int,
    *,
    n_components: int = 2,
    max_iter: int = 100,
    pi_threshold: float = 0.05,
) -> ModificationProbabilities:
    """Algorithm 5: MDS + Anchored Gaussian Mixture.

    Embed the full distance matrix into low-dimensional space via classical
    MDS, then fit an IVT-anchored null Gaussian and a native alternative
    Gaussian using EM.
    """
    n_total = n_native + n_ivt

    # Embed
    coords = _classical_mds(distance_matrix, n_components=n_components)

    ivt_coords = coords[n_native:]
    native_coords = coords[:n_native]

    # 1D score for the output (Euclidean distance to IVT centroid)
    ivt_centroid = np.mean(ivt_coords, axis=0) if n_ivt > 0 else np.zeros(n_components)
    scores_all = np.sqrt(np.sum((coords - ivt_centroid) ** 2, axis=1))

    if n_ivt < 2 or n_native < 2:
        return ModificationProbabilities(
            algorithm=AlgorithmName.MDS_GMM,
            position=-1,
            probabilities=np.zeros(n_total, dtype=np.float64),
            n_native=n_native,
            n_ivt=n_ivt,
            scores=scores_all,
            null_gate_active=True,
            mixing_proportion=0.0,
        )

    # Fit null from IVT
    mu0, sigma0 = _fit_multivariate_normal(ivt_coords)

    # Initialise alternative from native
    mu1, sigma1 = _fit_multivariate_normal(native_coords)
    pi = 0.1

    tol = 1e-6
    for _ in range(max_iter):
        f0 = _mvn_pdf(native_coords, mu0, sigma0)
        f1 = _mvn_pdf(native_coords, mu1, sigma1)
        denom = (1.0 - pi) * f0 + pi * f1 + _EPS
        r = (pi * f1) / denom

        pi_new = float(np.mean(r))
        r_sum = float(np.sum(r)) + _EPS
        mu1_new = np.sum(r[:, np.newaxis] * native_coords, axis=0) / r_sum

        diff = native_coords - mu1_new
        sigma1_new = (diff * r[:, np.newaxis]).T @ diff / r_sum
        sigma1_new += np.eye(n_components) * _MIN_SIGMA  # regularise

        if abs(pi_new - pi) < tol and np.max(np.abs(mu1_new - mu1)) < tol:
            pi, mu1, sigma1 = pi_new, mu1_new, sigma1_new
            break
        pi, mu1, sigma1 = pi_new, mu1_new, sigma1_new

    # BIC gate
    n = len(native_coords)
    d = n_components
    f0_n = _mvn_pdf(native_coords, mu0, sigma0)
    ll_null = float(np.sum(np.log(f0_n + _EPS)))
    f1_n = _mvn_pdf(native_coords, mu1, sigma1)
    ll_mix = float(np.sum(np.log((1 - pi) * f0_n + pi * f1_n + _EPS)))
    k_null = d + d * (d + 1) // 2
    k_mix = 1 + d + d * (d + 1) // 2
    bic_null = -2 * ll_null + k_null * math.log(max(n, 1))
    bic_mix = -2 * ll_mix + k_mix * math.log(max(n, 1))

    centroid_sep = float(np.sqrt(np.sum((mu1 - mu0) ** 2)))
    null_scale = float(np.sqrt(np.trace(sigma0) / d))
    separation = centroid_sep / max(null_scale, _MIN_SIGMA)
    null_gate = pi < pi_threshold or bic_mix >= bic_null or separation < 1.0

    if null_gate:
        return ModificationProbabilities(
            algorithm=AlgorithmName.MDS_GMM,
            position=-1,
            probabilities=np.zeros(n_total, dtype=np.float64),
            n_native=n_native,
            n_ivt=n_ivt,
            scores=scores_all,
            null_gate_active=True,
            mixing_proportion=pi,
        )

    # Clamp pi to prevent degenerate posteriors when pi→1.
    pi_post = min(max(pi, 0.01), 0.7)
    f0_all = _mvn_pdf(coords, mu0, sigma0)
    f1_all = _mvn_pdf(coords, mu1, sigma1)
    denom_all = (1.0 - pi_post) * f0_all + pi_post * f1_all + _EPS
    probs = (pi_post * f1_all) / denom_all
    probs = np.clip(probs, 0.0, 1.0)

    return ModificationProbabilities(
        algorithm=AlgorithmName.MDS_GMM,
        position=-1,
        probabilities=probs,
        n_native=n_native,
        n_ivt=n_ivt,
        scores=scores_all,
        null_gate_active=False,
        mixing_proportion=pi,
    )