当前位置: 首页 > news >正文

实时流式计算 kafkaStream

文章目录

  • 实时流式计算
  • Kafka Stream
  • Kafka Streams 的关键概念
  • KStream
  • Kafka Stream入门案例编写
  • SpringBoot 集成 Kafka Stream


实时流式计算

一般流式计算会与批量计算相比较

在这里插入图片描述

流式计算就相当于上图的右侧扶梯,是可以源源不断的产生数据,源源不断的接收数据,没有边界

一般流式计算会与批量计算相比较。在流式计算模型中,输入是持续的,可以认为在时间上是无界的,也就意味着,永远拿不到全量数据去做计算。同时,计算结果是持续输出的,也即计算结果在时间上也是无界的。
流式计算一般对实时性要求较高,同时一般是先定义目标计算,然后数据到来之后将计算逻辑应用于数据。同时为了提高计算效率,往往尽可能采用增量计算代替全量计算。

应用场景

  • 日志分析
    网站的用户访问日志进行实时的分析,计算访问量,用户画像,留存率等等,实时的进行数据分析,帮助企业进行决策
  • 大屏看板统计
    可以实时的查看网站注册数量,订单数量,购买数量,金额等。
  • 公交实时数据
    可以随时更新公交车方位,计算多久到达站牌等
  • 实时文章分值计算
    头条类文章的分值计算,通过用户的行为实时文章的分值,分值越高就越被推荐。

技术方案选型

  • Hadoop
  • Apche Storm
  • Flink
  • Kafka Stream

可以轻松地将其嵌入任何Java应用程序中,并与用户为其流应用程序所拥有的任何现有打包,部署和操作工具集成。

Kafka Stream

Kafka Stream:提供了对存储于 Kafka内 的数据进行流式处理分析的功能

Kafka Stream的特点如下:

  • Kafka Stream提供了一个非常简单而轻量的Library,它可以非常方便地嵌入任意Java应用中,也可以任意方式打包和部署
  • 除了Kafka外,无任何外部依赖
  • 充分利用Kafka分区机制实现水平扩展和顺序性保证
  • 通过可容错的state store实现高效的状态操作(如windowed join和aggregation)
  • 支持正好一次处理语义
  • 提供记录级的处理能力,从而实现毫秒级的低延迟
  • 支持基于事件时间的窗口操作,并且可处理晚到的数据(late arrival of records)
  • 同时提供底层的处理原语Processor(类似于Storm的spout和bolt),以及高层抽象的DSL(类似于Spark的map/group/reduce)

在这里插入图片描述

Kafka Streams 的关键概念

在这里插入图片描述

  • 源处理器(Source Processor):源处理器是一个没有任何上游处理器的特殊类型的流处理器。它从一个或多个kafka主题生成输入流。通过消费这些主题的消息并将它们转发到下游处理器。

  • Sink处理器:sink处理器是一个没有下游流处理器的特殊类型的流处理器。它接收上游流处理器的消息发送到一个指定的Kafka主题

KStream

  • 数据结构类似于map,如下图,key-value 键值对

在这里插入图片描述

KStream数据流(data stream),即是一段顺序的,可以无限长,不断更新的数据集。

在这里插入图片描述

Kafka Stream入门案例编写

  1. 需求分析,求单词个数(word count)

在这里插入图片描述

  1. 创建原生的 kafka staream 入门案例

导入依赖

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><exclusions><exclusion><artifactId>connect-json</artifactId><groupId>org.apache.kafka</groupId></exclusion><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions>
</dependency>

在这里插入图片描述

在这里插入图片描述

package com.heima.kafka.sample;import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;/*** 流式处理*/
public class KafkaStreamQuickStart {public static void main(String[] args) {//kafka的配置信心Properties prop = new Properties();prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-quickstart");//stream 构建器StreamsBuilder streamsBuilder = new StreamsBuilder();//流式计算streamProcessor(streamsBuilder);//创建kafkaStream对象KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(),prop);//开启流式计算kafkaStreams.start();}/*** 流式计算* 消息的内容:hello kafka  hello itcast* @param streamsBuilder*/private static void streamProcessor(StreamsBuilder streamsBuilder) {//创建kstream对象,同时指定从那个topic中接收消息KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");/*** 处理消息的value*/stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {@Overridepublic Iterable<String> apply(String value) {return Arrays.asList(value.split(" "));}})//按照value进行聚合处理.groupBy((key,value)->value)//时间窗口.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))//统计单词的个数.count()//转换为kStream.toStream().map((key,value)->{System.out.println("key:"+key+",vlaue:"+value);return new KeyValue<>(key.key().toString(),value.toString());})//发送消息.to("itcast-topic-out");}
}
  1. 测试准备
  • 使用生产者在 topic 为:itcast_topic_input中发送多条消息
  • stream 接收 itcast_topic_input 的数据,进行聚合操作后,将处理结果发送到 itcast_topic_out
  • 使用消费者接收 topic 为:itcast_topic_out

结果:

