Apache Airflow는 Airbnb에서 개발하고 Apache 재단이 관리하는 오픈소스 워크플로우 오케스트레이션 플랫폼입니다. Python으로 DAG(Directed Acyclic Graph)를 정의하여 복잡한 데이터 파이프라인을 스케줄링하고 모니터링할 수 있습니다. 데이터 엔지니어링, ETL, ML 파이프라인 등 대규모 워크플로우 관리의 사실상 표준으로 자리잡았습니다. 이 가이드에서는 Docker를 활용한 다양한 Airflow 설치 방법을 소개합니다.
Apache Airflow란?
Apache Airflow는 2014년 Airbnb에서 시작되어 2016년 Apache 인큐베이터에 합류, 2019년 Top-Level 프로젝트로 승격된 워크플로우 관리 플랫폼입니다. “Workflow as Code” 철학으로, Python 코드로 워크플로우를 정의하여 버전 관리, 테스트, 협업이 가능합니다.
핵심 개념
| 용어 | 설명 |
|---|---|
| DAG | Directed Acyclic Graph – 태스크들의 의존성을 정의하는 워크플로우 |
| Task | DAG 내의 개별 작업 단위 |
| Operator | Task를 정의하는 템플릿 (BashOperator, PythonOperator 등) |
| Scheduler | DAG를 파싱하고 Task를 스케줄링 |
| Executor | Task를 실제로 실행하는 컴포넌트 |
| Webserver | 모니터링 및 관리를 위한 웹 UI |
왜 Airflow인가?
- 코드 기반 워크플로우: Python으로 정의하여 버전 관리, 테스트, 재사용 가능
- 강력한 스케줄링: Cron 표현식 + 의존성 기반 실행
- 확장성: LocalExecutor → CeleryExecutor → KubernetesExecutor로 확장
- 풍부한 UI: 실시간 모니터링, 로그 확인, 수동 트리거
- 광범위한 통합: AWS, GCP, Azure, Slack, DB 등 수백 개의 Provider 지원
- 활발한 커뮤니티: 10,000+ 기여자, 엔터프라이즈 검증
주요 기능
📊 DAG 시각화
- 워크플로우 그래프 뷰로 의존성 시각화
- 태스크별 실행 상태 실시간 확인
- Gantt 차트로 실행 시간 분석
⏰ 유연한 스케줄링
- Cron 표현식 지원
- Data-aware 스케줄링 (Airflow 2.4+)
- Backfill로 과거 데이터 처리
- SLA 모니터링
🔌 풍부한 통합
- 80+ Provider 패키지 (AWS, GCP, Azure, Databricks 등)
- 커스텀 Operator 개발 가능
- Hooks, Sensors, Transfers 지원
🔄 실행 모드 (Executor)
| Executor | 설명 | 사용 사례 |
|---|---|---|
| LocalExecutor | 단일 머신에서 병렬 실행 | 개발/소규모 프로덕션 |
| CeleryExecutor | 분산 워커로 수평 확장 | 대규모 프로덕션 |
| KubernetesExecutor | 태스크별 Pod 생성 | 클라우드 네이티브 |
사전 요구사항
- Docker 및 Docker Compose 설치
- 최소 4GB RAM (권장 8GB+)
- 최소 10GB 디스크 공간
- Docker Compose v2.14.0 이상
Docker Compose 설치 방법
방법 1: 공식 Docker Compose (CeleryExecutor)
공식 docker-compose.yaml을 다운로드합니다:
mkdir airflow && cd airflow
# 공식 Docker Compose 파일 다운로드
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/stable/docker-compose.yaml'
# 필요한 디렉토리 생성
mkdir -p ./dags ./logs ./plugins ./config
# 환경 변수 파일 생성
echo "AIRFLOW_UID=$(id -u)" > .env
실행합니다:
# 데이터베이스 초기화
docker compose up airflow-init
# 서비스 시작
docker compose up -d
방법 2: LocalExecutor (경량 설치)
개발 및 소규모 환경에 적합한 경량 설정입니다:
version: '3.8'
x-airflow-common: &airflow-common
image: apache/airflow:2.10.0
environment: &airflow-common-env
AIRFLOW__CORE__EXECUTOR: LocalExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CORE__FERNET_KEY: ''
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'
AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true'
_PIP_ADDITIONAL_REQUIREMENTS: ''
volumes:
- ./dags:/opt/airflow/dags
- ./logs:/opt/airflow/logs
- ./plugins:/opt/airflow/plugins
- ./config:/opt/airflow/config
user: "${AIRFLOW_UID:-50000}:0"
depends_on:
postgres:
condition: service_healthy
services:
postgres:
image: postgres:15-alpine
container_name: airflow-postgres
environment:
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow
POSTGRES_DB: airflow
volumes:
- postgres_data:/var/lib/postgresql/data
healthcheck:
test: ["CMD", "pg_isready", "-U", "airflow"]
interval: 10s
timeout: 5s
retries: 5
restart: unless-stopped
airflow-init:
<<: *airflow-common
container_name: airflow-init
entrypoint: /bin/bash
command:
- -c
- |
mkdir -p /sources/logs /sources/dags /sources/plugins
chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
exec /entrypoint airflow version
environment:
<<: *airflow-common-env
_AIRFLOW_DB_MIGRATE: 'true'
_AIRFLOW_WWW_USER_CREATE: 'true'
_AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-admin}
_AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-admin}
user: "0:0"
volumes:
- ./:/sources
airflow-webserver:
<<: *airflow-common
container_name: airflow-webserver
command: webserver
ports:
- "8080:8080"
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
interval: 30s
timeout: 10s
retries: 5
start_period: 30s
restart: unless-stopped
depends_on:
airflow-init:
condition: service_completed_successfully
airflow-scheduler:
<<: *airflow-common
container_name: airflow-scheduler
command: scheduler
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:8974/health"]
interval: 30s
timeout: 10s
retries: 5
start_period: 30s
restart: unless-stopped
depends_on:
airflow-init:
condition: service_completed_successfully
airflow-triggerer:
<<: *airflow-common
container_name: airflow-triggerer
command: triggerer
healthcheck:
test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
interval: 30s
timeout: 10s
retries: 5
start_period: 30s
restart: unless-stopped
depends_on:
airflow-init:
condition: service_completed_successfully
volumes:
postgres_data:
.env 파일:
AIRFLOW_UID=1000
_AIRFLOW_WWW_USER_USERNAME=admin
_AIRFLOW_WWW_USER_PASSWORD=admin
실행합니다:
# 필요 디렉토리 생성
mkdir -p ./dags ./logs ./plugins ./config
# 초기화 및 시작
docker compose up airflow-init
docker compose up -d
방법 3: CeleryExecutor (프로덕션)
대규모 워크로드를 위한 분산 실행 환경입니다:
version: '3.8'
x-airflow-common: &airflow-common
image: apache/airflow:2.10.0
environment: &airflow-common-env
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
AIRFLOW__CORE__FERNET_KEY: ''
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'
AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true'
AIRFLOW__CELERY__WORKER_CONCURRENCY: 4
_PIP_ADDITIONAL_REQUIREMENTS: ''
volumes:
- ./dags:/opt/airflow/dags
- ./logs:/opt/airflow/logs
- ./plugins:/opt/airflow/plugins
- ./config:/opt/airflow/config
user: "${AIRFLOW_UID:-50000}:0"
depends_on:
redis:
condition: service_healthy
postgres:
condition: service_healthy
services:
postgres:
image: postgres:15-alpine
container_name: airflow-postgres
environment:
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow
POSTGRES_DB: airflow
volumes:
- postgres_data:/var/lib/postgresql/data
healthcheck:
test: ["CMD", "pg_isready", "-U", "airflow"]
interval: 10s
timeout: 5s
retries: 5
restart: unless-stopped
redis:
image: redis:7-alpine
container_name: airflow-redis
expose:
- 6379
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 10s
timeout: 5s
retries: 5
restart: unless-stopped
airflow-init:
<<: *airflow-common
container_name: airflow-init
entrypoint: /bin/bash
command:
- -c
- |
mkdir -p /sources/logs /sources/dags /sources/plugins
chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
exec /entrypoint airflow version
environment:
<<: *airflow-common-env
_AIRFLOW_DB_MIGRATE: 'true'
_AIRFLOW_WWW_USER_CREATE: 'true'
_AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-admin}
_AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-admin}
user: "0:0"
volumes:
- ./:/sources
airflow-webserver:
<<: *airflow-common
container_name: airflow-webserver
command: webserver
ports:
- "8080:8080"
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
interval: 30s
timeout: 10s
retries: 5
start_period: 30s
restart: unless-stopped
depends_on:
airflow-init:
condition: service_completed_successfully
airflow-scheduler:
<<: *airflow-common
container_name: airflow-scheduler
command: scheduler
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:8974/health"]
interval: 30s
timeout: 10s
retries: 5
start_period: 30s
restart: unless-stopped
depends_on:
airflow-init:
condition: service_completed_successfully
airflow-worker:
<<: *airflow-common
container_name: airflow-worker
command: celery worker
healthcheck:
test:
- "CMD-SHELL"
- 'celery --app airflow.providers.celery.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}" || celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
interval: 30s
timeout: 10s
retries: 5
start_period: 30s
environment:
<<: *airflow-common-env
DUMB_INIT_SETSID: "0"
restart: unless-stopped
depends_on:
airflow-init:
condition: service_completed_successfully
airflow-triggerer:
<<: *airflow-common
container_name: airflow-triggerer
command: triggerer
healthcheck:
test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
interval: 30s
timeout: 10s
retries: 5
start_period: 30s
restart: unless-stopped
depends_on:
airflow-init:
condition: service_completed_successfully
flower:
<<: *airflow-common
container_name: airflow-flower
command: celery flower
ports:
- "5555:5555"
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
interval: 30s
timeout: 10s
retries: 5
start_period: 30s
restart: unless-stopped
depends_on:
airflow-init:
condition: service_completed_successfully
volumes:
postgres_data:
방법 4: 커스텀 이미지 빌드
추가 Python 패키지가 필요한 경우:
Dockerfile:
FROM apache/airflow:2.10.0
USER root
RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*
USER airflow
COPY requirements.txt /requirements.txt
RUN pip install --no-cache-dir -r /requirements.txt
requirements.txt:
apache-airflow-providers-amazon==8.0.0
apache-airflow-providers-google==10.0.0
apache-airflow-providers-slack==8.0.0
pandas==2.0.3
requests==2.31.0
docker-compose.yml 수정:
x-airflow-common: &airflow-common
# image: apache/airflow:2.10.0
build:
context: .
dockerfile: Dockerfile
# ... 나머지 설정
빌드 및 실행:
docker compose build
docker compose up -d
샘플 DAG 작성
dags/example_dag.py:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
# 기본 인자 설정
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
# DAG 정의
with DAG(
dag_id='example_etl_pipeline',
default_args=default_args,
description='간단한 ETL 파이프라인 예제',
schedule_interval='@daily', # 매일 실행
start_date=datetime(2024, 1, 1),
catchup=False,
tags=['example', 'etl'],
) as dag:
# Task 1: 데이터 추출
extract = BashOperator(
task_id='extract_data',
bash_command='echo "Extracting data..." && sleep 2',
)
# Task 2: 데이터 변환
def transform_data(**kwargs):
print("Transforming data...")
# 여기에 변환 로직 추가
return "transformed_data"
transform = PythonOperator(
task_id='transform_data',
python_callable=transform_data,
)
# Task 3: 데이터 로드
load = BashOperator(
task_id='load_data',
bash_command='echo "Loading data..." && sleep 2',
)
# Task 의존성 정의
extract >> transform >> load
환경 변수 설정
주요 환경 변수
| 변수명 | 설명 | 기본값 |
|---|---|---|
AIRFLOW__CORE__EXECUTOR | 실행 모드 | SequentialExecutor |
AIRFLOW__CORE__LOAD_EXAMPLES | 예제 DAG 로드 | true |
AIRFLOW__CORE__DAGS_FOLDER | DAG 디렉토리 | /opt/airflow/dags |
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN | DB 연결 문자열 | – |
AIRFLOW__WEBSERVER__SECRET_KEY | 웹서버 시크릿 | – |
AIRFLOW__CELERY__BROKER_URL | Celery 브로커 URL | – |
Executor별 설정
# LocalExecutor
AIRFLOW__CORE__EXECUTOR=LocalExecutor
AIRFLOW__CORE__PARALLELISM=32
AIRFLOW__CORE__DAG_CONCURRENCY=16
# CeleryExecutor
AIRFLOW__CORE__EXECUTOR=CeleryExecutor
AIRFLOW__CELERY__WORKER_CONCURRENCY=16
AIRFLOW__CELERY__BROKER_URL=redis://redis:6379/0
Traefik 리버스 프록시 연동
version: '3.8'
x-airflow-common: &airflow-common
image: apache/airflow:2.10.0
environment: &airflow-common-env
AIRFLOW__CORE__EXECUTOR: LocalExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CORE__FERNET_KEY: ''
AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
AIRFLOW__WEBSERVER__BASE_URL: https://airflow.yourdomain.com
volumes:
- ./dags:/opt/airflow/dags
- ./logs:/opt/airflow/logs
- ./plugins:/opt/airflow/plugins
user: "${AIRFLOW_UID:-50000}:0"
depends_on:
postgres:
condition: service_healthy
services:
postgres:
image: postgres:15-alpine
container_name: airflow-postgres
environment:
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow
POSTGRES_DB: airflow
volumes:
- postgres_data:/var/lib/postgresql/data
healthcheck:
test: ["CMD", "pg_isready", "-U", "airflow"]
interval: 10s
timeout: 5s
retries: 5
restart: unless-stopped
networks:
- airflow
airflow-webserver:
<<: *airflow-common
container_name: airflow-webserver
command: webserver
labels:
- "traefik.enable=true"
- "traefik.http.routers.airflow.rule=Host(`airflow.yourdomain.com`)"
- "traefik.http.routers.airflow.entrypoints=websecure"
- "traefik.http.routers.airflow.tls=true"
- "traefik.http.routers.airflow.tls.certresolver=letsencrypt"
- "traefik.http.services.airflow.loadbalancer.server.port=8080"
restart: unless-stopped
networks:
- airflow
- traefik
airflow-scheduler:
<<: *airflow-common
container_name: airflow-scheduler
command: scheduler
restart: unless-stopped
networks:
- airflow
airflow-init:
<<: *airflow-common
container_name: airflow-init
entrypoint: /bin/bash
command:
- -c
- |
exec /entrypoint airflow version
environment:
<<: *airflow-common-env
_AIRFLOW_DB_MIGRATE: 'true'
_AIRFLOW_WWW_USER_CREATE: 'true'
_AIRFLOW_WWW_USER_USERNAME: admin
_AIRFLOW_WWW_USER_PASSWORD: admin
user: "0:0"
networks:
- airflow
volumes:
postgres_data:
networks:
airflow:
driver: bridge
traefik:
external: true
업그레이드 및 백업
버전 업그레이드
# 현재 버전 확인
docker exec airflow-webserver airflow version
# 새 버전으로 이미지 업데이트
# docker-compose.yml에서 이미지 태그 변경 후:
docker compose pull
docker compose down
docker compose up airflow-init
docker compose up -d
데이터베이스 백업
# PostgreSQL 백업
docker exec airflow-postgres pg_dump -U airflow airflow > airflow_backup_$(date +%Y%m%d).sql
# 복원
docker exec -i airflow-postgres psql -U airflow airflow < airflow_backup_20250305.sql
DAG 백업
# DAG 디렉토리 백업
tar -czvf dags_backup_$(date +%Y%m%d).tar.gz ./dags
문제 해결
초기화 실패
# 로그 확인
docker compose logs airflow-init
# 수동 DB 초기화
docker compose run --rm airflow-webserver airflow db init
메모리 부족
services:
airflow-webserver:
deploy:
resources:
limits:
memory: 2G
스케줄러 문제
# 스케줄러 로그 확인
docker compose logs -f airflow-scheduler
# 스케줄러 재시작
docker compose restart airflow-scheduler
DAG 인식 안 됨
# DAG 파싱 오류 확인
docker exec airflow-scheduler airflow dags list-import-errors
# DAG 목록 확인
docker exec airflow-scheduler airflow dags list
사용 사례
1. ETL 파이프라인
- 데이터 웨어하우스 적재
- API 데이터 수집 및 변환
- 일별/주별 배치 처리
2. ML 파이프라인
- 모델 학습 스케줄링
- Feature Engineering 자동화
- 모델 배포 파이프라인
3. 인프라 자동화
- 백업 스케줄링
- 리포트 생성 및 이메일 발송
- 시스템 모니터링 및 알림
도구 비교
| 기능 | Airflow | Prefect | Dagster | Luigi |
|---|---|---|---|---|
| 언어 | Python | Python | Python | Python |
| UI | ✅ 풍부함 | ✅ Cloud | ✅ Dagit | ❌ 기본 |
| 스케줄링 | ✅ Cron+DAG | ✅ 유연함 | ✅ | ✅ |
| 확장성 | ✅ Celery/K8s | ✅ | ✅ | ⚠️ 제한 |
| 학습 곡선 | 높음 | 중간 | 중간 | 낮음 |
| 커뮤니티 | 매우 활발 | 활발 | 성장 중 | 안정 |
결론
Apache Airflow는 데이터 엔지니어링과 워크플로우 오케스트레이션의 업계 표준으로, 다음과 같은 경우에 적합합니다:
- 대규모 데이터 파이프라인: ETL, ELT, 데이터 웨어하우스 관리
- 복잡한 의존성 관리: 수십~수백 개의 태스크가 있는 워크플로우
- 엔터프라이즈 환경: 팀 협업, 감사 로그, 권한 관리 필요
- 클라우드 통합: AWS, GCP, Azure 등과의 네이티브 연동
다만 설치와 운영에 상당한 리소스가 필요하므로, 단순한 Cron 대체나 소규모 워크플로우에는 Dagu나 Prefect 같은 경량 대안을 고려해볼 수 있습니다.