본문 바로가기

DATA SCIENCE/DATA ENGINEERING

[Spark] RDD와 DataFrame, 그리고 Dataset

최근 업무에서 테이블 형식의 데이터를 정제하는 과정에서 spark를 활용해보기 시작했다.
예전에 한창 데이터 분석을 배울 때 많이 사용하던 pandas와 유사하게 접근할 수 있는 dataframe으로 많이 작업을 진행하게 됐는데,
특정 열의 목록을 list 형식으로 추출하는 등의 작업에서는 RDD로만 작업이 가능한 케이스가 존재했다.

그래서 이번 글에서는 RDD와 DataFrame, 그리고 DataFrame과 같이 종종 등장하는 개념인 DataSet은
각각 무엇인지 어떨 때 활용하면 좋은 것인지 살펴본 뒤,
DataFrame을 많이 활용하는 추세임에도 불구하고 여전히 RDD를 같이 활용되고 있는 이유에 대해 알아보고자 한다.

 

RDD(Resilient Distributed Datasets)

RDD를 용어 그대로 번역기에 돌려보면 '탄력적인 분산 데이터셋'이라고 나온다.
말 그대로 다양한 데이터셋을 탄력적으로 분산 처리하기 위해 추상화한 것으로 볼 수 있는데,
쉽게 말해 분할된 데이터셋 모음이며, 각 분할된 데이터셋이 분산되어 처리되는 형식이다.

RDD가 등장하게 된 배경은 기존에 존재하던 MapReduce의 한계점을 개선하기 위해서였다.
MapReduce 작업은 데이터를 변형하기 위해 mapper와 reducer를 사용해야 했으며,
작업이 디스크 기반으로 이루어지기 때문에 반복 작업을 수행할 때는 비효율적인 측면이 존재했다.
이러한 부분을 해결하기 위하여 in-memory 기반의 RDD가 등장하게 된 것이며, 
mapper와 reducer로 짜야 해서 제한적인 기능만 있던 기존과 다르게 다양한 API 기능(map, filter, ...)을 보다 쉽게 이용할 수 있도록 제공하고 있다.

그렇다면 spark에서는 RDD를 어떻게 사용할 수 있을까?
처리되는 방식은 간단하게 (1)RDD 객체를 만들어서 → (2)데이터를 불러온 뒤에 → (3)분산 작업으로 데이터 처리 과정을 진행하는 순서로 이루어진다.
RDD는 spark가 처음 출시되었던 v1에서부터 제공된 API로, 간단히는 아래와 같이 이용할 수 있다.

from pyspark import SparkContext
sc = SparkContext("local", "RDD Example")

lines = sc.textFile("data.txt")

# 1. 파일의 총 줄 수 계산
lineCount = lines.count()

# 2. 각 줄을 단어로 분리하여, 모든 단어가 하나의 RDD에 담기도록 변환
words = lines.flatMap(lambda line: line.split())

# 3. 각 단어를 (단어, 1) 형태의 튜플로 매핑
wordPairs = words.map(lambda word: (word, 1))

# 4. 같은 단어끼리 합산하여 단어 빈도수 계산 (reduceByKey 사용)
wordCounts = wordPairs.reduceByKey(lambda count1, count2: count1 + count2)

# 5. 결과를 드라이버로 수집하고 출력
for word, count in wordCounts.collect():
    print(f"{word}: {count}")

sc.stop()  # 리소스 해제

 

제공되는 API 처리 기능은 크게 아래와 같이 transformation 작업과 action 작업으로 나눌 수 있으며,
spark의 lazy evaluation 특성에 따라 transformation 작업은 action 작업이 수행될 때 같이 진행된다.

  • Transformation: 새로운 데이터 형식으로 변형
    • map: 각 원소를 1:1 대응하여 새로운 값으로 변환
    • flatMap: 한 입력을 여러 개의 출력으로 변환
    • filter: 조건에 따라 데이터를 필터링
    • distinct: 중복 값 제거
    • sample: 샘플링
    • union, intersection, subtract, cartesian: 여러 RDD 간의 집합 연산
  • Action: 결과값 계산
    • collect: 모든 데이터를 수집하여 리스트로 반환
    • count: 요소의 개수 계산
    • countByValue: 각 값의 빈도수 계산
    • take: 상위 몇 개의 원소를 반환
    • top: 가장 큰 값(또는 정렬 후 상위 값) 반환
    • reduce: 요소들을 집계하여 하나의 값으로 축소

 

DataFrame

비록 RDD를 통해 MapReduce보다는 훨씬 빠르게 (약 20배 정도) 작업할 수 있지만,
특정 작업을 위해 사용자가 구체적으로 처리해줘야 하는 부분이 많아 다소 복잡한 측면이 있었다.

