Kafka 컨슈머(Consumer) (4)
Kafka 컨슈머(Consumer)
- 토픽 파티션에서 레코드 조회
- Broker 지정
- Group ID 지정
- Key, Value 역직렬화 (deserializer)
- 컨슈머 객체 생성
- subscribe 호출할 때 Topic 구독
- Record 읽기
컨슈머 그룹 단위로 파티션 할당
파티션 개수는 컨슈머 갯수와 밀접하게 관련이 있다.
파티션이 두개이고 컨슈머가 한개일 경우 두파티션의 데이터를 한 컨슈머가 레코드를 받게되고
파티션이 두개이고 컨슈머가 두개일 경우 두파티션의 데이터를 두 컨슈머가 레코드를 나눠 받습니다.
파티션이 두개이고 컨슈머가 세개인 경우 한 컨슈머가 일을 하지 않고 놀게 됩니다.
컨슈머 개수가 파티션 개수보다 많으면 안됍니다. 처리량을 늘리고 싶다면 컨슈머와 파티션 개수가 동시에 늘어나는 것을 추천드립니다.
커밋(commit)과 오프셋(offset)
- 컨슈머가 처음 Poll할 경우 1~3의 Offset 데이터를 가져왔다고 생각해봅니다. 이전 커밋 오프셋(commit offset)이 1이였고 poll하는 순간 offset 3까지 데이터를 가져온 후 오프셋 커밋을 하게되었다고 가정해봅니다.
1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 | … |
---|---|---|---|---|---|---|---|---|---|---|
이전 커밋 오프셋 | 오프셋 커밋 |
- 두번 째 컨슈머가 Poll할경우 이전 커밋 오프셋 부터 다시 가져오게되고 이 과정을 계속 반복하게 됩니다.
1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 | … |
---|---|---|---|---|---|---|---|---|---|---|
이전 커밋 오프셋 | 오프셋 커밋 |
처음 접근이거나 커밋한 오프셋이 없는경우에는 이런 경우에는 auto.offset.reset
설정을 사용합니다.
- earliest : 맨 처음 오프셋 사용
- latest : 가장 마지막 오프셋 사용(기본값)
- none : 컨슈머 그룹에 대한 이전 커밋이 없으면 Exception 발생
컨슈머(Consumer) 설정
조회에 영할을 주는 주요 설정
- fetch.min.byte: 조회 시 브로커(broker)가 전송할 최소 데이터 크기 , 기본값은 1이며 이 값이 크면 대기 시간은 늘지만 처리량이 증가합니다.
- fetch.max.wait.ms: 데이터가 최소 크기가 될 때 까지 기다릴 시간, 기본값은 500ms이며, 브로커(broker)가 return될 때 까지 대기하는 시간으로 poll() 메서드의 대기 시간과는 다릅니다.
- max.partition.fetch.bytes: 파티션(partition) 당 서버가 return할 수 있는 최대 크기, 기본값은 1048576(1MB) 입니다.
자동 커밋 / 수동 커밋
- enable.auto.commit 설정
- true : 일정 주기로 컨슈머가 읽은 오프셋을 커밋(기본값)
- false: 수동으로 커밋 실행
- auto.commit.interval.ms : 자동 커밋 주기 / 기본값 5초
- poll(), close() 메소드 호출 될 때 자동 커밋 실행
재처리와 순서
동일 메시지 조회 가능성 일시적 커밋 실패 , 리밸런스 등에 의해 발생
컨슈머는 멱등성(idempotence)을 고려해야 합니다. 조회수 1증가 -> 좋아요 1증가 -> 조회수 1증가 동일한 순서로 동일 메시지를 조회한다면
단순 처리하면 조회수는 2가 아닌 4가 될 수 있습니다.
데이터 특성에 따라 타임스탬프, 일련 번호등을 활용해서 두번 조회될때 방지해야 합니다.
세션 타임아웃(Session Timeout), 하트비트(HeartBeat), 최대 poll 간격
컨슈머(Consumer)는 하트비트(Heartbeat) 전송해서 연결유지 브로커(Broker)는 일정시간 컨슈머(Consumer)로 부터 하트비트(HeartBeat)가 없으면 컨슈머(Consumer)를 그룹에서 빼고 리밸런스 진행합니다.
- session.timeout.ms : 세션 타임 아웃 시간 (기본값 10초)
- heartbeat.internal.ms : 하트비트 전송 주기 (기본값 3초) : session.timeout.ms의 1/3 이하로 값을 설정하길 추천
- max.poll.interval.ms: poll()메서드의 최대 호출 간격 : 이 시간이 지나도록 poll하지 않으면 컨슈머를 그룹에서 빼고 리벨런스 진행합니다.
종료 처리
- 다른 쓰레드에서 wakeUp 메소드를 호출하여 wakeException으로 종료 처리하게 됩니다.
주의 : 쓰레드 안전하지 않음
KafkaConsumer는 쓰레드에 안전하지 않아 여러 쓰레드에서 동시에 사용하지 않아야 합니다. wakeup() 메서드만 예외로 다른 쓰레드에서 사용가능 합니다.
댓글남기기