7 분 소요


카프카는 매우 빠른 속도로 오류를 복구하여 시스템에 문제가 없도록 고가용성을 유지합니다.
이는 어떤 방식으로 동작해서 가능한 것일까요??

오늘은 “카프카의 내부 동작 원리와 구현” 이라는 주제로 카프카에 대해 좀 더 알아보도록 하겠습니다.



카프카 리플리케이션

첫번째로 알아볼 것은 카프카 리플리케이션 입니다.
리플리케이션Topic - Partition 단위로 여러 브로커에 저장을 하는 방식이라고
앞선 포스트에서 소개를 했는데 오늘은 이에 대해서 세부적으로 알아보도록 하겠습니다

리플리케이션이란?

카프카는 데이터 파이프라인 서비스에서 정중앙에 위치하여 메시지를 받고 전달해주는
메인 허브 역할(우체부 역할)을 합니다.

그래서 카프카에 에러가 발생하게 된다면 이는 서비스의 전체적인 마비로 이어지게 됩니다.
만약 카카오톡 사용하는데 카프카를 사용하였고 카프카에 에러가 발생하여 일시적으로 메시지 전달이 되지 않는다면 사용자들의 불만이 폭주할 것이고 카카오 개발팀은 엄청난 고통을 받게 되겠죠..?

이처럼 메인 허브는 동작에 문제가 생긴다면 매우 심각한 영향을 끼치게 됩니다.
카프카는 이러한 문제를 사전에 방지하기 위해 리플리케이션이라는 것을 적용하고 있습니다.
이는 토픽의 파티션을 하나의 브로커에만 저장하는 것이 아닌 다른 브로커에도 저장을 해서 어떤 브로커에 문제가 발생했을때 다운된 브로커의 역할을 대신해주게 됩니다.

아래 이미지와 같이 카프카에서는 하나의 topic을 여러개의 브로커에 저장을 하는 방식으로 운영합니다.

alt
[출처] : https://hevodata.com/learn/kafka-replication/

Leader와 Follower

위의 이미지를 자세히보면 broker 1에 있는 topic1-part1 파티션에는 Leader라고 되어있고 broker 2,3에 있는 topic1-part1 파티션에는 Follower라고 적혀있는 것을 볼 수 있습니다.
이는 무엇을 뜻하는 걸까요?

카프카에서 프로듀서나 컨슈머가 topic에 있는 partition과 통신을 할때는 하나의 topic-partition에 대해서만 message를 주고 받을 수 있도록 설계되어 있습니다.

즉, 복사된 모든 파티션이 메시지 수령/전달 역할을 하지 않고 하나의 파티션에 대해서만 이 역할을 수행한다는 것인데 이 메인 Leader 역할을 하는 파티션을 Leader 파티션이라고 합니다.

그럼 Follower 파티션의 역할은 무엇일까요?

Follower 파티션은 Leader 파티션에 문제가 발생했을때 이 역할을 대신해주기 위해 대기하고 있는 파티션입니다. 이 말은 Leader와 항상 동일한 상태를 유지하고 있어야 한다는 뜻이 됩니다.

이러한 상태를 가르켜 카프카에서는 ISR(InSyncReplica)라 칭하며 파티션 단위로 Leader - Follower 간의 논리적인 그룹을 관리하게 됩니다.

실제 topic에 –describe를 해보면 Isr 이라는 항목에 대해서 확인을 할 수 있습니다. 아래를 보면 broker 1,2,3에 있는 파티션들은 ISR(동기화)되고 있다는 것을 알 수 있습니다.

1
2
3
4
/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server peter-kafka01.foo.bar:9092 --topic peter-test01 --describe
>> 
Topic: peter-test01  PartitionCount: 1  ReplicationFactor : 3 Configs : segment.bytes=1073741824
Topic: Peter-test01  Partition: 0  Leader:1,  Replicas: 1,2,3  Isr: 1,2,3


ISR은 메시지를 기준으로 동기화가 이뤄지게 됩니다. 그래서 파티션은 동기화 되지 않은 메시지는 Consumer에 제공하지 않습니다. 만약 이를 허용한다면 아래의 상황에서 메시지가 유실되는 문제가 발생합니다.

[message가 유실되는 케이스]

  • 리더 파티션에 “test-message2”라는 msg가 전달됨.
  • 팔로우 파티션이 “test-message2”를 복사하기 전 consumer 하나가 메시지를 가져감.
  • 리더 파티션에 문제 발생하여 down.
  • 남아있는 팔로우 파티션 중 하나가 리더 파티션으로 선정.
  • 다른 consumer에서 새로운 리더 파티션에서 message를 가져가려할때 “test-message2”는 가져올 수 없음.


