diff --git a/rmgpy/tools/uncertainty.py b/rmgpy/tools/uncertainty.py index 75b150db48..a23c314362 100644 --- a/rmgpy/tools/uncertainty.py +++ b/rmgpy/tools/uncertainty.py @@ -141,6 +141,43 @@ def get_uncertainty_factor(self, source): dG = self.get_uncertainty_value(source) f = np.sqrt(3) * dG return f + + def _get_covariance_qq(self, q_label1, q_label2): + """ + Gets the covariance between two intermediate sources q1 and q2 + Where q_label1 and q_label2 are both labels for a given thermo source + + The possible intermediate parameter types are: + Library, Surface_Library, QM, Estimation, AdsorptionCorrection, or Group + """ + intermediate_parameters = { + 'Library': self.dG_library, + 'Surface_Library': self.dG_surf_lib, + 'QM': self.dG_QM, + 'Estimation': self.dG_GAV, + 'AdsorptionCorrection': self.dG_ADS_correction, + 'Group': self.dG_group, + } + + # figure out the type of each correlated parameter from its label + corr_type1 = None + corr_type2 = None + + for intermediate_type in intermediate_parameters.keys(): + if q_label1.startswith(intermediate_type): + corr_type1 = intermediate_type + if q_label2.startswith(intermediate_type): + corr_type2 = intermediate_type + + if corr_type1 is None or corr_type2 is None: + raise ValueError(f'Could not determine the type of the correlated parameters from their labels {q_label1} and {q_label2}') + + if corr_type1 != corr_type2: + return 0 + elif q_label1 == q_label2: + # If the two correlated parameters are exactly the same, return the variance of that parameter + return intermediate_parameters[corr_type1] ** 2.0 + return 0 class KineticParameterUncertainty(object): @@ -198,39 +235,23 @@ def get_uncertainty_value(self, source): varlnk += self.dlnk_family * self.dlnk_family N = len(rule_weights) + len(training_weights) - if 'node_std_dev' in source_dict: - # Handle autogen BM trees - if source_dict['node_std_dev'] < 0: - raise ValueError('Invalid value for std dev of kinetics family rule node') - varlnk += np.float_power(source_dict['node_std_dev'], 2.0) - if source_dict['node_n_train'] is None: - raise ValueError('Invalid number of training reactions for kinetics family rule node') - N = source_dict['node_n_train'] - - # Technically every lookup in the autogenerated trees is an "exact" match because - # every node template has its own fitted rate rule by definition, but here we use the - # number of training reactions as an approximation of the node's specificity/generality - # and add a penalty for being too general (large # of training reactions) - varlnk += (np.log10(N + 1) * self.dlnk_nonexact) ** 2 - else: - # Handle hand-made trees - if not exact: - # nonexactness contribution increases as N increases - varlnk += (np.log10(N + 1) * self.dlnk_nonexact) ** 2 + if not exact: + # nonexactness contribution increases as N increases + varlnk += (np.log10(N + 1) * self.dlnk_nonexact) * (np.log10(N + 1) * self.dlnk_nonexact) - if 'surface' in family_label.lower(): - varlnk += np.sum([weight * weight * self.dlnk_surf_rule * self.dlnk_surf_rule for weight in rule_weights]) - varlnk += np.sum([weight * weight * self.dlnk_surf_training * self.dlnk_surf_training for weight in training_weights]) - else: - # Add the contributions from rules - varlnk += np.sum([weight * weight * self.dlnk_rule * self.dlnk_rule for weight in rule_weights]) - # Add the contributions from training - # Even though these source from training reactions, we actually - # use the uncertainty for rate rules, since these are now approximations - # of the original reaction. We consider these to be independent of original the training - # parameters because the rate rules may be reversing the training reactions, - # which leads to more complicated dependence - varlnk += np.sum([weight * weight * self.dlnk_rule * self.dlnk_rule for weight in training_weights]) + if 'surface' in family_label.lower(): + varlnk += np.sum([weight * weight * self.dlnk_surf_rule * self.dlnk_surf_rule for weight in rule_weights]) + varlnk += np.sum([weight * weight * self.dlnk_surf_training * self.dlnk_surf_training for weight in training_weights]) + else: + # Add the contributions from rules + varlnk += np.sum([weight * weight * self.dlnk_rule * self.dlnk_rule for weight in rule_weights]) + # Add the contributions from training + # Even though these source from training reactions, we actually + # use the uncertainty for rate rules, since these are now approximations + # of the original reaction. We consider these to be independent of original the training + # parameters because the rate rules may be reversing the training reactions, + # which leads to more complicated dependence + varlnk += np.sum([weight * weight * self.dlnk_rule * self.dlnk_rule for weight in training_weights]) return np.sqrt(varlnk) @@ -239,7 +260,7 @@ def get_partial_uncertainty_value(self, source, corr_source_type, corr_param=Non Obtain the partial uncertainty dlnk/dlnk_corr*dlnk_corr, where dlnk_corr is the correlated parameter `corr_param` is the parameter identifier itself, which is the string identifier of the rate rule - `corr_source_type` is a string, being either 'Rate Rules', 'Library', 'PDep', 'Training' or 'Estimation' + `corr_source_type` is a string, being either 'Rate Rules', 'Library', 'PDep', 'Training', 'Estimation Nonexact', or 'Estimation Family' `corr_family` is a string used only when the source type is 'Rate Rules' and indicates the family """ @@ -291,24 +312,22 @@ def get_partial_uncertainty_value(self, source, corr_source_type, corr_param=Non else: return self.dlnk_training - elif corr_source_type == 'Estimation': - # Return all the uncorrelated uncertainty associated with using an estimation scheme + elif corr_source_type == 'Estimation Nonexact': + # Return the uncorrelated uncertainty associated with using a non-exact rate rule if 'Rate Rules' in source: source_dict = source['Rate Rules'][1] exact = source_dict['exact'] - - family_label = source['Rate Rules'][0] - dlnk = self.dlnk_family # Base uncorrelated uncertainty just from using rate rule estimation - - # Additional uncertainty from using non-exact rate rule N = len(source_dict['rules']) + len(source_dict['training']) if not exact: # nonexactness contribution increases as N increases - dlnk += np.log10(N + 1) * self.dlnk_nonexact - return dlnk + return np.log10(N + 1) * self.dlnk_nonexact + elif corr_source_type == 'Estimation Family': + # Return the uncertainty associated with using a family decision tree + if 'Rate Rules' in source: + return self.dlnk_family else: - raise Exception('Kinetics correlated source must be Rate Rules, Library, PDep, Training, or Estimation') + raise Exception('Kinetics correlated source must be Rate Rules, Library, PDep, Training, Estimation Nonexact, or Estimation Family') # If we get here, it means that we did not find the correlated parameter in the source return None @@ -322,6 +341,51 @@ def get_uncertainty_factor(self, source): dlnk = self.get_uncertainty_value(source) f = np.sqrt(3) / np.log(10) * dlnk return f + + def _get_covariance_qq(self, q_label1, q_label2): + """ + Gets the covariance between two intermediate sources q1 and q2 + Where q_label1 and q_label2 are both labels for a given kinetics source + + The possible intermediate parameter types are: + Library, Surface_Library, PDEP, Training, Rate Rule, + Estimation Family, Estimation Nonexact, Surface Training, and Surface Rate Rule + """ + + intermediate_parameters = { + 'Library': self.dlnk_library, + 'Surface_Library': self.dlnk_surf_library, + 'PDep': self.dlnk_pdep, + 'Training': self.dlnk_training, + 'Rate Rule': self.dlnk_rule, + 'Estimation Family': self.dlnk_family, + 'Estimation Nonexact': self.dlnk_nonexact, + 'Surface Training': self.dlnk_surf_training, + 'Surface Rate Rule': self.dlnk_surf_rule, + } + + corr_type1 = None + corr_type2 = None + + # figure out the intermediate parameter type + for intermediate_type in intermediate_parameters.keys(): + if q_label1.startswith(intermediate_type): + corr_type1 = intermediate_type + if q_label2.startswith(intermediate_type): + corr_type2 = intermediate_type + + if corr_type1 is None or corr_type2 is None: + raise ValueError(f'Could not determine the type of the correlated parameters from their labels {q_label1} and {q_label2}') + + if corr_type1 != corr_type2: + # If the two correlated parameters are of different types, we consider them to be uncorrelated + return 0 + + elif q_label1 == q_label2: + # If the two correlated parameters are exactly the same, return the variance of that parameter + return intermediate_parameters[corr_type1] ** 2.0 + + return 0 class Uncertainty(object): @@ -343,8 +407,16 @@ def __init__(self, species_list=None, reaction_list=None, output_directory=''): self.reaction_sources_dict = None self.all_thermo_sources = None self.all_kinetic_sources = None - self.thermo_input_uncertainties = None - self.kinetic_input_uncertainties = None + self.thermo_input_uncertainties = None # previous formulation thermo parameter uncertainties + self.kinetic_input_uncertainties = None # previous formulation kinetic parameter uncertainties + self.thermo_intermediate_uncertainties = None # new formulation thermo parameter uncertainties - can be dependent on each other + self.kinetic_intermediate_uncertainties = None # new formulation kinetic parameter uncertainties - can be dependent on each other + self.thermo_covariance_matrix = None # covariance matrix of all species thermo uncertainties + self.kinetic_covariance_matrix = None # covariance matrix of all reaction kinetic uncertainties + self.Sigma_ww_thermo = None # covariance matrix of all underlying thermo parameter uncertainties + self.Sigma_ww_kinetics = None # covariance matrix of all underlying kinetics parameter uncertainties + self.all_thermo_intermediates = None # list of labels of underlying thermo parameters + self.all_kinetics_intermediates = None # list of labels of underlying kinetic parameters self.output_directory = output_directory if output_directory else os.getcwd() # For extra species needed for correlated analysis but not in model @@ -546,28 +618,7 @@ def extract_sources_from_model(self): # Do nothing here because training source already saves the entry from the training reaction pass elif 'Rate Rules' in source: - # Fetch standard deviation if autogenerated tree - if source['Rate Rules'][1]['autogenerated']: - std_dev = 1.329 # Default value is uniform distribution with upper bound of 10x the nominal value - n_train = None - try: - # try to look up the node variance in the rule data's uncertainty object - std_dev = np.sqrt(source['Rate Rules'][1]['rules'][0][0].data.uncertainty.var) - n_train = int(source['Rate Rules'][1]['rules'][0][0].data.uncertainty.N) - except AttributeError: - # fall back on the standard deviation reported in the long description of the rule - # this is the absolute average deviation and not the standard deviation of the node, but it is still probably better than the default - # note, we'd need abs(mu) to be able to convert back to std_dev from absolute average deviation, which is why we're not doing it here - long_desc = source['Rate Rules'][1]['rules'][0][0].long_desc - # This search string handles scientific notation or regular float with optional + or - sign - std_dev_matches = re.search(r'Standard Deviation in ln\(k\): ([-+]?(?:\d+(?:\.\d*)?|\.\d+)(?:[eE][-+]?\d+)?)', long_desc) - if std_dev_matches is not None: - std_dev = float(std_dev_matches[1]) - n_train_matches = re.search('rule fitted to ([0-9]*) training reactions', long_desc) - if n_train_matches is not None: - n_train = int(n_train_matches[1]) - source['Rate Rules'][1]['node_std_dev'] = std_dev - source['Rate Rules'][1]['node_n_train'] = n_train + pass else: raise Exception('Source of kinetics must be either Library, PDep, Training, or Rate Rules') self.reaction_sources_dict[reaction] = source @@ -706,7 +757,7 @@ def assign_parameter_uncertainties(self, g_param_engine=None, k_param_engine=Non for adsGroupType, groupList in source['ADS'].items(): for group, weight in groupList: pdG = g_param_engine.get_partial_uncertainty_value(source, 'ADS', group, adsGroupType) - label = 'AdsorptionGroup({}) {}'.format(adsGroupType, group.label) + label = 'AdsorptionCorrection({}) {}'.format(adsGroupType, group.label) dG[label] = pdG if 'GAV' in source: for groupType, groupList in source['GAV'].items(): @@ -733,23 +784,32 @@ def assign_parameter_uncertainties(self, g_param_engine=None, k_param_engine=Non source_dict = source['Rate Rules'][1] rules = source_dict['rules'] training = source_dict['training'] + surface_prefix = '' + if reaction.is_surface_reaction(): + surface_prefix = 'Surface ' + for ruleEntry, weight in rules: dplnk = k_param_engine.get_partial_uncertainty_value(source, 'Rate Rules', corr_param=ruleEntry, corr_family=family) - label = '{} {}'.format(family, ruleEntry) + label = '{}Rate Rule {} {}'.format(surface_prefix, family, ruleEntry) dlnk[label] = dplnk for ruleEntry, trainingEntry, weight in training: dplnk = k_param_engine.get_partial_uncertainty_value(source, 'Rate Rules', corr_param=ruleEntry, corr_family=family) - label = '{} {}'.format(family, ruleEntry) + label = '{}Rate Rule {} {}'.format(surface_prefix, family, ruleEntry) dlnk[label] = dplnk - # There is also estimation error if rate rules are used - est_dplnk = k_param_engine.get_partial_uncertainty_value(source, 'Estimation') - if est_dplnk: - label = 'Estimation {}'.format(reaction.to_chemkin(self.species_list, kinetics=False)) - dlnk[label] = est_dplnk + # There is also estimation error if rate rules are used (nonexact and family contribute to this) + nonexact_dplnk = k_param_engine.get_partial_uncertainty_value(source, 'Estimation Nonexact', corr_family=family) + if nonexact_dplnk: + label = 'Estimation Nonexact {}'.format(reaction.to_chemkin(self.species_list, kinetics=False)) + dlnk[label] = nonexact_dplnk + + family_dplnk = k_param_engine.get_partial_uncertainty_value(source, 'Estimation Family', corr_family=family) + if family_dplnk: + label = 'Estimation Family {}'.format(reaction.to_chemkin(self.species_list, kinetics=False)) + dlnk[label] = family_dplnk elif 'PDep' in source: dplnk = k_param_engine.get_partial_uncertainty_value(source, 'PDep', source['PDep']) @@ -774,6 +834,132 @@ def assign_parameter_uncertainties(self, g_param_engine=None, k_param_engine=Non self.kinetic_input_uncertainties.append(dlnk) + def assign_intermediate_uncertainties(self, g_param_engine=None, k_param_engine=None, correlated=False): + """ + Assign uncertainties to the intermediate parameters based on the sources of the species thermo and reaction kinetics. + + This fills out the class variables thermo_intermediate_uncertainties and kinetic_intermediate_uncertainties + these are each list of dictionaries. For every species or reaction, it lists all the intermediate sources contributing to that parameter's uncertainty. + + So for example, thermo_intermediate_uncertainties might look something like this: + + thermo_intermediate_uncertainties = [ + {'Group(group) Cds-CdsHH': 2.0, 'Group(radical) CCJ': 1.0, 'Estimation CH(4)': 1.0}, + {'Library CH2(5)': 1.0}, + ] + The keys of the dictionaries are the label names for the intermediate parameters. + and the values are partial derivatives dG_i/dq_w, how the species i Gibbs uncertainty changes with the intermediate parameter w. + + This function is the new formulation's equivalent to assign_parameter_uncertainties and similarly handles both correlated and uncorrelated cases. + But instead of assuming all underlying parameters are independent, here we can allow for dependence as long as we have the covariance + """ + if g_param_engine is None: + g_param_engine = ThermoParameterUncertainty() + if k_param_engine is None: + k_param_engine = KineticParameterUncertainty() + + self.thermo_intermediate_uncertainties = [] # store the intermediate dG_i/dq for each parameter q that contributes to the uncertainty of G_i, for use in correlated uncertainty analysis + self.kinetic_intermediate_uncertainties = [] + + for species in self.species_list: + if not correlated: + dG = g_param_engine.get_uncertainty_value(self.species_sources_dict[species]) + self.thermo_intermediate_uncertainties.append(dG) # in the uncorrelated case, the intermediate is just the uncertainty value itself, since there is only one parameter that contributes to the uncertainty + else: + source = self.species_sources_dict[species] + dGdq = {} + if 'Library' in source: + try: + label = 'Library {}'.format(self.species_list[source['Library']].to_chemkin()) + except IndexError: + label = 'Library {}'.format(self.extra_species[source['Library'] - len(self.species_list)].to_chemkin()) + dGdq[label] = 1 # dG/dG_lib = 1, because the parameter is never scaled by anything other than 1 when it is used + if 'Surface_Library' in source: + try: + label = 'Surface_Library {}'.format(self.species_list[source['Surface_Library']].to_chemkin()) + except IndexError: + label = 'Surface_Library {}'.format(self.extra_species[source['Surface_Library'] - len(self.species_list)].to_chemkin()) + dGdq[label] = 1 # dG/dG_surf = 1, because the parameter is never scaled by anything other than 1 when it is used + if 'QM' in source: + label = 'QM {}'.format(self.species_list[source['QM']].to_chemkin()) + dGdq[label] = 1 + if 'ADS' in source: + for adsGroupType, groupList in source['ADS'].items(): + for group, weight in groupList: + label = 'AdsorptionCorrection({}) {}'.format(adsGroupType, group.label) + if weight != 1: + raise ValueError('Weight for adsorption group contribution to thermo should be 1, but got weight={weight} for {adsGroupType} in species {species}'.format(weight=weight, adsGroupType=adsGroupType, species=species)) + dGdq[label] = weight # This should be 1 + if 'GAV' in source: + for groupType, groupList in source['GAV'].items(): + for group, weight in groupList: + label = 'Group({}) {}'.format(groupType, group.label) + dGdq[label] = weight # dG/dG_group = weight, because the group contribution is scaled by the weight when it is used in the thermo estimation + # We also know if there is group additivity used, there will be uncorrelated estimation error + label = 'Estimation {}'.format(species.to_chemkin()) + dGdq[label] = 1 # dG/dG_est = 1, because the estimation error is added on top of the group additivity value, so it is never scaled by anything other than 1 when it is used + + self.thermo_intermediate_uncertainties.append(dGdq) + + for reaction in self.reaction_list: + if not correlated: + dlnk = k_param_engine.get_uncertainty_value(self.reaction_sources_dict[reaction]) + self.kinetic_intermediate_uncertainties.append(dlnk) # in the uncorrelated case, the intermediate is just the uncertainty value itself, since there is only one parameter that contributes to the uncertainty + else: + source = self.reaction_sources_dict[reaction] + dlnkdq = {} + if 'Rate Rules' in source: + family = source['Rate Rules'][0] + source_dict = source['Rate Rules'][1] + rules = source_dict['rules'] + training = source_dict['training'] + exact = source_dict['exact'] + surface_prefix = '' + if reaction.is_surface_reaction(): + surface_prefix = 'Surface ' + for ruleEntry, weight in rules: + label = '{}Rate Rule {} {}'.format(surface_prefix, family, ruleEntry) + dlnkdq[label] = weight # dlnk/dlnk_rule = weight, because the rate rule is scaled by the weight when it is used in the kinetics estimation + + for ruleEntry, trainingEntry, weight in training: + # TODO - test that training reactions in a tree are correlated with the exact match kind of training reaction + # for now, we follow the old convention of treating these as rate rules + label = '{}Rate Rule {} {}'.format(surface_prefix, family, ruleEntry) # ruleEntry should probably be the reaction equation itself + dlnkdq[label] = weight # dlnk/dlnk_training = weight, because the training entry is scaled by the weight when it is used in the kinetics estimation + + # There is also estimation error if rate rules are used + # Record dlnk/dlnk_family, the derivative with respect to the family estimation uncertainty + label = 'Estimation Family {}'.format(reaction.to_chemkin(self.species_list, kinetics=False)) + dlnkdq[label] = 1 # dlnk/dlnk_family = 1, because the family estimation uncertainty is added on top of the rate rule values, so it is never scaled by anything other than 1 when it is used + + # Record the non-exact estimation error if not an exact match for a rate rule + if not exact: + N = len(source_dict['rules']) + len(source_dict['training']) + label = 'Estimation Nonexact {}'.format(reaction.to_chemkin(self.species_list, kinetics=False)) + dlnkdq[label] = np.log10(N + 1) + + elif 'PDep' in source: + label = 'PDep {}'.format(reaction.to_chemkin(self.species_list, kinetics=False)) + dlnkdq[label] = 1.0 # dlnk/dlnk_PDep = 1, because the PDep kinetics is never scaled by anything other than 1 when it is used + + elif 'Library' in source: + label = 'Library {}'.format(reaction.to_chemkin(self.species_list, kinetics=False)) + dlnkdq[label] = 1.0 # dlnk/dlnk_lib = 1, because the library kinetics is never scaled by anything other than 1 when it is used + + elif 'Surface_Library' in source: + label = 'Surface_Library {}'.format(reaction.to_chemkin(self.species_list, kinetics=False)) + dlnkdq[label] = 1.0 # dlnk/dlnk_surf_lib = 1, because the surface library kinetics is never scaled by anything other than 1 when it is used + + elif 'Training' in source: + family = source['Training'][0] + surface_prefix = '' + if reaction.is_surface_reaction(): + surface_prefix = 'Surface ' + label = '{}Training {} {}'.format(surface_prefix, family, reaction.to_chemkin(self.species_list, kinetics=False)) + dlnkdq[label] = 1.0 + + self.kinetic_intermediate_uncertainties.append(dlnkdq) + def sensitivity_analysis(self, initial_mole_fractions, sensitive_species, T, P, termination_time, sensitivity_threshold=1e-3, number=10, fileformat='.png'): """ @@ -925,6 +1111,311 @@ def local_analysis(self, sensitive_species, reaction_system_index=0, correlated= return output + def local_analysis_intermediate(self, sensitive_species, reaction_system_index=0, correlated=False, number=10, + fileformat='.png', t=None): + """ + local uncertainty analysis using new formulation where parameters might not be fully independent + + sensitive_species is a list of sensitive Species objects + number is the number of highest contributing uncertain parameters desired to be plotted + fileformat can be either .png, .pdf, or .svg + + t is the time in seconds at which to perform the analysis. + The default (None) uses the final timestep in the sensitivity analysis + + returns a dictionary of tuples for every sensitive species with: + - the total variance in concentration of that species + - the kinetic contributions to variance of that species's concentration + - the thermo contributions to variance of that species's concentration + """ + + output = {} + for sens_species in sensitive_species: + # 1. ------------------------- Get sensitivities -------------------------- + csvfile_path = os.path.join(self.output_directory, 'solver', + 'sensitivity_{0}_SPC_{1}.csv'.format(reaction_system_index + 1, + sens_species.index)) + time, data_list = parse_csv_data(csvfile_path) + + if t is None: + t_index = -1 + else: + t_index = int(np.argmin(np.abs(time.data - t))) + + # get the sensitivities and compile a list of the species and reaction indices actually used + # record sensitivities for all time, so sensitivity array is #time steps x #species + species_sensitivity_full_array = np.zeros(len(self.species_list)) + reaction_sensitivity_full_array = np.zeros(len(self.reaction_list)) + + # keeping track of which species/reactions are sensitive gives us a speedup in computing covariance matrices later on + species_used = [] + reactions_used = [] + for data in data_list: + if data.species: + for species in self.species_list: + if species.to_chemkin() == data.species: + index = self.species_list.index(species) + break + else: + raise ValueError(f'Chemkin name {data.species} of species in the CSV file does not match anything in the species list.') + species_sensitivity_full_array[index] = data.data[t_index] + species_used.append(index) + + if data.reaction: + rxn_index = int(data.index) - 1 + reaction_sensitivity_full_array[rxn_index] = data.data[t_index] + reactions_used.append(rxn_index) + species_used = sorted(species_used) + reactions_used = sorted(reactions_used) + + # shorten the sensitivity vectors to only the nonzero elements + species_sensitivity = np.array([species_sensitivity_full_array[i] for i in species_used]) + reaction_sensitivity = np.array([reaction_sensitivity_full_array[i] for i in reactions_used]) + + # 2. ------------------------- Compute intermediate covariance -------------------------- + # Now get the covariance matrix of the intermediate parameters (if uncorrelated, these are just covariance(G_i, G_j) or covariance(lnk_i, lnk_j) + Sigma_qq_thermo = self._get_intermediate_thermo_covariance_matrix(subset_indices=species_used) + Sigma_qq_kinetics = self._get_intermediate_kinetics_covariance_matrix(subset_indices=reactions_used) + + # 3. ------------------------- Compute partial derivatives dG/dq or dlnk/dq -------------------------- + # get the parital derivative of G or lnk with respect to intermediates (if uncorrelated, these are identity matrices) + dG_dq = self._get_dG_dq_matrix(subset_indices=species_used) + dlnkdq = self._get_dlnk_dq_matrix(subset_indices=reactions_used) + N_q_thermo = dG_dq.shape[1] # the number of thermo contributions + N_q_kinetics = dlnkdq.shape[1] # the number of kinetics contributions + + # 4. ------------------------- Multiply all matrices together to get total variance -------------------------- + # total variance = + # species_sensitivity * dG_dq * Sigma_qq_thermo * dG_dq' * species_sensitivity' + + # reaction_sensitivity * dlnk_dq * Sigma_qq_kinetics * dlnk_dq' * reaction_sensitivity' + + + # we split this into: + # species_sensitivity' * dG_dq' and Sigma_qq_thermo * dG_dq * species_sensitivity + # because this gives us the contributions of each intermediate parameter + thermo_contributions = np.multiply(np.dot(species_sensitivity, dG_dq), np.dot(Sigma_qq_thermo, np.dot(dG_dq.T, species_sensitivity.T)).T) + kinetic_contributions = np.multiply(np.dot(reaction_sensitivity, dlnkdq), np.dot(Sigma_qq_kinetics, np.dot(dlnkdq.T, reaction_sensitivity.T)).T) + + total_variance = np.sum(thermo_contributions) + np.sum(kinetic_contributions) + + # 5. ------------------------- Make plots -------------------------- + # define labels if they're not already listed in all_thermo/kinetics_intermediates + if self.all_thermo_intermediates is None or len(self.all_thermo_intermediates) != N_q_thermo: + self.all_thermo_intermediates = [f'dln[{sens_species.to_chemkin()}]/dG[{self.species_list[sp_idx].to_chemkin()}]' for sp_idx in species_used] + if self.all_kinetics_intermediates is None or len(self.all_kinetics_intermediates) != N_q_kinetics: + self.all_kinetics_intermediates = ['k' + str(self.reaction_list[rxn_idx].index) + ': ' + self.reaction_list[rxn_idx].to_chemkin(kinetics=False) for rxn_idx in reactions_used] + + # append all data points + thermo_plotting_data = [] + kinetics_plotting_data = [] + for i in range(N_q_thermo): + label = self.all_thermo_intermediates[i] + thermo_plotting_data.append(GenericData(data=[np.sqrt(thermo_contributions[i])], uncertainty=1.0, label=label, species='dummy')) + for i in range(N_q_kinetics): + label = self.all_kinetics_intermediates[i] + kinetics_plotting_data.append(GenericData(data=[np.sqrt(kinetic_contributions[i])], uncertainty=1.0, label=label, reaction='dummy')) + + # set up the folders and filenames for plotting + folder = os.path.join(self.output_directory, 'uncorrelated') + if correlated: + folder = os.path.join(self.output_directory, 'correlated') + os.makedirs(folder, exist_ok=True) + + r_path = os.path.join(folder, f'kineticsLocalUncertainty_{sens_species.to_chemkin()}{fileformat}') + t_path = os.path.join(folder, f'thermoLocalUncertainty_{sens_species.to_chemkin()}{fileformat}') + reaction_uncertainty = ReactionSensitivityPlot(x_var=time, y_var=kinetics_plotting_data, num_reactions=number).uncertainty_plot(total_variance, filename=r_path) + thermo_uncertainty = ThermoSensitivityPlot(x_var=time, y_var=thermo_plotting_data, num_species=number).uncertainty_plot(total_variance, filename=t_path) + + output[sens_species] = (total_variance, reaction_uncertainty, thermo_uncertainty) + + return output + + def get_thermo_covariance_matrix(self, g_param_engine=None): + """ + Return the thermo covariance matrix as a numpy array. + NxN square matrix where N is the number of species in the model, + with the covariance between species i and j in the ith row and jth column. + Units are in (kcal/mol)^2. + Must call assign_intermediate_uncertainties first to populate the source dictionaries. + + TODO speed this up with sparse matrix multiplication? + """ + assert self.thermo_intermediate_uncertainties is not None, 'Must call assign_intermediate_uncertainties first' + assert len(self.thermo_intermediate_uncertainties) > 0, 'No thermodynamic parameters found' + if isinstance(self.thermo_intermediate_uncertainties[0], np.float64): + self.thermo_covariance_matrix = np.float_power(np.diag(self.thermo_intermediate_uncertainties), 2.0) + return self.thermo_covariance_matrix + + self.thermo_covariance_matrix = np.zeros((len(self.species_list), len(self.species_list))) + + if g_param_engine is None: + g_param_engine = ThermoParameterUncertainty() + + for i in range(len(self.species_list)): + for j in range((len(self.species_list))): + for q in self.thermo_intermediate_uncertainties[i].keys(): + dG_i_dq = self.thermo_intermediate_uncertainties[i][q] + for r in self.thermo_intermediate_uncertainties[j].keys(): + dG_j_dr = self.thermo_intermediate_uncertainties[j][r] + self.thermo_covariance_matrix[i, j] += dG_i_dq * g_param_engine._get_covariance_qq(q, r) * dG_j_dr + + return self.thermo_covariance_matrix + + def get_kinetic_covariance_matrix(self, k_param_engine=None): + """ + Return the kinetic covariance matrix as a numpy array. + MxM square matrix where M is the number of reactions in the model, + with the covariance between reaction i and j in the ith row and jth column. + Units are in (ln(k))^2. + Must call assign_intermediate_uncertainties first to populate the source dictionaries. + + TODO speed this up with sparse matrix multiplication? + """ + assert self.kinetic_intermediate_uncertainties is not None, 'Must call assign_intermediate_uncertainties first' + assert len(self.kinetic_intermediate_uncertainties) > 0, 'No kinetic parameters found' + if isinstance(self.kinetic_intermediate_uncertainties[0], np.float64): + self.kinetic_covariance_matrix = np.float_power(np.diag(self.kinetic_intermediate_uncertainties), 2.0) + return self.kinetic_covariance_matrix + + if k_param_engine is None: + k_param_engine = KineticParameterUncertainty() + + self.kinetic_covariance_matrix = np.zeros((len(self.reaction_list), len(self.reaction_list))) + + for i in range(len(self.reaction_list)): + for j in range(len(self.reaction_list)): + for q in self.kinetic_intermediate_uncertainties[i].keys(): + dlnk_i_dq = self.kinetic_intermediate_uncertainties[i][q] + for r in self.kinetic_intermediate_uncertainties[j].keys(): + dlnk_j_dr = self.kinetic_intermediate_uncertainties[j][r] + self.kinetic_covariance_matrix[i, j] += dlnk_i_dq * k_param_engine._get_covariance_qq(q, r) * dlnk_j_dr + + return self.kinetic_covariance_matrix + + def _get_intermediate_thermo_covariance_matrix(self, g_param_engine=None, subset_indices=None): + """ + Make an explicit covariance matrix of all the qs (intermediate thermo parameters, like specific groups or library entries) + + Requires calling assign_intermediate_uncertainties first + + if subset_indices is None, computes the full matrix. + Otherwise, only computes the matrices relevant to the species indicated by subset_indices + """ + if subset_indices is None: + subset_indices = np.arange(len(self.species_list)) + + if isinstance(self.thermo_intermediate_uncertainties[0], np.float64): + self.Sigma_ww_thermo = np.diag(np.float_power([self.thermo_intermediate_uncertainties[i] for i in subset_indices], 2.0)) + return self.Sigma_ww_thermo + + if g_param_engine is None: + g_param_engine = ThermoParameterUncertainty() + + self.all_thermo_intermediates = set() + for sp_idx in subset_indices: + for q in self.thermo_intermediate_uncertainties[sp_idx].keys(): + self.all_thermo_intermediates.add(q) + self.all_thermo_intermediates = list(self.all_thermo_intermediates) + W = len(self.all_thermo_intermediates) + + self.Sigma_ww_thermo = np.zeros((W, W)) + for i in range(W): + q_i = self.all_thermo_intermediates[i] + for j in range(i + 1): + q_j = self.all_thermo_intermediates[j] + self.Sigma_ww_thermo[i, j] = g_param_engine._get_covariance_qq(q_i, q_j) + self.Sigma_ww_thermo[j, i] = self.Sigma_ww_thermo[i, j] # symmetric matrix + return self.Sigma_ww_thermo + + def _get_intermediate_kinetics_covariance_matrix(self, k_param_engine=None, subset_indices=None): + """ + Make an explicit covariance matrix of all the qs (intermediate kinetic parameters, like specific rate rules or libraries entries) + + Requires calling assign_intermediate_uncertainties first + + if subset_indices is None, computes the full matrix. + Otherwise, only computes the matrices relevant to the reactions indicated by subset_indices + """ + if subset_indices is None: + subset_indices = np.arange(len(self.reaction_list)) + + if isinstance(self.kinetic_intermediate_uncertainties[0], np.float64): + # TODO this might have to be squared + self.Sigma_ww_kinetics = np.diag(np.float_power([self.kinetic_intermediate_uncertainties[i] for i in subset_indices], 2.0)) + return self.Sigma_ww_kinetics + + if k_param_engine is None: + k_param_engine = KineticParameterUncertainty() + + self.all_kinetics_intermediates = set() + for rxn_idx in subset_indices: + for q in self.kinetic_intermediate_uncertainties[rxn_idx].keys(): + self.all_kinetics_intermediates.add(q) + self.all_kinetics_intermediates = list(self.all_kinetics_intermediates) + W = len(self.all_kinetics_intermediates) + + self.Sigma_ww_kinetics = np.zeros((W, W)) + for i in range(W): + q_i = self.all_kinetics_intermediates[i] + for j in range(i + 1): + q_j = self.all_kinetics_intermediates[j] + self.Sigma_ww_kinetics[i, j] = k_param_engine._get_covariance_qq(q_i, q_j) + self.Sigma_ww_kinetics[j, i] = self.Sigma_ww_kinetics[i, j] # symmetric matrix + return self.Sigma_ww_kinetics + + def _get_dG_dq_matrix(self, subset_indices=None): + """ + Returns an nxW matrix of partial derivatives where + n is number of species (or subset species) and W is number of relevant intermediate paramaters + + subset_indices is the set of species indices that matter + if subset_indices is None, it computes the full matrix + use at your own risk! + + assumes that get_intermediate_thermo_covariance_matrix was called with matching subset_indices + if not, then matrix dimensions will probably not match up for later multiplication, + which will signal the user that there's a problem + """ + if subset_indices is None: + subset_indices = np.arange(len(self.species_list)) + + # return a square identity matrix if uncorrelated + if isinstance(self.thermo_intermediate_uncertainties[0], np.float64): + return np.eye(len(subset_indices)) + + dGdq = np.zeros((len(subset_indices), len(self.all_thermo_intermediates))) + + for i, sp_idx in enumerate(subset_indices): + for key in self.thermo_intermediate_uncertainties[sp_idx].keys(): + q_index = self.all_thermo_intermediates.index(key) + dGdq[i, q_index] = self.thermo_intermediate_uncertainties[sp_idx][key] + + return dGdq + + def _get_dlnk_dq_matrix(self, subset_indices=None): + """ + Returns an mxW matrix of partial derivatives where + m is number of reactions and W is number of intermediate paramaters + + assumes that get_intermediate_kinetic_covariance_matrix was called with matching subset_indices + if not, then matrix dimensions will probably not match up for later multiplication, + which will signal the user that there's a problem + """ + if subset_indices is None: + subset_indices = np.arange(len(self.reaction_list)) + + # return a square identity matrix if uncorrelated + if isinstance(self.kinetic_intermediate_uncertainties[0], np.float64): + return np.eye(len(subset_indices)) + + dlnkdq = np.zeros((len(subset_indices), len(self.all_kinetics_intermediates))) + + for i, rxn_idx in enumerate(subset_indices): + for key in self.kinetic_intermediate_uncertainties[rxn_idx].keys(): + q_index = self.all_kinetics_intermediates.index(key) + dlnkdq[i, q_index] = self.kinetic_intermediate_uncertainties[rxn_idx][key] + + return dlnkdq + def process_local_results(results, sensitive_species, number=10): """ diff --git a/test/rmgpy/tools/chemDir/solver/sensitivity_1_SPC_18.csv b/test/rmgpy/tools/chemDir/solver/sensitivity_1_SPC_18.csv new file mode 100644 index 0000000000..6cd25bd13a --- /dev/null +++ b/test/rmgpy/tools/chemDir/solver/sensitivity_1_SPC_18.csv @@ -0,0 +1,6 @@ +Time (s),dln[C2H6(18)]/dln[k1]: O(0)+H2O2(3)<=>OH(1)+HO2(2),dln[C2H6(18)]/dln[k2]: CH3(14)+PC3H7(15)<=>CH4(16)+CH2CH2CH2(17),dln[C2H6(18)]/dln[k3]: C2H6(18)+PC3H7(15)<=>C2H5(12)+C3H8(19),dln[C2H6(18)]/dln[k4]: C2H3(20)+C3H8(19)<=>C2H4(11)+PC3H7(15),dln[C2H6(18)]/dln[k5]: CH3(14)+C3H8(19)<=>CH4(16)+PC3H7(15),dln[C2H6(18)]/dln[k6]: CH3(14)+C2H6(18)<=>CH4(16)+C2H5(12),dln[C2H6(18)]/dln[k7]: HCCO(10)(+M)<=>O(0)+C2H(8)(+M),dln[C2H6(18)]/dln[k8]: C2H5(12)+CH3CHCH3(21)<=>C2H6(18)+C3H6(22),dln[C2H6(18)]/dln[k9]: C3H5(24)+CH2CH2CH2(17)<=>C3H5(23)+C3H6(22),dln[C2H6(18)]/dln[k10]: CH3(14)+C2H5(12)<=>CH4(16)+C2H4(11),dln[C2H6(18)]/dG[C2H6(18)],dln[C2H6(18)]/dG[CH4(16)],dln[C2H6(18)]/dG[CH2(5)],dln[C2H6(18)]/dG[CH(4)],dln[C2H6(18)]/dG[C2H3(20)],dln[C2H6(18)]/dG[CH3(14)],dln[C2H6(18)]/dG[C2H4(11)],dln[C2H6(18)]/dG[C2H5(12)] +0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0 +2.74888478006841E-18,-2.54948202221999E-16,-2.09986641193727E-28,-1.68177517988414E-18,9.47815581205925E-43,-4.74240993812363E-29,1.56391028678446E-54,6.05624693198534E-47,-1.10008312605422E-28,3.64619470520282E-56,-3.31477117062407E-66,-9.93392859924497E-17,-9.89301193768683E-48,6.51000897308177E-19,6.51000897344892E-19,1.83574663375722E-29,-4.06622729584823E-53,3.06108016791651E-49,4.22004499424344E-59 +1E-12,-9.27460693505743E-11,-2.0460917755381E-17,-6.1181805764998E-13,3.40947333001306E-26,-4.62135447625443E-18,1.34821506115326E-32,2.66263273777064E-30,-1.07185782928551E-17,3.14317510387031E-34,-7.91446790404999E-39,-3.61380457995889E-11,-2.37250016862895E-31,2.36827788154325E-13,2.36831365928509E-13,1.78888709197875E-18,-8.03285951906872E-37,7.34152354170913E-33,2.43214792225303E-37 +6.14209255203048E-06,-0.0108552687245762,-0.00977366113631576,0.000596136582961715,0.00200221535342703,-0.00846755042343432,0.00013450902403258,0.000108628449374283,-0.00628567915883179,8.65139619102715E-05,-1.74774283067548E-05,-0.00452281325424481,1.26216421675667E-05,-0.00328956503163016,0.00347689688022537,0.00337489330053554,-8.94698376820376E-06,2.89680320928337E-06,6.86042557778715E-07 +0.000511952119227999,-0.259795045759184,-0.127199037182731,0.201488393268386,0.147536983612322,-0.0820373529691973,0.0227660362837704,0.0633512248907625,-0.692292464491573,0.0214205601994878,-0.0568546779535624,-0.119942334769817,0.0416873316334017,-0.146923796990328,0.184660504897779,0.142566434880232,-0.0247332248569192,0.00807885139388224,0.00380827975950797 diff --git a/test/rmgpy/tools/uncertaintyTest.py b/test/rmgpy/tools/uncertaintyTest.py index 14a4e26f37..35ac31049a 100644 --- a/test/rmgpy/tools/uncertaintyTest.py +++ b/test/rmgpy/tools/uncertaintyTest.py @@ -28,7 +28,7 @@ ############################################################################### import os - +import copy import numpy as np @@ -48,8 +48,10 @@ def setup_class(cls): chemkin_file = os.path.join(chem_dir, "chem_annotated.inp") spc_dict = os.path.join(chem_dir, "species_dictionary.txt") - cls.uncertainty = Uncertainty(output_directory="chemDir") + cls.uncertainty = Uncertainty(output_directory=os.path.abspath(os.path.join(os.path.dirname(__file__), "chemDir"))) cls.uncertainty.load_model(chemkin_file, spc_dict) + for i in range(len(cls.uncertainty.species_list)): + cls.uncertainty.species_list[i].index = i # local analysis depends on species being indexed # load database properly cls.uncertainty.database = RMGDatabase() @@ -174,10 +176,235 @@ def test_uncertainty_assignment(self): ) np.testing.assert_allclose( kinetic_unc, - [0.5, 1.118, 1.9783, 1.9783, 1.5363, 0.5, 2.0, 5.9369, 5.9369, 0.5], + [0.5, 1.118, 1.9783, 1.9783, 1.5363, 0.5, 2.0, 1.5363, 1.5363, 0.5], rtol=1e-4 ) + def test_local_analysis(self): + """ + Test to run uncorrelated and then correlated local_analysis and make sure the results are expected + """ + # variances are listed in decreasing order + # names are listed in order of decreasing variance contribution + expected_uncorrelated_total_variance = 1.8329056941266446 + expected_uncorrelated_thermo_variances = np.array([0.17092419, 0.09781627, 0.06186124, 0.04856985, 0.00391013, 0.00306632, 0.00041446, 9.953e-05]) + expected_uncorrelated_kinetics_variances = np.array([1.1311145, 0.15888459, 0.085189, 0.02022449, 0.01687337, 0.01605351, 0.01588366, 0.0010829, 0.00080811, 0.00012957]) + expected_correlated_total_variance = 1.7732795017083922 + expected_correlated_thermo_variances = np.array([0.09145902, 0.07672388, 0.04856985, 0.04573167, 0.03236887, 0.01747643, 0.01098087, 0.00143231, 0.0013764, 0.00031352, 0.00028968, 0.00014685, 0.0001338, 3.263e-05]) + expected_correlated_kinetics_variances = np.array([0.53202843, 0.47926886, 0.11981721, 0.11321232, 0.06070094, 0.04059757, 0.02176716, 0.01687337, 0.0161796, 0.01605351, 0.007471, 0.00673013, 0.0040449, 0.00253735, 0.00253735, 0.00168253, 0.00136045, 0.00136045, 0.00080811, 0.00050935, 0.00045884, 0.00012957, 0.00011471]) + expected_uncorrelated_thermo_labels = [ + 'dln[C2H6(18)]/dG[CH(4)]', + 'dln[C2H6(18)]/dG[C2H3(20)]', + 'dln[C2H6(18)]/dG[C2H6(18)]', + 'dln[C2H6(18)]/dG[CH2(5)]', + 'dln[C2H6(18)]/dG[CH4(16)]', + 'dln[C2H6(18)]/dG[CH3(14)]', + 'dln[C2H6(18)]/dG[C2H4(11)]', + 'dln[C2H6(18)]/dG[C2H5(12)]', + ] + expected_uncorrelated_kinetics_labels = [ + 'k8: C2H5(12)+CH3CHCH3(21)<=>C2H6(18)+C3H6(22)', + 'k3: C2H6(18)+PC3H7(15)<=>C2H5(12)+C3H8(19)', + 'k4: C2H3(20)+C3H8(19)<=>C2H4(11)+PC3H7(15)', + 'k2: CH3(14)+PC3H7(15)<=>CH4(16)+CH2CH2CH2(17)', + 'k1: O(0)+H2O2(3)<=>OH(1)+HO2(2)', + 'k7: HCCO(10)(+M)<=>O(0)+C2H(8)(+M)', + 'k5: CH3(14)+C3H8(19)<=>CH4(16)+PC3H7(15)', + 'k9: C3H5(24)+CH2CH2CH2(17)<=>C3H5(23)+C3H6(22)', + 'k10: CH3(14)+C2H5(12)<=>CH4(16)+C2H4(11)', + 'k6: CH3(14)+C2H6(18)<=>CH4(16)+C2H5(12)', + ] + expected_correlated_thermo_labels = [ + 'Library CH4(16)', + 'Estimation CH(4)', + 'Library CH2(5)', + 'Estimation C2H3(20)', + 'Estimation C2H6(18)', + 'Group(radical) CJ3', + 'Group(radical) CCJ', + 'Group(group) Cs-CsHHH', + 'Estimation CH3(14)', + 'Group(radical) CH3', + 'Group(other) R', + 'Estimation C2H4(11)', + 'Group(group) Cds-CdsHH', + 'Estimation C2H5(12)', + ] + expected_correlated_kinetics_labels = [ + 'Estimation Nonexact C2H5(12)+CH3CHCH3(21)<=>C2H6(18)+C3H6(22)', + 'Estimation Family C2H5(12)+CH3CHCH3(21)<=>C2H6(18)+C3H6(22)', + 'Rate Rule Disproportionation Root_Ext-2R!H-R_2R!H->C_4R->C', + 'Estimation Nonexact C2H6(18)+PC3H7(15)<=>C2H5(12)+C3H8(19)', + 'Estimation Nonexact C2H3(20)+C3H8(19)<=>C2H4(11)+PC3H7(15)', + 'Estimation Family C2H6(18)+PC3H7(15)<=>C2H5(12)+C3H8(19)', + 'Estimation Family C2H3(20)+C3H8(19)<=>C2H4(11)+PC3H7(15)', + 'Library O(0)+H2O2(3)<=>OH(1)+HO2(2)', + 'Estimation Family CH3(14)+PC3H7(15)<=>CH4(16)+CH2CH2CH2(17)', + 'PDep HCCO(10)(+M)<=>O(0)+C2H(8)(+M)', + 'Estimation Nonexact CH3(14)+C3H8(19)<=>CH4(16)+PC3H7(15)', + 'Estimation Family CH3(14)+C3H8(19)<=>CH4(16)+PC3H7(15)', + 'Rate Rule H_Abstraction C/H3/Cs;C_methyl', + 'Rate Rule H_Abstraction C/H3/Cs\\H3;C_rad/H2/Cs\\H\\Cs\\Cs|O', + 'Rate Rule H_Abstraction C/H3/Cs\\H3;C_rad/H2/Cs\\H3', + 'Rate Rule H_Abstraction C/H3/Cs\\H2\\O;C_methyl', + 'Rate Rule H_Abstraction C/H3/Cs\\H3;Cd_Cd\\H2_pri_rad', + 'Rate Rule H_Abstraction C/H3/Cs\\H2\\Cs|O;Cd_Cd\\H2_rad/Cs', + 'Training Disproportionation CH3(14)+C2H5(12)<=>CH4(16)+C2H4(11)', + 'Estimation Nonexact C3H5(24)+CH2CH2CH2(17)<=>C3H5(23)+C3H6(22)', + 'Estimation Family C3H5(24)+CH2CH2CH2(17)<=>C3H5(23)+C3H6(22)', + 'Training H_Abstraction CH3(14)+C2H6(18)<=>CH4(16)+C2H5(12)', + 'Rate Rule Disproportionation Root_Ext-1R!H-R_N-4R->O_N-Sp-5R!H=1R!H_Ext-4CHNS-R_N-6R!H->S_4CHNS->C_N-Sp-6BrBrBrCCCClClClFFFIIINNNOOOPPPSiSiSi#4C_6BrCClFINOPSi->C_N-1R!H-inRing_N-Sp-6C-4C', + ] + + sensitive_species = [self.uncertainty.species_list[18]] + + # uncorrelated analysis first + self.uncertainty.assign_parameter_uncertainties() + output = self.uncertainty.local_analysis(sensitive_species=sensitive_species) + total_variance, kinetic_uncertainty, thermo_uncertainty = output[sensitive_species[0]] + assert np.isclose(total_variance, expected_uncorrelated_total_variance) + + # order of kinetic or thermo uncertainty is not guaranteed, this sorts by contribution + kinetic_variances = [r[2] for r in kinetic_uncertainty] + kinetics_names = [r[0] for r in kinetic_uncertainty] + sorted_kinetics_names = [x for _, x in sorted(zip(kinetic_variances, kinetics_names))][::-1] + sorted_kinetic_variances = sorted(kinetic_variances, reverse=True) + assert np.isclose(sorted_kinetic_variances, expected_uncorrelated_kinetics_variances).all() + assert sorted_kinetics_names == expected_uncorrelated_kinetics_labels + + thermo_variances = [s[2] for s in thermo_uncertainty] + thermo_names = [s[0] for s in thermo_uncertainty] + sorted_thermo_names = [x for _, x in sorted(zip(thermo_variances, thermo_names))][::-1] + sorted_thermo_variances = sorted(thermo_variances, reverse=True) + assert np.isclose(sorted_thermo_variances, expected_uncorrelated_thermo_variances).all() + assert sorted_thermo_names == expected_uncorrelated_thermo_labels + + # now repeat for correlated analysis + self.uncertainty.assign_parameter_uncertainties(correlated=True) + output = self.uncertainty.local_analysis(sensitive_species=sensitive_species, correlated=True) + total_variance, kinetic_uncertainty, thermo_uncertainty = output[sensitive_species[0]] + assert np.isclose(total_variance, expected_correlated_total_variance) + + # order of kinetic or thermo uncertainty is not guaranteed, this sorts by contribution + kinetic_variances = [r[2] for r in kinetic_uncertainty] + kinetics_names = [r[0] for r in kinetic_uncertainty] + sorted_kinetic_variances = sorted(kinetic_variances, reverse=True) + sorted_kinetics_names = [x for _, x in sorted(zip(kinetic_variances, kinetics_names))][::-1] + assert np.isclose(sorted_kinetic_variances, expected_correlated_kinetics_variances).all() + assert sorted_kinetics_names == expected_correlated_kinetics_labels + + thermo_variances = [s[2] for s in thermo_uncertainty] + thermo_names = [s[0] for s in thermo_uncertainty] + sorted_thermo_variances = sorted(thermo_variances, reverse=True) + sorted_thermo_names = [x for _, x in sorted(zip(thermo_variances, thermo_names))][::-1] + assert np.isclose(sorted_thermo_variances, expected_correlated_thermo_variances).all() + assert sorted_thermo_names == expected_correlated_thermo_labels + + # -------------------- repeat the exact same test for new formulation -------------------------- + # uncorrelated analysis first + self.uncertainty.assign_intermediate_uncertainties() + output = self.uncertainty.local_analysis_intermediate(sensitive_species=sensitive_species) + total_variance, kinetic_uncertainty, thermo_uncertainty = output[sensitive_species[0]] + assert np.isclose(total_variance, expected_uncorrelated_total_variance) + + # order of kinetic or thermo uncertainty is not guaranteed, this sorts by contribution + kinetic_variances = [r[2] for r in kinetic_uncertainty] + kinetics_names = [r[0] for r in kinetic_uncertainty] + sorted_kinetics_names = [x for _, x in sorted(zip(kinetic_variances, kinetics_names))][::-1] + sorted_kinetic_variances = sorted(kinetic_variances, reverse=True) + assert np.isclose(sorted_kinetic_variances, expected_uncorrelated_kinetics_variances).all() + assert sorted_kinetics_names == expected_uncorrelated_kinetics_labels + + thermo_variances = [s[2] for s in thermo_uncertainty] + thermo_names = [s[0] for s in thermo_uncertainty] + sorted_thermo_names = [x for _, x in sorted(zip(thermo_variances, thermo_names))][::-1] + sorted_thermo_variances = sorted(thermo_variances, reverse=True) + assert np.isclose(sorted_thermo_variances, expected_uncorrelated_thermo_variances).all() + assert sorted_thermo_names == expected_uncorrelated_thermo_labels + + # now repeat for correlated analysis + self.uncertainty.assign_intermediate_uncertainties(correlated=True) + output = self.uncertainty.local_analysis_intermediate(sensitive_species=sensitive_species, correlated=True) + total_variance, kinetic_uncertainty, thermo_uncertainty = output[sensitive_species[0]] + assert np.isclose(total_variance, expected_correlated_total_variance) + + # order of kinetic or thermo uncertainty is not guaranteed, this sorts by contribution + kinetic_variances = [r[2] for r in kinetic_uncertainty] + kinetics_names = [r[0] for r in kinetic_uncertainty] + sorted_kinetic_variances = sorted(kinetic_variances, reverse=True) + sorted_kinetics_names = [x for _, x in sorted(zip(kinetic_variances, kinetics_names))][::-1] + assert np.isclose(sorted_kinetic_variances, expected_correlated_kinetics_variances).all() + assert sorted_kinetics_names == expected_correlated_kinetics_labels + + thermo_variances = [s[2] for s in thermo_uncertainty] + thermo_names = [s[0] for s in thermo_uncertainty] + sorted_thermo_variances = sorted(thermo_variances, reverse=True) + sorted_thermo_names = [x for _, x in sorted(zip(thermo_variances, thermo_names))][::-1] + assert np.isclose(sorted_thermo_variances, expected_correlated_thermo_variances).all() + assert sorted_thermo_names == expected_correlated_thermo_labels + + def test_covariance_matrices(self): + """ + Test that the covariance matrices are being constructed correctly, and that the correlated uncertainties are different from the uncorrelated ones + """ + + # have to add an extra reaction to see any kinetic correlations + # copy reaction 4 and change the index so it is a new reaction, but with the same source (rate rule) as the original reaction + extra_reaction = copy.deepcopy(self.uncertainty.reaction_list[4]) + self.uncertainty.reaction_list.append(extra_reaction) + try: # this will still error out if there's a problem, but will reset the reaction list so it doesn't affect other tests + self.uncertainty.extract_sources_from_model() # this will assign the same source to the new reaction as the original reaction + + self.uncertainty.assign_parameter_uncertainties(correlated=False) + uncorrelated_thermo_inputs = np.array(self.uncertainty.thermo_input_uncertainties) + uncorrelated_kinetic_inputs = np.array(self.uncertainty.kinetic_input_uncertainties) + + self.uncertainty.assign_intermediate_uncertainties(correlated=False) + uncorrelated_thermo_covariance = self.uncertainty.get_thermo_covariance_matrix() + uncorrelated_kinetic_covariance = self.uncertainty.get_kinetic_covariance_matrix() + + self.uncertainty.assign_intermediate_uncertainties(correlated=True) + correlated_thermo_covariance = self.uncertainty.get_thermo_covariance_matrix() + correlated_kinetic_covariance = self.uncertainty.get_kinetic_covariance_matrix() + Sigma_ww_thermo = self.uncertainty._get_intermediate_thermo_covariance_matrix() + Sigma_ww_kinetics = self.uncertainty._get_intermediate_kinetics_covariance_matrix() + finally: + self.uncertainty.reaction_list.pop() # remove the extra reaction so it doesn't affect other tests + + # check that the diagonal elements of the correlated and uncorrelated covariance matrices are the same and equal to the squares of the input uncertainties + np.testing.assert_allclose(np.diag(uncorrelated_thermo_covariance), np.float_power(uncorrelated_thermo_inputs, 2.0), rtol=1e-4) + np.testing.assert_allclose(np.diag(correlated_thermo_covariance), np.float_power(uncorrelated_thermo_inputs, 2.0), rtol=1e-4) + np.testing.assert_allclose(np.diag(uncorrelated_kinetic_covariance), np.float_power(uncorrelated_kinetic_inputs, 2.0), rtol=1e-4) + np.testing.assert_allclose(np.diag(correlated_kinetic_covariance), np.float_power(uncorrelated_kinetic_inputs, 2.0), rtol=1e-4) + + # check that the off-diagonal elements of the uncorrelated covariance matrix are zero + off_diagonal_kinetic_uncorrelated = uncorrelated_kinetic_covariance - np.diag(np.diag(uncorrelated_kinetic_covariance)) + assert np.allclose(off_diagonal_kinetic_uncorrelated, 0, atol=1e-8) + off_diagonal_thermo_uncorrelated = uncorrelated_thermo_covariance - np.diag(np.diag(uncorrelated_thermo_covariance)) + assert np.allclose(off_diagonal_thermo_uncorrelated, 0, atol=1e-8) + + # check that the off-diagonal elements of the correlated covariance matrix are not all zero + off_diagonal_kinetic_correlated = correlated_kinetic_covariance - np.diag(np.diag(correlated_kinetic_covariance)) + assert not np.allclose(off_diagonal_kinetic_correlated, 0, atol=1e-8) + off_diagonal_thermo_correlated = correlated_thermo_covariance - np.diag(np.diag(correlated_thermo_covariance)) + assert not np.allclose(off_diagonal_thermo_correlated, 0, atol=1e-8) + + # check that the correlated covariance matrices are symmetric + assert np.allclose(correlated_kinetic_covariance, correlated_kinetic_covariance.T, atol=1e-8) + assert np.allclose(correlated_thermo_covariance, correlated_thermo_covariance.T, atol=1e-8) + assert np.allclose(Sigma_ww_kinetics, Sigma_ww_kinetics.T, atol=1e-8) + assert np.allclose(Sigma_ww_thermo, Sigma_ww_thermo.T, atol=1e-8) + + # check that the matrix is positive semi-definite by confirming that all eigenvalues are non-negative + kinetic_eigenvalues = np.linalg.eigvals(correlated_kinetic_covariance) + assert np.all(kinetic_eigenvalues >= -1e-8) # allow for small numerical errors + thermo_eigenvalues = np.linalg.eigvals(correlated_thermo_covariance) + assert np.all(thermo_eigenvalues >= -1e-8) # allow for small numerical errors + intermediate_kinetic_eigenvalues = np.linalg.eigvals(Sigma_ww_kinetics) + assert np.all(intermediate_kinetic_eigenvalues >= -1e-8) + intermediate_thermo_eigenvalues = np.linalg.eigvals(Sigma_ww_thermo) + assert np.all(intermediate_thermo_eigenvalues >= -1e-8) + def test_specific_species_uncertainties(self): """ Test uncertainties for a few specific examples