본문 바로가기
허브 살리기 프로젝트

Spring Kafka Record 삭제하기

by jay-choe 2024. 5. 1.

Spring에서 카프카로 '정확히 한 번' 을 어떻게 테스트 할 수 있을지 고민을 하다가 내린 결론은 테스트 할 수 없을 것 같다는 것이다.

 

MockProducer 등등 에러를 재현할 수 있는 객체들이 있지만, 네트워크 에러를 재현 하는 건 어떻게 해야 할 지.. 감이 아직 안 온다. (디버깅 포인트 걸고.. 카프카 중지시키기?? 밖에 떠오르지 않는다.)

 

코드를 짜보면서 삽질을 조금 하다가 토픽의 레코드 수를 확인하는 코드를 작성하였는데, 다음과 같이 작성하였다.

 

 fun getMessageCount(topicName: String, kafkaTemplate: KafkaTemplate<String, PaymentDone>): Long {
        val adminClient: AdminClient =
            AdminClient.create(kafkaTemplate.producerFactory.configurationProperties)
        val partitions = getTopicPartitions(topicName, adminClient)

        return partitions.sumOf {
            val startOffset = getPartitionPerBeginOffsetMap(partitions, adminClient)[it]!!.offset()
            val endOffSet = getPartitionPerEndOffsetMap(partitions, adminClient)[it]!!.offset()

            endOffSet - startOffset
        }
    }
    
    private fun getTopicPartitions(topicName: String, adminClient: AdminClient) : List<TopicPartition> {
        return adminClient.describeTopics(setOf(topicName))
            .allTopicNames().get()[topicName]?.partitions()!!
            .map { TopicPartition(topicName, it.partition()) }
    }

    private fun getPartitionPerBeginOffsetMap(partitions: List<TopicPartition>, adminClient: AdminClient)
    : Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> {
        val beginOffsetRequest = partitions.associateWith { OffsetSpec.earliest() }
        return adminClient.listOffsets(beginOffsetRequest).all().get()
    }

    private fun getPartitionPerEndOffsetMap(partitions: List<TopicPartition>, adminClient: AdminClient)
            : Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> {
        val endOffsetRequest = partitions.associateWith { OffsetSpec.latest() }
        return adminClient.listOffsets(endOffsetRequest).all().get()
    }

 

토픽명이랑 스프링에서 카프카 메세지를 전송할 때 사용하는 KafkaTemplate은 논외로 하고

 

메세지 갯수를 새려면 각 파티션 별 가장 앞에있는 offset과 가장 마지막(코드에서는 최신이라고 한다) offset의 차이의 합을 구해주면 해당 토픽이 가지고 있는 메세지 수의 총 합이 된다.

 

테스트 코드를 짜면서 특정 메세지가 발급 된 수를 테스트 하고싶었는데
처음 테스트는 성공하지만 이후 테스트 부터는 메세지 수가 누적되기 때문에
누적되는 것을 배제하고 테스트 하기 위해 BeforeEach 같은 Junit 기능을 사용해서 데이터를 날리고 테스트를 하려고 했다.

 

데이터를 날리는 방법은 2가지가 있는데 하나는 토픽을 날리고 재생성 하는 것이었고, 다른 하나는 토픽별 레코드를 전부 지우는 것이다.

 

토픽별 레코드를 전부 지우는 함수들을 보다가 레코드를 지울 때는 파티션 정보와 오프샛 번호 이 두 가지 파라미터 밖에 안 받고 'RecordsToDelete.beforeOffset' 함수로 오프샛 번호를 넣어주는 걸로 봐서는 특정 오프셋 이전의 데이터들을 지우는 것 외에 코드 상으로 할 수 있는 것이 없어 보였다.

 

우선 모든 레코드를 지우는 것은 모든 파티션의 가장 최신의 오프셋을 구한 후, 해당 오프셋 기준으로 다음과 같이 제거하였다.

 

 fun deleteAllRecords(topicName: String, kafkaTemplate: KafkaTemplate<String, PaymentDone>) {
        val adminClient: AdminClient =
            AdminClient.create(kafkaTemplate.producerFactory.configurationProperties)
        val partitions = getTopicPartitions(topicName, adminClient)

        val partitionPerEndOffsetMap = getPartitionPerEndOffsetMap(partitions, adminClient)

        partitionPerEndOffsetMap.forEach{
            adminClient.deleteRecords(mapOf(it.key to RecordsToDelete.beforeOffset(it.value.offset()))).all().get()
        }
    }

 

 

근데 특정 오프셋 이후의 데이터만 지우고 싶다면 제공되는 함수만으로는 할 수가 없는데 왜 그런지 찾아 봤는데 다음과 같이 정리 할 수 있었다.

 

  • 성능 최적화: Kafka는 고성능을 유지하기 위해 디스크 I/O 작업을 최소화하고 순차적 쓰기를 최대화한다. 데이터 파일 중간의 레코드를 삭제하는 기능을 지원하면, 파일 재작성이나 레코드 이동과 같은 비효율적인 작업이 필요하게 되어 성능에 부정적인 영향을 미칠 수 있다.
  • 복제 및 일관성: Kafka는 클러스터 내 여러 브로커에 데이터를 복제하여 고가용성을 보장함. 특정 오프셋 이후의 레코드를 삭제하는 기능은 복제 과정에서 데이터 일관성을 유지하는 데 복잡성과 오류 가능성을 크게 증가시킬 수 있다.

Kafka의 성능에서 중요한 부분은 디스크의 순차 읽기이기 때문에 중간의 특정 범위의 데이터를 지우게 되면 파일을 재작성해서 중간 제거 된 부분을 채워주거나 등 비효율적인 작업이 생길 것이고 실시간 데이터 처리가 목적인 어플리케이션이기 때문에 실시간성에서 성능적인 마이너스가 된다. 추가로  복잡성이 늘어나게 된다.

 

마찬가지로 복제 및 일관성 유지도 같은 맥락에서 복잡성이 늘어나게 된다.

 

그래서 지울거면 아예 특정 오프셋 이전의 데이터를 지워버리고(이는 파일 재작성 필요 없이 계속 새로운 데이터를 append 하면 성능에 안 좋은 영향을 줄 포인트가 없어보인다.) 새로운 데이터는 계속 추가되는 방식을 선택한 것 같다.