Spring Boot - Kafka 설치 및 실행과 Spring Boot에서 Apache Kafka 설정
1. kafka 설치 및 실행 (
windows10)
1.1 설치
https://www.apache.org/dyn/closer.cgi?path=/kafka/2.6.0/kafka_2.13-2.6.0.tgz
우선 kafka를 설치합니다. (주키퍼는 같이 받아집니다.)
원하는 위치에 tar의 압축을해제 합니다. (저는 C: 에 위치시켰습니다.)
압축을 풀고 config/server.properties를 엽니다.
log.dirs의 경로를 윈도우 운영체제 형식에 맞추어 변경합니다.
1.2 실행
조만간 주키퍼
없이 kafka
를 실행할 수 있도록 한다고 합니다. 하지만 현재는 주키퍼에 의존을하므로 별도로 같이 실행을 해주어야 합니다.
주키퍼 실행
$ bin/windows/zookeeper-server-start.bat config/zookeeper.properties
cmd 또는 별도의 쉘을 이용해 우선 C:kafka로 이동한 뒤 위 명령어를 입력하면 실행이 됩니다.
카프카 실행
xxxxxxxxxx
$ bin/windows/kafka-server-start.bat config/server.properties
주키퍼를 실행하면, shell이 block되므로 별도의 sh을 켭니다.
카프카 역시 같은 위치에서 위 명령어를 이용하여 실행합니다.
위와 같이 주키퍼, 카프카가
실행중인 상태가 됩니다.
1.3 간단한 테스트
마찬가지로 카프카를 실행하면 sh이 block되므로 별도의 sh을 실행합니다. 그 후, 카프카가 설치된 위치로 이동합니다.
- 토픽 생성
xxxxxxxxxx
$ bin/windows/kafka-topics.bat --create --topic quickstart-events --bootstrap-server localhost:9092
위 명령어를 이용하면, quickstart-events
라는 topic이 생성됩니다.
- 토픽에 이벤트 발행
xxxxxxxxxx
$ bin/windows/kafka-console-producer.bat --topic quickstart-events --bootstrap-server localhost:9092
위 명령어는 producer
를 이용해 이벤트를 특정 topic에 발행할 수 있도록 합니다.
아무 메시지를 입력해봅니다.
- 토픽에서 이벤트 읽기
xxxxxxxxxx
$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
새로운 sh을 실행한뒤, 카프카가 설치된곳으로 이동하고, 위 명령어를 입력합니다.
명령어를 입력하자마자, 기존에 저장되어있던 이벤트를 읽어오는 것을 볼 수 있습니다.
2. SpringBoot 연동
2.1 의존성
xxxxxxxxxx
{
...
implementation 'org.springframework.kafka:spring-kafka'
testImplementation 'org.springframework.kafka:spring-kafka-test'
}
2.2 설정
1) Spring Boot 자동설정 이용
SpringBoot의 막강한 기능중 하나인 AutoConfiguration에 의해서, 필요한 기본 설정은 거의 모두 되어있습니다.
application.properties
xxxxxxxxxx
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
단지 이 설정만 추가하면, 준비 완료입니다. 또한 추가적인 설정들을 위한 properties들도 제공하기 때문에 문서를 보고 설정하셔도 되고, IDE단에서 제공하는 자동완성을 이용해 필요한 설정들을 찾아보실 수도 있습니다.
2) 직접 설정
직접 설정을 하기위해서는, Consumer, Producer, Topic
3가지 설정을 추가하면 됩니다.
KafkaProducerConfig
x
public class KafkaProducerConfig {
"${kafka.bootstrapAddress}") (
private String bootstrapAddress;
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
Producer를 위한 설정입니다. 추가해야할 빈은 2가지입니다.
ProducerFactory
Producer 생산을 위한 ProducerFactory를 위한 빈설정입니다. Factory에서 Producer를 생산할 때 필요한 설정들을 Map형태로, 전달받아 Producer를 생산합니다.
KafkaTemplate
이벤트 생산을 편리하게 할 수 있도록 도와주는 Template입니다.
KafkaConsumerConfig
xxxxxxxxxx
public class KafkaConsumerConfig {
"${kafka.bootstrapAddress}") (
private String bootstrapAddress;
"${kafka.groupId}") (
private String groupId;
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
Consumer를 위한 설정입니다.
ConsumerFactory
Consumer를 생산하기 위한 Factory를 위한 Bean설정입니다. Producer와 마찬가지로 Map형태로 필요한 설정을 전달받아 Consumer를 생산합니다.
ConcurrentKafkaListenerContainerFactory
멀티 스레드에대한 동기화를 제공하는 Consumer를 생산하기 위한 Factory입니다.
2.3 Producer 생성
xxxxxxxxxx
public class Producers {
private final Logger logger = LoggerFactory.getLogger(KafkaProducer.class);
private final KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String topic, String payload) {
logger.info("sending payload = '{}' to topice='{}'", payload, topic);
ListenableFuture<SendResult<String, String>> listenable = kafkaTemplate.send(topic, payload);
}
}
Producer 역할을 할 클래스 입니다. sendMessage에서는, kafkaTemplate를 이용해 topic에 이벤트를 발행합니다.
2.4 Consumer 생성
xxxxxxxxxx
public class Consumers {
topics = "baeldung") (
public void listenGroupFoo(String message) {
System.out.println("received message foo : " + message);
}
}
토픽에 발행된 이벤트를 가져와 처리할 Consumer입니다.
2.5 TestController 생성
xxxxxxxxxx
public class TestController {
private final Producers producers;
"/") (
public void test() {
producers.sendMessage("baeldung", "HIHIHII");
}
}
이벤트를 발행하기 위한 TestController 입니다.
2.6 테스트
어플리케이션을 실행하고, 브라우저를 이용해 앞서 만든 TestController에 요청을 보냅니다.