2 분 소요

 안녕하세요 마개입니다.
Airflow를 이용하는데 있어 Task 간에 데이터를 주고 받는 방법에는 여러 방법이 있는데 기본적으로는 XCom이라는 기능을 제공합니다. 이 XCom에 대해서 Custom해보는 과정을 알아봅니다.


image



기본 XCom의 한계

 Airflow에는 Task 간의 데이터 전달을 위해 XCom이라는 기능을 제공하는데 이 XCom은 Metadata Database에 저장하고 가져오는 방식이기 때문에 데이터를 저장하는데 한계가 있습니다. 각 Database 종류에 따라 허용되는 사이즈는 아래와 같습니다.


DB Size
SQLite 2 GB
Postgres 1 GB
MySQL 64 KB


 각 XCom 데이터는 SQLAlchemy가 제공하는 LargeBinary 타입의 컬럼에 저장되는데 이는 MySQL에서는 BLOB 타입, PostgreSQL에서는 BYTEA 타입에 해당되기 때문에 용량도 각각 다르게 됩니다.
 이러한 데이터 저장 용량의 한계 때문에 Custom XCom Backend를 개발하였습니다. Custom XCom에서는 각 클라우드의 스토리지를 이용할 수 있는데 AWS의 S3를 이용해서 저장 용량의 한계가 없도록 진행했습니다.
 Custom XCom Backend를 만들기 위해서는 BaseXCom을 상속받아 진행해야 합니다.



XCom 메커니즘

 진행하기 전에 앞서 XCom을 이용해서 데이터를 주고 받을 때는 직렬화역직렬화의 과정을 거칩니다. 둘다 BaseXCom 클래스를 이용하는데 데이터를 저장(push)할 때는 serialize_value 메서드를 이용하고 가져올(pull) 때는 deserialize_value 메서드를 이용합니다.
 Custom XCom을 작성할 때에는 이 부분을 오버라이딩해서 Metadata Database가 아닌 원하는 Storage(local, AWS, GCP, Azure)에 저장하는 로직으로 변경하면 됩니다.



Custom XCom 코드 작성

디렉터리 구조

image

 Custom으로 제작하는 소스들의 경우 전부 plugins 디렉터리에 넣어둡니다. plugins 안에는 custom, include가 있는데 custom에는 custom operator와 hook이 있고 include 디렉터리 안에 Custom XCom Backend를 작성했습니다. (파일명 : aws_xcom_backend.py)


코드

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# include/aws_xcom_backend.py
from airflow.models.xcom import BaseXCom
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

from typing import Any
import common.constant.Global as const
import pandas as pd
import uuid
import os
import json


class CustomXComBackendS3(BaseXCom):
    PREFIX = "xcom_s3://"
    BUCKET_NAME = "airflow_xcom_test"
    AWS_CONN_ID = "aws_conn_default"

    @staticmethod
    def serialize_value(
        value: Any,
        key: str | None = None,
        task_id: str | None = None,
        dag_id: str | None = None,
        run_id: str | None = None,
        map_index: str | None = None,
        **kwargs
    ):

        hook = S3Hook(aws_conn_id=CustomXComBackendS3.AWS_CONN_ID)

        if isinstance(value, pd.DataFrame):
            filename = f"data_{str(uuid.uuid4())}.parquet"
            s3_key = f"temp/airflow/{dag_id}/{run_id}/{task_id}/{filename}"

            value.to_parquet(filename, compression="gzip")
        else:
            filename = f"data_{str(uuid.uuid4())}.json"
            s3_key = f"temp/airflow/{dag_id}/{run_id}/{task_id}/{filename}"

            with open(filename, "a+") as f:
                json.dump(value, f)

        hook.load_file(
            filename=filename,
            key=s3_key,
            bucket_name=CustomXComBackendS3.BUCKET_NAME,
            replace=True
        )

        os.remove(filename)

        reference_string = CustomXComBackendS3.PREFIX + s3_key

        return BaseXCom.serialize_value(value=reference_string)

    @staticmethod
    def deserialize_value(result):
        result = BaseXCom.deserialize_value(result)

        hook = S3Hook(aws_conn_id=CustomXComBackendS3.AWS_CONN_ID)
        key = result.replace(CustomXComBackendS3.PREFIX, "")

        filename = hook.download_file(
            key=key,
            bucket_name=CustomXComBackendS3.BUCKET_NAME,
            local_path="/tmp"
        )

        if key.split(".")[-1] == "parquet":
            output = pd.read_parquet(filename)
        else:
            with open(filename, "r") as f:
                output = json.load(f)

        os.remove(filename)

        return output

 코드 자체는 간단합니다. xcom_push 를 통해 데이터를 저장할 때는 serialize_value를 이용하고 xcom_pull로 데이터를 불러올 때는 deserialize_value를 이용하게 됩니다. 그래서 이 두 부분을 변경하면 됩니다.
 XCom으로 데이터를 주고받을 때 사용하는 오브젝트는 여러 가지가 있지만 Pandas의 DataFrame을 이용하면 parquet형태로 저장하고 그 외에는 json 형태로 저장한 후 S3Hook을 이용해서 데이터를 load하고 download하는 방식입니다.


설정 변경

위의 코드를 개발했다면 이를 Airflow설정에 지정하여 기본 XCom으로 사용되도록 해야 합니다.

1
2
3
4
5
6
# airflow.cfg
[core]
xcom_backend=include.aws_xcom_backend.CustomXComBackendS3

# 환경변수
AIRFLOW__CORE__XCOM_BACKEND=include.aws_xcom_backend.CustomXComBackendS3

설정에는 위와 같은 방법으로 지정하면 되는데 xcom_backend, AIRFLOW__CORE__XCOM_BACKEND에 우리가 위에서 만들었던 패키지명을 지정하면 됩니다.


재시작

설정이 완료되면 Airflow를 재시작합니다. 재시작하고 적용이 잘 되었는지는 파이썬 코드를 이용해서 확인합니다.


1
2
3
from airflow.models.xcom import XCom

print(XCom.__name__)

image

위와 같이 제대로 나온다면 적용 완료되었습니다.


사용

사용하는 방법은 특별하게 다른게 아니라 기존에 사용하던 XCom 사용 방법으로 하면 됩니다.


데이터 확인

XCom을 사용하면 데이터는 S3에서 확인할 수 있습니다.

  • 버킷명 : airflow_xcom_test
  • Key : temp/airflow/[DAG ID]/[Run ID]/[Task ID]/data_해시값
  • 파일 확장자 : .parquet 또는 .json