끄적끄적

Apache AirFlow를 이용한 scheduling 경험 기록 본문

개발 툴 사용관련

Apache AirFlow를 이용한 scheduling 경험 기록

monkeydev 2019. 6. 7. 22:20

1. 문서의 목적

회사에서 AirFlow를 이용하여서 단순 반복 작업을 자동화 시키는 업무를 진행중이다.

이와 관련하여 도움이 될만한 정보들을 기록한다.

 

2. AirFlow 개요

AirFlow란 순서관계가 있는 여러가지 작업들을 DAG(Directed Acyclic Graph)로 구현할 수 있도록 도와주는 파이썬 패키지이다.

리눅스에서 제공하는 "크론탭"이라는 것과 비슷한 역할을 한다고 한다.(저자는 크론탭은 사용해보지 않았다.)

 

순서관계야 쉘스크립트 하나 짜서, 호출하도록 하면 되는거 아니냐 싶겠지만!

AirFlow를 이용하면 로그수집이나, 실패시의 retry, 그리고 무엇보다 깔끔한 UI를 통해서 작업 상황을 일목요연하게 추적할 수 있다.

 

개인적으로는 모든 stdout을 알아서 시간대별로 기록해 주어서, 따로 로그파일을 작성하지 않아도 되어 편리하다.

이 로그파일은 AirFlow상의 UI를 통해 편하게 접근할 수 있다.

 

아래 페이지의 Concepts 부분의 tutorial들을 모두 따라하다보면, AirFlow의 기능들을 간단히 살펴볼 수 있다.

https://airflow.apache.org/ 

 

Apache Airflow Documentation — Airflow Documentation

Airflow is a platform to programmatically author, schedule and monitor workflows. Use airflow to author workflows as directed acyclic graphs (DAGs) of tasks. The airflow scheduler executes your tasks on an array of workers while following the specified dep

airflow.apache.org

아래의 3번을 이해하기 위해서는 공식문서의 "Concepts" 부분을 어느정도 이해해야 한다.

 

3. AirFlow의 DAG 수행 중 Failure가 발생 했을 때 해결 방법

(3번 목차는 2번에 언급된 에어플로우 공식 문서의 concepts 항목을 충실히 수행하고 왔다고 가정하고 작성한다.)

 

AirFlow의 BackFill을 이용하여 작업을 수행하다가, 어떤 task에서 Failure가 발생했다고 하자.

이 때, 해당 task의 뒤쪽 즉, 이 task가 성공해야 수행될 수 있는 task들은 모두 upstream_failure라는 실패 상태가 된다.

 

이렇게 task의 failure가 발생했을 때, failure의 발생원인을 찾아서 다시 실행한다고 해도, 한번 실패했던 내용은 계속해서 실패하게 된다.

 

이것이 AirFlow 측에서 의도한 것인지는 정확히 모르겠으나, 문제 원인을 해결했으면 문제가 발생한 task들의 실패기록을 지워주어야 재실행할 수 있다.

 

다음과 같이 Task들이 순서대로 실행되는 DAG가 있었다고 가정하자.

A->B->C->D->E

이 때, C에서 문제가 발생했다면 C 이후의 D,E에 대해서도 실패기록을 지워줘야 한다.

이때는 airflow의 CLI help 명령어에 따라 다음과 같이 명령어를 주면 C와 그 downstream들의 실패기록을 삭제할 수 있다.

(airflowtest) sungil@sungil:~$ airflow clear --help
[2019-06-08 10:46:49,826] {__init__.py:51} INFO - Using executor SequentialExecutor
usage: airflow clear [-h] [-t TASK_REGEX] [-s START_DATE] [-e END_DATE]
                     [-sd SUBDIR] [-u] [-d] [-c] [-f] [-r] [-x] [-xp] [-dx]
                     dag_id

positional arguments:
  dag_id                The id of the dag

optional arguments:
  -h, --help            show this help message and exit
  -t TASK_REGEX, --task_regex TASK_REGEX
                        The regex to filter specific task_ids to backfill
                        (optional)
  -s START_DATE, --start_date START_DATE
                        Override start_date YYYY-MM-DD
  -e END_DATE, --end_date END_DATE
                        Override end_date YYYY-MM-DD
  -sd SUBDIR, --subdir SUBDIR
                        File location or directory from which to look for the
                        dag. Defaults to '[AIRFLOW_HOME]/dags' where
                        [AIRFLOW_HOME] is the value you set for 'AIRFLOW_HOME'
                        config you set in 'airflow.cfg'
  -u, --upstream        Include upstream tasks
  -d, --downstream      Include downstream tasks
  -c, --no_confirm      Do not request confirmation
  -f, --only_failed     Only failed jobs
  -r, --only_running    Only running jobs
  -x, --exclude_subdags

