diff --git a/.github/workflows/core_tests.yml b/.github/workflows/core_tests.yml index 3e4fd52f9..edcc895c7 100644 --- a/.github/workflows/core_tests.yml +++ b/.github/workflows/core_tests.yml @@ -130,6 +130,7 @@ jobs: - run: uv run pytest test/test_skim_name_conflicts.py - run: uv run pytest test/random_seed/test_random_seed.py + - run: uv run pytest test/skip_failed_choices/test_skip_failed_choices.py - run: uv run pytest test/trace_id/test_trace_id.py builtin_regional_models: diff --git a/activitysim/abm/models/location_choice.py b/activitysim/abm/models/location_choice.py index 7f032a8ae..2d629a404 100644 --- a/activitysim/abm/models/location_choice.py +++ b/activitysim/abm/models/location_choice.py @@ -234,6 +234,11 @@ def location_sample( ): # FIXME - MEMORY HACK - only include columns actually used in spec chooser_columns = model_settings.SIMULATE_CHOOSER_COLUMNS + # Drop this when PR #1017 is merged + if ("household_id" not in chooser_columns) and ( + "household_id" in persons_merged.columns + ): + chooser_columns = chooser_columns + ["household_id"] choosers = persons_merged[chooser_columns] # create wrapper with keys for this lookup - in this case there is a home_zone_id in the choosers @@ -390,6 +395,11 @@ def location_presample( # FIXME maybe we should add it for multi-zone (from maz_taz) if missing? 
chooser_columns = model_settings.SIMULATE_CHOOSER_COLUMNS chooser_columns = [HOME_TAZ if c == HOME_MAZ else c for c in chooser_columns] + # Drop this when PR #1017 is merged + if ("household_id" not in chooser_columns) and ( + "household_id" in persons_merged.columns + ): + chooser_columns = chooser_columns + ["household_id"] choosers = persons_merged[chooser_columns] # create wrapper with keys for this lookup - in this case there is a HOME_TAZ in the choosers @@ -620,6 +630,11 @@ def run_location_simulate( # FIXME - MEMORY HACK - only include columns actually used in spec chooser_columns = model_settings.SIMULATE_CHOOSER_COLUMNS + # Drop this when PR #1017 is merged + if ("household_id" not in chooser_columns) and ( + "household_id" in persons_merged.columns + ): + chooser_columns = chooser_columns + ["household_id"] choosers = persons_merged[chooser_columns] alt_dest_col_name = model_settings.ALT_DEST_COL_NAME @@ -1072,6 +1087,33 @@ def iterate_location_choice( else: choices_df = choices_df_ + if ( + state.settings.skip_failed_choices + and state.get("num_skipped_households", 0) > 0 + ): + # drop choices that belong to the failed households: state.skipped_household_ids + # so that their choices are not considered in shadow price calculations + # first append household_id to choices_df + choices_df = choices_df.merge( + persons_merged_df[["household_id"]], + left_index=True, + right_index=True, + how="left", + ) + if len(choices_df) > 0: + # Get all household IDs from all trace_labels in the dictionary + import itertools + + skipped_household_ids_dict = state.get("skipped_household_ids", dict()) + all_skipped_hh_ids = set( + itertools.chain.from_iterable(skipped_household_ids_dict.values()) + ) + + choices_df = choices_df[ + ~choices_df["household_id"].isin(all_skipped_hh_ids) + ] + choices_df = choices_df.drop(columns=["household_id"]) + spc.set_choices( choices=choices_df["choice"], segment_ids=persons_merged_df[chooser_segment_column].reindex( diff --git 
a/activitysim/abm/models/school_escorting.py b/activitysim/abm/models/school_escorting.py index 491d0ac0c..d24d52a41 100644 --- a/activitysim/abm/models/school_escorting.py +++ b/activitysim/abm/models/school_escorting.py @@ -468,6 +468,11 @@ def school_escorting( # reduce memory by limiting columns if selected columns are supplied chooser_columns = model_settings.SIMULATE_CHOOSER_COLUMNS if chooser_columns is not None: + # Drop this when PR #1017 is merged + if ("household_id" not in chooser_columns) and ( + "household_id" in choosers.columns + ): + chooser_columns = chooser_columns + ["household_id"] chooser_columns = chooser_columns + participant_columns choosers = choosers[chooser_columns] diff --git a/activitysim/abm/models/trip_matrices.py b/activitysim/abm/models/trip_matrices.py index 5552e3b00..5b5a8351a 100644 --- a/activitysim/abm/models/trip_matrices.py +++ b/activitysim/abm/models/trip_matrices.py @@ -90,6 +90,34 @@ def write_trip_matrices( trips_df = annotate_trips(state, trips, network_los, model_settings) + # This block adjusts household sample rate column to account for skipped households. + # Note: the `HH_EXPANSION_WEIGHT_COL` is pointing to the `sample_rate` column in the households table. + # Based on the calculation in write_matrices() function, the sample_rate is used to calculate the expansion weight as 1 / sample_rate. + # A sample_rate of 0.01 means the sample household should be expanded 1/0.01 = 100 times in the actual population households. + # In simulation, the `sample_rate` is calculated and added to the synthetic households + # based on household_sample_size / total_household_count, and therefore is the same for all households. + # In estimation, the `sample_rate` may vary by household, but weights are not used in estimation, and write_trip_matrices is not called during estimation. + # But we still try to cover both cases (when rates are the same vs when they vary) here for consistency. 
+ hh_weight_col = model_settings.HH_EXPANSION_WEIGHT_COL + if state.get("num_skipped_households", 0) > 0: + logger.info( + f"Adjusting household sample rate in {hh_weight_col} to account for {state.get('num_skipped_households', 0)} skipped households." + ) + # adjust the hh sample rates to account for skipped households + # first get the total expansion weight of the skipped households, which will be the sum of inverse of their sample rates + skipped_household_weights = ( + 1 / state.get_dataframe("households_skipped")[hh_weight_col] + ).sum() + # next get the total expansion weight of the remaining households + remaining_household_weights = ( + 1 / state.get_dataframe("households")[hh_weight_col] + ).sum() + # the adjustment factor is the remaining household weight / (remaining household weight + skipped household weight) + adjustment_factor = remaining_household_weights / ( + remaining_household_weights + skipped_household_weights + ) + trips_df[hh_weight_col] = trips_df[hh_weight_col] * adjustment_factor + if model_settings.SAVE_TRIPS_TABLE: state.add_table("trips", trips_df) diff --git a/activitysim/abm/models/trip_mode_choice.py b/activitysim/abm/models/trip_mode_choice.py index a942b7af8..3378b5421 100644 --- a/activitysim/abm/models/trip_mode_choice.py +++ b/activitysim/abm/models/trip_mode_choice.py @@ -366,7 +366,20 @@ def trip_mode_choice( "trip_mode_choice choices", trips_df[mode_column_name], value_counts=True ) - assert not trips_df[mode_column_name].isnull().any() + # if we're skipping failed choices, the trip modes for failed simulations will be null + if state.settings.skip_failed_choices: + # Get all household IDs from all trace_labels in the dictionary - more efficient flattening + import itertools + + skipped_household_ids_dict = state.get("skipped_household_ids", dict()) + all_skipped_hh_ids = set( + itertools.chain.from_iterable(skipped_household_ids_dict.values()) + ) + + mask_skipped = trips_df["household_id"].isin(all_skipped_hh_ids) + 
assert not trips_df.loc[~mask_skipped, mode_column_name].isnull().any() + else: + assert not trips_df[mode_column_name].isnull().any() state.add_table("trips", trips_df) @@ -382,6 +395,8 @@ def trip_mode_choice( # need to update locals_dict to access skims that are the same .shape as trips table locals_dict = {} locals_dict.update(constants) + if state.settings.skip_failed_choices: + trips_merged = trips_merged.loc[~mask_skipped] simulate.set_skim_wrapper_targets(trips_merged, skims) locals_dict.update(skims) locals_dict["timeframe"] = "trip" diff --git a/activitysim/abm/models/util/school_escort_tours_trips.py b/activitysim/abm/models/util/school_escort_tours_trips.py index 665844023..1bfdd22aa 100644 --- a/activitysim/abm/models/util/school_escort_tours_trips.py +++ b/activitysim/abm/models/util/school_escort_tours_trips.py @@ -1043,6 +1043,41 @@ def force_escortee_trip_modes_to_match_chauffeur(state: workflow.State, trips): f"Changed {diff.sum()} trip modes of school escortees to match their chauffeur" ) + # trip_mode can be na if the run allows skipping failed choices and the trip mode choice has failed + # in that case we can't assert that all trip modes are filled + # instead, we throw a warning about how many are missing, and return early + if state.settings.skip_failed_choices: + missing_count = trips.trip_mode.isna().sum() + if missing_count > 0: + # check if the missing trip modes are all because of simulation failures + # i.e., they are from households that are in the skipped_household_ids set + import itertools + + skipped_household_ids_dict = state.get("skipped_household_ids", dict()) + skipped_household_ids = set( + itertools.chain.from_iterable(skipped_household_ids_dict.values()) + ) + missing_household_ids = set( + trips[trips.trip_mode.isna()]["household_id"].unique() + ) + # log a warning about the missing trip modes for skipped households + missing_count_due_to_sim_fail = len( + trips[ + trips.trip_mode.isna() + & 
trips.household_id.isin(skipped_household_ids) + ] + ) + logger.warning( + f"Missing trip mode for {missing_count_due_to_sim_fail} trips due to simulation failures in trip mode choice, " + f"these records and their corresponding households are being skipped: {missing_household_ids}" + ) + # throw assertion error if there are missing trip modes for households that were not skipped + assert missing_household_ids.issubset(skipped_household_ids), ( + f"Missing trip modes for households that were not skipped: {missing_household_ids - skipped_household_ids}. " + f"Missing trip modes for: {trips[trips.trip_mode.isna() & ~trips.household_id.isin(skipped_household_ids)]}" + ) + return trips + assert ( ~trips.trip_mode.isna() ).all(), f"Missing trip mode for {trips[trips.trip_mode.isna()]}" diff --git a/activitysim/abm/models/util/tour_destination.py b/activitysim/abm/models/util/tour_destination.py index d99803bd7..2cac47ee1 100644 --- a/activitysim/abm/models/util/tour_destination.py +++ b/activitysim/abm/models/util/tour_destination.py @@ -625,6 +625,11 @@ def run_destination_sample( # if special person id is passed chooser_id_column = model_settings.CHOOSER_ID_COLUMN + # Drop this when PR #1017 is merged + if ("household_id" not in chooser_columns) and ( + "household_id" in persons_merged.columns + ): + chooser_columns = chooser_columns + ["household_id"] persons_merged = persons_merged[ [c for c in persons_merged.columns if c in chooser_columns] ] @@ -799,6 +804,11 @@ def run_destination_simulate( # if special person id is passed chooser_id_column = model_settings.CHOOSER_ID_COLUMN + # Drop this when PR #1017 is merged + if ("household_id" not in chooser_columns) and ( + "household_id" in persons_merged.columns + ): + chooser_columns = chooser_columns + ["household_id"] persons_merged = persons_merged[ [c for c in persons_merged.columns if c in chooser_columns] ] diff --git a/activitysim/abm/models/util/tour_od.py b/activitysim/abm/models/util/tour_od.py index 
5ec9dd493..c2548cbd6 100644 --- a/activitysim/abm/models/util/tour_od.py +++ b/activitysim/abm/models/util/tour_od.py @@ -692,6 +692,9 @@ def run_od_sample( choosers = tours # FIXME - MEMORY HACK - only include columns actually used in spec chooser_columns = model_settings.SIMULATE_CHOOSER_COLUMNS + # Drop this when PR #1017 is merged + if ("household_id" not in chooser_columns) and ("household_id" in choosers.columns): + chooser_columns = chooser_columns + ["household_id"] choosers = choosers[chooser_columns] # interaction_sample requires that choosers.index.is_monotonic_increasing @@ -951,6 +954,9 @@ def run_od_simulate( # FIXME - MEMORY HACK - only include columns actually used in spec chooser_columns = model_settings.SIMULATE_CHOOSER_COLUMNS + # Drop this when PR #1017 is merged + if ("household_id" not in chooser_columns) and ("household_id" in choosers.columns): + chooser_columns = chooser_columns + ["household_id"] choosers = choosers[chooser_columns] # interaction_sample requires that choosers.index.is_monotonic_increasing diff --git a/activitysim/abm/models/util/tour_scheduling.py b/activitysim/abm/models/util/tour_scheduling.py index 80474db59..e591c29db 100644 --- a/activitysim/abm/models/util/tour_scheduling.py +++ b/activitysim/abm/models/util/tour_scheduling.py @@ -40,6 +40,12 @@ def run_tour_scheduling( c for c in model_columns if c not in logsum_columns ] + # Drop this when PR #1017 is merged + if ("household_id" not in chooser_columns) and ( + "household_id" in persons_merged.columns + ): + chooser_columns = chooser_columns + ["household_id"] + persons_merged = expressions.filter_chooser_columns(persons_merged, chooser_columns) timetable = state.get_injectable("timetable") diff --git a/activitysim/abm/tables/households.py b/activitysim/abm/tables/households.py index c0c33dcbc..680cdb6c7 100644 --- a/activitysim/abm/tables/households.py +++ b/activitysim/abm/tables/households.py @@ -110,6 +110,12 @@ def households(state: workflow.State) -> 
pd.DataFrame: # replace table function with dataframe state.add_table("households", df) + if state.settings.skip_failed_choices: + logger.info( + "Note: 'skip_failed_choices' is enabled; households may be skipped when simulation fails." + ) + # initialize skipped households table as empty and same columns as households + state.add_table("households_skipped", df.iloc[0:0]) state.get_rn_generator().add_channel("households", df) diff --git a/activitysim/cli/run.py b/activitysim/cli/run.py index a1b23048c..ad91c4f16 100644 --- a/activitysim/cli/run.py +++ b/activitysim/cli/run.py @@ -476,6 +476,24 @@ def run(args): if memory_sidecar_process: memory_sidecar_process.stop() + # print out a summary of households skipped due to failed choices + # we want to see number of unique households skipped by trace_label + if state.settings.skip_failed_choices: + skipped_household_ids_dict = state.get("skipped_household_ids", dict()) + for trace_label, hh_id_set in skipped_household_ids_dict.items(): + logger.warning( + f"Number of unique households skipped for trace_label '{trace_label}': {len(hh_id_set)}. They are: {sorted(hh_id_set)}" + ) + # also log the total number of unique households skipped across all trace_labels + import itertools + + all_skipped_hh_ids = set( + itertools.chain.from_iterable(skipped_household_ids_dict.values()) + ) + logger.warning( + f"Total number of unique households skipped across all trace_labels: {len(all_skipped_hh_ids)}." 
+ ) + if state.settings.expression_profile: # generate a summary of slower expression evaluation times # across all models and write to a file diff --git a/activitysim/core/configuration/top.py b/activitysim/core/configuration/top.py index a6c29269a..ffb57db17 100644 --- a/activitysim/core/configuration/top.py +++ b/activitysim/core/configuration/top.py @@ -778,11 +778,27 @@ def _check_store_skims_in_shm(self): check_model_settings: bool = True """ - run checks to validate that YAML settings files are loadable and spec and coefficent csv can be resolved. + run checks to validate that YAML settings files are loadable and spec and coefficient csv can be resolved. should catch many common errors early, including missing required configurations or specified coefficient labels without defined values. """ + skip_failed_choices: bool = True + """ + Skip households that cause errors during processing instead of failing the model run. + + .. versionadded:: 1.6 + """ + + fraction_of_failed_choices_allowed: float = 0.1 + """ + Threshold for the fraction of households that can be skipped before failing the model run, + used in conjunction with `skip_failed_choices`. + We want to skip problems when they are rare, but fail the run if they are common. + + .. 
versionadded:: 1.6 + """ + other_settings: dict[str, Any] = None def _get_attr(self, attr): diff --git a/activitysim/core/interaction_sample_simulate.py b/activitysim/core/interaction_sample_simulate.py index df1c53fa0..e34974840 100644 --- a/activitysim/core/interaction_sample_simulate.py +++ b/activitysim/core/interaction_sample_simulate.py @@ -351,6 +351,11 @@ def _interaction_sample_simulate( # that is, we want the index value of the row that is offset by rows into the # tranche of this choosers alternatives created by cross join of alternatives and choosers + # when skip failed choices is enabled, the position may be -99 for failed choices, which gets dropped eventually + # here we just need to clip to zero to avoid getting the wrong index in the take() below + if state.settings.skip_failed_choices: + positions = positions.clip(lower=0) + # resulting pandas Int64Index has one element per chooser row and is in same order as choosers choices = alternatives[choice_column].take(positions + first_row_offsets) diff --git a/activitysim/core/logit.py b/activitysim/core/logit.py index 105e18fec..8a0097153 100644 --- a/activitysim/core/logit.py +++ b/activitysim/core/logit.py @@ -30,6 +30,7 @@ def report_bad_choices( state: workflow.State, bad_row_map, df, + skip_failed_choices, trace_label, msg, trace_choosers=None, @@ -87,6 +88,45 @@ def report_bad_choices( logger.warning(row_msg) + if skip_failed_choices: + # update counter in state + num_skipped_households = state.get("num_skipped_households", 0) + skipped_household_ids = state.get("skipped_household_ids", dict()) + + # Get current household IDs and filter out None values + current_hh_ids = set(df[trace_col].dropna().unique()) + + # Get all previously skipped household IDs across all trace_labels + import itertools + + already_skipped = set( + itertools.chain.from_iterable(skipped_household_ids.values()) + ) + + # Find truly new household IDs that haven't been skipped before + new_skipped_hh_ids = current_hh_ids - 
already_skipped + + # Only process if there are new households to skip + if new_skipped_hh_ids: + # Initialize list for this trace_label if it doesn't exist + if trace_label not in skipped_household_ids: + skipped_household_ids[trace_label] = [] + skipped_household_ids[trace_label].extend(new_skipped_hh_ids) + num_skipped_households += len(new_skipped_hh_ids) + + # make sure the number of skipped households is consistent with the ids recorded + assert num_skipped_households == sum( + len(hh_list) for hh_list in skipped_household_ids.values() + ), "Inconsistent number of skipped households and recorded ids" + state.set("num_skipped_households", num_skipped_households) + state.set("skipped_household_ids", skipped_household_ids) + + logger.warning( + f"Skipping {bad_row_map.sum()} bad choices. Total skipped households so far: {num_skipped_households}. Skipped household IDs: {skipped_household_ids}" + ) + + return + if raise_error: raise InvalidTravelError(msg_with_count) @@ -136,6 +176,7 @@ def utils_to_probs( allow_zero_probs=False, trace_choosers=None, overflow_protection: bool = True, + skip_failed_choices: bool = True, return_logsums: bool = False, ): """ @@ -176,6 +217,16 @@ def utils_to_probs( overflow_protection will have no benefit but impose a modest computational overhead cost. + skip_failed_choices : bool, default True + If True, when bad choices are detected (all zero probabilities or infinite + probabilities), the entire household that's causing bad choices will be skipped instead of + being masked by overflow protection or causing an error. + A counter will be incremented for each skipped household. This is useful when running large + simulations where occasional bad choices are encountered and should not halt the process. + The counter can be accessed via `state.get("num_skipped_households", 0)`. + The number of skipped households and their IDs will be logged at the end of the simulation. 
+ When `skip_failed_choices` is True, `overflow_protection` will be forced to False to avoid conflicts. + Returns ------- probs : pandas.DataFrame @@ -203,6 +254,13 @@ def utils_to_probs( utils_arr.dtype == np.float32 and utils_arr.max() > 85 ) + # get skip_failed_choices from state + skip_failed_choices = state.settings.skip_failed_choices + # when skipping failed choices, we cannot use overflow protection + # because it would mask the underlying issue causing bad choices + if skip_failed_choices: + overflow_protection = False + if overflow_protection: # exponentiated utils will overflow, downshift them shifts = utils_arr.max(1, keepdims=True) @@ -240,6 +298,7 @@ def utils_to_probs( state, zero_probs, utils, + skip_failed_choices, trace_label=tracing.extend_trace_label(trace_label, "zero_prob_utils"), msg="all probabilities are zero", trace_choosers=trace_choosers, @@ -251,11 +310,25 @@ def utils_to_probs( state, inf_utils, utils, + skip_failed_choices, trace_label=tracing.extend_trace_label(trace_label, "inf_exp_utils"), msg="infinite exponentiated utilities", trace_choosers=trace_choosers, ) + # check if any values are nan + nan_utils = np.isnan(arr_sum) + if nan_utils.any(): + report_bad_choices( + state, + nan_utils, + utils, + skip_failed_choices, + trace_label=tracing.extend_trace_label(trace_label, "nan_exp_utils"), + msg="nan exponentiated utilities", + trace_choosers=trace_choosers, + ) + # if allow_zero_probs, this may cause a RuntimeWarning: invalid value encountered in divide with np.errstate( invalid="ignore" if allow_zero_probs else "warn", @@ -316,11 +389,14 @@ def make_choices( np.ones(len(probs.index)) ).abs() > BAD_PROB_THRESHOLD * np.ones(len(probs.index)) + skip_failed_choices = state.settings.skip_failed_choices + if bad_probs.any() and not allow_bad_probs: report_bad_choices( state, bad_probs, probs, + skip_failed_choices, trace_label=tracing.extend_trace_label(trace_label, "bad_probs"), msg="probabilities do not add up to 1", 
trace_choosers=trace_choosers, @@ -329,6 +405,8 @@ def make_choices( rands = state.get_rn_generator().random_for_df(probs) choices = pd.Series(choice_maker(probs.values, rands), index=probs.index) + # mark bad choices with -99 + choices[bad_probs] = -99 rands = pd.Series(np.asanyarray(rands).flatten(), index=probs.index) diff --git a/activitysim/core/simulate.py b/activitysim/core/simulate.py index 7ced099d9..68fdb0c43 100644 --- a/activitysim/core/simulate.py +++ b/activitysim/core/simulate.py @@ -1326,6 +1326,22 @@ def eval_mnl( ) chunk_sizer.log_df(trace_label, "probs", probs) + # resimulate one of the failed households for tracing + if state.settings.skip_failed_choices: + _resimulate_failed_choice_for_tracing( + state=state, + choosers=choosers, + spec=spec, + locals_d=locals_d, + log_alt_losers=log_alt_losers, + trace_label=trace_label, + have_trace_targets=have_trace_targets, + estimator=estimator, + trace_column_names=trace_column_names, + chunk_sizer=chunk_sizer, + compute_settings=compute_settings, + ) + del utilities chunk_sizer.log_df(trace_label, "utilities", None) @@ -1340,7 +1356,9 @@ def eval_mnl( if custom_chooser: choices, rands = custom_chooser(state, probs, choosers, spec, trace_label) else: - choices, rands = logit.make_choices(state, probs, trace_label=trace_label) + choices, rands = logit.make_choices( + state, probs, trace_label=trace_label, trace_choosers=choosers + ) del probs chunk_sizer.log_df(trace_label, "probs", None) @@ -1505,11 +1523,28 @@ def eval_nl( state, no_choices, base_probabilities, + state.settings.skip_failed_choices, trace_label=tracing.extend_trace_label(trace_label, "bad_probs"), trace_choosers=choosers, msg="base_probabilities do not sum to one", ) + if state.settings.skip_failed_choices: + _resimulate_failed_choice_for_tracing( + state=state, + choosers=choosers, + spec=spec_sh, + locals_d=locals_d, + log_alt_losers=log_alt_losers, + trace_label=trace_label, + have_trace_targets=have_trace_targets, + 
estimator=estimator, + trace_column_names=trace_column_names, + spec_sh=spec_sh, + chunk_sizer=chunk_sizer, + compute_settings=compute_settings, + ) + if custom_chooser: choices, rands = custom_chooser( state, @@ -2164,3 +2199,113 @@ def simple_simulate_logsums( assert len(logsums.index == len(choosers.index)) return logsums + + +def _resimulate_failed_choice_for_tracing( + state: workflow.State, + choosers, + spec, + locals_d, + log_alt_losers, + trace_label, + have_trace_targets, + estimator, + trace_column_names, + chunk_sizer, + compute_settings, + spec_sh=None, +): + """ + Helper function to resimulate one of the failed choices for tracing purposes. + + This function handles the logic for finding and retracing skipped households + when skip_failed_choices is enabled. + """ + skipped_household_ids_dict = state.get("skipped_household_ids", dict()) + + # check if there are any skipped households to process + if not skipped_household_ids_dict: + return + + # calculate current skipped household count efficiently + current_skipped_count = sum( + len(hh_list) for hh_list in skipped_household_ids_dict.values() + ) + # check if there are newly skipped households + last_updated_count = state.get("_last_updated_skipped_count", 0) + if current_skipped_count <= last_updated_count: + return + + # find the last household ID + last_household_id = None + for hh_list in reversed(list(skipped_household_ids_dict.values())): + if hh_list: # Check if list is not empty + last_household_id = hh_list[-1] + break + if last_household_id is None: + return + + # get failed choosers based on household_id location + failed_choosers = None + if "household_id" in choosers.columns: + failed_choosers = choosers.loc[choosers["household_id"] == last_household_id] + elif "household_id" in choosers.index.names: + failed_choosers = choosers.loc[ + choosers.index.get_level_values("household_id") == last_household_id + ] + else: + logger.warning( + f"{trace_label} - cannot resimulate skipped 
household_id {last_household_id} " + "because no household_id column or index level found in choosers" + ) + return + + # resimulate failed choosers if found + if failed_choosers is None or len(failed_choosers) == 0: + return + # set up tracing for this chooser + existing_trace_hh_id = state.settings.trace_hh_id + existing_traceable_table_indexes = state.tracing.traceable_table_indexes + # find corresponding traceable name based on choosers index + traceable_table_name = existing_traceable_table_indexes.get( + choosers.index.name, None + ) + if traceable_table_name is None: + return + + # temporarily set trace settings for this household + state.settings.trace_hh_id = last_household_id + if "households" not in state.tracing.traceable_table_ids: + state.tracing.traceable_table_ids["households"] = [] + state.tracing.traceable_table_ids["households"] = state.tracing.traceable_table_ids[ + "households" + ] + [last_household_id] + state.tracing.register_traceable_table(traceable_table_name, failed_choosers) + try: + # update the SkimWrapper and Skim3dWrapper objects in the locals_d based on index of failed_choosers + from activitysim.core.skim_dictionary import SkimWrapper, Skim3dWrapper + + for local_key, local_value in locals_d.items(): + if isinstance(local_value, SkimWrapper): + local_value.set_df(failed_choosers) + elif isinstance(local_value, Skim3dWrapper): + local_value.set_df(failed_choosers) + + # rerun eval_utilities for the failed chooser + _failed_utilities = eval_utilities( + state, + spec, + failed_choosers, + locals_d, + log_alt_losers=log_alt_losers, + trace_label=trace_label + "_resimulate", + have_trace_targets=True, + estimator=estimator, + trace_column_names=trace_column_names, + spec_sh=spec_sh, + chunk_sizer=chunk_sizer, + compute_settings=compute_settings, + ) + finally: + # restore original trace settings + state.settings.trace_hh_id = existing_trace_hh_id diff --git a/activitysim/core/test/test_logit.py b/activitysim/core/test/test_logit.py 
index e249475de..d0ee07cb2 100644 --- a/activitysim/core/test/test_logit.py +++ b/activitysim/core/test/test_logit.py @@ -78,6 +78,7 @@ def test_utils_to_probs(utilities, test_data): def test_utils_to_probs_raises(): state = workflow.State().default_settings() + state.settings.skip_failed_choices = False idx = pd.Index(name="household_id", data=[1]) with pytest.raises(RuntimeError) as excinfo: logit.utils_to_probs( diff --git a/activitysim/core/util.py b/activitysim/core/util.py index 20f79c760..58bc50be4 100644 --- a/activitysim/core/util.py +++ b/activitysim/core/util.py @@ -683,6 +683,9 @@ def drop_unused_columns( unique_variables_in_spec |= set(additional_columns or []) + # always keep household_id + unique_variables_in_spec.add("household_id") + if locals_d: unique_variables_in_spec.add(locals_d.get("orig_col_name", None)) unique_variables_in_spec.add(locals_d.get("dest_col_name", None)) diff --git a/activitysim/core/workflow/state.py b/activitysim/core/workflow/state.py index 9f7dcd4d6..fcdbe7149 100644 --- a/activitysim/core/workflow/state.py +++ b/activitysim/core/workflow/state.py @@ -938,8 +938,181 @@ def add_table( # mark this salient table as edited, so it can be checkpointed # at some later time if desired. 
self.existing_table_status[name] = True + + # Set the new table content self.set(name, content) + # Check if we need to update tables for skipped households + skipped_household_ids_dict = self.get("skipped_household_ids", dict()) + if ( + skipped_household_ids_dict + ): # only proceed if there are any skipped households + current_skipped_count = sum( + len(hh_list) for hh_list in skipped_household_ids_dict.values() + ) + # make sure this is consistent with number of skipped households tracked + assert current_skipped_count == self.get("num_skipped_households", 0) + + # Only update this specific table if it actually contains skipped household data + import itertools + + skipped_household_ids = set( + itertools.chain.from_iterable(skipped_household_ids_dict.values()) + ) + + # Check if the new content contains any skipped households + content_needs_cleaning = False + if hasattr(content, "index") and "household_id" in content.index.names: + content_needs_cleaning = ( + content.index.get_level_values("household_id") + .isin(skipped_household_ids) + .any() + ) + elif hasattr(content, "columns") and "household_id" in content.columns: + content_needs_cleaning = ( + content["household_id"].isin(skipped_household_ids).any() + ) + + if content_needs_cleaning: + self.update_table(name) + + # Check if there are newly skipped households and update all tables if needed + last_updated_count = self.get("_last_updated_skipped_count", 0) + if current_skipped_count > last_updated_count: + # update all tables to remove newly skipped households + self.update_table(name=None) + # Track the number of skipped households at the time of last update + self.set("_last_updated_skipped_count", current_skipped_count) + + def update_table(self, name: str = None): + """ + Go through existing tables in the state and + get rid of any rows that correspond to skipped households. 
+ Save skipped household records in state under households_skipped + Parameters + ---------- + name : str, optional + Name of table to update. If None, update all tables. + Returns + ------- + None + """ + import itertools + + skipped_household_ids_dict = self.get("skipped_household_ids", dict()) + skipped_household_ids = set( + itertools.chain.from_iterable(skipped_household_ids_dict.values()) + ) + + if not skipped_household_ids: + return + + # get existing tables in the current state context + existing_tables = self.registered_tables() if name is None else [name] + + for table_name in existing_tables: + if not self.is_table(table_name): + continue + df = self.get_dataframe(table_name, as_copy=False) + # get the initial length of the dataframe + initial_len = len(df) + # we do not drop rows from households_skipped table + if table_name == "households_skipped": + continue + + # determine which column/index contains household_id and create mask + mask = None + if "household_id" in df.index.names: + mask = df.index.get_level_values("household_id").isin( + skipped_household_ids + ) + elif "household_id" in df.columns: + mask = df["household_id"].isin(skipped_household_ids) + else: + continue # skip tables without household_id + + # early exit if no matches found + if not mask.any(): + continue + + # save skipped household records for households table only + if table_name == "households": + newly_skipped_hh_df = df.loc[mask].copy() + skipped_hh_df = self.get("households_skipped", pd.DataFrame()) + skipped_hh_df = pd.concat( + [skipped_hh_df, newly_skipped_hh_df], join="inner" + ) + # make sure household_id is unique in skipped households + if "household_id" in skipped_hh_df.index.names: + assert skipped_hh_df.index.get_level_values( + "household_id" + ).is_unique, "household_id is not unique in households_skipped" + self.set("households_skipped", skipped_hh_df) + # mark households_skipped table as salient and edited + self.existing_table_status["households_skipped"] = 
True + + # Check if we've exceeded the allowed fraction of skipped households + # Use weighted households if expansion weight column exists, otherwise use counts + if "sample_rate" in df.columns: + # Use weighted calculation + skipped_household_weights = (1 / skipped_hh_df["sample_rate"]).sum() + remaining_household_weights = ( + 1 / df.loc[~mask, "sample_rate"] + ).sum() + total_household_weights = ( + skipped_household_weights + remaining_household_weights + ) + + if total_household_weights > 0: + skipped_fraction = ( + skipped_household_weights / total_household_weights + ) + metric_name = "weighted households" + else: + # Fallback to count-based if weights are zero + skipped_fraction = len(skipped_hh_df) / ( + len(skipped_hh_df) + len(df.loc[~mask]) + ) + metric_name = "households" + else: + # Use count-based calculation + skipped_fraction = len(skipped_hh_df) / ( + len(skipped_hh_df) + len(df.loc[~mask]) + ) + metric_name = "households" + + max_allowed_fraction = self.settings.fraction_of_failed_choices_allowed + + if skipped_fraction > max_allowed_fraction: + raise RuntimeError( + f"Too many {metric_name} skipped: {skipped_fraction:.2%} exceeds the allowed threshold of " + f"{max_allowed_fraction:.2%}. This indicates a systematic problem with the model " + f"simulation. Adjust 'fraction_of_failed_choices_allowed' setting " + f"if this is expected." 
+ ) + + # drop the matching rows using the same mask + df.drop(index=df.index[mask], inplace=True) + + # get the length of the dataframe after dropping rows + final_len = len(df) + logger.debug( + f"update_table: dropped {initial_len - final_len} rows from {table_name} " + f"corresponding to skipped households" + ) + # mark this table as edited if we dropped any rows + if final_len < initial_len: + self.existing_table_status[table_name] = True + # terminate the run if we dropped all rows + # and raise an error + if final_len == 0: + raise RuntimeError( + f"update_table: all rows dropped from {table_name} " + f"corresponding to skipped households, terminating run" + ) + # set the updated dataframe back to the state + self.set(table_name, df) + def is_table(self, name: str): """ Check if a name corresponds to a table in this state's context. diff --git a/activitysim/core/workflow/tracing.py b/activitysim/core/workflow/tracing.py index b89a8bd26..4ef24f0b9 100644 --- a/activitysim/core/workflow/tracing.py +++ b/activitysim/core/workflow/tracing.py @@ -173,7 +173,8 @@ def register_traceable_table(self, table_name: str, df: pd.DataFrame) -> None: prior_traced_ids = traceable_table_ids.get(table_name, []) if new_traced_ids: - assert not set(prior_traced_ids) & set(new_traced_ids) + if not self._obj.settings.skip_failed_choices: + assert not set(prior_traced_ids) & set(new_traced_ids) traceable_table_ids[table_name] = prior_traced_ids + new_traced_ids self.traceable_table_ids = traceable_table_ids diff --git a/docs/users-guide/modelsetup.rst b/docs/users-guide/modelsetup.rst index a3293fdbd..1b3bcc831 100644 --- a/docs/users-guide/modelsetup.rst +++ b/docs/users-guide/modelsetup.rst @@ -66,6 +66,7 @@ ActivitySim has features that makes it possible to customize model runs or impro + Converting string variables to pandas categoricals. ActivitySim releases 1.3.0 and higher have this capability. 
+ Converting higher byte integer variables to lower byte integer variables (such as reducing ‘num tours’ from int64 to int8). ActivitySim releases 1.3.0 and higher have this capability as a switch and defaults to turning this feature off.
+ Converting higher byte float variables to lower bytes. ActivitySim releases 1.3.0 and higher have this capability as a switch and defaults to turning this feature off.
+* :ref:`Skip Failed Choices <skip_failed_choices_ways_to_run>` allows the user to skip over households that encounter errors during model execution. This feature helps to ensure that the model can continue running even in the presence of data or specification issues, while also providing visibility into any potential problems that need to be addressed.

Steps for enabling/disabling these options are included in the :ref:`Advanced Configuration` sub-section, under :ref:`Ways to Run the Model` page of this Users’ Guide.
diff --git a/docs/users-guide/ways_to_run.rst b/docs/users-guide/ways_to_run.rst
index 1b2122107..f04413415 100644
--- a/docs/users-guide/ways_to_run.rst
+++ b/docs/users-guide/ways_to_run.rst
@@ -283,3 +283,35 @@ With the set of output CSV files, the user can trace ActivitySim calculations in
help debug data and/or logic errors.

Refer to :ref:`trace` for more details on configuring tracing and the various output files.
+
+.. _skip_failed_choices_ways_to_run:
+
+Skip Failed Choices
+______________________
+
+By default, ActivitySim will skip any failed choices during model execution, i.e., ``skip_failed_choices`` is set to ``True``.
+A failed choice occurs when the computed utilities for all alternatives are zero, or infinite, or nan, which can happen due to
+data issues or model specification problems. A warning message is logged when a failed choice is encountered,
+and the corresponding household (along with its persons, vehicles, tours, trips, etc) will be excluded from further model steps. 
+At the end of the model run, a summary of all skipped households is provided in the log file for user reference. This feature
+helps to ensure that the model can continue running even in the presence of data or specification issues,
+while also providing visibility into any potential problems that need to be addressed.
+
+Users can optionally set a ``fraction_of_failed_choices_allowed`` parameter in the settings file to specify a threshold for the
+maximum allowable fraction of failed households; this value is expected to be between 0 and 1.
+If the fraction of failed households exceeds this threshold, ActivitySim will raise a RuntimeError and terminate the model run.
+If the fraction is within the allowable limit, the model will proceed with the skipped households as described above. This threshold
+provides an additional layer of control for users to skip problems when they are small, and stop the model when they are large.
+
+When ``skip_failed_choices`` is enabled, ActivitySim will automatically perform debug tracing for one of the failed households within each
+model step where failed choices occur. The trace files will be saved in the output/trace directory with folders suffixed by
+``_resimulate``. This automatic tracing feature allows users to easily investigate the reasons behind the failed choices without needing to
+manually specify trace IDs. This feature is partially implemented for simple simulate models, and is not yet available for interaction_simulate models.
+For interaction_simulate models, users can manually specify trace IDs to perform tracing of failed choices.
+
+Users can configure ActivitySim to not skip failed choices by setting the
+``skip_failed_choices`` option to ``False`` in the settings file. When this option is disabled, the system will fall back to
+using the legacy ``overflow_protection`` mechanism to handle such cases. 
Specifically, if the computed utilities lead to zero or infinite exponentiated values,
+overflow protection will adjust the utilities to prevent numerical overflow during exponentiation and arbitrarily making a choice.
+No logging is performed for these cases. When ``skip_failed_choices`` is enabled,
+ActivitySim will not use the legacy ``overflow_protection`` mechanism to handle failed choices.
diff --git a/test/skip_failed_choices/.gitignore b/test/skip_failed_choices/.gitignore
new file mode 100644
index 000000000..67176c62d
--- /dev/null
+++ b/test/skip_failed_choices/.gitignore
@@ -0,0 +1,2 @@
+configs*/
+output/
\ No newline at end of file
diff --git a/test/skip_failed_choices/test_skip_failed_choices.py b/test/skip_failed_choices/test_skip_failed_choices.py
new file mode 100644
index 000000000..5055345e4
--- /dev/null
+++ b/test/skip_failed_choices/test_skip_failed_choices.py
@@ -0,0 +1,160 @@
+from __future__ import annotations
+
+# ActivitySim
+# See full license in LICENSE.txt. 
+import importlib.resources +import os +from shutil import copytree + +import pandas as pd +import pytest +import yaml + +from activitysim.core import workflow + + +def example_path(dirname): + resource = os.path.join("examples", "prototype_mtc", dirname) + return str(importlib.resources.files("activitysim").joinpath(resource)) + + +def dir_test_path(dirname): + return os.path.join(os.path.dirname(__file__), dirname) + + +data_dir = example_path("data") +new_configs_dir = dir_test_path("configs") +new_settings_file = os.path.join(new_configs_dir, "settings.yaml") +# copy example configs to test/skip_failed_choices/configs if not already there +if not os.path.exists(new_configs_dir): + copytree(example_path("configs"), new_configs_dir) + + +def update_settings(settings_file, key, value): + with open(settings_file, "r") as f: + settings = yaml.safe_load(f) + + settings[key] = value + + with open(settings_file, "w") as f: + yaml.safe_dump(settings, f) + + +def update_uec_csv(uec_file, expression, coef_value): + # read in the uec file + df = pd.read_csv(uec_file) + # append a new row, set expression and coef_value + df.loc[len(df), "Expression"] = expression + # from the 4th column onward are coefficients + for col in df.columns[3:]: + df.loc[len(df) - 1, col] = coef_value + df.to_csv(uec_file, index=False) + + +@pytest.fixture +def state(): + configs_dir = new_configs_dir + output_dir = dir_test_path("output") + data_dir = example_path("data") + + # turn the global setting on to skip failed choices + update_settings(new_settings_file, "skip_failed_choices", True) + + # make some choices fail by setting extreme coefficients in the uec + # auto ownership + auto_ownership_uec_file = os.path.join(new_configs_dir, "auto_ownership.csv") + # forcing households in home zone 8 (recoded 7) to fail auto ownership choice + update_uec_csv(auto_ownership_uec_file, "@df.home_zone_id==7", -999.0) + + # work location choice + work_location_choice_uec_file = os.path.join( + 
new_configs_dir, "workplace_location.csv" + ) + # forcing workers from home zone 18 to fail work location choice + # as if there is a network connection problem for zone 18 + update_uec_csv(work_location_choice_uec_file, "@df.home_zone_id==18", -999.0) + + # trip mode choice + trip_mode_choice_uec_file = os.path.join(new_configs_dir, "trip_mode_choice.csv") + # forcing trips on drivealone tours to fail trip mode choice + update_uec_csv(trip_mode_choice_uec_file, "@df.tour_mode=='DRIVEALONEFREE'", -999.0) + + state = workflow.State.make_default( + configs_dir=configs_dir, + output_dir=output_dir, + data_dir=data_dir, + ) + + from activitysim.abm.tables.skims import network_los_preload + + state.get(network_los_preload) + + state.logging.config_logger() + return state + + +def test_skip_failed_choices_low_threshold_prototype_mtc(state): + + # check that the setting is indeed set to True + assert state.settings.skip_failed_choices is True + + # set the threshold to 1% of households are allowed to be skipped + state.settings.fraction_of_failed_choices_allowed = 0.01 + + # the run will fail because more than 1% households will be skipped + with pytest.raises(RuntimeError) as excinfo: + state.run(models=state.settings.models, resume_after=None) + assert "exceeds the allowed threshold of" in str(excinfo.value) + + +def test_skip_failed_choices_high_threshold_prototype_mtc(state): + + # check that the setting is indeed set to True + assert state.settings.skip_failed_choices is True + + # set the threshold to 100% of households are allowed to be skipped + state.settings.fraction_of_failed_choices_allowed = 1.0 + + # state.run(models=state.settings.models, resume_after=None) + + for model in state.settings.models: + state.run.by_name(model) + skipped_household_ids_dict = state.get("skipped_household_ids", dict()) + # get the keys of the dict + skipped_household_ids_keys = skipped_household_ids_dict.keys() + if model == "initialize_households": + # check households with home 
zone if 7 and 18 are in the simulated households + assert 7 in state.get("households").loc[:, "home_zone_id"].values + assert 18 in state.get("households").loc[:, "home_zone_id"].values + if model == "auto_ownership_simulate": + # assert one of the keys contains "auto_ownership_simulate" + assert any( + "auto_ownership_simulate" in key for key in skipped_household_ids_keys + ) + # assert that the number of skipped households for auto_ownership_simulate is 598 + assert state.get("num_skipped_households", 0) == (69 + 598) + # check no households in home zone 8 (recoded 7) in the remaining households + assert all(state.get("households").loc[:, "home_zone_id"] != 7) + assert all(state.get("persons").loc[:, "home_zone_id"] != 7) + elif model == "workplace_location": + # assert one of the keys contains "workplace_location" + assert any( + "workplace_location" in key for key in skipped_household_ids_keys + ) + # assert that the number of skipped households for workplace_location is 69 + assert state.get("num_skipped_households", 0) == 69 + # assert no workers from home zone 18 in the remaining workers + persons_df = state.get_dataframe("persons") + assert not any( + (persons_df["home_zone_id"] == 18) & (persons_df["is_worker"] == True) + ) + elif model == "trip_mode_choice": + # assert one of the keys contains "trip_mode_choice" + assert any("trip_mode_choice" in key for key in skipped_household_ids_keys) + # assert that the number of skipped households for trip_mode_choice is 276 + assert state.get("num_skipped_households", 0) == (69 + 598 + 276) + # assert no DRIVEALONEFREE tours in the remaining tours + assert all(state.get("tours").loc[:, "tour_mode"] != "DRIVEALONEFREE") + + # check that the number of skipped households is recorded in state + assert state.get("num_skipped_households", 0) == 943