지난 24년도 11월에 열렸던 DAN 24에서 가장 인상깊게 들었던 세션인 "CQUERYHUB : DATA WAREHOUSE입니다. 근데 이제 FLINK와 ICEBERG를 곁들인" 에 대해 자세히 살펴보고자 한다.
DAN 24
팀네이버 컨퍼런스 DAN 24는 네이버의 비즈니스 전략과 기술, 크리에이티브, 그리고 다양한 경험의 전문성을 유기적으로 연결하여 네이버가 앞으로 만들어나갈 비즈니스, 서비스의 변화 방향을
dan.naver.com
CQueryHub 는 사내 로그 데이터를 저장하기 위한 data warehouse 명칭으로,
왜 로그 데이터를 저장하기 위한 data warehouse 구조를 변경하게 됐는지와 그 작업의 여정을 함께할 수 있는 세션이었다.
변경하면서 어떤 라이브러리를 선택하셨는지와 여러 트러블슈팅 내용을 간접적으로 경험할 수 있었는데..!
그 간접 경험기를 오래 간직하고자 글로 정리해보았다.
(하단 첨부 사진 출처: DAN24 해당 세션 발표자료)
범용적인 Log Data Warehouse의 필요성
네이버에서의 다양한 서비스는 각각 수많은 로그를 생성해낸다.
이러한 로그를 저장하기 위해 각각 데이터 웨어하우스를 만들게 되면 많은 리소스가 낭비되고, 혹시나 다른 서비스 간 데이터를 같이 join해서 확인해야 하면 어려움을 겪게 될 것이다.
이렇게 한 서비스에서 데이터를 독립적으로 관리하게 되면 다른 부서에 공유되지 않는 고립된 상태의 '데이터 사일로(Data Silo)' 문제가 발생하게 된다.
이를 해결하기 위해 CQueryHub는 범용적인 로그 데이터 웨어하우스를 구축하고자 했다.
기존에도 로그 데이터 웨어하우스가 존재했지만, HBase 기반으로 custom하게 운영되고 있어서 아래와 같은 장단점을 가지고 있었다.
장점 | 단점 |
오랜 운영을 통한 높은 안정성 | 다른 쿼리 엔진에 붙여서 사용하기 어려움 |
높은 스키마 자유도 | 느린 조회 속도 (random access의 강점을 가진 HBase와 <-> 불변적이며 시간 범위로 조회하는 로그의 특성이 맞지 않음) |
이에 따라 좀 더 범용적인 Log Data Warehouse가 필요했으며, 아래 7가지 목표를 기준으로 CQueryHub 개편이 진행되었다.
- 다양한 형태의 로그 데이터 모두 수용할 수 있도록
- 빠른 조회 속도
- 실시간 조회 가능하도록
- 기존 운영중인 시스템들과의 호환성 유지
- 모든 시스템 무중단으로 운영
- 데이터셋 간 의존성 제거 (다른 데이터셋에 영향 주지 않기 위해)
- 운영 관리 자동화 (간편하게 관리할 수 있어야 함)
CQueryHub Log Data Warehouse의 아키텍쳐
우선 아키텍쳐를 구성하기 위하여 어떤 라이브러리를 활용하여 데이터를 저장할 것인지와 유통할 것인지에 대한 선택이 필요했다.
먼저 데이터를 저장하기 위해서 테이블 형식의 라이브러리를 아래 3가지 중에 선택하고자 했다.
Hudi는 primary key와 auto compaction 기능이 있어서 데이터 관리 효율 측면에서는 큰 장점이었지만,
내부적으로 테스트를 진행해봤을 때 데이터 양이 많아지니(ex: 1만 QPS) auto compaction에서 timeout 문제가 발생해서 사용할 수 없었다.
이에 비해 Iceberg는 많은 로그가 인입되어도 안정적으로 동작했고,
다양한 플랫폼과 쿼리 엔진을 지원하고 있어서 운영중이던 시스템과 호환성을 위해 좋은 라이브러리로 판단되었다.
(Delta Lake는 Databricks 제품을 사용할 때 비로소 빛을 발하는 친구라.. 다음을 기약하게 되었다)
스트리밍 라이브러리는 아래 3가지 중에서 API가 사용하기에 간결하면서 다양한 기능을 제공하는 것을 중점적으로 고려했다.
Storm은 API가 복잡해서 아쉽지만 후보 군중에 제외되었으며,
Spark, Flink 중 해당 팀에서 운영했던 경험이 있는 Storm과 유사하게 동작하는 Flink를 더 익숙하게 사용하고자 선택하게 되었다고 한다.
이에 따른 전체 구조도는 다음과 같다.
한꺼번에 들어오는 로그를 데이터셋별로 분리해주는 Classifier와 데이터를 최종적으로 저장해주는 Writer는 Flink application로 구성되어 있으며, 두 application 사이에 kafka가 완충 역할을 해주고 있다.
각 flink cluster는 kubernetes 기반으로 운영되고 있으며, 이에 대한 관리를 위해 control center와 hive metastore가 존재한다.
각 세부 아키텍쳐를 살펴보자면 다음과 같다.
Classifier Architecture
먼저 Classifier에서는 서비스 구분 없이 들어오는 로그들을 데이터셋별로 개별 kafka topic으로 분리하여 저장하면서 데이터셋 간의 의존성을 제거한다.
최종적으로는 하나의 iceberg 테이블로 저장되지만, 각각 분리되어 운영되기 때문에 특정 데이터셋에 대한 파이프라인에 문제가 생겨도 다른 데이터셋에는 영향을 주지 않게 된다.
또한 각 데이터셋마다 필요한 특수 설정에 대해서도 개별 관리가 가능해 관리 효율성이 증가한다.
예를 들어 로그가 인입되다보면 굳이 분석이 필요하지 않은 로그도 같이 들어올 수 있는데,
각 서비스마다 로그 전체를 수집하기 위해 필터링하지 않고 모두 갖고 있을지, 또는 정말 필요한 로그만 가지고 있을지 선택하고자 하는 방향이 다를 수 있다.
그리고 이런 옵션 선택은 사용자 요청에 따라 동적으로 변할 수 있는데,
이는 flink에서 제공되는 broadcast stream 기능을 통해 control center를 지속적으로 polling 해서 필터링 규칙을 지속적으로 업데이트해줄 수 있다. 이 덕분에 무중단으로 운영할 수 있게 되는 것이다!
Writer Architecture
Writer는 각 데이터셋마다 개별 application으로 실행되며,
문자열 형태로 전송된 로그 데이터를 파싱해서 iceberg 테이블 스키마에 맞게 변환하고 저장하게 된다.
(실제 데이터를 적재하는 CQueryHub의 핵심 과정이라고 볼 수 있다!)
내부적으로는..
- Deduplicator 에서 일정 시간 내 중복된 로그 데이터를 제거하고
- Parser & Converter 에서 문자열 형태 로그를 파싱해서 iceberg 테이블에 맞게 맵핑하고
- Writer & Committer 에서 맵핑된 데이터를 파일 시스템에 쓰고 commit을 수행한다.
각 과정에 대한 세부 트러블 슈팅 내용은 아래쪽에서 하나씩 살펴보게 될 예정이다.
Control Center
마지막으로 Control Center에서는 두 주요 flink application (classifier / writer) 와 다른 컴포넌트 등 CQueryHub의 모든 리소스를 관리한다.
파이프라인에 대한 생성, 삭제, 수정 및 테이블 스키마 관리 등 자동으로 처리해주고, 사용자 안내 및 시스템 조절 등도 자동으로 수행해준다.
이제부터는 각 단계별로 세부적으로 살펴본 내용과 트러블슈팅 과정에 대해 좀 더 깊이 있게 알아보고자 한다.
Flink Cluster
Classifier와 Writer를 구성하고 있는 Flink의 cluster는 2가지 컴포넌트(매니저)로 구성되어 있다.
- JobManager
- 모든 작업을 관리하며, 클러스터마다 최소 하나가 필요하다.
- HA(High Availability)가 구성되지 않은 채로 job manager가 다운되면, 실행중이던 작업 정보는 모두 잃게 된다.
- TaskManager
- 실제 작업을 수행하는 컴포넌트이다.
- 각 task manager마다 n개의 task slot이 존재하며, 각 task slot은 1개의 task를 할당받아 수행한다.
예를 들어 flink application 하나가 제출되면,
JobManager가 작업을 여러 task로 분리하고 → 여러 TaskManager의 task slot에 랜덤으로 배정하며,
어떤 TaskManager의 task slot에 저장되었는지에 대한 정보는 JobManager에 등록하게 된다.
이를 통해 하나의 cluster에 여러 작업을 제출할 수 있게 되며, 배정할 task slot이 부족해지게 되면 작업을 수행할 수 없게 된다.
이 cluster를 구성하는 방법에는 아래와 같이 2가지가 있다.
- Session mode
- 하나의 클러스터에 여러 애플리케이션을 띄워, 리소스를 공유하면서 효율적으로 사용할 수 있다.
- 다만 작업 간 영향도가 존재하고, 클러스터 내 애플리케이션 사이에 리소스 경쟁이 존재하기도 한다.
- 따라서 제출된 애플리케이션이 많으면 JobManager에 부하가 발생하게 된다.
- Application mode
- 하나의 애플리케이션을 하나의 클러스터로 분리해서, 제출된 작업이 하나의 클러스터를 독점하여 리소스를 분리하는 방식이다.
- JobManager가 하나의 애플리케이션 만을 관리해서 부하가 적다.
- 다만 각 cluster마다 JobManager, TaskManager가 별도로 생성되어 불필요한 리소스 낭비가 될 수 있다.
CQueryHub에서는 데이터별로 writer application이 생성되어 이를 수행할 Flink cluster를 어떻게 구성할지 고민이 필요했다.
CQueryHub가 운영되는 사내 데이터 플랫폼이 프로젝트 단위로 운영된다는 점을 고려하여 프로젝트 단위로 session mode를 택하여 관리하게 되었다.
이를 통해 프로젝트 간 flink cluster 리소스를 분리시켜 다른 프로젝트에 영향이 없도록 하면서 불필요한 리소스 낭비는 최소화할 수 있었다.
또한 각 프로젝트별 사용하는 리소스를 계산할 때 해당 flink cluster의 사용량만 계산하면 되어 간편하게 리소스 사용량을 구할 수 있다는 장점도 있었다.
Deduplicator - Data Integrity 보장하기
지금까지는 전반적인 flink 관련 설정에 대해 알아봤다면,
이제부터는 writer에서 세부 단계별로 각 어려운 부분을 어떻게 해결해 나갔는지 살펴보고자 한다.
우선 데이터의 정확성, 일관성 및 신뢰성을 보장하기 위해 중복된 로그 데이터를 어떻게 처리했는지 살펴보려고 한다.
현재 CQueryHub에서 로그 collector는 at-least-once 내결함성으로 운영중이다.
따라서 누락은 없어도 중복은 언제든지 발생할 수 있는데, CQueryHub에서는 primary key 기능을 사용하지 않고 있어서 자동으로 중복 제거가 어려웠다.
따라서 별도 중복 로그 처리 기법이 필요하여 deduplicator 과정을 추가하게 되었다.
deduplicator 과정은 로그 데이터의 unique key를 기반으로 진행된다.
writer에서 처리한 unique key를 캐시에 저장해두고, 이후 들어오는 데이터의 unique key가 캐시에 있는지 확인하여
동일한 unique key가 없다면 해당 로그를 무시하고 제거하는 방향으로 진행되었다.
deduplicator는 여러 task에서 동시에 실행되지만 캐시는 각 task별로 존재했기 때문에
같은 unique key를 가지는 로그 데이터는 같은 task로 전송되어야 했다.
이는 flink에서 지원하는 KeyBy 기능을 통해 같은 unique key를 가지는 데이터는 항상 동일한 deduplicator로 전송되도록 처리되었다.
다만 여기에서의 캐시도 결국 메모리에 저장되고, 데이터가 너무 많아지면 OOM(Out Of Memory) 문제가 발생한다.
따라서 제한된 기간 내의 로그만 캐시에 저장되도록 처리가 필요한데, 이 때 중복된 로그가 제한된 기간 이후에 들어올 수도 있다.
이는 iceberg 테이블을 주기적으로 검사해서 중복된 로그 데이터를 자동으로 제거하는 airflow DAG를 통해 처리하게 되었다.
Parser & Converter - Data Cleaning 처리 방법
다음으로 로그를 포맷에 맞게 파싱하고 iceberg 테이블 스키마에 맞게 맵핑하는 작업 중 오류 처리 방법에 대해 살펴보고자 한다.
로그는 단순 문자열로 전송되는데, 이를 포맷에 맞게 파싱하는 작업을 위해 java 객체로 저장된다.
그러나 iceberg 테이블에서는 java 객체 형태를 그대로 받을 수 없고, struct datatype의 데이터만 쓸 수 있다.
이를 위해 java 객체 struct 형태의 데이터로 변환하는 ORM(Object-Relational Mapping) 기능 개발이 필요했다.
이는 iceberg 테이블 스키마를 불러와서 struct 데이터를 상속한 StructRowData 로 변환하는 방식으로 진행되었다.
ORM 과정에서 불러온 StructRowData를 java 객체에서 attribute와 맵핑되는 필드를 찾고, 해당 필드에 값을 직접 넣는 방식으로 수행되었다.
이 때 데이터 맵핑이 정상적으로 되지 않으면 누락이 발생할 수 있다.
이를 막기 위해 인입된 데이터가 iceberg 테이블 스키마에 맞게 맵핑될 수 있도록 데이터를 보정하는 작업이 필요했고,
아래와 같이 case insensitive하게 변경해주거나 사용 가능한 특수문자로 변경하는 작업 등이 진행되었다.
추가로 iceberg 스키마에 맞춰 type conversion을 진행하며 데이터 값이 변경되지 않는 범위 내에서 강제로 타입을 변경하면서 잘못된 타입을 보정하고,
사용자로부터 잘못된 로그 데이터가 들어와 ORM 처리과정 중 오류가 발생할 경우 일부 필드를 null로 저장하면서 자동으로 리포팅이 될 수 있도록 처리되고 있다고 한다.
Writer & Committer - Data Loading
마지막으로 데이터를 저장하고 커밋하는 과정에서 발생한 문제 해결 과정에 대해 살펴보고자 한다.
실제 데이터 로딩은 writer와 committer에서 수행되며,
HDFS의 iceberg로 구성된 데이터 웨어하우스에 데이터 파일을 쓰고 커밋을 수행하는 트랜잭션을 완료하는 작업이다.
commit 작업은 flink의 checkpoint 시점에 발생하게 되며,
체크포인트에 현재까지의 state를 저장하며 이전까지 처리한 데이터의 처리 완료를 보장해주게 된다.
아래와 같이 hive metastore의 metadata 파일은
해당하는 metadata를 포인터로 metadata → manifest list → manifest file → 실제 data file을 가리키는 형식으로 되어있으며,
flink의 checkpoint가 수행될 때 해당 포인터를 변경해주는 방식으로 진행이 된다.
이 때 포인터는 변경되지만 기존에 가지고 있는 metadata 관련 기존 파일은 그대로 남아있게 된다.
이에 따라 파일 개수가 과도하게 많아지게 된다.
(예를 들어 checkpoint가 1분 주기이고 flink 애플리케이션 병렬 처리 수가 10개라면, 하루(1440분)에 총 14,400개 파일이 생성된다)
이는 https://heehehe-ds.tistory.com/223 에서 다루었던 hadoop hdfs의 name quota 문제를 발생시키게 된다.
이 문제를 해결하고자 CQueryHub 팀에서는 spark sql을 통해 주기적인 compaction 기능을 수행하여 데이터 파일 개수를 조절했으며,
테이블 데이터에 대한 유지기간을 사용자가 직접 설정하여 유지기간이 넘는 데이터는 배치 작업을 통해 자동으로 삭제되도록 처리했다.
추가로 iceberg의 table properties에서 최대 metadata 파일 수 및 snapshot 만료 기간 등 설정할 수 있어서 이를 통해 metadata 파일 개수를 관리할 수 있다.
다만 기간을 너무 짧게 가져가면 사용자가 조회하고 있던 데이터 가리키는 snapshot이 삭제될 수 있는데,
이는 snapshot 최소 개수를 설정하여 에러를 방지할 수 있다.
이렇게 범용적인 로그 데이터 웨어하우스 CQueryHub를 구축하여 2024년 6월에 사내 서비스로 정식 오픈하였으며,
수백개의 프로젝트에서 수백개의 테이블을 생성하여 지금도 1초당 최대 수십만개의 로그 데이터가 처리되고 있다고 한다.
또한 로그 양이 갑자기 증가될 때 리소스를 자동으로 증가시켜 지연을 최소화할 수 있는 auto traffic control 기능과
각 데이터별로 OLAP 워크로드를 최적화시키기 위한 커스터마이징 기능이 개발될 예정이라고 한다.
배치성 작업을 주로 다루는 입장에서 실시간의 끝판왕이라고 생각하는 로그 데이터를 어떻게 관리하는지 궁금했었는데,
해당 세션 덕분에 궁금증을 어느 정도 흥미롭게 해소할 수 있었다.
특히 metadata 관리 시 파일 개수가 많아져 quota를 초과하는 문제는 이전 글에서 다뤘던 것처럼 비슷한 문제를 겪기도 해서 해당 세션을 더욱 관심가지고 살펴볼 수 있었다.
나에게도 실제로 이렇게 액티브한 데이터를 다뤄볼 수 있는 날이 오길 바래본다..!! 😁
'DATA SCIENCE > DATA ENGINEERING' 카테고리의 다른 글
[Hadoop] HDFS Quota 살펴보기 (HDFS에 파일 개수 제한이 존재하는 이유) (0) | 2025.03.07 |
---|---|
[Hadoop] Hive란? (Hive 아키텍쳐 및 데이터모델 / HiveQL / SQL on Hadoop) (0) | 2025.03.01 |
[Hadoop] Spark RDD와 DataFrame, 그리고 Dataset (0) | 2025.02.02 |
[udemy - Apache Spark와 Python으로 빅데이터 다루기] Spark란? (1) | 2024.04.14 |
[DE] 하둡 없이 맵리듀스를?! Local MapReduce 오픈소스 파헤치기 (0) | 2023.06.17 |