(airflowtest) sungil@sungil:~$ airflow clear -t <C의 task name> --downstream <c가 속한 DAG name>

이렇게 실패 기록을 지워주면 backfill 등으로 재실행 했을때, 실패했던 C부터 작업을 진행하게 된다.

 

4. CentOS7에서의 AirFlow LocalExecutor 이용을 위한 MySQL 연동

AirFlow에서 기본적으로 DAG 내에서의 operator들을 SQLite를 이용한다.
SQLite는 동시접근이 되지 않는 문제가 있기 때문에, DAG 내에서 여러개의 Operator가 동시 구동되지 않는다.
SQLite를 이용해도 여러개의 DAG이 동시 실행되는 것에는 문제가 없지만, DAG 내에서의 병렬 작업을 원한다면 반드시 MySQL이나, PostgreSQL 같은 RDB와의 연동이 필요하다.

 

아래로는 CentOS7기반으로 구동되는 AirFlow에서 RDB를 연동하는 방법을 서술한다.

  1. Docker를 이용한 MySQL 설치(mysql 5.7 버전 설치에 대해서 테스트 됐다)
    아래의 url을 참고하면 MySQL을 설치할 수 있다.
    https://jayden-lee.github.io/post/docker/mysql-install/

    설치 후에 MySQL에 ssh로 붙은다음에 다음의 작업을 해주어야 한다.

    mysql은 반드시 5.7 버전을 이용바란다. mysql의 최신버전(latest)에서는 kubernates 관련 오류가 발생했었다.
    mysql 5.7 보다 높은 버전에서는 돌아간다고 장담할 수 없다.

# 1. MySQL이 구동중인 container로 ssh를 통해 붙는다.
$> docker exec -it <container_id> bash

# 2. MySQL에 접속하여 airflow가 사용할 DataBase를 생성해준다.
$> mysql -p
*********
$mysql> create database airflow;
Created database airflow...

# 3. MySQL의 timestamp 정책을 설정해야 한다.(AirFlow를 위해서)
# 아래의 airflow docs를 참고하면 이 작업이 꼭 필요함을 알 수 있다.
# https://airflow.readthedocs.io/en/stable/faq.html 
# 위의 url내용을 적용하기 위해서 mysql에 접근하여 다음과 같은 명령을 이용할 수 있다.
mysql> set global explicit_defaults_for_timestamp=1;

# 아래의 명령어로 제대로 적용됐는지 확인 가능하다.
mysql> show global variables like '%timestamp%';

# 위의 내용은 아래의 stackoverflow의 답변을 참고했다.
https://stackoverflow.com/questions/15701636/how-to-enable-explicit-defaults-for-timestamp/15896461

2. AirFlow가 구동중인 곳에서 airflow.cfg의 설정값을 다음과 같이 바꿔주어야 한다.

# airflow.cfg의 특정 값들을 다음과 같이 변경한다.

...

executor = LocalExecutor
...

sql_alchemy_conn = mysql://root:<MYSQL_ROOT_PASSWORD>@<MYSQL_DOCKER_CONTAINER_IP>:<MYSQL_PORT_NUM>/airflow
(여기서 ...3306/airflow의 airflow는 위에서 생성한 mysql의 데이터베이스 이름을 넣어준것이다.)

...

설정값을 바꿔준 후, CentOS7 기준으로 $> airflow initdb 명령어를 쳐봤을 때 바로 수행되지 않는다.
다음의 사항들을 인스톨해줘야 initdb 명령어가 먹는다.

$> yum install mysql
$> yum install mysql-devel
$> yum install gcc
$> pip install pymysql
$> yum install MySQL-python

# 중요한 건 이 mysqlclient가 인스톨 되어야 airflow initdb 명령어가 먹는다는 것이다.
# mysqlclient가 바로 깔리지 않는다면 위의 것들을 수행해보고 하면 될 것이다.
$> pip3 install mysqlclient

 

3. 마지막 작업
끝으로, 위의 작업을 모두 마친 후에 airflow initdb 명령어가 잘 동작한다면, airflow의 webserver와 scheduler를 재구동해야 한다. 특히 경험상 webserver의 경우 재시작하지 않으면, backfill등으로 DAG을 실행해도, webserver 페이지에 테스크의 진행상황이 전혀 표시되지 않는 요류가 있다.

때문에 마지막으로 initdb -> webserver 재구동 -> scheduler 재구동 -> initdb 작업으로 마무리하도록 한다.

