import logging import time import datetime from pathlib import Path import yaml from itertools import groupby # 导入所有需要的组件 from PFAL_SysControl.Utils.database import DatabaseManager from PFAL_SysControl.Utils.light_sampler import LightSampler from PFAL_SysControl.Utils.utils import load_config from PFAL_SysControl.Controller.cultivation_rack import CultivationRack class TaskRunner: """ 管理和执行长期的、循环性的任务,例如数据采集和环境模拟。 """ def __init__(self, task_config_path='task.yml', main_config_path='config.yml'): try: with open(task_config_path, 'r') as f: task_config = yaml.safe_load(f) self.db_params = task_config['database'] self.tasks = task_config['tasks'] self.main_config = load_config(main_config_path) except (FileNotFoundError, KeyError) as e: raise ValueError(f"加载配置失败或配置不完整: {e}") self._validate_tasks() def _validate_tasks(self): """验证任务列表,确保同一设备没有重叠的任务。""" logging.info("正在验证任务时间表...") # 按 rack_name 分组 tasks_sorted_by_rack = sorted(self.tasks, key=lambda t: t['rack_name']) for rack_name, tasks in groupby(tasks_sorted_by_rack, key=lambda t: t['rack_name']): rack_tasks = sorted(list(tasks), key=lambda t: t.get('start_date', '')) # 检查重叠 for i in range(len(rack_tasks) - 1): # 确保任务有起止日期 if 'end_date' not in rack_tasks[i] or 'start_date' not in rack_tasks[i+1]: continue task1_end = datetime.datetime.fromisoformat(rack_tasks[i]['end_date']) task2_start = datetime.datetime.fromisoformat(rack_tasks[i+1]['start_date']) if task1_end >= task2_start: raise ValueError( f"任务时间重叠: 设备 '{rack_name}' 的任务 '{rack_tasks[i]['name']}' " f"(结束于 {task1_end}) 与 '{rack_tasks[i+1]['name']}' (开始于 {task2_start}) 存在冲突。" ) logging.info("任务验证成功,没有时间重叠。") def run(self): """启动并按顺序执行所有已定义的任务。""" sorted_tasks = sorted(self.tasks, key=lambda t: t.get('start_date', '')) logging.info(f"任务运行器已启动,共找到 {len(sorted_tasks)} 个待处理任务。") if not sorted_tasks: logging.warning("没有定义任何任务,程序退出。") return db_manager = None try: db_manager = DatabaseManager(self.db_params) for task in sorted_tasks: # --- 任务等待阶段 --- start_time = datetime.datetime.fromisoformat(task['start_date']) end_time = datetime.datetime.fromisoformat(task['end_date']) now = datetime.datetime.now() if now > end_time: logging.warning(f"任务 '{task['name']}' 的结束时间已过,已跳过。") continue if now < start_time: sleep_duration = (start_time - now).total_seconds() logging.info(f"下一个任务 '{task['name']}' 将在 {start_time.strftime('%Y-%m-%d %H:%M')} 开始。系统将休眠 {sleep_duration / 3600:.2f} 小时。") time.sleep(sleep_duration) # --- 任务执行阶段 --- logging.info(f"*** 任务 '{task['name']}' 开始执行 (目标设备: {task['rack_name']}) ***") interval = datetime.timedelta(hours=task['interval_hours']) # 初始化此任务所需的组件 rack = CultivationRack(task['rack_name'], self.main_config, db_manager) # 从任务配置中解析光周期为 time 对象,如果未定义则使用 None start_time_str = task.get('light_period_start') end_time_str = task.get('light_period_end') start_time_obj, end_time_obj = None, None try: if start_time_str: start_time_obj = datetime.datetime.strptime(start_time_str, '%H:%M').time() if end_time_str: end_time_obj = datetime.datetime.strptime(end_time_str, '%H:%M').time() except ValueError: logging.warning(f"无法解析任务 '{task['name']}' 中的光周期时间 '{start_time_str}'/'{end_time_str}'。将使用默认值。") start_time_obj, end_time_obj = None, None light_sampler = LightSampler( light_ids=rack.light_system.get_all_light_ids(), light_period_start=start_time_obj, light_period_end=end_time_obj ) # 为 rack 的 camera_system 补全依赖 rack.camera_system.light_sampler = light_sampler while datetime.datetime.now() < end_time: try: rack.capture_all_photos(task['name']) except Exception as e: logging.error(f"任务 '{task['name']}' 的一次循环执行失败: {e}", exc_info=True) logging.info(f"当前循环结束。将休眠 {interval.total_seconds() / 3600:.2f} 小时。") time.sleep(interval.total_seconds()) logging.info(f"*** 任务 '{task['name']}' 已完成 ***") except Exception as e: logging.error(f"任务运行器发生严重错误: {e}", exc_info=True) finally: if db_manager: db_manager.close() logging.info("所有任务已处理完毕,任务运行器已停止。")