기록 블로그

프로그래머스 데브코스-데이터 엔지니어/TIL(Today I Learned)

07/13 69일차 Kafka CLI, Topic, Consumer, ksqlDB

usiohc 2023. 7. 13. 17:55

CLI, Topic, Consumer, ksqlDB


주요 메모 사항


Kafka Client Tool

Kafka CLI Tools 접근 방법

docker ps를 통해 Broker의 Container ID 혹은 Container 이름 파악

 

해당 컨테이너로 로그인

ㄴ $ docker exec -it Broker_Container_ID sh

 

거기서 다양한 kafka 관련 클라이언트 툴을 사용 가능

ㄴ kafka-topics

ㄴ kafka-configs

ㄴ kafka-console-consumer

ㄴ kafka-console-producer

 


docker 컨테이너 접속

kafka1의 이름을 가진 컨테이너가 Broker

 

kafka-topics

$ kafka-topics --bootstrap-server kafka1:9092 --list

 

 

$ kafka-topics --bootstrap-server kafka1:9092 --delete --topic topic_test

 

kafka-console-producer

Command line을 통해 Topic 만들고 Message 생성 가능

$ kafka-console-producer --bootstrap-server kafka1:9092 --topic test_console

kafka-console-consumer

Command line을 통해 Topic에서 Message 읽기 가능

--from-beginning 옵션이 있으면 처음부터 읽음 (earliest). 아니면 latest로 동작

$ kafka-console-consumer --bootstrap-server kafka1:9092 --topic test_console --from-beginning

 

 

두 Console 프로세스들의 Side-by-side 실행

터미널을 하나 열고 동일 Broker 로그인 후 console-producer로 메세지 발생

왼쪽 Producer

$ kafka-console-producer --bootstrap-server kafka1:9092 --topic test_console

 

오른쪽 Consumer

$ kafka-console-consumer --bootstrap-server kafka1:9092 --topic test_console

 

 

 

Topic의 메세지들을 Web에서 볼 수 있음

 

 


Topic 파라미터 설정

Topic과 관계된 파라미터들을 KafkaProducer를 통해 설정

Topic 생성시 다수의 Partition이나 Replica를 주려면

- 먼저 KafkaAdminClient 오브젝트를 생성하고 create_topics 함수로 Topic을 추가

- create_topics의 인자로는 NewTopic 클래스의 오브젝트를 지정

NewTopic 클래스

 

 

Kafka Producer 동작

 

 

 

KafkaProducer로 토픽 만들기

랜덤하게 사람 정보를 만들어서 저장하는 Kafka Producer를 구현

ㄴ Faker라는 모듈 사용: pip3 install faker

ㄴ pydantic의 BaseModel을 사용하여 메세지 클래스를 구현 (Person)

    ㄴ pip3 install pydantic

ㄴ 이번에는 Topic을 먼저 만들고 진행

 

pip install

 

설치가 완료되면 fake_person_producer  실행

 

Web UI

 

 


Consumer 옵션 살펴보기

KafkaConsumer 파라미터

Topic 이름을 KafkaConsumer의 첫 번째 인자로 지정 혹은 나중에 별도로 subscribe를 호출해서 지정

 

Consumer가 다수의 Partitions들로부터 어떻게 읽나?

Consumer가 하나이고 다수의 Partitions들로 구성된 Topic으로부터 읽어야한다면?

ㄴ Consumer는 각 Partition들로부터 라운드 로빈 형태로 하나씩 읽게 됨

ㄴ 이 경우 병렬성이 떨어지고 데이터 생산 속도에 따라 Backpressure가 심해질 수 있음

ㄴ 이를 해결하기 위한 것이 뒤에 이야기할 Consumer Group

 

  1. 컨슈머 그룹 설정:
    • 여러 컨슈머로 구성된 컨슈머 그룹을 생성합니다.
    • 컨슈머 그룹은 동일한 주제에 대해 데이터를 병렬로 처리하기 위해 사용됩니다.
    • 각 컨슈머 그룹은 주제의 파티션을 공정하게 분배받아 처리합니다.
  2. 파티션 할당 (Partition Assignment):
    • 컨슈머 그룹이 주제의 데이터를 처리하기 위해 파티션을 할당받습니다.
    • 할당 과정은 Kafka 클러스터의 컨트롤러에 의해 조정됩니다.
    • 할당된 파티션은 각 컨슈머에게 균등하게 분배되어 데이터를 처리합니다.
    • 할당된 파티션은 해당 컨슈머의 오프셋을 추적하여 읽은 데이터의 위치를 기록합니다.
  3. 데이터 읽기:
    • 각 컨슈머는 할당받은 파티션에서 데이터를 읽습니다.
    • 컨슈머는 각 파티션의 리더로부터 데이터를 읽게 됩니다.
    • 컨슈머는 오프셋을 기반으로 데이터의 읽기 위치를 추적하여, 이전에 읽은 데이터 다음부터 읽어나갑니다.
    • 컨슈머는 일정한 주기로 Kafka 브로커에게 오프셋을 커밋하여 읽은 위치를 기록합니다.

 

