【Springboot】Springboot集成Kafka(快速入门案例)
个人简介:
一:配置相关参数
在resource包下创建application.yml文件并导入以下配置:
server:
port: 9991
spring:
application:
name: kafka-demo
kafka:
bootstrap-servers: 4.234.52.122:9092
producer:
retries: 10
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: ${spring.application.name}-test
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
二:传递字符串消息
1.发送消息
创建一个Controller包并编写一个测试类用于发送消息
package com.my.kafka.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class HelloController {
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
@GetMapping("hello")
public String helloProducer(){
kafkaTemplate.send("my-topic","Hello~");
return "ok";
}
}
2.监听消息
编写测试类用于接收消息:
package com.my.kafka.listener;
import org.junit.platform.commons.util.StringUtils;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class HelloListener {
@KafkaListener(topics = "my-topic")
public void helloListener(String message) {
if(StringUtils.isNotBlank(message)) {
System.out.println(message);
}
}
}
3.测试结果
打开浏览器输入localhost:9991/hello,然后到控制台查看消息,可以看到成功消息监听到并且进行了消费。
三:传递对象消息
目前springboot整合后的kafka,因为序列化器是StringSerializer,这个时候如果需要传递对象可以有两种方式:
方式一:可以自定义序列化器,对象类型众多,这种方式通用性不强,这里不做介绍。
方式二:可以把要传递的对象进行转json字符串,接收消息后再转为对象即可,本项目采用这种方式。
1.修改生产者代码
@GetMapping("hello")
public String helloProducer(){
User user = new User();
user.setName("赵四");
user.setAge(20);
kafkaTemplate.send("my-topic", JSON.toJSONString(user));
return "ok";
}
2.结果测试
可以看到成功接收都对象参数,后期要使用该对象只需要将其转换成User对象即可。