하이워터마크(High water mark)

위와 같이 Leader와 Follower 파티션 간의 sync를 맞추기 위해서 하이워터마크를 사용합니다. 이는 각 파티션이 동일한 Message를 가지고 있다는 것을 뜻합니다.

이는 다음과 같은 과정을 거쳐 각각의 broker들로 부터 관리됩니다.
alt

  1. broker1(Leader) message2를 전달 받음.
  2. broker2(Follower)에서 mesasge1을 받고 message2를 Leader에 요청.
  3. broker2(Follower)에서 mesasge1을 받고 message2를 Leader에 요청.
  4. broker1(leader)은 Follower들이 모두 message2를 요청한 것을 확인
     => message 1은 모두 받았다고 생각하고 Commit. (자신의 highwatermark up)
  5. broker1(leader)은 Follower들에게 highwatermark를 올리라고 요청.


Kafka가 빠른 이유 중 하나는 “Isr 작업이 빠르다” 입니다.
다른 플랫폼은 sync를 맞추기 위해 리더가 팔로워들에게 확인 메시지를 전달하고 ack을 받는 방식으로 구현이 되어있으나 카프카는 그렇지 않습니다.

위의 하이워터마크 update 과정을 다시보면 Leader가 별다른 확인 메시지를 보내지 않고 Follower가 다음 message를 요청하는 것만으로 sync를 맞추고 있는 것이 보이지요?

이렇게 카프카에서는 ack 방식을 이용하지 않고 팔로워가 pull하는 방식으로 sync를 확인 하기에 다른 플랫폼보다 빠르다는 이점을 가지고 있습니다.

리더 에포크(LeaderEpoch)

리더 에포크는 카프카의 파티션들이 복구 동작을 할때 메시지의 일관성을 유지하기 위한 용도로 이용됩니다.
이를 통해 여러 상황에서 복구될때 파티션들이 동일한 메시지를 가질 수 있도록 해줍니다.

아래와 같이 Follower가 message2(offset 1)을 받은 상태에서 다운이 되었다고 합시다. Follower는 복구된 이후에 message2를 지워야 할까요?
alt

리더에포크를 사용하지 않는다면 위에서 message2는 바로 삭제가 됩니다. 하지만 리더에포크를 사용했다면 message2는 바로 삭제되지 않고 현재 Leader에게 리더 에포크를 요청한 이후에 삭제를 할지 말지에 대해 결정을 하게 됩니다.

만약 이 과정을 거치지 않은 상태로 Leader가 중간에 Down 되버린다면 어떻게 될까요?
Follower가 복구되자마자 message2가 삭제되고 Leader도 Down 되었다고 가정해봅시다.
이렇게 된다면 아래 이미지와 같이 기존 Follower 역할을 하던 파티션은 message2를 유실하게 되고, New Leader와 이전 Leader는 Sync가 맞지 않은 상태로 운용되어 버립니다.

alt

하지만 리더에포크를 사용한다면 message2는 바로 삭제되지 않고 Leader와 message가 일치하는지 비교한 이후에 삭제할지 말지 결정하기 때문에 메시지의 유실은 일어나지 않습니다.

역으로 이전 Leader였던 파티션이 다시 복구되었을때도 리더에포크를 사용해 NewLeader와 Sync를 맞추도록 동작합니다.
alt

이러한 리더에포크는 leader-epoch-checkpoint 파일에서 관리가 됩니다.
만약 Leader가 다운된 이후에 복구가 되었다면 이 파일을 보고 msg sync를 맞추게 되는 것입니다.

아래와 같이 NewLeader 서버에서 leader-epoch-checkpoint를 보면 메시지를 확인 할 수 있습니다.

1
2
3
4
5
6
[NewLeader Broker]$ cat /data/kafka-logs/peter-test02-0/leader-epoch-checkpoint

0
2 -> 현재 leader epoch 개수
0 0 -> leader epoch 0번일때 commit된 offset 번호 0
1 1 -> leader epoch 1번일때 commit된 offset 번호 1

