fix(GraphNetwork): working on loads of small bugs

Fized stoichiometry matrix initialization, added penames to reablib reactions, began work on LogicalReaction to sum the contributions of different fitting functions provided by reaclib
This commit is contained in:
2025-06-23 15:18:56 -04:00
parent 9f6e360b0f
commit dd03873bc9
31 changed files with 101449 additions and 19120 deletions

View File

@@ -1,9 +1,15 @@
import re
import sys
from collections import defaultdict
from typing import List, Tuple
from re import Match
from typing import List, Tuple, Any, LiteralString
import numpy as np
from serif.atomic import species
from serif.atomic import Species
from serif.constants import Constants
import hashlib
from collections import Counter
import math
#import dataclasses
from dataclasses import dataclass
@@ -18,18 +24,11 @@ class Reaction:
coeffs: List[float]
projectile: str # added
ejectile: str # added
rpName: str # added
reactionType: str # added (e.g. "(p,γ)")
reverse: bool
def format_rp_name(self) -> str:
# radiative or particle captures: 2 reactants -> 1 product
if len(self.reactants) == 2 and len(self.products) == 1:
target = self.reactants[0]
prod = self.products[0]
return f"{target}({self.projectile},{self.ejectile}){prod}"
# fallback: join lists
react_str = '+'.join(self.reactants)
prod_str = '+'.join(self.products)
return f"{react_str}->{prod_str}"
return self.rpName
def __repr__(self):
@@ -61,7 +60,7 @@ class ReaclibParseError(Exception):
def format_cpp_identifier(name: str) -> str:
name_map = {'p': 'H_1', 'n': 'n_1', 'd': 'd', 't': 't', 'a': 'a'}
name_map = {'p': 'H_1', 'd': 'H_2', 't': 'H_3', 'n': 'n_1', 'a': 'He_4'}
if name.lower() in name_map:
return name_map[name.lower()]
match = re.match(r"([a-zA-Z]+)(\d+)", name)
@@ -70,7 +69,7 @@ def format_cpp_identifier(name: str) -> str:
return f"{element.capitalize()}_{mass}"
return f"{name.capitalize()}_1"
def parse_reaclib_entry(entry: str) -> Tuple[List[str], str, float, List[float], bool]:
def parse_reaclib_entry(entry: str) -> tuple[Match[str] | None, bool]:
pattern = re.compile(r"""^([1-9]|1[0-1])\r?\n
[ \t]*
((?:[A-Za-z0-9-*]+[ \t]+)*
@@ -104,35 +103,161 @@ def get_rp(group: str, chapter: int) -> Tuple[List[str], List[str]]:
products = species[nReact:nReact + nProd]
return reactants, products
def translate_names_to_species(names: List[str]) -> List[Species]:
sp = list()
split_alpha_digits = lambda inputString: re.match(r'([A-Za-z]+)[-+*]?(\d+)$', inputString).groups()
for name in names:
if name in ('t', 'a', 'd', 'n', 'p'):
name = {'t': 'H-3', 'a': 'He-4', 'd': 'H-2', 'n': 'n-1', 'p': 'H-1'}[name]
else:
name = '-'.join(split_alpha_digits(name)).capitalize()
try:
sp.append(species[name])
except Exception as e:
print("Error: Species not found in database:", name, e)
raise ReaclibParseError(f"Species '{name}' not found in species database.", line_content=name)
return sp
def determine_reaction_type(reactants: List[str],
products: List[str],
qValue: float
) -> Tuple[str, List[str], List[str], str]:
"""
Return (targetToken, projectiles, ejectiles, residualToken)
• targetToken the nucleus that appears before the parenthesis (A)
• projectiles every explicit projectile that must be written inside ( … )
• ejectiles every explicit ejectile that must be written after the comma
• residualToken the main heavy product that appears after the parenthesis (D)
Photons and neutrinos are added / omitted exactly the way JINA REACLIB expects:
γ is explicit only when it is a **projectile** (photodisintegration)
ν/ν̄ are never explicit
"""
if abs(qValue - 4.621) < 1e-6:
print("Looking at he3(he3, 2p)he4")
# --- helper look-ups ----------------------------------------------------
reactantSpecies = translate_names_to_species(reactants)
productSpecies = translate_names_to_species(products)
# Heaviest reactant → target (A); heaviest product → residual (D)
targetSpecies = max(reactantSpecies, key=lambda s: s.mass())
residualSpecies = max(productSpecies, key=lambda s: s.mass())
# Any other nuclear reactant is the normal projectile candidate
nuclearProjectiles = [x for x in reactantSpecies]
nuclearProjectiles.remove(targetSpecies)
nuclearEjectiles = [x for x in productSpecies]
nuclearEjectiles.remove(residualSpecies)
# --- bulk bookkeeping (nuclei only) -------------------------------------
aReact = sum(s.a() for s in reactantSpecies)
zReact = sum(s.z() for s in reactantSpecies)
nReact = len(reactantSpecies)
aProd = sum(s.a() for s in productSpecies)
zProd = sum(s.z() for s in productSpecies)
nProd = len(productSpecies)
dA = aReact - aProd # must be 0 abort if not
dZ = zReact - zProd # ≠0 ⇒ leptons needed
dN = nReact - nProd # ±1 ⇒ photon candidate
assert dA == 0, (
f"Baryon number mismatch: A₍react₎={aReact}, A₍prod₎={aProd}"
)
projectiles: List[str] = []
ejectiles: List[str] = []
# -----------------------------------------------------------------------
# 1. Charged-lepton bookkeeping (|ΔZ| = 1) ------------------------------
# -----------------------------------------------------------------------
if abs(dZ) == 1:
# Proton → neutron (β⁻ / e- capture)
if dZ == -1:
# Electron capture when (i) exo-thermic and (ii) nucleus count unchanged
if qValue > 0 and dN == 0:
projectiles.append("e-") # write e- as projectile
else:
ejectiles.append("e-") # β⁻ decay: e- is an ejectile
# Neutron → proton (β⁺ / positron capture capture is vanishingly rare)
elif dZ == 1:
ejectiles.append("e+") # β⁺ / weak-proton capture
# Neutrino companion is implicit never written
# (dL is automatically fixed by hiding ν or ν̄)
# -----------------------------------------------------------------------
# 2. Photon bookkeeping (ΔZ = 0) ----------------------------------------
# -----------------------------------------------------------------------
if dZ == 0:
# Two → one nucleus and exothermic ⇒ radiative capture (γ ejectile, implicit)
if dN == 1 and qValue > 0:
ejectiles.append("g")
pass # γ is implicit; nothing to write
# One → two nuclei and endothermic ⇒ photodisintegration (γ projectile, explicit)
elif dN == -1 and qValue < 0:
projectiles.append("g")
# -----------------------------------------------------------------------
# 3. Add the ordinary nuclear projectile (if any) -----------------------
# -----------------------------------------------------------------------
if nuclearProjectiles:
for nucP in nuclearProjectiles:
name = nucP.name().replace("-", "").lower()
if name in ('h1', 'h2', 'h3', 'he4', 'n1', 'p'):
name = name.replace('h1', 'p').replace('h2', 'd').replace('h3', 't').replace('he4', 'a').replace('n1', 'n')
projectiles.append(name) # REACLIB allows exactly one
if nuclearEjectiles:
for nucE in nuclearEjectiles:
name = nucE.name().replace("-", "").lower()
if name in ('h1', 'h2', 'h3', 'he4', 'n1', 'p'):
name = name.replace('h1', 'p').replace('h2', 'd').replace('h3', 't').replace('he4', 'a').replace('n1', 'n')
ejectiles.append(name)
# -----------------------------------------------------------------------
# 4. Build return values -------------------------------------------------
# -----------------------------------------------------------------------
targetToken = targetSpecies.name().replace("-", "").lower()
residualToken = residualSpecies.name().replace("-", "").lower()
if targetToken in ('h1', 'h2', 'h3', 'n1', 'p'):
targetToken = targetToken.replace('h1', 'p').replace('h2', 'd').replace('h3', 't').replace('n1', 'n')
if residualToken in ('h1', 'h2', 'h3', 'n1', 'p'):
residualToken = residualToken.replace('h1', 'p').replace('h2', 'd').replace('h3', 't').replace('n1', 'n')
uniqueProjectiles = set(projectiles)
uniqueEjectiles = set(ejectiles)
numPerUniqueProjectiles = {x: projectiles.count(x) for x in uniqueProjectiles}
numPerUniqueEjectiles = {x: ejectiles.count(x) for x in uniqueEjectiles}
formatedProjectileNames = [f"{numPerUniqueProjectiles[x]}{x}" if numPerUniqueProjectiles[x] > 1 else x for x in uniqueProjectiles]
formatedEjectileNames = [f"{numPerUniqueEjectiles[x]}{x}" if numPerUniqueEjectiles[x] > 1 else x for x in uniqueEjectiles]
rType = f"({" ".join(formatedProjectileNames)},{' '.join(formatedEjectileNames)})"
reactionKey = f"{targetToken}{rType}{residualToken}"
return targetToken, projectiles, ejectiles, residualToken, reactionKey, rType
def determine_reaction_type(reactants: List[str], products: List[str]) -> Tuple[str, str, str]:
# assumes no reverse flag applied
projectile = ''
ejectile = ''
# projectile is the lighter reactant (p, n, he4)
for sp in reactants:
if sp in ('p', 'n', 'he4'):
projectile = sp
break
# ejectile logic
if len(products) == 1:
ejectile = 'g'
elif 'he4' in products:
ejectile = 'a'
elif 'p' in products:
ejectile = 'p'
elif 'n' in products:
ejectile = 'n'
reactionType = f"({projectile},{ejectile})"
return projectile, ejectile, reactionType
def extract_groups(match: re.Match, reverse: bool) -> Reaction:
groups = match.groups()
chapter = int(groups[0].strip())
rawGroup = groups[1].strip()
rList, pList = get_rp(rawGroup, chapter)
if 'c12' in rList and 'mg24' in pList:
print("Found it!")
if reverse:
rList, pList = pList, rList
proj, ejec, rType = determine_reaction_type(rList, pList)
qValue = float(groups[3].strip())
target, proj, ejec, residual, key, rType = determine_reaction_type(rList, pList, qValue)
reaction = Reaction(
reactants=rList,
products=pList,
@@ -142,10 +267,12 @@ def extract_groups(match: re.Match, reverse: bool) -> Reaction:
coeffs=[float(groups[i].strip()) for i in range(4, 11)],
projectile=proj,
ejectile=ejec,
rpName=key,
reactionType=rType,
reverse=reverse
)
return reaction
def format_emplacment(reaction: Reaction) -> str:
reactantNames = [f'{format_cpp_identifier(r)}' for r in reaction.reactants]
productNames = [f'{format_cpp_identifier(p)}' for p in reaction.products]
@@ -163,19 +290,23 @@ def format_emplacment(reaction: Reaction) -> str:
chapter_str = reaction.chapter
rate_sets_str = ', '.join([str(x) for x in reaction.coeffs])
emplacment: str = f"s_all_reaclib_reactions.emplace(\"{label}\", REACLIBReaction(\"{label}\", {chapter_str}, {{{reactants_str}}}, {{{products_str}}}, {q_value_str}, \"{reaction.label}\", {{{rate_sets_str}}}, {"true" if reaction.reverse else "false"}));"
emplacment: str = f"s_all_reaclib_reactions.emplace(\"{label}\", REACLIBReaction(\"{label}\", \"{reaction.format_rp_name()}\", {chapter_str}, {{{reactants_str}}}, {{{products_str}}}, {q_value_str}, \"{reaction.label}\", {{{rate_sets_str}}}, {"true" if reaction.reverse else "false"}));"
return emplacment
def generate_reaclib_header(reaclib_filepath: str, culling: float, T9: float, verbose: bool) -> str:
def generate_reaclib_header(reaclib_filepath: str, culling: float, T9: float, verbose: bool) -> tuple[
LiteralString, int | Any, int | Any]:
"""
Parses a JINA REACLIB file using regular expressions and generates a C++ header file string.
Args:
reaclib_filepath: The path to the REACLIB data file.
culling: The threshold for culling reactions based on their rates at T9.
T9: The temperature in billions of Kelvin to evaluate the reaction rates for culling.
verbose: If True, prints additional information about skipped reactions.
Returns:
A string containing the complete C++ header content.
@@ -190,7 +321,12 @@ def generate_reaclib_header(reaclib_filepath: str, culling: float, T9: float, ve
for entry in entries:
m, r = parse_reaclib_entry(entry)
if m is not None:
reac = extract_groups(m, r)
try:
reac = extract_groups(m, r)
except ReaclibParseError as e:
continue
if verbose:
print(f"Parsed reaction: {reac.format_rp_name()} ({reac.coeffs}) with label {reac.label} (reverse: {reac.reverse})")
reactions.append(reac)
# --- Generate the C++ Header String ---
@@ -204,8 +340,8 @@ def generate_reaclib_header(reaclib_filepath: str, culling: float, T9: float, ve
"// Includes %%TOTAL%% reactions.",
"// Note: Only reactions with species defined in the atomicSpecies.h header will be included at compile time.",
"#pragma once",
"#include \"atomicSpecies.h\"",
"#include \"species.h\"",
"#include \"fourdst/composition/atomicSpecies.h\"",
"#include \"fourdst/composition/species.h\"",
"#include \"reaclib.h\"",
"\nnamespace gridfire::reaclib {\n",
"""
@@ -218,7 +354,13 @@ def generate_reaclib_header(reaclib_filepath: str, culling: float, T9: float, ve
]
totalSkipped = 0
totalIncluded = 0
energy = list()
energyFile = open('energy.txt', 'w')
energyFile.write("name;maxEnergy;QValue,reactants;products;a0;a1;a2;a3;a4;a5;a6\n")
for reaction in reactions:
maxEnergy = calculate_peak_importance(reaction)
energyFile.write(f"{reaction.format_rp_name()};{maxEnergy};{reaction.qValue};{' '.join(reaction.reactants)};{' '.join(reaction.products)};{';'.join([str(x) for x in reaction.coeffs])}\n")
energy.append(maxEnergy)
reactantNames = [f'{format_cpp_identifier(r)}' for r in reaction.reactants]
productNames = [f'{format_cpp_identifier(p)}' for p in reaction.products]
reactionName = f"{'_'.join(reactantNames)}_to_{'_'.join(productNames)}_{reaction.label.upper()}"
@@ -226,11 +368,13 @@ def generate_reaclib_header(reaclib_filepath: str, culling: float, T9: float, ve
rate = evaluate_rate(reaction.coeffs, T9)
if rate < culling:
if verbose:
print(f"Skipping reaction {reactionName} with rate {rate:.6e} at T9={T9} (culling threshold: {culling} at T9={T9})")
print(f"Skipping reaction {reaction.format_rp_name()} ({reactionName}) with rate {rate:.6e} at T9={T9} (culling threshold: {culling} at T9={T9})")
totalSkipped += 1
continue
else:
totalIncluded += 1
else:
totalIncluded += 1
defines = ' && '.join(set([f"defined(SERIF_SPECIES_{name.upper().replace('-', '_min_').replace('+', '_add_').replace('*', '_mult_')})" for name in reactantNames + productNames]))
cpp_lines.append(f" #if {defines}")
@@ -238,8 +382,64 @@ def generate_reaclib_header(reaclib_filepath: str, culling: float, T9: float, ve
cpp_lines.append(f" {emplacment}")
cpp_lines.append(f" #endif // {defines}")
cpp_lines.append("\n }\n} // namespace gridfire::reaclib\n")
energyFile.close()
#save energy data to a file
return "\n".join(cpp_lines), totalSkipped, totalIncluded
def calculate_peak_importance(reaction: Reaction) -> float:
TGrid = np.logspace(-3, 2, 100) # Temperature grid from 0.001 to 100 T9
RhoGrid = np.logspace(0.0, 6.0, 100) # Density grid from 1e0 to 1e3 g/cm^3
N_A: float = Constants['N_a'].value
u: float = Constants['u'].value
max_energy_proxy: float = 0.0
if not reaction.reactants:
return 0.0
numReactants: int = len(reaction.reactants)
maxRate: float = 0.0
reactantCount: Counter = Counter(reaction.reactants)
factorial_correction: float = 1.0
for count in reactantCount.values():
if count > 1:
factorial_correction *= math.factorial(count)
molar_correction_factor = 1.0
if numReactants > 1:
molar_correction_factor = N_A ** (numReactants - 1)
Y_ideal = 1.0 / numReactants
mass_term = 1.0
split_alpha_digits = lambda inputString: re.match(r'([A-Za-z]+)(\d+)$', inputString).groups()
for reactant in reaction.reactants:
try:
if reactant in ('t', 'a', 'he4', 'd', 'n', 'p'):
reactant = {'t': 'H-3', 'a': 'He-4', 'he4': 'He-4', 'd': 'H-2', 'n': 'n-1', 'p': 'H-1'}[reactant]
else:
reactant = '-'.join(split_alpha_digits(reactant)).capitalize()
# print(f"Parsing reactant {reactant} using split_alpha_digits")
reactantMassAMU = species[reactant].mass()
reactantMassG = reactantMassAMU * u
mass_term *= (Y_ideal/ reactantMassG)
except Exception as e:
# print(f"Error: Reactant {reactant} not found in species database. (what: {e})")
return 0.0
for T9 in TGrid:
k_reaclib = evaluate_rate(reaction.coeffs, T9)
for rho in RhoGrid:
n_product_no_rho = mass_term / factorial_correction
full_rate = (n_product_no_rho *( rho ** numReactants) * k_reaclib) / molar_correction_factor
energy_proxy = full_rate * abs(reaction.qValue)
if energy_proxy > max_energy_proxy:
max_energy_proxy = energy_proxy
print(f"For reaction {reaction.format_rp_name()}, max energy proxy: {max_energy_proxy:.6e} MeV")
return max_energy_proxy
# def smart_cull(reactions: List[Reaction], verbose: bool = False):
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser(description="Generate a C++ header from a REACLIB file.")