Kafka

프로듀서의 내부 동작 원리(파티셔너, 배치)

반응형

파티셔너

  • 프로듀서는 데이터를 전송할 때 send() 메소드를 수행함으로써 프로듀서 → 시리얼 라이저 → 파티셔너를 거쳐 카프 카프카의 Topic으로 전송됩니다.
  • 이때 복수개의 Topic중 어떤 Topic으로 메시지 를 전송할 지를 결정해야 하는데 이때 사용하는 것이 파티셔너입니다.
  • 파티셔너는 예상치 못한 많은 양의 메시지가 카프카로 들어오는 경우 파티션을 늘릴 수 있는 기능을 제공합니다. 그러나 이러한 방식은 기존에 저장하던 Hash 값의 파티션의 위치와 다르게 저장되기 때문에 관리자의 의도와는 다른 방식으로 메시지 전송이 이뤄질수 있으므로 권장하는 방식은 아닙니다.

파티셔너의 라운드 로빈 전략

  • 프로듀서가 메시지를 생산할 때 키값은 필수값이 아니므로 개발자는 별도의 레코드 키 값을 지정하지 않고 메시지를 전송할 수 있습니다.
  • 이때 키 값을 지정하지 않는다면 키 값은 null이 되어 파티셔너의 기본 메시지 할당 정책인 라운드 로빈 알고리즘을 사용하게 됩니다.
  • 그러나 라운드 로빈 알고리즘은 배치 전송시 배치를 수행하는 최소 레코드 수를 충족하지 못할 경우 수행을 안한다는 단점을 가지고 있습니다.
  • 예를 들어 5개의 레코드를 3개의 파티션에 보내는 작업을 수행한다면 각 파티션안에 레코드의 수는 [2, 2, 1]로 채워져 있습니다.(전송되기위한 파티션의 기본값은 3, 수정 가능)
  • 위와 같은 상황일 경우 각 파티션은 개수를 채우지 못했기 때문에 브로커에 메시지를 보내지 않게 됩니다.
  • 이를 해결하기 위해 프로듀셔의 옵션을 조절해서 특정 시간을 초과할 경우 카프로 레코드를 전송하도록 설정할 수 있지만 배치의 압축 효과를 얻지 못한 채 레코드 하나만 카프카로 전송되므로 비효율적입니다.
  • 이런 문제를 해결하기 위해 카프카 진영에서는 스티키 파티셔닝 전략을 개발하여 공개했습니다.

라운드 로빈 전략의 단점을 극복하기 위한 스티키 파티셔닝 전략

  • 스티키 파티셔닝이란 하나의 파티션에 레코드 수를 먼저 채워 카프카로 빠르게 배치 전송하는 전략을 말합니다.
  • 예를 들어 5개의 메시지가 3개의 파티션을 이루고 있는 Topic에 도달하게 된다면 [3, 2, 0] 순으로 레코드가 파티션에 쌓이게 되고 1번 파티션의 메시지는 카프카로 전송하게 됩니다.(전송되기위한 파티션의 기본값은 3, 수정 가능)
  • Confluent에서 발표한 글에 따르면 기본 설정 대비 30% 이상 지연시간이 감소하고 프로듀서의 CPU 사용률도 줄어드는 효과를 얻을 수 있었다고 합니다.

