Source code for pywiggle.utils

import numpy as np
from . import _wiggle
import os
from time import time
import pkgutil, io
import healpy as hp
import pywiggle

# For benchmarking and testing
try:
    import ducc0
    from pspy._mcm_fortran import mcm_compute as mcm_fortran
    import pymaster as nmt
except:
    pass


"""
=========
Utilities
=========
"""

[docs] def change_alm_lmax(alms, lmax, mmax_out=None): ilmax = hp.Alm.getlmax(alms.shape[-1], mmax_out) immax = mmax_out or ilmax # fall back to full m-range ommax = mmax_out or lmax return np.asarray(hp.sphtfunc.resize_alm(alms, ilmax, immax, lmax, ommax))
[docs] def multipole_to_bin_indices(lmax, bin_edges): """ Returns an array mapping multipole index l (0 to lmax) to bin index, based on provided bin_edges. Parameters: - lmax: int, maximum multipole - bin_edges: 1D array-like, sorted bin edges of length nbins + 1 Returns: - bin_indices: np.ndarray of shape (lmax + 1,), where bin_indices[l] gives the bin index for multipole l """ bin_edges = np.asarray(bin_edges) if bin_edges.min()<0: raise ValueError if bin_edges.max()>(lmax+1): raise ValueError if np.any(np.diff(bin_edges)<=0): raise ValueError("Bin edges must be monotonically increasing.") ls = np.arange(lmax + 1) bin_indices = np.searchsorted(bin_edges, ls, side='right') - 1 return bin_indices
[docs] def bin_array(arr, bin_indices, nbins, weights=None): """ Bin an array using bin indices and optional per-element weights. Parameters ---------- arr : ndarray of shape (lmax + 1,) Array defined over multipole ℓ values. bin_indices : ndarray of shape (lmax + 1,) Per-ℓ bin index (-1 for entries to ignore). weights : ndarray of shape (lmax + 1,), optional Optional per-element weights w_ell. If not provided, defaults to 1. Returns ------- binned : ndarray of shape (nbins,) Array with weighted sum of values in each bin. """ arr = np.asarray(arr) bin_indices = np.asarray(bin_indices) if arr.shape != bin_indices.shape: raise ValueError("arr and bin_indices must have the same shape") if weights is None: weights = np.ones_like(arr, dtype=np.float64) else: weights = np.asarray(weights) if weights.shape != arr.shape: raise ValueError("weights must have the same shape as arr") # Mask out entries with bin_index == -1 or >= nbins mask = np.logical_and(bin_indices >= 0, bin_indices < nbins) valid_bins = bin_indices[mask] valid_vals = arr[mask] valid_weights = weights[mask] weighted_vals = valid_vals * valid_weights # Vectorized binning with weights binned = np.bincount(valid_bins, weights=weighted_vals, minlength=nbins) return binned
[docs] def normalize_weights_per_bin(nbins, bin_indices, weights): """ Normalize weights so that values in each bin sum to 1. Parameters ---------- nbins : int Number of bins. bin_indices : ndarray of shape (N,) Bin index for each element. Use -1 for elements to be ignored. weights : ndarray of shape (N,) Weights to normalize. Returns ------- normalized_weights : ndarray of shape (N,) Weights normalized per bin. """ bin_indices = np.asarray(bin_indices) weights = np.asarray(weights) # Sum weights per bin (bins with no entries will have sum=0) bin_sums = np.bincount(bin_indices[bin_indices >= 0], weights[bin_indices >= 0], minlength=nbins) # Get the sum for each entry’s bin per_point_sums = bin_sums[bin_indices.clip(0)] # Avoid division by zero for bins with no data norm_factors = np.where(per_point_sums > 0, per_point_sums, 1.0) # Normalize normalized = np.where(bin_indices >= 0, weights / norm_factors, 0.0) return normalized
[docs] def get_wigner_d(lmax,s1,s2,cos_thetas, bin_edges=None,bin_weights=None,bin_weights2=None): r""" Compute Wigner-d matrices for given spins and angles, with optional weighted binning over multipoles. This function evaluates the Wigner small-d matrices, \( d^{\ell}_{s_1,s_2}(\theta) \), for multipoles from 0 up to `lmax` at the specified cosines of angles (`cos_thetas`). It supports optional binning of the multipole axis using specified bin edges and weighting schemes. Parameters ---------- lmax : int Maximum multipole (ℓ) for which to compute the Wigner-d matrices. s1 : int First spin index for the Wigner-d matrix. s2 : int Second spin index for the Wigner-d matrix. cos_thetas : array_like Array of cosines of the angles θ at which to evaluate the Wigner-d matrices. bin_edges : array_like, optional Bin edges defining multipole bins for aggregating the Wigner-d matrices. Bins are interpreted as intervals: low_edge ≤ ℓ < upper_edge. If not provided, no binning is performed and the full multipole range is returned. bin_weights : array_like, optional Multipole weights used for binning along the ℓ axis. If provided, enables weighted binning. If `bin_weights2` is not provided, single weighted binning is performed. bin_weights2 : array_like, optional Optional second set of multipole weights for double-weighted binning. Only used if `bin_weights` is also provided. If `bin_weights2` is provided, double weighted binning is performed. Returns ------- ndarray If no binning is requested (`bin_edges` is None), returns a 2D array of shape `(len(cos_thetas), lmax + 1)` containing the Wigner-d matrices evaluated at each angle and multipole. If binning is requested, returns a binned (or double-binned) Wigner-d matrix array with shape depending on the number of bins and angles. Raises ------ ValueError If `bin_weights` is provided without `bin_edges`. Notes ----- - This function is intended primarily for testing purposes and is not used in `wiggle`. """ if bin_edges is None: if bin_weights is not None: raise ValueError return _wiggle._compute_wigner_d_matrix(lmax,s1,s2,cos_thetas) else: nbins, bin_indices, nweights = _prepare_bins(bin_edges,bin_weights,lmax) if bin_weights2 is not None: _, _, nweights2 = _prepare_bins(bin_edges,bin_weights2,lmax) return _wiggle._compute_double_binned_wigner_d(lmax,s1,s2,cos_thetas,nbins,bin_indices,nweights,nweights2) else: return _wiggle._compute_binned_wigner_d(lmax,s1,s2,cos_thetas,nbins,bin_indices,nweights)
[docs] def bin_square_matrix(matrix,bin_edges,lmax,bin_weights=None,bin_weights2=None): """ Bin a square matrix in along both directions using specified multipole binning and weights. This function reduces a 2D square matrix (e.g., a coupling matrix) from (lmax+1) × (lmax+1) to a binned form of shape (n_bins × n_bins), where binning is defined by multipole `bin_edges` and optional weights. It supports different weighting schemes along each axis of the matrix. Parameters ---------- matrix : 2D array_like Square matrix of shape (lmax+1, lmax+1) to be binned. Typically a function of multipole indices (ℓ, ℓ'). bin_edges : array_like Array defining the edges of the multipole bins. Must span the relevant ℓ-range up to `lmax`. Note, these are of the form low_edge <= ℓ < upper_edge. lmax : int Maximum multipole to consider. Defines the size of the unbinned matrix as (lmax+1) × (lmax+1). bin_weights : array_like or None, optional Weights to apply within each bin along the first axis (rows). If `None`, unit weights are used and internally normalized. bin_weights2 : array_like or None, optional Weights to apply within each bin along the second axis (columns). If `None`, unit weights are used but not normalized—useful for asymmetric binning or theoretical filters. Returns ------- binned_matrix : ndarray Binned square matrix of shape (n_bins, n_bins), where `n_bins = len(bin_edges) - 1`. Notes ----- - Internally uses normalized weights along the first axis, and unnormalized unity for the second axis if weights for the second axis are not provided. This correctly transforms an unbinned coupling matrix to a binned coupling matrix. """ if bin_weights is None: bin_weights = np.ones(matrix.shape[0]) # meant to be normalized if bin_weights2 is None: nbins, bin_indices, nweights = _prepare_bins(bin_edges,bin_weights,lmax,bin_weights2=None) nweights2 = nweights*0 + 1 # not normalized else: nbins, bin_indices, nweights, nweights2 = _prepare_bins(bin_edges,bin_weights,lmax,bin_weights2=bin_weights2) return _wiggle.bin_matrix(matrix,bin_indices,bin_indices,nweights,nweights2,nbins,nbins)
[docs] def wfactor(n,mask,sht=True,pmap=None,equal_area=False): """ Copied from msyriac/orphics/maps.py Approximate correction to an n-point function for the loss of power due to the application of a mask. For an n-point function using SHTs, this is the ratio of area weighted by the nth power of the mask to the full sky area 4 pi. This simplifies to mean(mask**n) for equal area pixelizations like healpix. For SHTs on CAR, it is sum(mask**n * pixel_area_map) / 4pi. When using FFTs, it is the area weighted by the nth power normalized to the area of the map. This also simplifies to mean(mask**n) for equal area pixels. For CAR, it is sum(mask**n * pixel_area_map) / sum(pixel_area_map). If not, it does an expensive calculation of the map of pixel areas. If this has been pre-calculated, it can be provided as the pmap argument. """ from pixell import enmap assert mask.ndim==1 or mask.ndim==2 if pmap is None: if equal_area: npix = mask.size pmap = 4*np.pi / npix if sht else enmap.area(mask.shape,mask.wcs) / npix else: pmap = enmap.pixsizemap(mask.shape,mask.wcs) return np.sum((mask**n)*pmap) /np.pi / 4. if sht else np.sum((mask**n)*pmap) / np.sum(pmap)
[docs] def get_camb_spectra(lmax=512, tensor=True, ns=0.965, As=2e-9, r=0.05): """ Returns CMB Cls [TT, EE, BB, TE] from CAMB with low accuracy for fast testing. """ import camb from camb import model pars = camb.CAMBparams() pars.set_cosmology(H0=67.5, ombh2=0.022, omch2=0.122) pars.InitPower.set_params(As=As, ns=ns, r=r) pars.set_for_lmax(lmax, lens_potential_accuracy=1) pars.WantTensors = tensor pars.AccuracyBoost = 1 pars.lAccuracyBoost = 1 pars.HighAccuracyDefault = False results = camb.get_results(pars) cls = results.get_total_cls(lmax=lmax,CMB_unit='muK',raw_cl=True) # cls has shape (lmax+1, 4): TT, EE, BB, TE ps = np.zeros((3, 3, lmax+1)) ps[0, 0] = cls[:, 0] # TT ps[1, 1] = cls[:, 1] # EE ps[2, 2] = cls[:, 2] # BB ps[0, 1] = ps[1, 0] = cls[:, 3] # TE return ps
[docs] def get_cldict(ps): cl_dict = {} cl_dict['TT'] = ps[0,0] cl_dict['TE'] = ps[0,1] cl_dict['EE'] = ps[1,1] cl_dict['BB'] = ps[2,2] cl_dict['EB'] = ps[2,2]*0. cl_dict['TB'] = ps[2,2]*0. return cl_dict
[docs] def cosine_apodize(bmask,width_deg): r = width_deg * np.pi / 180. return 0.5*(1-np.cos(bmask.distance_transform(rmax=r)*(np.pi/r)))
# CONSOLE I/O
[docs] def cprint(string,color=None,bold=False,uline=False): if not(isinstance(string,str)): string = str(string) x="" if bold: x+=bcolors.BOLD if uline: x+=bcolors.UNDERLINE if color is not None: color = color.lower() if color in ['b','blue']: x+=bcolors.OKBLUE elif color in ['r','red','f','fail']: x+=bcolors.FAIL elif color in ['g','green','ok']: x+=bcolors.OKGREEN elif color in ['y','yellow','w','warning']: x+=bcolors.WARNING elif color in ['p','purple','h','header']: x+=bcolors.HEADER print(x+string+bcolors.ENDC)
[docs] class bcolors: ''' Colored output for print commands ''' HEADER = '\033[95m' OKBLUE = '\033[94m' OKGREEN = '\033[92m' WARNING = '\033[93m' FAIL = '\033[91m' ENDC = '\033[0m' BOLD = '\033[1m' UNDERLINE = '\033[4m'
[docs] def hplot(img,savename=None,verbose=True,grid=False,**kwargs): from pixell import enplot plots = enplot.get_plots(img,grid=grid,**kwargs) if savename is None: enplot.show(plots) return enplot.write(savename,plots) if verbose: cprint("Saved plot to "+ savename,color="g")
[docs] def mollview(hp_map,filename=None,lim=None,coord='C',verbose=True,return_projected_map=False,xsize=1200,grat_deg=None,dpi=None,grat_color='gray',grat_alpha=0.5,**kwargs): ''' mollview plot for healpix wrapper ''' import healpy as hp if lim is None: cmin = cmax = None elif type(lim) is list or type(lim) is tuple: cmin,cmax = lim else: cmin =-lim cmax = lim retimg = hp.mollview(hp_map,min=cmin,max=cmax,coord=coord,return_projected_map=return_projected_map,xsize=xsize,**kwargs) if grat_deg is not None: hp.graticule(dpar=grat_deg,dmer=grat_deg,coord=coord,color=grat_color,alpha=grat_alpha) if filename is not None: plt.savefig(filename,dpi=dpi) if verbose: cprint("Saved healpix plot to "+ filename,color="g") if return_projected_map: return retimg
""" ========= Helpers ========= """ def _prepare_bins(bin_edges,bin_weights,lmax,bin_weights2=None): if (bin_edges is None): if bin_weights is not None: raise ValueError return None bin_indices = multipole_to_bin_indices(lmax, bin_edges) if (bin_weights is None): bin_weights = np.ones(lmax+1) if bin_weights.size<(lmax+1): raise ValueError(f"bin_weights size is {bin_weights.size} but need lmax+1={lmax+1}") # TODO: Do other checks like making sure bins are monotonic and non-overlapping. bin_weights = bin_weights[:lmax+1] nbins = len(bin_edges)-1 nweights = normalize_weights_per_bin(nbins, bin_indices, bin_weights) if bin_weights2 is None: return nbins, bin_indices, nweights else: nweights2 = normalize_weights_per_bin(nbins, bin_indices, bin_weights2) return nbins, bin_indices, nweights, nweights2 def _parity_flip(c,parity): c = c.copy() if parity=='-': c[1::2] *= -1 # This is (-1)^ell elif parity=='+': pass else: raise ValueError return c """ =================== Benchmark utilities =================== The following are interfaces to ducc, pspy and Namaster for benchmarking and testing. """ # Interface to namaster provided by David Alonso def _get_mcm_standalone(spin1, spin2, cl_mask, lmax, pureE1=False, pureB1=False, pureE2=False, pureB2=False, beam1=None, beam2=None): lmax_mask = len(cl_mask)-1 if beam1 is None: beam1 = np.ones(lmax+1) if beam2 is None: beam2 = np.ones(lmax+1) # (binning is arbitrary, but required by C layer) b = nmt.NmtBin.from_lmax_linear(lmax, 10) # Call C layer wsp = nmt.nmtlib.comp_coupling_matrix( int(spin1), int(spin2), int(lmax), int(lmax_mask), int(pureE1), int(pureB1), int(pureE2), int(pureB2), 0, 0.0, beam1, beam2, cl_mask, b.bin, 0, -1, -1, -1) # Extract MCM from C layer nmap1 = 2 if spin1 else 1 nmap2 = 2 if spin2 else 1 ncls = nmap1*nmap2 nrows = (lmax+1)*ncls mcm = nmt.nmtlib.get_mcm(wsp, nrows*nrows).reshape([nrows, nrows]) nmt.nmtlib.workspace_free(wsp) return mcm.reshape([lmax+1, ncls, lmax+1, ncls]) # Copied many of the following from mreinecke/ducc0 def _tri2full(tri, lmax): res = np.zeros((tri.shape[0], tri.shape[1], lmax+1, lmax+1)) lfac = 2.*np.arange(lmax+1) + 1. for l1 in range(lmax+1): startidx = l1*(lmax+1) - (l1*(l1+1))//2 res[:,:,l1,l1:] = lfac[l1:] * tri[:,:, startidx+l1:startidx+lmax+1] res[:,:,l1:,l1] = (2*l1+1) * tri[:,:, startidx+l1:startidx+lmax+1] return res def _mcm00_nmt(spec,lmax): mcm = _get_mcm_standalone(spin1=0, spin2=0, cl_mask=spec[0], lmax=lmax)[:,0,:,0][None,...] return mcm def _mcm00_pspy(spec, lmax): nspec = spec.shape[0] lrange_spec = np.arange(spec.shape[1]) res=np.zeros((nspec, lmax+1, lmax+1)) mcmtmp = np.empty((lmax+1, lmax+1)) for i in range(nspec): wcl = spec[i]*(2*lrange_spec+1) mcm_fortran.calc_coupling_spin0(wcl, lmax+1, lmax+1, lmax+1, mcmtmp.T) mcm_fortran.fill_upper(mcmtmp.T) mcmtmp *= (np.arange(2, lmax+3)*2+1.)/(4*np.pi) res[i, 2:, 2:] = mcmtmp[:-2,:-2] return res def _mcm00_ducc_tri(spec, lmax,nthreads): out= np.empty((spec.shape[0],1,((lmax+1)*(lmax+2))//2),dtype=np.float32) ducc0.misc.experimental.coupling_matrix_spin0and2_tri(spec.reshape((spec.shape[0],1,spec.shape[1])), lmax, (0,0,0,0), (0,-1,-1,-1,-1), nthreads=nthreads, res=out) return out def _mcm02_ducc_tri(spec, lmax,nthreads): out= np.empty((spec.shape[0],5,((lmax+1)*(lmax+2))//2),dtype=np.float32) ducc0.misc.experimental.coupling_matrix_spin0and2_tri(spec[:,:,:], lmax, (0,1,2,3), (0,1,2,3,4), nthreads=nthreads, res=out) return out def _mcmpm_ducc_tri(spec, lmax,nthreads): out= np.empty((spec.shape[0],2,((lmax+1)*(lmax+2))//2),dtype=np.float32) ducc0.misc.experimental.coupling_matrix_spin0and2_tri(spec[:,3:,:], lmax, (0,0,0,0), (-1,-1,-1,0,1), nthreads=nthreads, res=out) return out def _mcm02_pure_ducc(spec, lmax,nthreads): res = np.empty((nspec, 4, lmax+1, lmax+1), dtype=np.float32) return ducc0.misc.experimental.coupling_matrix_spin0and2_pure(spec, lmax, nthreads=nthreads, res=res) # Modified version of ducc0 mcm_bench.py
[docs] class Benchmark(object): def __init__(self,lmax, bin_edges = None,nthreads=None,verbose=False): self.lmax = lmax self.nthreads = nthreads # number of spectra to process simultaneously nspec=1 if verbose: print() print("Mode coupling matrix computation comparison") print(f"nspec={nspec}, lmax={lmax}, nthreads={nthreads}") # we generate the spectra up to 2*lmax+1 to use all Wigner 3j symbols # but this could also be lower. seed = 1 np.random.seed(seed) cls = np.random.normal(size=(2*lmax+1,)) self.spec = np.repeat(cls[None, :], repeats=4, axis=0)[None,...]
[docs] def get_mcm(self,code,spin=0,bin_edges=None,bin_weights=None): a = time() if bin_edges is not None: nbins = len(bin_edges)-1 if code=='ducc' or code=='pspy' or code=='nmt': if spin==0: if code=='pspy': f = _mcm00_pspy elif code=='nmt': f = _mcm00_nmt elif code=='ducc': f = lambda x, y: _mcm00_ducc_tri(x,y,nthreads=self.nthreads) ducc = f(self.spec[:,0,:], self.lmax) if code=='ducc': mcm = _tri2full(ducc, self.lmax)[:,0,:,:][0] else: mcm = ducc[:,:,:][0] elif spin==2: if code!='ducc': raise ValueError duccpm = _mcmpm_ducc_tri(self.spec, self.lmax,nthreads=self.nthreads) mcmi = _tri2full(duccpm, self.lmax)[0] if bin_edges is not None: if spin==0: mcm = bin_square_matrix(mcm,bin_edges,self.lmax,bin_weights=bin_weights) elif spin==2: mcm = np.zeros((2,nbins,nbins)) for i in range(2): mcm[i] = bin_square_matrix(mcmi[i],bin_edges,self.lmax,bin_weights=bin_weights) else: if spin==0: mcm = mcm[2:,2:] elif spin==2: mcm = mcmi[:,2:,2:] elif code=='wiggle': if spin==0: mcm = pywiggle.get_coupling_matrix_from_mask_cls(self.spec[0,0],self.lmax,spintype='TT', bin_edges = bin_edges,bin_weights = bin_weights,verbose=False) elif spin==2: mcm = pywiggle.get_coupling_matrix_from_mask_cls(self.spec[0,0],self.lmax,spintype=['+','-'], bin_edges = bin_edges,bin_weights = bin_weights,verbose=False) if bin_edges is None: if spin==0: mcm = mcm[2:,2:] elif spin==2: mcm = mcm[:,2:,2:] else: raise ValueError etime = time()-a return mcm,etime