2019-09-03에 테스트한 바로는 다음과 같은 순서로 작업하면 된다.
1. 위의 작업을 마치고 airflow docker를 restart 한다.
2. airflow가 재시작 되면, web으로의 연결을 해주는 gunicorn: master [airflow-webserver]가 닫혀있을거다. 이 때, airflow home의 webserver pid관리파일(~/airflow/airflow-webserver.pid)를 열어서 pid를 지운 다음에, airflow webserver를 다시 띄운다.
3. 2번을 작업하고 나면, airflow docker가 다시 한번 꺼질 것이다. 꺼졌을 때 다시 한번 airflow docker를 restart 해주면, 위의 내용들이 적용된 airflow docker가 구동되기 시작한다.

5. AirFlow의 EmailOperator를 이용한 메일 전송(SMTP)

http://manojvsj.blogspot.com/2016/09/how-smtp-works-and-how-to-configure.html 

 

Technical blog

 

manojvsj.blogspot.com

위의 블로그를 보면, SMTP 서버를 이용해서 보내고자 하는 메일을 보낼 수 있음을 알 수 있다.
AirFlow를 이용하면 특정 Operator에서 Failure가 발생하거나, 아니면 직접 EmailOperator를 이용하여 메일 보내기 작업 자체를 작업으로 만들 수 있다.

Gmail을 이용한다고 했을 때, 사용할 Gmail 계정에 대해서 아래 url의 내용들을 적용시켜야 한다.
https://devgyugyu.tistory.com/14  

그 후, 아래와 같이 airflow.cfg의 [smtp] 부분을 변경해주면 된다.

# airflow.cfg 파일
.....
[smtp]
smtp_host = smtp.gmail.com
smtp_starttls = True
smtp_ssl = False
smtp_user = whwldyd77@gmail.com
smtp_port = 587
smtp_password = whwldydWkd
smtp_mail_from = whwldyd77@gmail.com
......



6. AirFlow의 start_date와 schedule_interval

airflow를 이용하여 DAG을 정기적으로 구동시키고 싶을 때, 사용자는 다음의 3가지를 주의해서 DAG을 구성해야 한다.
주의할 것 3가지는 아래와 같다.

  • start_date: DAG 구동의 기준점이 될 시간. 이름과는 다르게 이 때 시작되는게 아님을 주의해야 한다!
  • schedule_interval: 어느 주기로 실행될지 정한다. schedule_interval은 cron expression와 (from datetime import timedelta)의 timedelta를 이용하여 구현할 수 있다.
  • catchup=False or catchup=True: DAG를 구동하면서 과거데이터를 쭉 수집하면서 갈지(catchup=True) 아니면 현재 수집 되어있어야 할 데이터만 수집할지 정한다.(이 말이 상당히 애매하다는 것을 안다. 아래에서 좀 더, 실제 사례를 들어 설명하겠다)

아래 내용을 주의깊게 읽어주길 바랍니다!
start_date는 schedule_interval에 의해 처음 구동될 시간을 정하게 된다.
즉 start_date 자체가 DAG의 시작시간이 되지 않는 다는것을 기억해야 한다.

(catchup에 대해 아직 설명을 하지 않았지만 catchup에 대한 전제가 필요하다.)만약 DAG의 catchup이 True(=default)라고 했을 때, DAG의 설정이 다음과 같이 되어 있다고 가정하자.

  • start_date = datetime(2019, 1, 5) # 2019년 1월 5일을 나타낸다.
  • schedule_interval = "0 20 1 * *" # 차례대로 (분 시 일 월 년)을 나타내며 매월 1일 20:00시에 구동됨을 의미한다
  • 실제 오늘 날짜가 2019-05-06일이라 가정하자
  • airflow.cfg의 timezone 설정을 Asia/Seoul로 바꿔놓은 상황을 가정한다.

위의 상황에 airflow scheduler가 정상구동되고있고, airflow UI상의 DAG 활성화 버튼 ON/OFF 버튼이 ON으로 되어 있다면 DAG이 정상 구동되게 될 것이다.