프로듀서의 배치

  • 카프카에서는 토픽의 처리량을 높이기 위한 방법으로 레코드의 인입을 토픽의 파티션으로 나눠 처리하며 파티션으로 들어온 레코드의 개수가 일정 개수가 되었을 경우 배치 처리하는 방법을 권장합니다.
  • 이러한 배치 처리를 권장하기 위해 카프카에서는 아래와 같은 옵션을 제공합니다.
    • buffer.memory: 카프카로 메시지들을 전송하기 위해 담아두는 프로듀서의 버퍼 메모리 옵션을 의미합니다. 기본값은 32MB로 설정되어 있으며 개발자가 수정할 수 있습니다.
    • batch.size: 배치 전송을 위해 메시지 들을 묶는 단위를 설정하는 배치 크기 옵션을 의미합니다.
    • linger.ms : 배치 전송을 위해 버퍼 메모리에서 대기하는 메시지들의 최대 대기 시간을 설정하는 옵션. 단위는 ms이며 기본값은 0입니다. 기본값이 0인 경우에는 대기 시간이 없다는 것을 의미하기 때문에 메시지가 카프카로 즉시 전송됩니다.
  • 배치를 설정하지 않고 메시지를 카프카에게 1000번 보내야 하는 상황에서 batch.size = 100 으로 설정할 경우에 10번만 보내야 되기 때문에 카프카의 성능을 끌어낼수 있습니다.
  • 하지만 배치 처리가 항상 정답만은 아닙니다. 프로듀서가 카프카로 즉시 메시지를 보내야 하는 상황, 네트워크 환경, 초당 메시지 전송 수 등 여러 환경적 요인에 따라 배치를 사용할 지를 결정해야 합니다.
💡
buffer.memory는 batch.size보다 항상 커야 합니다. 레코드들이 모여 카프카로 메시지가 전송되기 전에 buffer.meory에 보관되기 때문에 레코드가 모이게 되는 단위를 의미하는 batch.size의 크기 보다 buffer.memory는 항상 커야 합니다. 나아가 프로듀서가 전송에 실패하게 되면 재시도를 하게 되는데 이때 재시도 하게되는 레코드의 크기까지 고려하여 여유값을 설정해야 합니다.
💡
배치 처리를 진행하게 되면 처리량을 증가하고 응답률은 감소하게 됩니다.

메시지 전송 방식 정책

  • 프로듀서는 멱등성(결과가 달라지지 않는 성질)을 유지하기 위해 아래와 같은 전송 방식을 제공합니다.

적어도 한 번 전송(무슨 일이 있어도 적어도 한 번은 전송한다.)

  • 적어도 한 번 전송은 프로듀서가 브로커에게 메시지를 전송할 때, 브로커가 메시지를 받았는지 안 받았는지를 고려하지 않고 ACK 응답값이 없다면 재전송 하는 방식입니다.
  • 만약 프로듀서가 전송해준 메시지를 브로커가 저장은 하고 ACK를 보내려고 하는 순간 서버가 다운된다면 프로듀서는 메시지를 중복으로 보낼 수 있습니다.
  • 적어도 한 번 전송은 카프카가 메시지를 보내는 기본 정책입니다.

최대 한 번 전송(무슨 일이 있어도 한 번만 전송한다.)

  • 최대 한 번 전송은 프로듀서가 브로커에게 메시지를 전송할 때 메시지를 한 번만 보내는 방식을 의미합니다.
  • 적어도 한 번 전송과 다르게 최대 한 번 전송은 ACK를 받지 못하더라도 재전송을 하지 않습니다.
  • 일부 메시지가 손실되어도 상관없는 환경에서 사용되게에 적합한 전송 방식입니다.(UDP가 생각나네요)

중복 없이 전송

  • 중복 없이 전송은 프로듀서가 브로커에게 메시지를 전송할 때 중복 없이 전송 하는 방식입니다.
  • 중복 여부를 확인 하기 위해 프로듀서는 브로커에게 메시지를 전송할 때 PID와 메시지 번호를 헤더에 포함하여 전송합니다.
  • 브로커는 메시지를 받게 되면 PID의 메시지 번호를 메모리에 저장하게 됩니다.
  • 브로커는 PID의 메시지 번호를 저장하고 있어 프로듀서가 보낸 메시지의 메시지 번호가 브로커의 메시지 번호보다 1 크지 않은 경우에 중복된 메시지라고 판단합니다
  • 중복 없이 전송 방식은 중복을 피하기 위해 오버헤드가 있는 방향으로 구현되어 있지만 단순히 숫자만 추가하는 방향으로 구현하였기 때문에 기존 성능 대비 20%만 감소하는 방향으로 구현된 정책입니다.
  • 중복 없이 전송하는 방식이 말 그대로 중복없이 전송하지만 반드시 전송을 보장하지는 않습니다. 전송을 보장하기 위해서는 하나의 트랜잭션이 정상적으로 성공했는지의 여부를 확인해야 하는데 이러한 여부는 정확히 한 번 전송 정책에서 확인하고 있습니다.

