[SparkSQL] SparkSQLOperator를 사용하는 Airflow 파이프라인 성능 개선기
안녕하세요 마개입니다.
SparkSQL을 사용하는 파이프라인에 성능 이슈가 있어서 이를 튜닝했던 과정을 공유합니다. 회사에서 진행한 것이기에 정확한 정보가 나오지는 않습니다.
튜닝을 하게 되는 배경
SparkSQLOperator 하나로 돌아가는 파이프라인이 한 번 실행될 때 약 4~5시간이 걸리는 파이프라인이 있었습니다. 해당 파이프라인이 4~5시간 동안 실행이 되는 것 자체도 문제고 그만큼 Spark 리소스를 소모하고 있는 것이기 때문에 다른 파이프라인에도 영향을 많이 끼쳤습니다. 그렇기에 해당 파이프라인에 대한 성능을 해결하는 것이 필수인 상황이 되었습니다.
현재 상황 파악
Spark를 튜닝할 때 가볍게 튜닝할 수 있는 부분은 메모리나 파티션 등의 설정을 수정해가며 테스트해보는 방법도 있지만 근본 원인을 해결하는 것이 중요하다고 생각해서 시간을 더 투자해서 접근하기로 했습니다. 그렇기에 현재 테이블에 대한 상황을 파악하려고 했습니다.
테이블에 대한 상황을 파악할 때 기본적으로 확인한 것은 데이터 사이즈, 파티션 여부, 파일 포맷 3가지로 접근했습니다.
첫 번째로는 데이터 사이즈
를 파악하고자 했습니다.
타겟 테이블을 생성하기 위한 소스 테이블은 총 5개가 있었습니다. 이 5개의 테이블 중에 가장 적은 데이터를 가진 테이블은 2700만건, 가장 많은 데이터를 가진 테이블은 155억건을 가지고 있을 정도로 건수가 엄청 많았습니다. (건수를 조사할 때는 테이블이 가진 전체 데이터를 기준으로 한 것이 아닌 쿼리 상에 있는 조건 기준으로 조사했습니다.) 기본적으로 엄청난 양의 데이터를 가져온다는 것을 알 수 있었습니다.
두 번째로는 테이블의 파티션
입니다.
이정도로 엄청난 사이즈를 가진 테이블인데 파티션이 없다면 재앙인 수준인 것이죠. 그래서 파티션이 있는지의 여부를 체크했고 있다면 쿼리에서 해당 파티션을 잘 사용하고 있는지 파악했습니다. (여기서 잘 사용하고 있는지는 테이블의 특성에 맞게 파티션을 잘 설정했는지, 추가해야할 파티션은 없는지 확인했습니다.) 다행히도 파티션은 제대로 설정되어 있고 잘 사용중인 것으로 파악했습니다.
세 번째로는 파일 포맷
입니다.
데이터를 프로세싱하기 위한 소스 테이블이나 타겟 테이블 모두 ORC
파일 포맷으로 되어 있는 것으로 파악했습니다. JSON이나 CSV와 같은 형태로 되어있지 않다는 것에 안도감을 느끼게 되었습니다.
쿼리 튜닝
쿼리 튜닝이라고 작성을 했지만 쿼리 플랜을 짜서 튜닝 포인트를 찾고 HINT를 변경하는 작업을 하기보다는 기본에 먼저 충실하려고 했습니다. 쿼리를 봤을 때 몇 가지 포인트를 찾은 것이 있었습니다.
첫 번째로는 불필요한 컬럼까지 추출한다는 것입니다.
ORC는 Columnar 기반 포맷으로 일반 RDB와는 다른 방식으로 데이터를 저장하기 때문에 전체 컬럼을 가져오는 것보다는 필요한 컬럼만 가져오는 것이 매우 효율적입니다. 불필요한 컬럼까지 가져오는 것은 Columnar 기반 포맷을 제대로 사용하지 못한 것이라고 생각했습니다.
두 번째로는 같은 기간으로 데이터를 가져오는 쿼리가 여러번 있었다는 것입니다.
위에서 이미 봤듯이 데이터가 어마어마하게 많은 테이블에서 데이터를 가져오는데 같은 기간으로 데이터를 가져오는 쿼리가 2번 이상 있었습니다. 그렇기에 해당 기간으로 데이터를 가져오는 쿼리는 한번 실행하여 캐시에 넣고 거기에서 데이터를 가져오도록 변경했습니다.
세 번째로는 DISTINCT의 부재입니다.
특정 쿼리에서 JOIN을 진행하는데 JOIN을 하는 Key 역할을 하는 ID가 중복이 많았습니다. 해당 ID가 실질적으로 DISTINCT했을 때는 13만건인데 DISTINCT 작업이 없어서 약 3천만건으로 JOIN을 진행하고 있었습니다. 그 말은 그만큼 더 많은 중복값으로 JOIN을 해서 몇 배의 데이터를 생성하고 있었다는 것이죠. 이를 DISTINCT를 적용하여 JOIN해야 하는 건수를 줄여서 진행했습니다.
파일 포맷 변경
현재 소스 테이블과 타겟 테이블은 모두 ORC 파일 포맷을 사용하고 있었습니다. ORC
파일 포맷은 Apache Hive에서 개발된 파일 포맷으로 Hive에 특화되어 있다고 볼 수 있습니다. 해당 파이프라인은 기존에 HiveOperator를 이용했었는데 올해에 SparkSQLOperator로 변경하게된 파이프라인이었습니다. 그렇기에 이전에는 ORC 파일 포맷을 이용하는 것이 맞았지만 현재는 SparkSQL을 이용하기 때문에 ORC가 최적의 파일 포맷은 아니라고 생각했습니다.
그렇기에 Parquet
로 변경하는 작업을 진행했습니다. Parquet
는 Spark나 Hadoop 등의 다양한 환경에서 호환이 좋은 파일 포맷으로 ORC와 같은 Columnar 파일 포맷이지만 현재 Spark를 사용하는 상황에서는 더 좋을 것이라고 판단했습니다. 이를 적용하기 위해서 소스 테이블과 타겟 테이블 5개 테이블에 대해 몇 백억건이 되는 데이터들을 모두 Parquet로 변경하는 작업을 진행했습니다. 소요시간은 며칠이 걸릴 정도로 오랜 시간이 걸렸습니다.
추가적으로 파일 포맷뿐만 아니라 압축 방식도 변경했습니다. 기존에는 SNAPPY 방식으로 압축을 하고 있었는데 그보다 더 압축률이 좋은 GZIP
방식으로 압축을 변경했습니다.
Parquet 포맷으로 변경하고 압축도 적용하느라 많은 시간이 걸렸지만 이번 성능 튜닝에서 가장 큰 효과를 본 부분입니다.
Spark 설정 튜닝
위에서 말씀드렸던 것처럼 HiveQL에서 SparkSQL로 이관했던 작업이 있었는데 HiveQL일 때 사용하던 설정 값들이 있어 해당 설정들은 제거하고 spark.dynamicAllocation.enabled=true
설정을 추가했습니다. 자원을 동적으로 할당하여 리소스를 효율적으로 사용하고 작업을 향상할 수 있도록 설정했습니다.
도메인 비즈니스 분석
도메인 비즈니스적인 요소로는 모든 것이 상황에 따라 다르겠지만 이번 케이스에서는 소스 테이블로부터 데이터를 가져오는 기간이었습니다. 저만큼이나 많은 기간의 데이터들이 필요할까?라는 생각이 들었고 실제로는 어느정도 필요할지 파악해서 수정하는 작업을 진행했습니다.
결과
위와 같은 과정을 거쳐서 운영에 적용했고 효과를 본 부분은 다음과 같습니다.
첫 번째로는 성능
입니다. 기존에 4~5시간이나 걸리던 프로세스를 약 10~20분까지 줄이면서 성능을 해결할 수 있었습니다.
두 번째로는 데이터 사이즈
입니다. Parquet를 사용하고 GZIP으로 압축을 진행하면서 기존에 파티션 하나당 약 2.8GB 정도의 데이터를 약 2GB로 30%정도로 파일의 크기를 줄일 수 있었습니다. S3에 데이터를 저장하고 있었기에 이는 S3 비용도 줄일 수 있게 되었다고 볼 수 있었죠.