import os
import threading
from collections import OrderedDict, defaultdict, deque
from time import perf_counter
from typing import Union, Any
from concurrent.futures import ThreadPoolExecutor, as_completed
from floatcsep.infrastructure.environments import DockerManager
import logging
log = logging.getLogger("floatLogger")
[docs]
class Task:
"""
Represents a unit of work to be executed later as part of a task graph.
A Task wraps an object instance, a method, and its arguments to allow for deferred
execution. This is useful in workflows where tasks need to be executed in a specific order,
often dictated by dependencies on other tasks.
For instance, can wrap a floatcsep.model.Model, its method 'create_forecast' and the
argument 'time_window', which can be executed later with Task.call() when, for example,
task dependencies (parent nodes) have been completed.
Args:
instance (object): The instance whose method will be executed later.
method (str): The method of the instance that will be called.
**kwargs: Arguments to pass to the method when it is invoked.
"""
def __init__(self, instance: object, method: str, **kwargs):
self.obj = instance
self.method = method
self.kwargs = kwargs
self.store = None # In-case the returned object by the call is required as output
[docs]
def sign_match(self, obj: Union[object, str] = None, meth: str = None, kw_arg: Any = None):
"""
Checks whether the task matches a given function signature.
This method is used to verify if a task belongs to a given object, method, or if it
uses a specific keyword argument. Useful for identifying tasks in a graph based on
partial matches of their attributes.
Args:
obj: The object instance or its name (str) to match against.
meth: The method name to match against.
kw_arg: A specific keyword argument value to match against in the task's arguments.
Returns:
bool: True if the task matches the provided signature, False otherwise.
"""
if self.obj == obj or obj == getattr(self.obj, "name", None):
if meth == self.method:
if kw_arg in self.kwargs.values():
return True
return False
def __str__(self):
"""
Returns a string representation of the task, including the instance name, method, and
arguments. Useful for debugging purposes.
Returns:
str: A formatted string describing the task.
"""
task_str = f"\tClass: {self.obj.__class__.__name__}\n"
a = getattr(self.obj, "name", None)
if a:
task_str += f"\tName: {a}\n"
task_str += f"\tMethod: {self.method}\n"
for i, j in self.kwargs.items():
try:
if isinstance(j, list):
task_str += f"\t\t{i}: {[k.name for k in j]} \n"
else:
task_str += f"\t\t{i}: {j.name} \n"
except AttributeError:
task_str += f"\t\t{i}: {j} \n"
return task_str[:-2]
[docs]
def run(self):
"""
Executes the task by calling the method on the object instance with the stored
arguments. If the instance has a `store` attribute, it will use that instead of the
instance itself. Once executed, the result is stored in the `store` attribute if any
output is produced.
Returns:
The output of the method execution, or None if the method does not return anything.
"""
if hasattr(self.obj, "store"):
self.obj = self.obj.store
output = getattr(self.obj, self.method)(**self.kwargs)
return output
[docs]
class TaskGraph:
"""
Context manager of floatcsep workload distribution.
A TaskGraph is responsible for adding tasks, managing dependencies between tasks, and
executing tasks in the correct order. Tasks in the graph can depend on one another, and
the graph ensures that each task is run after all of its dependencies have been satisfied.
Contains a `Task` dictionary whose dict_keys are the Task to be executed with dict_values
as the Task's dependencies.
"""
def __init__(self) -> None:
"""
Initializes the TaskGraph with an empty task dictionary and task count.
"""
self.tasks = OrderedDict()
self._ntasks = 0
self.name = "floatcsep.infrastructure.engine.TaskGraph"
self.profiler = Profiler()
@property
def ntasks(self) -> int:
"""
Returns the number of tasks currently in the graph.
Returns:
int: The total number of tasks in the graph.
"""
return self._ntasks
@ntasks.setter
def ntasks(self, n):
self._ntasks = n
[docs]
def add(self, task: Task):
"""
Adds a new task to the task graph.
The task is added to the dictionary of tasks with no dependencies by default.
Args:
task (Task): The task to be added to the graph.
"""
self.tasks[task] = []
self.ntasks += 1
[docs]
def add_dependency(
self, task, dep_inst: Union[object, str] = None, dep_meth: str = None, dkw: Any = None
):
"""
Adds a dependency to a task already within the graph.
Searches for other tasks within the graph whose signature matches the provided
object instance, method name, or keyword argument. Any matches are added as
dependencies to the provided task.
Args:
task (Task): The task to which dependencies will be added.
dep_inst: The object instance or name of the dependency.
dep_meth: The method name of the dependency.
dkw: A specific keyword argument value of the dependency.
Returns:
None
"""
deps = []
for i, other_tasks in enumerate(self.tasks.keys()):
if other_tasks.sign_match(dep_inst, dep_meth, dkw):
deps.append(other_tasks)
self.tasks[task].extend(deps)
def _build_dependency_maps(self): #
"""Return indegree and dependents maps for current tasks."""
indegree = {t: 0 for t in self.tasks}
dependents = defaultdict(list)
for t, deps in self.tasks.items():
indegree[t] = len(deps)
for d in deps:
dependents[d].append(t)
return indegree, dependents
def _run_task(self, task):
"""Execute a single task and record its duration"""
t0 = perf_counter()
try:
return task.run()
finally:
dt_ms = (perf_counter() - t0) * 1000.0
self.profiler.record(task, dt_ms)
[docs]
def run(self):
"""
Executes in sequential all tasks in the task graph according to the order set in
Experiment.set_tasks().
Iterates over each task in the graph and runs it after its dependencies have been
resolved.
Returns:
None
"""
log.info(f"[Engine] Running {self.ntasks} tasks.")
try:
self.profiler.begin(mode="sequential", ntasks=self.ntasks)
for task, deps in self.tasks.items():
log.debug(f"[Engine] Running task: \n{task}")
self._run_task(task)
log.debug(f"[Engine] Done")
except KeyboardInterrupt:
log.warning("[Engine] Keyboard Interrupt")
try:
DockerManager.kill_containers(label_key="model_timewindow")
except Exception as e:
log.error(f"[Engine] Cleanup failed: {e}")
finally:
log.warning("[Engine] Exiting after cleanup.")
os._exit(130)
finally:
self.profiler.end()
[docs]
def run_parallel(self, max_workers: int):
indegree, dependents = self._build_dependency_maps()
ready = deque([t for t, deg in indegree.items() if deg == 0])
log.info(
f"[Engine] Running {self.ntasks} tasks in parallel (max_workers={max_workers})"
)
running = {}
completed = 0
def submit_task(executor, task):
log.debug(f"[Engine] Submit \n{task}")
fut = executor.submit(self._run_task, task) # note: TaskGraph still executes tasks
running[fut] = task
with ThreadPoolExecutor(max_workers=max_workers) as ex:
try:
self.profiler.begin(
mode=f"parallel (max_workers={max_workers})", ntasks=self.ntasks
)
while ready and len(running) < max_workers:
submit_task(ex, ready.popleft())
while running:
for fut in as_completed(list(running.keys()), timeout=None):
task = running.pop(fut)
try:
fut.result()
log.debug(f"[Engine] Done \n{task}")
except Exception as e:
log.error(f"[Engine] Fail \n{task}: {e}")
completed += 1
for dep in dependents[task]:
indegree[dep] -= 1
if indegree[dep] == 0:
ready.append(dep)
while ready and len(running) < max_workers:
submit_task(ex, ready.popleft())
except KeyboardInterrupt:
log.warning("[Engine] Keyboard Interrupt")
try:
DockerManager.kill_containers(label_key="model_timewindow")
except Exception as e:
log.error(f"[Engine] Cleanup failed: {e}")
finally:
log.warning("[Engine] Exiting after cleanup.")
os._exit(130)
finally:
self.profiler.end()
class Profiler:
"""Collect per-task timings and emit one clean summary at session end."""
def __init__(self):
self._lock = threading.Lock()
self._groups = {}
self._t0 = 0.0
self._mode = ""
self._ntasks = 0
def begin(self, mode: str, ntasks: int) -> None:
self._mode = mode
self._ntasks = ntasks
self._groups.clear()
self._t0 = perf_counter()
def end(self) -> None:
total_wall = perf_counter() - self._t0
log.info(
f"[Engine] Calculation in {self._mode} completed | Total time: {fmt_wall(total_wall)} | Tasks: {self._ntasks}"
)
if not self._groups or not log.isEnabledFor(logging.DEBUG):
return
items = sorted(self._groups.items(), key=lambda kv: kv[1]["ms"], reverse=True)
total_ms = sum(v["ms"] for v in self._groups.values()) or 1.0
def _fmt(ms: float) -> str:
return f"{ms / 1000:.2f}s" if ms >= 1000 else f"{ms:.0f}ms"
log.debug("[Engine] Breakdown by group:")
log.debug("[Engine] Task: Share | N Tasks | Mean t | Max t | Total t")
log.debug("[Engine] ------------------------------------------------")
for name, stats in items:
cnt = stats["count"]
ms = stats["ms"]
mx = stats.get("max_ms", 0.0)
avg = (ms / cnt) if cnt else 0.0
share = 100.0 * ms / total_ms
log.debug(
f"[Engine] {name}: {share:.0f}% | {cnt}x | {_fmt(avg)} | {_fmt(mx)} | {_fmt(ms)}"
)
def record(self, task, dt_ms: float) -> None:
"""Record a single task duration (in ms) under its group."""
g = self.group_for(task)
with self._lock:
entry = self._groups.setdefault(g, {"count": 0, "ms": 0.0, "max_ms": 0.0})
entry["count"] += 1
entry["ms"] += dt_ms
if dt_ms > entry["max_ms"]:
entry["max_ms"] = dt_ms
@staticmethod
def group_for(task) -> str:
"""Map task.object type to the requested group name."""
cls = getattr(task.obj, "__class__", type(task.obj)).__name__
if cls == "CatalogRepository":
return "Catalogs"
if cls in ("Model", "TimeDependentModel", "TimeIndependentModel"):
name = getattr(task.obj, "name", "Model")
return f"Forecasts / {name}" # one-line change for finer buckets
if cls == "Evaluation":
return "Evaluations"
return "Other"
def fmt_wall(s: float) -> str:
mins, secs = divmod(s, 60)
hrs, mins = divmod(int(mins), 60)
if hrs:
return f"{hrs}h {mins}m {secs:.2f}s"
if mins:
return f"{mins}m {secs:.2f}s"
return f"{secs:.2f}s"