정확히 한 번 전송

  • 중복 없이 전송의 기능을 가지고 있으면서 전송을 보장하는 정확히 한 번 전송 정책입니다.
  • 카프카는 메시지를 정확히 한 번 보냈는지를 원자적(전체성공 or 전체실패)으로 처리하기 위해 트랜잭션 코디네이터라는 개념을 도입하였습니다.
  • 프로듀서는 트랜잭션 관련 정보를 트랜잭션 코디네이터에게 알리게 되고 알린 정보는 트린잭션 코디네이터가 기록하게 됩니다.
  • 트랜잭션 코디네이터는 컨슈머 그룹 코디네이터와 동일한 개념으로 트랜잭션 코디네이터는 프로듀서의 의해 전송된 메시지를 관리하며 상태값을 Empty(트랜잭션 초기화 상태), Ongoing(작업 시작 상태), PrepareCommit(commit 상태), CompletCommit(commit 성공 상태) 순으로 변경합니다.
  • 정확히 한 번 전송의 트랜잭션 작동 순서

  1. 트랜잭션 코디네이터를 찾기위한 프로듀서는 코디네이터를 가지고 있는 브로커에게 FindCoordinator Request를 요청하고 브로커에게 트랜잭션 코디네이터를 가지고 있는 브로커의 정보를 응답 받습니다.
  1. InitTransactions() 메소드를 사용하여 트랜 잭션 전송을 위한 InitPidRequest를 트랜잭션 코디네이터에 전송합니다.
  1. 트랜잭션을 시작하기 위해 프로듀서는 beginTransaction() 메소드를 수행하여 트랜잭션을 시작합니다.(프로듀서 관점에서는 트랜잭션을 시작한게 맞아 로그를 수집하지만 트랜잭션 코디네이터 관점에서는 첫 번째 레코드가 전송되지 않았기 때문에 시작된것은 아닙니다.)
  1. 프로듀서는 파티셔너를 수행하여 전송할 파티션 정보를 사용해 브로커의 트랜잭션 코디네이터에게 전달합니다. 이때 트랜잭션 코디네이터는 로그에 Onging 상태로 기록합니다.
  1. 프로듀서는 브로커의 파티션에 PID, 에포크,시퀀스 번호를 담아 메시지를 전송합니다.
  1. 프로듀서는 트랜잭션 종료 요청을 의미하는 CommitTransaction() 메소드 또는 abortTransaction()를 호출하여 트랜잭션 코디네이터에게 트랜잭션이 완료되었음을 알립니다. 이때 로그에 PrepareCommit 상태 또는 PrepareAbort를 기록합니다.
  1. 트랜잭션 코디네이터는 트랜잭션 로그에 기록된 토픽의 파티션에 트랜잭션 커밋 표시를 진행합니다. 이때 기록하는 메시지가 컨트롤러 메시지입니다.
  1. 트랜잭션 코디네이터가 로그에 CompletCommit 상태를 기록합니다.
  1. 트랜잭션 코디네이터는 프로듀서에게 해당 트랜잭션이 완료되었음을 알립니다.
💡
트랜잭션을 이용하는 컨슈머는 read_committed 설정을 하면 트랜잭션에 성공한 메시지들만 읽을 수 있습니다.
💡
정확히 한 번 전송은 커밋의 성공, 실패 유무를 확하기 위해 다른 전송 정책들과는 다르게 진행 상태 등 원자성을 확인할 수 있는 정보를 담은 컨트롤러 메시지라는 특수한 메시지 타입을 사용합니다.

전송 정책 한 눈에 비교하기

적어도 한 번 전송최대 한 번 전송중복 없이 전송정확히 한 번 전송
메시지 중복 전송 여부OXXX
메시지 전송 보장 여부OXXO

반응형