비록 Spark, Tez 등 데이터를 빠르게 처리하는 프레임워크가 등장하면서 MapReduce가 과거의 기술로 여겨지고 있지만,
여전히 큰 데이터를 다루는 데에 있어서 기초가 되는 부분 중 하나이다.
MapReduce는 하둡 프레임워크로 일반적으로는 하둡 환경에서 진행할 수 있지만,
분산 환경 없이 로컬 환경에서도 이용할 수 있는 오픈소스 Local MapReduce가 존재했다.
(https://github.com/d2207197/local-mapreduce/tree/master)
해당 오픈소스를 알려주신 분이 내부가 그렇게 복잡하지 않다고 말씀주셔서
한번 내부를 구경해봤더니 정말 100줄도 되지 않는 하나의 스크립트로 구현되어 있었다.
어떻게 구현해놨길래 100줄도 안되는 스크립트로 이렇게 유용하게 이용할 수 있는지
이번 글에서는 local mapreduce의 내부 구조를 파헤쳐 보고자 한다.
그전에 MapReduce가 무엇인지 간략하게 알아보자.
MapReduce란?
데이터를 분할하여 처리(Map)한 뒤 집계(Reduce)하는 분산 처리 프레임워크로,
대표적인 아래 wordcount 예시로 쉽게 이해할 수 있다.
- split
: 우선 데이터를 원하는 개수만큼으로 분할한다. - map
: split된 데이터에 대해 동일한 작업을 수행시켜준다 (ex: 각 글자가 몇번 등장했는지 count) - shuffle
: 첫번째 열을 기준으로 데이터를 정렬하여 같이 reduce를 시켜줄 그룹을 만들어 준다. - reduce
: shuffle된 데이터를 집계해 준다.
map과 reduce에는 원하는 작업을 구현해서 수행하도록 해줄 수 있고,
해당 과정(Map → Reduce)을 원하는 결과가 나올 때까지 반복시키면서 MapReduce 프레임워크를 이용할 수 있다.
이러한 맵리듀스 개념을 오픈소스화 시켜둔 것이 바로 하둡이었기에 보통 맵리듀스를 이용하려면 하둡을 통해 이용해야 했지만,
아래와 같이 로컬에서도 쉽게 이용할 수 있는 오픈소스가 존재했다.
Local MapReduce (lmr)
Requirements
해당 오픈소스를 이용하기 위해서는 아래 4가지가 필요하다.
- bash : 메인 스크립트 구현
- GNU parallel : mapper 및 reducer 병렬 처리
- sort : shuffle 진행
- python : hashing 스크립트 구현
특히 GNU parallel의 경우, 여기에서는 아래 옵션을 주로 사용하게 된다.
- --pipe : 입력 형식을 파일이 아닌 표준 입력(stdin)으로 하도록 설정
- --block-size : 한번에 얼마나 많은 데이터를 작업에 전달할지 block size 지정 (ex: 2m(=2MB))
- --ungroup : 작업 출력을 순서대로 정렬시키지 않고, 완료되는대로 즉시 출력
- --halt now,fail=1 : 병렬 작업 중 하나라도 실패하면 모두 중지
기본 함수
usage
: 사용 방법을 출력해주는 것으로, lmr은 아래와 같이 이용할 수 있다고 출력해주고 있다.
"lmr [-k] <블록 크기> <hash segment 수> <mapper 스크립트 파일> <reducer 스크립트 파일> <최종 결과 출력 경로>"
(-k: mapper 출력 유지 여부)
function usage() {
echo "Usage: $0 [-k] <BLOCKSIZE> <NUM_HASHING_SEGS> <MAPPER> <REDUCER> <OUTPUT_DIR>" 1>&2
exit 1
}
timer
: 경과 시간을 측정해주는 함수로, input으로 이전 시간을 입력하면 지금으로부터 얼마나 경과되었는지 알려준다.
function timer()
{
local stime=$1
[[ -z $stime ]] && stime=$START_TIME
[[ -z $stime ]] && stime=0
dt=$((SECONDS - stime))
date -d@$dt -u +%H:%M:%S 2>/dev/null || date -u -r $dt +%T
}
clean_up
: 작업이 완료된 뒤, -k 옵션에 따라 mapper 결과를 제거해주고 임시 hashing 스크립트를 제거해준다.
function clean_up() {
echo $' \e[1;32m>>>\e[m Cleaning...'
sleep 1
if [ $keep_mapper_out = false ] ; then
rm -r "$TEMPDIR" &&
echo $' \e[1;32m>>>\e[m' Temporary directory deleted: $'\e[1;32m'"$TEMPDIR"$'\e[m' ||
echo $' \e[1;31m*\e[m' Failed to delete temporary directoy: $'\e[1;32m'"$TEMPDIR"$'\e[m'
else
echo $' \e[1;33m*\e[m' Mapper output directory: $'\e[1;32m'"$TEMPDIR"$'\e[m'
fi
rm "$HASHING_SCRIPT" &&
echo $' \e[1;32m>>>\e[m' Temporary hashing script deleted: $'\e[1;32m'"$HASHING_SCRIPT"$'\e[m' ||
echo $' \e[1;31m*\e[m' Failed to delete hashing script: $'\e[1;32m'"$HASHING_SCRIPT"$'\e[m'
} >&2
HASHING_SCRIPT
: mapper의 결과를 hash 함수를 통해 reducer로 분배하는 스크립트를 생성한다.
cat <<EOF > "${HASHING_SCRIPT}"
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys, os, fileinput
N_REDUCER, MAPPER_ID, BASE_DIR = int(sys.argv[1]), int(sys.argv[2]), sys.argv[3]
seg_file = [open(os.path.join(BASE_DIR, f"reducer-{seg_id:02}", f"mapper-{MAPPER_ID:02}"), 'w')
for seg_id in range(N_REDUCER)]
for line in fileinput.input(sys.argv[4:]):
key, _, value = line.rstrip().partition('\t')
print(key, value, sep='\t', file=seg_file[hash(key) % N_REDUCER])
EOF
################
HASHING_SCRIPT=`mktemp hashing.py.XXXX`
{
echo $' \e[1;32m>>>\e[m Temporary mapper hashing script created: \e[1;32m'$HASHING_SCRIPT$'\e[m'
}>&2
TEMPDIR=`mktemp -d mapper_tmp.XXXX`
{
mkdir `seq -f $TEMPDIR'/reducer-%02g' 0 $((NUM_HASHING_SEGS-1))`
echo $' \e[1;32m>>>\e[m Temporary mapper output directory created: \e[1;32m'$TEMPDIR$'\e[m'
}>&2
준비 사항
1. 옵션 처리
-k 옵션을 입력하게 되면, keep_mapper_out 이라는 변수를 true로 설정하여 mapper의 출력을 유지해준다.
keep_mapper_out=false
while getopts ":k" OPTION
do
case $OPTION in
k) keep_mapper_out=true;;
?) usage;;
esac
done
shift $((OPTIND-1))
2. 입력 값 확인
5개의 입력(블록 크기, hash segment 수, mapper, reducer, 출력 경로)을 받으며,
하나라도 누락되어 있으면 usage를 출력하고 종료된다.
[ $# -ne 5 ] && usage
BLOCKSIZE="$1"
NUM_HASHING_SEGS="$2"
MAPPER="$3"
REDUCER="$4"
OUTPUT_DIR="$5"
3. 임시 경로 생성 및 시작 시간 저장
출력 저장을 위한 경로를 생성하고, timer로 소요 시간을 계산하기 위해 START_TIME을 저장한다.
mkdir "$OUTPUT_DIR" || exit 1
START_TIME=timer
진행 과정
1. 병렬 Mapper 실행
parallel을 이용하여 MAPPER에 입력된 명령어를 실행한 뒤,
결과를 HASHING_SCRIPT로 NUM_HASHING_SEGS 수만큼 나눠서 TEMP_DIR에 저장한다.
echo $' \e[1;33m>>>\e[m' Mappers running... $'\e[1;33m'$(timer START_TIME)$'\e[m'
echo
parallel --pipe --block-size "${BLOCKSIZE}" --ungroup --halt now,fail=1 \
"echo -n $'\e[s\e[F\e[2K #{#}\e[u'; $MAPPER | PYTHONHASHSEED=0 python $HASHING_SCRIPT ${NUM_HASHING_SEGS} {#} $TEMPDIR" &&
2. 병렬 Sorting (shuffling) 실행
tab(-t $\t)으로 분할된 첫번째 필드(-k1,1)를 기준으로 정렬한 뒤, 입력 파일과 같은 파일에(-o {} {}) 결과를 저장한다.
::: "${TEMPDIR}"/*/* 부분은 GNU parallel 문법으로, ::: 뒤에 오는 부분을 parallel의 입력으로 받는다.
{
echo $' \e[1;33m>>>\e[m' Sort parts running... $'\e[1;33m'$(timer START_TIME)$'\e[m'
parallel --bar --ungroup --halt now,fail=1 \
"LC_ALL=C sort -k1,1 -t $'\t' -o {} {}" ::: "${TEMPDIR}"/*/*
} &&
3. 병렬 Reducer 실행
TEMPDIR에 shuffling된 데이터를 기준으로 REDUCER를 실행시켜준 뒤, OUTPUT_DIR에 결과를 입력해준다.
{
echo $' \e[1;33m>>>\e[m' Reducer running... $'\e[1;33m'$(timer START_TIME)$'\e[m'
parallel --bar --ungroup --halt now,fail=1 \
"LC_ALL=C sort -m -k1,1 -t $'\t' {}/* | $REDUCER > '$OUTPUT_DIR/{/.}'" ::: "${TEMPDIR}"/*
} &&
4. 최종 결과 출력
OUTPUT_DIR에 결과 저장까지 완료되면, output directory와 elapsed time을 반환하고 종료가 된다.
{
echo
echo $' \e[1;33m*\e[m' Output directory: $'\e[1;32m'"$OUTPUT_DIR"$'\e[m'
echo $' \e[1;33m*\e[m' Elasped time: $'\e[1;32m'$(timer START_TIME)$'\e[m'
} >&2
실행 결과
그렇다면 한번 실제로 이용해보자.
먼저 실행하기 전, GNU parallel과 local mapreduce를 설치해줘야 한다.
- GNU parallel 설치
- macos : brew install parallel
- centos : yum install parallel
- ubuntu : apt-get install parallel - local mapreduce 설치
$ git clone --depth 1 https://github.com/d2207197/local-mapreduce.git
$ sudo ln -s `pwd`/local-mapreduce/lmr /usr/local/bin
wordcount는 너무 많이 다뤄지는 예시이니, 여기에서는 데이터셋에 대한 평균을 계산하는 예시를 들어보고자 한다.
각각 mapper.py와 reducer.py는 다음과 같다.
import sys
def mapper():
for line in sys.stdin:
number = line.strip()
print(f"{number}\t1")
if __name__ == "__main__":
mapper()
import sys
def reducer():
total = 0
count = 0
for line in sys.stdin:
value, _ = line.strip().split("\t")
total += int(value)
count += 1
if count > 0:
average = total / count
print(f"Average: {average}")
if __name__ == "__main__":
reducer()
실행할 파일(data.txt)에는 평균을 내기 위한 숫자가 입력되어 있다.
실행은 아래와 같은 명령어로 진행할 수 있다.
lmr 2m 8 'python3 mapper.py' 'python3 reducer.py' result
여기에서는 블록 사이즈를 2MB로 데이터를 분할하고, 8개의 hash segment로 mapper 결과를 reducer에게 전달하고,
map과 reduce 명령어는 각각 "python3 mapper.py"와 "python3 reducer.py"가 된다.
해당 과정을 거친 최종 결과는 result 경로에 저장된다.
reducer-XX 파일을 열어보면 각 hashing된 파일별 평균을 알 수 있다.
본 목적이었던 파일 전체 평균을 알고자 한다면
hash segment 수를 1로 지정하고 돌리면 되고, 아래와 같은 결과가 나온다.
이렇게 하둡 환경 없이도 쉽게 MapReduce를 이용할 수 있다.
비록 맵리듀스보다 더 성능 좋은 프레임워크들이 많이 나왔지만,
간편하게 로컬에서 맵리듀스를 이용해보고자 한다면 해당 패키지를 이용해보는 것도 좋을 것 같다 :)
'DATA SCIENCE > DATA ENGINEERING' 카테고리의 다른 글
[udemy - Apache Spark와 Python으로 빅데이터 다루기] Spark란? (1) | 2024.04.14 |
---|---|
[DE] 2023 DEVIEW - SCDF로 하루 N만곡 이상 VIBE 메타 데이터 실시간으로 적재하기 (스트림 처리 레거시 극복일지 엿보기) (0) | 2023.06.04 |
[DE] 개발자들은 어떤 데이터베이스를 많이 사용할까? (2) | 2023.05.07 |
[DE] 쿠버네티스(kubernetes): 컨테이너도 자동화가 필요해! (0) | 2023.04.09 |