"""
Helper functions for the "main.py" file.
Helps run simulations and manages output files.
"""
# Imports
import os, yaml, copy, subprocess, shutil, time
from datetime import datetime
import pandas as pd
from mpi4py import MPI
# Module imports
from mdss.utils.helpers import ProblemType, MachineType, make_dir, print_msg, load_yaml_input, deep_update, load_csv_data
from mdss.resources.templates import gl_job_script, python_code_for_hpc, python_code_for_subprocess
try:
    from mdss.src.aerostruct import Problem
    MODULES_NOT_FOUND = False
except ImportError:
    # The in-process solver stack is optional: without it, `execute` can only
    # run simulations as subprocesses (it raises if subprocess_flag is False).
    # Only import failures are caught so genuine errors inside the module surface.
    MODULES_NOT_FOUND = True
comm = MPI.COMM_WORLD
################################################################################
# Code for running simulations
################################################################################
[docs]
def execute(simulation):
    """
    Runs the aerodynamic and/or aerostructural simulations.

    Iterates through all hierarchies, cases, scenarios, refinement levels (mesh files) and angles of attack
    defined in the input YAML file. For every refinement level the simulation is run either as a subprocess
    (`run_as_subprocess`) or in-process (`aerostruct.Problem`), after which the per-AOA result files are collected.

    Inputs
    ------
    - **simulation** : object
        Simulation object. Reads `sim_info`, `out_dir`, `final_out_file`, `subprocess_flag` and `record_subprocess`.

    Outputs
    -------
    - **input_file.yaml**:
        Copy of `simulation.sim_info` in `simulation.out_dir`.
    - **ADflow_output.csv** (one per refinement level):
        Alpha, CL, CD, fail flag and wall time of the successful angles of attack, merged with the rows already in
        the file (new rows win for a repeated Alpha) and sorted by Alpha.
    - **simulation.final_out_file**:
        Summary of all results across hierarchies, cases, scenarios and refinement levels; deep-merged into the
        existing file if there is one.

    Raises
    ------
    - **ModuleNotFoundError**: if the in-process solver modules are missing and `simulation.subprocess_flag` is False.

    Notes
    -----
    - Results are read from the `aoa_<alpha>/aoa_<alpha>.yaml` files written by the simulation. An angle of attack
      whose file is missing, unreadable, or has a non-zero `fail_flag` is listed under `failed_aoa`.
    - Temporary `case_info.yaml` / `scenario_info.yaml` files are written for the simulation and removed afterwards.
    - Shared files are written by rank 0 only; barriers make sure no rank reads a file while it is being written.
    """
    if MODULES_NOT_FOUND is True and simulation.subprocess_flag is False:
        msg = ("Required module are not present in the current environment. Cannot run without subprocess.\n"
               "Turn on the subprocess flag and specify the eligible python environment or install the required packages")
        print_msg(msg, 'error', comm)
        raise ModuleNotFoundError()

    # Store a copy of input YAML file in output directory
    input_yaml_file = os.path.join(simulation.out_dir, "input_file.yaml")
    if comm.rank == 0:
        with open(input_yaml_file, 'w') as input_yaml_handle:
            yaml.dump(simulation.sim_info, input_yaml_handle, sort_keys=False)

    sim_info_copy = copy.deepcopy(simulation.sim_info)  # Copy that drives the loops
    sim_out_info = copy.deepcopy(simulation.sim_info)   # Copy that is annotated and written as the output YAML
    # sim_info without the hierarchies, passed to the subprocess; sim_info_copy is never modified, so build it once.
    other_sim_info = {key: value for key, value in sim_info_copy.items() if key != 'hierarchies'}

    start_time = time.time()
    start_wall_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    for hierarchy, hierarchy_info in enumerate(sim_info_copy['hierarchies']):  # loop for Hierarchy level
        for case, case_info in enumerate(hierarchy_info['cases']):  # loop for cases in hierarchy
            problem_type = ProblemType.from_string(case_info['problem'])  # Convert string to enum
            case_out_dir = os.path.join(simulation.out_dir, hierarchy_info['name'], case_info['name'])
            make_dir(case_out_dir, comm)
            comm.Barrier()
            # Case info is passed to the simulation by file path
            case_info_fpath = os.path.join(case_out_dir, "case_info.yaml")
            _dump_yaml_on_root(case_info, case_info_fpath)

            for scenario, scenario_info in enumerate(case_info['scenarios']):  # loop for scenarios
                scenario_out_dir = os.path.join(case_out_dir, scenario_info['name'])
                make_dir(scenario_out_dir, comm)
                comm.Barrier()
                # Scenario info is passed to the simulation by file path
                scenario_info_fpath = os.path.join(scenario_out_dir, "scenario_info.yaml")
                _dump_yaml_on_root(scenario_info, scenario_info_fpath)

                # Angles of attack to simulate, as a quoted csv string for the command line
                aoa_list = scenario_info['aoa_list']
                aoa_csv_string = '"' + ",".join(str(float(aoa)) for aoa in aoa_list) + '"'
                scenario_sim_info = {}  # Scenario level sim info for the overall sim info file

                for mesh_file in case_info['mesh_files']:  # Loop for refinement levels
                    msg = (
                        f"{'Hierarchy':<20}: {hierarchy_info['name']}\n"
                        f"{'Case Name':<20}: {case_info['name']}\n"
                        f"{'Scenario':<20}: {scenario_info['name']}\n"
                        f"{'Aero Mesh File':<20}: {mesh_file}"
                    )
                    print_msg(msg, f"{'SIMULATION INFO':^30}", comm)
                    refinement_out_dir = os.path.join(scenario_out_dir, f"{mesh_file}")
                    make_dir(refinement_out_dir, comm)
                    comm.Barrier()  # directory must exist before any rank writes into it
                    aero_grid_fpath = os.path.join(case_info['meshes_folder_path'], mesh_file)
                    # Structural mesh only exists for aerostructural cases
                    if problem_type == ProblemType.AEROSTRUCTURAL:
                        struct_mesh_file = case_info['struct_options']['mesh_fpath']
                    else:
                        struct_mesh_file = 'none'

                    # All AOAs of this refinement level are run together.
                    # The optimal number of AOAs per run is still to be determined.
                    if simulation.subprocess_flag is True:
                        run_as_subprocess(other_sim_info, case_info_fpath, scenario_info_fpath, refinement_out_dir,
                                          aoa_csv_string, aero_grid_fpath, struct_mesh_file, comm,
                                          simulation.record_subprocess)
                    elif simulation.subprocess_flag is False:
                        problem = Problem(case_info_fpath, scenario_info_fpath, refinement_out_dir, aoa_csv_string,
                                          aero_grid_fpath, struct_mesh_file)
                        problem.run()
                    # run_as_subprocess only launches on rank 0: wait for it before reading its result files
                    comm.Barrier()

                    refinement_level_dict, csv_columns = _collect_aoa_results(refinement_out_dir, aoa_list)

                    ADflow_out_file = os.path.join(refinement_out_dir, "ADflow_output.csv")
                    df_combined = _merge_results_csv(ADflow_out_file, csv_columns)
                    comm.Barrier()  # no rank may still be reading the CSV while rank 0 overwrites it
                    if comm.rank == 0:
                        df_combined.to_csv(ADflow_out_file, index=False)

                    refinement_level_dict['csv_file'] = ADflow_out_file
                    refinement_level_dict['refinement_out_dir'] = refinement_out_dir
                    scenario_sim_info[f"{mesh_file}"] = refinement_level_dict
                ################################# End of refinement loop ########################################
                scenario_sim_info['scenario_out_dir'] = scenario_out_dir
                sim_out_info['hierarchies'][hierarchy]['cases'][case]['scenarios'][scenario]['sim_info'] = scenario_sim_info
                if comm.rank == 0 and os.path.exists(scenario_info_fpath):  # Remove the scenario_info yaml file
                    os.remove(scenario_info_fpath)
            ################################# End of scenarios loop ########################################
            if comm.rank == 0 and os.path.exists(case_info_fpath):  # Remove the case_info yaml file
                os.remove(case_info_fpath)
        ################################# End of case loop ########################################
    ################################# End of hierarchy loop ########################################

    end_time = time.time()
    end_wall_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    net_run_time = end_time - start_time
    sim_out_info['overall_sim_info'] = {
        'start_time': start_wall_time,
        'end_time': end_wall_time,
        'total_wall_time': f"{net_run_time:.2f} sec"
    }

    # Merge with a previous final output file, if any, so earlier results are kept
    if os.path.exists(simulation.final_out_file):
        prev_sim_info, _ = load_yaml_input(simulation.final_out_file, comm)
        deep_update(prev_sim_info, sim_out_info)  # Updates old sim data with the new sim data.
        final_sim_out_info = prev_sim_info
    else:
        final_sim_out_info = sim_out_info
    comm.Barrier()  # every rank has finished reading the old file before rank 0 overwrites it
    if comm.rank == 0:
        with open(simulation.final_out_file, 'w') as final_out_yaml_handle:
            yaml.dump(final_sim_out_info, final_out_yaml_handle, sort_keys=False)
    comm.Barrier()


