
01. Airflow DAG
DAG
- 오퍼레이터 : 특정 행위를 할 수 있는 기능을 모아 놓은 클래스
- Task : 오퍼레이터에서 객체화(인스턴스화)되어 DAG에서 실행 가능한 오브젝트
- Bash 오퍼레이터 : 쉘 스크립트 명령을 수행하는 오퍼레이터
DAG에서는 오퍼레이터를 통해 만들어진 Task들이 실행되는 것
Task는 방향성을 가지고 있고 순환되지 않는 형태로 연결되어 있음
Task의 수행 주체
- 스케줄러 : 머리역할
1. 우리가 만든 DAG 파일을 읽어 들인(파싱) 후 DB에 정보 저장
2. DAG 시작시간 결정
- 워커 : 실제 작업 수행
1. 스케줄러가 시킨 DAG 파일을 찾아 처리
2. 처리가 되기 전 후, 메타DB에 업데이트
02. DAG 작성하기
airflow 프로젝트 디렉토리에 /dags/dags_bash_operator.py 폴더와 파일을 생성한다.

http://localhost:8080/home airflow 서비스에서 샘플 코드를 참고하여 DAG를 작성한다.

DAG 정의
# DAG 정의, 필수
with DAG(
dag_id="dags_bash_operator",
schedule="0 0 * * *",
start_date=pendulum.datetime(2024, 3, 1, tz="Asia/Seoul"),
catchup=False,
# dagrun_timeout=datetime.timedelta(minutes=60),
tags=["example", "example2"],
# params={"example_key": "example_value"},
) as dag:
- dag_id : 화면에서 보이는 DAG 이름
파이썬 파일명과 상관 없지만 일반적으로 DAG 파일명과 id를 일치시키는 것이 직관적으로 빨리 찾기 위해 좋다.
- schedule : 크론 스케쥴로 DAG이 언제 도는지 주기를 설정
"분 시 일 월 요일" 5개의 항목으로 나타냄
"0 0 * * *" 매일 0시 0분에 돌도록 설정
- start_date : DAG이 언제부터 도는지 설정
- catchup : start_date 이전 구간에 대해 DAG을 돌릴지 결정, false면 start_date부터, true면 start_date 이전의 누락된 기간에 대해서도 DAG 실행
- dagrun : timeout 값 설정
- tag : 화면에서 보이는 tag 값
- params : task들에 공통적으로 넘겨줄 파라미터 작성
Task
# [START howto_operator_bash]
bash_t1 = BashOperator(
task_id="bash_t1",
bash_command="echo who am i",
)
bash_t2 = BashOperator(
task_id="bash_t2",
bash_command="echo $HOSTNAME",
)
task는 오퍼레이터를 통해서 만들어진다.
- task_id : Graph에 나오는 이름값
task도 객체명과 상관없지만 동일하게 정의해 준다.

- bash_command : 어떤 shell script를 수행할지 설정
! $HOSTNAME : HOSTNAME이라고 하는 환경변수 값을 출력하라는 의미
Task 정의를 모두 하고 제일 아래 부분에 task 실행 순서를 적어준다.
bash_t1 >> bash_t2
전체 코드
from airflow import DAG
import datetime
import pendulum
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
# DAG 정의, 필수
with DAG(
dag_id="dags_bash_operator",
schedule="0 0 * * *",
start_date=pendulum.datetime(2024, 3, 1, tz="Asia/Seoul"),
catchup=False,
# dagrun_timeout=datetime.timedelta(minutes=60),
tags=["example", "example2"],
# params={"example_key": "example_value"},
) as dag:
# [START howto_operator_bash]
bash_t1 = BashOperator(
task_id="bash_t1",
bash_command="echo who am i",
)
bash_t2 = BashOperator(
task_id="bash_t2",
bash_command="echo $HOSTNAME",
)
bash_t1 >> bash_t2
03. Airflow 컨테이너와 DAG 연결하기
작성한 py 파일을 git에 올린다.
git add dags/
git commit -m "메세지"
git push
airflow를 설치하면서 생성한 디렉토리에 pull 받는다.
git pull

remote repository에 있던 dags 폴더를 가져왔다.

위에서 만든 DAG을 airflow에 연결해줘야 한다.
docker-compose.yaml 파일에서 volumes 항목이
airflow 디렉토리와 연결해 줄 컨테이너 디렉토리를 맵핑해 주는 부분이다.

