kafka란?
kafka란?
카프카(kafka)는 파이프라인, 스트리밍 분석, 데이터 통합 및 미션 크리티컬 애플리케이션을 위해 설계된 고성능 분산 이벤트 스트리밍 플랫폼이다.
Pub-Sub (Publisher-Subscriber) 모델의 메시지 큐 형태로 동작하며 분산환경에 특화되어 있다.
사용하는 이유
카프카는 링크드인(linked-in)에서 개발되었다.
우선 카프카 개발 전 링크드인의 데이터 처리 시스템은 아래 이미지와 같았다.
각 애플리케이션과 DB가 end-to-end로 연결되어 있고 요구사항이 늘어남에 따라 데이터 시스템 복잡도가 높아지면서 다음과 같은 문제가 발생하게 되었다.
- 시스템 복잡도 증가(Complexity)
- 통합된 전송 영역이 없어 데이터 흐름을 파악하기 어렵고, 시스템 관리가 어려움
- 특정 부분에서 장애 발생 시 연결된 모든 앱들을 확인해야하기 때문에 조치 시간 증가
- HW교체 / SW 업그레이드 시 관리포인트가 늘어나고, 작업시간 증가
- 데이터 파이프라인 관리의 어려움
- 각 애플리케이션과 데이터 시스템 간의 별도의 파이프라인이 존재하고, 파이프라인마다 데이터 포맷과 처리 방식이 다름
- 새로운 파이프라인의 확장이 어려워지면서, 확장성 및 유연성이 떨어짐
- 또한 데이터 불일치 가능성이 있어 신뢰도 감소
이러한 문제점들을 해결하기 위해 모든 이벤트/데이터의 흐름을 중앙에서 관리하는 카프카가 개발된 것이다.
카프카를 적용한 후의 모습은 아래와 같다.
동작 방식
카프카는 Pub-Sub 모델의 메세지 큐 형태로 동작한다.
여기서 메세지 큐(Message Queue, MQ)란 메세지 지향 미들웨어를 구현한 시스템으로 프로세스 간의 데이터를 교환할 때 사용하는 기술이다.
- producer: 정보를 제공하는 자
- consumer: 정보를 제공받아서 사용하려는 자
- Queue: producer의 데이터를 임시 저장 및 consumer에 제공하는 곳
위 이미지 처럼 메세지가 Endpoint간에 직접적으로 통신하지 않고 중간의 MQ가 중개를 해주는 점이다.
MQ의 장점은 다음과 같다.
- 비동기: queue라는 임시 저장소가 있기 때문에 나중에 처리 가능
- 낮은 결합도: 애플리케이션과 분리
- 확장성: producer or consumer 서비스를 원하는대로 확장할 수 있음
- 탄력성: consumer 서비스가 다운되더라도 애플리케이션이 중단되는 것은 아니면 메시지는 지속하여 MQ에 남아있다.
- 보장성: MQ에 들어간다면 결국 모든 메시지가 consumer서비스에게 전달된다는 보장을 제공한다.
이 부분을 쉽게 이해하기 위해 유튜브시스템에 비유해서 설명하겠다.
다음과 같이 Publisher,Channel(Topic),Subscriber가 있다.
각각 유튜버, 유튜브알림시스템, 시청자(구독자)로 가정하겠다.
우선 시청자는 자신이 좋아하는 유튜버의 영상을 매번 챙겨보기 위해 유튜버를 구독하게 된다.
그리고 유튜버는 자신이 영상을 하나 만들 때마다 영상을 업로드하게 된다.
그럼 유튜브는 유튜버가 영상을 업로드했으면 해당 유튜버를 구독한 모든 시청자들에게 알림을 전달하게 된다.
여기서 유튜브가 자동으로 구독한 시청자들 모두에게 알림을 보내주는데, 만일 이런 중앙시스템이 없다면 유튜버가 일일이 모든 구독자들에게 영상알림을 보냈어야 할 것이다.
게다가 지금은 구독자가 3명이지만, 만일 1000명이 넘는 구독자들이 있다면 하나하나 보내는 데에도 큰일이 될 것이다.
하지만 유튜브의 알림시스템 덕분에 단순히 영상을 업로드를 한 것만으로 자동으로 메시지를 전달할 수 있고, 업로드로그 또한 자동으로 남길 수 있게 되었다.
지금은 토픽이 알림시스템 밖에 없지만 상황에 따라 다른 시스템(topic)을 추가할 수도 있다.
카프카 구성 요소 및 특징
Topic
- 각각의 메시지를 목적에 맞게 구분할 때 사용한다.
- 메시지를 전송하거나 소비할 때 Topic을 반드시 입력한다.
- Consumer는 자신이 담당하는 Topic의 메시지를 처리한다.
- 한 개의 토픽은 한 개 이상의 파티션으로 구성된다.
Partition
- 분산 처리를 위해 사용된다.
- Topic 생성 시 partition 개수를 지정할 수 있다.
- 파티션이 1개라면 모든 메시지에 대해 순서가 보장된다.
- 파티션 내부에서 각 메시지는 offset(고유번호)로 구분된다.
- 파티션이 여러개라면 Kafka 클러스터가 라운드 로빈 방식으로 분배해서 분산처리되기 때문에 순서가 보장되지 않는다.
- 파티션이 많을수록 처리량이 좋지만 장애 복구 시간이 늘어난다.
Offset
- 컨슈머에서 메세지를 어디까지 읽었는지 저장하는 값
- 컨슈머 그룹의 컨슈머들은 각각의 파티션에 자신이 가져간 메시지의 위치 정보(offset)을 기록
- 컨슈머 장애 발생 후 다시 살아나도, 전에 마지막으로 읽었던 위치에서부터 다시 읽어들일 수 있다.
Producer
- 메시지를 만들어서 카프카 클러스터에 전송한다.
- 메시지 전송 시 Batch처리가 가능하다.
- key값을 지정하여 특정 파티션으로만 전송이 가능하다.
- 전송 acks값을 설정하여 효율성을 높일 수 있다.
- ACKS=0 -> 매우 빠르게 전송. 파티션 리더가 받았는지 알 수 없다.
- ACKS=1 -> 파티션 리더가 받았는지 확인. 기본값
- ACKS=ALL -> 파티션 리더 뿐만 아니라 팔로워까지 메시지를 받았는지 확인
Consumer
- 카프카 클러스터에서 메시지를 읽어서 처리한다.
- 메시지를 Batch처리할 수 있다.
- 한 개의 컨슈머는 여러 개의 토픽을 처리할 수 있다.
- 메시지를 소비하여도 메시지를 삭제하지는 않는다. (Kafka delete policy에 의해 삭제)
- 한번 저장된 메시지를 여러번 소비도 가능하다.
- 컨슈머는 컨슈머 그룹에 속한다.
- 한 개 파티션은 같은 컨슈머그룹의 여러 개의 컨슈머에서 연결할 수 없다.
Broker
- 실행된 카프카 서버를 말한다.
- 프로듀서와 컨슈머는 별도의 애플리케이션으로 구성되는 반면, 브로커는 카프카 자체이다.
- Broker는 Kafka Cluster 내부에 존재한다.
- 서버 내부에 메시지를 저장하고 관리하는 역할을 수행한다.
Zookeeper
- 분산 애플리케이션 관리를 위한 코디네이션 시스템
- 분산 메시지큐의 메타 정보를 중앙에서 관리하는 역할
주요 설계 특징
왜 하나의 topic을 여러개의 partition으로 분산시키는가?
- 병렬로 처리하기 위해 분산 저장을 한다.
- 카프카의 토픽에 메세지가 쓰여지는 것도 어느정도 시간이 소비된다.
- 몇 천건의 메세지가 동시에 카프카에 write되면 병목현상이 발생할 수 있다.
- 따라서 파티션을 여러개 두어서 분산 저장함으로써 write동작을 병렬로 처리할 수 있다.
- 다만, 한번 늘린 파티션은 절대 줄일 수 없기 때문에 운영 중에, 파티션을 늘려야 하는건 충분히 컴토 후 실행되어야한다. (최소한의 파티션으로 운영하고 사용량에 따라 늘리는 것을 권장한다.)
- 파티션을 늘렸을 때 메세지는 Round-Robin 방식으로 쓰여진다. 따라서 하나의 파티션 내에서는 메세지 순서가 보장되지만, 파티션이 여러개일 경우에는 순서가 보장되지 않는다.
컨슈머 그룹은 왜 존재할까?
- consumer의 묶음을 consumer group이라고 한다.
- 컨슈머 그룹은 하나의 topic에 대한 책임을 갖고 있다.
- 즉, 어떤 컨슈머가 down된다면, 파티션 재조정(리밸런싱)을 통해 다른 컨슈머가 해당 파티션의 sub을 맡아서 한다. offset 정보를 그룹간에 공유하고 있기 때문에 down되기 전 마지막으로 읽었던 메시지 위치부터 시작한다.
간단한 예제
우선 도커컴포즈를 이용하여 kafka와 zookeeper를 하나씩 생성해주는 yaml를 작성해준다.(kafdrop은 gui시각화 컨테이너이다)
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.0.0
hostname: zookeeper
container_name: zookeeper
environment:
ZOOKEEPER_SERVER_ID: 1 #주키퍼를 식별하는 아이디로 유일한 값, 1개의 주키퍼를 사용할 예정이라 없어도 문제없음
ZOOKEEPER_CLIENT_PORT: 2181 #주키퍼 포트, 기본 포트로 2181사용
ZOOKEEPER_TICK_TIME: 2000 #클러스터를 구성할 때 동기화를 위한 기본 틱 타임
broker:
image: confluentinc/cp-kafka:7.0.0
container_name: broker
hostname: broker
ports:
- "9092:9092"
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' #주키퍼에 연결하기 위한 대상 지정
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT" #보안을 위한 프로토콜 매핑. PLAINTEXT는 암호화하지 않은 일반 평문
KAFKA_ADVERTISED_LISTENERS: "INTERNAL://broker:29092,EXTERNAL://localhost:9092" # 외부 클라이언트 리스너 주소
KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 #토픽 복제에 대한 설정 값
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 #트랜젝션 최소 ISR(InSyncReplicas 설정) 수
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 #트렌젝션 상태에서 복제 수
KAFKA_HEARTBEAT_INTERVAL_MS: 1000 #컨슈머가 주키퍼에 하트비트를 보내는 주기가 짧아짐 너무 길면 컨슈머가 dead됐다고 인식하고 리밸런싱하게됨
KAFKA_SESSION_TIMEOUT_MS: 10000 #리밸런싱으로 인해 세션시간을 늘려야 실패한 소비자로 그룹제거를 하고 재조정할 시간을 벌어준다.
kafdrop:
image: obsidiandynamics/kafdrop
container_name: kafka_ui
restart: "always"
ports:
- "9000:9000"
depends_on:
- broker
- zookeeper
environment:
KAFKA_BROKERCONNECT: 'broker:29092' #여러개면 broker-2:29092,broker-3:29092,...
JVM_OPTS: "-Xms32M -Xmx64M"
#컨테이너들 한번에 삭제
# docker rm -f $(docker ps -qa)
그다음, docker compose up -d 로 컨테이너들을 생성시켜주고 producer.py와 consumer.py를 작성해준다.
producer.py
from kafka import KafkaProducer
from json import dumps
import time
producer = KafkaProducer(
acks=0, #메시지 전송 완료에 대한 체크
compression_type='gzip', #메시지 전달할 때 압축(None, gzip, snappy, lz4 등)
bootstrap_servers=['localhost:9092'], #전달하고자 하는 카프카 브로커의 주소 리스트
value_serializer=lambda x:dumps(x).encode('utf-8') #메시지의 값 직렬화
)
start = time.time()
for i in range(1000):
data = {'str': f'result{i}'}
producer.send('topic1', value=data)
producer.flush()
print(f'[Done]: {time.time()-start}')
consumer.py
from kafka import KafkaConsumer
from json import loads
consumer = KafkaConsumer(
'topic1',
bootstrap_servers=['localhost:9092'], #카프카 브로터 주소 리스트
auto_offset_reset='earliest', #오프셋 위치(eraliest:가장 처음, latest: 가장 최근)
enable_auto_commit=True, #오프셋 자동 커밋 여부
group_id='test-group', #컨슈머 그룹 식별자
value_deserializer=lambda x: loads(x.decode('utf-8')), #메시지의 값 역직렬화
consumer_timeout_ms=1000 #데이터를 기다리는 최대 시간
)
print('[Start] get consumer')
for message in consumer:
print(f'Topic: {message.topic}, Partition: {message.partition}, Offset: {message.offset}, Key: {message.key}, value: {message.value}')
print('[End] get consumer')