def _dump_yaml_on_root(data, fpath):
    """Writes `data` to `fpath` as YAML from rank 0 only, then synchronizes so the file exists for every rank."""
    if comm.rank == 0:
        with open(fpath, 'w') as fhandle:
            yaml.dump(data, fhandle, sort_keys=False)
    comm.Barrier()


def _collect_aoa_results(refinement_out_dir, aoa_list):
    """
    Reads the per-AOA result YAML files of one refinement level.

    Returns
    -------
    - **refinement_level_dict** : dict
        `aoa_<alpha>` -> {cl, cd, wall_time, fail_flag, out_dir} for each successful AOA, followed by
        `failed_aoa`: the AOAs whose file is missing, unreadable, incomplete, or has a non-zero `fail_flag`.
    - **csv_columns** : dict
        Formatted `Alpha`, `CL`, `CD`, `FFlag`, `WTime` columns (equal lengths) for the successful AOAs.
    """
    alphas, cls, cds, wall_times, fail_flags = [], [], [], [], []
    refinement_level_dict = {}
    failed_aoa_list = []
    for aoa in aoa_list:
        aoa = float(aoa)
        aoa_out_dir = os.path.join(refinement_out_dir, f"aoa_{aoa}")
        aoa_info_file = os.path.join(aoa_out_dir, f"aoa_{aoa}.yaml")
        try:
            with open(aoa_info_file, 'r') as aoa_file:
                aoa_sim_info = yaml.safe_load(aoa_file)
            fail_flag = aoa_sim_info['fail_flag']
            succeeded = fail_flag == 0
            if succeeded:
                # Parse every field before recording anything, so a malformed file cannot
                # leave the csv columns with different lengths.
                row = (float(aoa_sim_info['AOA']), float(aoa_sim_info['cl']), float(aoa_sim_info['cd']),
                       float(aoa_sim_info['wall_time'].replace(" sec", "")), fail_flag)
                aoa_entry = {
                    'cl': row[1],
                    'cd': row[2],
                    'wall_time': aoa_sim_info['wall_time'],
                    'fail_flag': int(fail_flag),
                    'out_dir': aoa_out_dir,
                }
        except Exception:  # missing / unreadable / incomplete result file counts as a failed AOA
            succeeded = False
        if not succeeded:
            failed_aoa_list.append(aoa)
            continue
        for column, value in zip((alphas, cls, cds, wall_times, fail_flags), row):
            column.append(value)
        refinement_level_dict[f"aoa_{aoa}"] = aoa_entry
    refinement_level_dict["failed_aoa"] = failed_aoa_list

    csv_columns = {
        "Alpha": [f"{alpha:6.2f}" for alpha in alphas],
        "CL": [f"{cl:8.4f}" for cl in cls],
        "CD": [f"{cd:8.4f}" for cd in cds],
        "FFlag": [f"{int(flag):12f}" for flag in fail_flags],
        "WTime": [f"{wall_time:10.2f}" for wall_time in wall_times],
    }
    return refinement_level_dict, csv_columns


