Source code for hasasia.sensitivity

# -*- coding: utf-8 -*-
from __future__ import print_function
"""Main module."""
import numpy as np
import itertools as it
import scipy.stats as sps
import scipy.linalg as sl
import os, pickle
from astropy import units as u

import hasasia
from .utils import create_design_matrix

current_path = os.path.abspath(hasasia.__path__[0])
sc_dir = os.path.join(current_path,'sensitivity_curves/')

__all__ =['GWBSensitivityCurve',
          'DeterSensitivityCurve',
          'Pulsar',
          'Spectrum',
          'R_matrix',
          'G_matrix',
          'get_Tf',
          'get_NcalInv',
          'resid_response',
          'HellingsDownsCoeff',
          'get_Tspan',
          'get_TspanIJ',
          'corr_from_psd',
          'quantize_fast',
          'red_noise_powerlaw',
          'Agwb_from_Seff_plaw',
          'PI_hc',
          'nanograv_11yr_stoch',
          'nanograv_11yr_deter',
          ]

## Some constants
yr_sec = 365.25*24*3600
fyr = 1/yr_sec

[docs]def R_matrix(designmatrix, N): """ Create R matrix as defined in Ellis et al (2013) and Demorest et al (2012) Parameters ---------- designmatrix : array Design matrix of timing model. N : array TOA uncertainties [s] Returns ------- R matrix """ M = designmatrix n,m = M.shape L = np.linalg.cholesky(N) Linv = np.linalg.inv(L) U,s,_ = np.linalg.svd(np.matmul(Linv,M), full_matrices=True) Id = np.eye(M.shape[0]) S = np.zeros_like(M) S[:m,:m] = np.diag(s) inner = np.linalg.inv(np.matmul(S.T,S)) outer = np.matmul(S,np.matmul(inner,S.T)) return Id - np.matmul(L,np.matmul(np.matmul(U,outer),np.matmul(U.T,Linv)))
[docs]def G_matrix(designmatrix): """ Create G matrix as defined in van Haasteren 2013 Parameters ---------- designmatrix : array Design matrix for a pulsar timing model. Returns ------- G matrix """ M = designmatrix n , m = M.shape U, _ , _ = np.linalg.svd(M, full_matrices=True) return U[:,m:]
[docs]def get_Tf(designmatrix, toas, N=None, nf=200, fmin=None, fmax=2e-7, freqs=None, exact_astro_freqs = False, from_G=True, twofreqs=False, Gmatrix=None): """ Calculate the transmission function for a given pulsar design matrix, TOAs and TOA errors. Parameters ---------- designmatrix : array Design matrix for a pulsar timing model, N_TOA x N_param. toas : array Times-of-arrival for pulsar, N_TOA long. N : array Covariance matrix for pulsar time-of-arrivals, N_TOA x N_TOA. Often just a diagonal matrix of inverse TOA errors squared. nf : int, optional Number of frequencies at which to calculate transmission function. fmin : float, optional Minimum frequency at which to calculate transmission function. fmax : float, optional Maximum frequency at which to calculate transmission function. exact_astro_freqs : bool, optional Whether to use exact 1/year and 2/year frequency values in calculation. from_G : bool, optional Whether to use G matrix for transmission function calculate. If False R-matrix is used. twofreqs : bool, optional Whether to calculate a two frequency transmission function. Gmatrix : ndarray, optional Provide already calculated G-matrix. This can speed up calculations since the singular value decomposition can take time for large matrices. """ if not from_G and N is None: err_msg = 'Covariance Matrix must be provided if constructing' err_msg += ' from R-matrix.' raise ValueError(err_msg) M = designmatrix N_TOA = M.shape[0] ## Prep Correlation t1, t2 = np.meshgrid(toas, toas) tm = np.abs(t1-t2) # make filter T = toas.max()-toas.min() f0 = 1 / T if freqs is None: if fmin is None: fmin = f0/5 ff = np.logspace(np.log10(fmin), np.log10(fmax), nf,dtype='float128') if exact_astro_freqs: ff = np.sort(np.append(ff,[fyr,2*fyr])) nf +=2 else: nf = len(freqs) ff = freqs Tmat = np.zeros(nf, dtype='float64') if from_G: if Gmatrix is None: G = G_matrix(M) else: G = Gmatrix m = G.shape[1] Gtilde = np.zeros((ff.size,G.shape[1]),dtype='complex128') Gtilde = np.dot(np.exp(1j*2*np.pi*ff[:,np.newaxis]*toas),G) Tmat = np.matmul(np.conjugate(Gtilde),Gtilde.T)/N_TOA if twofreqs: Tmat = np.real(Tmat) else: Tmat = np.real(np.diag(Tmat)) else: R = R_matrix(M, N) for ct, f in enumerate(ff): Tmat[ct] = np.real(np.sum(np.exp(1j*2*np.pi*f*tm)*R)/N_TOA) return np.real(Tmat), ff, T
[docs]def get_NcalInv(psr, nf=200, fmin=None, fmax=2e-7, freqs=None, exact_yr_freqs = False, full_matrix=False, return_Gtilde_Ncal=False, tm_fit=True, Gmatrix=None): r""" Calculate the inverse-noise-wieghted transmission function for a given pulsar. This calculates :math:`\mathcal{N}^{-1}(f,f') , \; \mathcal{N}^{-1}(f)` in `[1]`_, see Equations (19-20). .. _[1]: https://arxiv.org/abs/1907.04341 Parameters ---------- psr : array Pulsar object. nf : int, optional Number of frequencies at which to calculate transmission function. fmin : float, optional Minimum frequency at which to calculate transmission function. fmax : float, optional Maximum frequency at which to calculate transmission function. exact_yr_freqs : bool, optional Whether to use exact 1/year and 2/year frequency values in calculation. full_matrix : bool, optional Whether to return the full, two frequency NcalInv. return_Gtilde_Ncal : bool, optional Whether to return Gtilde and Ncal. Gtilde is the Fourier transform of the G-matrix. tm_fit : bool, optional Whether to include the timing model fit in the calculation. Gmatrix : ndarray, optional Provide already calculated G-matrix. This can speed up calculations since the singular value decomposition can take time for large matrices. Returns ------- inverse-noise-weighted transmission function """ toas = psr.toas # make filter T = toas.max()-toas.min() f0 = 1 / T if freqs is None: if fmin is None: fmin = f0/5 ff = np.logspace(np.log10(fmin), np.log10(fmax), nf,dtype='float128') if exact_yr_freqs: ff = np.sort(np.append(ff,[fyr,2*fyr])) nf +=2 else: nf = len(freqs) ff = freqs if tm_fit: if Gmatrix is None: G = G_matrix(psr.designmatrix) else: G = Gmatrix else: G = np.eye(toas.size) Gtilde = np.zeros((ff.size,G.shape[1]),dtype='complex128') #N_freqs x N_TOA-N_par # Note we do not include factors of NTOA or Timespan as they cancel # with the definition of Ncal Gtilde = np.dot(np.exp(1j*2*np.pi*ff[:,np.newaxis]*toas),G) # N_freq x N_TOA-N_par Ncal = np.matmul(G.T,np.matmul(psr.N,G)) #N_TOA-N_par x N_TOA-N_par NcalInv = np.linalg.inv(Ncal) #N_TOA-N_par x N_TOA-N_par TfN = np.matmul(np.conjugate(Gtilde),np.matmul(NcalInv,Gtilde.T)) / 2 if return_Gtilde_Ncal: return np.real(TfN), Gtilde, Ncal elif full_matrix: return np.real(TfN) else: return np.real(np.diag(TfN)) / get_Tspan([psr])
[docs]def resid_response(freqs): r""" Returns the timing residual response function for a pulsar across as set of frequencies. See Equation (53) in `[1]`_. .. math:: \mathcal{R}(f)=\frac{1}{12\pi^2\;f^2} .. _[1]: https://arxiv.org/abs/1907.04341 """ return 1/(12 * np.pi**2 * freqs**2)
[docs]class Pulsar(object): """ Class to encode information about individual pulsars. Parameters ---------- toas : array Pulsar Times of Arrival [sec]. toaerrs : array Pulsar TOA errors [sec]. phi : float Ecliptic longitude of pulsar [rad]. theta : float Ecliptic latitude of pulsar [rad]. designmatrix : array Design matrix for pulsar's timing model. N_TOA x N_param. N : array Covariance matrix for the pulsar. N_TOA x N_TOA. Made from toaerrs if not provided. pdist : astropy.quantity, float Earth-pulsar distance. Default units is kpc. """ def __init__(self, toas, toaerrs, phi=None, theta=None, designmatrix=None, N=None, pdist=1.0*u.kpc): self.toas = toas self.toaerrs = toaerrs self.phi = phi self.theta = theta self.pdist = make_quant(pdist,'kpc') if N is None: self.N = np.diag(toaerrs**2) #N ==> weights else: self.N = N if designmatrix is None: self.designmatrix = create_design_matrix(toas, RADEC=True, PROPER=True, PX=True) else: self.designmatrix = designmatrix @property def G(self): """Inverse Noise Weighted Transmission Function.""" if not hasattr(self, '_G'): self._G = G_matrix(designmatrix=self.designmatrix) return self._G
[docs]class Spectrum(object): """Class to encode the spectral information for a single pulsar. Parameters ---------- psr : `hasasia.Pulsar` A `hasasia.Pulsar` instance. nf : int, optional Number of frequencies over which to build the various spectral densities. fmin : float, optional [Hz] Minimum frequency over which to build the various spectral densities. Defaults to the timespan/5 of the pulsar. fmax : float, optional [Hz] Minimum frequency over which to build the various spectral densities. freqs : array, optional [Hz] Optionally supply an array of frequencies over which to build the various spectral densities. """ def __init__(self, psr, nf=400, fmin=None, fmax=2e-7, freqs=None, tm_fit=True, **Tf_kwargs): self._H_0 = 72 * u.km / u.s / u.Mpc self.toas = psr.toas self.toaerrs = psr.toaerrs self.phi = psr.phi self.theta = psr.theta self.N = psr.N self.G = psr.G self.designmatrix = psr.designmatrix self.pdist = psr.pdist self.tm_fit = tm_fit self.Tf_kwargs = Tf_kwargs if freqs is None: f0 = 1 / get_Tspan([psr]) if fmin is None: fmin = f0/5 self.freqs = np.logspace(np.log10(fmin), np.log10(fmax), nf) else: self.freqs = freqs self._psd_prefit = np.zeros_like(self.freqs) @property def psd_postfit(self): """Postfit Residual Power Spectral Density""" if not hasattr(self, '_psd_postfit'): self._psd_postfit = self.psd_prefit * self.NcalInv return self._psd_postfit @property def psd_prefit(self): """Prefit Residual Power Spectral Density""" if np.all(self._psd_prefit==0): raise ValueError('Must set Prefit Residual Power Spectral Density.') # print('No Prefit Residual Power Spectral Density set.\n' # 'Setting psd_prefit to harmonic mean of toaerrs.') # sigma = sps.hmean(self.toaerrs) # dt = 14*24*3600 # 2 Week Cadence # self.add_white_noise_pow(sigma=sigma,dt=dt) return self._psd_prefit @property def Tf(self): if not hasattr(self, '_Tf'): self._Tf,_,_ = get_Tf(designmatrix=self.designmatrix, toas=self.toas, N=self.N, freqs=self.freqs, from_G=True, Gmatrix=self.G, **self.Tf_kwargs) return self._Tf @property def NcalInv(self): """Inverse Noise Weighted Transmission Function.""" if not hasattr(self, '_NcalInv'): self._NcalInv = get_NcalInv(psr=self, freqs=self.freqs, tm_fit=self.tm_fit, Gmatrix=self.G) return self._NcalInv @property def P_n(self): """Inverse Noise Weighted Transmission Function.""" if not hasattr(self, '_P_n'): self._P_n = np.power(get_NcalInv(psr=self, freqs=self.freqs, tm_fit=False), -1) return self._P_n @property def S_I(self): r"""Strain power sensitivity for this pulsar. Equation (74) in `[1]`_ .. math:: S_I=\frac{1}{\mathcal{N}^{-1}\;\mathcal{R}} .. _[1]: https://arxiv.org/abs/1907.04341 """ if not hasattr(self, '_S_I'): self._S_I = 1/resid_response(self.freqs)/self.NcalInv return self._S_I @property def S_R(self): r"""Residual power sensitivity for this pulsar. .. math:: S_R=\frac{1}{\mathcal{N}^{-1}} """ if not hasattr(self, '_S_R'): self._S_R = 1/self.NcalInv return self._S_R @property def h_c(self): r"""Characteristic strain sensitivity for this pulsar. .. math:: h_c=\sqrt{f\;S_I} """ if not hasattr(self, '_h_c'): self._h_c = np.sqrt(self.freqs * self.S_I) return self._h_c @property def Omega_gw(self): r"""Energy Density sensitivity. .. math:: \Omega_{gw}=\frac{2\pi^2}{3\;H_0^2}f^3\;S_I """ self._Omega_gw = ((2*np.pi**2/3) * self.freqs**3 * self.S_I / self._H_0.to('Hz').value**2) return self._Omega_gw
[docs] def add_white_noise_power(self, sigma=None, dt=None, vals=False): r""" Add power law red noise to the prefit residual power spectral density. **Note:** All noise information is furnished by the covariance matrix in the `hasasia.Pulsar` object, this is simply useful for bookkeeping and plots. Parameters ---------- sigma : float TOA error. dt : float Time between observing epochs in [seconds]. vals : bool Whether to return the psd values as an array. Otherwise just added to `self.psd_prefit`. """ white_noise = 2.0 * dt * (sigma)**2 * np.ones_like(self.freqs) self._psd_prefit += white_noise if vals: return white_noise
[docs] def add_red_noise_power(self, A=None, gamma=None, vals=False): r""" Add power law red noise to the prefit residual power spectral density. As :math:`P=A^2(f/fyr)^{-\gamma}`. **Note:** All noise information is furnished by the covariance matrix in the `hasasia.Pulsar` object, this is simply useful for bookkeeping and plots. Parameters ---------- A : float Amplitude of red noise. gamma : float Spectral index of red noise powerlaw. vals : bool Whether to return the psd values as an array. Otherwise just added to `self.psd_prefit`. """ ff = self.freqs red_noise = A**2*(ff/fyr)**(-gamma)/(12*np.pi**2) * yr_sec**3 self._psd_prefit += red_noise if vals: return red_noise
[docs] def add_noise_power(self,noise): r"""Add any spectrum of noise. Must match length of frequency array. **Note:** All noise information is furnished by the covariance matrix in the `hasasia.Pulsar` object, this is simply useful for bookkeeping and plots. """ self._psd_prefit += noise
class SensitivityCurve(object): r""" Base class for constructing PTA sensitivity curves. Takes a list of `hasasia.Spectrum` objects as input. """ def __init__(self, spectra): if not isinstance(spectra, list): raise ValueError('Must provide list of spectra!!') self._H_0 = 72 * u.km / u.s / u.Mpc self.Npsrs = len(spectra) self.phis = np.array([p.phi for p in spectra]) self.thetas = np.array([p.theta for p in spectra]) self.Tspan = get_Tspan(spectra) # f0 = 1 / self.Tspan # if fmin is None: # fmin = f0/5 #Check to see if all frequencies are equal. freq_check = [sp.freqs for sp in spectra] if np.all(freq_check == spectra[0].freqs): self.freqs = spectra[0].freqs else: raise ValueError('All frequency arrays must match for sensitivity' ' curve calculation!!') self.SnI = np.array([sp.S_I for sp in spectra]) def to_pickle(self, filepath): self.filepath = filepath with open(filepath, "wb") as fout: pickle.dump(self, fout) @property def S_eff(self): """Strain power sensitivity. """ raise NotImplementedError('Effective Strain Power Sensitivity' 'method must be defined.') @property def h_c(self): """Characteristic strain sensitivity""" if not hasattr(self, '_h_c'): self._h_c = np.sqrt(self.freqs * self.S_eff) return self._h_c @property def Omega_gw(self): """Energy Density sensitivity""" self._Omega_gw = ((2*np.pi**2/3) * self.freqs**3 * self.S_eff / self._H_0.to('Hz').value**2) return self._Omega_gw @property def H_0(self): """Hubble Constant. Assumed to be in units of km /(s Mpc) unless supplied as an `astropy.quantity`. """ self._H_0 = make_quant(self._H_0,'km /(s Mpc)') return self._H_0
[docs]class GWBSensitivityCurve(SensitivityCurve): r""" Class to produce a sensitivity curve for a gravitational wave background, using Hellings-Downs spatial correlations. Parameters ---------- orf : str, optional {'hd', 'st', 'dipole', 'monopole'} Overlap reduction function to be used in the sensitivity curve. Maybe be Hellings-Downs, Scalar-Tensor, Dipole or Monopole. """ def __init__(self, spectra, orf='hd',autocorr=False): super().__init__(spectra) if orf == 'hd': Coff = HellingsDownsCoeff(self.phis, self.thetas, autocorr=autocorr) elif orf == 'st': Coff = ScalarTensorCoeff(self.phis, self.thetas) elif orf == 'dipole': Coff = DipoleCoeff(self.phis, self.thetas) elif orf == 'monopole': Coff = MonopoleCoeff(self.phis, self.thetas) self.ThetaIJ, self.chiIJ, self.pairs, self.chiRSS = Coff self.T_IJ = np.array([get_TspanIJ(spectra[ii],spectra[jj]) for ii,jj in zip(self.pairs[0],self.pairs[1])])
[docs] def SNR(self, Sh): """ Calculate the signal-to-noise ratio of a given signal strain power spectral density, `Sh`. Must match frequency range and `df` of `self`. """ integrand = Sh**2 / self.S_eff**2 return np.sqrt(2.0 * self.Tspan * np.trapz(y=integrand, x=self.freqs, axis=0))
@property def S_eff(self): """Strain power sensitivity. """ if not hasattr(self, '_S_eff'): ii = self.pairs[0] jj = self.pairs[1] kk = np.arange(len(self.chiIJ)) num = self.T_IJ[kk] / self.Tspan * self.chiIJ[kk]**2 series = num[:,np.newaxis] / (self.SnI[ii] * self.SnI[jj]) self._S_eff = np.power(np.sum(series, axis=0),-0.5) return self._S_eff @property def S_effIJ(self): """Strain power sensitivity. """ if not hasattr(self, '_S_effIJ'): ii = self.pairs[0] jj = self.pairs[1] kk = np.arange(len(self.chiIJ)) num = self.T_IJ[kk] / self.Tspan * self.chiIJ[kk]**2 self._S_effIJ = np.sqrt((self.SnI[ii] * self.SnI[jj]) / num[:,np.newaxis]) return self._S_effIJ
[docs]class DeterSensitivityCurve(SensitivityCurve): ''' Parameters ---------- include_corr : bool Whether to include cross correlations from the GWB as an additional noise source in full PTA correlation matrix. (Has little to no effect and adds a lot of computation time.) A_GWB : float Value of GWB amplitude for use in cross correlations. ''' def __init__(self, spectra, pulsar_term=True, include_corr=False, A_GWB=None): super().__init__(spectra) self.T_I = np.array([sp.toas.max()-sp.toas.min() for sp in spectra]) self.pulsar_term = pulsar_term self.include_corr = include_corr if include_corr: self.spectra = spectra if A_GWB is None: self.A_GWB = 1e-15 else: self.A_GWB = A_GWB Coff = HellingsDownsCoeff(self.phis, self.thetas) self.ThetaIJ, self.chiIJ, self.pairs, self.chiRSS = Coff self.T_IJ = np.array([get_TspanIJ(spectra[ii],spectra[jj]) for ii,jj in zip(self.pairs[0], self.pairs[1])]) self.NcalInvI = np.array([sp.NcalInv for sp in spectra])
[docs] def SNR(self, h0): r''' Calculate the signal-to-noise ratio of a source given the strain amplitude. This is based on Equation (79) from Hazboun, et al., 2019 `[1]`_. .. math:: \rho(\hat{n})=h_0\sqrt{\frac{T_{\rm obs}}{S_{\rm eff}(f_0 ,\hat{k})}} .. _[1]: https://arxiv.org/abs/1907.04341 ''' return h0 * np.sqrt(self.Tspan / self.S_eff)
@property def S_eff(self): """Strain power sensitivity. """ if not hasattr(self, '_S_eff'): t_I = self.T_I / self.Tspan elements = t_I[:,np.newaxis] / self.SnI sum1 = np.sum(elements, axis=0) if self.include_corr: sum = 0 ii = self.pairs[0] jj = self.pairs[1] kk = np.arange(len(self.chiIJ)) num = self.T_IJ[kk] / self.Tspan * self.chiIJ[kk] summand = num[:,np.newaxis] * self.NcalInvIJ summand *= resid_response(self.freqs)[np.newaxis,:] sum2 = np.sum(summand, axis=0) norm = 4./5 if self.pulsar_term else 2./5 self._S_eff = np.power(norm * sum1,-1) return self._S_eff @property def NcalInvIJ(self): """ Inverse Noise Weighted Transmission Function that includes cross-correlation noise from GWB. """ if not hasattr(self,'_NcalInvIJ'): self._NcalInvIJ = get_NcalInvIJ(psrs=self.spectra, A_GWB=self.A_GWB, freqs=self.freqs, full_matrix=True) return self._NcalInvIJ
def HD(phis,thetas): return HellingsDownsCoeff(np.array(phis),np.array(thetas))[1][0] def get_NcalInvIJ(psrs, A_GWB, freqs, full_matrix=False, return_Gtilde_Ncal=False): r""" Calculate the inverse-noise-wieghted transmission function for a given pulsar. This calculates :math:`\mathcal{N}^{-1}(f,f') , \; \mathcal{N}^{-1}(f)` in `[1]`_, see Equations (19-20). .. _[1]: https://arxiv.org/abs/1907.04341 Parameters ---------- psrs : list of hasasia.Pulsar objects List of hasasia.Pulsar objects to build NcalInvIJ Returns ------- inverse-noise-weighted transmission function across two pulsars. """ Npsrs = len(psrs) toas = np.concatenate([p.toas for p in psrs], axis=None) # make filter ff = np.tile(freqs, Npsrs) ## CHANGE BACK # G = sl.block_diag(*[G_matrix(p.designmatrix) for p in psrs]) G = sl.block_diag(*[np.eye(p.toas.size) for p in psrs]) Gtilde = np.zeros((ff.size, G.shape[1]), dtype='complex128') #N_freqs x N_TOA-N_par Gtilde = np.dot(np.exp(1j*2*np.pi*ff[:,np.newaxis]*toas),G) # N_freq x N_TOA-N_par #CHANGE BACK # psd = red_noise_powerlaw(A=A_GWB, gamma=13./3, freqs=freqs) psd = 2*(365.25*24*3600/40)*(1e-6)**2 Ch_blocks = [[(HD([pc.phi,pr.phi],[pc.theta,pr.theta]) *corr_from_psdIJ(freqs=freqs, psd=psd, toasI=pc.toas, toasJ=pr.toas, fast=True)) if r!=c else corr_from_psdIJ(freqs=freqs, psd=psd, toasI=pc.toas, toasJ=pr.toas, fast=True) for r, pr in enumerate(psrs)] for c, pc in enumerate(psrs)] C_h = np.block(Ch_blocks) C_n = sl.block_diag(*[p.N for p in psrs]) # C_h = sl.block_diag(*[corr_from_psd(freqs=freqs, psd=psd, # toas=p.toas, fast=True) for p in psrs]) C = C_n + C_h Ncal = np.matmul(G.T, np.matmul(C, G)) #N_TOA-N_par x N_TOA-N_par NcalInv = np.linalg.inv(Ncal) #N_TOA-N_par x N_TOA-N_par TfN = NcalInv#np.matmul(G, np.matmul(NcalInv, G.T)) #np.matmul(np.conjugate(Gtilde),np.matmul(NcalInv,Gtilde.T)) / 2 if return_Gtilde_Ncal: return np.real(TfN), Gtilde, Ncal elif full_matrix: return np.real(TfN), toas, ChiIJ else: return np.real(np.diag(TfN)) / get_Tspan(psrs)
[docs]def HellingsDownsCoeff(phi, theta, autocorr=False): """ Calculate Hellings and Downs coefficients from two lists of sky positions. Parameters ---------- phi : array, list Pulsar axial coordinate. theta : array, list Pulsar azimuthal coordinate. Returns ------- ThetaIJ : array An Npair-long array of angles between pairs of pulsars. chiIJ : array An Npair-long array of Hellings and Downs relation coefficients. pairs : array A 2xNpair array of pair indices corresponding to input order of sky coordinates. chiRSS : float Root-sum-squared value of all Hellings-Downs coefficients. """ Npsrs = len(phi) # Npairs = np.int(Npsrs * (Npsrs-1) / 2.) psr_idx = np.arange(Npsrs) pairs = list(it.combinations(psr_idx,2)) first, second = list(map(list, zip(*pairs))) cosThetaIJ = np.cos(theta[first]) * np.cos(theta[second]) \ + np.sin(theta[first]) * np.sin(theta[second]) \ * np.cos(phi[first] - phi[second]) if autocorr: first.extend(psr_idx) second.extend(psr_idx) cosThetaIJ = np.append(cosThetaIJ,np.zeros(Npsrs)) X = (1. - cosThetaIJ) / 2. chiIJ = [1.5*x*np.log(x) - 0.25*x + 0.5 if x!=0 else 1. for x in X] chiIJ = np.array(chiIJ) # calculate rss (root-sum-squared) of Hellings-Downs factor chiRSS = np.sqrt(np.sum(chiIJ**2)) return np.arccos(cosThetaIJ), chiIJ, np.array([first,second]), chiRSS
def ScalarTensorCoeff(phi, theta, norm='std'): """ Calculate Scalar-Tensor overlap reduction coefficients for alternative polarizations from two lists of sky positions. Parameters ---------- phi : array, list Pulsar axial coordinate. theta : array, list Pulsar azimuthal coordinate. Returns ------- ThetaIJ : array An Npair-long array of angles between pairs of pulsars. chiIJ : array An Npair-long array of Scalar Tensor ORF coefficients. pairs : array A 2xNpair array of pair indices corresponding to input order of sky coordinates. chiRSS : float Root-sum-squared value of all Scalar Tensor ORF coefficients. """ Npsrs = len(phi) # Npairs = np.int(Npsrs * (Npsrs-1) / 2.) psr_idx = np.arange(Npsrs) pairs = list(it.combinations(psr_idx,2)) first, second = list(map(list, zip(*pairs))) cosThetaIJ = np.cos(theta[first]) * np.cos(theta[second]) \ + np.sin(theta[first]) * np.sin(theta[second]) \ * np.cos(phi[first] - phi[second]) X = 3/8+1/8*cosThetaIJ chiIJ = [x if x!=0 else 1. for x in X] chiIJ = np.array(chiIJ) # calculate rss (root-sum-squared) of Hellings-Downs factor chiRSS = np.sqrt(np.sum(chiIJ**2)) return np.arccos(cosThetaIJ), chiIJ, np.array([first,second]), chiRSS def DipoleCoeff(phi, theta, norm='std'): """ Calculate Dipole overlap reduction coefficients from two lists of sky positions. Parameters ---------- phi : array, list Pulsar axial coordinate. theta : array, list Pulsar azimuthal coordinate. Returns ------- ThetaIJ : array An Npair-long array of angles between pairs of pulsars. chiIJ : array An Npair-long array of Dipole ORF coefficients. pairs : array A 2xNpair array of pair indices corresponding to input order of sky coordinates. chiRSS : float Root-sum-squared value of all Dipole ORF coefficients. """ Npsrs = len(phi) # Npairs = np.int(Npsrs * (Npsrs-1) / 2.) psr_idx = np.arange(Npsrs) pairs = list(it.combinations(psr_idx,2)) first, second = list(map(list, zip(*pairs))) cosThetaIJ = np.cos(theta[first]) * np.cos(theta[second]) \ + np.sin(theta[first]) * np.sin(theta[second]) \ * np.cos(phi[first] - phi[second]) X = 0.5*cosThetaIJ chiIJ = [x if x!=0 else 1. for x in X] chiIJ = np.array(chiIJ) # calculate rss (root-sum-squared) of Hellings-Downs factor chiRSS = np.sqrt(np.sum(chiIJ**2)) return np.arccos(cosThetaIJ), chiIJ, np.array([first,second]), chiRSS def MonopoleCoeff(phi, theta, norm='std'): """ Calculate Monopole overlap reduction coefficients from two lists of sky positions. Parameters ---------- phi : array, list Pulsar axial coordinate. theta : array, list Pulsar azimuthal coordinate. Returns ------- ThetaIJ : array An Npair-long array of angles between pairs of pulsars. chiIJ : array An Npair-long array of Dipole ORF coefficients. pairs : array A 2xNpair array of pair indices corresponding to input order of sky coordinates. chiRSS : float Root-sum-squared value of all Dipole ORF coefficients. """ Npsrs = len(phi) # Npairs = np.int(Npsrs * (Npsrs-1) / 2.) psr_idx = np.arange(Npsrs) pairs = list(it.combinations(psr_idx,2)) first, second = list(map(list, zip(*pairs))) cosThetaIJ = np.cos(theta[first]) * np.cos(theta[second]) \ + np.sin(theta[first]) * np.sin(theta[second]) \ * np.cos(phi[first] - phi[second]) chiIJ = np.ones_like(cosThetaIJ) # calculate rss (root-sum-squared) of Hellings-Downs factor chiRSS = np.sqrt(np.sum(chiIJ**2)) return np.arccos(cosThetaIJ), chiIJ, np.array([first,second]), chiRSS
[docs]def get_Tspan(psrs): """ Returns the total timespan from a list or arry of Pulsar objects, psrs. """ last = np.amax([p.toas.max() for p in psrs]) first = np.amin([p.toas.min() for p in psrs]) return last - first
[docs]def get_TspanIJ(psr1,psr2): """ Returns the overlapping timespan of two Pulsar objects, psr1/psr2. """ start = np.amax([psr1.toas.min(),psr2.toas.min()]) stop = np.amin([psr1.toas.max(),psr2.toas.max()]) return stop - start
[docs]def corr_from_psd(freqs, psd, toas, fast=True): """ Calculates the correlation matrix over a set of TOAs for a given power spectral density. Parameters ---------- freqs : array Array of freqs over which the psd is given. psd : array Power spectral density to use in calculation of correlation matrix. toas : array Pulsar times-of-arrival to use in correlation matrix. fast : bool, optional Fast mode uses a matix inner product, while the slower mode uses the numpy.trapz function which is slower, but more accurate. Returns ------- corr : array A 2-dimensional array which represents the correlation matrix for the given set of TOAs. """ if fast: df = np.diff(freqs) df = np.append(df,df[-1]) tm = np.sqrt(psd*df)*np.exp(1j*2*np.pi*freqs*toas[:,np.newaxis]) integrand = np.matmul(tm, np.conjugate(tm.T)) return np.real(integrand) else: #Makes much larger arrays, but uses np.trapz t1, t2 = np.meshgrid(toas, toas, indexing='ij') tm = np.abs(t1-t2) integrand = psd*np.cos(2*np.pi*freqs*tm[:,:,np.newaxis])#df* return np.trapz(integrand, axis=2, x=freqs)#np.sum(integrand,axis=2)#
def corr_from_psdIJ(freqs, psd, toasI, toasJ, fast=True): """ Calculates the correlation matrix over a set of TOAs for a given power spectral density for two pulsars. Parameters ---------- freqs : array Array of freqs over which the psd is given. psd : array Power spectral density to use in calculation of correlation matrix. toas : array Pulsar times-of-arrival to use in correlation matrix. fast : bool, optional Fast mode uses a matix inner product, while the slower mode uses the numpy.trapz function which is slower, but more accurate. Returns ------- corr : array A 2-dimensional array which represents the correlation matrix for the given set of TOAs. """ if fast: df = np.diff(freqs) df = np.append(df,df[-1]) tmI = np.sqrt(psd*df)*np.exp(1j*2*np.pi*freqs*toasI[:,np.newaxis]) tmJ = np.sqrt(psd*df)*np.exp(1j*2*np.pi*freqs*toasJ[:,np.newaxis]) integrand = np.matmul(tmI, np.conjugate(tmJ.T)) return np.real(integrand) else: #Makes much larger arrays, but uses np.trapz t1, t2 = np.meshgrid(toasI, toasJ, indexing='ij') tm = np.abs(t1-t2) integrand = psd*np.cos(2*np.pi*freqs*tm[:,:,np.newaxis])#df* return np.trapz(integrand, axis=2, x=freqs)
[docs]def quantize_fast(toas, toaerrs, flags=None, dt=0.1): r""" Function to quantize and average TOAs by observation epoch. Used especially for NANOGrav multiband data. Pulled from `[3]`_. .. _[3]: https://github.com/vallis/libstempo/blob/master/libstempo/toasim.py Parameters ---------- times : array TOAs for a pulsar. flags : array, optional Flags for TOAs. dt : float Coarse graining time [days]. """ isort = np.argsort(toas) bucket_ref = [toas[isort[0]]] bucket_ind = [[isort[0]]] dt *= (24*3600) for i in isort[1:]: if toas[i] - bucket_ref[-1] < dt: bucket_ind[-1].append(i) else: bucket_ref.append(toas[i]) bucket_ind.append([i]) avetoas = np.array([np.mean(toas[l]) for l in bucket_ind],'d') avetoaerrs = np.array([sps.hmean(toaerrs[l]) for l in bucket_ind],'d') if flags is not None: aveflags = np.array([flags[l[0]] for l in bucket_ind]) U = np.zeros((len(toas),len(bucket_ind)),'d') for i,l in enumerate(bucket_ind): U[l,i] = 1 if flags is not None: return avetoas, avetoaerrs, aveflags, U, bucket_ind else: return avetoas, avetoaerrs, U, bucket_ind
def SimCurve(): raise NotImplementedError()
[docs]def red_noise_powerlaw(A, freqs, gamma=None, alpha=None): r""" Add power law red noise to the prefit residual power spectral density. As :math:`P=A^2(f/fyr)^{-\gamma}` Parameters ---------- A : float Amplitude of red noise. gamma : float Spectral index of red noise powerlaw. freqs : array Frequencies at which to calculate the red noise power law. """ if gamma is None and alpha is not None: gamma = 3-2*alpha elif ((gamma is None and alpha is None) or (gamma is not None and alpha is not None)): ValueError('Must specify one version of spectral index.') return A**2*(freqs/fyr)**(-gamma)/(12*np.pi**2) * yr_sec**3
def S_h(A, alpha, freqs): r""" Add power law red noise to the prefit residual power spectral density. As S_h=A^2*(f/fyr)^(2*alpha)/f Parameters ---------- A : float Amplitude of red noise. alpha : float Spectral index of red noise powerlaw. freqs : array Array of frequencies at which to calculate S_h. """ return A**2*(freqs/fyr)**(2*alpha) / freqs
[docs]def Agwb_from_Seff_plaw(freqs, Tspan, SNR, S_eff, gamma=13/3., alpha=None): r""" Must supply numpy.ndarrays. """ if alpha is None: alpha = (3-gamma)/2 else: pass if hasattr(alpha,'size'): fS_sqr = freqs**2 * S_eff**2 integrand = (freqs[:,np.newaxis]/fyr)**(4*alpha) integrand /= fS_sqr[:,np.newaxis] fintegral = np.trapz(integrand, x=freqs,axis=0) else: integrand = (freqs/fyr)**(4*alpha) / freqs**2 / S_eff**2 fintegral = np.trapz(integrand, x=freqs) return np.sqrt(SNR)/np.power(2 * Tspan * fintegral, 1/4.)
[docs]def PI_hc(freqs, Tspan, SNR, S_eff, N=200): '''Power law-integrated characteristic strain.''' alpha = np.linspace(-1.75, 1.25, N) h = Agwb_from_Seff_plaw(freqs=freqs, Tspan=Tspan, SNR=SNR, S_eff=S_eff, alpha=alpha) plaw = np.dot((freqs[:,np.newaxis]/fyr)**alpha,h[:,np.newaxis]*np.eye(N)) PI_sensitivity = np.amax(plaw, axis=1) return PI_sensitivity, plaw
def get_dt(toas): '''Returns average dt between observation epochs given toas.''' toas = make_quant(toas, u.s) return np.diff(np.unique(np.round(toas.to('day')))).mean() def make_quant(param, default_unit): """Convenience function to intialize a parameter as an astropy quantity. param == parameter to initialize. default_unit == string that matches an astropy unit, set as default for this parameter. returns: an astropy quantity example: self.f0 = make_quant(f0,'MHz') """ default_unit = u.core.Unit(default_unit) if hasattr(param, 'unit'): try: param.to(default_unit) except u.UnitConversionError: raise ValueError("Quantity {0} with incompatible unit {1}" .format(param, default_unit)) quantity = param.to(default_unit) else: quantity = param * default_unit return quantity ################## Pre-Made Sensitivity Curves#############
[docs]def nanograv_11yr_deter(): ''' Returns a `DeterSensitivityCurve` object built using with the NANOGrav 11-year data set. ''' path = sc_dir + 'nanograv_11yr_deter.sc' with open(path, "rb") as fin: sc = pickle.load(fin) sc.filepath = path return sc
[docs]def nanograv_11yr_stoch(): ''' Returns a `GWBSensitivityCurve` object built using with the NANOGrav 11-year data set. ''' path = sc_dir + 'nanograv_11yr_stoch.sc' with open(path, "rb") as fin: sc = pickle.load(fin) sc.filepath = path return sc