当前位置: 首页 > news >正文

消息队列kafka组件的安装及使用

什么是Kafka

Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。该项目的目标是为处理实时数据提供一个统一、高吞吐、低延迟的平台。Kafka最初是由LinkedIn开发,并随后于2011年初开源。

 kafka软件结构

Kafka是一个结构相对简单的消息队列(MQ)软件

Kafka Cluster(Kafka集群)

Partition(分片)

Producer:消息的发送方,也就是消息的来源,Kafka中的生产者

> order就是消息的发送方

Consumer:消息的接收方,也是消息的目标,Kafka中的消费者

> stock就是消息的接收方

Topic:话题或主题的意思,消息的收发双方要依据同一个话题名称,才不会将信息错发给别人

Record:消息记录,就是生产者和消费者传递的信息内容,保存在指定的Topic中

Kafka的特征与优势

Kafka作为消息队列,它和其他同类产品相比,突出的特点就是性能强大

Kafka将消息队列中的信息保存在硬盘中

Kafka对硬盘的读取规则进行优化后,效率能够接近内存

硬盘的优化规则主要依靠"顺序读写,零拷贝,日志压缩等技术"

Kafka处理队列中数据的默认设置:

- Kafka队列信息能够一直向硬盘中保存(理论上没有大小限制)
- Kafka默认队列中的信息保存7天,可以配置这个时间,缩短这个时间可以减少Kafka的磁盘消耗

Kafka的安装和配置

必须将我们kafka软件的解压位置设置在一个根目录,文件夹名称尽量短(例如:kafka)

然后路径不要有空格和中文

我们要创建一个空目录用于保存Kafka运行过程中产生的数据

本次创建名称为data的空目录

下面进行Kafka启动前的配置

先到F:\kafka\config下配置有文件zookeeper.properties

找到dataDir属性修改如下

```
dataDir=F:/data
```

修改完毕之后要Ctrl+S进行保存,否则修改无效!!!!

注意F盘和data文件夹名称,匹配自己电脑的真实路径和文件夹名称

还要修改server.properties配置文件

```
log.dirs=F:/data
```

修改注意事项和上面相同

启动kafka

要想启动Kafka必须先启动Zookeeper

Zookeeper介绍

zoo:动物园

keeper:园长

可以引申为管理动物的人

Linux服务器中安装的各种软件,很多都是有动物动物形象的

如果这些软件在Linux中需要修改配置信息的话,就需要进入这个软件,去修改配置,每个软件都需要单独修改配置的话,工作量很大

我们使用Zookeeper之后,可以创建一个新的管理各种软件配置的文件管理系统

在Zookeeper中,可以修改服务器系统中的所有软件配置

长此以往,很多软件就删除了自己写配置文件的功能,而直接从Zookeeper中获取

Kafka就是需要将配置编写在Zookeeper中的软件之一

Zookeeper启动

进入路径F:\kafka\bin\windows

输入cmd进入dos命令行

```
F:\kafka\bin\windows>zookeeper-server-start.bat ..\..\config\zookeeper.properties
```

kafka启动

总体方式一样,输入不同指令

```
F:\kafka\bin\windows>kafka-server-start.bat ..\..\config\server.properties
```

**附录**

Mac系统启动Kafka服务命令(参考):

```
# 进入Kafka文件夹
cd Documents/kafka_2.13-2.4.1/bin/
# 动Zookeeper服务
./zookeeper-server-start.sh -daemon ../config/zookeeper.properties
# 启动Kafka服务
./kafka-server-start.sh -daemon ../config/server.properties
```

Mac系统关闭Kafka服务命令(参考):

```
# 关闭Kafka服务
./kafka-server-stop.sh
# 启动Zookeeper服务
./zookeeper-server-stop.sh
```

在启动kafka时有一个常见错误

```
wmic不是内部或外部命令
```

这样的提示,需要安装wmic命令,安装方式参考

https://zhidao.baidu.com/question/295061710.html

Kafka使用演示

启动的zookeeper和kafka的窗口不要关闭

我们在csmall项目中编写一个kafka使用的演示

csmall-cart-webapi模块

添加依赖


```xml
<!-- 使用Kafka 的依赖  -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<!--    能够将java对象和json字符串相互转换的依赖     -->
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
</dependency>
```

修改yml文件进行配置

```yaml
spring:
  kafka:
    # 定义kafka的配置信息,先配置kafka的位置
    bootstrap-servers: localhost:9092
    # consumer.group-id是必须配置的信息
    # 配置的名称是"话题分组",是为了区分其它项目的话题而定义的分组名称
    # 本质上在运行时,当前项目所有话题名称都会用csmall做前缀
    consumer:
      group-id: csmall
```

在SpringBoot启动类中添加启动Kafka的注解

```java
@SpringBootApplication
@EnableDubbo
// 启动支持Kafka功能
@EnableKafka
// 为了测试Kafka发送和接收消息的效果
// 我们利用SpringBoot自带的任务调度工具,周期的发送消息到Kafka
@EnableScheduling
public class CsmallCartWebapiApplication {

    public static void main(String[] args) {
        SpringApplication.run(CsmallCartWebapiApplication.class, args);
    }

}

下面我们就可以实现周期性的向kafka发送消息并接收的操作了

编写消息的发送

cart-webapi包下创建kafka包

包中创建Producer类来发送消息

```java
// 周期性的向Kafka发送消息的类
// Producer实例化对象保存到Spring容器才能实现Springboot的周期调用
@Component
public class Producer {