이전 Leader 파티션이였던 브로커는 이를 통해서 “내가 Leader 였을때는 0 Offset 까지 관리했었고 NewLeader는 1번 Offset까지 관리 중이니깐 지금 내가 가지고 있는 1번 Offset은 Sync 보장이 되지 않을 수도 있겠구나” 라고 판단을 하고 message2를 지우고 message3을 1번 offset에 저장하게 되는 것입니다.

“message2는 유실이 되었는데 이것은 문제가 되지 않냐?”라고 생각할 수 있는데 리더에포크는 sync를 맞추는 것에 대한 개념이기 때문에 이는 문제가 되지 않는 것으로 보입니다.



컨트롤러

이제는 리더 선출의 역할을 맡고있는 컨트롤러에 대해 알아봅시다.
카프카 클러스터 중 하나의 브로커가 컨트롤러 역할을 하게 되며, 파티션의 ISR 리스트 중에서 리더를 선출하게 됩니다.
리더를 선출하기 위한 ISR 리스트 정보는 안전한 저장소에 보관되어야 하는데, 가용성 보장을 위해 주키퍼에 저장합니다.

리더 선출 예시

앞선 소개한 예시인 peter-test-01 토픽을 이용해서 리더 선출이 어떻게 이뤄지는지 대해 살펴보겠습니다.

예기치 못한 종료 과정

토픽이름 pter-test-01
파티션수 1
리플리케이션 팩터 수 2
브로커 배치 1,3번 브로커
리더위치 1번 브로커

alt

  1. 파티션 0번의 리더가 있는 브로커 1번이 예기치 않게 다운
  2. 주키퍼는 1번 브로커와 연결이 끊어진 후, 0번 파티션의 ISR에서 변화가 생겼음을 감지
  3. 컨트롤러는 주키퍼 워치를 통해 0번 파티션에 변화가 생긴 것을 감지하고 해당 파티션 ISR 중 3번을 새로운 리더로 선출
  4. 컨트롤러는 0번 파티션의 새로운 리더가 3이라는 정보를 주키퍼에 기록
  5. 컨트롤러는 이렇게 갱신된 정보는 현재 활성화 상태인 모든 브로커에 전파

리더 선출 진행 속도

리더 선출 과정은 위와 같이 컨트롤러에 의해 이뤄집니다. 이 속도는 파티션당 0.2 sec씩 소요됩니다. 파티션의 개수가 많아질수록 속도가 증가하게 되며 1만개의 파티션이 구성된 경우에는 약 3분의 시간이 소요됩니다.

만약 실시간 서비스를 운영중이라면 이러한 리더 선출 속도는 치명적일 수 있는데 다행히 2018년 11월에 릴리즈된 카프카 1.1.0 부터는 이를 개선한 상태입니다.

기존에는 6분 30초 소요되던 작업이 1.1.0 부터는 약 3초만에 완료됩니다.

예정된 종료인 경우

예정된 종료인 경우는 관리자가 의도적으로 종료한 경우를 뜻합니다.
이 경우에는 앞선 동작과는 조금 다릅니다.

alt

  1. 관리자가 브로커 종료 명령어를 실행하고, SIG_TERM 신호가 브로커에게 전달됩니다.
  2. SIG_TERM 신호를 받은 브로커는 컨트롤러에게 알립니다.
  3. 컨트롤러는 리더 선출 작업을 진행하고, 해당 정보를 주키퍼에 기록합니다
  4. 컨트롤러는 새로운 리더 정보를 다른 브로커들에게 전송합니다.
  5. 컨트롤러는 종료 요청을 보낸 브로커에게 정상 종료한다는 응답을 보냅니다.
  6. 응답을 받은 브로커는 캐시에 있는 내용을 디스크에 저장하고 종료합니다.
    예정되지 않은 종료와 차이는 무엇일까요?
    바로 downtime 입니다.

이 둘의 차이는 다음 리더를 선출할때 “리더 파티션이 동작되고 있는 상태인가?”를 기점으로 갈립니다.

예상치 못하게 브로커가 종료된다면 리더 선출전까지는 downtime이 계속 누적이 되나, 의도된 종료에서는 그렇지 않다는 것이 가장 큰 차이입니다.

물론 예정된 종료또한 downtime이 발생하긴 하지만 갑작스럽게 종료되는 경우와 비교해서는 더 빠르게 동작하게 됩니다.

이를 사용하려면 server.properties에서 controlled.shutdown.enable = true 설정이 된 상태로 운영되어야 합니다.



로그(로그 세그먼트)