# AIRFLOW_PROJ_DIR에 값이 있으면 그 값을 출력하고 없으면 . 을 출력하라는 명령어
${AIRFLOW_PROJ_DIR:-.} # .
# ./dags를 /opt/airflow/dags(컨테이너의 dags)와 연결시키라는 의미
${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags
지금은 현재 디렉토리(Statground) 아래에 있는 dags 디렉토리가 컨테이너의 dags와 연결되어 있다.
이거를 위에서 pull 받아준 Statground/airflow/dags 디렉토리와 연결해줘야 한다.
volumes 항목을 수정한다.

04. Airflow에서 DAG 파일 확인하기
airflow 서비스를 올려 DAG 파일이 잘 반영되었는지 확인해 준다.
docker compose down # airflow 서비스 내리기
docker compose up # airflow 서비스 내리기

DAG을 처음 업로드 하면 pause 된 상태로 올라오기 때문에
unpause를 눌러 상태를 바꿔준다.
스케줄이 걸려있는 DAG은 unpause를 누르면 기본적으로 한번 실행된다.
Grid 탭에서 실행된 스케줄에 대한 내용과 각각의 task들에 대한 결과를 확인할 수 있다.

Log탭에서 결과를 확인할 수 있다.
bash_t1 결과

bash_t2 결과

결과를 보면 $HOSTNAME 환경변수 값 'fa2d644d58dd'을 출력하였다.
docker ps로 도커 컨테이너 리스트를 확인했을 때, worker 컨테이너의 id가 'fa2d644d58dd'인 것을 볼 수 있다.
실제 Task를 처리하는 요소는 워커 컨테이너이기 때문이다.

worker 컨테이너 안에 들어가서 echo $HOSTNAME을 입력하여 동일한 값이 나오는지 확인한다.
docker exec -it fa2d644d58dd bash
동일한 값이 나왔다. airflow에서 실제 task를 처리하는 요소는 worker라는 것을 알 수 있다.

출처 : Airflow 마스터 클래스
'⚙️ 데이터 엔지니어링 > Airflow' 카테고리의 다른 글
[Airflow] Task 연결하기 (>>, <<, set_downstream(), set_upstream()) (0) | 2024.03.31 |
---|---|
[Airflow] Cron 표현식 정리 (0) | 2024.03.31 |
[Airflow] MacOS m1 환경에 Airflow 개발환경 설정하기 (2) | 2024.03.30 |
[Airflow] 01. Airflow 소개 (0) | 2024.03.16 |

01. Airflow DAG
DAG
- 오퍼레이터 : 특정 행위를 할 수 있는 기능을 모아 놓은 클래스
- Task : 오퍼레이터에서 객체화(인스턴스화)되어 DAG에서 실행 가능한 오브젝트
- Bash 오퍼레이터 : 쉘 스크립트 명령을 수행하는 오퍼레이터
DAG에서는 오퍼레이터를 통해 만들어진 Task들이 실행되는 것
Task는 방향성을 가지고 있고 순환되지 않는 형태로 연결되어 있음
Task의 수행 주체
- 스케줄러 : 머리역할
1. 우리가 만든 DAG 파일을 읽어 들인(파싱) 후 DB에 정보 저장
2. DAG 시작시간 결정
- 워커 : 실제 작업 수행
1. 스케줄러가 시킨 DAG 파일을 찾아 처리
2. 처리가 되기 전 후, 메타DB에 업데이트
02. DAG 작성하기
airflow 프로젝트 디렉토리에 /dags/dags_bash_operator.py 폴더와 파일을 생성한다.

http://localhost:8080/home airflow 서비스에서 샘플 코드를 참고하여 DAG를 작성한다.

DAG 정의
# DAG 정의, 필수
with DAG(
dag_id="dags_bash_operator",
schedule="0 0 * * *",
start_date=pendulum.datetime(2024, 3, 1, tz="Asia/Seoul"),
catchup=False,
# dagrun_timeout=datetime.timedelta(minutes=60),
tags=["example", "example2"],
# params={"example_key": "example_value"},
) as dag:
- dag_id : 화면에서 보이는 DAG 이름
파이썬 파일명과 상관 없지만 일반적으로 DAG 파일명과 id를 일치시키는 것이 직관적으로 빨리 찾기 위해 좋다.
- schedule : 크론 스케쥴로 DAG이 언제 도는지 주기를 설정
"분 시 일 월 요일" 5개의 항목으로 나타냄
"0 0 * * *" 매일 0시 0분에 돌도록 설정
- start_date : DAG이 언제부터 도는지 설정
- catchup : start_date 이전 구간에 대해 DAG을 돌릴지 결정, false면 start_date부터, true면 start_date 이전의 누락된 기간에 대해서도 DAG 실행
- dagrun : timeout 값 설정
- tag : 화면에서 보이는 tag 값
- params : task들에 공통적으로 넘겨줄 파라미터 작성
Task
# [START howto_operator_bash]
bash_t1 = BashOperator(
task_id="bash_t1",
bash_command="echo who am i",
)
bash_t2 = BashOperator(
task_id="bash_t2",
bash_command="echo $HOSTNAME",
)
task는 오퍼레이터를 통해서 만들어진다.
- task_id : Graph에 나오는 이름값
task도 객체명과 상관없지만 동일하게 정의해 준다.

- bash_command : 어떤 shell script를 수행할지 설정
! $HOSTNAME : HOSTNAME이라고 하는 환경변수 값을 출력하라는 의미
Task 정의를 모두 하고 제일 아래 부분에 task 실행 순서를 적어준다.
bash_t1 >> bash_t2
전체 코드
from airflow import DAG
import datetime
import pendulum
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
# DAG 정의, 필수
with DAG(
dag_id="dags_bash_operator",
schedule="0 0 * * *",
start_date=pendulum.datetime(2024, 3, 1, tz="Asia/Seoul"),
catchup=False,
# dagrun_timeout=datetime.timedelta(minutes=60),
tags=["example", "example2"],
# params={"example_key": "example_value"},
) as dag:
# [START howto_operator_bash]
bash_t1 = BashOperator(
task_id="bash_t1",
bash_command="echo who am i",
)
bash_t2 = BashOperator(
task_id="bash_t2",
bash_command="echo $HOSTNAME",
)
bash_t1 >> bash_t2
03. Airflow 컨테이너와 DAG 연결하기
작성한 py 파일을 git에 올린다.
git add dags/
git commit -m "메세지"
git push
airflow를 설치하면서 생성한 디렉토리에 pull 받는다.
git pull

remote repository에 있던 dags 폴더를 가져왔다.

위에서 만든 DAG을 airflow에 연결해줘야 한다.
docker-compose.yaml 파일에서 volumes 항목이
airflow 디렉토리와 연결해 줄 컨테이너 디렉토리를 맵핑해 주는 부분이다.

# AIRFLOW_PROJ_DIR에 값이 있으면 그 값을 출력하고 없으면 . 을 출력하라는 명령어
${AIRFLOW_PROJ_DIR:-.} # .
# ./dags를 /opt/airflow/dags(컨테이너의 dags)와 연결시키라는 의미
${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags
지금은 현재 디렉토리(Statground) 아래에 있는 dags 디렉토리가 컨테이너의 dags와 연결되어 있다.
이거를 위에서 pull 받아준 Statground/airflow/dags 디렉토리와 연결해줘야 한다.
volumes 항목을 수정한다.

04. Airflow에서 DAG 파일 확인하기
airflow 서비스를 올려 DAG 파일이 잘 반영되었는지 확인해 준다.
docker compose down # airflow 서비스 내리기
docker compose up # airflow 서비스 내리기

DAG을 처음 업로드 하면 pause 된 상태로 올라오기 때문에
unpause를 눌러 상태를 바꿔준다.
스케줄이 걸려있는 DAG은 unpause를 누르면 기본적으로 한번 실행된다.
Grid 탭에서 실행된 스케줄에 대한 내용과 각각의 task들에 대한 결과를 확인할 수 있다.

Log탭에서 결과를 확인할 수 있다.
bash_t1 결과

bash_t2 결과

결과를 보면 $HOSTNAME 환경변수 값 'fa2d644d58dd'을 출력하였다.
docker ps로 도커 컨테이너 리스트를 확인했을 때, worker 컨테이너의 id가 'fa2d644d58dd'인 것을 볼 수 있다.
실제 Task를 처리하는 요소는 워커 컨테이너이기 때문이다.

worker 컨테이너 안에 들어가서 echo $HOSTNAME을 입력하여 동일한 값이 나오는지 확인한다.
docker exec -it fa2d644d58dd bash
동일한 값이 나왔다. airflow에서 실제 task를 처리하는 요소는 worker라는 것을 알 수 있다.

출처 : Airflow 마스터 클래스
'⚙️ 데이터 엔지니어링 > Airflow' 카테고리의 다른 글
[Airflow] Task 연결하기 (>>, <<, set_downstream(), set_upstream()) (0) | 2024.03.31 |
---|---|
[Airflow] Cron 표현식 정리 (0) | 2024.03.31 |
[Airflow] MacOS m1 환경에 Airflow 개발환경 설정하기 (2) | 2024.03.30 |
[Airflow] 01. Airflow 소개 (0) | 2024.03.16 |