Answer a question

What does this error mean? I have a simple Airflow setup: I installed the Airflow Helm chart on a local kind/Kubernetes cluster with the CeleryKubernetesExecutor.

│ scheduler [2022-05-25 06:55:40,086] {dagrun.py:648} WARNING - Failed to get task '<TaskInstance: tutorial.generated-task-13 manual__2022 │
│ scheduler [2022-05-25 06:55:40,086] {dagrun.py:648} WARNING - Failed to get task '<TaskInstance: tutorial.generated-task-17 manual__2022 │
│ scheduler [2022-05-25 06:55:40,207] {scheduler_job.py:1347} WARNING - Failing (6) jobs without heartbeat after 2022-05-25 06:50:40.19719 │
│ scheduler [2022-05-25 06:55:40,207] {scheduler_job.py:1355} ERROR - Detected zombie job: {'full_filepath': '/opt/airflow/dags/repo/dags/ │
│ scheduler [2022-05-25 06:55:40,208] {scheduler_job.py:753} ERROR - Exception when executing SchedulerJob._run_scheduler_loop             │
│ scheduler Traceback (most recent call last):                                                                                             │
│ scheduler   File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 736, in _execute                 │
│ scheduler     self._run_scheduler_loop()                                                                                                 │
│ scheduler   File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 836, in _run_scheduler_loop      │
│ scheduler     next_event = timers.run(blocking=False)                                                                                    │
│ scheduler   File "/usr/local/lib/python3.7/sched.py", line 151, in run                                                                   │
│ scheduler     action(*argument, **kwargs)                                                                                                │
│ scheduler   File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/event_scheduler.py", line 36, in repeat                 │
│ scheduler     action(*args, **kwargs)                                                                                                    │
│ scheduler   File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py", line 71, in wrapper                        │
│ scheduler     return func(*args, session=session, **kwargs)                                                                              │
│ scheduler   File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1356, in _find_zombies           │
│ scheduler     self.executor.send_callback(request)                                                                                       │
│ scheduler AttributeError: 'CeleryKubernetesExecutor' object has no attribute 'send_callback'                                             │
│ scheduler [2022-05-25 06:55:40,218] {kubernetes_executor.py:807} INFO - Shutting down Kubernetes executor                                │
│ scheduler [2022-05-25 06:55:41,237] {process_utils.py:129} INFO - Sending Signals.SIGTERM to group 64. PIDs of all processes in the grou │
│ scheduler [2022-05-25 06:55:41,237] {process_utils.py:80} INFO - Sending the signal Signals.SIGTERM to group 64                          │
│ scheduler [2022-05-25 06:55:41,409] {process_utils.py:75} INFO - Process psutil.Process(pid=64, status='terminated', exitcode=0, started │
│ scheduler [2022-05-25 06:55:41,410] {scheduler_job.py:765} INFO - Exited execute loop                                                    │
│ scheduler Traceback (most recent call last):                                                                                             │
│ scheduler   File "/home/airflow/.local/bin/airflow", line 8, in <module>                                                                 │
│ scheduler     sys.exit(main())                                                                                                           │
│ scheduler   File "/home/airflow/.local/lib/python3.7/site-packages/airflow/__main__.py", line 38, in main                                │
│ scheduler     args.func(args)                                                                                                            │
│ scheduler   File "/home/airflow/.local/lib/python3.7/site-packages/airflow/cli/cli_parser.py", line 51, in command                       │
│ scheduler     return func(*args, **kwargs)                                                                                               │
│ scheduler   File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/cli.py", line 99, in wrapper                            │
│ scheduler     return f(*args, **kwargs)                                                                                                  │
│ scheduler   File "/home/airflow/.local/lib/python3.7/site-packages/airflow/cli/commands/scheduler_command.py", line 75, in scheduler     │
│ scheduler     _run_scheduler_job(args=args)                                                                                              │
│ scheduler   File "/home/airflow/.local/lib/python3.7/site-packages/airflow/cli/commands/scheduler_command.py", line 46, in _run_schedule │
│ scheduler     job.run()                                                                                                                  │
│ scheduler   File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/base_job.py", line 244, in run                           │
│ scheduler     self._execute()                                                                                                            │
│ scheduler   File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 736, in _execute                 │
│ scheduler     self._run_scheduler_loop()                                                                                                 │
│ scheduler   File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 836, in _run_scheduler_loop      │
│ scheduler     next_event = timers.run(blocking=False)                                                                                    │
│ scheduler   File "/usr/local/lib/python3.7/sched.py", line 151, in run                                                                   │
│ scheduler     action(*argument, **kwargs)                                                                                                │
│ scheduler   File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/event_scheduler.py", line 36, in repeat                 │
│ scheduler     action(*args, **kwargs)                                                                                                    │
│ scheduler   File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py", line 71, in wrapper                        │
│ scheduler     return func(*args, session=session, **kwargs)                                                                              │
│ scheduler   File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1356, in _find_zombies           │
│ scheduler     self.executor.send_callback(request)                                                                                       │
│ scheduler AttributeError: 'CeleryKubernetesExecutor' object has no attribute 'send_callback'                                             │

Is it erroring because I updated my DAG definition and some tasks are now zombie/invalid? I suspect this is what crashes the scheduler: "AttributeError: 'CeleryKubernetesExecutor' object has no attribute 'send_callback'", but what's wrong?

Answers

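This seems to be an Airflow bug in version 2.3.0. As the traceback shows, the scheduler's zombie detection (`_find_zombies`) calls `send_callback` on the executor, but `CeleryKubernetesExecutor` has no such method, so the scheduler loop raises `AttributeError` and exits.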
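To make the failure mode concrete, here is a minimal sketch in plain Python. It does not use Airflow at all: `ToyCeleryExecutor`, `ToyCompositeExecutor`, `FixedCompositeExecutor` and `find_zombies` are hypothetical stand-ins, and the delegation used in the "fixed" class is only an assumption for illustration, not necessarily how Airflow itself resolved it.

```python
class ToyCeleryExecutor:
    """Hypothetical inner executor that knows how to handle callbacks."""

    def send_callback(self, request):
        return f"celery handled {request}"


class ToyCompositeExecutor:
    """Hypothetical wrapper executor that forgets to expose send_callback."""

    def __init__(self):
        self.celery_executor = ToyCeleryExecutor()


class FixedCompositeExecutor(ToyCompositeExecutor):
    """Same wrapper, but forwarding the call (an assumed fix, for illustration)."""

    def send_callback(self, request):
        return self.celery_executor.send_callback(request)


def find_zombies(executor, request="zombie-callback"):
    # Stand-in for the scheduler step that reports a zombie job to the executor.
    return executor.send_callback(request)


def demo():
    broken = None
    try:
        find_zombies(ToyCompositeExecutor())
    except AttributeError as exc:
        # Same class of error as in the scheduler log above.
        broken = str(exc)
    fixed = find_zombies(FixedCompositeExecutor())
    return broken, fixed


if __name__ == "__main__":
    print(demo())
```

The point of the sketch is that the error does not come from the DAG change itself: any zombie job would trigger the same call, and the call fails because the method is missing on the wrapper object.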

It is fixed in 2.3.1, so upgrading Airflow to 2.3.1 or later should resolve the crash.
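If you want to check whether an environment is on the affected release, something like the sketch below may help. It assumes, based only on this answer, that 2.3.0 is the affected version, and that the version string has the plain `X.Y.Z` form; `is_affected` and `installed_airflow_version` are hypothetical helper names, not part of Airflow.

```python
from importlib import metadata


def parse_version(version):
    # Assumes a plain "X.Y.Z" string; pre-release suffixes are not handled.
    return tuple(int(part) for part in version.split(".")[:3])


def is_affected(version):
    # Assumption taken from the answer: the bug is in 2.3.0 and fixed in 2.3.1.
    return parse_version(version) == (2, 3, 0)


def installed_airflow_version():
    # Returns the installed apache-airflow version, or None if it is not installed.
    try:
        return metadata.version("apache-airflow")
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    current = installed_airflow_version()
    if current is None:
        print("apache-airflow is not installed in this environment")
    else:
        print(current, "affected" if is_affected(current) else "not affected")
```

With the Helm chart, the Airflow version normally comes from the container image, so this would have to run inside the scheduler pod rather than on your workstation.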
