当前位置: 首页 > news >正文

Springboot整合Kafka消息队列服务实例

一、Kafka相关概念

1、关于Kafka的描述

Kafka是由Apache开源,具有分布式、分区的、多副本的、多订阅者,基于Zookeeper协调的分布式处理平台,由Scala和Java语言编写。通常用来搜集用户在应用服务中产生的动作日志数据,并高速的处理。日志类的数据需要高吞吐量的性能要求,对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。

2、关于Kafka的功能特点

  1. 通过磁盘数据结构提供消息的持久化,消息存储也能够保持长时间稳定性;
  2. 高吞吐量,即使是非常普通的硬件Kafka也可以支持每秒超高的并发量;
  3. 支持通过Kafka服务器和消费机集群来分区消息;
  4. 支持Hadoop并行数据加载;
  5. API包封装的非常好,简单易用,上手快 ;
  6. 分布式消息队列。Kafka对消息保存时根据Topic(主题)进行归类,发送消息者称为Producer(生产者),消息接受者称为Consumer(消费者);

3、Kafka消息功能

如下图所示,Kafka作为一个中间服务,代表一个broker(经纪人)角色,负责接收APP的消费与推送消息给其他相关APP。这里APP可分为Producer,Consumer。

消息的消费模式

点对点模式:点对点模式通常是一个基于拉取或者轮询的消息传递模型,消费者主动拉取数据,消息收到后从队列移除消息,这种模型不是将消息推送到客户端,而是从队列中请求消息。特点是发送到队列的消息被一个且只有一个消费者接收处理,即使有多个消费者监听队列也是如此。

发布订阅模式:订阅模式是一个基于推送的消费传送模型,消息产生后,Kafka会推送给所有订阅相关Topic的订阅者。发布订阅模型可以有多种不同的订阅者,临时订阅者只在主动监听主题时才接收消息,而持久订阅者则监听主题的所有消息,即使当前订阅者不可用,处于离线状态。

4、Kafka消息队列的作用

  • 应用程序之间解耦,生产者与消费者相互独立,各自异步执行。
  • 消息数据持久化存储,直到所有消息都被消费,规避消息数据丢失的风险。
  • 流量削峰,使用Kafka消息队列可以帮助server承接访问压力,尽可能避免应用程序崩溃。
  • 降低进程间的耦合度,系统部分应用组件发生崩溃时,不会影响到整体系统的运行。
  • 保证消息顺序执行,解决特定场景业务需求。

5、Kafka相关术语介绍

  • Broker

   一台kafka服务器就是一个broker(经纪人)。一个集群由多个broker组成。一个broker可以容纳多个topic(消息主题)。

  • Producer

    消息生产者,就是向kafka broker发消息的APP客户端。

  • Consumer

    消息消费者,向kafka broker取消息的APP客户端。

  • Topic

    每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic,可以理解为一个队列。

  • Consumer Group

     每个Consumer属于一个特定的Consumer Group,可为每个Consumer指定group name,若不指定group name则属于默认的分组。

  • Partition

一个庞大大的topic可以分布到多个broker上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体的顺序。Partition是物理上的概念,方便在集群中扩展,提高并发。

二、liunx系统下搭建Kafka环境

       

--新建kafka应用目录。并下载到当前目录下
cd  /usr/localmkdir kafkacd kafka 
--下载wget https://downloads.apache.org/kafka/3.7.0/kafka-3.7.0-src.tgz--解压tar -zxvf  kafka-3.7.0-src.tgz--启动服务cd kafka-3.7.0./bin/kafka-server-start.sh    config/server.properties--查看服务ps -aux |grep kafka--开放kafka地址端口vim server.properties--添加下面注释advertised.listeners=PLAINTEXT://10.98.3.22:9092

三、Springboot2整合Kafka 服务

1、导入基础依赖

<!-- SpringBoot依赖 -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- kafka 依赖 -->
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.2.4.RELEASE</version>
</dependency>

2、项目目录结构

3、生产者与消费者yml文件配置

