| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124 |
- import logging
- import time
- import datetime
- from pathlib import Path
- import yaml
- from itertools import groupby
- # 导入所有需要的组件
- from PFAL_SysControl.Utils.database import DatabaseManager
- from PFAL_SysControl.Utils.light_sampler import LightSampler
- from PFAL_SysControl.Utils.utils import load_config
- from PFAL_SysControl.Controller.cultivation_rack import CultivationRack
- class TaskRunner:
- """
- 管理和执行长期的、循环性的任务,例如数据采集和环境模拟。
- """
- def __init__(self, task_config_path='task.yml', main_config_path='config.yml'):
- try:
- with open(task_config_path, 'r') as f:
- task_config = yaml.safe_load(f)
- self.db_params = task_config['database']
- self.tasks = task_config['tasks']
- self.main_config = load_config(main_config_path)
- except (FileNotFoundError, KeyError) as e:
- raise ValueError(f"加载配置失败或配置不完整: {e}")
-
- self._validate_tasks()
- def _validate_tasks(self):
- """验证任务列表,确保同一设备没有重叠的任务。"""
- logging.info("正在验证任务时间表...")
- # 按 rack_name 分组
- tasks_sorted_by_rack = sorted(self.tasks, key=lambda t: t['rack_name'])
- for rack_name, tasks in groupby(tasks_sorted_by_rack, key=lambda t: t['rack_name']):
- rack_tasks = sorted(list(tasks), key=lambda t: t.get('start_date', ''))
-
- # 检查重叠
- for i in range(len(rack_tasks) - 1):
- # 确保任务有起止日期
- if 'end_date' not in rack_tasks[i] or 'start_date' not in rack_tasks[i+1]:
- continue
- task1_end = datetime.datetime.fromisoformat(rack_tasks[i]['end_date'])
- task2_start = datetime.datetime.fromisoformat(rack_tasks[i+1]['start_date'])
- if task1_end >= task2_start:
- raise ValueError(
- f"任务时间重叠: 设备 '{rack_name}' 的任务 '{rack_tasks[i]['name']}' "
- f"(结束于 {task1_end}) 与 '{rack_tasks[i+1]['name']}' (开始于 {task2_start}) 存在冲突。"
- )
- logging.info("任务验证成功,没有时间重叠。")
- def run(self):
- """启动并按顺序执行所有已定义的任务。"""
- sorted_tasks = sorted(self.tasks, key=lambda t: t.get('start_date', ''))
-
- logging.info(f"任务运行器已启动,共找到 {len(sorted_tasks)} 个待处理任务。")
- if not sorted_tasks:
- logging.warning("没有定义任何任务,程序退出。")
- return
- db_manager = None
- try:
- db_manager = DatabaseManager(self.db_params)
-
- for task in sorted_tasks:
- # --- 任务等待阶段 ---
- start_time = datetime.datetime.fromisoformat(task['start_date'])
- end_time = datetime.datetime.fromisoformat(task['end_date'])
- now = datetime.datetime.now()
- if now > end_time:
- logging.warning(f"任务 '{task['name']}' 的结束时间已过,已跳过。")
- continue
- if now < start_time:
- sleep_duration = (start_time - now).total_seconds()
- logging.info(f"下一个任务 '{task['name']}' 将在 {start_time.strftime('%Y-%m-%d %H:%M')} 开始。系统将休眠 {sleep_duration / 3600:.2f} 小时。")
- time.sleep(sleep_duration)
-
- # --- 任务执行阶段 ---
- logging.info(f"*** 任务 '{task['name']}' 开始执行 (目标设备: {task['rack_name']}) ***")
- interval = datetime.timedelta(hours=task['interval_hours'])
-
- # 初始化此任务所需的组件
- rack = CultivationRack(task['rack_name'], self.main_config, db_manager)
- # 从任务配置中解析光周期为 time 对象,如果未定义则使用 None
- start_time_str = task.get('light_period_start')
- end_time_str = task.get('light_period_end')
- start_time_obj, end_time_obj = None, None
- try:
- if start_time_str:
- start_time_obj = datetime.datetime.strptime(start_time_str, '%H:%M').time()
- if end_time_str:
- end_time_obj = datetime.datetime.strptime(end_time_str, '%H:%M').time()
- except ValueError:
- logging.warning(f"无法解析任务 '{task['name']}' 中的光周期时间 '{start_time_str}'/'{end_time_str}'。将使用默认值。")
- start_time_obj, end_time_obj = None, None
- light_sampler = LightSampler(
- light_ids=rack.light_system.get_all_light_ids(),
- light_period_start=start_time_obj,
- light_period_end=end_time_obj
- )
- # 为 rack 的 camera_system 补全依赖
- rack.camera_system.light_sampler = light_sampler
- while datetime.datetime.now() < end_time:
- try:
- rack.capture_all_photos(task['name'])
- except Exception as e:
- logging.error(f"任务 '{task['name']}' 的一次循环执行失败: {e}", exc_info=True)
-
- logging.info(f"当前循环结束。将休眠 {interval.total_seconds() / 3600:.2f} 小时。")
- time.sleep(interval.total_seconds())
-
- logging.info(f"*** 任务 '{task['name']}' 已完成 ***")
- except Exception as e:
- logging.error(f"任务运行器发生严重错误: {e}", exc_info=True)
- finally:
- if db_manager:
- db_manager.close()
- logging.info("所有任务已处理完毕,任务运行器已停止。")
|