티스토리 뷰

MSA

Kafka 로컬 연동 및 테스트

Hilu 2023. 2. 25. 15:03

오늘은 Kafka 를 로컬에 테스트로 설치하고, Kafka 를 이용한 api 통신 처리를 해도록 하겠다.

 

1. kafka 설치

 

homeBrew 설치

https://brew.sh/index_ko

 

Homebrew

The Missing Package Manager for macOS (or Linux).

brew.sh

 

wget 설치

$ brew install

 

kafaka 설치

brew install kafka

 

zookeeper 설치

  • zookeeper 를 이해 하기 위해 Kafka 구조를 알아 보자
brew install zookeeper

 

Kafka 구조

카프카 구조

 

Kafka 크게 Producer, Kafka (Broker), Zookeeper Consumer 로 이루어져 있으며, 상세 내용은 아래와 같다.

 

Producer : Kafka 와 통신하며, 메시지를 전송하는 역활

Consumber : Kafka 와 통신하며, 메시지를 가져오는 역활

Zookeeper : Consumber 와 통신, Kafka 메타 데이터 저장, Kafka 상태 관리를 목적으로 함

 

Zookeeper 에 대해서 조금만 더 알아 보도록 하자

 

분산 어플리케이션을 많이 사용하게 되면, 분산 어플리케이션을 관리하기 위한 코디네이션 프로그램이 필요하기 마련이다. 여기서 주로 사용하는 코디네이션 프로그램이 Zookeeper 이다.

 

Zookeeper 는 분산 어플리케이션 관리 코디네이션 시스템이고, 분산되어 있는 각 어플리케이션을 정보를 집중화 하고, 구성관리, 네이밍, 동기화등 서비스를 제공 한다.

Zookeeper

 

각각의 client(kafka) 는 클러스터 이루어 하나의 server (zookeeper) 와 상태 정보를 주고 받게 된다.

 

Zookeeper 무엇이고, 왜 필요 한지는 대략 적으로 이해 했다.

 

이제, kafka, zookeeper 를 로컬에서 실행해 보도록 하자.

 

Zookeeper, kafka 실행

brew services start zookeeper
brew services start kafka

 

2. 샘플 어플케이션으로 테스트

 

이제 Spring Boot 프로젝트에서 Kafka를 통해 message 를 Produce 하고 Consume 하도록 해보자

 

Consumer 테스트 코드 작성

package com.example.kafkasample;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import java.io.IOException;

@Service
public class KafkaSampleService {

	@KafkaListener(topics = "pay", groupId = "group-id-pay")
	public void consumer(String message) throws IOException {
		System.out.println("rec message : " + message);
	}
}

 

KafkaListener 에 등록된 groupId 에 중 topics 찾아서 Produce 된 message 를 consum 할 것이다.

 

여기에서 전달받은 message 를 비니지스 적으로 처리하는 로직은 개발 하면 될 것으로 보인다.

 

Producer 테스트 코드 작성

package com.example.kafkasample;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaSampleProducerService {

	@Autowired
	private KafkaTemplate<String, String> kafkaTemplate;

	public void sendMessage(String message) {
		System.out.println("sendMessage : " + message);
		this.kafkaTemplate.send("pay", message);
	}
}

 

KafkaTemplate 을 통해 정의된 메시지를 전달 할 수 있다. 

 

여기서는 kafka 전송이나, message 가공 관련 개발을 진행 하면 될 것 같다.

 

KafkaSampleProducerController 샘플 테스트 코드 작성

package com.example.kafkasample;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaSampleProducerController {

	@Autowired
	private KafkaSampleProducerService kafkaSampleProducerService;

	@PostMapping(value = "/sendMessage")
	public void sendMessage(String message) {
		kafkaSampleProducerService.sendMessage(message);
	}
}

 

그런데 웹서비스를 실했더니 아래 에러가 엄청나게 발생했다.

해당 에러는 kafka 에 topic 이 생성 되어 있지 않아서 이다. 

 

Topic 생성을 자동으로 설정 할 수도 있지만, Topic 관리가 어려워 지고, 디스크 용량이 꽉찰 위험이 있어서 추천하지 않는 다고 한다.

 

그러므로 이를 해결하기 위해, Topic 을 생성해 주도록 하자.

 

kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic pay

 

Topic 을 생성 하려고 하니, 이제는 아래와 같은 에러가 발생 하였다.

사유는 kafka 서버가 기동은 되었지만 localhost:9092 접근이 안되어서 그렇다.

 

Kafka 에 Server.properties 수정하기 위해

 

brew info kafka 로 path 를 조회 하여 Server.properties 들어가 보니

 

listensers=PLAINTEXT://:9092 가 주석처리 되어 있었다. listensers=PLAINTEXT:localhost//:9092로 변경 하고 다시 Kafka 를 재기동 하고 Topic 을 생성해 보자.

 

kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic pay

 

토픽이 정상적으로 생성이 되었다. 

 

이제 다시 스프링 부트에서 테스트를 진행해 보자.

 

정상적으로 테스트 웹어플리케이션이 실행이 되었다. 

 

이제 postMan 으로 테스트를 진행해 보자.

 

정상적으로 Produce 된 Message 가 Consumer 에 의해 처리 된 것을 알 수 있다.

 

 

다음장에서는 Kafa 를 이용한 비동기화/동기화 샘플에 대해서 알아보고, 사용시 아키텍쳐에서 발생 할 수 있는 사항을 고민하고 작성해 보도록 하겠다.

공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2024/11   »
1 2
3 4 5 6 7 8 9
10 11 12 13 14 15 16
17 18 19 20 21 22 23
24 25 26 27 28 29 30
글 보관함