diff --git a/imap_processing/cli.py b/imap_processing/cli.py index bf1501f50..f4ceee950 100644 --- a/imap_processing/cli.py +++ b/imap_processing/cli.py @@ -856,19 +856,33 @@ def do_processing( # noqa: PLR0912 elif self.data_level == "l1c": if "pset" in self.descriptor: # L1C PSET processing - science_paths = dependencies.get_file_paths( - source="hi", data_type="l1b" + l1b_de_paths = dependencies.get_file_paths( + source="hi", data_type="l1b", descriptor="de" ) - if len(science_paths) != 1: + if len(l1b_de_paths) != 1: raise ValueError( - f"Expected only one science dependency. Got {science_paths}" + f"Expected exactly one DE science dependency. " + f"Got {l1b_de_paths}" ) anc_paths = dependencies.get_file_paths(data_type="ancillary") if len(anc_paths) != 1: raise ValueError( - f"Expected only one ancillary dependency. Got {anc_paths}" + f"Expected exactly one ancillary dependency. Got {anc_paths}" + ) + # Load goodtimes dependency + goodtimes_paths = dependencies.get_file_paths( + source="hi", data_type="l1b", descriptor="goodtimes" + ) + if len(goodtimes_paths) != 1: + raise ValueError( + f"Expected exactly one goodtimes dependency. " + f"Got {goodtimes_paths}" ) - datasets = hi_l1c.hi_l1c(load_cdf(science_paths[0]), anc_paths[0]) + datasets = hi_l1c.hi_l1c( + load_cdf(l1b_de_paths[0]), + anc_paths[0], + load_cdf(goodtimes_paths[0]), + ) elif self.data_level == "l2": science_paths = dependencies.get_file_paths(source="hi", data_type="l1c") anc_dependencies = dependencies.get_processing_inputs(data_type="ancillary") diff --git a/imap_processing/hi/hi_l1c.py b/imap_processing/hi/hi_l1c.py index c8e57c1de..57c80f4b4 100644 --- a/imap_processing/hi/hi_l1c.py +++ b/imap_processing/hi/hi_l1c.py @@ -24,10 +24,11 @@ SpiceFrame, frame_transform, frame_transform_az_el, + get_spacecraft_to_instrument_spin_phase_offset, ) from imap_processing.spice.repoint import get_pointing_times from imap_processing.spice.spin import ( - get_instrument_spin_phase, + get_spacecraft_spin_phase, get_spin_data, ) from imap_processing.spice.time import met_to_ttj2000ns, ttj2000ns_to_et @@ -40,22 +41,21 @@ def hi_l1c( - de_dataset: xr.Dataset, calibration_prod_config_path: Path + de_dataset: xr.Dataset, + calibration_prod_config_path: Path, + goodtimes_ds: xr.Dataset, ) -> list[xr.Dataset]: """ High level IMAP-Hi l1c processing function. - This function will be expanded once the l1c processing is better defined. It - will need to add inputs such as Ephemerides, Goodtimes inputs, and - instrument status summary and will output a Pointing Set CDF as well as a - Goodtimes list (CDF?). - Parameters ---------- de_dataset : xarray.Dataset IMAP-Hi l1b de product. calibration_prod_config_path : pathlib.Path Calibration product configuration file. + goodtimes_ds : xarray.Dataset + Goodtimes dataset with cull_flags. Returns ------- @@ -64,15 +64,17 @@ def hi_l1c( """ logger.info("Running Hi l1c processing") - # TODO: I am not sure what the input for Goodtimes will be so for now, - # If the input is an xarray Dataset, do pset processing - l1c_dataset = generate_pset_dataset(de_dataset, calibration_prod_config_path) + l1c_dataset = generate_pset_dataset( + de_dataset, calibration_prod_config_path, goodtimes_ds + ) return [l1c_dataset] def generate_pset_dataset( - de_dataset: xr.Dataset, calibration_prod_config_path: Path + de_dataset: xr.Dataset, + calibration_prod_config_path: Path, + goodtimes_ds: xr.Dataset, ) -> xr.Dataset: """ Generate IMAP-Hi l1c pset xarray dataset from l1b product. @@ -83,6 +85,8 @@ def generate_pset_dataset( IMAP-Hi l1b de product. calibration_prod_config_path : pathlib.Path Calibration product configuration file. + goodtimes_ds : xarray.Dataset + Goodtimes dataset with cull_flags. Returns ------- @@ -110,9 +114,11 @@ def generate_pset_dataset( ) pset_dataset.update(pset_geometry(pset_midpoint_et, logical_source_parts["sensor"])) # Bin the counts into the spin-bins - pset_dataset.update(pset_counts(pset_dataset.coords, config_df, de_dataset)) + pset_dataset.update( + pset_counts(pset_dataset.coords, config_df, de_dataset, goodtimes_ds) + ) # Calculate and add the exposure time to the pset_dataset - pset_dataset.update(pset_exposure(pset_dataset.coords, de_dataset)) + pset_dataset.update(pset_exposure(pset_dataset.coords, de_dataset, goodtimes_ds)) # Get the backgrounds pset_dataset.update(pset_backgrounds(pset_dataset.coords)) @@ -322,6 +328,7 @@ def pset_counts( pset_coords: dict[str, xr.DataArray], config_df: pd.DataFrame, l1b_de_dataset: xr.Dataset, + goodtimes_ds: xr.Dataset, ) -> dict[str, xr.DataArray]: """ Bin direct events into PSET spin-bins. @@ -334,6 +341,8 @@ def pset_counts( The calibration product configuration dataframe. l1b_de_dataset : xarray.Dataset The L1B dataset for the pointing being processed. + goodtimes_ds : xarray.Dataset + Goodtimes dataset with cull_flags. Returns ------- @@ -362,11 +371,23 @@ def pset_counts( # Remove DEs with invalid trigger_id. This should only occur for a # pointing with no events that gets a single fill event good_mask = de_ds["trigger_id"].data != de_ds["trigger_id"].attrs["FILLVAL"] + if not np.any(good_mask): + return counts_var + elif not np.all(good_mask): + raise ValueError( + "An event with trigger_id=FILLVAL should only occur for a pointing " + "with no events that gets a single fill event. Events with mixed " + "valid and FILLVAL trigger_ids found." + ) + # Remove DEs not in Goodtimes/angles - good_mask &= good_time_and_phase_mask( - l1b_de_dataset.event_met.values, l1b_de_dataset.spin_phase.values + # For direct events, use nominal_bin (spacecraft spin bin 0-89) to look up goodtimes + goodtimes_mask = good_time_and_phase_mask( + de_ds.event_met.values, + de_ds.nominal_bin.values, + goodtimes_ds, ) - de_ds = de_ds.isel(event_met=good_mask) + de_ds = de_ds.isel(event_met=goodtimes_mask) # Get esa_energy_step for each event (recorded per packet, use ccsds_index) esa_energy_steps = l1b_de_dataset["esa_energy_step"].data[de_ds["ccsds_index"].data] @@ -435,7 +456,9 @@ def pset_backgrounds(pset_coords: dict[str, xr.DataArray]) -> dict[str, xr.DataA def pset_exposure( - pset_coords: dict[str, xr.DataArray], l1b_de_dataset: xr.Dataset + pset_coords: dict[str, xr.DataArray], + l1b_de_dataset: xr.Dataset, + goodtimes_ds: xr.Dataset, ) -> dict[str, xr.DataArray]: """ Calculate PSET exposure time. @@ -446,6 +469,8 @@ def pset_exposure( The PSET coordinates from the xarray.Dataset. l1b_de_dataset : xarray.Dataset The L1B dataset for the pointing being processed. + goodtimes_ds : xarray.Dataset + Goodtimes dataset with cull_flags. Returns ------- @@ -482,15 +507,26 @@ def pset_exposure( # Clock tick MET times are accumulation "edges". To get the mean spin-phase # for a given clock tick, add 1/2 clock tick and compute spin-phase. - spin_phases = np.atleast_1d( - get_instrument_spin_phase( - clock_tick_mets + HiConstants.HALF_CLOCK_TICK_S, - SpiceFrame[f"IMAP_HI_{sensor_number}"], - ) - ) + mid_tick_mets = clock_tick_mets + HiConstants.HALF_CLOCK_TICK_S + + # Compute spacecraft spin phase first (used for goodtimes filtering) + spacecraft_spin_phase = np.atleast_1d(get_spacecraft_spin_phase(mid_tick_mets)) + + # Convert spacecraft spin phase to nominal_bins (0-89) for goodtimes lookup + nominal_bins = (spacecraft_spin_phase * 90).astype(np.int32) + nominal_bins = np.clip(nominal_bins, 0, 89) + + # Compute instrument spin phase from spacecraft spin phase + # This implementation is identical to spin.get_instrument_spin_phase and + # is replicated here to avoid querying the spin dataframe again. + instrument_frame = SpiceFrame[f"IMAP_HI_{sensor_number}"] + phase_offset = get_spacecraft_to_instrument_spin_phase_offset(instrument_frame) + spin_phases = (spacecraft_spin_phase + phase_offset) % 1.0 # Remove ticks not in good times/angles - good_mask = good_time_and_phase_mask(clock_tick_mets, spin_phases) + good_mask = good_time_and_phase_mask( + clock_tick_mets, nominal_bins, goodtimes_ds + ) spin_phases = spin_phases[good_mask] clock_tick_weights = clock_tick_weights[good_mask] @@ -637,22 +673,40 @@ def get_de_clock_ticks_for_esa_step( def good_time_and_phase_mask( - tick_mets: np.ndarray, spin_phases: np.ndarray -) -> npt.NDArray: + mets: np.ndarray, + nominal_bins: np.ndarray, + goodtimes_ds: xr.Dataset, +) -> npt.NDArray[np.bool_]: """ - Filter out the clock tick times that are not in good times and angles. + Filter out times that are not in good times based on the goodtimes dataset. Parameters ---------- - tick_mets : np.ndarray - Clock-tick MET times. - spin_phases : np.ndarray - Spin phases for each clock tick. + mets : np.ndarray + MET times for each event or clock tick. + nominal_bins : np.ndarray + Spacecraft spin bins (0-89) for each event or clock tick. + goodtimes_ds : xarray.Dataset + Goodtimes dataset with cull_flags variable dimensioned (met, spin_bin). Returns ------- keep_mask : np.ndarray - Boolean mask indicating which clock ticks are in good times/phases. + Boolean mask indicating which events/ticks are in good times. """ - # TODO: Implement this once we have Goodtimes data product defined. - return np.full_like(tick_mets, True, dtype=bool) + gt_mets = goodtimes_ds["met"].values + cull_flags = goodtimes_ds["cull_flags"].values + + # Map each event/tick to the nearest goodtimes MET interval + # searchsorted with side='right' - 1 gives the largest MET <= query MET + met_indices = np.searchsorted(gt_mets, mets, side="right") - 1 + met_indices = np.clip(met_indices, 0, len(gt_mets) - 1) + + # Convert nominal_bins to int32 for indexing + spin_bins = nominal_bins.astype(np.int32) + + # Look up cull_flags for each event/tick + # Events are good if cull_flags == 0 + event_cull_flags = cull_flags[met_indices, spin_bins] + + return event_cull_flags == 0 diff --git a/imap_processing/tests/external_test_data_config.py b/imap_processing/tests/external_test_data_config.py index 6aa8238bc..839f67d92 100644 --- a/imap_processing/tests/external_test_data_config.py +++ b/imap_processing/tests/external_test_data_config.py @@ -109,6 +109,7 @@ ("imap_hi_l1b_90sensor-hk_20241105-repoint00099_v001.cdf", "hi/data/l1/"), ("imap_hi_l1a_90sensor-de_20241105-repoint00099_v001.cdf", "hi/data/l1/"), ("imap_hi_l1c_45sensor-pset_20250415_v999.cdf", "hi/data/l1/"), + ("imap_hi_l1b_45sensor-goodtimes_20250415_v999.cdf", "hi/data/l1/"), # I-ALiRT ("apid_478.bin", "ialirt/data/l0/"), diff --git a/imap_processing/tests/hi/test_hi_l1c.py b/imap_processing/tests/hi/test_hi_l1c.py index 6b86f4eb9..de7891a46 100644 --- a/imap_processing/tests/hi/test_hi_l1c.py +++ b/imap_processing/tests/hi/test_hi_l1c.py @@ -16,11 +16,27 @@ from imap_processing.spice.time import met_to_ttj2000ns, ttj2000ns_to_et +@pytest.fixture(scope="module") +def hi_l1b_de_dataset(hi_l1_test_data_path): + """Load the Hi L1B DE test dataset.""" + l1b_de_path = hi_l1_test_data_path / "imap_hi_l1b_45sensor-de_20250415_v999.cdf" + return load_cdf(l1b_de_path) + + +@pytest.fixture(scope="module") +def hi_goodtimes_dataset(hi_l1_test_data_path): + """Load the Hi goodtimes test dataset.""" + goodtimes_path = ( + hi_l1_test_data_path / "imap_hi_l1b_45sensor-goodtimes_20250415_v999.cdf" + ) + return load_cdf(goodtimes_path) + + @mock.patch("imap_processing.hi.hi_l1c.generate_pset_dataset") def test_hi_l1c(mock_generate_pset_dataset, hi_test_cal_prod_config_path): """Test coverage for hi_l1c function""" mock_generate_pset_dataset.return_value = xr.Dataset() - pset = hi_l1c.hi_l1c(xr.Dataset(), hi_test_cal_prod_config_path)[0] + pset = hi_l1c.hi_l1c(xr.Dataset(), hi_test_cal_prod_config_path, xr.Dataset())[0] # Empty attributes, global values get added in post-processing assert pset.attrs == {} @@ -28,7 +44,8 @@ def test_hi_l1c(mock_generate_pset_dataset, hi_test_cal_prod_config_path): @pytest.mark.external_kernel @pytest.mark.external_test_data def test_generate_pset_dataset( - hi_l1_test_data_path, + hi_l1b_de_dataset, + hi_goodtimes_dataset, hi_test_cal_prod_config_path, use_fake_spin_data_for_time, use_fake_repoint_data_for_time, @@ -36,8 +53,7 @@ def test_generate_pset_dataset( ): """Test coverage for generate_pset_dataset function""" use_fake_spin_data_for_time(482372987.999) - l1b_de_path = hi_l1_test_data_path / "imap_hi_l1b_45sensor-de_20250415_v999.cdf" - l1b_dataset = load_cdf(l1b_de_path) + l1b_dataset = hi_l1b_de_dataset l1b_met = l1b_dataset["ccsds_met"].values[0] # Set repoint start and end times. seconds_per_day = 24 * 60 * 60 @@ -45,8 +61,10 @@ def test_generate_pset_dataset( np.asarray([l1b_met - 15 * 60, l1b_met + seconds_per_day]), np.asarray([l1b_met, l1b_met + seconds_per_day + 1]), ) + goodtimes = hi_goodtimes_dataset + l1c_dataset = hi_l1c.generate_pset_dataset( - l1b_dataset, hi_test_cal_prod_config_path + l1b_dataset, hi_test_cal_prod_config_path, goodtimes ) assert l1c_dataset.epoch.data[0] == l1b_dataset.epoch.data[0].astype(np.int64) @@ -115,7 +133,9 @@ def test_generate_pset_dataset_uses_midpoint_time( mock_pset_backgrounds.return_value = {} # Call generate_pset_dataset - _ = hi_l1c.generate_pset_dataset(mock_l1b_dataset, hi_test_cal_prod_config_path) + _ = hi_l1c.generate_pset_dataset( + mock_l1b_dataset, hi_test_cal_prod_config_path, xr.Dataset() + ) # Calculate expected midpoint ET # The PSET dataset should have epoch and epoch_delta based on pointing times @@ -217,22 +237,23 @@ def test_pset_geometry(mock_frame_transform, mock_geom_frame_transform, sensor_s @mock.patch("imap_processing.hi.hi_l1c.get_pointing_times", return_value=(100, 200)) def test_pset_counts( mock_pointing_times, - hi_l1_test_data_path, + hi_l1b_de_dataset, + hi_goodtimes_dataset, hi_test_cal_prod_config_path, ): """Test coverage for pset_counts function.""" - l1b_de_path = hi_l1_test_data_path / "imap_hi_l1b_45sensor-de_20250415_v999.cdf" - l1b_dataset = load_cdf(l1b_de_path) cal_config_df = utils.CalibrationProductConfig.from_csv( hi_test_cal_prod_config_path ) empty_pset = hi_l1c.empty_pset_dataset( 100, - l1b_dataset.esa_energy_step, + hi_l1b_de_dataset.esa_energy_step, cal_config_df.cal_prod_config.calibration_product_numbers, HIAPID.H90_SCI_DE.sensor, ) - counts_var = hi_l1c.pset_counts(empty_pset.coords, cal_config_df, l1b_dataset) + counts_var = hi_l1c.pset_counts( + empty_pset.coords, cal_config_df, hi_l1b_de_dataset, hi_goodtimes_dataset + ) assert "counts" in counts_var @@ -240,14 +261,14 @@ def test_pset_counts( @mock.patch("imap_processing.hi.hi_l1c.get_pointing_times", return_value=(100, 200)) def test_pset_counts_empty_l1b( mock_pointing_times, - hi_l1_test_data_path, + hi_l1b_de_dataset, + hi_goodtimes_dataset, hi_test_cal_prod_config_path, ): """Test coverage for pset_counts function when the input L1b contains no counts.""" - l1b_de_path = hi_l1_test_data_path / "imap_hi_l1b_45sensor-de_20250415_v999.cdf" - l1b_dataset = load_cdf(l1b_de_path) + # Make a copy and modify it - # remove all but one event and set its trigger_id to zero - l1b_dataset = l1b_dataset.isel(event_met=[0]) + l1b_dataset = hi_l1b_de_dataset.isel(event_met=[0]).copy(deep=True) l1b_dataset["trigger_id"].data[0] = 0 cal_config_df = utils.CalibrationProductConfig.from_csv( hi_test_cal_prod_config_path @@ -258,7 +279,9 @@ def test_pset_counts_empty_l1b( cal_config_df.cal_prod_config.calibration_product_numbers, HIAPID.H90_SCI_DE.sensor, ) - counts_var = hi_l1c.pset_counts(empty_pset.coords, cal_config_df, l1b_dataset) + counts_var = hi_l1c.pset_counts( + empty_pset.coords, cal_config_df, l1b_dataset, hi_goodtimes_dataset + ) assert counts_var["counts"].data.sum() == 0 @@ -346,7 +369,7 @@ def test_empty_pset_dataset_arbitrary_cal_prod_numbers(use_fake_repoint_data_for @pytest.mark.external_test_data def test_pset_counts_arbitrary_cal_prod_numbers( - hi_l1_test_data_path, use_fake_repoint_data_for_time + hi_l1b_de_dataset, hi_goodtimes_dataset, use_fake_repoint_data_for_time ): """Test pset_counts with non-sequential calibration product numbers.""" # Create a test calibration product config with non-sequential numbers @@ -358,9 +381,6 @@ def test_pset_counts_arbitrary_cal_prod_numbers( 10,2,0.00085,BC1C2,0,1023,-1023,1023,-1023,1023,0,1023 """ - l1b_de_path = hi_l1_test_data_path / "imap_hi_l1b_45sensor-de_20250415_v999.cdf" - l1b_dataset = load_cdf(l1b_de_path) - cal_config_df = utils.CalibrationProductConfig.from_csv(io.StringIO(csv_content)) # Create PSET with non-sequential calibration product numbers @@ -371,7 +391,7 @@ def test_pset_counts_arbitrary_cal_prod_numbers( empty_pset = hi_l1c.empty_pset_dataset( l1b_met, - l1b_dataset.esa_energy_step, + hi_l1b_de_dataset.esa_energy_step, cal_config_df.cal_prod_config.calibration_product_numbers, HIAPID.H90_SCI_DE.sensor, ) @@ -383,7 +403,9 @@ def test_pset_counts_arbitrary_cal_prod_numbers( with mock.patch( "imap_processing.hi.hi_l1c.get_pointing_times", return_value=(100, 200) ): - counts_var = hi_l1c.pset_counts(empty_pset.coords, cal_config_df, l1b_dataset) + counts_var = hi_l1c.pset_counts( + empty_pset.coords, cal_config_df, hi_l1b_de_dataset, hi_goodtimes_dataset + ) # Verify counts array has correct shape based on coordinates assert "counts" in counts_var @@ -398,20 +420,106 @@ def test_pset_counts_arbitrary_cal_prod_numbers( assert counts_var["counts"].data.shape == expected_shape # Check that total number of expected counts is correct # ABC1C2 is coincidence type 15 - esa_1_2_mask = (l1b_dataset["esa_step"][l1b_dataset["ccsds_index"]] < 3).values - coincidence_15_mask = (l1b_dataset["coincidence_type"] == 15).values + esa_1_2_mask = ( + hi_l1b_de_dataset["esa_step"][hi_l1b_de_dataset["ccsds_index"]] < 3 + ).values + coincidence_15_mask = (hi_l1b_de_dataset["coincidence_type"] == 15).values np.testing.assert_equal( np.sum(counts_var["counts"].data[:, :, 0]), np.sum(coincidence_15_mask & esa_1_2_mask), ) # BC1C2 is coincidence type 7 - coincidence_7_mask = (l1b_dataset["coincidence_type"] == 7).values + coincidence_7_mask = (hi_l1b_de_dataset["coincidence_type"] == 7).values np.testing.assert_equal( np.sum(counts_var["counts"].data[:, :, 1]), np.sum(coincidence_7_mask & esa_1_2_mask), ) +@mock.patch("imap_processing.hi.hi_l1c.get_pointing_times", return_value=(100, 200)) +@mock.patch("imap_processing.hi.hi_l1c.iter_qualified_events_by_config") +def test_pset_counts_goodtimes_filtering( + mock_iter_qualified, + mock_pointing_times, +): + """Test that pset_counts properly filters events based on goodtimes.""" + # Create 10 events: METs 100-109, nominal_bins 0-9, all at spin_phase=0.5 + # (spin_phase 0.5 -> spin_angle_bin 1800) + n_events = 10 + event_mets = np.arange(100.0, 100.0 + n_events) + nominal_bins = np.arange(n_events, dtype=np.uint8) + + l1b_dataset = xr.Dataset( + coords={ + "epoch": xr.DataArray(np.arange(2), dims=["epoch"]), + "event_met": xr.DataArray(event_mets, dims=["event_met"]), + }, + data_vars={ + "trigger_id": xr.DataArray( + np.ones(n_events, dtype=np.uint16), + dims=["event_met"], + attrs={"FILLVAL": 65535}, + ), + "nominal_bin": xr.DataArray(nominal_bins, dims=["event_met"]), + "spin_phase": xr.DataArray(np.full(n_events, 0.5), dims=["event_met"]), + "ccsds_index": xr.DataArray( + np.zeros(n_events, dtype=np.int32), dims=["event_met"] + ), + "esa_energy_step": xr.DataArray( + np.array([1, 1], dtype=np.uint8), + dims=["epoch"], + attrs={"FILLVAL": 255}, + ), + }, + attrs={"Logical_source": "imap_hi_l1b_90sensor-de"}, + ) + + # Goodtimes: METs 100-104 good, METs 105-109 bad + goodtimes_ds = xr.Dataset( + { + "cull_flags": xr.DataArray( + np.zeros((2, 90), dtype=np.uint8), + dims=["met", "spin_bin"], + ), + }, + coords={"met": [100.0, 105.0], "spin_bin": np.arange(90)}, + ) + goodtimes_ds["cull_flags"].values[1, :] = 1 # All bins bad for MET >= 105 + + # Create empty pset with single ESA step and single calibration product + empty_pset = hi_l1c.empty_pset_dataset( + 100, + l1b_dataset.esa_energy_step, + np.array([0]), + HIAPID.H90_SCI_DE.sensor, + ) + + # Mock iter_qualified_events_by_config to mark all events as qualified + # and return a single (esa_energy, config_row, mask) tuple + mock_config_row = MagicMock() + mock_config_row.Index = (0, 1) # (calibration_prod, esa_energy_step) + + def mock_iter(de_ds, config_df, esa_energy_steps): + n_remaining = len(de_ds["event_met"]) + yield 1, mock_config_row, np.ones(n_remaining, dtype=bool) + + mock_iter_qualified.side_effect = mock_iter + + # Use MagicMock for cal_config since it's not used with our mock + mock_cal_config = MagicMock() + + counts_var = hi_l1c.pset_counts( + empty_pset.coords, mock_cal_config, l1b_dataset, goodtimes_ds + ) + + # Only 5 events (METs 100-104) should pass goodtimes filtering + # All 5 events have spin_phase=0.5 -> spin_angle_bin 1800 + total_counts = counts_var["counts"].data.sum() + assert total_counts == 5, f"Expected 5 counts, got {total_counts}" + # Verify all counts are in the expected spin bin (1800) + assert counts_var["counts"].data[0, 0, 0, 1800] == 5 + + def test_pset_backgrounds(): """Test coverage for pset_backgrounds function.""" # Create some fake coordinates to use @@ -438,17 +546,24 @@ def test_pset_backgrounds(): ) +@mock.patch("imap_processing.hi.hi_l1c.good_time_and_phase_mask") @mock.patch("imap_processing.hi.hi_l1c.get_pointing_times", return_value=(100, 200)) @mock.patch("imap_processing.hi.hi_l1c.get_spin_data", return_value=None) -@mock.patch("imap_processing.hi.hi_l1c.get_instrument_spin_phase") +@mock.patch( + "imap_processing.hi.hi_l1c.get_spacecraft_to_instrument_spin_phase_offset", + return_value=0.0, +) +@mock.patch("imap_processing.hi.hi_l1c.get_spacecraft_spin_phase") @mock.patch("imap_processing.hi.hi_l1c.get_de_clock_ticks_for_esa_step") @mock.patch("imap_processing.hi.hi_l1c.find_last_de_packet_data") def test_pset_exposure( mock_find_last_de_packet_data, mock_de_clock_ticks, - mock_spin_phase, + mock_sc_spin_phase, + mock_phase_offset, mock_spin_data, mock_pointing_times, + mock_good_time_and_phase_mask, ): """Test coverage for pset_exposure function""" l1b_energy_steps = xr.DataArray( @@ -472,7 +587,7 @@ def test_pset_exposure( # deterministic histogram values. # ESA step 1 should have repeating values of 3, 1. # ESA step 2 should have repeating values of 6, 2 - mock_spin_phase.return_value = np.concat( + mock_sc_spin_phase.return_value = np.concat( [hi_l1c.SPIN_PHASE_BIN_CENTERS, hi_l1c.SPIN_PHASE_BIN_CENTERS[::2]] ) mock_de_clock_ticks.return_value = ( @@ -485,8 +600,13 @@ def test_pset_exposure( l1b_dataset = MagicMock() l1b_dataset.attrs = {"Logical_source": "90sensor"} + # Mock goodtime to return all true + mock_good_time_and_phase_mask.side_effect = lambda x, y, z: np.ones( + x.shape, dtype=bool + ) + # All the setup is done, call the pset_exposure function - exposure_dict = hi_l1c.pset_exposure(empty_pset.coords, l1b_dataset) + exposure_dict = hi_l1c.pset_exposure(empty_pset.coords, l1b_dataset, xr.Dataset()) # Based on the spin phase and clock_tick mocks, the expected clock ticks are: # - Repeated values of 3, 1 for the first half of the spin bins @@ -506,6 +626,79 @@ def test_pset_exposure( ) +@mock.patch("imap_processing.hi.hi_l1c.get_pointing_times", return_value=(100, 200)) +@mock.patch("imap_processing.hi.hi_l1c.get_spin_data", return_value=None) +@mock.patch( + "imap_processing.hi.hi_l1c.get_spacecraft_to_instrument_spin_phase_offset", + return_value=0.0, +) +@mock.patch("imap_processing.hi.hi_l1c.get_spacecraft_spin_phase") +@mock.patch("imap_processing.hi.hi_l1c.get_de_clock_ticks_for_esa_step") +@mock.patch("imap_processing.hi.hi_l1c.find_last_de_packet_data") +def test_pset_exposure_goodtimes_filtering( + mock_find_last_de_packet_data, + mock_de_clock_ticks, + mock_sc_spin_phase, + mock_phase_offset, + mock_spin_data, + mock_pointing_times, +): + """Test that pset_exposure properly filters clock ticks based on goodtimes.""" + l1b_energy_steps = xr.DataArray( + np.arange(1) + 1, # Single ESA step for simplicity + attrs={"FILLVAL": 255}, + ) + empty_pset = hi_l1c.empty_pset_dataset( + 100, l1b_energy_steps, np.array([0]), HIAPID.H90_SCI_DE.sensor + ) + + # Mock find_last_de_packet_data to return a single ESA step + mock_find_last_de_packet_data.return_value = xr.Dataset( + coords={"epoch": xr.DataArray(np.arange(1), dims=["epoch"])}, + data_vars={ + "ccsds_met": xr.DataArray(np.array([150.0]), dims=["epoch"]), + "esa_energy_step": xr.DataArray(np.array([1]), dims=["epoch"]), + }, + ) + + # Create 10 clock ticks at METs 100-109 with uniform spin phases + n_ticks = 10 + clock_tick_mets = np.arange(100.0, 100.0 + n_ticks) + mock_de_clock_ticks.return_value = (clock_tick_mets, np.ones(n_ticks)) + + # Mock spacecraft spin phase - each tick maps to a different spin bin + # Spin phases 0.0, 0.1, 0.2, ... -> nominal_bins 0, 9, 18, ... + spin_phases = np.arange(n_ticks) / n_ticks + mock_sc_spin_phase.return_value = spin_phases + + # Create a goodtimes dataset that marks half the clock ticks as bad + # METs 100-104 are good (cull_flags=0), METs 105-109 are bad (cull_flags=1) + goodtimes_ds = xr.Dataset( + { + "cull_flags": xr.DataArray( + np.zeros((2, 90), dtype=np.uint8), + dims=["met", "spin_bin"], + ), + }, + coords={"met": [100.0, 105.0], "spin_bin": np.arange(90)}, + ) + # Mark all spin bins as bad for METs >= 105 + goodtimes_ds["cull_flags"].values[1, :] = 1 + + # Mock l1b_dataset + l1b_dataset = MagicMock() + l1b_dataset.attrs = {"Logical_source": "90sensor"} + + # Call pset_exposure with the goodtimes dataset + exposure_dict = hi_l1c.pset_exposure(empty_pset.coords, l1b_dataset, goodtimes_ds) + + # Only the first 5 clock ticks (METs 100-104) should contribute + # Their spin phases are 0.0, 0.1, 0.2, 0.3, 0.4 -> spin_angle_bins 0, 360, 720, ... + total_exposure_ticks = exposure_dict["exposure_times"].data.sum() + expected_ticks = 5.0 * HiConstants.DE_CLOCK_TICK_S + np.testing.assert_allclose(total_exposure_ticks, expected_ticks, rtol=0.01) + + def test_find_second_de_packet_data(): """Test coverage for find_second_de_packet_data function""" # Create a test l1b_dataset @@ -608,3 +801,72 @@ def test_get_de_clock_ticks_for_esa_step_exceptions(fake_spin_df): ValueError, match="Error determining start/end time for exposure time" ): hi_l1c.get_de_clock_ticks_for_esa_step(bad_ccsds_met, fake_spin_df) + + +class TestGoodTimeAndPhaseMask: + """Tests for good_time_and_phase_mask function.""" + + def test_filters_bad_times_with_nominal_bins(self): + """Events in bad times are filtered out using nominal_bins.""" + # Create mock goodtimes with some bad times + gt_ds = xr.Dataset( + { + "cull_flags": xr.DataArray( + np.zeros((3, 90), dtype=np.uint8), + dims=["met", "spin_bin"], + ) + }, + coords={"met": [100.0, 200.0, 300.0], "spin_bin": np.arange(90)}, + ) + # Mark spin_bin 10 as bad at MET 200 + gt_ds["cull_flags"].values[1, 10] = 1 + + mets = np.array([150.0, 250.0, 250.0]) + nominal_bins = np.array([10, 10, 20]) + + mask = hi_l1c.good_time_and_phase_mask(mets, nominal_bins, gt_ds) + # Event at 150 maps to MET index 0, bin 10 → good (cull_flags[0,10]=0) + # Event at 250 maps to MET index 1, bin 10 → bad (cull_flags[1,10]=1) + # Event at 250 maps to MET index 1, bin 20 → good (cull_flags[1,20]=0) + expected = np.array([True, False, True]) + np.testing.assert_array_equal(mask, expected) + + def test_met_before_goodtimes_range(self): + """Events before goodtimes range are clipped to first interval.""" + gt_ds = xr.Dataset( + { + "cull_flags": xr.DataArray( + np.zeros((2, 90), dtype=np.uint8), + dims=["met", "spin_bin"], + ) + }, + coords={"met": [100.0, 200.0], "spin_bin": np.arange(90)}, + ) + # Mark spin_bin 0 as bad at first MET + gt_ds["cull_flags"].values[0, 0] = 1 + + # Event at MET 50 (before goodtimes range) should use index 0 + mets = np.array([50.0]) + nominal_bins = np.array([0]) + + mask = hi_l1c.good_time_and_phase_mask(mets, nominal_bins, gt_ds) + # Clipped to index 0, bin 0 is bad + assert not mask[0] + + def test_all_bins_bad_for_interval(self): + """When all bins are bad for an interval, all events are filtered.""" + gt_ds = xr.Dataset( + { + "cull_flags": xr.DataArray( + np.ones((1, 90), dtype=np.uint8), # All bad + dims=["met", "spin_bin"], + ) + }, + coords={"met": [100.0], "spin_bin": np.arange(90)}, + ) + + mets = np.array([100.0, 150.0, 200.0]) + nominal_bins = np.array([0, 45, 89]) + + mask = hi_l1c.good_time_and_phase_mask(mets, nominal_bins, gt_ds) + assert not np.any(mask) diff --git a/imap_processing/tests/test_cli.py b/imap_processing/tests/test_cli.py index 0bbbe1ecd..8a3963cbf 100644 --- a/imap_processing/tests/test_cli.py +++ b/imap_processing/tests/test_cli.py @@ -300,7 +300,10 @@ def test_post_processing_returns_empty_list_if_invoked_with_no_data( "l1c", "45sensor-pset", "hi_l1c", - ["imap_hi_l1b_45sensor-de_20250415_v001.cdf"], + [ + "imap_hi_l1b_45sensor-de_20250415_v001.cdf", + "imap_hi_l1b_45sensor-goodtimes_20250415_v001.cdf", + ], ["imap_hi_calibration-prod-config_20240101_v001.csv"], 1, ),