消息队列kafka组件的安装及使用
什么是Kafka
Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。该项目的目标是为处理实时数据提供一个统一、高吞吐、低延迟的平台。Kafka最初是由LinkedIn开发,并随后于2011年初开源。
kafka软件结构
Kafka是一个结构相对简单的消息队列(MQ)软件
Kafka Cluster(Kafka集群)
Partition(分片)
Producer:消息的发送方,也就是消息的来源,Kafka中的生产者
> order就是消息的发送方
Consumer:消息的接收方,也是消息的目标,Kafka中的消费者
> stock就是消息的接收方
Topic:话题或主题的意思,消息的收发双方要依据同一个话题名称,才不会将信息错发给别人
Record:消息记录,就是生产者和消费者传递的信息内容,保存在指定的Topic中
Kafka的特征与优势
Kafka作为消息队列,它和其他同类产品相比,突出的特点就是性能强大
Kafka将消息队列中的信息保存在硬盘中
Kafka对硬盘的读取规则进行优化后,效率能够接近内存
硬盘的优化规则主要依靠"顺序读写,零拷贝,日志压缩等技术"
Kafka处理队列中数据的默认设置:
- Kafka队列信息能够一直向硬盘中保存(理论上没有大小限制)
- Kafka默认队列中的信息保存7天,可以配置这个时间,缩短这个时间可以减少Kafka的磁盘消耗
Kafka的安装和配置
必须将我们kafka软件的解压位置设置在一个根目录,文件夹名称尽量短(例如:kafka)
然后路径不要有空格和中文
我们要创建一个空目录用于保存Kafka运行过程中产生的数据
本次创建名称为data的空目录
下面进行Kafka启动前的配置
先到F:\kafka\config下配置有文件zookeeper.properties
找到dataDir属性修改如下
```
dataDir=F:/data
```
修改完毕之后要Ctrl+S进行保存,否则修改无效!!!!
注意F盘和data文件夹名称,匹配自己电脑的真实路径和文件夹名称
还要修改server.properties配置文件
```
log.dirs=F:/data
```
修改注意事项和上面相同
启动kafka
要想启动Kafka必须先启动Zookeeper
Zookeeper介绍
zoo:动物园
keeper:园长
可以引申为管理动物的人
Linux服务器中安装的各种软件,很多都是有动物动物形象的
如果这些软件在Linux中需要修改配置信息的话,就需要进入这个软件,去修改配置,每个软件都需要单独修改配置的话,工作量很大
我们使用Zookeeper之后,可以创建一个新的管理各种软件配置的文件管理系统
在Zookeeper中,可以修改服务器系统中的所有软件配置
长此以往,很多软件就删除了自己写配置文件的功能,而直接从Zookeeper中获取
Kafka就是需要将配置编写在Zookeeper中的软件之一
Zookeeper启动
进入路径F:\kafka\bin\windows
输入cmd进入dos命令行
```
F:\kafka\bin\windows>zookeeper-server-start.bat ..\..\config\zookeeper.properties
```
kafka启动
总体方式一样,输入不同指令
```
F:\kafka\bin\windows>kafka-server-start.bat ..\..\config\server.properties
```
**附录**
Mac系统启动Kafka服务命令(参考):
```
# 进入Kafka文件夹
cd Documents/kafka_2.13-2.4.1/bin/
# 动Zookeeper服务
./zookeeper-server-start.sh -daemon ../config/zookeeper.properties
# 启动Kafka服务
./kafka-server-start.sh -daemon ../config/server.properties
```
Mac系统关闭Kafka服务命令(参考):
```
# 关闭Kafka服务
./kafka-server-stop.sh
# 启动Zookeeper服务
./zookeeper-server-stop.sh
```
在启动kafka时有一个常见错误
```
wmic不是内部或外部命令
```
这样的提示,需要安装wmic命令,安装方式参考
https://zhidao.baidu.com/question/295061710.html
Kafka使用演示
启动的zookeeper和kafka的窗口不要关闭
我们在csmall项目中编写一个kafka使用的演示
csmall-cart-webapi模块
添加依赖
```xml
<!-- 使用Kafka 的依赖 -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- 能够将java对象和json字符串相互转换的依赖 -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
```
修改yml文件进行配置
```yaml
spring:
kafka:
# 定义kafka的配置信息,先配置kafka的位置
bootstrap-servers: localhost:9092
# consumer.group-id是必须配置的信息
# 配置的名称是"话题分组",是为了区分其它项目的话题而定义的分组名称
# 本质上在运行时,当前项目所有话题名称都会用csmall做前缀
consumer:
group-id: csmall
```
在SpringBoot启动类中添加启动Kafka的注解
```java
@SpringBootApplication
@EnableDubbo
// 启动支持Kafka功能
@EnableKafka
// 为了测试Kafka发送和接收消息的效果
// 我们利用SpringBoot自带的任务调度工具,周期的发送消息到Kafka
@EnableScheduling
public class CsmallCartWebapiApplication {
public static void main(String[] args) {
SpringApplication.run(CsmallCartWebapiApplication.class, args);
}
}
下面我们就可以实现周期性的向kafka发送消息并接收的操作了
编写消息的发送
cart-webapi包下创建kafka包
包中创建Producer类来发送消息
```java
// 周期性的向Kafka发送消息的类
// Producer实例化对象保存到Spring容器才能实现Springboot的周期调用
@Component
public class Producer {
// 这个对象会在SpringBoot启动时,自动装配到Spring容器,能够操作Kafka发送消息
// 需要时,从Spring容器中注入(获取)即可
// KafkaTemplate<[话题的类型],[发送消息的类型]>
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
// 每隔10秒向kafka发送消息的方法
int i=1;
// 设置fixedRate周期运行,单位是毫秒 10000毫秒是10秒
@Scheduled(fixedRate = 10000)
// SpringBoot项目启动后,就会周期运行这个方法
public void sendMessage(){
//实例化一个CartTb对象,当做消息发送
CartTb cart=new CartTb();
cart.setId(i++);
cart.setCommodityCode("PC100");
cart.setPrice(RandomUtils.nextInt(350)+50);
cart.setCount(RandomUtils.nextInt(10)+1);
cart.setUserId("UU100");
// 我们可以利用Gson类型对象,将cart对象转换为json格式字符串
Gson gson=new Gson();
String json=gson.toJson(cart);
System.out.println("本次发送的消息为:"+json);
// 执行发送
kafkaTemplate.send("myCart",json);
}
}
```
创建一个叫Consumer的类来接收消息
> 接收消息的类可以是本模块的类,也可以是其它模块的类,编写的代码是完全一致
```java
// 因为Kafka接收消息的功能在SpringBoot中配置,所以当前类也必须保存在Spring容器中,才能生效
@Component
public class Consumer {
// SpringKafka接收消息依靠的是一个监听机制
// SpringKafka提供一个监听线程,实时关注Kafka的消息往来
// 如果有消息使用指定的话题名称(myCart)发送到kafka,就会自动调用下面的方法
@KafkaListener(topics = "myCart")
// 下面的方法会在kafka中出现话题名称为myCart时运行,而且方法的参数就是消息对象
public void received(ConsumerRecord<String,String> record){
// 方法参数类型必须是ConsumerRecord
// ConsumerRecord<[话题类型],[消息类型]>
// record就是消息本身,下面获得具体的值
String json=record.value();
// 将json格式字符串转换为java对象
Gson gson=new Gson();
// 执行转换时,还有额外指定转换目标类型的反射
CartTb cart=gson.fromJson(json,CartTb.class);
System.out.println("接收到消息:"+cart);
}
}