Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
280 changes: 126 additions & 154 deletions PyFHD/beam_setup/antenna.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
import importlib_resources
import numpy as np
from numpy.typing import NDArray
from logging import Logger
from typing import Literal

import astropy
import numpy as np
from astropy.constants import c
from astropy.coordinates import SkyCoord, EarthLocation
from astropy import units
from astropy.time import Time
from numpy.typing import NDArray
from pyuvdata import BeamInterface, UVBeam
from pyuvdata.analytic_beam import AnalyticBeam
from scipy.interpolate import interp1d
from PyFHD.beam_setup.mwa import dipole_mutual_coupling

from PyFHD.pyfhd_tools.unit_conv import pixel_to_radec, radec_to_altaz
from pyuvdata import ShortDipoleBeam, BeamInterface, UVBeam
from pyuvdata.telescopes import known_telescope_location
from pyuvdata.analytic_beam import AnalyticBeam
from typing import Literal
from astropy.time import Time


def init_beam(obs: dict, pyfhd_config: dict, logger: Logger) -> dict:
Expand Down Expand Up @@ -91,18 +92,10 @@ def init_beam(obs: dict, pyfhd_config: dict, logger: Logger) -> dict:
coords = np.array([xc_arr, yc_arr, zc_arr])
# Get the delays
delays = obs["delays"] * 4.35e-10
if pyfhd_config["dipole_mutual_coupling_factor"]:
coupling = dipole_mutual_coupling(freq_center)
else:
coupling = np.tile(
np.identity(n_dipoles), (n_ant_pol, freq_center.size, 1, 1)
)

else:
n_dipoles = 1
coords = np.zeros((3, n_dipoles))
delays = np.zeros(n_dipoles)
coupling = np.tile(np.identity(n_dipoles), (n_ant_pol, freq_center.size, 1, 1))