  • 通过流式计算,会把生产者的多条消息汇总成一条发送到消费者中输出

SpringBoot 集成 Kafka Stream

  1. 配置

在这里插入图片描述

package com.heima.kafka.config;import lombok.Getter;
import lombok.Setter;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;import java.util.HashMap;
import java.util.Map;/*** 通过重新注册KafkaStreamsConfiguration对象,设置自定配置参数*/@Setter
@Getter
@Configuration
@EnableKafkaStreams
@ConfigurationProperties(prefix="kafka")
public class KafkaStreamConfig {private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;private String hosts;private String group;@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {Map<String, Object> props = new HashMap<>();props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");props.put(StreamsConfig.RETRIES_CONFIG, 10);props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());return new KafkaStreamsConfiguration(props);}
}

application.yml

在这里插入图片描述

kafka:hosts: 192.168.200.130:9092group: ${spring.application.name}
  1. 在配置类中定义方法
  • 可注入StreamsBuilder
  • 返回值必须是KStream且放入spring容器中

在这里插入图片描述

package com.heima.kafka.stream;import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.time.Duration;
import java.util.Arrays;@Configuration
@Slf4j
public class KafkaStreamHelloListener {@Beanpublic KStream<String,String> kStream(StreamsBuilder streamsBuilder){//创建kstream对象,同时指定从那个topic中接收消息KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {@Overridepublic Iterable<String> apply(String value) {return Arrays.asList(value.split(" "));}})//根据value进行聚合分组.groupBy((key,value)->value)//聚合计算时间间隔.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))//求单词的个数.count().toStream()//处理后的结果转换为string字符串.map((key,value)->{System.out.println("key:"+key+",value:"+value);return new KeyValue<>(key.key().toString(),value.toString());})//发送消息.to("itcast-topic-out");return stream;}
}
  1. 测试

启动 springboot 项目即可自动监听

相关文章:

  • 【算法思考记录】力扣2653. 滑动子数组的美丽值【C++,滑动窗口】
  • 【算法】希尔排序
  • HR看好的字符函数和字符串处理函数!!!
  • [MySQL]日期和时间函数
  • 计算机网络体系的形成
  • leetcode977. 有序数组的平方
  • springBoot整合task
  • 【STL】手撕 string类
  • llama.cpp部署通义千问Qwen-14B
  • 五分钟带你看完黑客设备
  • WPF窗口样式的比较
  • Chrome显示分享按钮
  • 如何解决谷歌浏览器无法更新、谷歌翻译无法使用问题
  • JavaSE基础50题:7. 写一个方法返回参数二进制中1的个数(3种方法!)
  • go自定义端口监听停用-------解决端口被占用的问题
  • @jsonView过滤属性
  • [原]深入对比数据科学工具箱:Python和R 非结构化数据的结构化
  • 【Linux系统编程】快速查找errno错误码信息
  • canvas绘制圆角头像
  • JavaScript新鲜事·第5期
  • Linux编程学习笔记 | Linux IO学习[1] - 文件IO
  • Linux后台研发超实用命令总结
  • mysql中InnoDB引擎中页的概念
  • Redis的resp协议
  • 安卓应用性能调试和优化经验分享
  • 表单中readonly的input等标签,禁止光标进入(focus)的几种方式
  • 关于springcloud Gateway中的限流
  • 回顾2016
  • 体验javascript之美-第五课 匿名函数自执行和闭包是一回事儿吗?
  • 白色的风信子
  • Spring第一个helloWorld
  • ​Linux Ubuntu环境下使用docker构建spark运行环境(超级详细)
  • ​TypeScript都不会用,也敢说会前端?
  • # 透过事物看本质的能力怎么培养?
  • #13 yum、编译安装与sed命令的使用
  • #Js篇:单线程模式同步任务异步任务任务队列事件循环setTimeout() setInterval()
  • #pragma multi_compile #pragma shader_feature
  • ( 10 )MySQL中的外键
  • (1)Nginx简介和安装教程
  • (3)STL算法之搜索
  • (附源码)ssm高校运动会管理系统 毕业设计 020419
  • (五) 一起学 Unix 环境高级编程 (APUE) 之 进程环境
  • .class文件转换.java_从一个class文件深入理解Java字节码结构
  • .NET CF命令行调试器MDbg入门(一)
  • @FeignClient注解,fallback和fallbackFactory
  • @modelattribute注解用postman测试怎么传参_接口测试之问题挖掘
  • [2008][note]腔内级联拉曼发射的,二极管泵浦多频调Q laser——
  • [④ADRV902x]: Digital Filter Configuration(发射端)
  • [51nod1610]路径计数
  • [AIGC] 如何建立和优化你的工作流?
  • [Android 13]Input系列--获取触摸窗口
  • [BT]BUUCTF刷题第8天(3.26)
  • [BZOJ3211]:花神游历各国(小清新线段树)
  • [C#7] 1.Tuples(元组)
  • [cocos creator]EditBox,editing-return事件,清空输入框