본문 바로가기

COMPUTER SCIENCE/PYTHON

[OSSCA] python-mysql-replication - UserVarEvent 구현하기

이전 2023 오픈소스 컨트리뷰션 python-mysql-replication RandEvent 기여 활동 글에 이어
두번째 기여 작업이었던 UserVarEvent에 대해 이야기해보고자 한다.
(이전 RandEvent 구현 내용은 https://heehehe-ds.tistory.com/204에서 확인할 수 있다.)

[OSSCA] python-mysql-replication - RandEvent 구현하기

2023 오픈소스 컨트리뷰션 python-mysql-replication 첫번째 기여 활동으로 진행한 RandEvent 구현 과정에 대해 살펴보고자 한다. (python-mysql-replication 설명은 이전 글 참고 : https://heehehe-ds.tistory.com/203) [OSSCA]

heehehe-ds.tistory.com

 

UserVarEvent란?

이전 글에도 작성했지만 리마인드 차원에서 Event에 대해 간단하게 짚고 넘어가자면,
Event는 데이터 변경에 대한 최소 단위를 의미한다. 
테이블이 새로 생성되거나 테이블에 특정 row가 입력 및 변경되는 등 데이터에 변화가 일어날 때 event가 발생한다고 볼 수 있다.
더욱 다양한 Event 목록은 https://mariadb.com/kb/en/replication-protocol/ 에서도 확인할 수 있다.
이러한 여러 event 중 UserVarEvent는 아래와 같이 사용자가 직접 정의한 변수를 이용할 때 발생되는 event이다.

SET uservar='foo';
INSERT INTO test_table(test_field) VALUES(@uservar);  -- UserVarEvent 발생

 
해당 event를 구현하기로 결정한 이유는 우선 개념 자체가 이해하기 쉬웠다.
그리고 무엇보다 지난번에 진행했던 RandEvent와 동일한 Statement-based Logging 방식이었기 때문에
Statement-based Logging에 대해 구현해놓은 테스트도 더욱 활용할 수 있겠다고 판단이 들어서 진행하게 되었다.
UserVarEvent가 발생하면 아래와 같은 주요 정보가 binlog를 통해서 들어온다.

  1. 변수명에 대한 길이: 뒤에서 변수명에 대해 몇바이트를 읽을지 알기 위해 먼저 변수명 길이를 받게 된다. (uint<4>)
  2. 변수명: 1에서 알아낸 길이만큼 변수명을 읽어온다. (string<len>)
  3. NULL indicator: 해당 변수값이 NULL인지 여부를 알아낸다. (uint<1>)
  4. (3에서 변수값이 NULL이 아니라면)
    • 변수 형식: string / real / int / row / decimal 에 따라 각각 value를 지니고 있다. (uint<1>)
         * 0x00: STRING_RESULT (set @a:="foo")
         * 0x01: REAL_RESULT (set @a:=@@timestamp)
         * 0x02: INT_RESULT (set @a:=4)
         * 0x03: ROW_RESULT (현재 deprecated된 형식으로 보인다..)
         * 0x04: DECIMAL_RESULT (set @a:=1.2345)
    • collation number: collation 번호 (uint<4>)
      (collation은 문자열 정렬 및 비교 위한 규칙으로 ascii_general_ci, utf8mb3_unicode_ci 등이 있다.
      "SHOW COLLATION;" 등의 명령어를 통해 확인할 수 있으며,
      python-mysql-replication에서는 pymysqlreplication/constants/CHARSET.py에 미리 정의해두고 있어서
      매번 collation을 호출하지 않고 이용할 수 있다.)
    • value 길이: 뒤에서 value를 읽어오기 위해 받아온다. (uint<4>)
    • value 값: 앞에서 얻어낸 value 길이를 통해 읽어온다. (string<len>)
    • flags: 변수 형식별 특정 flag (uint<1>)
      (현재 int 형식에 대해서만 이용되고 있으며, 0이면 signed / 1이면 unsigned로 판별하여 value 변환 시 이용한다.)

python-mysql-replication에 직접 구현해보기

이제 python-mysql-replication에서 구현할 때는 어떻게 위 정보들을 읽어왔는지 살펴보려고 한다.
각 event 별로 pymysqlreplication/event.py에 class가 정의되어 있는데,
기본적으로 BinLogEvent를 상속받고 있으며 UserVarEvent class도 마찬가지로 BinLogEvent를 상속받아 생성되었다.
상속받은 BinLogEvent를 통해 super init을 진행해준 뒤, binlog를 순차적으로 읽어서 정보를 받아온다.
정보를 받아올 때는 pymysqlreplicaton/packet.py에서 read_uint8, read_uint32 등 각 type별로 정의된 method를 이용했다.
특히 value 부분의 경우, 추후 type에 따라 변환을 진행해주기 위해 먼저 temp_value_buffer에 저장해둔 뒤
_set_value_from_temp_buffer를 통해 type별로 변환을 진행해주었다.

self.name_len: int = self.packet.read_uint32()
self.name: str = self.packet.read(self.name_len).decode()
self.is_null: int = self.packet.read_uint8()

if not self.is_null:
    self.type: int = self.packet.read_uint8()
    self.charset: int = self.packet.read_uint32()
    self.value_len: int = self.packet.read_uint32()
    self.temp_value_buffer: Union[bytes, memoryview] = self.packet.read(self.value_len)
    self.flags: int = self.packet.read_uint8()
    self._set_value_from_temp_buffer()
else:
    self.type, self.charset, self.value_len, self.value, self.flags = None, None, None, None, None

 
type별로 변환을 해주기 위해서는 우선 type_to_codes_and_method 변수에
각 type 숫자에 대한 type 정보 및 처리할 메소드를 정의해주었다.

self.type_to_codes_and_method: dict = {
    0x00: ['STRING_RESULT', self._read_string],
    0x01: ['REAL_RESULT', self._read_real],
    0x02: ['INT_RESULT', self._read_int],
    0x03: ['ROW_RESULT', self._read_default],
    0x04: ['DECIMAL_RESULT', self._read_decimal]
}

 
그리고 받아온 정보를 통해 _set_value_from_temp_buffer에서 type별 변환을 수행해주었다.
INT_RESULT일 경우 singed 여부를 전달하기 위해 self.flags를 추가로 전달했다.

def _set_value_from_temp_buffer(self):
    """
    Set the value from the temporary buffer based on the type code.
    """
    if self.temp_value_buffer:
        type_code, read_method = self.type_to_codes_and_method.get(self.type, ["UNKNOWN_RESULT", self._read_default])
        if type_code == 'INT_RESULT':
            self.value = read_method(self.temp_value_buffer, self.flags)
        else:
            self.value = read_method(self.temp_value_buffer)

 
각 변수 type별 변환 방식은 다음과 같다.

(1) string 변환

string은 가장 간단하게 decode를 통해 string으로 변환해주면 되었다.

def _read_string(self, buffer: bytes) -> str:
    return buffer.decode()

 

(2) real 변환

string 이외의 형식들은 binary 데이터를 해석하기 위해 python의 struct라는 패키지를 이용했으며, 
https://docs.python.org/ko/3/library/struct.html#format-characters에 따라 unpack할 format을 지정해주었다.
여기에서는 double 변환을 나타내는 '<d'를 사용해주었다.

def _read_real(self, buffer: bytes) -> float:
    return struct.unpack('<d', buffer)[0]

(3) int 변환

int도 real과 마찬가지로 signed 여부에 따라 '<Q'와 '<q'로 format을 다르게 지정하여 진행되었다.

def _read_int(self, buffer: bytes, flags: int) -> int:
    fmt = '<Q' if flags == 1 else '<q'
    return struct.unpack(fmt, buffer)[0]

(4) decimal 변환

가장 고난이도였던 decimal 변환은.. 정수 부분과 소수점 부분을 나눠서 따로 처리해줘야 하는 부분이었다.
pymysqlreplication/util/bytes.py에 정의되어 있던 parse_decimal_from_bytes를 기반으로 아래와 같이 구현되었다.

def _read_decimal(self, buffer: bytes) -> decimal.Decimal:
    self.precision = self.temp_value_buffer[0]
    self.decimals = self.temp_value_buffer[1]
    raw_decimal = self.temp_value_buffer[2:]
    return self._parse_decimal_from_bytes(raw_decimal, self.precision, self.decimals)

@staticmethod
def _parse_decimal_from_bytes(raw_decimal: bytes, precision: int, decimals: int) -> decimal.Decimal:
    digits_per_integer = 9  # 정수 부분에 사용되는 최대 자릿수 설정
    compressed_bytes = [0, 1, 1, 2, 2, 3, 3, 4, 4, 4]  # 압축된 데이터의 byte 크기를 정의하는 배열
    integral = precision - decimals  # 정수 부분 자릿수

    # 정수 부분과 소수 부분을 압축된 부분과 압축되지 않은 부분으로 분할
    uncomp_integral, comp_integral = divmod(integral, digits_per_integer)
    uncomp_fractional, comp_fractional = divmod(decimals, digits_per_integer)

    # 첫 번째 byte에서 부호를 확인하고, 음수인 경우 '-'를 설정
    res = "-" if not raw_decimal[0] & 0x80 else ""
    mask = -1 if res == "-" else 0
    raw_decimal = bytearray([raw_decimal[0] ^ 0x80]) + raw_decimal[1:]

    # 압축 해제를 위한 내부 함수
    def decode_decimal_decompress_value(comp_indx, data, mask):
        size = compressed_bytes[comp_indx]
        if size > 0:
            databuff = bytearray(data[:size])
            for i in range(size):
                databuff[i] = (databuff[i] ^ mask) & 0xFF
            return size, int.from_bytes(databuff, byteorder='big')
        return 0, 0

    # 압축된 정수 부분을 해석
    pointer, value = decode_decimal_decompress_value(comp_integral, raw_decimal, mask)
    res += str(value)

    # 압축되지 않은 정수 부분을 해석
    for _ in range(uncomp_integral):
        value = struct.unpack('>i', raw_decimal[pointer:pointer+4])[0] ^ mask
        res += '%09d' % value
        pointer += 4

    # 소수점 추가
    res += "."

    # 압축되지 않은 소수 부분을 해석
    for _ in range(uncomp_fractional):
        value = struct.unpack('>i', raw_decimal[pointer:pointer+4])[0] ^ mask
        res += '%09d' % value
        pointer += 4

    # 압축된 소수 부분을 해석
    size, value = decode_decimal_decompress_value(comp_fractional, raw_decimal[pointer:], mask)
    if size > 0:
        res += '%0*d' % (comp_fractional, value)
    
    # 최종 결과를 decimal.Decimal 형식으로 반환
    return decimal.Decimal(res)

 

테스트 구현

오픈소스인만큼 여러 테스트케이스에 대해 구현하는 것이 중요했다.
이전에 RandEvent에서 구현해둔 Statement-Based Logging 테스트 class에 추가해 주었으며,
각 변수 값 type마다 테스트케이스를 추가했다.
각각 변수명 및 값 등이 올바르게 들어왔는지는 unittest를 통해 확인하도록 설정했다.

# ex) string type의 user var event 테스트
def test_user_var_string_event(self):
    self.execute("CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR(50), PRIMARY KEY (id))")
    self.execute("SET @test_user_var = 'foo'")
    self.execute("INSERT INTO test (data) VALUES(@test_user_var)")
    self.execute("COMMIT")

    self.assertEqual(self.bin_log_format(), "STATEMENT")
    self.assertIsInstance(self.stream.fetchone(), QueryEvent)
    self.assertIsInstance(self.stream.fetchone(), QueryEvent)

    expected_user_var_event = self.stream.fetchone()
    self.assertIsInstance(expected_user_var_event, UserVarEvent)
    self.assertIsInstance(expected_user_var_event.name_len, int)
    self.assertEqual(expected_user_var_event.name, "test_user_var")
    self.assertEqual(expected_user_var_event.value, "foo")
    self.assertEqual(expected_user_var_event.is_null, 0)
    self.assertEqual(expected_user_var_event.type, 0)
    self.assertEqual(expected_user_var_event.charset, 33)

 
다른 변수 형식에 대한 테스트 구현 내용은 pymysqlreplication/tests/test_basic.py에서 확인할 수 있다.
 

기여 결과 및 소감

먼저 fork를 땄던 repo에서 PR을 올려 멘토님께 검토를 받은 뒤, 메인 repo에 PR을 올리는 방식으로 진행되었다.

여러 시행착오를 거치면서 진행되다보니 commit 개수가 과도하게 많아졌었는데,
히스토리 관리도 필요하다는 멘토님의 피드백을 통해 git rebase로 46개의 commit을 7개로 squash 시킨 뒤 메인 repo에 PR을 올렸다.
그 결과 owner 분의 "wow" 코멘트를 받으며 무사히 merge시킬 수 있었다 😄

사실 이 event의 개념 자체가 쉽게 다가왔기에 구현도 별로 어렵지 않을 것 같아서 시작하자고 했던 event였는데..
막상 뜯어보니 쉽지 않은 난이도였다..(특히 decimal 부분.. 멘토님 피셜 우리 조가 한 작업이 매번 최고난도였다고 한다..^_ㅠ)

해당 event를 구현할 당시 event 개념과 프로젝트 구조가 계속 헷갈려서 많이 헤맸는데,
팀원분들과 함께 세부적으로 뜯어보고 살펴본 덕분에 merge까지 진행될 수 있었다고 생각한다.
특히 융무의 기술블로그를 운영하고 계시는 팀원 분의 활약으로 무사히 마무리하고 많이 배워갈 수 있었다 (다시 한 번 감사드립니다🙇🏻‍♀️👍🏻)

다음 글에서는 MySQL의 여러 버전 및 Mariadb에 대해서 해당 프로젝트를 테스트를 할 수 있도록 구현했던 "테스트 구조 변경"에 대해 살펴보고자 한다.

반응형