# Create basic antenna dictionary
antenna = {
Expand All @@ -117,14 +110,14 @@ def init_beam(obs: dict, pyfhd_config: dict, logger: Logger) -> dict:
"beam_model_version": pyfhd_config["beam_model_version"],
"freq": freq_center,
"nfreq_bin": nfreq_bin,
"n_ant_elements": 0,
"n_ant_elements": n_dipoles,
# Anything that was pointer arrays in IDL will be None until assigned in Python
"jones": None,
"coupling": coupling,
"gain": np.ones([n_ant_pol, freq_center.size, n_dipoles], dtype=np.float64),
"coords": coords,
"delays": delays,
"response": None,
"iresponse": None,
"projection": None,
# PyFHD supports one instrument at a time, so we setup the group so they're all in the same group.
"group_id": np.zeros([n_ant_pol, obs["n_tile"]], dtype=np.int8),
"pix_window": None,
Expand Down Expand Up @@ -174,7 +167,9 @@ def init_beam(obs: dict, pyfhd_config: dict, logger: Logger) -> dict:
psf["pix_horizon"] = obs["dimension"] / psf["scale"]
psf["superres_dim"] = psf["dim"] * psf["resolution"]

location = EarthLocation.of_site(obs["instrument"])
location = EarthLocation.from_geodetic(
lon=obs["lon"], lat=obs["lat"], height=obs["alt"]
)

# Get the zenith angle and azimuth angle arrays
xvals_celestial, yvals_celestial = np.meshgrid(
Expand All @@ -193,7 +188,8 @@ def init_beam(obs: dict, pyfhd_config: dict, logger: Logger) -> dict:
)
ra_arr, dec_arr = pixel_to_radec(xvals_celestial, yvals_celestial, obs["astr"])
del xvals_celestial, yvals_celestial
valid_i = np.where(np.isfinite(ra_arr))
# Only keep the pixels that are above the horizon to save memory
valid_i = np.nonzero(np.isfinite(ra_arr))
ra_arr = ra_arr[valid_i]
dec_arr = dec_arr[valid_i]
alt_arr, az_arr = radec_to_altaz(
Expand All @@ -204,47 +200,108 @@ def init_beam(obs: dict, pyfhd_config: dict, logger: Logger) -> dict:
location.height.value,
jdate_use,
)
zenith_angle_arr = np.full([psf["image_dim"], psf["image_dim"]], 90)
zenith_angle_arr[valid_i] = 90 - alt_arr.value
# Initialize the azimuth angle array in degrees
azimuth_arr = np.zeros([psf["image_dim"], psf["image_dim"]])
azimuth_arr[valid_i] = az_arr.value
# astropy's WCS is being used to go from x/y grid values to RA/Dec
# then using SkyCoord to go from RA/Dec to alt/az
# the first step should drop pixels beyond the horizon, but occasionally a
# few pixels survive that step but have negative altitudes after the second
# step which breaks things downstream. drop those pixels.
# TODO: understand this apparent disagreement between WCS & SkyCoord...
good_alt = np.nonzero(alt_arr >= 0.0)[0]

# valid_i is a tuple with per-dimension indices.
valid_i = list(valid_i)
for ind, elem in enumerate(valid_i):
valid_i[ind] = elem[good_alt]
valid_i = tuple(valid_i)

ra_arr = ra_arr[good_alt]
dec_arr = dec_arr[good_alt]
alt_arr = alt_arr[good_alt]
az_arr = az_arr[good_alt]

# Save some memory by deleting the unused arrays
del ra_arr, dec_arr, alt_arr, az_arr
del ra_arr, dec_arr

if pyfhd_config["instrument"] == "mwa":
mwa_beam_file = importlib_resources.files(
"PyFHD.resources.instrument_config"
).joinpath("mwa_full_embedded_element_pattern.h5")
if not mwa_beam_file.exists():
# Download the MWA beam file if it does not exist
raise FileNotFoundError(
f"MWA beam file {mwa_beam_file} does not exist. "
"Please download it from http://ws.mwatelescope.org/static/mwa_full_embedded_element_pattern.h5 into the."
f"directory {mwa_beam_file.parent}"
)
beam = UVBeam.from_file(mwa_beam_file, delays=obs["delays"])
# If you wish to add a different insturment, do it by adding a new elif here
# Convert to radians for pyuvdata functions
zenith_angle_arr = np.deg2rad(90 - alt_arr)
azimuth_arr = np.deg2rad(az_arr)

# Store pixel indices above the horizon
antenna["pix_use"] = np.ravel_multi_index(
valid_i, (psf["image_dim"], psf["image_dim"])
)

# Save some memory by deleting the unused arrays
del alt_arr, az_arr, valid_i

if pyfhd_config["analytic_beam_yaml"] is not None:
beam = pyfhd_config["analytic_beam_yaml"]
else:
# Do an analytic beam as a placeholder
beam = ShortDipoleBeam()
# only read in the range of beam frequencies we need. add a buffer
# to ensure that we have enough outside our range for interpolation
uvbeam_kwargs = {}
if pyfhd_config["instrument"] == "mwa":
uvbeam_kwargs["delays"] = obs["delays"]

if pyfhd_config["uvbeam_freq_buffer"] is not None:
freq_range = [
np.min(obs["baseline_info"]["freq"])
- pyfhd_config["uvbeam_freq_buffer"],
np.max(obs["baseline_info"]["freq"])
+ pyfhd_config["uvbeam_freq_buffer"],
]
uvbeam_kwargs["freq_range"] = freq_range

# select above the horizon (this only does something for beamfits files)
# but it saves some memory in that case
uvbeam_kwargs["za_range"] = [0, 90.0]

if pyfhd_config["uvbeam_file_path"] is not None:
beam = UVBeam.from_file(pyfhd_config["uvbeam_file_path"], **uvbeam_kwargs)
elif pyfhd_config["instrument"] == "mwa":
# fall back to looking for the MWA beam in resources folder (older pattern)
mwa_beam_file = importlib_resources.files(
"PyFHD.resources.instrument_config"
).joinpath("mwa_full_embedded_element_pattern.h5")
if not mwa_beam_file.exists():
# Download the MWA beam file if it does not exist
raise FileNotFoundError(
f"MWA beam file {mwa_beam_file} does not exist. "
"Please download it from http://ws.mwatelescope.org/static/mwa_full_embedded_element_pattern.h5 into the."
f"directory {mwa_beam_file.parent}"
)
beam = UVBeam.from_file(mwa_beam_file, **uvbeam_kwargs)

# check for nans in beam. If they can be removed by a horizon cut do it.
if np.any(np.isnan(beam.data_array)):
if beam.pixel_coordinate_system != "healpix":
above_hor_inc = np.nonzero(beam.axis2_array > (np.pi / 2))[0]
above_hor_exc = np.nonzero(beam.axis2_array >= (np.pi / 2))[0]
if not np.any(np.isnan(beam.data_array[above_hor_inc])):
logger.info("Cutting the beam below the horizon to remove NaNs.")
beam.select(axis2_inds=above_hor_inc)
elif not np.any(np.isnan(beam.data_array[above_hor_exc])):
logger.info(
"Cutting the beam at and below the horizon to remove NaNs."
)
beam.select(axis2_inds=above_hor_exc)
else:
raise ValueError(
"UVBeam object has NaNs in the data array above the horizon."
)

# Get the jones matrix for the antenna
antenna["jones"] = general_jones_matrix(
# shape is: (number of vector directions (usually 2), number of feeds (usually 2),
# number of frequencies, number of directions on the sky)
antenna["iresponse"], antenna["projection"] = general_jones_matrix(
beam,
za_array=zenith_angle_arr,
az_array=azimuth_arr,
freq_array=frequency_array,
za_array=zenith_angle_arr.flatten(),
az_array=azimuth_arr.flatten(),
freq_array=freq_center,
telescope_location=location,
)

# Get the antenna response
antenna["response"] = general_antenna_response(
obs,
antenna,
za_arr=zenith_angle_arr,
az_arr=azimuth_arr,
)
# remove the initial shallow dimension in iresponse
antenna["iresponse"] = antenna["iresponse"][0]

return antenna, psf, beam

Expand Down Expand Up @@ -276,7 +333,7 @@ def general_jones_matrix(
Parameters
----------
beam_obj : UVBeam or AnalyticBeam or BeamInterface
A pyuvdata beam, can be a UVBeam, and AnalyticBeam subclass, or a
A pyuvdata beam, can be a UVBeam, an AnalyticBeam subclass, or a
BeamInterface object.
alt_array : np.ndarray[float]
Array of altitudes (also called elevations) in radians. Must be a 1D array.
Expand Down Expand Up @@ -344,9 +401,6 @@ def general_jones_matrix(
f"It was {az_convention}."
)

# FHD requires an Efield beam, so set it here to be explicit
beam = BeamInterface(beam_obj, beam_type="efield")

if ra_dec_in:
if ra_array.shape != dec_array.shape:
raise ValueError("ra_array and dec_array must have the same shape")
Expand Down Expand Up @@ -382,110 +436,28 @@ def general_jones_matrix(
where_neg_az = np.nonzero(noe_az_array < 0)
noe_az_array[where_neg_az] = noe_az_array[where_neg_az] + np.pi * 2.0

# use the faster interpolation method if appropriate
if beam._isuvbeam and beam.beam.pixel_coordinate_system == "az_za":
interpol_fn = "az_za_map_coordinates"
if isinstance(beam_obj, UVBeam):
f_obj, k_obj = beam_obj.decompose_feed_iresponse_projection()
f_beam = BeamInterface(f_obj)
k_beam = BeamInterface(k_obj)
else:
interpol_fn = None
f_beam = BeamInterface(beam_obj, beam_type="feed_iresponse")
k_beam = BeamInterface(beam_obj, beam_type="feed_projection")

return beam.compute_response(
f_vals = f_beam.compute_response(
az_array=noe_az_array,
za_array=za_array,
freq_array=freq_array,
interpolation_function=interpol_fn,
spline_opts=spline_opts,
check_azza_domain=check_azza_domain,
)


def general_antenna_response(
obs: dict,
antenna: dict,
za_arr: NDArray[np.floating],
az_arr: NDArray[np.floating],
) -> NDArray[np.complexfloating]:
"""
Calculate the response of a set of antennas for a given observation and antenna configuration,
including the electrical delays and coupling.

Parameters
----------
obs : dict
Observation metadata dictionary
antenna : dict
Antenna metadata dictionary
za_arr : NDArray[np.floating]
Zenith angle array in radians
az_arr : NDArray[np.floating]
Azimuth angle array in radians

Returns
-------
response
The unpolarised response of a set of antennas
"""
light_speed = c.value

"""
Given that in FHD the antenna response is a pointer array of shape (antenna["n_pol", obs["n_tile"])
where each pointer is an array of pointers of shape (antenna["n_freq_bin"]). Each pointer in the array
of shape (antenna["n_freq_bin"]) points to a complex array of shape (antenna["pix_use"].size,).

Furthermore, when the antenna response is calculated, it looks like this is done on a per frequency bin
basis and each tile will point to the same antenna response for that frequency bin. This means we can ignore
the tile dimension and just calculate the antenna response for each frequency bin and polarization to save
memory in Python.
"""
response = np.zeros(
[antenna["n_pol"], antenna["nfreq_bin"], antenna["pix_use"].size],
dtype=np.complex128,
)

# Calculate projections only at locations of non-zero pixels
proj_east_use = np.sin(za_arr[antenna["pix_use"]]) * np.sin(
az_arr[antenna["pix_use"]]
)
proj_north_use = np.sin(za_arr[antenna["pix_use"]]) * np.cos(
az_arr[antenna["pix_use"]]
k_vals = k_beam.compute_response(
az_array=noe_az_array,
za_array=za_array,
freq_array=freq_array,
spline_opts=spline_opts,
check_azza_domain=check_azza_domain,
)
proj_z_use = np.cos(za_arr[antenna["pix_use"]])

# FHD assumes you might be dealing with more than one antenna, hence the groupings it used.
# PyFHD currently only supports one antenna, so we can ignore the groupings.
for pol_i in range(antenna["n_pol"]):
# Phase of each dipole for the source (relative to the beamformer settings)
D_d = (
np.outer(antenna["coords"][0], proj_east_use)
+ np.outer(antenna["coords"][1], proj_north_use)
+ np.outer(antenna["coords"][2], proj_z_use)
)

for freq_i in range(antenna["nfreq_bin"]):
Kconv = 2 * np.pi * antenna["freq"][freq_i] / light_speed
voltage_delay = np.exp(
1j
* 2
* np.pi
* antenna["delays"]
* antenna["freq"][freq_i]
* antenna["gain"][pol_i, freq_i]
)
# TODO: Check if it's actually outer, although it does look like voltage_delay is likely 1D
measured_current = np.outer(
voltage_delay, antenna["coupling"][pol_i, freq_i]
)
zenith_norm = np.outer(
np.ones(antenna["n_ant_elements"]),
antenna["coupling"][pol_i, freq_i],
)
measured_current /= zenith_norm

# TODO: This loop can probably be vectorized
for ii in range(antenna["n_ant_elements"]):
# TODO: check the way D_d needs to be indexed
antenna_gain_arr = np.exp(-1j * Kconv * D_d[ii, :])
response[pol_i, freq_i] += (
antenna_gain_arr * measured_current[ii] / antenna["n_ant_elements"]
)

return response
return f_vals, k_vals
Loading
Loading