Source code for superfreq.naff

# coding: utf-8

# Standard library
import time

# Third-party
import numpy as np
from numpy.fft import fft, fftfreq

# Project
from .core import check_for_primes, SuperFreqResult
from ._naff import naff_frequency
from .simpsgauss import simpson
from .log import logger

__all__ = ['SuperFreq', 'find_frequencies',
           'find_integer_vectors', 'closest_resonance',
           'compute_actions']


def hamming(t_T, p):
    """Compute a Hamming filter / window function.

    Parameters
    ----------
    t_T : numeric, array_like
        Time values normalized by the period.
    p : numeric
        Exponent of filter.
    """
    p_fac = np.math.factorial(p)
    p2_fac = np.math.factorial(2*p)
    return 2. ** p * p_fac**2. / p2_fac * (1. + np.cos(np.pi*t_T))**p


[docs]class SuperFreq(object): """ Implementation of the Numerical Analysis of Fundamental Frequencies method of Laskar, later modified by Valluri and Merritt (see references below), with some slight modifications. This algorithm attempts to numerically find the fundamental frequencies of an input orbit (time series) and can also find approximate actions for the orbit. The basic idea is to Fourier transform the orbit convolved with a Hanning filter, find the most significant peak, subtract that frequency, and iterate on this until convergence or for a fixed number of terms. The fundamental frequencies can then be solved for by assuming that the frequencies found by the above method are integer combinations of the fundamental frequencies. For more information, see: - Laskar, J., Froeschlé, C., and Celletti, A. (1992) - Laskar, J. (1993) - Papaphilippou, Y. and Laskar, J. (1996) - Valluri, M. and Merritt, D. (1998) Parameters ---------- t : array_like Array of times. p : int (optional) Coefficient for Hamming filter -- default p=1 which is a Hanning filter, used by Laskar and Valluri/Merritt. keep_calm : bool (optional) If something fails when solving for the frequency of a given component, ``keep_calm`` determines whether to throw a RuntimeError or exit gracefully. If set to ``True``, will exit quietly and carry on to the next component. If ``False``, will die if any frequency determination fails. """ def __init__(self, t, p=1, keep_calm=False): n = len(t) self.n = check_for_primes(n) if self.n != len(t): logger.debug("Truncating time series to length={0} to avoid large " "prime divisors.".format(self.n)) # array of times self.t = t[:self.n] # average time t_avg = 0.5 * (self.t[-1] + self.t[0]) # re-center time so middle is 0 self.tz = self.t - t_avg self.dt = np.abs(self.tz[1] - self.tz[0]) # time window size: time series goes from -T to T self.T = 0.5 * (self.t[-1] - self.t[0]) # pre-compute values of Hamming filter for this window # see, e.g., C. Hunter (2001) for a description of why you might want # p not equal to 1 self.chi = hamming(self.tz/self.T, p) # when solving for frequencies and removing components from the time # series, if something fails for a given component and keep_calm is set # to True, SuperFreq will exit gracefully instead of throwing a # RuntimeError self.keep_calm = keep_calm
[docs] def frequency(self, f, omega0=None, return_fft=False): """ Find the most significant frequency of a (complex) time series, :math:`f(t)`, by Fourier transforming the function convolved with a Hanning filter and picking the most significant peak. This assumes the time series, `f`, is aligned with / given at the times specified when constructing this object. An internal function. Parameters ---------- f : array_like Complex time-series, e.g., :math:`x(t) + i \, v_x(t)`. omega0 : numeric (optional) Force finding the peak around the input freuency. return_fft : bool (optional) Return the FFT along with the most significant frequency. Returns ------- freq : numeric The strongest frequency in the specified complex time series, ``f``. omegas : :class:`~numpy.ndarray` An array of frequencies. fft : :class:`~numpy.ndarray` The FFT of the input time series `f`. """ f = np.array(f) if len(f) > self.n: logger.warning("Truncating time series to match shape of time array" " ({0}) ({1})".format(len(f), self.n)) f = f[:self.n] elif len(f) < self.n: raise ValueError("Input time-series, f, has less elements than time" " array, t.") # take Fourier transform of input (complex) function f t1 = time.time() fff = fft(f) / np.sqrt(self.n) # NUMPY FFT logger.log(0, "Took {} seconds to FFT.".format(time.time()-t1)) # frequencies omegas = 2*np.pi*fftfreq(f.size, self.dt) if omega0 is None: # omega_max_ix is the initial guess / centering frequency for # optimization against the Hanning-convolved Fourier spectrum abs_fff = np.abs(fff) omega_max_ix = abs_fff.argmax() if np.allclose(abs_fff[omega_max_ix], 0): # return early -- "this may be an axial or planar orbit" logger.debug("Returning early - may be an axial or planar " "orbit?") return 0. # frequency associated with the peak index omega0 = omegas[omega_max_ix] # real and complex part of input time series Re_f = f.real.copy() Im_f = f.imag.copy() freq = naff_frequency(omega0, self.tz, self.chi, Re_f, Im_f, self.T) if return_fft: return freq, omegas, fff else: return freq
[docs] def frecoder(self, f, nintvec=12, break_condition=1E-7): """ For a given number of iterations, or until the break condition is met: solve for strongest frequency of the input time series, subtract it from the time series, and iterate. Parameters ---------- f : array_like Complex time-series, e.g., :math:`x(t) + i \, v_x(t)`. nintvec : int (optional) Number of integer vectors to find or number of frequencies to find and subtract. break_condition : numeric (optional) Break the iterations of the time series maximum value or amplitude of the subtracted frequency is smaller than this value. Set to ``None`` if you want to always iterate for `nintvec` frequencies. Returns ------- omega : :class:`numpy.ndarray` Array of frequencies for each component in the time series. ampl : :class:`numpy.ndarray` Array of real amplitudes for each component in the time series. phi : :class:`numpy.ndarray` Array of phases for the complex amplitudes for each component in the time series. """ # initialize container arrays ecap = np.zeros((nintvec, len(self.t)), dtype=np.complex64) omega = np.zeros(nintvec) A = np.zeros(nintvec) phi = np.zeros(nintvec) fk = f.copy() logger.log(5, "-"*50) logger.log(5, "k ωk Ak φk(deg) ak") broke = False for k in range(nintvec): try: omega[k] = self.frequency(fk) except RuntimeError: if self.keep_calm: broke = True break else: raise if k == 0: # compute exp(iωt) for first frequency ecap[k] = np.exp(1j*omega[k]*self.t) else: ecap[k] = self.gso(ecap, omega[k], k) # get complex amplitude by projecting exp(iωt) on to f(t) ab = self.hanning_product(fk, ecap[k]) A[k] = np.abs(ab) phi[k] = np.arctan2(ab.imag, ab.real) # remove the new orthogonal frequency component from the f_k fk -= ab*ecap[k] logger.log(5, "{} {:.6f} {:.6f} {:.2f} {:.6f}" .format(k, omega[k], A[k], np.degrees(phi[k]), ab)) if break_condition is not None and A[k] < break_condition: broke = True break if broke: return omega[:k], A[:k], phi[:k] else: return omega[:k+1], A[:k+1], phi[:k+1]
[docs] def hanning_product(self, u1, u2): r""" Compute the scalar product of two 'vectors', `u1` and `u2`. The scalar product is defined with the Hanning filter as .. math:: <u_1, u_2> = \frac{1}{2 T} \int \, u_1(t) \, \chi(t) \, u_2^*(t)\,dt Parameters ---------- u1 : array_like u2 : array_like Returns ------- prod : float Scalar product. """ # First find complex conjugate of vector u2 and construct integrand integ = u1 * np.conj(u2) * self.chi integ_r = np.ascontiguousarray(integ.real) integ_i = np.ascontiguousarray(integ.imag) # Integrate the real part real = simpson(integ_r, self.dt) # Integrate Imaginary part imag = simpson(integ_i, self.dt) return (real + 1j*imag) / (2.*self.T)
[docs] def gso(self, ecap, omega, k): r""" Gram-Schmidt orthonormalization of the time series. ..math:: e_k(t) = \exp (i \omega_k t) with all previous functions. Parameters ---------- ecap : array_like omega : numeric Frequency of current component. k : int Index of maximum frequency found so far. Returns ------- ei : :class:`numpy.ndarray` Orthonormalized time series. """ # coefficients c_ik = np.zeros(k, dtype=np.complex64) u_n = np.exp(1j*omega*self.t) # first find the k complex constants cik(k,ndata): for j in range(k): c_ik[j] = self.hanning_product(u_n, ecap[j]) # Now construct the orthogonal vector e_i = u_n - np.sum(c_ik[:,np.newaxis]*ecap[:k], axis=0) # Now normalize this vector prod = self.hanning_product(e_i, e_i) norm = 1. / np.sqrt(prod) if prod == 0.: norm = 0. + 0j return e_i*norm
[docs] def find_fundamental_frequencies(self, fs, min_freq=1E-6, min_freq_diff=1E-6, **frecoder_kwargs): """ Solve for the fundamental frequencies of each specified time series. This is most commonly a 2D array, a tuple, or iterable of individual complex time series. For example, if your orbit is 2D, you might pass in a tuple with :math:`x +i \, v_x` as the 0th element and :math:`y +i \, v_y` as the 1st element. Any extra keyword arguments are passed to `SuperFreq.frecoder()`. Parameters ---------- fs : array_like, iterable The iterable of (complex) time series. If an array-like object, should be 2D with length along axis=0 equal to the number of time series. See description above. min_freq : numeric (optional) The minimum (absolute) frequency value to consider a non-zero frequency component. min_freq_diff : numeric (optional) The minimum (absolute) frequency difference to distinguish two frequencies. **frecoder_kwargs Any extra keyword arguments are passed to `SuperFreq.frecoder()`. Returns ------- freqs : :class:`numpy.ndarray` The fundamental frequencies of the orbit. This will have the same number of elements as the dimensionality of the orbit. table : :class:`numpy.ndarray` The full table of frequency modes, amplitudes, and phases for all components detected in the FFT. freq_ixes : :class:`numpy.ndarray` The indices of the rows of the table that correspond to the modes identified as the fundamental frequencies. """ # containers freqs = [] As = [] amps = [] phis = [] component_ix = [] nfreqstotal = 0 ndim = len(fs) for i in range(ndim): omega,A,phi = self.frecoder(fs[i][:self.n], **frecoder_kwargs) freqs.append(omega) # angular frequencies As.append(A*np.exp(1j*phi)) # complex amplitudes amps.append(A) # abs amplitude phis.append(phi) # phase angle component_ix.append(np.zeros_like(omega) + i) # index of the component nfreqstotal += len(omega) d = np.zeros(nfreqstotal, dtype=list(zip(('freq','A','|A|','phi','idx'), ('f8','c8','f8','f8',np.int)))) d['freq'] = np.concatenate(freqs) d['A'] = np.concatenate(As) d['|A|'] = np.concatenate(amps) d['phi'] = np.concatenate(phis) d['idx'] = np.concatenate(component_ix).astype(int) # sort terms by amplitude d = d[d['|A|'].argsort()[::-1]] # reverse argsort for descending # container arrays for return fund_freqs = np.zeros(ndim) ffreq_ixes = np.zeros(ndim, dtype=int) comp_ixes = np.zeros(ndim, dtype=int) # first frequency is largest amplitude, nonzero freq. ixes = np.where(np.abs(d['freq']) > min_freq)[0] fund_freqs[0] = d[ixes[0]]['freq'] ffreq_ixes[0] = ixes[0] comp_ixes[0] = d[ixes[0]]['idx'] if ndim == 1: return SuperFreqResult(fund_freqs, d, ffreq_ixes) # choose the next nontrivially related frequency in a different # component as the 2nd fundamental frequency abs_freq1 = np.abs(fund_freqs[0]) ixes = np.where((np.abs(d['freq']) > min_freq) & (d['idx'] != d[ffreq_ixes[0]]['idx']) & # different component index (np.abs(abs_freq1 - np.abs(d['freq'])) > min_freq_diff))[0] fund_freqs[1] = d[ixes[0]]['freq'] ffreq_ixes[1] = ixes[0] comp_ixes[1] = d[ixes[0]]['idx'] if ndim == 2: return SuperFreqResult(fund_freqs[comp_ixes.argsort()], d, ffreq_ixes[comp_ixes.argsort()]) # third frequency is the largest amplitude frequency in the remaining # component dimension abs_freq2 = np.abs(fund_freqs[1]) ixes = np.where((np.abs(d['freq']) > min_freq) & (d['idx'] != d[ffreq_ixes[0]]['idx']) & # different component index (d['idx'] != d[ffreq_ixes[1]]['idx']) & # different component index (np.abs(abs_freq1 - np.abs(d['freq'])) > min_freq_diff) & (np.abs(abs_freq2 - np.abs(d['freq'])) > min_freq_diff))[0] if len(ixes) == 0 and self.keep_calm: # may be a planar orbit logger.warning("May be a planar orbit") return SuperFreqResult(fund_freqs[comp_ixes.argsort()], d, ffreq_ixes[comp_ixes.argsort()]) fund_freqs[2] = d[ixes[0]]['freq'] ffreq_ixes[2] = ixes[0] comp_ixes[2] = d[ixes[0]]['idx'] return SuperFreqResult(fund_freqs[comp_ixes.argsort()], d, ffreq_ixes[comp_ixes.argsort()])
[docs]def find_integer_vectors(freqs, table, max_int=12): r""" Given the fundamental frequencies and table of all frequency components, determine how each frequency component is related to the fundamental frequencies (e.g., determine the integer vector for each frequency such that :math:`n\cdot \Omega \approx 0.`). These are the non-zero Fourier modes. Parameters ---------- freqs : array_like The fundamental frequencies. table : structured array The full table of frequency modes, amplitudes, and phases for all components detected in the FFT. max_int : int (optional) The integer vectors considered will go from ``-max_int`` to ``max_int`` in each dimension. Returns ------- nvec : :class:`numpy.ndarray` An array of integer vectors that correspond to the frequency components. """ # make sure the fundamental frequencies are a numpy array freqs = np.array(freqs) nfreqs = len(freqs) ncomponents = len(table) # define meshgrid of integer vectors grid = np.meshgrid(*[np.arange(-max_int, max_int+1, dtype=int) for i in range(nfreqs)]) nvecs = np.vstack(list(map(np.ravel, grid))).T # integer vectors d_nvec = np.zeros((ncomponents, nfreqs)).astype(int) for i in range(ncomponents): errs = np.abs(nvecs.dot(freqs) - table[i]['freq']) d_nvec[i] = nvecs[errs.argmin()] return d_nvec.T
[docs]def closest_resonance(freqs, max_int=12): r""" Find the closest resonant vector for the given set of fundamental frequencies. Parameters ---------- freqs : array_like The fundamental frequencies. max_int : int (optional) The integer vectors considered will go from ``-max_int`` to ``max_int`` in each dimension. Returns ------- intvec : :class:`numpy.ndarray` The integer vector of the closest resonance. dist : float The distance to the closest resonance, e.g., if :math:`\boldsymbol{n}` is the resonant integer vector, this is just :math:`\boldsymbol{n} \cdot \boldsymbol{\Omega}`. """ # make sure the fundamental frequencies are a numpy array freqs = np.array(freqs) # define meshgrid of integer vectors nfreqs = len(freqs) slc = [slice(-max_int, max_int+1, None)] * nfreqs nvecs = np.vstack(np.vstack(np.mgrid[slc].T)) ndf = nvecs.dot(freqs) min_ix = ndf.argmin() return nvecs[min_ix], ndf[min_ix]
[docs]def find_frequencies(orbit, force_box=False, silently_fail=True, **kwargs): """ Compute the fundamental frequencies of an orbit, ``w``. If not forced, this function tries to figure out whether the input orbit is a tube or box orbit and then uses the appropriate set of coordinates (Poincaré polar coordinates for tube, ordinary Cartesian for box). Any extra keyword arguments (``kwargs``) are passed to `SuperFreq.find_fundamental_frequencies`. Requires Gala. Parameters ---------- orbit : :class:`gala.dynamics.CartesianOrbit` The orbit to analyze. force_box : bool (optional) Force the routine to assume the orbit is a box orbit. Default is ``False``. silently_fail : bool (optional) Return NaN's and None's if SuperFreq fails, rather than raising an exception. **kwargs Any extra keyword arguments are passed to `SuperFreq.find_fundamental_frequencies`. """ from gala.coordinates import cartesian_to_poincare_polar if len(orbit.shape) > 1 and orbit.shape[-1] > 1: raise ValueError("Only works for a single orbit.") # now get other frequencies if force_box: is_tube = False else: circ = orbit.circulation() is_tube = np.any(circ) naff = SuperFreq(orbit.t.value) d = None ixes = None if is_tube: # need to flip coordinates until circulation is around z axis new_orbit = orbit.align_circulation_with_z() # new_ws = align_circulation_with_z(w, circ) if orbit.shape[-1]: new_ws = new_orbit.w()[...,0].T else: new_ws = new_orbit.w().T new_ws = cartesian_to_poincare_polar(new_ws) fs = [(new_ws[:,j] + 1j*new_ws[:,j+3]) for j in range(3)] if silently_fail: try: logger.info('Solving for Rφz frequencies...') fRphiz,d,ixes = naff.find_fundamental_frequencies(fs, **kwargs) except: fRphiz = np.ones(3)*np.nan else: logger.info('Solving for Rφz frequencies...') fRphiz,d,ixes = naff.find_fundamental_frequencies(fs, **kwargs) freqs = fRphiz else: if orbit.shape[-1]: ws = orbit.w()[...,0].T else: ws = orbit.w().T # first get x,y,z frequencies logger.info('Solving for XYZ frequencies...') fs = [(ws[:,j] + 1j*ws[:,j+3]) for j in range(3)] if silently_fail: try: fxyz,d,ixes = naff.find_fundamental_frequencies(fs, **kwargs) except: fxyz = np.ones(3)*np.nan else: fxyz,d,ixes = naff.find_fundamental_frequencies(fs, **kwargs) freqs = fxyz return freqs, d, ixes, is_tube
[docs]def compute_actions(freqs, table, max_int=12): """ Reconstruct approximations to the actions using Percival's equation. Approximate the values of the actions using a Fourier decomposition. You must pass in the frequencies and frequency table determined from and orbit in Cartesian coordinates. For example, see: - `Valluri & Merritt (1999) <http://arxiv.org/abs/astro-ph/9906176>`_ - `Percival (1974) <http://iopscience.iop.org/0301-0015/7/7/005>`_ Parameters ---------- freqs : array_like The fundamental frequencies. table : structured array The full table of frequency modes, amplitudes, and phases for all components detected in the FFT. max_int : int (optional) The integer vectors considered will go from ``-max_int`` to ``max_int`` in each dimension. Returns ------- actions : :class:`numpy.ndarray` Numerical estimates of the orbital actions. """ # NOTE: THIS IS BORKED raise NotImplementedError("Sorry.") ndim = len(freqs) # get integer vectors for each component nvecs = find_integer_vectors(freqs, table, max_int=max_int) # container to store |X_k|^2 amp2 = np.zeros([2*max_int+2]*ndim) for nvec, row in zip(nvecs, table): slc = [slice(x+max_int, x+max_int+1, None) for x in nvec] amp2[slc] += (row['A']*row['A'].conj()).real Js = np.zeros(ndim) for nvec in nvecs: slc = [slice(x+max_int, x+max_int+1, None) for x in nvec] Js += nvec * nvec.dot(freqs) * float(amp2[slc]) return Js