이러한 부분을 해소시키고자 spark version 2 이상부터는 DataFrame 기능을 제공하고 있으며,
테이블 형식으로 스키마를 제공해주는 row 객체들을 보유하고 있는 단위로 좀 더 편리하게 데이터를 다룰 수 있다.

from pyspark.sql import SparkSession, Row
spark = SparkSession.builder.appName("sparkSQL").getOrCreate()

inputData = spark.read.json("data.json")

# 임시 뷰로 등록하여 SQL 쿼리 실행
inputData.createOrReplaceTempView("myStructuredStuff")
myResultDataFrame = spark.sql("SELECT foo FROM myStructuredStuff ORDER BY foobar")
myResultDataFrame.show()

 

DataFrame에서 주로 사용되는 API는 다음과 같다.

  • show: DataFrame 내용 표 형태로 출력
    (default는 20행으로, 빠르게 결과를 보고 싶을 때는 5행 등으로 축소해서 확인하면 편리하다.)
  • select: 특정 column만 선택
  • filter: 조건에 맞는 행만 남겨 필터링 (SQL의 WHERE와 비슷)
  • groupBy: 데이터를 특정 열 기준으로 그룹화하여 집계 함수(count, avg 등) 적용
  • rdd: DataFrame을 RDD로 변환해서, RDD의 함수형 연산(map, filter, reduce 등) 사용

 

DataSet

추가적으로 DataSet은 DataFrame도 포함하는 확장된 개념으로, 컴파일 시점에 데이터 타입을 체크하게 된다.
scala 등 함수형 언어에서 사용되기 때문에 pyspark를 주로 다룬다면 접하게 되는 내용은 아니지만, 
타입 안정성(type-safety)을 제공하면서 오류를 미리 방지할 수 있다.

scala를 통해서 dataset를 사용하는 예시는 다음과 같다.

import org.apache.spark.sql.{SparkSession, Encoder, Encoders}
import org.apache.spark.sql.functions._

case class Person(name: String, age: Int)

object DatasetExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Dataset Example")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    // JSON 파일을 Dataset으로 로드
    val peopleDS = spark.read
      .json("people.json")
      .as[Person] // Dataset[Person]으로 변환

    // Dataset API 활용 예시
    peopleDS.filter(_.age > 25).show()   // age > 25인 사람만 필터링
    peopleDS.groupBy("age").count().show() // 나이별 개수 세기

    spark.stop()
  }
}

 

그럼에도 RDD가 여전히 사용되고 있는 이유

spark version 2 이상부터 DataFrame, DataSet 개념이 등장하면서 대체적으로 RDD보다 DataFrame을 더욱 많이 이용하는 추세이지만, RDD가 완전히 deprecated 되지 않고 여전히 병행하여 활용되고 있다.

이는 DataFrame이 구조적인 테이블 형식으로 데이터를 처리하기 때문에,
만약 스키마 정의가 어려운 비구조인 데이터를 다뤄야 할 때에는 RDD를 통해 작업을 진행해야 한다.
간단한 예시로는 로그 데이터를 정제할 때 단순히 "ERROR"가 들어간 대상만 필터링한다고 했을 때에는
테이블 형식의 DataFrame이 아닌 textFile RDD로 읽어 실행할 수 있다.

from pyspark import SparkContext

sc = SparkContext("local", "UnstructuredData")

log_rdd = sc.textFile("server_logs.txt")

# ERROR 로그만 필터링하여 저장
error_logs = log_rdd.filter(lambda line: "ERROR" in line)

print(error_logs.collect())  # 예: ["ERROR - Disk failure at node 3", "ERROR - Out of memory"]

sc.stop()

 

또한 spark 내부 동작을 좀 더 최적화해서 작업을 진행해야 할 경우에는 RDD를 이용해야 한다.
예를 들어 파티션 개수를 조절해서 작업해야 할 때, 아래와 같이 적용이 필요할 수 있다.

from pyspark import SparkContext

sc = SparkContext("local", "LowLevelRDD")

rdd = sc.parallelize(range(100), numSlices=10)

# 특정 파티션 개수로 줄이기
coalesced_rdd = rdd.coalesce(5)

print(f"기존 RDD 파티션 개수: {rdd.getNumPartitions()}")
print(f"최적화 후 RDD 파티션 개수: {coalesced_rdd.getNumPartitions()}")

sc.stop()

 

References

Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing (2012, Zaharia)
https://www.usenix.org/conference/nsdi12/technical-sessions/presentation/zaharia

Udemy - Apache Spark 와 Python으로 빅 데이터 다루기
https://www.udemy.com/course/best-apache-spark-python

 

반응형