Apache Airflow
Apache Airflow — это популярная платформа для оркестрации задач и управления workflow, написанная на Python. Она позволяет создавать, планировать и мониторить сложные workflows, которые могут включать множество задач и зависимостей. Airflow особенно популярен в области обработки данных, ETL-процессов (Extract, Transform, Load) и автоматизации задач.
Установка Apache Airflow
1. Установка Airflow
Для начала установите Apache Airflow с помощью pip
. Рекомендуется использовать виртуальное окружение (например, venv
или conda
).
# Создайте виртуальное окружение (опционально)
python -m venv airflow_env
source airflow_env/bin/activate # Для Linux/Mac
# airflow_env\Scripts\activate # Для Windows
# Установите Apache Airflow
pip install apache-airflow
2. Инициализация базы данных
Airflow использует базу данных для хранения метаданных о задачах и workflows. После установки инициализируйте базу данных:
airflow db init
Эта команда создаст файл airflow.db
(по умолчанию используется SQLite) и необходимые таблицы.
3. Создание пользователя
Создайте пользователя для доступа к веб-интерфейсу Airflow:
airflow users create \
--username admin \
--firstname FirstName \
--lastname LastName \
--role Admin \
--email [email protected]
Вам будет предложено ввести пароль.
4. Запуск веб-сервера и планировщика
Запустите веб-интерфейс Airflow:
airflow webserver --port 8080
Веб-интерфейс будет доступен по адресу: http://localhost:8080.
Запустите планировщик (scheduler), который отвечает за выполнение задач:
airflow scheduler
Пример использования Apache Airflow с Python
1. Создание DAG (Directed Acyclic Graph)
DAG — это набор задач, которые выполняются в определенном порядке. Создайте Python-файл для определения DAG.
Создайте папку dags
в директории, где находится airflow.cfg
(обычно это ~/airflow/dags
), и добавьте туда файл example_dag.py
:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
# Функция, которая будет выполняться как задача
def print_hello():
print("Hello, Airflow!")
# Определение DAG
default_args = {
'owner': 'admin',
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'example_dag', # Имя DAG
default_args=default_args,
description='A simple tutorial DAG',
schedule_interval=timedelta(days=1), # Запускать каждый день
start_date=datetime(2023, 10, 1), # Дата начала выполнения
catchup=False, # Не выполнять пропущенные запуски
)
# Задача 1: Выполнить bash-команду
task1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag,
)
# Задача 2: Вызвать Python-функцию
task2 = PythonOperator(
task_id='print_hello',
python_callable=print_hello,
dag=dag,
)
# Задача 3: Выполнить еще одну bash-команду
task3 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
dag=dag,
)
# Определение порядка выполнения задач
task1 >> task2 >> task3
2. Запуск DAG
- Перейдите в веб-интерфейс Airflow: http://localhost:8080.
- Найдите ваш DAG (
example_dag
) в списке. - Включите DAG, переключив переключатель слева от его имени.
- Запустите DAG вручную, нажав на кнопку «Trigger Dag».
3. Мониторинг выполнения
В веб-интерфейсе вы можете отслеживать выполнение задач:
- Graph View: Визуализация зависимостей между задачами.
- Tree View: Статус выполнения задач для каждого запуска.
- Logs: Логи выполнения задач для отладки.
Основные компоненты Airflow
- DAG (Directed Acyclic Graph):
- Описывает workflow как набор задач и их зависимостей.
- Задачи выполняются в определенном порядке, без циклов.
- Operators:
- Операторы определяют, что именно делает задача. Например:
BashOperator
: Выполняет bash-команду.PythonOperator
: Выполняет Python-функцию.EmailOperator
: Отправляет email.Sensor
: Ожидает выполнения определенного условия.
- Tasks:
- Конкретные экземпляры операторов в DAG.
- Scheduler:
- Отвечает за планирование и запуск задач.
- Web Interface:
- Веб-интерфейс для управления и мониторинга DAG.
Полезные команды Airflow
- Проверка синтаксиса DAG:
airflow dags list
- Тестирование задачи:
airflow tasks test example_dag print_hello 2023-10-01
- Очистка логов:
airflow db clean
Заключение
Apache Airflow — это мощный инструмент для оркестрации задач и управления workflow. Он особенно полезен для автоматизации ETL-процессов, обработки данных и других сложных задач. С помощью Python вы можете легко создавать и управлять DAG, а веб-интерфейс Airflow предоставляет удобные инструменты для мониторинга и отладки.