Source code for pacman.lib.read_fit_par

import warnings
import numpy as np


def _as_bool(value):
    return str(value).lower() == "true"


[docs]def prior_bounds(prior, p1, p2, nsigma=5.0): """Return lower/upper bounds implied by a prior. U: p1, p2 are lower/upper bounds. N: p1 is mean, p2 is sigma, bounds are mean +/- nsigma*sigma. X: no bounds. """ prior = str(prior).upper() p1 = float(p1) p2 = float(p2) if prior == "U": if p2 <= p1: raise ValueError( f"Uniform prior requires p2 > p1, but got p1={p1}, p2={p2}." ) return p1, p2 if prior == "N": if p2 <= 0: raise ValueError( f"Normal prior requires sigma p2 > 0, but got p2={p2}." ) return p1 - nsigma * p2, p1 + nsigma * p2 if prior == "X": return -np.inf, np.inf raise ValueError(f"Unknown prior type '{prior}'. Expected U, N, or X.")
[docs]def validate_fit_par( fit_par, run_mcmc=False, run_nested=False, nsigma_lsq=5.0, warn_only_for_x=True, ): """Validate fit_par.txt. - Checks that initial values are inside the implied prior/bounds. - Warns if MCMC/nested is requested but a free parameter has prior X. """ problems = [] x_free = [] for i in range(len(fit_par)): name = str(fit_par["parameter"][i]) tied = str(fit_par["tied"][i]) fixed = _as_bool(fit_par["fixed"][i]) prior = str(fit_par["prior"][i]).upper() value = float(fit_par["value"][i]) p1 = float(fit_par["p1"][i]) p2 = float(fit_par["p2"][i]) label = name if tied == "-1" else f"{name}_{tied}" if prior not in ["U", "N", "X"]: problems.append( f"{label}: unknown prior '{prior}'. Use U, N, or X." ) continue if (run_mcmc or run_nested) and (not fixed) and prior == "X": x_free.append(label) if prior in ["U", "N"]: lo, hi = prior_bounds(prior, p1, p2, nsigma=nsigma_lsq) if not (lo <= value <= hi): if prior == "U": problems.append( f"{label}: initial value {value} is outside " f"uniform prior [{lo}, {hi}]." ) elif prior == "N": problems.append( f"{label}: initial value {value} is outside " f"{nsigma_lsq:g}σ normal-prior sanity range " f"[{lo}, {hi}]." ) if x_free: message = ( "The following free parameters have prior X while MCMC/nested " "sampling is enabled:\n" + "\n".join(f" - {name}" for name in x_free) + "\nThese parameters are unconstrained by the sampler prior. " "Use prior U for bounded parameters or N for Gaussian priors." ) if warn_only_for_x: warnings.warn(message, RuntimeWarning) else: problems.append(message) if problems: raise ValueError( "\nInvalid fit_par.txt configuration:\n" + "\n".join(f" - {problem}" for problem in problems) )
[docs]def read_fit_par_for_ls(parinfo, params_s, data, fit_par): """Read fit_par.txt and prepare MPFIT parinfo. New fit_par format: #parameter fixed tied value prior p1 p2 Bounds: - U: [p1, p2] - N: [p1 - 5*p2, p1 + 5*p2] - X: unbounded; should normally only be used for fixed parameters """ nvisit = data.nvisit ii = 0 for i in range(int(len(data.parnames))): tied = str(fit_par["tied"][ii]) if tied == "-1": # One row, shared/tied across visits. prior = fit_par["prior"][ii] p1 = fit_par["p1"][ii] p2 = fit_par["p2"][ii] lo, hi = prior_bounds(prior, p1, p2, nsigma=5.0) for j in range(nvisit): idx = i * nvisit + j parinfo[idx]["value"] = float(fit_par["value"][ii]) parinfo[idx]["fixed"] = _as_bool(fit_par["fixed"][ii]) if j > 0: parinfo[idx]["tied"] = f"p[{nvisit * i}]" if np.isfinite(lo): parinfo[idx]["limited"][0] = True parinfo[idx]["limits"][0] = lo if np.isfinite(hi): parinfo[idx]["limited"][1] = True parinfo[idx]["limits"][1] = hi params_s.append(float(fit_par["value"][ii])) ii += 1 else: # One row per visit. for j in range(nvisit): idx = i * nvisit + j prior = fit_par["prior"][ii] p1 = fit_par["p1"][ii] p2 = fit_par["p2"][ii] lo, hi = prior_bounds(prior, p1, p2, nsigma=5.0) parinfo[idx]["value"] = float(fit_par["value"][ii]) parinfo[idx]["fixed"] = _as_bool(fit_par["fixed"][ii]) if np.isfinite(lo): parinfo[idx]["limited"][0] = True parinfo[idx]["limits"][0] = lo if np.isfinite(hi): parinfo[idx]["limited"][1] = True parinfo[idx]["limits"][1] = hi params_s.append(float(fit_par["value"][ii])) ii += 1 return parinfo, np.array(params_s)
[docs]def get_initial_walkers(data, theta, meta, fit_par): """Generate initial MCMC walker positions from fit_par priors. For U priors: draw uniformly from [p1, p2] For N priors: draw from Normal(mean=p1, sigma=p2) For X priors: raise an error for free parameters, because MCMC needs a finite initialization scale. """ nwalkers = meta.run_nwalkers ndim = len(theta) priors = [] ii = 0 nvisit = int(meta.nvisit) for i in range(len(fit_par)): if ii == len(fit_par): break fixed = _as_bool(fit_par["fixed"][ii]) tied = str(fit_par["tied"][ii]) if not fixed: if tied == "-1": priors.append( ( str(fit_par["prior"][ii]).upper(), float(fit_par["p1"][ii]), float(fit_par["p2"][ii]), ) ) ii += 1 else: for _ in range(nvisit): priors.append( ( str(fit_par["prior"][ii]).upper(), float(fit_par["p1"][ii]), float(fit_par["p2"][ii]), ) ) ii += 1 else: ii += 1 if len(priors) != ndim: raise ValueError( f"Expected {ndim} priors for MCMC initialization, " f"but found {len(priors)}." ) pos = np.zeros((nwalkers, ndim), dtype=float) rng = np.random.default_rng() for j, (prior, p1, p2) in enumerate(priors): if prior == "U": if p2 <= p1: raise ValueError( f"Uniform prior requires p2 > p1, got p1={p1}, p2={p2}." ) pos[:, j] = rng.uniform(p1, p2, size=nwalkers) elif prior == "N": if p2 <= 0: raise ValueError( f"Normal prior requires sigma p2 > 0, got p2={p2}." ) pos[:, j] = rng.normal(p1, p2, size=nwalkers) elif prior == "X": raise ValueError( "Cannot initialize MCMC walkers for a free parameter with " "prior X. Use U or N." ) else: raise ValueError(f"Unknown prior type '{prior}'.") return pos
[docs]def validate_c_against_light_curve( fit_par, data, nsigma_lc=100.0, nsigma_prior=100.0, warn_only=False, ): """Check that c guesses and bounds are plausible for the light curve. PACMAN's constant model uses: flux_model = 10**c Therefore c is checked against log10(flux), not flux. """ if "c" not in data.parnames: return c_rows = fit_par[fit_par["parameter"] == "c"] if len(c_rows) == 0: raise ValueError( "Parameter 'c' is used by the model, but no 'c' row was found " "in fit_par.txt." ) problems = [] summaries = [] tied_values = np.array([int(tied) for tied in c_rows["tied"]]) for visit in range(data.nvisit): visit_mask = data.vis_num == visit flux = np.asarray(data.flux[visit_mask], dtype=float) flux = flux[np.isfinite(flux) & (flux > 0)] if len(flux) == 0: problems.append(f"Visit {visit}: no finite positive flux values found.") continue log_flux = np.log10(flux) log_min = float(np.min(log_flux)) log_max = float(np.max(log_flux)) log_mean = float(np.mean(log_flux)) log_std = float(np.std(log_flux)) if log_std == 0: log_std = 1.0e-12 sanity_lo = log_min - nsigma_lc * log_std sanity_hi = log_max + nsigma_lc * log_std visit_match = np.where(tied_values == visit)[0] shared_match = np.where(tied_values == -1)[0] if len(visit_match) > 0: row = c_rows[visit_match[0]] label = f"c_{visit}" elif len(shared_match) > 0: row = c_rows[shared_match[0]] label = "c" else: problems.append( f"Visit {visit}: no c row found with tied={visit} " "and no shared c row with tied=-1." ) continue c_value = float(row["value"]) prior = str(row["prior"]).upper() p1 = float(row["p1"]) p2 = float(row["p2"]) values_to_check = [("initial value", c_value)] if prior == "U": c_lo = p1 c_hi = p2 values_to_check.extend([ ("lower prior bound", c_lo), ("upper prior bound", c_hi), ]) elif prior == "N": c_lo = p1 - nsigma_prior * p2 c_hi = p1 + nsigma_prior * p2 values_to_check.extend([ (f"lower {nsigma_prior:g}σ prior sanity bound", c_lo), (f"upper {nsigma_prior:g}σ prior sanity bound", c_hi), ]) elif prior == "X": pass else: problems.append( f"{label}: unknown prior type '{prior}'. Use U, N, or X." ) continue for description, value in values_to_check: if not (sanity_lo <= value <= sanity_hi): problems.append( f"Visit {visit}, {label}: {description} {value:.6f} " f"is outside the log10(flux) sanity range " f"[{sanity_lo:.6f}, {sanity_hi:.6f}]. " f"Flux stats: mean={np.mean(flux):.6e}, " f"min={np.min(flux):.6e}, max={np.max(flux):.6e}; " f"log10(flux) stats: mean={log_mean:.6f}, " f"std={log_std:.6f}." ) summaries.append( f"Visit {visit}: log10(flux) mean={log_mean:.6f}, " f"std={log_std:.6f}, sanity range=[{sanity_lo:.6f}, " f"{sanity_hi:.6f}], {label}={c_value:.6f}" ) if problems: message = ( "\nSuspicious c values in fit_par.txt:\n" + "\n".join(f" - {problem}" for problem in problems) + "\n\nPACMAN's constant model uses flux = 10**c. " "This usually means c should be close to log10(raw flux)." ) if warn_only: import warnings warnings.warn(message, RuntimeWarning) else: raise ValueError(message) print("c sanity check passed:") for summary in summaries: print(" " + summary)