import math
from pathlib import Path
import numpy as np
from astropy.io import ascii, fits
from astropy.table import Table
from scipy.interpolate import interp1d
from scipy.optimize import leastsq
from scipy.signal import find_peaks
from .options import OPTIONS
from .sort_nicely import sort_nicely as sn
from ..lib import plots
# s00
def readfiles(meta):
"""Reads in the files saved in datadir and saves them into a list.
Parameters
----------
meta :
        Metadata object.
Returns
-------
meta :
        Metadata object with segment_list added, containing the data FITS
        files found in meta.datadir.
Notes
-----
History:
Written by Sebastian Zieba December 2021
"""
meta.segment_list = []
for fname in meta.datadir.iterdir():
if fname.name.endswith(f'{meta.suffix}.fits'):
            meta.segment_list.append(fname)  # fname from iterdir() is already a full path
return meta
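# Usage sketch (illustrative only; attribute names as in the docstring above):
#
#     meta.datadir = Path('/data/visit01')  # hypothetical data directory
#     meta.suffix = 'ima'                   # selects the '*ima.fits' files
#     meta = readfiles(meta)
#     print(len(meta.segment_list))         # number of FITS files found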
# s02
def ancil(meta, s10: bool = False, s20: bool = False):
"""This function loads in a lot of useful arrays and values into meta.
    The following additional information is loaded into meta:
- **norbit:** number of orbits
- **nvisit:** number of visits
- **files_sp:** all spectra files
- **files_di:** all direct image files
- **ra:** RA of the target in radians (from the header) (Note: this data is taken from the first spectrum file)
- **dec:** DEC of the target in radians (from the header) (Note: this data is taken from the first spectrum file)
- **coordtable:** a list of files containing the vector information of HST downloaded in s01
Parameters
----------
    meta :
        Metadata object.
    s10 : bool
        Set to True when stage s10 is being performed.
    s20 : bool
        Set to True when stage s20 is being performed.
Returns
-------
    meta :
        Metadata object with the ancillary information added.
Notes
-----
History:
Written by Sebastian Zieba December 2021
"""
filelist = ascii.read(meta.workdir / 'filelist.txt')
aa = filelist['iorbit'].data
bb = np.diff(aa)
    meta.norbit = len(np.insert(aa[1:][np.where(bb != 0)], 0, aa[0]))  # number of orbits
    meta.nvisit = len(set(filelist['ivisit'].data))  # number of visits
    meta.mask_sp = np.array([i[0] for i in filelist['instr']]) == 'G'  # grism names start with 'G'
    meta.mask_di = np.array([i[0] for i in filelist['instr']]) == 'F'  # filter names start with 'F'
meta.files_sp = [Path(meta.datadir) / i for i in filelist['filenames'][meta.mask_sp].data]
meta.files_di = [Path(meta.datadir) / i for i in filelist['filenames'][meta.mask_di].data]
    with fits.open(meta.files_sp[0]) as f:
        meta.ra = f[0].header['ra_targ'] * math.pi / 180.0  # right ascension in radians
        meta.dec = f[0].header['dec_targ'] * math.pi / 180.0  # declination in radians
    meta.coordtable = []  # list of files with the HST position vectors downloaded in s01
    for i in range(max(filelist['ivisit']) + 1):
        meta.coordtable.append(meta.workdir / 'ancil' / 'horizons' / f'horizons_results_v{i}.txt')
# 03
    # TODO: Q: Is it okay to assume that everything in datadir uses the same filter/grism pair?
meta.filter = filelist['instr'][meta.mask_di][0]
meta.grism = filelist['instr'][meta.mask_sp][0]
meta.scans_sp = filelist['scan'][meta.mask_sp].data
meta.iorbit_sp = filelist['iorbit'][meta.mask_sp].data
meta.ivisit_sp = filelist['ivisit'][meta.mask_sp].data
meta.iexp_orb_sp = filelist['iexp_orb'][meta.mask_sp].data
# the following list stores the cumulative orbit number
meta.iorbit_sp_cumulative = np.zeros(len(meta.iorbit_sp), dtype=int)
c = 0
for i in range(len(meta.iorbit_sp) - 1):
if meta.iorbit_sp[i + 1] != meta.iorbit_sp[i]:
c = c + 1
meta.iorbit_sp_cumulative[i + 1] = c
# the following lists store the indices of spectral files where the orbit/visit increases
    meta.new_orbit_idx_sp = np.concatenate(([0], np.where(np.diff(meta.iorbit_sp) != 0)[0] + 1))
    meta.new_visit_idx_sp = np.concatenate(([0], np.where(np.diff(meta.ivisit_sp) != 0)[0] + 1))
meta.iorbit_di = filelist['iorbit'][meta.mask_di].data
meta.ivisit_di = filelist['ivisit'][meta.mask_di].data
# the following list stores the cumulative orbit number for the direct images
meta.iorbit_di_cumulative = np.zeros(len(meta.iorbit_di), dtype=int)
c = 0
for i in range(len(meta.iorbit_di) - 1):
if meta.iorbit_di[i + 1] != meta.iorbit_di[i]:
c = c + 1
meta.iorbit_di_cumulative[i + 1] = c
    # TODO: This is a problem if there is one DI per visit: it will give us
    # TODO: meta.iorbit_di_cumulative == [0, 0, 0, ...] with as many zeros as visits.
    # TODO: Quick fix (29th March 2023): check whether there are as many DIs as
    # TODO: visits and whether the DIs were taken more than an hour apart.
    t_mjd_di_tmp = filelist['t_mjd'][meta.mask_di].data
    if (len(meta.iorbit_di) == (max(meta.ivisit_sp) + 1)) and np.all(np.diff(t_mjd_di_tmp) > 1/24):
        meta.iorbit_di_cumulative = np.arange(len(meta.iorbit_di), dtype=int)
meta.t_mjd_sp = filelist['t_mjd'][meta.mask_sp].data
meta.t_orbit_sp = filelist['t_orbit'][meta.mask_sp].data
meta.t_visit_sp = filelist['t_visit'][meta.mask_sp].data
meta.platescale = 0.13 # IR detector has plate scale of 0.13 arcsec/pixel
f_sp0 = fits.open(meta.files_sp[0])
meta.POSTARG1_sp = f_sp0[0].header['POSTARG1'] # x-coordinate of the observer requested target offset
meta.POSTARG2_sp = f_sp0[0].header['POSTARG2'] # y-coordinate of the observer requested target offset
meta.LTV1 = int(f_sp0[1].header['LTV1']) # X offset to get into physical pixels
meta.LTV2 = int(f_sp0[1].header['LTV2']) # Y offset to get into physical pixels
meta.subarray_size = f_sp0[1].header['SIZAXIS1'] # size of subarray
f_sp0.close()
f_di0 = fits.open(meta.files_di[0])
meta.POSTARG1_di = f_di0[0].header['POSTARG1'] # x-coordinate of the observer requested target offset
meta.POSTARG2_di = f_di0[0].header['POSTARG2'] # y-coordinate of the observer requested target offset
meta.POSTARG1 = meta.POSTARG1_sp - meta.POSTARG1_di
meta.POSTARG2 = meta.POSTARG2_sp - meta.POSTARG2_di
    # extent of the first-order spectrum (beam A) along x, in pixels
    if meta.grism == 'G102':
        meta.BEAMA_i = 41
        meta.BEAMA_f = 248
    elif meta.grism == 'G141':
        meta.BEAMA_i = 15
        meta.BEAMA_f = 196
f_di0.close()
    if s10:
        if 't_bjd' in filelist.keys():
            meta.t_bjd_sp = filelist['t_bjd'][meta.mask_sp].data
            meta.t_bjd_di = filelist['t_bjd'][meta.mask_di].data
if s20:
        meta.rdnoise = 22.0  # detector read noise in electrons
if meta.grism == 'G102':
meta.flat = meta.pacmandir / 'data' / 'flats' / 'WFC3.IR.G102.flat.2.fits'
elif meta.grism == 'G141':
meta.flat = meta.pacmandir / 'data' / 'flats' / 'WFC3.IR.G141.flat.2.fits'
return meta
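# Worked example of the cumulative orbit numbering computed above (made-up
# values): iorbit restarts at 0 in every visit, while the cumulative index
# keeps counting across visit boundaries.
#
#     iorbit_sp            = [0, 0, 1, 1, 0, 0, 1, 1]  # two visits, two orbits each
#     iorbit_sp_cumulative = [0, 0, 1, 1, 2, 2, 3, 3]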
# 03
def gaussian_kernel(meta, x, y):
    """Applies a Gaussian-sum filter to irregularly spaced data. Used to smooth the stellar spectrum.
    Adapted from: https://stackoverflow.com/questions/24143320/gaussian-sum-filter-for-irregular-spaced-points.
    """
    y = y / max(y)
    order = np.argsort(x)  # sort x and y together so their pairing is preserved
    x = np.asarray(x)[order]
    y = y[order]
resolution = min(len(x), 3000)
x_eval = np.linspace(min(x), max(x), resolution)
sigma = meta.smooth_sigma * 1e-10 / 5
delta_x = x_eval[:, None] - x
weights = np.exp(-delta_x*delta_x / (2*sigma*sigma)) / (np.sqrt(2*np.pi) * sigma)
weights /= np.sum(weights, axis=1, keepdims=True)
y_eval = np.dot(weights, y)
y_eval = y_eval / max(y_eval)
if meta.save_smooth_plot:
plots.smooth(meta, x, y, x_eval, y_eval)
return x_eval, y_eval
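# Minimal sketch of the Gaussian-sum filter above on synthetic, irregularly
# spaced data (all values here are made up):
#
#     import numpy as np
#     rng = np.random.default_rng(0)
#     x = np.sort(rng.uniform(0, 10, 500))
#     y = np.sin(x) + rng.normal(0, 0.2, 500)
#     sigma = 0.3
#     x_eval = np.linspace(x.min(), x.max(), 1000)
#     delta_x = x_eval[:, None] - x
#     weights = np.exp(-delta_x**2 / (2 * sigma**2))
#     weights /= weights.sum(axis=1, keepdims=True)
#     y_smooth = weights @ y  # same matrix product as np.dot(weights, y) above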
# def gaussian_kernel_old(meta, x, y):
# """
# https://matthew-brett.github.io/teaching/smoothing_intro.html
# """
#
# sigma = meta.smooth_sigma * 1e-10
#
# y = y / max(y)
# y_smoothed = np.zeros(y.shape)
# for x_idx, x_val in tqdm(enumerate(x), desc='smoothing...', total=len(x)):
# kernel = np.exp(-(x - x_val) ** 2 / (2 * sigma ** 2))
# kernel = kernel / sum(kernel)
# y_smoothed[x_idx] = sum(y * kernel)
# y_smoothed = y_smoothed / max(y_smoothed)
#
# if meta.save_smooth_plot:
# plots.smooth(meta, x, y, x, y_smoothed)
#
# return (x, y_smoothed)
# s10
# s20
def get_wave_grid(meta):
"""Gets grid of wavelength solutions for each orbit and row."""
if meta.grism == 'G102':
from ..lib import geometry102 as geo
elif meta.grism == 'G141':
from ..lib import geometry141 as geo
wave_grid = np.empty((meta.norbit*meta.nvisit, meta.subarray_size, meta.subarray_size))
    # calculates the wavelength solution row by row for each orbit
for i in range(meta.norbit):
for j in range(meta.subarray_size):
disp_solution = geo.dispersion(meta.refpix[i,1], -meta.LTV2+j)
delx = 0.5 + np.arange(meta.subarray_size) - (meta.refpix[i,2] + meta.LTV1 + meta.POSTARG1/meta.platescale)
wave_grid[i, j, :] = disp_solution[0] + delx*disp_solution[1]
return wave_grid
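# The wavelength solution above is linear in the pixel offset delx:
# wave(x) = a0 + a1 * delx, where (a0, a1) = geo.dispersion(...) depend on the
# source position on the detector. Sketch with made-up coefficients:
#
#     a0, a1 = 10000.0, 45.0  # hypothetical zero point and dispersion per pixel
#     delx = 0.5 + np.arange(256) - 128.0
#     wave_row = a0 + a1 * delx  # wavelength of every pixel in one row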
def get_flatfield(meta):
    """Opens the flat-field file and uses it for bad pixel masking.
    Flat-fields a data array D which starts at [minr, cmin] of hdu[1].data.
    """
flat = fits.open(meta.flat) # reads in flatfield cube
WMIN = flat[0].header['WMIN'] # constants for computing flatfield coefficients
WMAX = flat[0].header['WMAX']
a0 = flat[0].data[-meta.LTV1:-meta.LTV1+meta.subarray_size, -meta.LTV2:-meta.LTV2+meta.subarray_size]
a1 = flat[1].data[-meta.LTV1:-meta.LTV1+meta.subarray_size, -meta.LTV2:-meta.LTV2+meta.subarray_size]
a2 = flat[2].data[-meta.LTV1:-meta.LTV1+meta.subarray_size, -meta.LTV2:-meta.LTV2+meta.subarray_size]
flatfield = []
for i in range(meta.norbit*meta.nvisit):
wave = meta.wave_grid[i, :]
x = (wave - WMIN)/(WMAX-WMIN)
flatfield.append(a0+a1*x+a2*x**2)
        flatfield[i][flatfield[i] < 0.5] = -1.  # set flat-field pixels below 0.5 to -1 so they can be masked
    flat.close()
    return flatfield
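# The grism flat field is wavelength dependent: every pixel's flat value is a
# polynomial in the normalized wavelength x = (wave - WMIN)/(WMAX - WMIN).
# Sketch for a single pixel with made-up coefficients:
#
#     WMIN, WMAX = 7000.0, 12000.0     # hypothetical header values
#     a0, a1, a2 = 0.98, 0.03, -0.01   # hypothetical per-pixel coefficients
#     wave = 10500.0
#     x = (wave - WMIN) / (WMAX - WMIN)
#     flat_value = a0 + a1 * x + a2 * x**2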
def peak_finder(array, i, ii, orbnum, meta):
    """Finds the rows where the spectrum starts and ends by locating peaks in the absolute derivative of the row medians."""
# determine the aperture cutout
    rowmedian = np.median(array, axis=1)  # median of every row
    rowmedian_absder = np.abs(np.diff(rowmedian))  # absolute derivative, to locate where the spectrum starts and ends
# we use scipy.signal.find_peaks to determine in which rows the spectrum starts and ends
# TODO: Think about better values for height and distance
    # TODO: Finding the row with the highest change in flux (compared to the rows above and below) isn't robust against outliers!
    peaks, _ = find_peaks(rowmedian_absder, height=0.3 * np.max(rowmedian_absder), distance=2)
# peaks = peaks[:2] # only take the two biggest peaks (there should be only 2)
peaks = np.array([min(peaks[:2]), max(peaks[:2]) + 1])
if meta.save_utr_plot or meta.show_utr_plot:
plots.utr(array, meta, i, ii, orbnum, rowmedian, rowmedian_absder, peaks)
return peaks
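# Sketch of the edge-detection idea on synthetic data: a scanned spectrum looks
# like a top hat in the row medians, so its absolute derivative peaks once at
# each edge.
#
#     import numpy as np
#     from scipy.signal import find_peaks
#     rowmedian = np.zeros(100)
#     rowmedian[30:70] = 1.0  # spectrum spans rows 30..69
#     absder = np.abs(np.diff(rowmedian))
#     peaks, _ = find_peaks(absder, height=0.3 * absder.max(), distance=2)
#     # peaks -> [29, 69], bracketing the illuminated rows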
def zero_pad_x(array):
    """Extends the wavelength array far beyond the data range on both sides (see also zero_pad_y)."""
    # TODO: Try to make this look nicer
    array_new = np.concatenate((np.linspace(-5000, min(array) + 1, 10, endpoint=True),
                                array,
                                np.linspace(max(array) + 350, 30000, 10, endpoint=False)))
    return array_new
def zero_pad_y(array):
    """Pads the flux array with ten zeros on each side, matching the padding of zero_pad_x."""
    return np.concatenate((np.zeros(10), array, np.zeros(10)))
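# The two helpers above extend the template beyond the observed wavelength
# range (with zero flux in the padding) so that the linear interpolation in
# residuals2 stays in bounds when the fit shifts and stretches the x axis:
#
#     x = np.array([9000., 9500., 10000.])
#     y = np.array([0.5, 1.0, 0.7])
#     x_pad = zero_pad_x(x)  # runs from -5000 up to (but excluding) 30000
#     y_pad = zero_pad_y(y)  # the 10 padded points on each side get zero flux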
def read_refspec(meta):
"""Reads in the reference spectrum."""
refspec = np.loadtxt(meta.workdir / 'ancil' / 'refspec' / 'refspec.txt').T
x_refspec, y_refspec = refspec[0]*1e10, refspec[1]/max(refspec[1])
x_refspec = zero_pad_x(x_refspec)
y_refspec = zero_pad_y(y_refspec)
return x_refspec, y_refspec
def residuals2(params, x1, y1, x2, y2):
    """Calculates residuals for leastsq: the template (x1, y1) is interpolated to a + b*x2, scaled by c, and compared to y2."""
a, b, c = params
x1 = np.array(x1)
x2 = np.array(x2)
y1 = np.array(y1)
y2 = np.array(y2)
f = interp1d(x1, y1, kind='linear')
fit = f(a+b*x2)*c
return fit - y2
def residuals2_lin(params, x1, y1, x2, y2):
    """Like residuals2, but with the stretch fixed to 1: only a shift a and a scale c are fitted."""
a, c = params
x1 = np.array(x1)
x2 = np.array(x2)
y1 = np.array(y1)
y2 = np.array(y2)
f = interp1d(x1, y1, kind='linear')
fit = f(a+x2)*c
return fit - y2
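# Sketch of how these residual functions are used with scipy's leastsq
# (synthetic template and data with a known shift, stretch and scale):
#
#     import numpy as np
#     from scipy.optimize import leastsq
#     x1 = np.linspace(8000, 12000, 500)
#     y1 = np.exp(-0.5 * ((x1 - 10000) / 400)**2)    # template spectrum
#     x2 = np.linspace(8500, 11500, 300)
#     y2 = np.interp(20. + 1.01 * x2, x1, y1) * 0.9  # shifted, stretched, scaled
#     best, _ = leastsq(residuals2, [0., 1., 1.], args=(x1, y1, x2, y2))
#     # best ~ [20., 1.01, 0.9]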
def correct_wave_shift_fct_0(meta, orbnum, cmin, cmax, spec_opt, i):
"""Use the reference spectrum for the wave cal."""
template_waves = meta.wave_grid[0, int(meta.refpix[orbnum, 1]) + meta.LTV1, cmin:cmax]
    g102mask = template_waves > 820  # we don't use the spectrum below 820 nm (8200 angstrom) for the interpolation, as the reference bandpass cuts off below this wavelength
x_refspec, y_refspec = read_refspec(meta)
    # TODO: will break if optimal extraction isn't used!
    # x_data & y_data: the spectrum as it would be without any wavelength correction
x_data = template_waves[g102mask]
y_data = (spec_opt / max(spec_opt))[g102mask]
p0 = [0, 1, 1] # initial guess for least squares
leastsq_res = leastsq(residuals2, p0, args=(x_refspec, y_refspec, x_data, y_data))[0]
#leastsq_res[0] = leastsq_res[0] + 60
if meta.save_refspec_fit_plot or meta.show_refspec_fit_plot:
plots.refspec_fit(x_refspec, y_refspec, p0, x_data, y_data, leastsq_res, meta, i)
    # template for all exposures in the visit except the first
x_data_firstexpvisit = leastsq_res[0] + x_data * leastsq_res[1]
y_data_firstexpvisit = np.copy(y_data)
return x_data_firstexpvisit, y_data_firstexpvisit, leastsq_res
def correct_wave_shift_fct_0_lin(meta, orbnum, cmin, cmax, spec_opt, i):
    """Use the reference spectrum for the wave cal (linear version: shift only, no stretch)."""
template_waves = meta.wave_grid[0, int(meta.refpix[orbnum, 1]) + meta.LTV1, cmin:cmax]
    g102mask = template_waves > 820  # we don't use the spectrum below 820 nm (8200 angstrom) for the interpolation, as the reference bandpass cuts off below this wavelength
x_refspec, y_refspec = read_refspec(meta)
    # TODO: will break if optimal extraction isn't used!
    # x_data & y_data: the spectrum as it would be without any wavelength correction
x_data = template_waves[g102mask]
y_data = (spec_opt / max(spec_opt))[g102mask]
p0 = [0, 1] # initial guess for least squares
leastsq_res = leastsq(residuals2_lin, p0, args=(x_refspec, y_refspec, x_data, y_data))[0]
# leastsq_res[0] = leastsq_res[0] + 60
if meta.save_refspec_fit_plot or meta.show_refspec_fit_plot:
plots.refspec_fit_lin(x_refspec, y_refspec, p0, x_data, y_data, leastsq_res, meta, i)
    # template for all exposures in the visit except the first
x_data_firstexpvisit = leastsq_res[0] + x_data
y_data_firstexpvisit = np.copy(y_data)
return x_data_firstexpvisit, y_data_firstexpvisit, leastsq_res
def correct_wave_shift_fct_00(meta, orbnum, cmin, cmax, spec_opt, i):
"""Use the first exposure in the visit for wave cal."""
template_waves = meta.wave_grid[0, int(meta.refpix[orbnum, 1]) + meta.LTV1, cmin:cmax]
x_refspec, y_refspec = template_waves, spec_opt / max(spec_opt)
    # TODO: will break if optimal extraction isn't used!
    # x_data & y_data: the spectrum as it would be without any wavelength correction
x_data = template_waves
y_data = (spec_opt / max(spec_opt))
p0 = [0, 1, 1] # initial guess for least squares
leastsq_res = leastsq(residuals2, p0, args=(x_refspec, y_refspec, x_data, y_data))[0]
if meta.save_refspec_fit_plot or meta.show_refspec_fit_plot:
plots.refspec_fit(x_refspec, y_refspec, p0, x_data, y_data, leastsq_res, meta, i)
    # template for all exposures in the visit except the first
x_data_firstexpvisit = leastsq_res[0] + x_data * leastsq_res[1]
y_data_firstexpvisit = np.copy(y_data)
return x_data_firstexpvisit, y_data_firstexpvisit, leastsq_res
def correct_wave_shift_fct_1(meta, orbnum, cmin, cmax, spec_opt,
                             x_data_firstexpvisit, y_data_firstexpvisit, i):
    """Fits exposure i against the zero-padded visit template to correct its wavelength shift."""
x_model = zero_pad_x(x_data_firstexpvisit)
y_model = zero_pad_y(y_data_firstexpvisit)
x_data = meta.wave_grid[0, int(meta.refpix[orbnum, 1]) + meta.LTV1, cmin:cmax]
y_data = spec_opt / max(spec_opt)
p0 = [0, 1, 1]
leastsq_res = leastsq(residuals2, p0, args=(x_model, y_model, x_data, y_data))[0]
if meta.save_refspec_fit_plot or meta.show_refspec_fit_plot:
plots.refspec_fit(x_model, y_model, p0, x_data, y_data, leastsq_res, meta, i)
wvls = leastsq_res[0] + x_data * leastsq_res[1]
return wvls, leastsq_res
def correct_wave_shift_fct_1_lin(meta, orbnum, cmin, cmax, spec_opt,
                                 x_data_firstexpvisit, y_data_firstexpvisit, i):
    """Like correct_wave_shift_fct_1, but fits only a shift (no stretch)."""
x_model = zero_pad_x(x_data_firstexpvisit)
y_model = zero_pad_y(y_data_firstexpvisit)
x_data = meta.wave_grid[0, int(meta.refpix[orbnum, 1]) + meta.LTV1, cmin:cmax]
y_data = spec_opt / max(spec_opt)
p0 = [0, 1]
leastsq_res = leastsq(residuals2_lin, p0, args=(x_model, y_model, x_data, y_data))[0]
if meta.save_refspec_fit_plot or meta.show_refspec_fit_plot:
plots.refspec_fit_lin(x_model, y_model, p0, x_data, y_data, leastsq_res, meta, i)
wvls = leastsq_res[0] + x_data
return wvls, leastsq_res
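# Overview of the wavelength-shift correction flow implemented above:
# 1. correct_wave_shift_fct_0 / _0_lin fit the first exposure of a visit to the
#    reference spectrum and return the calibrated visit template
#    (x_data_firstexpvisit, y_data_firstexpvisit);
#    correct_wave_shift_fct_00 instead uses the first exposure itself as template.
# 2. correct_wave_shift_fct_1 / _1_lin fit every later exposure to the
#    zero-padded template and return the corrected wavelength grid wvls.
# The *_lin variants fix the stretch to 1 and fit only a shift and a flux scale.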
# 30
def read_fitfiles(meta):
"""Read in the files (white or spectroscopic) which will be fitted."""
if meta.s30_fit_white:
print('White light curve fit will be performed')
files = []
if meta.s30_most_recent_s20:
lst_dir = sn((meta.workdir / "extracted_lc").iterdir())
# the following line makes sure that only directories starting with a "2" are considered
# this was implemented after issue #10 was raised (see issue for more info)
# this works because the dates will always start with a "2"
lst_dir_new = [lst_dir_i for lst_dir_i in lst_dir if lst_dir_i.name.startswith("2")]
white_dir = lst_dir_new[-1]
files.append(meta.workdir / "extracted_lc" / white_dir / "lc_white.txt")
print('using most recent s20 run: {0}'.format(white_dir))
else:
files.append(meta.s30_white_file_path)
            print('using user set white file: {0}'.format(meta.s30_white_file_path))
if meta.grism == 'G102':
meta.wavelength_list = [1.0]
elif meta.grism == 'G141':
meta.wavelength_list = [1.4]
elif meta.s30_fit_spec:
print('Spectroscopic light curve fit(s) will be performed')
if meta.s30_most_recent_s21:
# find most recent bins directory
lst_dir = np.array([path for path in (meta.workdir / "extracted_sp").iterdir()
if path.is_dir()])
dirs_bool = np.array(['bins' in i.name for i in lst_dir])
lst_dir = lst_dir[dirs_bool]
            dirs_times = [i.name[-19:] for i in lst_dir]  # the timestamp format '%Y-%m-%d_%H-%M-%S' is 19 characters long
# sort the times
times_sorted = sn(dirs_times)
# most recent time
recent_time = times_sorted[-1]
idx = 0
for i in range(len(lst_dir)):
if lst_dir[i].name[-19:] == recent_time:
idx = i
spec_dir_full = lst_dir[idx]
files = sn(spec_dir_full.glob("*.txt"))
print(f'using most recent s21 run: {spec_dir_full}')
else:
spec_dir_full = meta.s30_spec_dir_path
files = sn(spec_dir_full.glob("*.txt"))
            print('using user set spectroscopic directory: {0}'.format(meta.s30_spec_dir_path))
spec_dir_wvl_file = spec_dir_full / 'wvl_table.dat'
meta.wavelength_list = ascii.read(spec_dir_wvl_file)['wavelengths']
    else:
        files = []
        print('Neither s30_fit_white nor s30_fit_spec are True!')
print('Identified file(s) for fitting:', files)
meta.nfits = len(files)
return files, meta
def return_free_array(nvisit, fixed_array, tied_array):
"""Reads in the fit_par.txt and determines which parameters are free."""
free_array = []
for i in range(len(fixed_array)):
        if fixed_array[i].lower() == 'true' and tied_array[i] == -1:
            for ii in range(nvisit):
                free_array.append(False)
        elif fixed_array[i].lower() == 'true' and tied_array[i] != -1:
            free_array.append(False)
        elif fixed_array[i].lower() == 'false' and tied_array[i] == -1:
            free_array.append(True)
            for ii in range(nvisit - 1):
                free_array.append(False)
        elif fixed_array[i].lower() == 'false' and tied_array[i] != -1:
            free_array.append(True)
free_array = np.array(free_array)
return free_array
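# Worked example tracing the array this function builds (made-up inputs,
# nvisit = 2; the fixed/tied columns come from fit_par.txt):
#
#     fixed_array = ['false', 'true']
#     tied_array = [-1, 0]
#     return_free_array(2, fixed_array, tied_array)
#     # i=0: not fixed, tied == -1 -> appends [True, False]
#     # i=1: fixed, tied != -1     -> appends [False]
#     # result: [True, False, False]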
def computeRMS(data, maxnbins=None, binstep=1, isrmserr=False):
    """Computes the root-mean-square and standard error of the residuals for various bin sizes.
    Taken from POET: https://github.com/kevin218/POET/blob/master/code/lib/correlated_noise.py
    Parameters
    ----------
    data :
        Fit residuals (e.g., fit.normresiduals).
    maxnbins :
        Maximum bin size; defaults to npts/10.
    binstep : int
        Bin step size.
    isrmserr : bool
        If True, also returns the uncertainty on the rms.
    """
    # bin the data into multiple bin sizes
npts = data.size
if maxnbins is None:
maxnbins = npts / 10.
binsz = np.arange(1, maxnbins + binstep, step=binstep, dtype=int)
nbins = np.zeros(binsz.size, dtype=int)
rms = np.zeros(binsz.size)
rmserr = np.zeros(binsz.size)
for i in range(binsz.size):
nbins[i] = int(np.floor(data.size / binsz[i]))
bindata = np.zeros(nbins[i], dtype=float)
# bin data
# ADDED INTEGER CONVERSION, mh 01/21/12
for j in range(nbins[i]):
bindata[j] = data[j * binsz[i]:(j + 1) * binsz[i]].mean()
# get rms
rms[i] = np.sqrt(np.mean(bindata ** 2))
rmserr[i] = rms[i] / np.sqrt(2. * nbins[i])
# expected for white noise (WINN 2008, PONT 2006)
stderr = (data.std() / np.sqrt(binsz)) * np.sqrt(nbins / (nbins - 1.))
    if isrmserr:
        return rms, stderr, binsz, rmserr
    else:
        return rms, stderr, binsz
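# Sketch: for pure white noise the binned rms should track the predicted
# standard-error curve, i.e. fall off roughly as 1/sqrt(binsz):
#
#     import numpy as np
#     rng = np.random.default_rng(1)
#     rms, stderr, binsz = computeRMS(rng.normal(0., 1., 4096))
#     # rms ~ stderr for all bin sizes -> no sign of correlated (red) noise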
def make_lsq_rprs_txt(vals, errs, idxs, meta):
    """Saves rprs vs. wavelength as a txt file, as resulting from the least-squares fit."""
    with (meta.workdir / meta.fitdir / 'lsq_res' / "lsq_rprs.txt")\
            .open('w', encoding=OPTIONS["encoding"]) as f_lsq:
rp_idx = np.where(np.array(meta.labels) == 'rp')[0][0]
rprs_vals_lsq = [vals[ii][idxs[0][rp_idx]] for ii in range(len(vals))]
rprs_errs_lsq = [errs[ii][idxs[0][rp_idx]] for ii in range(len(errs))]
file_header = ['wavelength (micron)', 'rprs', 'rprs_err']
print("#{: <24} {: <25} {: <25}".format(*file_header), file=f_lsq)
for row in zip(meta.wavelength_list, rprs_vals_lsq, rprs_errs_lsq):
print("{: <25} {: <25} {: <25}".format(*row), file=f_lsq)
def make_rprs_txt(vals, errs_lower, errs_upper, meta, fitter=None):
    """Saves rprs vs. wavelength as a txt file, as resulting from the sampler (MCMC or nested sampling)."""
rp_idx = np.where(np.array(meta.labels) == 'rp')[0][0]
medians = np.array(vals).T[rp_idx]
errors_lower = np.array(errs_lower).T[rp_idx]
errors_upper = np.array(errs_upper).T[rp_idx]
if fitter == 'mcmc':
with (meta.workdir / meta.fitdir / 'mcmc_res' / "mcmc_rprs.txt")\
.open('w', encoding=OPTIONS["encoding"]) as f_mcmc:
file_header = ['wavelength (micron)', 'rprs', 'rprs_err_lower', 'rprs_err_upper']
print("#{: <24} {: <25} {: <25} {: <25}".format(*file_header), file=f_mcmc)
for row in zip(meta.wavelength_list, medians, errors_lower, errors_upper):
print("{: <25} {: <25} {: <25} {: <25}".format(*row), file=f_mcmc)
if fitter == 'nested':
with (meta.workdir / meta.fitdir / 'nested_res' / "nested_rprs.txt")\
.open('w', encoding=OPTIONS["encoding"]) as f_nested:
file_header = ['wavelength (micron)', 'rprs', 'rprs_err_lower', 'rprs_err_upper']
print("#{: <24} {: <25} {: <25} {: <25}".format(*file_header), file=f_nested)
for row in zip(meta.wavelength_list, medians, errors_lower, errors_upper):
print("{: <25} {: <25} {: <25} {: <25}".format(*row), file=f_nested)
def quantile(x, q):
    """Returns the quantiles of x at the fractions q (values between 0 and 1)."""
    return np.percentile(x, [100. * qi for qi in q])
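# Example: the 16th/50th/84th percentiles of posterior samples give the median
# and a 1-sigma credible interval (`samples` here is hypothetical):
#
#     lo, med, hi = quantile(samples, [0.16, 0.5, 0.84])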
def weighted_mean(data, err):
    """Calculates the inverse-variance weighted mean of the points in data with standard deviations err."""
ind = err != 0.0
weights = 1.0/err[ind]**2
mu = np.sum(data[ind]*weights)/np.sum(weights)
var = 1.0/np.sum(weights)
return [mu, np.sqrt(var)]
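# Worked example: measurements 10 +/- 1 and 14 +/- 2 have inverse-variance
# weights 1 and 0.25, so mu = (10*1 + 14*0.25)/1.25 = 10.8 and the propagated
# uncertainty is sqrt(1/1.25) ~ 0.894:
#
#     weighted_mean(np.array([10., 14.]), np.array([1., 2.]))
#     # -> [10.8, 0.894...]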
def create_res_dir(meta):
    """Creates the result directories depending on which fitters were used."""
    lsq_res_dir = meta.workdir / meta.fitdir / 'lsq_res'
    mcmc_res_dir = meta.workdir / meta.fitdir / 'mcmc_res'
    nested_res_dir = meta.workdir / meta.fitdir / 'nested_res'
    # the lsq result directory is created unconditionally; mcmc/nested only when requested
    if not lsq_res_dir.exists():
        lsq_res_dir.mkdir(parents=True)
    if meta.run_mcmc and not mcmc_res_dir.exists():
        mcmc_res_dir.mkdir(parents=True)
    if meta.run_nested and not nested_res_dir.exists():
        nested_res_dir.mkdir(parents=True)
def log_run_setup(meta):
"""Prepares lists in meta where fit statistics will be saved into."""
meta.rms_pred_list_lsq = []
meta.rms_pred_list_mcmc = []
meta.rms_pred_list_nested = []
meta.rms_list_lsq = []
meta.rms_list_mcmc = []
meta.rms_list_nested = []
meta.chi2_list_lsq = []
meta.chi2_list_mcmc = []
meta.chi2_list_nested = []
meta.chi2red_list_lsq = []
meta.chi2red_list_mcmc = []
meta.chi2red_list_nested = []
meta.bic_list_lsq = []
meta.bic_list_mcmc = []
meta.bic_list_nested = []
# meta.bic_alt_list_lsq = []
# meta.bic_alt_list_mcmc = []
# meta.bic_alt_list_nested = []
meta.ln_like_list_lsq = []
meta.ln_like_list_mcmc = []
meta.ln_like_list_nested = []
if 'uncmulti' in meta.s30_myfuncs or meta.rescale_uncert:
meta.chi2_notrescaled_list_lsq = []
meta.chi2_notrescaled_list_mcmc = []
meta.chi2_notrescaled_list_nested = []
meta.chi2red_notrescaled_list_lsq = []
meta.chi2red_notrescaled_list_mcmc = []
meta.chi2red_notrescaled_list_nested = []
meta.bic_notrescaled_list_lsq = []
meta.bic_notrescaled_list_mcmc = []
meta.bic_notrescaled_list_nested = []
# meta.bic_alt_notrescaled_list_lsq = []
# meta.bic_alt_notrescaled_list_mcmc = []
# meta.bic_alt_notrescaled_list_nested = []
meta.ln_like_notrescaled_list_lsq = []
meta.ln_like_notrescaled_list_mcmc = []
meta.ln_like_notrescaled_list_nested = []
if 'uncmulti' in meta.s30_myfuncs:
meta.uncmulti_mcmc = []
meta.uncmulti_nested = []
return meta
def append_fit_output(fit, meta, fitter=None, medians=None):
"""Appends fit statistics like rms or chi2 into meta lists."""
precision = 3
if fitter == 'lsq':
meta.rms_pred_list_lsq.append(round(fit.rms_predicted, precision))
meta.rms_list_lsq.append(round(fit.rms, precision))
meta.chi2_list_lsq.append(round(fit.chi2, precision))
meta.chi2red_list_lsq.append(round(fit.chi2red, precision))
meta.bic_list_lsq.append(round(fit.bic, precision))
# meta.bic_alt_list_lsq.append(round(fit.bic_alt, precision))
meta.ln_like_list_lsq.append(round(fit.ln_like, precision))
if fitter == 'mcmc':
meta.rms_pred_list_mcmc.append(round(fit.rms_predicted, precision))
meta.rms_list_mcmc.append(round(fit.rms, precision))
meta.chi2_list_mcmc.append(round(fit.chi2, precision))
meta.chi2red_list_mcmc.append(round(fit.chi2red, precision))
meta.bic_list_mcmc.append(round(fit.bic, precision))
# meta.bic_alt_list_mcmc.append(round(fit.bic_alt, precision))
meta.ln_like_list_mcmc.append(round(fit.ln_like, precision))
if fitter == 'nested':
meta.rms_pred_list_nested.append(round(fit.rms_predicted, precision))
meta.rms_list_nested.append(round(fit.rms, precision))
meta.chi2_list_nested.append(round(fit.chi2, precision))
meta.chi2red_list_nested.append(round(fit.chi2red, precision))
meta.bic_list_nested.append(round(fit.bic, precision))
# meta.bic_alt_list_nested.append(round(fit.bic_alt, precision))
meta.ln_like_list_nested.append(round(fit.ln_like, precision))
if ('uncmulti' in meta.s30_myfuncs) or (meta.rescale_uncert):
if fitter == 'mcmc':
meta.chi2_notrescaled_list_mcmc.append(round(fit.chi2_notrescaled, precision))
meta.chi2red_notrescaled_list_mcmc.append(round(fit.chi2red_notrescaled, precision))
meta.bic_notrescaled_list_mcmc.append(round(fit.bic_notrescaled, precision))
# meta.bic_alt_notrescaled_list_mcmc.append(round(fit.bic_alt_notrescaled, precision))
meta.ln_like_notrescaled_list_mcmc.append(round(fit.ln_like_notrescaled, precision))
if ('uncmulti' in meta.s30_myfuncs):
meta.uncmulti_mcmc.append(round(medians[-1], precision))
if fitter == 'nested':
meta.chi2_notrescaled_list_nested.append(round(fit.chi2_notrescaled, precision))
meta.chi2red_notrescaled_list_nested.append(round(fit.chi2red_notrescaled, precision))
meta.bic_notrescaled_list_nested.append(round(fit.bic_notrescaled, precision))
# meta.bic_alt_notrescaled_list_nested.append(round(fit.bic_alt_notrescaled, precision))
meta.ln_like_notrescaled_list_nested.append(round(fit.ln_like_notrescaled, precision))
if ('uncmulti' in meta.s30_myfuncs):
meta.uncmulti_nested.append(round(medians[-1], precision))
def save_fit_output(fit, data, meta):
"""Saves all the fit statistics like rms or chi2 into an astropy table."""
t = Table()
t['wave'] = meta.wavelength_list
t['wave'].info.format = '.3f'
if meta.run_mcmc:
t['rms_pred'] = meta.rms_pred_list_mcmc
t['rms'] = meta.rms_list_mcmc
t['chi2'] = meta.chi2_list_mcmc
t['chi2red'] = meta.chi2red_list_mcmc
t['bic'] = meta.bic_list_mcmc
# t['bic_alt'] = meta.bic_alt_list_mcmc
t['ln_like'] = meta.ln_like_list_mcmc
if ('uncmulti' in meta.s30_myfuncs) or (meta.rescale_uncert):
t['chi2_notrescaled'] = meta.chi2_notrescaled_list_mcmc
t['chi2red_notrescaled'] = meta.chi2red_notrescaled_list_mcmc
t['bic_notrescaled'] = meta.bic_notrescaled_list_mcmc
# t['bic_alt_notrescaled'] = meta.bic_alt_notrescaled_list_mcmc
t['ln_like_notrescaled'] = meta.ln_like_notrescaled_list_mcmc
if 'uncmulti' in meta.s30_myfuncs:
t['uncmulti'] = meta.uncmulti_mcmc
if meta.run_nested:
t['rms_pred'] = meta.rms_pred_list_nested
t['rms'] = meta.rms_list_nested
t['chi2'] = meta.chi2_list_nested
t['chi2red'] = meta.chi2red_list_nested
t['bic'] = meta.bic_list_nested
# t['bic_alt'] = meta.bic_alt_list_nested
t['ln_like'] = meta.ln_like_list_nested
if ('uncmulti' in meta.s30_myfuncs) or (meta.rescale_uncert):
t['chi2_notrescaled'] = meta.chi2_notrescaled_list_nested
t['chi2red_notrescaled'] = meta.chi2red_notrescaled_list_nested
t['bic_notrescaled'] = meta.bic_notrescaled_list_nested
# t['bic_alt_notrescaled'] = meta.bic_alt_notrescaled_list_nested
t['ln_like_notrescaled'] = meta.ln_like_notrescaled_list_nested
if 'uncmulti' in meta.s30_myfuncs:
t['uncmulti'] = meta.uncmulti_nested
if 'uncmulti' in meta.s30_myfuncs:
t['uncmulti2'] = t['uncmulti'] ** 2
t['uncmulti2'].info.format = '.3f'
t['npoints'] = [data.npoints] * meta.nfits
t['nfree_param'] = [data.nfree_param] * meta.nfits
t['dof'] = [data.dof] * meta.nfits
ascii.write(t, meta.workdir / meta.fitdir / 'fit_results.txt',
format='rst', overwrite=True)
print('Saved fit_results.txt file')
def save_allandata(binsz, rms, stderr, meta, fitter=None):
"""Saves the data used to create the Allan deviation plot."""
t = Table()
t['binsz'] = binsz
t['rms'] = rms
t['stderr'] = stderr
fname = Path(f'{fitter}_res') /\
f'corr_data_bin{meta.s30_file_counter}_wvl{meta.wavelength:0.3f}.txt'
ascii.write(t, meta.workdir / meta.fitdir / fname, format='rst', overwrite=True)
# def residuals(params, template_waves, template, spectrum, error):
# shift, scale = params
# fit = scale*np.interp(template_waves, template_waves-shift, spectrum)
# x = (template-fit)/error
# return (template-fit)/error
# def interpolate_spectrum(spectrum, error, template, template_waves):
# p0 = [1., 1.0] #initial guess for parameters shift and scale
# plsq, success = leastsq(residuals, p0, args=(template_waves, template, spectrum, error))
# shift, scale = plsq
# interp_spectrum = np.interp(template_waves, template_waves-shift, spectrum)
# interp_error = np.interp(template_waves, template_waves-shift, error)
# return [interp_spectrum, interp_error, shift]
# def read_dict(filename):
# #with open(filename,"r") as text:
# # return dict(line.strip().split() for line in text)
# dict = {}
# with open(filename,"r") as text:
# for line in text:
# key, value = line.split()
# value = str_to_num(value)
# if value == 'True': value = True
# if value == 'False': value = False
# dict[key] = value
# return dict
# def getphase(t):
# phase = (t - t0)/period
# return phase - int(phase)