Apache kafka 가이드라인 따라해보기

2023. 4. 21. 17:02프로젝트

이번에 카프카 가이드라인을 따라하기위해 사용할 os는 rocky linux다

rocky linux는 레드햇 계열의 운영체제인데 일단 추천받아서 os를 사용하는거라 뭔지 다시 알아보는 시간을 가져야겠다.

 

1. 카프카 가져오기

https://www.apache.org/dyn/closer.cgi?path=/kafka/3.4.0/kafka_2.13-3.4.0.tgz

tar -xzf로 압축을 풀어준다.

 

2. 환경 시작

일단 로컬 환경에 Jdk 8 이상의 버전이 필요하다.

sudo dnf update -y

dnf를 업데이트(이거 엄청 오래 걸린다..) 하고나서 openjdk8을 받아준다.

dnf install java-1.8.0-openjdk-devel

kraft와 zookeeper중 선택해야 하는데 카프카 최신버전에는 zookeeper를 대체하기 위해 나온게 kraft라고 한다.

kraft라는 것을 통해서 시작해보자

 

 1. 클러스터 아이디 생성

KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"

2. 로그 디렉토리 포맷

$ bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties

3. 카프카 서버 시작

$ bin/kafka-server-start.sh config/kraft/server.properties

 

처음에는 클러스터 아이디를 생성하는 부분에서 생성한 클러스터를 뭐 연결해줘야 하는건줄 알고

계속 찾아봤는데 그냥 될대로 대란 생각으로 명령어들을 그대로 붙여넣었다.

명령어를 그대로 붙이니 잘 되길래 ps -ef | grep kafka 를 통해서 카프카가 실행한 것을 확인했다.

 

3. 이벤트를 저장할 주제 만들기

카프카는 여러 시스템에서 이벤트 ( 문서에서 레코드 또는 메시지 라고도 함 ) 를 읽고, 쓰고, 저장하고, 처리할 수 있는 분산 이벤트 스트리밍 플랫폼 입니다. 이벤트의 예로는 결제 거래, 휴대폰의 지리적 위치 업데이트, 배송 주문, IoT 장치 또는 의료 장비의 센서 측정 등이 있습니다. 이러한 이벤트는 주제 에 구성되고 저장됩니다 . 매우 단순화된 토픽은 파일 시스템의 폴더와 유사하며 이벤트는 해당 폴더의 파일입니다. 라고 나와있다.

그래서 첫 번째 이벤트를 작성하기 전에 주제를 만들어야 한다.

$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092

실행 해보니 주제가 만들어진다.

빠른 실행을 위한 주제를 만들었다고 한다. vi로 까보니 별 건 안나온다.

$ bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092

주제의 세부 정보를 표시할수도 있다.

 

4. 주제에 몇 가지 이벤트 쓰기

Kafka 클라이언트는 이벤트 쓰기(또는 읽기)를 위해 네트워크를 통해 Kafka 브로커와 통신합니다. 

일단 수신하면 브로커는 필요한 기간 동안(심지어는 영원히) 지속적이고 내결함성 있는 방식으로 이벤트를 저장합니다.

콘솔 생산자 클라이언트를 실행하여 주제에 몇 가지 이벤트를 씁니다. 

기본적으로 입력하는 줄마다 주제에 별도의 이벤트가 기록됩니다.

$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092

뭔 말인지는 모르겠지만, 일단 실행을 통해서 주제를 발행하는 것 같다.

This is my first event
This is my second event

라고 쳐보면 뭔가 되지는 않는다. 다음 단계를 통해 이벤트를 읽을 수 있는 소비자를 실행할 수 있는것 같다.

 

5. 이벤트 읽기

$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092

kafka-console-consumer를 실행했더니 이벤트를 읽을 수 있게 됐다!

잘 읽는다!

이벤트는 카프카에 영구적으로 저장되기 때문에 원하는 만큼 많은 소비자가 이벤트를 읽을 수 있다고 한다. 다른 터미널 세션을 통해 다시 실행해봐도 또 읽어진다.

 

6. KAFKA CONNECT를 사용하여 이벤트 스트림으로 데이터 가져오기/내보내기

kafka connect를 사용하면 외부 시스템에서 kafka로 또는 그 반대로 데이터를 지속적으로 수집할 수 있다. 외부 시스템과 상호 작용하기 위한 사용자 지정 논리를 구현하는 커넥터를 실행하는 확장 가능한 도구다. 이 덖분에 기존 시스템을 kafka와 통합하는 것이 쉬워진다.

