При автоматизации Spark-задач в Airflow иногда может возникнуть ситуация, когда задача ещё не может правильно стартовать из-за временных условий:
- входные данные ещё не готовы
- таблица в базе не создана
- S3-бакет или какой-то из внешних API недоступен
Стандартный SparkSubmitOperator
в таких случаях завершится с ошибкой, что приведёт к:
- failed-таскам в DAG’е
- трате попыток retry
- засорению логов ненужными ошибками
Наиболее правильным решением такой проблемы может быть добавление перед запуском Spark специального сенсора, который проверит все необходимые условия и в случае, если что-то ещё не готово — Spark просто не будет запущен.
Однако в реальности мы можем столкнуться с ситуацией, когда не получается всё проверить до запуска
Spark-задачи и приходится надеяться на то, что либо она отработает успешно, либо хватит попыток
для перезапуска прежде, чем всё упадёт с ошибкой. В таком случае можно изменить процесс запуска
и заменить используемый SparkSubmitOperator
на сенсор.
Базовая реализация
За основу для нового сенсора можно взять тот же код, который используется в операторе для запуска Spark-задач, обработав возможные исключения:
from airflow.providers.apache.spark.hooks.spark_submit import SparkSubmitHook
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.context import Context
class SparkSubmitSensor(BaseSensorOperator):
"""
Сенсор для запуска Spark-приложений.
"""
template_fields = ("application", "conn_id", "conf")
def __init__(
self,
*,
application: str,
conn_id: str = "spark_default",
conf: Optional[dict] = None,
poke_interval: int = 300, # по умолчанию — 5 мин между проверками
**kwargs
):
super().__init__(
poke_interval=poke_interval,
mode="reschedule",
**kwargs
)
self.application = application
self.conn_id = conn_id
self.conf = conf or {}
self._hook = None
def poke(self, context: Context) -> bool:
try:
if not self._hook:
self._hook = SparkSubmitHook(conn_id=self.conn_id, conf=self.conf)
self._hook.submit(self.application)
self.log.info("Spark application submitted successfully")
return True
except Exception as e:
self.log.warning(f"Submission delayed (reason: {e})")
return False
Как это работает:
- при падении Spark-приложения таск в Airflow будет помечен не как up to retry, а как up to reschedule и будет перезапущен через указанный промежуток времени
- интервал между попытками
poke_interval
можно гибко настраивать в зависимости от SLA
Использование
В рамках DAG’а использование сенсора почти не отличается от использования обычного оператора, лишь добавляется возможность настроить период между попытками запуска:
with DAG("process_data", schedule="@daily", ...) as dag:
submit_task = SparkSubmitSensor(
task_id="submit_spark_job",
application="/path/to/apps/processing.py",
conn_id="spark_cluster",
poke_interval=600 # проверять каждые 10 минут
)
Недостатки такой реализации
Главный компромисс при использовании SparkSubmitSensor
— он не различает временные проблемы и
реальные ошибки. В обоих случаях задача уходит в reschedule. Это создаёт риск «зависания» DAG’а в
случае, если в коде Spark-приложения ошибка (например, синтаксическая) — сенсор будет бесконечно
перезапускать задачу, вместо того чтобы сразу упасть. Задача при этом упадёт только по таймауту,
который может быть достаточно большим.
Доработка решения
Для того, чтобы можно было различать причины падения Spark-приложения, необходимо изменить логику работы нашего сенсора следующим образом:
- вместо использования
SparkSubmitHook
необходимо самостоятельно сформировать команду для запуска Spark-приложения, после чего запустить её отдельным процессом с отслеживанием вывода: - при запуске приложения в client-mode:
- читать и проверять поступаемый от процесса вывод
- в случае получения строк с ошибками — выбрасывать исключение с соответствующим текстом
- при запуске приложения в cluster-mode:
- получить из вывода процесса значение application_id для отслеживания приложения
- из того же вывода процесса получить ссылку на логи драйвера Spark-приложения
- после завершения работы Spark-приложения — скачать по полученной ссылке логи и разобрать их на предмет наличия исключений
Когда стоит использовать?
Реализация подобного решения целесообразна только в том случае, когда проверить предварительные условия до запуска Spark-приложения невозможно, либо ресурсы, которые будут потрачены на проверку больше, чем затраты на попытку запуска Spark.
Используйте базовую реализацию сенсора если:
- задержки входных данных (или другие нарушения предварительных условий) редки и кратковременны
- ресурсы для разработки и поддержки ограничены
Стоит выбирать решение с доработками:
- когда критично различать типы ошибок
- есть ресурсы на реализацию парсинга логов и прочего
Если нужна помощь в реализации такого оператора — пишите.