def _merge_results_csv(csv_fpath, csv_columns):
    """
    Returns the rows of `csv_columns` merged with the existing CSV at `csv_fpath` (if any).

    Rows without a numeric Alpha are dropped. For a repeated Alpha the new row wins. The result is sorted by Alpha.
    Nothing is written here.
    """
    df_new = pd.DataFrame(csv_columns)
    if os.path.exists(csv_fpath):
        df_existing = load_csv_data(csv_fpath, comm)
        df_combined = pd.concat([df for df in [df_existing, df_new] if df is not None], ignore_index=True)
    else:
        df_combined = df_new
    # Alpha must be numeric for deduplication and sorting
    df_combined['Alpha'] = pd.to_numeric(df_combined['Alpha'], errors='coerce')
    df_combined.dropna(subset=['Alpha'], inplace=True)
    df_combined.drop_duplicates(subset='Alpha', keep='last', inplace=True)
    df_combined.sort_values(by='Alpha', inplace=True)
    return df_combined
################################################################################
# Code for generating and submitting job script on HPC
################################################################################
[docs]
def submit_job_on_hpc(sim_info, yaml_file_path, wait_for_job, comm):
    """
    Generates and submits a job script on an HPC cluster.

    Fills the Slurm job-script template (`gl_job_script` from `templates.py`) with the values in
    `sim_info['hpc_info']`, writes the script and the Python driver (`python_code_for_hpc`) to the output
    directory, and submits the script with `sbatch`.

    Inputs
    ------
    - **sim_info** : dict
        Dictionary containing simulation details. Reads `out_dir` and `hpc_info` (`cluster`, `job_name`,
        `account_name`, `partition`, `time`, `nodes`, `nproc`, `nproc_per_node`, `mem_per_cpu`, `mail_types`,
        `email_id`).
    - **yaml_file_path** : str
        Path to the YAML file containing simulation information; embedded in the job script.
    - **wait_for_job** : bool
        If True, poll `squeue` every 10 seconds and return only once the job no longer appears in its output.
    - **comm** : MPI communicator
        Only rank 0 writes the files and calls `sbatch`; the job id is broadcast to all ranks.

    Outputs
    -------
    - **job_id** : str
        The Slurm job id (last word of the `sbatch` output).

    Raises
    ------
    - **ValueError**: if `hpc_info['cluster']` is not supported (only `'GL'` is).
    - **RuntimeError**: on every rank, if writing the files or submitting with `sbatch` fails.

    Notes
    -----
    - The template is filled with `str.format`; optional fields fall back to the defaults shown below.
    """
    out_dir = os.path.abspath(sim_info['out_dir'])
    hpc_info = sim_info['hpc_info']  # Extract HPC info
    cluster = hpc_info['cluster']
    if cluster != 'GL':
        raise ValueError(f"Unsupported HPC cluster '{cluster}'. Supported clusters: 'GL'.")

    python_fname = os.path.join(out_dir, "run_sim.py")  # Python script to be run on HPC
    out_file = os.path.join(out_dir, f"{hpc_info['job_name']}_%j.txt")
    job_script = gl_job_script.format(
        job_name=hpc_info.get('job_name'),
        account_name=hpc_info.get('account_name'),
        partition=hpc_info.get('partition'),
        time=hpc_info.get('time', '1:00:00'),
        nodes=hpc_info.get('nodes'),
        nproc=hpc_info.get('nproc'),
        nproc_per_node=hpc_info.get('nproc_per_node'),
        mem_per_cpu=hpc_info.get('mem_per_cpu', '1000m'),
        mail_types=hpc_info.get('mail_types', 'NONE'),
        email_id=hpc_info.get('email_id', 'NONE'),
        out_file=out_file,
        python_file_path=python_fname,
        yaml_file_path=yaml_file_path
    )
    job_script_path = os.path.join(out_dir, f"{hpc_info['job_name']}_job_file.sh")

    # Submit exactly once (rank 0) and share the outcome, so other ranks neither submit
    # duplicate jobs nor race ahead of the script being written.
    job_id, error = None, None
    if comm.rank == 0:
        try:
            with open(job_script_path, "w") as file:
                file.write(job_script)
            with open(python_fname, "w") as file:
                file.write(python_code_for_hpc)
            submission = subprocess.run(["sbatch", job_script_path], stdout=subprocess.PIPE,
                                        stderr=subprocess.PIPE, text=True)
            words = submission.stdout.split()
            if submission.returncode != 0 or not words:
                error = f"sbatch failed (return code {submission.returncode}): {submission.stderr.strip()}"
            else:
                job_id = words[-1]
        except OSError as err:  # e.g. unwritable out_dir or sbatch not installed
            error = f"Could not submit job script {job_script_path}: {err}"
    job_id, error = comm.bcast((job_id, error), root=0)
    if error is not None:
        raise RuntimeError(error)

    print_msg(f"Job {job_id} submitted.", "notice", comm)
    if wait_for_job:
        while True:
            check_cmd = ["squeue", "--job", job_id]
            result = subprocess.run(check_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
            if job_id not in result.stdout:  # job left the queue
                print_msg(f"Job {job_id} completed.", 'notice', comm)
                break
            time.sleep(10)  # Check every 10 seconds
    return job_id
################################################################################
# Helper Functions for running the simulations as subprocesses
################################################################################
[docs]
def run_as_subprocess(sim_info, case_info_fpath, scenario_info_fpath, ref_out_dir, aoa_csv_string, aero_grid_fpath, struct_mesh_fpath, comm, record_flag=False):
    """
    Executes a set of angles of attack in a subprocess, using `mpirun` on a local machine and `srun` on an HPC (Great Lakes).

    Only rank 0 launches the subprocess; the other ranks return right away, so callers that need the results on
    every rank must synchronize afterwards.

    Inputs
    ------
    - **sim_info** : dict
        Simulation details. Reads `out_dir`, `machine_type`, `nproc` (local machine only) and the optional
        `python_version` (defaults to `'python'`, and falls back to it when the executable is not on PATH).
    - **case_info_fpath** : str
        Path to the case info yaml file
    - **scenario_info_fpath** : str
        Path to the scenario info yaml file
    - **ref_out_dir** : str
        Path to the refinement level directory
    - **aoa_csv_string** : str
        The angles of attack to simulate in this subprocess, as a csv string
    - **aero_grid_fpath** : str
        Path to the aero grid file used for this simulation.
    - **struct_mesh_fpath** : str
        Path to the structural mesh file. Pass str(None) / 'none' when running an aero problem.
    - **comm** : MPI communicator
        An MPI communicator object to handle parallelism.
    - **record_flag** : bool, optional
        If True, the subprocess's stdout is also written to `subprocess_out.txt` in `ref_out_dir`
        (the file is created either way).

    Outputs
    -------
    - **None**
        1. Writes `script_for_subprocess.py` to the output directory if it does not exist yet.
        2. Launches the subprocess and streams its stdout to the terminal.
        3. Once it finishes, prints its stderr, plus a warning if it exited with a non-zero return code.

    Raises
    ------
    - **ValueError**: (rank 0) if `machine_type` is neither local nor HPC.
    """
    import threading  # local import: only needed to drain stderr concurrently

    out_dir = os.path.abspath(sim_info['out_dir'])
    python_fname = os.path.join(out_dir, "script_for_subprocess.py")
    machine_type = MachineType.from_string(sim_info['machine_type'])
    subprocess_out_file = os.path.join(ref_out_dir, "subprocess_out.txt")
    # Save the script run by the subprocess, unless it already exists
    if comm.rank == 0 and not os.path.exists(python_fname):
        with open(python_fname, "w") as file:
            file.write(python_code_for_subprocess)

    env = os.environ.copy()
    python_version = sim_info.get('python_version', 'python')  # User defined python executable, default 'python'
    if shutil.which(python_version) is None:
        # Report the executable that was NOT found before replacing it with the fallback
        print_msg(f"{python_version} not found! Falling back to default 'python'.", 'warning', comm)
        python_version = 'python'

    if comm.rank != 0:
        return  # only rank 0 launches the subprocess

    print_msg(f"Starting subprocess for the following aoa: {aoa_csv_string}", "notice", comm)
    if machine_type == MachineType.LOCAL:
        run_cmd = ['mpirun', '-np', str(sim_info['nproc']), python_version]
    elif machine_type == MachineType.HPC:
        run_cmd = ['srun', python_version]
    else:
        raise ValueError(f"Unsupported machine type: {machine_type}")
    run_cmd.extend([python_fname, '--caseInfoFile', case_info_fpath, '--scenarioInfoFile', scenario_info_fpath,
                    '--refLevelDir', ref_out_dir, '--aoaList', aoa_csv_string, '--aeroGrid', aero_grid_fpath,
                    '--structMesh', struct_mesh_fpath])

    stderr_chunks = []
    with open(subprocess_out_file, "w") as outfile:
        with subprocess.Popen(run_cmd,
                              env=env,
                              stdout=subprocess.PIPE,  # Capture standard output
                              stderr=subprocess.PIPE,  # Capture standard error
                              text=True,               # Text, not bytes
                              ) as p:
            # Drain stderr on a background thread: if the child filled the stderr pipe while we are
            # blocked reading stdout, both processes would wait on each other forever.
            stderr_reader = threading.Thread(target=lambda: stderr_chunks.append(p.stderr.read()), daemon=True)
            stderr_reader.start()
            for line in p.stdout:
                print(line, end='')  # real-time terminal output
                if record_flag is True:
                    outfile.write(line)
                    outfile.flush()
            p.wait()  # Wait for subprocess to end
            stderr_reader.join()

    stderr = "".join(stderr_chunks)
    if stderr:
        print_msg(f"{stderr}", 'subprocess error/warning', comm)
    if p.returncode != 0:
        print_msg(f"Subprocess exited with return code {p.returncode}", 'warning', comm)
    print_msg("Subprocess completed", "notice", comm)