博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spring Boot集成kafka完整版
阅读量:6937 次
发布时间:2019-06-27

本文共 3419 字,大约阅读时间需要 11 分钟。

pom.xml添加maven依赖

org.springframework.boot
spring-boot-starter-parent
2.0.2.RELEASE
org.springframework.kafka
spring-kafka

spring boot会自动配置kafka,接下来只要配置yml属性文件和主题名配置。

application.yml配置kafka

spring:  kafka:    bootstrap-servers: 127.0.0.1:9092,127.0.0.2:9092,127.0.0.3:9092    producer:      retries: 0      batch-size: 16384      buffer-memory: 33554432      key-serializer: org.apache.kafka.common.serialization.StringSerializer      value-serializer: org.apache.kafka.common.serialization.StringSerializer      properties:        linger.ms: 1    consumer:      enable-auto-commit: false      auto-commit-interval: 100ms      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer      properties:        session.timeout.ms: 15000

application.yml配置主题和消费者组

kafka:  topic:    group-id: topicGroupId    topic-name:      - topic1      - topic2      - topic3

新建KafkaTopicProperties

@ConfigurationProperties("kafka.topic")public class KafkaTopicProperties implements Serializable {    private String groupId;    private String[] topicName;    public String getGroupId() {        return groupId;    }    public void setGroupId(String groupId) {        this.groupId = groupId;    }    public String[] getTopicName() {        return topicName;    }    public void setTopicName(String[] topicName) {        this.topicName = topicName;    }

添加KafkaTopicConfiguration

@Configuration@EnableConfigurationProperties(KafkaTopicProperties.class)public class KafkaTopicConfiguration {    private final KafkaTopicProperties properties;    public KafkaTopicConfiguration(KafkaTopicProperties properties) {        this.properties = properties;    }    @Bean    public String[] kafkaTopicName() {        return properties.getTopicName();    }    @Bean    public String topicGroupId() {        return properties.getGroupId();    }}

添加自己的service

@Servicepublic class IndicatorService {    private Logger LOG = LoggerFactory.getLogger(IndicatorService.class);    private final KafkaTemplate
kafkaTemplate; /** * 注入KafkaTemplate * @param kafkaTemplate kafka模版类 */ @Autowired public IndicatorService(KafkaTemplate kafkaTemplate) { this.kafkaTemplate = kafkaTemplate; } @KafkaListener(topics = "#{kafkaTopicName}", groupId = "#{topicGroupId}") public void processMessage(ConsumerRecord
record) { LOG.info("kafka processMessage start"); LOG.info("processMessage, topic = {}, msg = {}", record.topic(), record.value()); // do something ... properties.getProperties(); LOG.info("kafka processMessage end"); } public void sendMessage(String topic, String data) { LOG.info("kafka sendMessage start"); ListenableFuture
> future = kafkaTemplate.send(topic, data); future.addCallback(new ListenableFutureCallback
>() { @Override public void onFailure(Throwable ex) { LOG.error("kafka sendMessage error, ex = {}, topic = {}, data = {}", ex, topic, data); } @Override public void onSuccess(SendResult
result) { LOG.info("kafka sendMessage success topic = {}, data = {}",topic, data); } }); LOG.info("kafka sendMessage end"); }}

至此就可以跑起来了,有什么不明白的可以留言。

转载地址:http://gdbnl.baihongyu.com/

你可能感兴趣的文章
详解 ESLint 规则,规范你的代码
查看>>
redis配置不当导致机器被黑
查看>>
Shell图形化监控网络流量
查看>>
springBoot(15):集成Swagger
查看>>
shell 脚本分析nginx 访问日志状态码
查看>>
规划System Center 2012 R2 Operations manager
查看>>
用 MapFileAndCheckSum 函数检测 exe 或 dll 是否被修改 - 回复 "Joe Lo" 的问题
查看>>
我的友情链接
查看>>
ORM for Net主流框架汇总与效率测试
查看>>
图片切换控件
查看>>
MYSQL 執行 BASH SCRIPT 出現 WARNING: USING A PASSWORD ON THE COMMAND LINE INTERFACE CAN BE INSECURE...
查看>>
解决php的It is not safe to rely on the system’s timezone settings的问题
查看>>
MySQL数据库的优化-运维架构师必会高薪技能,笔者近六年来一线城市工作实战经验...
查看>>
【学神-RHEL7】P2-Python流程控制
查看>>
启动或重启DNS服务时,卡在Generating /etc/rndc.key:上很长时间
查看>>
视频专辑:Python系列视频教程
查看>>
Linux内核更新 linux3.10.tar.gz Redhat 6.0
查看>>
IPv6技术详解:基本概念、应用现状、技术实践(下篇)
查看>>
zabbix自定义key监控mysql重要参数的运行情况
查看>>
CISCO 多协议,多进程,路由重分布及其路由策略应用
查看>>