#消费者配置
spring:kafka:bootstrap-servers: 127.0.0.1:9092consumer:group-id: test-consumer-group#生产者配置spring:kafka:bootstrap-servers: 127.0.0.1:9092

4、生成消息

@RestController
@RequestMapping("/kafka")
public class ProducerController {@Resourceprivate KafkaTemplate<String, String> kafkaTemplate;@RequestMapping("/send")public HttpResult sendMsg () {MsgLog msgLog = new MsgLog(1,"消息生成",1,"消息日志",new Date()) ;String msg = JSON.toJSONString(msgLog) ;// 这里Topic如果不存在,会自动创建kafkaTemplate.send("cicada-topic", msg);return HttpResult.create(HttpStatus.SUCCESS,msg);}
}
@Component
public class ConsumerMsg {private static Logger LOGGER = LoggerFactory.getLogger(ConsumerMsg.class);//此注解是监听主题为cicada-topic的消息队列@KafkaListener(topics = "cicada-topic")public void listenMsg (ConsumerRecord<?,String> record) {String value = record.value();LOGGER.info("ConsumerMsg====>>"+value);}
}

相关文章:

  • thinkphp单独为某个接口设置缓存
  • 预期和视角之外是什么?
  • 01--MySQL数据库概述
  • vue3自动导入---组件库elements-ui,vuetify以及scss样式的自动导入
  • 【C++11 之强类型枚举enum class/struct 基本结构及应用场景】了解在enum基础上增加了什么
  • 水系统阻力计算
  • git log 过滤
  • Redis的实战常用一、验证码登录(解决session共享问题)(思路、意识)
  • 数据仓库之离线数仓
  • Android系统 无法绑定1024以下端口问题
  • 力扣每日一题 6/22 字符串/贪心
  • 【太原理工大学】软件系统安全—分析题
  • 【自动驾驶技术】自动驾驶汽车AI芯片汇总——TESLA篇(FSD介绍)
  • 邻接矩阵实现
  • 【D3.js in Action 3 精译】关于本书
  • [LeetCode] Wiggle Sort
  • 5分钟即可掌握的前端高效利器:JavaScript 策略模式
  • Codepen 每日精选(2018-3-25)
  • CSS 提示工具(Tooltip)
  • Git的一些常用操作
  • hadoop入门学习教程--DKHadoop完整安装步骤
  • JDK 6和JDK 7中的substring()方法
  • JS题目及答案整理
  • JS专题之继承
  • js作用域和this的理解
  • Python_OOP
  • Redis中的lru算法实现
  • 编写符合Python风格的对象
  • 解决iview多表头动态更改列元素发生的错误
  • 码农张的Bug人生 - 见面之礼
  • 线上 python http server profile 实践
  • 写代码的正确姿势
  • 没有任何编程基础可以直接学习python语言吗?学会后能够做什么? ...
  • ​卜东波研究员:高观点下的少儿计算思维
  • #1014 : Trie树
  • #我与Java虚拟机的故事#连载16:打开Java世界大门的钥匙
  • (2)STL算法之元素计数
  • (2)STM32单片机上位机
  • (Matalb时序预测)WOA-BP鲸鱼算法优化BP神经网络的多维时序回归预测
  • (Redis使用系列) Springboot 实现Redis 同数据源动态切换db 八
  • (搬运以学习)flask 上下文的实现
  • (代码示例)使用setTimeout来延迟加载JS脚本文件
  • (二)换源+apt-get基础配置+搜狗拼音
  • (二)正点原子I.MX6ULL u-boot移植
  • (四)js前端开发中设计模式之工厂方法模式
  • (学习日记)2024.03.12:UCOSIII第十四节:时基列表
  • (转)从零实现3D图像引擎:(8)参数化直线与3D平面函数库
  • (转)利用ant在Mac 下自动化打包签名Android程序
  • (转)原始图像数据和PDF中的图像数据
  • .aanva
  • .net流程开发平台的一些难点(1)
  • .NET实现之(自动更新)
  • /run/containerd/containerd.sock connect: connection refused
  • @AutoConfigurationPackage的使用
  • @Resource和@Autowired的区别