    // 这个对象会在SpringBoot启动时,自动装配到Spring容器,能够操作Kafka发送消息
    // 需要时,从Spring容器中注入(获取)即可
    // KafkaTemplate<[话题的类型],[发送消息的类型]>
    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;

    // 每隔10秒向kafka发送消息的方法
    int i=1;
    // 设置fixedRate周期运行,单位是毫秒 10000毫秒是10秒
    @Scheduled(fixedRate = 10000)
    // SpringBoot项目启动后,就会周期运行这个方法
    public void sendMessage(){
        //实例化一个CartTb对象,当做消息发送
        CartTb cart=new CartTb();
        cart.setId(i++);
        cart.setCommodityCode("PC100");
        cart.setPrice(RandomUtils.nextInt(350)+50);
        cart.setCount(RandomUtils.nextInt(10)+1);
        cart.setUserId("UU100");
        // 我们可以利用Gson类型对象,将cart对象转换为json格式字符串
        Gson gson=new Gson();
        String json=gson.toJson(cart);
        System.out.println("本次发送的消息为:"+json);
        // 执行发送
        kafkaTemplate.send("myCart",json);
    }


}
```

创建一个叫Consumer的类来接收消息

> 接收消息的类可以是本模块的类,也可以是其它模块的类,编写的代码是完全一致


```java
// 因为Kafka接收消息的功能在SpringBoot中配置,所以当前类也必须保存在Spring容器中,才能生效
@Component
public class Consumer {

    // SpringKafka接收消息依靠的是一个监听机制
    // SpringKafka提供一个监听线程,实时关注Kafka的消息往来
    // 如果有消息使用指定的话题名称(myCart)发送到kafka,就会自动调用下面的方法
    @KafkaListener(topics = "myCart")
    // 下面的方法会在kafka中出现话题名称为myCart时运行,而且方法的参数就是消息对象
    public void received(ConsumerRecord<String,String> record){
        // 方法参数类型必须是ConsumerRecord
        // ConsumerRecord<[话题类型],[消息类型]>
        // record就是消息本身,下面获得具体的值
        String json=record.value();
        // 将json格式字符串转换为java对象
        Gson gson=new Gson();
        // 执行转换时,还有额外指定转换目标类型的反射
        CartTb cart=gson.fromJson(json,CartTb.class);
        System.out.println("接收到消息:"+cart);

    }

}

相关文章:

  • PHP PDO简单安全的用户注册和登录表单
  • 第7章 C语言的函数指针与指针函数 (三)
  • apt 的 update 和 upgrade 命令的区别是什么?
  • 【从零学Python】一些工具类的使用:SummaryWriter()、tqdm()
  • 计算机毕业设计ssm+vue基本微信小程序的考试刷题及分析系统小程序
  • MMlab 官方教程链接汇总
  • win10你的设备遇到问题,需要重启的五种解决方法
  • 【MyBatis笔记12】MyBatis中二级缓存相关配置内容
  • EasyExcel 官网观看建议
  • 再苦再累也必须要弄懂的:ES6的ES Module
  • K210使用Mx-yolov3训练
  • Springboot中日志的简单使用
  • 0. SQL细节要点
  • 网络安全——Cobaltstrike
  • 架构师的 36 项修炼第07讲:高性能系统架构设计
  • 【Amaple教程】5. 插件
  • 【腾讯Bugly干货分享】从0到1打造直播 App
  • css的样式优先级
  • ECMAScript6(0):ES6简明参考手册
  • Javascript基础之Array数组API
  • Java读取Properties文件的六种方法
  • Java知识点总结(JDBC-连接步骤及CRUD)
  • js算法-归并排序(merge_sort)
  • Ruby 2.x 源代码分析:扩展 概述
  • vagrant 添加本地 box 安装 laravel homestead
  • 自定义函数
  • “十年磨一剑”--有赞的HBase平台实践和应用之路 ...
  • JavaScript 新语法详解:Class 的私有属性与私有方法 ...
  • Unity3D - 异步加载游戏场景与异步加载游戏资源进度条 ...
  • ​业务双活的数据切换思路设计(下)
  • #数学建模# 线性规划问题的Matlab求解
  • (1)(1.8) MSP(MultiWii 串行协议)(4.1 版)
  • (1)SpringCloud 整合Python
  • (C#)Windows Shell 外壳编程系列4 - 上下文菜单(iContextMenu)(二)嵌入菜单和执行命令...
  • (C语言)球球大作战
  • (附源码)ssm捐赠救助系统 毕业设计 060945
  • (已解决)报错:Could not load the Qt platform plugin “xcb“
  • (译) 理解 Elixir 中的宏 Macro, 第四部分:深入化
  • (转)nsfocus-绿盟科技笔试题目
  • .NET 4.0中的泛型协变和反变
  • .NET CORE 3.1 集成JWT鉴权和授权2
  • .NET CORE Aws S3 使用
  • .Net Core 中间件验签
  • .net 设置默认首页
  • .NET6实现破解Modbus poll点表配置文件
  • ??eclipse的安装配置问题!??
  • @Transactional 详解
  • [2024最新教程]地表最强AGI:Claude 3注册账号/登录账号/访问方法,小白教程包教包会
  • [AI]ChatGPT4 与 ChatGPT3.5 区别有多大
  • [Android]如何调试Native memory crash issue
  • [Asp.net MVC]Bundle合并,压缩js、css文件
  • [bzoj1038][ZJOI2008]瞭望塔
  • [C# 基础知识系列]专题十六:Linq介绍
  • [CISCN2019 华东北赛区]Web2
  • [CTO札记]如何测试用户接受度?