4 분 소요

오늘은 Airflow의 DAG Task를 생성하는 방법에 대해서 정리해보려 한다. 기본적으로 DAG을 어떻게 생성하는지와 Operator 의 종류 및 구성에 대해서 살펴보자.


DAG 생성하기

DAG을 생성하는 위치는 airflow init 명령어를 수행하면 생기는 airflow.cfg에 기록되어 있다. [core] - dags_folder 의 경로가 dag이 위치해야 하는 경로이다. 해당 Directory에 python 파일을 생성하면 dag이 추가되는 형태이다. 생성하고 싶은 단어를 입력해서 dag file 하나를 만들어보자.

DAG은 아래와 같이 airflow Module에서 가져올 수 있다.

DAG 생성 예시

  • airflow module에서 DAG 호출.
  • datetime.datetime을 사용해 날짜를 다룰 수 있다.
  • default_args : dict 자료형으로 기본 인자를 설정할 수 있다. DAG안에 있는 Operator에 설정할 공통 속성에 대해서 이곳에 정의한다. 그렇기에 BaseOperator에 있는 값을 default_args 에 전달할 수 있다.
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    
    from airflow import DAG
    from datetime import datetime, timedelta
    
    default_args = {
        'start_date' : datetime(2022, 1, 28),
        'email' : ['shin12272014@gmail.com'],
        'email_on_failure' : True,
        'owner' : 'admin'
    }
    
    with DAG( dag_id = 'tutorial' , 
              default_args = default_args, 
              description = 'A simple tutorial DAG',
              schedule_interval = timedelta(days=1),
              start_date = datetime(2022, 1, 28 ),
              tags = ['example'] 
            ) as dag :
    

Operator 종류

BashOperator

1
2
3
  t1 = BashOperator ( 
          task_id = 'sleep_date', 
          bash_command = 'echo 1' )

PythonOperator

Python Function Operator Task

  • Python Fucntion을 하나의 Task로 만들 수 있다.
  • PythonOperator도 존재하지만 Airflow 공식 Document에서는 Task Decorator를 이용한 방법이 소개 됨.
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    
    from airflow import DAG
    from airflow.decorators import task
    
    from datetime import datetime, timedelta
    from pprint import pprint
    
    default_args = {
        'start_date' : datetime(2022,1,30),
        'owner' : 'hsshin'
    }
    
    with DAG( dag_id = 'our_first_dag', default_args = default_args , schedule_interval = timedelta(days=1)) as dag:
        @task(task_id="print_the_context")
        def print_context(ds=None, **kwargs):
            print(ds)
            return 'Whatever you return gets printed in the logs'
    
        run_this = print_context(ds = 'abc'  )
    

virtualenv Python Operator Task

  • virtualenv 환경에서도 실행되는 operator가 제공된다. 아래와 같이 코드를 입력하면 virtualenv 환경이 제공된다.
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    
    from airflow import DAG
    from airflow.decorators import task
    
    import shutil
    from datetime import datetime, timedelta
    from pprint import pprint
    
    default_args = {
        'start_date' : datetime(2022,1,30),
        'owner' : 'hsshin'
    }
    
    with DAG( dag_id = 'our_first_dag', default_args = default_args , schedule_interval = timedelta(days=1)) as dag:
        if not shutil.which("virtualenv"):
            pass
        else :
            @task.virtualenv(
                task_id = "virtualenv_python", requirements=["colorama==0.4.0"], system_site_packages = False
            )
            def callable_virtualenv():
                from time import sleep
                from colorama import Back, Fore
                print(Fore.RED + 'some red text')
                print(Back.GREEN + 'and with a green background')
                print('Finished')
              
            virtualenv_task = callable_virtualenv()
    

    alt

BranchDateTimeOperator

  • 시간에 따라 실행해야할 Task를 분기별로 설정하고 싶다면 이 Operator를 사용한다.
    • ex) Operator들을 생성하고 22:00:00 ~ 23:15:00 경에는 Operator1을 실행하고 이외의 시간에는 다른 Operator를 실행하고 싶을 경우 사용.
      • target_upper : 23:15:00
      • target_lower : 22:00:00
    • 시간대를 설정할때 datetime.datetime을 통해 년월시간대 별로 조건 추가 가능.
    • datetime.time을 통해 매일 특정 시간에 실행하도록 조건 추가 가능.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
from airflow import DAG
from datetime import datetime, timedelta, time
from airflow.operators.datetime import BranchDateTimeOperator
from airflow.operators.bash import BashOperator

default_args = {
    'start_date' : datetime( 2022, 2, 2 ) ,
    'owner' : 'hsshin', 
}
with DAG( dag_id = 'tutorial_2', 
          default_args = default_args,
          tags = ['tutorial'],
          schedule_interval = timedelta(days=1)
          ) as dag :
    
        t1 = BashOperator ( 
          task_id = 'date_in_range', 
          bash_command = 'echo "my name is t1"' 
        )

        t2 = BashOperator ( 
          task_id = 'date_outside_range', 
          bash_command = 'echo "my name is t2"' 
        )

        cond1 = BranchDateTimeOperator ( 
          task_id = 'datetime_branch',
          follow_task_ids_if_true=['date_in_range'],
          follow_task_ids_if_false=['date_outside_range'],
          target_upper=datetime(2022,2,2,23,4,0),
          target_lower=datetime(2022,2,2,23,0,0)

        )

        cond1 >> [t1, t2]

SparkSubmitOperator

  • spark-submit을 task로 만들어주는 operator.

[설치 방법]

  1. pip install apache-airflow-providers-apache-spark 로 설치
  2. airflow webserver - Admin - connections (ex. http://localhost:8080/connection/list/ ) 에서 connection 생성.
    1. Connection Type = Spark
    2. Host = local ( test용 )
  3. 이후 아래의 코드와 같이 SparkSubmitOperator를 작성할 수 있음.
    1. SparkSubmitOperator Document 참조
1
2
3
4
5
6
7
8
9
10
11
12
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

crawler_path = "/home/hsshin/scrap_result"
conv_respath = "/home/hsshin/tmp"
conv_filename = datetime.now().strftime("%Y%m%d_%H%M%S.tmp")

t3 = SparkSubmitOperator(
    application = "/home/hsshin/airflow/dags/NewsStatistics/converter.py",
    task_id = "spark_news_conv",
    conn_id = "spark_default", 
    application_args = [crawler_path, "csv", conv_respath, conv_filename]
)

S3ToRedshiftOperator

  • AWS S3 -> RedShift로 Copy를 진행해주는 Operator

[설치 방법]

  1. pip install apache-airflow-providers-amazon 을 통해 amazon-provider 설치
  2. redshift_default connection 을 설정해준다. alt
  3. 이후 아래 코드와 같이 S3ToRedShiftOperator를 작성한다.
    1. S3ToRedshiftOperator Document
    2. connection_id는 별도로 지정하지 않으면 redshift_default로 설정됨.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
        from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
        from os import getenv

        S3_BUCKET = getenv("S3_BUCKET", "recommand.news.hsshin")
        S3_KEY = getenv("S3_KEY", key )
        REDSHIFT_TABLE = getenv("REDSHIFT_TABLE", "newsitems")

        task_transfer_s3_to_redshift = S3ToRedshiftOperator(
            s3_bucket=S3_BUCKET,
            s3_key=S3_KEY,
            schema="PUBLIC",
            table=REDSHIFT_TABLE,
            copy_options=['parquet'],
            task_id='transfer_s3_to_redshift',
        )

댓글남기기