task_runner.py 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. import logging
  2. import time
  3. import datetime
  4. from pathlib import Path
  5. import yaml
  6. from itertools import groupby
  7. # 导入所有需要的组件
  8. from PFAL_SysControl.Utils.database import DatabaseManager
  9. from PFAL_SysControl.Utils.light_sampler import LightSampler
  10. from PFAL_SysControl.Utils.utils import load_config
  11. from PFAL_SysControl.Controller.cultivation_rack import CultivationRack
  12. class TaskRunner:
  13. """
  14. 管理和执行长期的、循环性的任务,例如数据采集和环境模拟。
  15. """
  16. def __init__(self, task_config_path='task.yml', main_config_path='config.yml'):
  17. try:
  18. with open(task_config_path, 'r') as f:
  19. task_config = yaml.safe_load(f)
  20. self.db_params = task_config['database']
  21. self.tasks = task_config['tasks']
  22. self.main_config = load_config(main_config_path)
  23. except (FileNotFoundError, KeyError) as e:
  24. raise ValueError(f"加载配置失败或配置不完整: {e}")
  25. self._validate_tasks()
  26. def _validate_tasks(self):
  27. """验证任务列表,确保同一设备没有重叠的任务。"""
  28. logging.info("正在验证任务时间表...")
  29. # 按 rack_name 分组
  30. tasks_sorted_by_rack = sorted(self.tasks, key=lambda t: t['rack_name'])
  31. for rack_name, tasks in groupby(tasks_sorted_by_rack, key=lambda t: t['rack_name']):
  32. rack_tasks = sorted(list(tasks), key=lambda t: t.get('start_date', ''))
  33. # 检查重叠
  34. for i in range(len(rack_tasks) - 1):
  35. # 确保任务有起止日期
  36. if 'end_date' not in rack_tasks[i] or 'start_date' not in rack_tasks[i+1]:
  37. continue
  38. task1_end = datetime.datetime.fromisoformat(rack_tasks[i]['end_date'])
  39. task2_start = datetime.datetime.fromisoformat(rack_tasks[i+1]['start_date'])
  40. if task1_end >= task2_start:
  41. raise ValueError(
  42. f"任务时间重叠: 设备 '{rack_name}' 的任务 '{rack_tasks[i]['name']}' "
  43. f"(结束于 {task1_end}) 与 '{rack_tasks[i+1]['name']}' (开始于 {task2_start}) 存在冲突。"
  44. )
  45. logging.info("任务验证成功,没有时间重叠。")
  46. def run(self):
  47. """启动并按顺序执行所有已定义的任务。"""
  48. sorted_tasks = sorted(self.tasks, key=lambda t: t.get('start_date', ''))
  49. logging.info(f"任务运行器已启动,共找到 {len(sorted_tasks)} 个待处理任务。")
  50. if not sorted_tasks:
  51. logging.warning("没有定义任何任务,程序退出。")
  52. return
  53. db_manager = None
  54. try:
  55. db_manager = DatabaseManager(self.db_params)
  56. for task in sorted_tasks:
  57. # --- 任务等待阶段 ---
  58. start_time = datetime.datetime.fromisoformat(task['start_date'])
  59. end_time = datetime.datetime.fromisoformat(task['end_date'])
  60. now = datetime.datetime.now()
  61. if now > end_time:
  62. logging.warning(f"任务 '{task['name']}' 的结束时间已过,已跳过。")
  63. continue
  64. if now < start_time:
  65. sleep_duration = (start_time - now).total_seconds()
  66. logging.info(f"下一个任务 '{task['name']}' 将在 {start_time.strftime('%Y-%m-%d %H:%M')} 开始。系统将休眠 {sleep_duration / 3600:.2f} 小时。")
  67. time.sleep(sleep_duration)
  68. # --- 任务执行阶段 ---
  69. logging.info(f"*** 任务 '{task['name']}' 开始执行 (目标设备: {task['rack_name']}) ***")
  70. interval = datetime.timedelta(hours=task['interval_hours'])
  71. # 初始化此任务所需的组件
  72. rack = CultivationRack(task['rack_name'], self.main_config, db_manager)
  73. # 从任务配置中解析光周期为 time 对象,如果未定义则使用 None
  74. start_time_str = task.get('light_period_start')
  75. end_time_str = task.get('light_period_end')
  76. start_time_obj, end_time_obj = None, None
  77. try:
  78. if start_time_str:
  79. start_time_obj = datetime.datetime.strptime(start_time_str, '%H:%M').time()
  80. if end_time_str:
  81. end_time_obj = datetime.datetime.strptime(end_time_str, '%H:%M').time()
  82. except ValueError:
  83. logging.warning(f"无法解析任务 '{task['name']}' 中的光周期时间 '{start_time_str}'/'{end_time_str}'。将使用默认值。")
  84. start_time_obj, end_time_obj = None, None
  85. light_sampler = LightSampler(
  86. light_ids=rack.light_system.get_all_light_ids(),
  87. light_period_start=start_time_obj,
  88. light_period_end=end_time_obj
  89. )
  90. # 为 rack 的 camera_system 补全依赖
  91. rack.camera_system.light_sampler = light_sampler
  92. while datetime.datetime.now() < end_time:
  93. try:
  94. rack.capture_all_photos(task['name'])
  95. except Exception as e:
  96. logging.error(f"任务 '{task['name']}' 的一次循环执行失败: {e}", exc_info=True)
  97. logging.info(f"当前循环结束。将休眠 {interval.total_seconds() / 3600:.2f} 小时。")
  98. time.sleep(interval.total_seconds())
  99. logging.info(f"*** 任务 '{task['name']}' 已完成 ***")
  100. except Exception as e:
  101. logging.error(f"任务运行器发生严重错误: {e}", exc_info=True)
  102. finally:
  103. if db_manager:
  104. db_manager.close()
  105. logging.info("所有任务已处理完毕,任务运行器已停止。")