한 프로세스에서 다수의 Topic을 읽는 것 가능

ㄴ Topic 수만큼 KafkaConsumer 인스턴스 생성하고 별도의 Group ID와 Client ID를 지정해야함

 

 

Consumer Group이란?

- Consumer가 Topic을 읽기 시작하면 해당 Topic내 일부 Partition들이 자동으로 할당됨

 

- Consumer의 수보다 Partition의 수가 더 많은 경우, Partition은 라운드 로빈 방식으로 Consumer들에게 할당됨 (한 Partition은 한 Consumer에게만 할당됨)

ㄴ 이를 통해 데이터 소비 병렬성을 늘리고 Backpressure 경감

ㄴ 그리고 Consumer가 일부 중단되더라도 계속해서 데이터 처리 가능

 

- Consumer Group Rebalancing

ㄴ 기존 Consumer가 무슨 이유로 사라지거나 새로운 Consumer가 Group에 참여하는 경우 Partition들이 다시 지정이 되어야함. 이를 Consumer Group Rebalancing이라고 부르면 이는 Kafka에서 알아서 수행해줌

 


autocommit_consumer.py : Offset Auto Commit이 True

 

manualcommit_consumer.py : Offset Auto Commit이 False

 


Message Processing Guarantee 방식

실시간 메시지 처리 및 전송 관점에서 시스템의 보장 방식에는 크게 3가지가 존재

위에서부터 아래로 갈수록 쉬워짐

방식 설명
Exactly Once Exactly Once('정확히 한 번')는 각 Message가 Consumer에게 정확히 한 번만 전달된다는 것을 보장.
ㄴ > 네트워크 문제, 장애 또는 재시도 가능성으로 아주 어려운 문제.

1> Producer 단에서는 enable_idempotence를 True로 설정
2> Producer에서 메세지를 쓸 때와 Consumer에서 읽을 때 Transaction API를 사용
At Least Once At Least Once ('적어도 한 번 이상')는 모든 메시지가 Consumer에게 적어도 한 번 이상 전달되도록 보장하지만, 메시지 중복 가능성 존재. 이 경우 Consumer는 중복 메시지를 처리하기 위해 중복 제거 메커니즘을 구현해야함 (멱등성).
ㄴ 이는 보통 Consumer가 직접 오프셋을 커밋을 할때 발생함.
At Most Once At Most Once ('최대 한 번만')는 메시지 손실 가능성에 중점을 둠. 이는 메시지가 손실될 수는 있지만 중복이 없음을의미 가장 흔한 메시지 전송 보장 방식 (기본 방식)

 

 

 

 

 

 

Consumer/Producer 패턴

- 많은 경우 Consumer는 한 Topic의 메세지를 소비해서 새로운 Topic을 만들기도함

 

- 즉 Consumer이면서 Producer로 동작하는 것이 아주 흔한 패턴임

 

- 데이터 Transformation, Filtering, Enrichment

ㄴ 동일한 프로세스 내에서 Kafka Consumer를 사용하여 한 Topic에서 메시지를 읽고 필요한 데이터 변환 또는 Enrichment을 수행한 다음, Producer를 사용하여 수정된 데이터를 다른 Topic으로 푸시 가능

 

 


ksqlDB 사용

 

- REST API나 ksql 클라이언트 툴을 사용해서 Topic을 테이블처럼 SQL로 조작

- 여기서는 ksql을 사용하는 간단한 데모

ㄴ docker ps 후 confluentinc/cp-ksqldb-server의 Container ID 복사

ㄴ docker exec -it ContainerID sh

 

 

- ksql 실행후 아래 두 개의 명령 실행

ㄴ CREATE STREAM my_stream (id STRING, name STRING, title STRING) with (kafka_topic='fake_people', value_format='JSON');

ㄴ SELECT * FROM my_stream;