[Kafka] 이중화 서버에서 Kafka의 데이터를 순서 보장하여 읽기

 

원하는 시나리오

1. Producer 서버에서 채팅 메시지를 kafka로 전송할 것이다

2. Listner를 통해서 kafka에 저장된 메시지를 이중화된 Consumer 서버 중 하나가 받을 것이다

3. 이때, 각각의 Consumer 서버는 순서대로 메시지를 처리할 수 있어야 한다 (중복처리, 중복 메시지 발송이 일어나지 않도록)

 

채팅 메시지가 읽히는 순서를 보장하기 위해서 SpringBoot 서버, kafka 서버 각각을 어떻게 설정해야하는가?

 

 

Kafka 설정하기

파티션(Partition) 설정

파티션은 kafka에서 메시지가 병렬 처리될 수 있는 단위이다. 그러나, 메시지 순서를 보장하려면 특정 토픽에 대해서 단일 파티션을 사용해야한다. Kafka가 파티션 내에서만 메시지 순서를 보장하기 때문이다. 

따라서, Producer측에서는 해당 Topic에 대한 메시지들이 저장될 곳으로  '단일 파티션'을 이용해야 한다

 

예시로 들면 사전에 Kafka Cluster에 3개의 브로커가 띄워져있다

그리고 Chat Topic에 대한 메시지를 발송하는데, 이 메시지가 갈 곳은 한 Partition으로만 고정시켜놨으니 해당 파티션이 본인의 리더 브로커를 찾아가 저장될 테고, Consumer 측에서 이 파티션만 읽으라고 시키면 된다. 

 

덧붙이자면, 원래는 하나의 토픽에 대한 메시지들은 여러 파티션에 나눠질 수 있도록 설계되어있다. 그리고 그 파티션은 각 브로커 내에 분산되어 있는데, 만일 여러 파티션에 걸쳐 분산되어 있는 메시지들이라면 순서가 보장되지 않게 되는 것이다.

 

이를 적용시키자면, Chat의 경우는 메시지 순서 보장을 위하여 단일 파티션 구성을 고려했다. 반대로 Log와 같이 읽기 순서 보장이 필요하지 않고 그 처리량도 많다면 특정 유형에 따라 로그 데이터를 파티션 별로 분산 지정시킴이 필요해진다.

 

복제계수(Replication Factor) 설정

예를 먼저 들어 복제 계수가 2이라고 했을 때, 이 Topic에 대한 데이터는 kafka 클러스터 내의 2개의 브로커에 복제된다. 따라서 이것이 성립되려면 복제 계수는 사용중인 카프카의  브로커 개수보다 적어야 한다. (이 상황에선 Broker가 3개 이상이어야 만족하는 것처럼)

 

결론적으로 복제계수는 2이상으로 함으로써 Kafka 클러스터의 가용성을 유지하도록 한다. 최소한 다른 곳에 한 곳 이상 복제를 시켰으니, 데이터의 손실을 최소화할 수 있게 되는 것이다.

더불어 복제 수준은 저장된 메시지, 오프셋, 메시지 순서를 포함하여 완전히 동일함으로써, 리더 브로커의 장애 발생 시 대체할 수 있도록 되어있다.

 

메시지 키 설정

기존에 메시지 키를 이용하면 같은 키를 가진 메시지는 동일한 파티션에 전송 되도록 하는 방법이 있다. 이를 활용하자면 아예 키를 설정하지 않거나, 모든 메시지를 같은 키로 설정하여 단일 파티션에 메시지에 기록하는 방법이 있다.

 

 

SpringBoot에서 kafka 설정하기

위에서 언급한 사항을 코드단위로 설정해본다

Producer 서버 측 application과 Topic 설정

spring.kafka.producer.acks=all
spring.kafka.producer.retries=3

acks=all를 통해, 동기화된 복제본이 메시지를 받았을 때 확인 응답을 받도록 한다. 따라서 동기화된 복제본이 메시지를 수신하면, Producer 서버가 성공적인 전송이 되었다고 간주하며 전송이 실패했다고 판단된다면 최대 3번까지 재전송을 수행한다. 설정을 다했다면 아래처럼 순서대로 offset을 가진 데이터들이 한 파티션에 쌓이게 된다.

@Configuration
public class UsrConfigKafkaTopic {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public KafkaAdmin kafkaAdmin() {

        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

        return new KafkaAdmin(configs);
    }

    @Bean
    public NewTopic topic1() {
        return new NewTopic("chat", 1, (short) 3);
    }

}

사전에 토픽의 수동 생성 설정을 해놨기에, SpringBoot에서 Config를 통해 토픽 생성을 시켜준다

중요한 점은 NewTopic 객체 Bean 설정 부분인데 각 파라미터는 (토픽명, 할당 파티션 수, 복제 계수) 이다.

할당 파티션 수를 하나로 하였으니 순서 보장에 유리하다 (단, 병렬 처리 및 확장성의 제한이 온다) 더불어 현재 내 브로커 수는 3개이기 때문에 3개의 브로커 모두에게 분산 저장하도록 max값인 3을 지정했다.

 

 

Consumer 서버 측 application과 KafkaListener 객체 설정

spring.kafka.consumer.enable-auto-commit=true  # 자동으로 오프셋을 커밋
spring.kafka.consumer.auto-commit-interval=100  # 100ms 마다 오프셋을 자동으로 커밋

설정 파일에서는 메시지 처리 후, Kafka에 자동으로 Offset를 커밋해주는 설정을 넣는다 (내가 여기까지 했어~)

그 자동 커밋 기준으로 interval 간격은 100ms으로 설정했다

 

@KafkaListener(topics = "chat", groupId = "chat-group")
public void listen(String message) {
    // 메시지 처리 로직
}

직관적으로 명시되어있는 값이 topic, group으로 두 개 있다.

해당 서버에서 listener는 'chat' 토픽에 대하여 본인 서버가 특정 'chat-group'라는 소속을 밝히고 있다. 따라서, 이 설정값을 지닌 두 개의 이중화 SpringBoot 인스턴스는 같은 컨슈머 그룹에 속해있기에 하나의 파티션을 공유하며 개 중 하나의 컨슈머가 메시지를 처리할 수 있게 된다.

 

 

 

 

 

 

Kafka Consumer Groups by Example

Kafka Consumer Groups are the way to horizontally scale out event consumption from Kafka topics... with failover resiliency.  'With failover resiliency' you say!?  That sounds interesting.  Well, hold on, let's leave out the resiliency part for now and

supergloo.com

 

카프카가 무엇이고, 왜 사용하는 것 일까?

메시지 큐와 MOM 출처: https://www.cloudamqp.com/blog/what-is-message-queuing.html 카프카를 이해하기 위해서는 메시지 큐와 MOM을 먼저 알아야한다. 메시지 큐는 분산화된 환경에서 발신자와 수신자 사이에서

hudi.blog

 

POINT-TO-POINT AND PUBLISH/SUBSCRIBE MESSAGING MODEL

Origin source: https://www.programmingsharing.com/point-to-point-and-publish-subscribe-messaging-mode...

dev.to

 

Kafka Consumer Design: Consumers, Consumer Groups, and Offsets | Confluent Documentation

An Apache Kafka® consumer is a client application that reads and processes events from a broker. A consumer issues fetch requests to brokers that are leading partitions that it wants to consume from. When a consumer issues a request, it specifies a log of

docs.confluent.io