커맨드 라인 툴을 통해 카프카 브로커 운영에 필요한 다양한 명령을 내릴 수 있다. !!
카프카 클라이언트 애플리케이션을 운영할 때는 카프카 클러스터와 연동하여 데이터를 주고받는 것도 중요하지만
토픽이나 파티션 개수 변경과 같은 명령을 실행해야 하는 경우도 자주 발생함. ==> 커맨드 라인 툴, 툴별 옵션에 대해 알고있어야 한다!
명령어 실행 방법
1. 카프카 브로커가 설치된 인스턴스에 ssh로 원격 접속하여 명령을 실행
2. 브로커에 9092 로 접근 가능한 컴퓨터에서 명령 실행
데이터를 전송 (프로듀서)
받는 (컨슈머) 실습
1. kafka-topics.sh
토픽과 관련된 명령 실행 (토픽 : 카프카에서 데이터 구분하는 가장 기본적인 개념.)
카프카 클러스터에 토픽은 여러 개 존재할 수 있다.
토픽 생성하는 방법 1. 컨슈머 또는 프로듀서가 카프카 브로커에 생성되지 않은 토픽에 대해 데이터를 요청할 때. 2. 커맨드라인 툴로 명시적으로 토픽 생성. (추천!)
토픽을 생성할 때 동시에 처리하는 데이터 양이 많을 때는 파티션 개수를 100으로 설정 가능.
단기간 데이터 처리만 필요할 때는 데이터 보관기간 옵션을 짧게 설정할 수도 있다.
==> 토픽에 들어오는 데이터양과 병렬로 처리되어야 하는 용량을 잘 파악해 생성하는 것이 중요
<토픽 생성>
--create : 토픽 생성
--bootstrap-server : 토픽을 생성할 클러스터를 구성하는 브로커들의 IP,port 적음
--topic : 토픽 이름 작성 (내부 데이터가 무엇이 있는지 유추 가능할 정도로 자세히!)
--partitions : 파티션 개수 지정
--replication-factor : 토픽의 파티션을 복제할 복제 개수
1 : 복제하지않고 사용.
2 : 1개의 복제본 이용
--config : kafka-topics.sh 명령에 포함되지 않은 추가적인 설정 할 수 있음. (ex. retention.ms : 토픽의 데이터 유지하는 기간)
<토픽 리스트 조회>
bin/kafka-topics.sh --bootstrap-server 토픽이름:9092 --list
<토픽 상세 조회>
위에명령에 -describe --topic 토픽이름
파티션 개수가 몇개인지, 복제된 파티션이 위치한 브로커의 번호, 기타 토픽 구성하는 설정들을 출력함. 토픽이 가진 파티션의 리더가 현재 어느 브로커에 존재하는지도 같이 확인할 수 있음.
=> 클러스터의 성능이 좋지 않다면 이 명령어를 통해 파티션 쏠림 현상 확인하는 것도 좋음!
<토픽 옵션 수정>
kafka-topics.sh : 파티션 개수 변경
kafka-configs.sh : 토픽 삭제 정책인 리텐션 기간 변경. // 카프카 2.5 까지는 --alter 옵션 사용했지만 추후 삭제 예정
다이나믹 토픽 옵션 (log.segment.bytes , log.retention.ms ) : kafka-configs.sh 로 수정
alter, partitions 옵션들을 사용해 파티션 개수 변경 가능.
토픽의 파티션을 늘릴 수는 있지만 줄일 수는 없으므로 주의 !!
--add-config 옵션 : 이미 존재하는 설정값은 변경하고 존재하지 않는 설정값은 신규로 추가한다.
2. kafka-console-producer.sh
생성된 토픽에 데이터 넣을 수 있는 명령어.
토픽에 넣는 데이터는 레코드라고 부르며 메시지 키, 메시지 값으로 이루어짐. (key, value)
(1) 메시지 키 없이 메시지 값만 보내기
메시지 키는 자바의 null 로 기본 설정되어 브로커로 전송된다.
단, String 이 아닌 타입으로는 직렬화하여 전송 불가. 하고싶으면 프로듀서 애플리케이션 직접 개발해야!
(2) 메시지 키 가지는 레코드 전송하기
parse.key 를 true 로 두면 레코드를 전송할 때 메시지 키를 추가할 수 있음.
key.separator : 메시지 키와 메시지 값 구분하는 구분자 선언 (default : \t 탭!)
메시지 키와 값을 함께 전송한 레코드는 토픽의 파티션에 저장된다 .
3. kafka-console-consumer.sh
토픽으로 전송한 데이터 확인하는 명령어
필수옵션 : --bootstrap-server 에 클러스터 정보
--topic 에 토픽 이름 적기
추가 : --from-beginning 에 토픽에 저장된 가장 처음 데이터부터 출력함
데이터 순서가 출력되는 순서와 다른 이유 : 파티션때문에.
위의 명령으로 토픽의 데이터를 가져가게 되면 토픽의 모든 파티션으로부터 동일한 중요도로 데이터 가져감.
이로인해 데이터 순서가 달라지게 되는 것 .
데이터 순서 보장하고 싶다면 파티션 1개로 구성된 토픽을 만드는 것 !!
4. kafka-consumer-group.sh
생성된 컨슈머 그룹의 리스트 보는 명령어
group , topic, partition : 조회한 컨슈머 그룹이 마지막으로 커밋한 토픽,파티션 나타냄
- 가장 첫번째 줄 hallo-group 이름의 컨슈머 그룹이 hallo.kafka 토픽의 1번 파티션의 레코드가 마지막으로 커밋됨
current-offset : 컨슈머 그룹이 가져간 토픽의 파티션에 가장 최신 오프셋(파티션의 각 레코드의 할당된 번호)이 몇번인지 나타냄.
- 이 번호는 데이터가 파티션에 들어올 때마다 1씩 증가함!!
- 위 사진에선 1번 파티션에 가장 최신 오프셋은 3. 3개의 데이터가 들어간 것 알 수 있음.
log-end-offset : 해당 컨슈머 그룹의 컨슈머가 어느 오프셋까지 커밋했는지 알 수 있음.
- 커런트는 로그앤드보다 같거나 작은 값.
lag : 컨슈머 그룹이 파티션에 있는 데이터를 가져가는데 얼마나 지연이 발생하는지 나타냄
- 커밋한 오프셋과 해당 파티션의 가장 최신 오프셋 간의 차이
consumer-id : 컨슈머의 토픽 할당을 카프카 내부적으로 구분하기 위해 사용하는 id. 자동설정됨
5. kafka-verifiable-producer , consumer.sh
kafka-verifiable로 시작하는 2개의 스크립트를 사용하면 String 타입 메시지 값을 코드 없이 주고받을 수 있음.
카프카 클러스터 설치가 완료된 이후 토픽에 데이터를 전송해
간단한 네트워크 통신 테스트를 할 때 유용!
max~. :verify 로 보내는 데이터개수 지정. -1 : 종료될 때까지 계속 데이터를 토픽으로 보냄
--topic : 데이터를 받을 대상 토픽 입력
마지막 줄에서 avg_~ 에서 평균 처리량 확인 가능
6. kafka-delete-records.sh
이미 적재된 토픽의 데이터 지우는 명령어
가장 오래된 데이터 (가장 낮은 숫자의 오프셋) ~ 특정 시점의 오프셋까지 삭제 가능!
삭제하고자 하는 데이터에 대한 정보를 파일로 저장해 사용해야 한다.
삭제하고자 하는 토픽, 파티션, 오프셋 정보가 들어가야 한다.
'Data Engineering > Kafka' 카테고리의 다른 글
토픽, 파티션 (0) | 2021.06.24 |
---|---|
카프카 각 컨포넌트들의 특징 (0) | 2021.06.21 |
카프카 브로커 실행 옵션 설정 (0) | 2021.05.27 |
Kafka 브로커 힙 메모리 설정 (0) | 2021.05.27 |
스타트업에서 kafka가 유용한 이유 (0) | 2021.05.18 |