Source code for pacman.lib.util

import math
import time
import shutil
from pathlib import Path
from typing import Optional

import numpy as np
import numpy.ma as ma
from astropy.io import ascii, fits
from astropy.table import Table
from scipy.interpolate import interp1d
from scipy.optimize import leastsq
from scipy.signal import find_peaks

from .sort_nicely import sort_nicely as sn
from ..lib import plots
from . import manageevent as me
from . import logedit
from . import read_pcf as rd


# s00
[docs]def readfiles(meta): """Reads in the files saved in datadir and saves them into a list. Parameters ---------- meta : Metadata object' Returns ------- meta : Metadata object but adds segment_list to metadata containing the sorted data fits files. Notes ----- History: Written by Sebastian Zieba December 2021 """ meta.segment_list = [] for fname in meta.datadir.iterdir(): if fname.name.endswith(f'{meta.suffix}.fits'): meta.segment_list.append(meta.datadir / fname) return meta
#s01
[docs]def find_latest_stage_run(rundir, stage_name, pattern): """Return the most recent run directory inside a stage directory.""" stage_dir = Path(rundir) / stage_name run_dirs = sorted(stage_dir.glob(pattern)) if len(run_dirs) == 0: raise FileNotFoundError(f'No run directories found in {stage_dir} matching {pattern}') return run_dirs[-1]
[docs]def setup_stage( pcf_path, stage_num, previous_stage_num, stage_subdir=None, copy_filelist=True, copy_xrefyref=False, copy_ancil=False, copy_extracted_lc=False, copy_extracted_sp=False, meta=None, ): """Set up a PACMAN stage run directory. This function: - finds the latest previous-stage run directory - loads metadata from that previous stage - creates the new stage directory and timestamped workdir - copies current obs_par.pcf and fit_par.txt from pacman_run_files - updates meta with the copied obs_par.pcf settings - copies selected products from the previous stage - initializes the stage log Parameters ---------- pcf_path : pathlib.Path Path to pacman_run_files. stage_num : str Stage number, e.g. "01", "02", "03", "10", "20", "21". previous_stage_num : str Previous stage number, e.g. "00", "01", "02", "03", "10", "20". copy_filelist : bool Copy filelist.txt from previous stage. copy_xrefyref : bool Copy xrefyref.txt from previous stage. copy_ancil : bool Copy ancil/ from previous stage. copy_extracted_lc : bool Copy extracted_lc/ from previous stage. copy_extracted_sp : bool Copy extracted_sp/ from previous stage. meta : object or None Optional metadata object. If None, metadata is loaded from previous stage. Returns ------- meta Updated metadata object. log Logedit object for the current stage. """ pcf_path = Path(pcf_path) rundir = pcf_path.parent stage_name = f"stage{stage_num}" run_pattern = f"s{stage_num}_run_*" previous_stage_name = f"stage{previous_stage_num}" previous_run_pattern = f"s{previous_stage_num}_run_*" input_workdir = find_latest_stage_run( rundir, previous_stage_name, previous_run_pattern, ) if meta is None: meta = me.loadevent(input_workdir / "WFC3_Meta_Save") datetime = time.strftime("%Y-%m-%d_%H-%M-%S") meta.inputdir = input_workdir stage_dir = rundir / stage_name # the following is just for s30, where we want to save the results in a subdirectories called "white_lc" or "spec_lc" depending on the type of light curve we are fitting # I thought that might help to keep s30 a bit more organized :) if stage_subdir is not None: stage_dir = stage_dir / stage_subdir setattr(meta, f"stage{stage_num}dir", stage_dir) meta.workdir = stage_dir / f"s{stage_num}_run_{datetime}" meta.workdir.mkdir(parents=True, exist_ok=True) # Copy current config files from pacman_run_files shutil.copy(pcf_path / "obs_par.pcf", meta.workdir) shutil.copy(pcf_path / "fit_par.txt", meta.workdir) # Update meta using the copied obs_par.pcf snapshot pcf = rd.read_pcf(meta.workdir / "obs_par.pcf") rd.store_pcf(meta, pcf) # Copy selected files/directories from previous stage if copy_filelist and (meta.inputdir / "filelist.txt").exists(): shutil.copy(meta.inputdir / "filelist.txt", meta.workdir) if copy_xrefyref and (meta.inputdir / "xrefyref.txt").exists(): shutil.copy(meta.inputdir / "xrefyref.txt", meta.workdir) if copy_ancil and (meta.inputdir / "ancil").exists(): shutil.copytree( meta.inputdir / "ancil", meta.workdir / "ancil", dirs_exist_ok=True, ) if copy_extracted_lc and (meta.inputdir / "extracted_lc").exists(): shutil.copytree( meta.inputdir / "extracted_lc", meta.workdir / "extracted_lc", dirs_exist_ok=True, ) if copy_extracted_sp and (meta.inputdir / "extracted_sp").exists(): shutil.copytree( meta.inputdir / "extracted_sp", meta.workdir / "extracted_sp", dirs_exist_ok=True, ) previous_log = meta.inputdir / f"s{previous_stage_num}.log" meta.logname = meta.workdir / f"s{stage_num}.log" log = logedit.Logedit(meta.logname, read=previous_log) log.writelog(f"Starting s{stage_num}") log.writelog(f"Using Stage {previous_stage_num} input directory: {meta.inputdir}") log.writelog(f"Location of the new Stage {stage_num} run directory: {meta.workdir}") return meta, log
# s02
[docs]def ancil(meta, s10: Optional[bool] = False, s20: Optional[bool] = False): """This function loads in a lot of useful arrays and values into meta. The following additional information are being loading into meta: - **norbit:** number of orbits - **nvisit:** number of visits - **files_sp:** all spectra files - **files_di:** all direct image files - **ra:** RA of the target in radians (from the header) (Note: this data is taken from the first spectrum file) - **dec:** DEC of the target in radians (from the header) (Note: this data is taken from the first spectrum file) - **coordtable:** a list of files containing the vector information of HST downloaded in s01 Parameters ---------- meta metadata object s10: bool Is set to True when s10 is being performed s20: bool Is set to True when s20 is being performed Returns ------- meta metadata object Notes ----- History: Written by Sebastian Zieba December 2021 """ filelist = ascii.read(str(meta.workdir / "filelist.txt")) aa = filelist['iorbit'].data bb = np.diff(aa) meta.norbit, meta.nvisit = len(np.insert(aa[1:][np.where(bb != 0)], 0, aa[0])), len(set(filelist['ivisit'].data)) meta.mask_sp = np.array([i[0] for i in filelist['instr']]) == 'G' meta.mask_di = np.array([i[0] for i in filelist['instr']]) == 'F' meta.files_sp = [Path(meta.datadir) / i for i in filelist['filenames'][meta.mask_sp].data] meta.files_di = [Path(meta.datadir) / i for i in filelist['filenames'][meta.mask_di].data] f = fits.open(meta.files_sp[0]) meta.ra = f[0].header['ra_targ'] * math.pi / 180.0 # stores right ascension meta.dec = f[0].header['dec_targ'] * math.pi / 180.0 # stores declination horizons_dir = getattr(meta, "inputdir", meta.workdir) / "ancil" / "horizons" meta.coordtable = []# table of spacecraft coordinates for i in range(int(np.max(filelist["ivisit"])) + 1): meta.coordtable.append(horizons_dir / f"horizons_results_v{i}.txt") # 03 # TODO: Q: Is it okay if I assume everything in datadir uses same Filter Grism pair? meta.filter = filelist['instr'][meta.mask_di][0] meta.grism = filelist['instr'][meta.mask_sp][0] meta.scans_sp = filelist['scan'][meta.mask_sp].data meta.iorbit_sp = filelist['iorbit'][meta.mask_sp].data meta.ivisit_sp = filelist['ivisit'][meta.mask_sp].data meta.iexp_orb_sp = filelist['iexp_orb'][meta.mask_sp].data # the following list stores the cumulative orbit number meta.iorbit_sp_cumulative = np.zeros(len(meta.iorbit_sp), dtype=int) c = 0 for i in range(len(meta.iorbit_sp) - 1): if meta.iorbit_sp[i + 1] != meta.iorbit_sp[i]: c = c + 1 meta.iorbit_sp_cumulative[i + 1] = c # the following lists store the indices of spectral files where the orbit/visit increases meta.new_orbit_idx_sp = np.concatenate(([0], np.where(np.diff(meta.iorbit_sp)!=0)[0]+1)) meta.new_visit_idx_sp = np.concatenate(([0], np.where(np.diff(meta.ivisit_sp)!=0)[0]+1)) meta.iorbit_di = filelist['iorbit'][meta.mask_di].data meta.ivisit_di = filelist['ivisit'][meta.mask_di].data # the following list stores the cumulative orbit number for the direct images meta.iorbit_di_cumulative = np.zeros(len(meta.iorbit_di), dtype=int) c = 0 for i in range(len(meta.iorbit_di) - 1): if meta.iorbit_di[i + 1] != meta.iorbit_di[i]: c = c + 1 meta.iorbit_di_cumulative[i + 1] = c # TODO: this is a problem if there is one DI per visit. It will give us meta.iorbit_di_cumulative == [0,0,0,...] # TODO: with as many zeros as visits # TODO: I implemented a quick fix (29th march 2023): # TODO: i check if there are as many DIs as visits and if the DIs have been taken same time apart. t_bjd_di_tmp = filelist['t_mjd'][meta.mask_di].data if (len(meta.iorbit_di) == (max(meta.ivisit_sp)+1)) and (np.all(np.diff(t_bjd_di_tmp)>1/24)): meta.iorbit_di_cumulative = np.arange(len(meta.iorbit_di), dtype=int) meta.t_mjd_sp = filelist['t_mjd'][meta.mask_sp].data meta.t_orbit_sp = filelist['t_orbit'][meta.mask_sp].data meta.t_visit_sp = filelist['t_visit'][meta.mask_sp].data meta.platescale = 0.13 # IR detector has plate scale of 0.13 arcsec/pixel f_sp0 = fits.open(meta.files_sp[0]) meta.POSTARG1_sp = f_sp0[0].header['POSTARG1'] # x-coordinate of the observer requested target offset meta.POSTARG2_sp = f_sp0[0].header['POSTARG2'] # y-coordinate of the observer requested target offset meta.LTV1 = int(f_sp0[1].header['LTV1']) # X offset to get into physical pixels meta.LTV2 = int(f_sp0[1].header['LTV2']) # Y offset to get into physical pixels meta.subarray_size = f_sp0[1].header['SIZAXIS1'] # size of subarray f_sp0.close() f_di0 = fits.open(meta.files_di[0]) meta.POSTARG1_di = f_di0[0].header['POSTARG1'] # x-coordinate of the observer requested target offset meta.POSTARG2_di = f_di0[0].header['POSTARG2'] # y-coordinate of the observer requested target offset meta.POSTARG1 = meta.POSTARG1_sp - meta.POSTARG1_di meta.POSTARG2 = meta.POSTARG2_sp - meta.POSTARG2_di if meta.grism == 'G102': meta.BEAMA_i = 41 meta.BEAMA_f = 248 elif meta.grism == 'G141': meta.BEAMA_i = 15 meta.BEAMA_f = 196 f_di0.close() if s10: if 't_bjd' in filelist.keys(): meta.t_bjd_sp = filelist['t_bjd'][meta.mask_sp].data if 't_bjd' in filelist.keys(): meta.t_bjd_di = filelist['t_bjd'][meta.mask_di].data if s20: meta.rdnoise = 22.0 # read noise if meta.grism == 'G102': meta.flat = meta.pacmandir / 'data' / 'flats' / 'WFC3.IR.G102.flat.2.fits' elif meta.grism == 'G141': meta.flat = meta.pacmandir / 'data' / 'flats' / 'WFC3.IR.G141.flat.2.fits' return meta
# 03
[docs]def gaussian_kernel(meta, x, y): """Performs a gaussian kernel over an array. Used in smoothing of the stellar spectrum. Taken from: https://stackoverflow.com/questions/24143320/gaussian-sum-filter-for-irregular-spaced-points. """ y = y / max(y) x.sort() resolution = min(len(x), 3000) x_eval = np.linspace(min(x), max(x), resolution) sigma = meta.smooth_sigma * 1e-10 / 5 delta_x = x_eval[:, None] - x weights = np.exp(-delta_x*delta_x / (2*sigma*sigma)) / (np.sqrt(2*np.pi) * sigma) weights /= np.sum(weights, axis=1, keepdims=True) y_eval = np.dot(weights, y) y_eval = y_eval / max(y_eval) if meta.save_smooth_plot: plots.smooth(meta, x, y, x_eval, y_eval) return x_eval, y_eval
# def gaussian_kernel_old(meta, x, y): # """ # https://matthew-brett.github.io/teaching/smoothing_intro.html # """ # # sigma = meta.smooth_sigma * 1e-10 # # y = y / max(y) # y_smoothed = np.zeros(y.shape) # for x_idx, x_val in tqdm(enumerate(x), desc='smoothing...', total=len(x)): # kernel = np.exp(-(x - x_val) ** 2 / (2 * sigma ** 2)) # kernel = kernel / sum(kernel) # y_smoothed[x_idx] = sum(y * kernel) # y_smoothed = y_smoothed / max(y_smoothed) # # if meta.save_smooth_plot: # plots.smooth(meta, x, y, x, y_smoothed) # # return (x, y_smoothed) # s10
[docs]def di_reformat(meta): """This function was introduced because some observations have several DIs per orbit. The user can set in the pcf how they want to determine the DI target position in this case. """ iorbit_max = max(meta.iorbit_sp_cumulative) ivisit_max = max(meta.ivisit_sp) control_one_per_orbit = np.arange(iorbit_max + 1) control_one_per_visit = np.arange(ivisit_max + 1) reffile = ascii.read(str(meta.workdir / "xrefyref.txt")) meta.ivisits_new_orbit = meta.iorbit_sp[meta.new_visit_idx_sp] meta.nvisits_in_orbit = np.diff(np.append(meta.iorbit_sp[meta.new_visit_idx_sp], np.array([max(meta.iorbit_sp) + 1]))) meta.norbits_in_visit = np.append(meta.iorbit_sp[meta.new_visit_idx_sp[1:]-1],meta.iorbit_sp[-1])+1 # First case: Every orbit has just one DI if np.array_equal(reffile['iorbit_cumul'], control_one_per_orbit): print('There is one DI per orbit.') meta.refpix = np.zeros((iorbit_max + 1, 3)) for i in range(iorbit_max + 1): meta.refpix[i] = [reffile['t_bjd'][i], reffile['pos1'][i], reffile['pos2'][i]] # print(reffile['t_bjd'][i], reffile['pos1'][i], reffile['pos2'][i], file=f) # f.close() # Second case: Every visit has just one DI elif np.array_equal(reffile['iorbit_cumul'], control_one_per_visit): print('There is one DI per visit.') meta.refpix = [] counter = 0 for i in range(len(meta.norbits_in_visit)): for j in range(meta.norbits_in_visit[counter]): meta.refpix.append([reffile['t_bjd'][counter], reffile['pos1'][counter], reffile['pos2'][counter]]) counter = counter + 1 meta.refpix = np.array(meta.refpix) # Third case: Every orbit contains at least one DI. But there is at least one orbit with more than one DI. elif set(control_one_per_orbit) == set(reffile['iorbit_cumul']): print('There is at least one orbit with at least more than one DI') meta.refpix = np.zeros((iorbit_max + 1, 3)) for i in range(iorbit_max + 1): mask_i = reffile['iorbit'] == i if meta.di_multi == 'median': meta.refpix[i] = [np.median(reffile['t_bjd'][mask_i]), np.median(reffile['pos1'][mask_i]), np.median(reffile['pos2'][mask_i])] # print(np.median(reffile['t_bjd'][mask_i]), np.median(reffile['pos1'][mask_i]), np.median(reffile['pos2'][mask_i]), file=f) elif meta.di_multi == 'latest': meta.refpix[i] = [reffile['t_bjd'][mask_i][-1], reffile['pos1'][mask_i][-1], reffile['pos2'][mask_i][-1]] # print(reffile['t_bjd'][mask_i][-1], reffile['pos1'][mask_i][-1], reffile['pos2'][mask_i][-1], file=f) # f.close() # TODO Add this check # Third case. Not every orbit has a DI. if set(control_one_per_orbit) != set(reffile['iorbit_cumul']) and len(set(control_one_per_orbit)) < len( set(reffile['iorbit_cumul'])): print('Not every orbit has a DI.')
# f.close() # s20
[docs]def get_wave_grid(meta): """Gets grid of wavelength solutions for each orbit and row.""" if meta.grism == 'G102': from ..lib import geometry102 as geo elif meta.grism == 'G141': from ..lib import geometry141 as geo wave_grid = np.empty((meta.norbit*meta.nvisit, meta.subarray_size, meta.subarray_size)) #calculates wavelength solution row by row for each orbit for i in range(meta.norbit): for j in range(meta.subarray_size): disp_solution = geo.dispersion(meta.refpix[i,1], -meta.LTV2+j) delx = 0.5 + np.arange(meta.subarray_size) - (meta.refpix[i,2] + meta.LTV1 + meta.POSTARG1/meta.platescale) wave_grid[i, j, :] = disp_solution[0] + delx*disp_solution[1] return wave_grid
[docs]def get_flatfield(meta): # function that flatfields a data array D, which starts at [minr, cmin] of hdu[1].data """Opens the flat file and uses it for bad pixel masking.""" flat = fits.open(meta.flat) # reads in flatfield cube WMIN = flat[0].header['WMIN'] # constants for computing flatfield coefficients WMAX = flat[0].header['WMAX'] #print('flatfield:', WMIN, WMAX) a0 = flat[0].data[-meta.LTV1:-meta.LTV1+meta.subarray_size, -meta.LTV2:-meta.LTV2+meta.subarray_size] a1 = flat[1].data[-meta.LTV1:-meta.LTV1+meta.subarray_size, -meta.LTV2:-meta.LTV2+meta.subarray_size] a2 = flat[2].data[-meta.LTV1:-meta.LTV1+meta.subarray_size, -meta.LTV2:-meta.LTV2+meta.subarray_size] flatfield = [] for i in range(meta.norbit*meta.nvisit): wave = meta.wave_grid[i, :] x = (wave - WMIN)/(WMAX-WMIN) flatfield.append(a0+a1*x+a2*x**2) flatfield[i][flatfield[i] < 0.5] = -1. #sets flatfield pixels below 0.5 to -1 so they can be masked return flatfield
[docs]def peak_finder(array, i, ii, orbnum, meta): """Finding peaks in an array.""" # determine the aperture cutout rowmedian = np.median(array, axis=1) # median of every row rowmedian_absder = abs( rowmedian[1:] - rowmedian[:-1]) # absolute derivative in order to determine where the spectrum is # we use scipy.signal.find_peaks to determine in which rows the spectrum starts and ends # TODO: Think about better values for height and distance # TODO: Finding the row with the highest change in flux (compared to row above and below) isnt robust against outliers! peaks, _ = find_peaks(rowmedian_absder, height=max(rowmedian_absder * 0.3), distance=2) # peaks = peaks[:2] # only take the two biggest peaks (there should be only 2) peaks = np.array([min(peaks[:2]), max(peaks[:2]) + 1]) if meta.save_utr_plot or meta.show_utr_plot: plots.utr(array, meta, i, ii, orbnum, rowmedian, rowmedian_absder, peaks) return peaks
[docs]def median_abs_dev(vec): """Used to determine the variance for the background count estimate.""" med = ma.median(vec) return ma.median(abs(vec - med))
[docs]def zero_pad_x(array): # TODO: Try to make this look nicer array_new = np.concatenate((np.linspace(-5000, min(array)+1, 10, endpoint=True), array, np.linspace(max(array) + 350, 30000, 10, endpoint=False))) return array_new
[docs]def zero_pad_y(array): return np.concatenate((np.zeros(10), array, np.zeros(10)))
[docs]def read_refspec(meta): """Reads in the reference spectrum.""" refspec = np.loadtxt(meta.workdir / 'ancil' / 'refspec' / 'refspec.txt').T x_refspec, y_refspec = refspec[0]*1e10, refspec[1]/max(refspec[1]) x_refspec = zero_pad_x(x_refspec) y_refspec = zero_pad_y(y_refspec) return x_refspec, y_refspec
[docs]def residuals2(params, x1, y1, x2, y2): """Calculate residuals for leastsq.""" a, b, c = params x1 = np.array(x1) x2 = np.array(x2) y1 = np.array(y1) y2 = np.array(y2) f = interp1d(x1, y1, kind='linear') fit = f(a+b*x2)*c return fit - y2
[docs]def residuals2_lin(params, x1, y1, x2, y2): """Calculate residuals for leastsq.""" a, c = params x1 = np.array(x1) x2 = np.array(x2) y1 = np.array(y1) y2 = np.array(y2) f = interp1d(x1, y1, kind='linear') fit = f(a+x2)*c return fit - y2
[docs]def get_boxspat(meta,i,d,rmin,rmax,cmin,cmax,imageIndex):#,x_ref, y_ref,x_data, y_data,i,direction='y'): fullframe_first = d[imageIndex].data[rmin:rmax,cmin:cmax] #first non-destructive exposure # NOTE: Background subtraction if not meta.background_box: below_threshold = fullframe_first < meta.background_thld # mask with all pixels below the user defined threshold skymedian_first = np.median(fullframe_first[below_threshold].flatten()) # estimates the background counts by taking the flux median of the pixels below the flux threshold else: bg_rmin, bg_rmax, bg_cmin, bg_cmax = int(meta.bg_rmin), int(meta.bg_rmax), int(meta.bg_cmin), int(meta.bg_cmax), skymedian_first = np.median(fullframe_first[bg_rmin:bg_rmax, bg_cmin:bg_cmax].flatten()) spectrum_first = fullframe_first - skymedian_first spat_box_0_first = spectrum_first.sum(axis = 1) #box-extracted spatial profile, sums along dispersion direction (sums columns in a row) # NOTE: norm profile and remove nans. r_array = np.linspace(rmin,rmax+1,rmax-rmin) #array with row indexes spat_box_0_first = spat_box_0_first/np.nanmax(spat_box_0_first) r_array = r_array[~np.isnan(spat_box_0_first)] spat_box_0_first = spat_box_0_first[~np.isnan(spat_box_0_first)] # NOTE: interpolate spectrum for fit r_array_interp = np.linspace(rmin,rmax+1,(rmax-rmin)*1000) spat_box_0_first_interp = np.interp(r_array_interp, r_array, spat_box_0_first) return r_array_interp, spat_box_0_first_interp
[docs]def validate_rowshift_settings(meta): """Check consistency of rowshift/stretch plotting settings.""" rowshift_plot_requested = ( meta.save_rowshift_plot or meta.show_rowshift_plot or meta.save_rowshift_stretch_plot or meta.show_rowshift_stretch_plot ) if rowshift_plot_requested and not meta.calculate_rowshift: raise ValueError( "Rowshift/stretch plots were requested, but " "calculate_rowshift is False.\n" "Set calculate_rowshift=True, or set all of the following to False:\n" " save_rowshift_plot\n" " show_rowshift_plot\n" " save_rowshift_stretch_plot\n" " show_rowshift_stretch_plot" )
[docs]def calculate_rowshift(meta,x_ref, y_ref,x_data, y_data,i): p0 = [0,1] # initial guess for least squares leastsq_res = leastsq(residuals2_lin, p0, args=(x_ref, y_ref,x_data, y_data))[0] if meta.save_rowshift_plot or meta.show_rowshift_plot: plots.rowshift_fit(x_ref, y_ref, x_data, y_data, leastsq_res, meta,i) return leastsq_res[0]#float
[docs]def calculate_stretch(meta,x_ref, y_ref,x_data, y_data,i): #all np arrays f(a+b*x2)*c p0 = [0,1,1] # initial guess for least squares leastsq_res = leastsq(residuals2, p0, args=(x_ref, y_ref,x_data, y_data))[0] if meta.save_rowshift_stretch_plot or meta.show_rowshift_stretch_plot: plots.stretch_fit(x_ref, y_ref, x_data, y_data, leastsq_res, meta,i) return leastsq_res[1]#float
[docs]def correct_wave_shift_fct_0(meta, orbnum, cmin, cmax, spec_opt, i): """Use the reference spectrum for the wave cal.""" template_waves = meta.wave_grid[0, int(meta.refpix[orbnum, 1]) + meta.LTV1, cmin:cmax] g102mask = template_waves > 820 # we dont use the spectrum below 8200 angstrom for the interpolation as the reference bandpass cuts out below this wavelength x_refspec, y_refspec = read_refspec(meta) # TODO: will break if optimal extractions isnt used! #x_data & y_data is the spectrum if no wavelength correction would be performed x_data = template_waves[g102mask] y_data = (spec_opt / max(spec_opt))[g102mask] p0 = [0, 1, 1] # initial guess for least squares leastsq_res = leastsq(residuals2, p0, args=(x_refspec, y_refspec, x_data, y_data))[0] #leastsq_res[0] = leastsq_res[0] + 60 if meta.save_refspec_fit_plot or meta.show_refspec_fit_plot: plots.refspec_fit(x_refspec, y_refspec, p0, x_data, y_data, leastsq_res, meta, i) # for all other but first exposure in visit exposures x_data_firstexpvisit = leastsq_res[0] + x_data * leastsq_res[1] y_data_firstexpvisit = np.copy(y_data) #np.savetxt('testing_template.txt', list(zip(x_refspec, y_refspec))) #np.savetxt('testing_exp0.txt', list(zip(x_data, y_data))) return x_data_firstexpvisit, y_data_firstexpvisit, leastsq_res
[docs]def correct_wave_shift_fct_0_lin(meta, orbnum, cmin, cmax, spec_opt, i): """Use the reference spectrum for the wave cal.""" template_waves = meta.wave_grid[0, int(meta.refpix[orbnum, 1]) + meta.LTV1, cmin:cmax] g102mask = template_waves > 820 # we dont use the spectrum below 8200 angstrom for the interpolation as the reference bandpass cuts out below this wavelength x_refspec, y_refspec = read_refspec(meta) # TODO: will break if optimal extractions isnt used! #x_data & y_data is the spectrum if no wavelength correction would be performed x_data = template_waves[g102mask] y_data = (spec_opt / max(spec_opt))[g102mask] p0 = [0, 1] # initial guess for least squares leastsq_res = leastsq(residuals2_lin, p0, args=(x_refspec, y_refspec, x_data, y_data))[0] # leastsq_res[0] = leastsq_res[0] + 60 if meta.save_refspec_fit_plot or meta.show_refspec_fit_plot: plots.refspec_fit_lin(x_refspec, y_refspec, p0, x_data, y_data, leastsq_res, meta, i) # for all other but first exposure in visit exposures x_data_firstexpvisit = leastsq_res[0] + x_data y_data_firstexpvisit = np.copy(y_data) # np.savetxt('/home/zieba/Desktop/Projects/Observations/Hubble/KELT11_15255/run_2022-05-25_16-57-51_new/spectra_drift/' + 'testing_template.txt', list(zip(x_refspec_new, y_refspec_new))) # np.savetxt('/home/zieba/Desktop/Projects/Observations/Hubble/KELT11_15255/run_2022-05-25_16-57-51_new/spectra_drift/' + 'testing_exp0.txt', list(zip(x_data, y_data))) return x_data_firstexpvisit, y_data_firstexpvisit, leastsq_res
[docs]def correct_wave_shift_fct_00(meta, orbnum, cmin, cmax, spec_opt, i): """Use the first exposure in the visit for wave cal.""" template_waves = meta.wave_grid[0, int(meta.refpix[orbnum, 1]) + meta.LTV1, cmin:cmax] x_refspec, y_refspec = template_waves, spec_opt / max(spec_opt) # TODO: will break if optimal extractions isnt used! #x_data & y_data is the spectrum if no wavelength correction would be performed x_data = template_waves y_data = (spec_opt / max(spec_opt)) p0 = [0, 1, 1] # initial guess for least squares leastsq_res = leastsq(residuals2, p0, args=(x_refspec, y_refspec, x_data, y_data))[0] if meta.save_refspec_fit_plot or meta.show_refspec_fit_plot: plots.refspec_fit(x_refspec, y_refspec, p0, x_data, y_data, leastsq_res, meta, i) # for all other but first exposure in visit exposures x_data_firstexpvisit = leastsq_res[0] + x_data * leastsq_res[1] y_data_firstexpvisit = np.copy(y_data) # np.savetxt('testing_exp0.txt', list(zip(x_data_firstexpvisit, y_data_firstexpvisit))) # np.savetxt('testing_firstexp.txt', list(zip(template_waves, spec_opt/max(spec_opt)))) return x_data_firstexpvisit, y_data_firstexpvisit, leastsq_res
[docs]def correct_wave_shift_fct_1(meta, orbnum, cmin, cmax, spec_opt, x_data_firstexpvisit, y_data_firstexpvisit, i): x_model = zero_pad_x(x_data_firstexpvisit) y_model = zero_pad_y(y_data_firstexpvisit) x_data = meta.wave_grid[0, int(meta.refpix[orbnum, 1]) + meta.LTV1, cmin:cmax] y_data = spec_opt / max(spec_opt) p0 = [0, 1, 1] leastsq_res = leastsq(residuals2, p0, args=(x_model, y_model, x_data, y_data))[0] if meta.save_refspec_fit_plot or meta.show_refspec_fit_plot: plots.refspec_fit(x_model, y_model, p0, x_data, y_data, leastsq_res, meta, i) wvls = leastsq_res[0] + x_data * leastsq_res[1] #np.savetxt('testing_exp{0}.txt'.format(i), list(zip(x_data, y_data))) return wvls, leastsq_res
[docs]def correct_wave_shift_fct_1_lin(meta, orbnum, cmin, cmax, spec_opt, x_data_firstexpvisit, y_data_firstexpvisit, i): x_model = zero_pad_x(x_data_firstexpvisit) y_model = zero_pad_y(y_data_firstexpvisit) x_data = meta.wave_grid[0, int(meta.refpix[orbnum, 1]) + meta.LTV1, cmin:cmax] y_data = spec_opt / max(spec_opt) p0 = [0, 1] leastsq_res = leastsq(residuals2_lin, p0, args=(x_model, y_model, x_data, y_data))[0] if meta.save_refspec_fit_plot or meta.show_refspec_fit_plot: plots.refspec_fit_lin(x_model, y_model, p0, x_data, y_data, leastsq_res, meta, i) wvls = leastsq_res[0] + x_data # np.savetxt('/home/zieba/Desktop/Projects/Observations/Hubble/KELT11_15255/run_2022-05-25_16-57-51_new/spectra_drift/' + 'testing_exp{0}.txt'.format(i), list(zip(x_data, y_data))) return wvls, leastsq_res
# 30
[docs]def read_fitfiles(meta): """Read in the white or spectroscopic light curve files to fit.""" if meta.s30_fit_white: print("White light curve fit will be performed") files = [meta.workdir / "extracted_lc" / "lc_white.txt"] wvl_table = get_wavelength_table(meta) meta.wavelength_list = np.asarray(wvl_table["wavelength"], dtype=float) elif meta.s30_fit_spec: print("Spectroscopic light curve fit(s) will be performed") spec_dir = meta.workdir / "extracted_sp" if not spec_dir.exists(): raise FileNotFoundError(f"Could not find spectroscopic directory: {spec_dir}") files = sn(list(spec_dir.glob("speclc*.txt"))) if len(files) == 0: raise FileNotFoundError(f"No speclc*.txt files found in {spec_dir}") wvl_table = get_wavelength_table(meta) meta.wavelength_list = np.asarray(wvl_table["wavelength"], dtype=float) print(f"Using spectroscopic directory: {spec_dir}") else: raise ValueError("Either s30_fit_white or s30_fit_spec must be True.") print("Identified file(s) for fitting:", files) meta.nfits = len(files) return files, meta
[docs]def return_free_array(nvisit, fixed_array, tied_array): """Reads in the fit_par.txt and determines which parameters are free.""" free_array = [] for i in range(len(fixed_array)): if fixed_array[i].lower() == 'true' and tied_array[i] == -1: for ii in range(nvisit): free_array.append(False) if fixed_array[i].lower() == 'true' and not tied_array[i] == -1: free_array.append(False) if fixed_array[i].lower() == 'false' and tied_array[i] == -1: free_array.append(True) for ii in range(nvisit-1): free_array.append(False) if fixed_array[i].lower() == 'false' and not tied_array[i] == -1: free_array.append(True) free_array = np.array(free_array) return free_array
[docs]def return_untied_array(nvisit, fixed_array, tied_array): """Reads in the fit_par.txt and determines which parameters are untied.""" untied_array = [] for i in range(len(fixed_array)): if fixed_array[i].lower() == 'true' and tied_array[i] == -1: for ii in range(nvisit): untied_array.append(False) if fixed_array[i].lower() == 'true' and not tied_array[i] == -1: untied_array.append(True) if fixed_array[i].lower() == 'false' and tied_array[i] == -1: untied_array.append(False) for ii in range(nvisit-1): untied_array.append(False) if fixed_array[i].lower() == 'false' and not tied_array[i] == -1: untied_array.append(True) untied_array = np.array(untied_array) return untied_array
[docs]def format_params_for_Model(theta, params, nvisit, fixed_array, tied_array, free_array, untied_array): nvisit = int(nvisit) params_updated = [] # #TODO: Oida? # def repeated(array, index, n_times): # array_new = [] # index_index = 0 # for ii in range(len(array)): # if ii in index: # array_new.append([array[index[index_index]]] * n_times) # else: # array_new.append([array[ii]]) # index_index = index_index + 1 # array_new2 = np.array([item for sublist in array_new for item in sublist]) # return array_new2 #new function that copies parameters of visit 0 in visits>0 if tied=-1.: theta_new = np.copy(theta) params_updated = np.copy(params) params_updated[free_array] = theta_new for i_p in range(0,len(untied_array), nvisit): if not untied_array[i_p]: for ii_p in range(1,nvisit): params_updated[i_p+ii_p] = params_updated[i_p] # print('free_array', free_array) # print('params_updated', params_updated) return np.array(params_updated)
[docs]def computeRMS(data, maxnbins=None, binstep=1, isrmserr=False): """COMPUTE ROOT-MEAN-SQUARE AND STANDARD ERROR OF DATA FOR VARIOUS BIN SIZES Taken from POET: https://github.com/kevin218/POET/blob/master/code/lib/correlated_noise.py """ # data = fit.normresiduals # maxnbin = maximum # of bins # binstep = Bin step size # bin data into multiple bin sizes npts = data.size if maxnbins is None: maxnbins = npts / 10. binsz = np.arange(1, maxnbins + binstep, step=binstep, dtype=int) nbins = np.zeros(binsz.size, dtype=int) rms = np.zeros(binsz.size) rmserr = np.zeros(binsz.size) for i in range(binsz.size): nbins[i] = int(np.floor(data.size / binsz[i])) bindata = np.zeros(nbins[i], dtype=float) # bin data # ADDED INTEGER CONVERSION, mh 01/21/12 for j in range(nbins[i]): bindata[j] = data[j * binsz[i]:(j + 1) * binsz[i]].mean() # get rms rms[i] = np.sqrt(np.mean(bindata ** 2)) rmserr[i] = rms[i] / np.sqrt(2. * nbins[i]) # expected for white noise (WINN 2008, PONT 2006) stderr = (data.std() / np.sqrt(binsz)) * np.sqrt(nbins / (nbins - 1.)) if isrmserr is True: return rms, stderr, binsz, rmserr else: return rms, stderr, binsz
[docs]def format_params_for_sampling(params, meta, fit_par): # FIXME: make sure this works for cases when nvisit>1 nvisit = int(meta.nvisit) fixed_array = np.array(fit_par['fixed']) tied_array = np.array(fit_par['tied']) free_array = [] for i in range(len(fixed_array)): if fixed_array[i].lower() == 'true' and tied_array[i] == -1: for ii in range(nvisit): free_array.append(False) if fixed_array[i].lower() == 'true' and not tied_array[i] == -1: free_array.append(False) if fixed_array[i].lower() == 'false' and tied_array[i] == -1: free_array.append(True) for ii in range(nvisit-1): free_array.append(False) if fixed_array[i].lower() == 'false' and not tied_array[i] == -1: free_array.append(True) free_array = np.array(free_array) return np.array(params[free_array])
[docs]def make_lsq_rprs_txt(vals, errs, idxs, meta): """Save rprs vs wavelength from LSQ.""" wvl_table = get_wavelength_table(meta) out_file = meta.workdir / meta.fitdir / "lsq_res" / "lsq_rprs.txt" rp_idx = np.where(np.array(meta.labels) == "rp")[0][0] rprs_vals_lsq = [vals[ii][idxs[0][rp_idx]] for ii in range(len(vals))] rprs_errs_lsq = [errs[ii][idxs[0][rp_idx]] for ii in range(len(errs))] with out_file.open("w", encoding="utf-8") as f_lsq: header = [ "wavelength", "wave_width", "wave_low", "wave_high", "rprs", "rprs_err", ] print("#" + " ".join(f"{item:<18}" for item in header), file=f_lsq) for row in zip( wvl_table["wavelength"], wvl_table["half_width"], wvl_table["lower_edge"], wvl_table["upper_edge"], rprs_vals_lsq, rprs_errs_lsq, ): print(" ".join(f"{float(item):<18.8e}" for item in row), file=f_lsq)
[docs]def make_rprs_txt(vals, errs_lower, errs_upper, meta, fitter=None, vals_maxL=None): """Save rprs vs wavelength from MCMC or nested sampling. Parameters ---------- vals : list Median/p50 parameter values for each wavelength bin. errs_lower : list Lower 1-sigma errors, i.e. p50 - p16. errs_upper : list Upper 1-sigma errors, i.e. p84 - p50. meta PACMAN metadata object. fitter : str Either "mcmc" or "nested". vals_maxL : list or None Maximum-likelihood parameter values for each wavelength bin. Required for fitter="nested" if you want maxL in nested_rprs.txt. """ if fitter not in ["mcmc", "nested"]: raise ValueError("fitter must be 'mcmc' or 'nested'.") wvl_table = get_wavelength_table(meta) rp_idx = np.where(np.array(meta.labels) == "rp")[0][0] p50 = np.array(vals).T[rp_idx] minus = np.array(errs_lower).T[rp_idx] plus = np.array(errs_upper).T[rp_idx] p16 = p50 - minus p84 = p50 + plus out_file = meta.workdir / meta.fitdir / f"{fitter}_res" / f"{fitter}_rprs.txt" with out_file.open("w", encoding="utf-8") as f_out: if fitter == "mcmc": header = [ "wavelength", "wave_width", "wave_low", "wave_high", "p50", "p16", "p84", "minus", "plus", ] print("#" + " ".join(f"{item:<18}" for item in header), file=f_out) for row in zip( wvl_table["wavelength"], wvl_table["half_width"], wvl_table["lower_edge"], wvl_table["upper_edge"], p50, p16, p84, minus, plus, ): print(" ".join(f"{float(item):<18.8e}" for item in row), file=f_out) elif fitter == "nested": if vals_maxL is None: raise ValueError("vals_maxL must be provided for nested_rprs.txt.") maxL = np.array(vals_maxL).T[rp_idx] header = [ "wavelength", "wave_width", "wave_low", "wave_high", "p50", "p16", "p84", "minus", "plus", "maxL", ] print("#" + " ".join(f"{item:<18}" for item in header), file=f_out) for row in zip( wvl_table["wavelength"], wvl_table["half_width"], wvl_table["lower_edge"], wvl_table["upper_edge"], p50, p16, p84, minus, plus, maxL, ): print(" ".join(f"{float(item):<18.8e}" for item in row), file=f_out)
[docs]def quantile(x, q): return np.percentile(x, [100. * qi for qi in q])
[docs]def weighted_mean(data, err): """Calculates the weighted mean for data points data with std devs. err.""" ind = err != 0.0 weights = 1.0/err[ind]**2 mu = np.sum(data[ind]*weights)/np.sum(weights) var = 1.0/np.sum(weights) return [mu, np.sqrt(var)]
[docs]def create_res_dir(meta): """Creates the result directory depending on which fitters were used.""" #print('H11', meta.workdir) #print('H22', meta.fitdir) lsq_res_dir = meta.workdir / meta.fitdir / 'lsq_res' #print('H33', lsq_res_dir) #print('DOES IT EXIST', lsq_res_dir.exists()) if not lsq_res_dir.exists(): lsq_res_dir.mkdir(parents=True) #print('DOES THIS STILL RUN?') mcmc_res_dir = meta.workdir / meta.fitdir / 'mcmc_res' nested_res_dir = meta.workdir / meta.fitdir / 'nested_res' if meta.run_lsq: if not lsq_res_dir.exists(): lsq_res_dir.mkdir(parents=True) if meta.run_mcmc: if not mcmc_res_dir.exists(): mcmc_res_dir.mkdir(parents=True) if meta.run_nested: if not nested_res_dir.exists(): nested_res_dir.mkdir(parents=True)
[docs]def log_run_setup(meta): """Prepares lists in meta where fit statistics will be saved into.""" meta.rms_pred_list_lsq = [] meta.rms_pred_list_mcmc = [] meta.rms_pred_list_nested = [] meta.rms_list_lsq = [] meta.rms_list_mcmc = [] meta.rms_list_nested = [] meta.chi2_list_lsq = [] meta.chi2_list_mcmc = [] meta.chi2_list_nested = [] meta.chi2red_list_lsq = [] meta.chi2red_list_mcmc = [] meta.chi2red_list_nested = [] meta.bic_list_lsq = [] meta.bic_list_mcmc = [] meta.bic_list_nested = [] # meta.bic_alt_list_lsq = [] # meta.bic_alt_list_mcmc = [] # meta.bic_alt_list_nested = [] meta.ln_like_list_lsq = [] meta.ln_like_list_mcmc = [] meta.ln_like_list_nested = [] if 'uncmulti' in meta.s30_myfuncs or meta.rescale_uncert: meta.chi2_notrescaled_list_lsq = [] meta.chi2_notrescaled_list_mcmc = [] meta.chi2_notrescaled_list_nested = [] meta.chi2red_notrescaled_list_lsq = [] meta.chi2red_notrescaled_list_mcmc = [] meta.chi2red_notrescaled_list_nested = [] meta.bic_notrescaled_list_lsq = [] meta.bic_notrescaled_list_mcmc = [] meta.bic_notrescaled_list_nested = [] # meta.bic_alt_notrescaled_list_lsq = [] # meta.bic_alt_notrescaled_list_mcmc = [] # meta.bic_alt_notrescaled_list_nested = [] meta.ln_like_notrescaled_list_lsq = [] meta.ln_like_notrescaled_list_mcmc = [] meta.ln_like_notrescaled_list_nested = [] if 'uncmulti' in meta.s30_myfuncs: meta.uncmulti_mcmc = [] meta.uncmulti_nested = [] return meta
[docs]def append_fit_output(fit, meta, fitter=None, medians=None): """Appends fit statistics like rms or chi2 into meta lists.""" precision = 3 if fitter == 'lsq': meta.rms_pred_list_lsq.append(round(fit.rms_predicted, precision)) meta.rms_list_lsq.append(round(fit.rms, precision)) meta.chi2_list_lsq.append(round(fit.chi2, precision)) meta.chi2red_list_lsq.append(round(fit.chi2red, precision)) meta.bic_list_lsq.append(round(fit.bic, precision)) # meta.bic_alt_list_lsq.append(round(fit.bic_alt, precision)) meta.ln_like_list_lsq.append(round(fit.ln_like, precision)) if fitter == 'mcmc': meta.rms_pred_list_mcmc.append(round(fit.rms_predicted, precision)) meta.rms_list_mcmc.append(round(fit.rms, precision)) meta.chi2_list_mcmc.append(round(fit.chi2, precision)) meta.chi2red_list_mcmc.append(round(fit.chi2red, precision)) meta.bic_list_mcmc.append(round(fit.bic, precision)) # meta.bic_alt_list_mcmc.append(round(fit.bic_alt, precision)) meta.ln_like_list_mcmc.append(round(fit.ln_like, precision)) if fitter == 'nested': meta.rms_pred_list_nested.append(round(fit.rms_predicted, precision)) meta.rms_list_nested.append(round(fit.rms, precision)) meta.chi2_list_nested.append(round(fit.chi2, precision)) meta.chi2red_list_nested.append(round(fit.chi2red, precision)) meta.bic_list_nested.append(round(fit.bic, precision)) # meta.bic_alt_list_nested.append(round(fit.bic_alt, precision)) meta.ln_like_list_nested.append(round(fit.ln_like, precision)) if ('uncmulti' in meta.s30_myfuncs) or (meta.rescale_uncert): if fitter == 'mcmc': meta.chi2_notrescaled_list_mcmc.append(round(fit.chi2_notrescaled, precision)) meta.chi2red_notrescaled_list_mcmc.append(round(fit.chi2red_notrescaled, precision)) meta.bic_notrescaled_list_mcmc.append(round(fit.bic_notrescaled, precision)) # meta.bic_alt_notrescaled_list_mcmc.append(round(fit.bic_alt_notrescaled, precision)) meta.ln_like_notrescaled_list_mcmc.append(round(fit.ln_like_notrescaled, precision)) if ('uncmulti' in meta.s30_myfuncs): meta.uncmulti_mcmc.append(round(medians[-1], precision)) if fitter == 'nested': meta.chi2_notrescaled_list_nested.append(round(fit.chi2_notrescaled, precision)) meta.chi2red_notrescaled_list_nested.append(round(fit.chi2red_notrescaled, precision)) meta.bic_notrescaled_list_nested.append(round(fit.bic_notrescaled, precision)) # meta.bic_alt_notrescaled_list_nested.append(round(fit.bic_alt_notrescaled, precision)) meta.ln_like_notrescaled_list_nested.append(round(fit.ln_like_notrescaled, precision)) if ('uncmulti' in meta.s30_myfuncs): meta.uncmulti_nested.append(round(medians[-1], precision))
[docs]def save_fit_output(fit, data, meta): """Saves all the fit statistics like rms or chi2 into an astropy table.""" t = Table() t['wave'] = meta.wavelength_list t['wave'].info.format = '.3f' if meta.run_mcmc: t['rms_pred'] = meta.rms_pred_list_mcmc t['rms'] = meta.rms_list_mcmc t['chi2'] = meta.chi2_list_mcmc t['chi2red'] = meta.chi2red_list_mcmc t['bic'] = meta.bic_list_mcmc # t['bic_alt'] = meta.bic_alt_list_mcmc t['ln_like'] = meta.ln_like_list_mcmc if ('uncmulti' in meta.s30_myfuncs) or (meta.rescale_uncert): t['chi2_notrescaled'] = meta.chi2_notrescaled_list_mcmc t['chi2red_notrescaled'] = meta.chi2red_notrescaled_list_mcmc t['bic_notrescaled'] = meta.bic_notrescaled_list_mcmc # t['bic_alt_notrescaled'] = meta.bic_alt_notrescaled_list_mcmc t['ln_like_notrescaled'] = meta.ln_like_notrescaled_list_mcmc if 'uncmulti' in meta.s30_myfuncs: t['uncmulti'] = meta.uncmulti_mcmc if meta.run_nested: t['rms_pred'] = meta.rms_pred_list_nested t['rms'] = meta.rms_list_nested t['chi2'] = meta.chi2_list_nested t['chi2red'] = meta.chi2red_list_nested t['bic'] = meta.bic_list_nested # t['bic_alt'] = meta.bic_alt_list_nested t['ln_like'] = meta.ln_like_list_nested if ('uncmulti' in meta.s30_myfuncs) or (meta.rescale_uncert): t['chi2_notrescaled'] = meta.chi2_notrescaled_list_nested t['chi2red_notrescaled'] = meta.chi2red_notrescaled_list_nested t['bic_notrescaled'] = meta.bic_notrescaled_list_nested # t['bic_alt_notrescaled'] = meta.bic_alt_notrescaled_list_nested t['ln_like_notrescaled'] = meta.ln_like_notrescaled_list_nested if 'uncmulti' in meta.s30_myfuncs: t['uncmulti'] = meta.uncmulti_nested if 'uncmulti' in meta.s30_myfuncs: t['uncmulti2'] = t['uncmulti'] ** 2 t['uncmulti2'].info.format = '.3f' t['npoints'] = [data.npoints] * meta.nfits t['nfree_param'] = [data.nfree_param] * meta.nfits t['dof'] = [data.dof] * meta.nfits ascii.write(t, meta.workdir / meta.fitdir / 'fit_results.txt', format='rst', overwrite=True) print('Saved fit_results.txt file')
[docs]def save_time_averaging_data(binsz, rms, stderr, meta, fitter=None): """Saves the data used to create the time averaging plot (formally known as Allan deviation plot).""" t = Table() t['binsz'] = binsz t['rms'] = rms t['stderr'] = stderr fname = Path(f'{fitter}_res') /\ f'corr_data_bin{meta.s30_file_counter}_wvl{meta.wavelength:0.3f}.txt' ascii.write(t, meta.workdir / meta.fitdir / fname, format='rst', overwrite=True)
# def residuals(params, template_waves, template, spectrum, error): # shift, scale = params # fit = scale*np.interp(template_waves, template_waves-shift, spectrum) # x = (template-fit)/error # return (template-fit)/error # def interpolate_spectrum(spectrum, error, template, template_waves): # p0 = [1., 1.0] #initial guess for parameters shift and scale # plsq, success = leastsq(residuals, p0, args=(template_waves, template, spectrum, error)) # shift, scale = plsq # interp_spectrum = np.interp(template_waves, template_waves-shift, spectrum) # interp_error = np.interp(template_waves, template_waves-shift, error) # return [interp_spectrum, interp_error, shift] # def read_dict(filename): # #with open(filename,"r") as text: # # return dict(line.strip().split() for line in text) # dict = {} # with open(filename,"r") as text: # for line in text: # key, value = line.split() # value = str_to_num(value) # if value == 'True': value = True # if value == 'False': value = False # dict[key] = value # return dict # def getphase(t): # phase = (t - t0)/period # return phase - int(phase)
[docs]def apply_uncmulti(data, params): """Apply visit-specific uncertainty rescaling. If ``uncmulti_val`` is tied with tied = -1, the expanded params vector should contain the same value for every visit. If it is untied, each visit gets its own value. """ if "uncmulti" not in data.s30_myfuncs: return None if "uncmulti_val" not in data.par_order: raise KeyError( "'uncmulti' is in s30_myfuncs, but 'uncmulti_val' is missing " "from fit_par.txt." ) start = data.par_order["uncmulti_val"] * data.nvisit stop = start + data.nvisit uncmulti_vals = np.asarray(params[start:stop], dtype=float) if len(uncmulti_vals) != data.nvisit: raise ValueError( f"Expected {data.nvisit} uncmulti_val values after parameter " f"expansion, but got {len(uncmulti_vals)}." ) err = np.array(data.err_notrescaled, dtype=float, copy=True) for visit in range(data.nvisit): visit_mask = data.vis_num == visit err[visit_mask] *= uncmulti_vals[visit] data.err = err return uncmulti_vals
[docs]def get_wavelength_table(meta): """Return the wavelength table for the current Stage 30 fit.""" if meta.s30_fit_white: wvl_file = meta.workdir / "extracted_lc" / "wvl_table.dat" elif meta.s30_fit_spec: wvl_file = meta.workdir / "extracted_sp" / "wvl_table.dat" else: raise ValueError("Either s30_fit_white or s30_fit_spec must be True.") if not wvl_file.exists(): raise FileNotFoundError(f"Could not find wavelength table: {wvl_file}") return ascii.read(str(wvl_file))
[docs]def get_wavelength_info(meta, file_counter=None): """Return wavelength info dict for the current fit/bin.""" if file_counter is None: file_counter = getattr(meta, "s30_file_counter", 0) table = get_wavelength_table(meta) if meta.s30_fit_white: row = table[0] else: row = table[file_counter] return { "wavelength": float(row["wavelength"]), "half_width": float(row["half_width"]), "lower_edge": float(row["lower_edge"]), "upper_edge": float(row["upper_edge"]), }
[docs]def get_param_value_from_fit_or_fitpar(data, fit, param_name, visit=0): """ Return parameter value from fitted params if available, otherwise fall back to fit_par.txt. """ from .formatter import FormatParams import numpy as np p = FormatParams(fit.params, data) if hasattr(p, param_name): arr = np.atleast_1d(getattr(p, param_name)) if len(arr) > visit: return float(arr[visit]) return float(arr[0]) fit_par = data.fit_par rows = fit_par[fit_par["parameter"] == param_name] if len(rows) == 0: raise ValueError(f"Could not find parameter '{param_name}' in fit_par.") tied = np.array(rows["tied"], dtype=int) values = np.array(rows["value"], dtype=float) # exact visit match first match = np.where(tied == visit)[0] if len(match) > 0: return float(values[match[0]]) # otherwise use shared value match = np.where(tied == -1)[0] if len(match) > 0: return float(values[match[0]]) # final fallback return float(values[0])
[docs]def make_time_model_per_visit(data, pad_hours=0.25, points_per_hour=15): """ Build per-visit model time arrays. Returns a list of time arrays, one per visit. """ segments = [] pad_days = pad_hours / 24.0 for visit in range(data.nvisit): ind = data.vis_num == visit tmin = np.min(data.time[ind]) - pad_days tmax = np.max(data.time[ind]) + pad_days duration_hours = (tmax - tmin) * 24.0 npts = max(50, int(np.ceil(duration_hours * points_per_hour))) segments.append(np.linspace(tmin, tmax, npts)) return segments
[docs]def get_lc_type_label(meta): """Return a short label for the current light-curve type.""" fitdir_str = str(meta.fitdir) if "fit_white" in fitdir_str: return "Wh.-LC" if "fit_spec" in fitdir_str: return "Sp.-LC" return "LC"