여기선 파일에서 kafka 주제로 데이터를 가져오고 kafka 주제에서 파일로 데이터를 내보내는 간단한 커넥터로  kafka connect를 실행해보자.

먼저 connect-file-3.4.0.jar 를 추가해야하는데, config/connect-standalone.properties 파일에 추가해주자.

맨 밑에 보면 plugin.path 가 주석처리 되있는데, 주석을 풀고 절대경로로 저장하는것이 좋다고 추천한다.그리고 테스트 용 파일을 만들어준다.

echo -e "foo\nbar" > test.txt

 

뭔가 실행해주는 것 같은데 뭘 하는건지 아직 잘 모르겠다

다음으로 독립 실행 형 모드 에서 실행되는 두 개의 커넥터를 시작합니다 . 즉, 단일 로컬 전용 프로세스에서 실행됩니다. 세 가지 구성 파일을 매개변수로 제공합니다. 첫 번째는 항상 Kafka Connect 프로세스에 대한 구성으로, 연결할 Kafka 브로커와 데이터의 직렬화 형식과 같은 공통 구성을 포함합니다. 나머지 구성 파일은 각각 만들 커넥터를 지정합니다. 이러한 파일에는 고유한 커넥터 이름, 인스턴스화할 커넥터 클래스 및 커넥터에 필요한 기타 구성이 포함됩니다.

> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

Kafka에 포함된 이 샘플 구성 파일은 이전에 시작한 기본 로컬 클러스터 구성을 사용하고 두 개의 커넥터를 생성합니다. 첫 번째는 입력 파일에서 라인을 읽고 각각을 Kafka 주제에 생성하는 소스 커넥터이고 두 번째는 싱크 커넥터입니다. Kafka 주제에서 메시지를 읽고 각각을 출력 파일의 한 줄로 생성합니다.

시작하는 동안 커넥터가 인스턴스화되고 있음을 나타내는 일부를 포함하여 많은 로그 메시지가 표시됩니다. Kafka Connect 프로세스가 시작되면 소스 커넥터는 test.txt주제에서 라인을 읽고 생산하기 시작해야 하며 connect-test싱크 커넥터는 주제에서 메시지 읽기를 시작 connect-test 하고 파일에 기록해야 합니다 test.sink.txt. 출력 파일의 내용을 검사하여 전체 파이프라인을 통해 데이터가 전달되었는지 확인할 수 있습니다.

 

한꺼번에 뭔가가 실행된다.

갑자기 test.sink.txt 라는 파일이 생성됨.. ㄷ

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning

실행해보면 connect-test 소비자를 실행하여 주제의 데이터를 볼 수 있다.

foo와 bar

$ echo Another line >> test.txt

덮어씌워서 확인하면, 실시간으로 데이터가 파이파라인을 통해 이동하는 것을 볼 수 있다.

 

 

7. KAFKA STREAMS로 이벤트 처리

데이터가 Kafka에 이벤트로 저장되면 Java/Scala용 Kafka Streams 클라이언트 라이브러리 를 사용하여 데이터를 처리할 수 있다. 입력 및/또는 출력 데이터가 Kafka 주제에 저장되는 미션 크리티컬 실시간 애플리케이션 및 마이크로 서비스를 구현할 수 있습니다. Kafka Streams는 클라이언트 측에서 표준 Java 및 Scala 애플리케이션을 작성하고 배포하는 단순성과 Kafka의 서버 측 클러스터 기술의 이점을 결합하여 이러한 애플리케이션을 확장성, 탄력성, 내결함성 및 분산성을 높입니다. 라이브러리는 정확히 1회 처리, 상태 저장 작업 및 집계, 기간 설정, 조인, 이벤트 시간 기반 처리 등을 지원합니다. 라고 나와있다!

여기서부터는 직접 Java 애플리케이션을 이용해 처리해야하는 것 같다! 애플리케이션 또한 가이드라인을 따라가보도록 하겠다.

카프카 책을 보고 다시 하는게 나을듯ㅎㅎ,, 2.7과 2.8은 환경이 아예 다르다고 한다. 종속성 분리를 위해 2.8이 나왔다고 하니

일단 2.7 책으로 한번 맛을 봐야겠다.. 뭐하는건지도 모르겠다!!

 

 

https://kafka.apache.org/quickstart