이전 2023 오픈소스 컨트리뷰션 python-mysql-replication RandEvent 기여 활동 글에 이어
두번째 기여 작업이었던 UserVarEvent에 대해 이야기해보고자 한다.
(이전 RandEvent 구현 내용은 https://heehehe-ds.tistory.com/204에서 확인할 수 있다.)
UserVarEvent란?
이전 글에도 작성했지만 리마인드 차원에서 Event에 대해 간단하게 짚고 넘어가자면,
Event는 데이터 변경에 대한 최소 단위를 의미한다.
테이블이 새로 생성되거나 테이블에 특정 row가 입력 및 변경되는 등 데이터에 변화가 일어날 때 event가 발생한다고 볼 수 있다.
더욱 다양한 Event 목록은 https://mariadb.com/kb/en/replication-protocol/ 에서도 확인할 수 있다.
이러한 여러 event 중 UserVarEvent는 아래와 같이 사용자가 직접 정의한 변수를 이용할 때 발생되는 event이다.
SET uservar='foo';
INSERT INTO test_table(test_field) VALUES(@uservar); -- UserVarEvent 발생
해당 event를 구현하기로 결정한 이유는 우선 개념 자체가 이해하기 쉬웠다.
그리고 무엇보다 지난번에 진행했던 RandEvent와 동일한 Statement-based Logging 방식이었기 때문에
Statement-based Logging에 대해 구현해놓은 테스트도 더욱 활용할 수 있겠다고 판단이 들어서 진행하게 되었다.
UserVarEvent가 발생하면 아래와 같은 주요 정보가 binlog를 통해서 들어온다.
- 변수명에 대한 길이: 뒤에서 변수명에 대해 몇바이트를 읽을지 알기 위해 먼저 변수명 길이를 받게 된다. (uint<4>)
- 변수명: 1에서 알아낸 길이만큼 변수명을 읽어온다. (string<len>)
- NULL indicator: 해당 변수값이 NULL인지 여부를 알아낸다. (uint<1>)
- (3에서 변수값이 NULL이 아니라면)
- 변수 형식: string / real / int / row / decimal 에 따라 각각 value를 지니고 있다. (uint<1>)
* 0x00: STRING_RESULT (set @a:="foo")
* 0x01: REAL_RESULT (set @a:=@@timestamp)
* 0x02: INT_RESULT (set @a:=4)
* 0x03: ROW_RESULT (현재 deprecated된 형식으로 보인다..)
* 0x04: DECIMAL_RESULT (set @a:=1.2345) - collation number: collation 번호 (uint<4>)
(collation은 문자열 정렬 및 비교 위한 규칙으로 ascii_general_ci, utf8mb3_unicode_ci 등이 있다.
"SHOW COLLATION;" 등의 명령어를 통해 확인할 수 있으며,
python-mysql-replication에서는 pymysqlreplication/constants/CHARSET.py에 미리 정의해두고 있어서
매번 collation을 호출하지 않고 이용할 수 있다.) - value 길이: 뒤에서 value를 읽어오기 위해 받아온다. (uint<4>)
- value 값: 앞에서 얻어낸 value 길이를 통해 읽어온다. (string<len>)
- flags: 변수 형식별 특정 flag (uint<1>)
(현재 int 형식에 대해서만 이용되고 있으며, 0이면 signed / 1이면 unsigned로 판별하여 value 변환 시 이용한다.)
- 변수 형식: string / real / int / row / decimal 에 따라 각각 value를 지니고 있다. (uint<1>)
python-mysql-replication에 직접 구현해보기
이제 python-mysql-replication에서 구현할 때는 어떻게 위 정보들을 읽어왔는지 살펴보려고 한다.
각 event 별로 pymysqlreplication/event.py에 class가 정의되어 있는데,
기본적으로 BinLogEvent를 상속받고 있으며 UserVarEvent class도 마찬가지로 BinLogEvent를 상속받아 생성되었다.
상속받은 BinLogEvent를 통해 super init을 진행해준 뒤, binlog를 순차적으로 읽어서 정보를 받아온다.
정보를 받아올 때는 pymysqlreplicaton/packet.py에서 read_uint8, read_uint32 등 각 type별로 정의된 method를 이용했다.
특히 value 부분의 경우, 추후 type에 따라 변환을 진행해주기 위해 먼저 temp_value_buffer에 저장해둔 뒤
_set_value_from_temp_buffer를 통해 type별로 변환을 진행해주었다.
self.name_len: int = self.packet.read_uint32()
self.name: str = self.packet.read(self.name_len).decode()
self.is_null: int = self.packet.read_uint8()
if not self.is_null:
self.type: int = self.packet.read_uint8()
self.charset: int = self.packet.read_uint32()
self.value_len: int = self.packet.read_uint32()
self.temp_value_buffer: Union[bytes, memoryview] = self.packet.read(self.value_len)
self.flags: int = self.packet.read_uint8()
self._set_value_from_temp_buffer()
else:
self.type, self.charset, self.value_len, self.value, self.flags = None, None, None, None, None
type별로 변환을 해주기 위해서는 우선 type_to_codes_and_method 변수에
각 type 숫자에 대한 type 정보 및 처리할 메소드를 정의해주었다.
self.type_to_codes_and_method: dict = {
0x00: ['STRING_RESULT', self._read_string],
0x01: ['REAL_RESULT', self._read_real],
0x02: ['INT_RESULT', self._read_int],
0x03: ['ROW_RESULT', self._read_default],
0x04: ['DECIMAL_RESULT', self._read_decimal]
}
그리고 받아온 정보를 통해 _set_value_from_temp_buffer에서 type별 변환을 수행해주었다.
INT_RESULT일 경우 singed 여부를 전달하기 위해 self.flags를 추가로 전달했다.
def _set_value_from_temp_buffer(self):
"""
Set the value from the temporary buffer based on the type code.
"""
if self.temp_value_buffer:
type_code, read_method = self.type_to_codes_and_method.get(self.type, ["UNKNOWN_RESULT", self._read_default])
if type_code == 'INT_RESULT':
self.value = read_method(self.temp_value_buffer, self.flags)
else:
self.value = read_method(self.temp_value_buffer)
각 변수 type별 변환 방식은 다음과 같다.
(1) string 변환
string은 가장 간단하게 decode를 통해 string으로 변환해주면 되었다.
def _read_string(self, buffer: bytes) -> str:
return buffer.decode()
(2) real 변환
string 이외의 형식들은 binary 데이터를 해석하기 위해 python의 struct라는 패키지를 이용했으며,
https://docs.python.org/ko/3/library/struct.html#format-characters에 따라 unpack할 format을 지정해주었다.
여기에서는 double 변환을 나타내는 '<d'를 사용해주었다.
def _read_real(self, buffer: bytes) -> float:
return struct.unpack('<d', buffer)[0]
(3) int 변환
int도 real과 마찬가지로 signed 여부에 따라 '<Q'와 '<q'로 format을 다르게 지정하여 진행되었다.
def _read_int(self, buffer: bytes, flags: int) -> int:
fmt = '<Q' if flags == 1 else '<q'
return struct.unpack(fmt, buffer)[0]
(4) decimal 변환
가장 고난이도였던 decimal 변환은.. 정수 부분과 소수점 부분을 나눠서 따로 처리해줘야 하는 부분이었다.
pymysqlreplication/util/bytes.py에 정의되어 있던 parse_decimal_from_bytes를 기반으로 아래와 같이 구현되었다.
def _read_decimal(self, buffer: bytes) -> decimal.Decimal:
self.precision = self.temp_value_buffer[0]
self.decimals = self.temp_value_buffer[1]
raw_decimal = self.temp_value_buffer[2:]
return self._parse_decimal_from_bytes(raw_decimal, self.precision, self.decimals)
@staticmethod
def _parse_decimal_from_bytes(raw_decimal: bytes, precision: int, decimals: int) -> decimal.Decimal:
digits_per_integer = 9 # 정수 부분에 사용되는 최대 자릿수 설정
compressed_bytes = [0, 1, 1, 2, 2, 3, 3, 4, 4, 4] # 압축된 데이터의 byte 크기를 정의하는 배열
integral = precision - decimals # 정수 부분 자릿수
# 정수 부분과 소수 부분을 압축된 부분과 압축되지 않은 부분으로 분할
uncomp_integral, comp_integral = divmod(integral, digits_per_integer)
uncomp_fractional, comp_fractional = divmod(decimals, digits_per_integer)
# 첫 번째 byte에서 부호를 확인하고, 음수인 경우 '-'를 설정
res = "-" if not raw_decimal[0] & 0x80 else ""
mask = -1 if res == "-" else 0
raw_decimal = bytearray([raw_decimal[0] ^ 0x80]) + raw_decimal[1:]
# 압축 해제를 위한 내부 함수
def decode_decimal_decompress_value(comp_indx, data, mask):
size = compressed_bytes[comp_indx]
if size > 0:
databuff = bytearray(data[:size])
for i in range(size):
databuff[i] = (databuff[i] ^ mask) & 0xFF
return size, int.from_bytes(databuff, byteorder='big')
return 0, 0
# 압축된 정수 부분을 해석
pointer, value = decode_decimal_decompress_value(comp_integral, raw_decimal, mask)
res += str(value)
# 압축되지 않은 정수 부분을 해석
for _ in range(uncomp_integral):
value = struct.unpack('>i', raw_decimal[pointer:pointer+4])[0] ^ mask
res += '%09d' % value
pointer += 4
# 소수점 추가
res += "."
# 압축되지 않은 소수 부분을 해석
for _ in range(uncomp_fractional):
value = struct.unpack('>i', raw_decimal[pointer:pointer+4])[0] ^ mask
res += '%09d' % value
pointer += 4
# 압축된 소수 부분을 해석
size, value = decode_decimal_decompress_value(comp_fractional, raw_decimal[pointer:], mask)
if size > 0:
res += '%0*d' % (comp_fractional, value)
# 최종 결과를 decimal.Decimal 형식으로 반환
return decimal.Decimal(res)
테스트 구현
오픈소스인만큼 여러 테스트케이스에 대해 구현하는 것이 중요했다.
이전에 RandEvent에서 구현해둔 Statement-Based Logging 테스트 class에 추가해 주었으며,
각 변수 값 type마다 테스트케이스를 추가했다.
각각 변수명 및 값 등이 올바르게 들어왔는지는 unittest를 통해 확인하도록 설정했다.
# ex) string type의 user var event 테스트
def test_user_var_string_event(self):
self.execute("CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR(50), PRIMARY KEY (id))")
self.execute("SET @test_user_var = 'foo'")
self.execute("INSERT INTO test (data) VALUES(@test_user_var)")
self.execute("COMMIT")
self.assertEqual(self.bin_log_format(), "STATEMENT")
self.assertIsInstance(self.stream.fetchone(), QueryEvent)
self.assertIsInstance(self.stream.fetchone(), QueryEvent)
expected_user_var_event = self.stream.fetchone()
self.assertIsInstance(expected_user_var_event, UserVarEvent)
self.assertIsInstance(expected_user_var_event.name_len, int)
self.assertEqual(expected_user_var_event.name, "test_user_var")
self.assertEqual(expected_user_var_event.value, "foo")
self.assertEqual(expected_user_var_event.is_null, 0)
self.assertEqual(expected_user_var_event.type, 0)
self.assertEqual(expected_user_var_event.charset, 33)
다른 변수 형식에 대한 테스트 구현 내용은 pymysqlreplication/tests/test_basic.py에서 확인할 수 있다.
기여 결과 및 소감
먼저 fork를 땄던 repo에서 PR을 올려 멘토님께 검토를 받은 뒤, 메인 repo에 PR을 올리는 방식으로 진행되었다.
- forked repo: https://github.com/python-mysql-replication-kr/python-mysql-replication/pull/44
- main repo: https://github.com/julien-duponchelle/python-mysql-replication/pull/466
여러 시행착오를 거치면서 진행되다보니 commit 개수가 과도하게 많아졌었는데,
히스토리 관리도 필요하다는 멘토님의 피드백을 통해 git rebase로 46개의 commit을 7개로 squash 시킨 뒤 메인 repo에 PR을 올렸다.
그 결과 owner 분의 "wow" 코멘트를 받으며 무사히 merge시킬 수 있었다 😄
사실 이 event의 개념 자체가 쉽게 다가왔기에 구현도 별로 어렵지 않을 것 같아서 시작하자고 했던 event였는데..
막상 뜯어보니 쉽지 않은 난이도였다..(특히 decimal 부분.. 멘토님 피셜 우리 조가 한 작업이 매번 최고난도였다고 한다..^_ㅠ)
해당 event를 구현할 당시 event 개념과 프로젝트 구조가 계속 헷갈려서 많이 헤맸는데,
팀원분들과 함께 세부적으로 뜯어보고 살펴본 덕분에 merge까지 진행될 수 있었다고 생각한다.
특히 융무의 기술블로그를 운영하고 계시는 팀원 분의 활약으로 무사히 마무리하고 많이 배워갈 수 있었다 (다시 한 번 감사드립니다🙇🏻♀️👍🏻)
다음 글에서는 MySQL의 여러 버전 및 Mariadb에 대해서 해당 프로젝트를 테스트를 할 수 있도록 구현했던 "테스트 구조 변경"에 대해 살펴보고자 한다.
'COMPUTER SCIENCE > PYTHON' 카테고리의 다른 글
[CPython 파헤치기] 사실 Python은 C로 구성되어 있다 - CPython 구성 요소 살펴보기 (1) | 2024.03.23 |
---|---|
[OSSCA] python-mysql-replication - 오픈소스는 어떻게 테스트를 진행하고 있을까? (0) | 2024.01.03 |
[OSSCA] python-mysql-replication - RandEvent 구현하기 (0) | 2023.10.04 |
[OSSCA] python-mysql-replication - 개념 이해하기 (0) | 2023.10.02 |
[Python] dateutil import 오류 해결 (0) | 2021.10.14 |