이 때, DAG에서는 다음과 같은 run_id를 갖는 job들이 실행되게 된다.

  • 2019-02-01T11:00:00+00:00(=2019-02-01T20:00:00+09:00 이고, airflow.cfg에서 timezone을 설정해도 airflow에서는 내부적으로 항상 UTC로 변환된 값을 표현한다. 단 cron expression으로 정의해 놓은 schedule_interval 형태는 변환 과정을 거치지 않고 보여준다.
  • 2019-03-01T11:00:00+00:00
  • 2019-04-01T11:00:00+00:00

여기서 어째서 5월 1일 날짜의 run_id(2019-05-01T11:00:00+00:00)이 생성되지 않는지 의아했다.
이는 아마 5월 1일 날짜에 대한 구동은 6월 1일에 필요할 것이라고 airflow에서 판단하는 것 같다.
즉 매월 1일에 구동되는 어플리케이션이 있다면, 5월 1일 자 데이터 생성은 6월 1일에 수행하게 될 것이라는 것을 감안하고 이렇게 만든 듯 하다. 

 

위에서 2019년 2,3,4월달의 데이터를 수집해야 하는 상황이라면 위와 같은 설정으로 실행하면 된다.


그런데 만약 2,3,4 월 달 분의 데이터만 필요하다면 catchup=False 옵션을 주어야 한다.
catchup은 과거데이터부터 거슬러 올라가면서 수집하겠다는 표현이다. 이는 곧 현재 날자 기준으로 수집되어 있어야할 가장 가까운 날짜인 4월 1일 기준의 데이터만 수집하도록 하는 옵션이다.

 

7. AirFlow에서 특정 DAG의 특정 execution_date(RUN ID)의 task를 실제 실행없이 success로 찍고 넘어가는 방법

airflow backfill --mark_success --task_regex {task 이름} --ignore_dependencies -s 2019-05-08 -e 2019-05-15 {dag 이름}

줄이면 아래와 같다.

$> airflow backfill -m -t {task 이름} -i -s 2019-05-08 -e 2019-05-15 {dag 이름}

mark success로 표시하려는 아이의 schedule_interval이 존재하는 DAG이라면 해당 스케쥴에 의해 execution_time을 알 수 있을 것이다.

schedule_interval이 None이나 '@once'인 경우는 execution_time은 start_date를 따라간다. 즉 end_date는 막줘도 된다.

[2019년 11월 7일 자에 해당 항목에 대하여 새로이 작성]
위의 경우는 airflow의 DB 세팅을 기본 SQLite로 설정했을때이다.
postgreSQL과 연동한 경우는 위의 방법으로 테스크를 success로 표시할 수는 있으나, 해당 DAG에 대한execution_date 가 완료(success) 상태로 표시된다. 

이렇게 되면, dag_run을 통한 자동실행의 경우에, 해당 DAG의 execution_date는 나머지 task를 처리하지 않고 넘어가게 된다. 

따라서 위의 기능은, UI에서 마우스로 해당 DAG의 execution_date에 대한 task들을 조작할 수 있는 상태를 만들기 위해서 사용하라. 

명령어를 통해서 해당 execution_date의 버튼들을 활성화 시켰으면, DAG을 clear 상태로 변경한 후, success 상태로 만들고 싶은 task들을 마우스로 일일이 클릭해서 성공 상태로 만들어야 한다.

[2020-06-19에 새로이 추가]

위 그림에서 backfill로 task를 success 시켰으면 Run Id가 backfill_2020-06 ... 이런식으로 뜬다.

이때, 이걸 scheduled_ 형태로 수정해주어야 정상적으로 구동된다.

8. AirFlow UI상에 보이는 불필요한 DAG 기록 제거

airflow를 구동하다 보면, 지워야 깔금한 DAG 기록이 있다.

이런것은 해당 DAG의 Tree View에서 가장 꼭대기에 있는 DAG을 누르고 Edit 버튼을 눌러서 지울 수 있다.

그러면 탭에 List Create Edit 이렇게 3개의 버튼이 보일 것이다.

여기서 List를 누른 후에, 지우고자 하는 DAG의 execution_date를 잘 찾아서 연필모양(수정버튼)을 누르면 다음을 변경할 수 있다.

  1. State
  2. Execution Date
  3. Run Id

위의 3개를 모두 빈칸으로 만들어버리고 Save 버튼을 누르면, 지저분한 기록이 지워진다.
DAG Id는 못바꾸니까, 바꾸려고 하지말자.

8. AirFlow에서 모든 recursive upstream이 성공했는지 조건을 걸수는 없다.

A DAG이 구동되는 동안에 B DAG이 구동되면 안되는 경우가 있다.(실무에서 만난 문제)

A와 B는 둘다 제일 처음의 테스크에서 서로가 구동중이라면 홀딩되게 했다.

A가 돌다가 갑자기 죽어서 B가 구동돼버린 상황이 있었다.

이 때, A는 이미 많이 구동되던 상태라 B가 끝나면 A가 다시 돌기를 바라는 상황이다.

이러한 경우 A에서는 DAG에 concurrency=1을 주어서, 기다리는 방법밖에 없다.

상호배타적으로 구동되어야 하는 테스크가 있다면, 둘중에 하나가 중간에 죽어서 순서가 바뀌었을 때, 기다리는 쪽은 잠시 concurrency=1로 set 해 놓아야 한다.

시간이 지나서, 결국은 다음의 해법으로 해결했다.

어떤 TASK1가 무조건 처리되어 있어야만 다른 task들이 진행된다고 하면, 다음과 같이 DAG을 구성하면 된다.

TASK1 >> (TASK2, TASK3, TASK4, TASK5 ....) 

다만, 이렇게 하면 그림이 상당히 지저분해지는 단점이 있다.

이렇게 구성하는게, concurrency=1로 설정할 필요가 없어져서, 훨씬 실용적으로 보인다.

9. 동일 DAG에 대하여 이전 구동이 실패하면 현재의 구동을 하지 않도록 하기

실무에서 일하면서 다음과 같은 상황이 있었다.

일단, 당시의 물리적인 시간은 11월 7일이다.
매월 1일 오후 10시에 구동되는 DAG이 있다. 이 DAG을 이용하여 9,10월 데이터를 수집하려는 상황이다.
DAG의 start_date를 9월 1일 날짜에 잘 수행될 수 있도록 2019년 9월 1일 9시 30분 정도로 잘 세팅해 놓은 상태였다.

위의 DAG을 UI상에서 DAG을 ON 상태로 하여 실행한다.(즉 정해진 execution_date마다 scheduler에 의해 자동 구동되도록 한다.)

이 때, 9월달의 구동에 실패하면 10월달의 구동을 하지 않을 줄 알았는데 9월달이 실패하니 10월달 분의 구동을 바로 시작했다. 9월달이 실패했을 때 10월달의 수집이 되지 않게 하려면 다음과 같이 해야 한다.


from airflow.models import DAG
from airflow.operators.subdag_operator import SubDagOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator

from airflow.sensors.external_task_sensor import ExternalTaskSensor
from airflow.utils.state import State

args = {
    'owner': 'airflow',
    'start_date': datetime(2019, 2, 2, 10, 10, 10),
}

dag = DAG(
    dag_id='stoper',
    max_active_runs=1,
    default_args=args,
    schedule_interval='0 12 1 * *',
)

start = DummyOperator(
    task_id='start',
    dag=dag,
    depends_on_past=True,
    wait_for_downstream=True,
)

previous = ExternalTaskSensor(
    task_id='Previous_Run',
    external_dag_id='stoper',
    external_task_id='All_Tasks_Completed',
    allowed_state=[State.SUCCESS, State.SKIPPED],
    execution_delta=timedelta(seconds=30),
    dag=dag,
)

task1 = BashOperator(
    task_id='TASK_01',
    bash_command='lsasdfsdafal',
    dag=dag,
)

complete = DummyOperator(
    task_id='All_Tasks_Completed',
    dag=dag,
)

start >> previous >> task1 >> complete

위의 'start'는 ExternalTaskSensor로서, 외부의 다른 DAG(동일 DAG에도 접근 가능)에 접근해서 task의 상태를 확인할 수 있는 airflow의 sensor이다.

즉, 위의 방법은 start에서 자기 자신의 'complete' task가 success 상태인지 확인하는 방법이다.

위의 방법을 이용할 경우, 첫번째 수행의 경우(즉, 어떤 'complete' task도 존재 하지 않는 상황.)에 ExternalTaskSensor는 complete를 찾을 수 없어서 무한정 기다리게 된다. 즉, 첫실행의 경우 사람이 수동으로 UI를 통해서 해당 task를 mark as success로 표시해주어야 한다.

위의 방법 외에도 검색해보면, DAG의 첫번째 task에 다음의 설정을 걸어주면 9월이 실패하면 10월도 수행하지 않는다는 글도 있었다.


 - dependes_on_past=True
 - wait_for_downstream=True

위의 방법대로 하면 실제로 SQLite를 사용한다면, 9월이 실패했을 때, 10월달을 수집하지 않는다.
하지만 내가 airflow를 구동하는 환경은 postgreSQL을 사용하는 환경이였는지 몰라도, 9월달이 실패하고, 바로 10월달의수행을 시작하기 시작했다.

때문에 현재로서는 postgreSQL을 사용한다면, 위의 depens_on_past와 wait_for_downstream을 이용한 방법은 기대하는 효과를 얻지 못한다.
연속적으로 실행되지 않도록 하려면 일단은 ExternalTaskSensor를 이용한 위의 방법을 사용해야 한다.

'개발 툴 사용관련' 카테고리의 다른 글

MAC에서 IntelliJ 단축키  (0) 2022.06.24
유용한 명령어 정리  (0) 2021.06.10
Comments