카프카의 토픽으로 들어오는 메시지(레코드)는 세그먼트(로그 세그먼트)라는 파일에 저장됩니다.
로그 세그먼트에는 메시지의 내용뿐 아니라 메시지의 Key, Value, Offset, Message 크기와 같은 정보들이 함께 저장되며 이들은 브로커의 로컬 디스크에 보관합니다.

제한없이 파일 보관을 한다면 관리가 어렵기 때문에 기본 제한 값은 1GB로 설정되어있습니다.

만약 메시지를 계속 append 하다가 파일의 크기가 1GB를 넘어가면 다음 파일에 저장하는 방식으로 진행됩니다.

이러한 파일들은 계속 유지가 된다면 언젠가는 디스크의 용량이 터지기 때문에 카프카에서는 이에 대해 관리할 수 있도록 로그 세그먼트 삭제컴팩션을 제공합니다.

차례로 알아보도록 합시다.

로그 세그먼트 삭제

오래된 로그는 삭제하는 정책입니다. 카프카는 기본적으로 로그 삭제 정책을 적용합니다. retention.ms 을 통해서 관리하며 5분이 디폴트 입니다.

하나의 로그가 적재된 이후 5분 뒤에는 삭제가 된다는 것을 뜻합니다. /data/kafka-logs/partition 위치를 보면 확인이 가능합니다.

로그 세그먼트 컴팩션

컴팩션은 로그 세그먼트를 삭제하지 않고 저장을 해두는 방식을 말합니다. 모든 메시지를 저장하지 않고 Key값을 기준으로 마지막의 데이터만 보관합니다.

이후에 나오는 consumer group별 offset을 관리하는 __consumer_offset 토픽이 이와 동일한 방식으로 관리됩니다.

alt
[출처] : https://kafka.apache.org/documentation/#compaction

이러한 로그 컴팩션을 이용하기 위해서는 Producer에서 Message 전달시 key값 또한 필수로 보내도록 설정해야합니다.
왜냐면 기본적으로 value에 대해서만 필수로 보내도록 설정되어 있기 때문입니다.

로그 세그먼트 컴팩션의 장점

이러한 과정의 장점은 무엇일까요? 바로 빠른 장애 복구에 있습니다.

장애 복구시에 전체 로그를 복구하지 않고, 메시지의 키를 기준으로 최신의 상태만 복구하기 때문에 전체 로그를 복구할 때보다 복구 시간을 줄일 수 있다는 장점이 있습니다.

단, 이는 키값을 기준으로 최종값만 필요한 과정에서만 적용하니 메시지가 유실되지 말아야하는 경우에서는 적용되지 않아야 합니다.

로그 세그먼트 컴팩션 관련 옵션

옵션 이름 옵션값 적용 범위 설명
cleanup.policy compact 토픽의 옵션으로 적용 토픽 레벨에서 로그 컴팩션을 설정할 때 적용하는 옵션
log.cleanup.policy compact 브로커의 설정 파일에 적용 브로커 레벨에서 로그 컴팩션을 설정할 때 적용하는 옵션
log.cleaner.min.compaction.lag.ms 0 브로커의 설정 파일에 적용 메시지가 기록된 후 컴팩션하기 전 경과되어야 할 최소시간을 지정합니다. 만약 이 옵션을 설정하지 않으면 마지막 세그먼트를 제외하고 모든 세그먼트를 컴팩션 할 수 있습니다.
log.cleaner.max.compaction.lag.ms 92233720368… 브로커에 설정 파일에 적용 메시지가 기록된 후 컴팩션하기 전 경과되어야 할 최대 시간을 지정
log.cleaner.min.cleanable.ratio 0.5 브로커의 설정 파일에 적용 로그에서 압축이 되지 않은 부분을 더티(dirty)라고 표현합니다. ‘전체로그’ 대비 ‘더티’의 비율이 50%가 넘으면 로그 캠펴션이 실행됩니다.


글을 마치며

오늘은 카프카의 내부 구현 상태에 대해서 정리를 해봤습니다.
정리를 하면서 “가용성 보장을 위해 정말 많은 노력을 기울였다”라고 생각하지 않을 수 없었습니다.

개인적으로 카프카 개발자들이 예상치 못한 케이스를 발견해서 에러를 만들어보고 싶은 욕구가 생깁니다.

다음 글에서는 프로듀서에 대해서 정리해보도록 하겠습니다. 😃


참고자료

  • 《실전 카프카 개발부터 운영까지》- 고승범 지음

태그:

카테고리:

업데이트:

댓글남기기