# -*- coding: utf-8 -*-
"""\
Caelus Tasks Manager
----------------------
"""
import os
import glob
import logging
import shutil
from collections import OrderedDict
import six
from ..utils import osutils
from ..utils.struct import Struct
from . import core as run_cmds
from ..post.logs import SolverLog
from ..post.plots import CaelusPlot
from ..config import cmlenv
from ..io import dictfile as cmlio
from .cmd import CaelusCmd
from .hpc_queue import python_execute
_lgr = logging.getLogger(__name__)
[docs]@six.add_metaclass(TasksMeta)
class Tasks(object):
"""Caelus Tasks.
Tasks provides a simple automated workflow interface that provides various
pre-defined actions via a YAML file interface.
The tasks are defined as methods with a ``cmd_`` prefix and are
automaticaly converted to task names. Users can create additional tasks by
subclassing and adding additional methods with ``cmd_`` prefix. These
methods accept one argument ``options``, a dictionary containing parameters
provided by the user for that particular task.
"""
def __init__(self):
#: List of tasks that must be performed
self.tasks = []
#: File that was used to load tasks
self.task_file = "None"
#: Directory where the tasks are to be executed
self.case_dir = None
#: Caelus environment used when executing tasks
self.env = None
self.dep_job_id = None
self.task_set_count = 0
[docs] @classmethod
def load(cls,
task_file="caelus_tasks.yaml",
task_node="tasks"):
"""Load tasks from a YAML file.
If ``exedir is None`` then the execution directory is set to the
directory where the tasks file is found.
Args:
task_file (filename): Path to the YAML file
"""
self = cls.__new__(cls)
absfile = osutils.abspath(task_file)
act_file = Struct.load_yaml(absfile)
if "tasks" not in act_file:
raise KeyError("Cannot find tasks list in file: " +
task_file)
self.tasks = act_file[task_node]
self.task_file = absfile
_lgr.info("Loaded tasks from: %s", absfile)
return self
def __call__(self, case_dir=None, env=None):
"""Execute the tasks
Args:
case_dir: Absolute path to the case directory (default: CWD)
env (CMLEnv): Environment used for the runs
"""
self._validate_tasks()
self.case_dir = case_dir or os.getcwd()
self.case_dir = osutils.abspath(self.case_dir)
self.env = env or cmlenv.cml_get_version()
self.dep_job_id = None
self.task_set_count = 0
self.used_job_scheduler = False
act_map = self.task_map
num_tasks = len(self.tasks)
_lgr.info("Begin executing tasks in %s", self.case_dir)
with osutils.set_work_dir(self.case_dir):
for act in self.tasks:
for key in act:
act_map[key](self, act[key])
_lgr.info("Successfully executed %d tasks in %s",
num_tasks, self.case_dir)
def _validate_tasks(self):
"""Validate tasks provided by the user before executing"""
invalid_tasks = []
for act in self.tasks:
for key in act:
if key not in self.task_map:
invalid_tasks.append(key)
if invalid_tasks:
print("Invalid tasks detected: ")
for act in invalid_tasks:
print(" - " + act)
print("Valid tasks are: ")
for key, value in self.task_map.items():
docstr = value.__doc__
desc = (docstr.strip().split("\n")[0]
if docstr else "No help description.")
print(" - %s - %s"%(key, desc))
raise RuntimeError("Invalid tasks provided")
[docs] def cmd_run_command(self, options):
"""Execute a Caelus CML binary.
This method is an interface to :class:`CaelusCmd`
"""
cml_exe = options.cmd_name
log_file = options.get("log_file", None)
cml_cmd = CaelusCmd(cml_exe,
casedir=self.case_dir,
cml_env=self.env,
output_file=log_file)
parallel = options.get("parallel", False)
cml_cmd.cml_exe_args = options.get("cmd_args", "")
cml_cmd.parallel = parallel
if parallel:
cml_cmd.num_mpi_ranks = options.get(
"num_ranks", run_cmds.get_mpi_size(self.case_dir))
cml_cmd.mpi_extra_args = options.get(
"mpi_extra_args", "")
if "queue_settings" in options:
cml_cmd.runner.update(options["queue_settings"])
_lgr.info("Executing command: %s", cml_exe)
job_dep = [self.dep_job_id] if self.dep_job_id else None
status = cml_cmd(job_dependencies=job_dep)
self.dep_job_id = cml_cmd.job_id
self.used_job_scheduler = cml_cmd.runner.is_job_scheduler()
if status != 0:
raise RuntimeError("Error executing command: %s"%cml_exe)
[docs] def cmd_run_python(self, options):
"""Execute a python script"""
pyscript = options.script
pysfull = osutils.abspath(pyscript)
pyargs = options.get("script_args", "")
pylog = options.get("log_file", None)
log_to_file = options.get("log_to_file", True)
if not osutils.path_exists(pysfull):
raise FileNotFoundError("Python file not found: %s", pyscript)
status = python_execute(
pysfull, pyargs, env=self.env, log_file=pylog,
log_to_file=log_to_file)
if status != 0:
raise RuntimeError(
"Error executing python script: %s"%pyscript)
[docs] def cmd_copy_files(self, options):
"""Copy given file(s) to the destination."""
srcfiles = glob.glob(options.src)
dest = options.dest
if not srcfiles:
raise RuntimeError(
"Error src pattern %s returns no files", options.src)
if len(srcfiles) > 1:
osutils.ensure_directory(dest)
for srcfile in srcfiles:
shutil.copy2(srcfile, dest)
[docs] def cmd_copy_tree(self, options):
"""Recursively copy a given directory to the destination."""
srcdir = options.src
destdir = options.dest
ignore_pat = options.get("ignore_patterns", None)
symlinks = options.get("preserve_symlinks", False)
ignore_func = None
if ignore_pat:
ignore_func = shutil.ignore_patterns(*ignore_pat)
osutils.copy_tree(srcdir, destdir,
symlinks=symlinks, ignore_func=ignore_func)
[docs] def cmd_clean_case(self, options):
"""Clean a case directory"""
purge_all = options.get("purge_all", False)
purge_generated = options.get("purge_generated", purge_all)
remove_zero = options.get("remove_zero", purge_all)
remove_mesh = options.get("remove_mesh", purge_all)
remove_times = options.get("remove_time_dirs", purge_generated)
remove_processors = options.get("remove_processor", purge_generated)
preserve_extra = options.get("preserve", None)
remove_extra = options.get("remove_extra", None)
_lgr.info("Cleaning case directory: %s", self.case_dir)
run_cmds.clean_casedir(self.case_dir,
preserve_zero=(not remove_zero),
preserve_times=(not remove_times),
preserve_processors=(not remove_processors),
purge_mesh=remove_mesh,
preserve_extra=preserve_extra)
if remove_extra:
osutils.remove_files_dirs(remove_extra, self.case_dir)
[docs] def cmd_process_logs(self, options):
"""Process logs for a case"""
log_file = options.log_file
lgfile = os.path.join(self.case_dir, log_file)
if self.used_job_scheduler and not os.path.exists(lgfile):
_lgr.info("Skipping process_logs; job submitted on scheduler")
return
logs_dir = options.get("logs_directory", "logs")
_lgr.info("Processing log file: %s", log_file)
clog = SolverLog(
case_dir=self.case_dir,
logs_dir=logs_dir,
logfile=log_file)
do_plots = options.get("plot_residuals", None)
if do_plots:
plot_file = options.get("residuals_plot_file", "residuals.png")
fields = options.get("residuals_fields", clog.fields)
cerrors = options.get("plot_continuity_errors", False)
plot = CaelusPlot(self.case_dir)
dname, fname = os.path.split(plot_file)
plot.plotdir = dname or os.getcwd()
plot.solver_log = clog
plot.plot_continuity_errors = cerrors
plot.plot_residuals_hist(plotfile=fname, fields=fields)
_lgr.info("Residual time history saved to %s", plot_file)
try:
with osutils.set_work_dir(self.case_dir):
cname = os.path.basename(self.case_dir)
with open(cname+".foam", 'w') as fh:
fh.write(" ")
except IOError:
_lgr.warning("Error creating .foam file")
[docs] def cmd_exec_tasks(self, options):
"""Execute another task file"""
task_file = options.task_file
casedir = os.path.dirname(task_file)
tasks = Tasks.load(task_file)
_lgr.info("Executing tasks from file: %s", task_file)
tasks(case_dir=casedir, env=self.env)
[docs] def cmd_task_set(self, options):
"""A subset of tasks for grouping"""
self.task_set_count += 1
name = options.get("name", "Task set #%d"%self.task_set_count)
casedir = osutils.abspath(options.case_dir)
_lgr.info("Executing task set: %s", name)
tasks = Tasks()
tasks.tasks = options.tasks
tasks.task_file = self.task_file
tasks(case_dir=casedir, env=self.env)