Source code for caelus.run.core

# -*- coding: utf-8 -*-

"""\
CML Execution Utilities
-----------------------
"""

import os
import shutil
import logging
import glob
from ..utils import osutils

_lgr = logging.getLogger(__name__)

[docs]def is_caelus_casedir(root=None): """Check if the path provided looks like a case directory. A directory is determined to be an OpenFOAM/Caelus case directory if the ``system``, ``constant``, and ``system/controlDict`` exist. No check is performed to determine whether the case directory will actually run or if a mesh is present. Args: root (path): Top directory to start traversing (default: CWD) """ casedir_entries = ["constant", "system", os.path.join("system", "controlDict")] cdir = os.getcwd() if root is None else root return all(os.path.exists(os.path.join(cdir, d)) for d in casedir_entries)
[docs]def find_case_dirs(basedir): """Recursively search for case directories existing in a path. Args: basedir (path): Top-level directory to traverse Yields: Absolute path to the case directory """ absdir = osutils.abspath(basedir) # is the root directory itself a case directory? if is_caelus_casedir(absdir): yield absdir else: for root, dirs, _ in os.walk(absdir): for d in list(dirs): cdir = os.path.join(root, d) if is_caelus_casedir(cdir): dirs.remove(d) yield cdir
[docs]def find_caelus_recipe_dirs( basedir, action_file="caelus_tasks.yaml"): """Return case directories that contain action files. A case directory with action file is determined if the directory succeeds checks in :func:`is_caelus_dir` and also contains the action file specified by the user. Args: basedir (path): Top-level directory to traverse action_file (filename): Default is ``caelus_tasks.yaml`` Yields: Path to the case directory with action files """ for cdir in find_case_dirs(basedir): if os.path.exists(os.path.join(cdir, action_file)): yield cdir
[docs]def find_recipe_dirs(basedir, action_file="caelus_tasks.yaml"): """Return directories that contain the action files This behaves differently than :func:`find_caelus_recipe_dirs` in that it doesn't require a valid case directory. It assumes that the case directories are sub-directories and this task file acts on multiple directories. Args: basedir (path): Top-level directory to traverse action_file (filename): Default is ``caelus_tasks.yaml`` Yields: Path to the case directory with action files """ absdir = osutils.abspath(basedir) for root, dirs, _ in os.walk(absdir): if os.path.exists(os.path.join(root, action_file)): for dname in list(dirs): dirs.remove(dname) yield root
[docs]def clean_polymesh(casedir, region=None, preserve_patterns=None): """Clean the polyMesh from the given case directory. Args: casedir (path): Path to the case directory region (str): Mesh region to delete preserve_patterns (list): Shell wildcard patterns of files to preserve """ ppatterns = ["blockMeshDict"] if preserve_patterns: ppatterns += preserve_patterns absdir = osutils.abspath(casedir) meshdir = (os.path.join(absdir, "constant", "polyMesh") if region is None else (os.path.join(absdir, "constant", region, "polyMesh"))) if os.path.exists(meshdir): _lgr.debug("Cleaning polyMesh in %s", absdir) osutils.clean_directory(meshdir, ppatterns) else: _lgr.warning("No polyMesh directory %s; skipping clean_mesh", meshdir)
[docs]def clean_casedir(casedir, preserve_extra=None, preserve_zero=True, preserve_times=False, preserve_processors=False, purge_mesh=False): """Clean a Caelus case directory. Cleans files generated by a run. By default, this function will always preserve ``system``, ``constant``, and ``0`` directories as well as any YAML or python files. Additional files and directories can be preserved by using the ``preserve_extra`` option that accepts a list of shell wildcard patterns of files/directories that must be preserved. Args: casedir (path): Absolute path to a case directory. preserve_extra (list): List of shell wildcard patterns to preserve purge_mesh (bool): If true, also removes mesh from constant/polyMesh preserve_zero (bool): If False, removes the 0 directory preserve_times (bool): If False, removes the time directories preserve_processors (bool): If False, removes processor directories Raises: IOError: ``clean_casedir`` will refuse to remove files from a directory that is not a valid Caelus case directory. """ base_patterns = ["system", "constant", "*.yaml", "*.yml", "*.py", "*.job", "README*", "readme*", "cmlControls"] zero_pat = ["0"] if preserve_zero else [] time_pat = (["[1-9]*", "0.[0-9]*", "-[0-9]*"] if preserve_times else []) proc_pat = ["processor*"] if preserve_processors else [] extra_pat = preserve_extra if preserve_extra else [] ppatterns = (base_patterns + zero_pat + extra_pat + time_pat + proc_pat) absdir = osutils.abspath(casedir) if not is_caelus_casedir(absdir): raise IOError( "Not a valid case directory; refusing to perform destructive " "clean operation on %s"%absdir) _lgr.debug("Cleaning case directory: %s", absdir) osutils.clean_directory(absdir, ppatterns) if purge_mesh: clean_polymesh(absdir)
[docs]def clone_case(casedir, template_dir, copy_polymesh=True, copy_zero=True, copy_scripts=True, extra_patterns=None): """Clone a Caelus case directory. Args: casedir (path): Absolute path to new case directory. template_dir (path): Case directory to be cloned copy_polymesh (bool): Copy contents of constant/polyMesh to new case copy_zero (bool): Copy time=0 directory to new case copy_scripts (bool): Copy python and YAML files extra_patterns (list): List of shell wildcard patterns for copying Returns: path: Absolute path to the newly cloned directory Raises: IOError: If either the ``casedir`` exists or if the ``template_dir`` does not exist or is not a valid Caelus case directory. """ absdir = osutils.abspath(casedir) tmpl_dir = osutils.abspath(template_dir) if os.path.exists(absdir): raise IOError("Cannot overwrite existing file/directory: %s", absdir) if not (os.path.exists(tmpl_dir) and is_caelus_casedir(tmpl_dir)): raise IOError("Invalid Caelus case directory provided as template: %s", template_dir) default_ignore = ["[1-9]*", "0.[0-9]*", "-[0-9]*", "processor*", "lines", "surfaces", "probes*", "forces*", "sets", "VTK", "*.foam", "surfaceSampling", "postProcessing", "*.log", "log.*", "*logs", "*.job", "*.pdf", "*.png"] if not copy_zero: default_ignore += ["0"] if not copy_scripts: default_ignore += ["*.py", "*.yaml"] if not copy_polymesh: default_ignore += ["polyMesh"] if extra_patterns: default_ignore += extra_patterns ignore_func = shutil.ignore_patterns(*default_ignore) osutils.copy_tree(tmpl_dir, absdir, ignore_func=ignore_func) _lgr.info("Cloned directory: %s; template directory: %s", absdir, tmpl_dir) return absdir
[docs]def get_mpi_size(casedir): """Determine the number of MPI ranks to run""" #TODO: Implement decomposeParDict options. How do we handle #redistributePar? with osutils.set_work_dir(casedir): return len(glob.glob("processor*"))