当前位置: 首页 > news >正文

Flink去重计数统计用户数

1.数据

订单表,每行的字段依次是店铺id、用户id和支付金额

"店铺id,用户id,支付金额",
"shop-1,user-1,1",
"shop-1,user-2,1",
"shop-1,user-2,1",
"shop-1,user-3,1",
"shop-1,user-3,1",
"shop-1,user-1,1",
"shop-1,user-2,1",
"shop-1,user-4,1",
"shop-2,user-4,1",
"shop-2,user-4,1",
"shop-2,user-2,1"

2.可运行案例

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;public class Test03 {public static void main(String[] args) throws Exception {// 1. 创建流式执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2.创建表执行环境StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);// 3.读取数据源SingleOutputStreamOperator<String> jsonStream = env.fromElements("shop-1,user-1,1","shop-1,user-2,1","shop-1,user-2,1","shop-1,user-3,1","shop-1,user-3,1","shop-1,user-1,1","shop-1,user-2,1","shop-1,user-4,1","shop-2,user-4,1","shop-2,user-4,1","shop-2,user-2,1");// 4.流转换为表Table table = tableEnv.fromDataStream(jsonStream);// 5. 把注册为一个临时视图tableEnv.createTemporaryView("tableTmp", table);// 6.求每个商店的用户数Table table1 = tableEnv.sqlQuery("select shop_id,sum(num) as num,sum(gmv) as gmv from (select shop_id,user_id, 1 as num,sum(gmv) as gmv from (select SPLIT_INDEX(f0,',',0) as shop_id,SPLIT_INDEX(f0,',',1) as user_id,cast(SPLIT_INDEX(f0,',',2) as bigint) as gmv from tableTmp) t1 group by shop_id,user_id) t2 group by shop_id");// 7.打印tableEnv.toRetractStream(table1, Row.class).print(">>>>>>");// 8.执行env.execute("test");}
}

sql:

-- Per-shop distinct user count (num) and total payment (gmv).
-- t1 splits each raw line (column f0) into shop_id / user_id / gmv;
-- t2 collapses it to one row per (shop_id, user_id) with num = 1;
-- the outer query then sums those rows per shop.
select
    shop_id,
    sum(num) as num,
    sum(gmv) as gmv
from (
    select
        shop_id,
        user_id,
        1 as num,
        sum(gmv) as gmv
    from (
        select
            SPLIT_INDEX(f0, ',', 0) as shop_id,
            SPLIT_INDEX(f0, ',', 1) as user_id,
            cast(SPLIT_INDEX(f0, ',', 2) as bigint) as gmv
        from tableTmp
    ) t1
    group by
        shop_id,
        user_id
) t2
group by
    shop_id

3.运行结果

>>>>>>:7> (true,+U[shop-2, 2, 3])

>>>>>>:1> (true,+U[shop-1, 4, 8])  

>>>>>>:7> (true,+I[shop-2, 1, 1])
>>>>>>:1> (true,+I[shop-1, 1, 1])
>>>>>>:1> (false,-U[shop-1, 1, 1])
>>>>>>:7> (false,-U[shop-2, 1, 1])
>>>>>>:1> (true,+U[shop-1, 2, 2])
>>>>>>:7> (true,+U[shop-2, 2, 2])
>>>>>>:1> (false,-U[shop-1, 2, 2])
>>>>>>:7> (false,-U[shop-2, 2, 2])
>>>>>>:1> (true,+U[shop-1, 1, 1])
>>>>>>:7> (true,+U[shop-2, 1, 1])
>>>>>>:1> (false,-U[shop-1, 1, 1])
>>>>>>:7> (false,-U[shop-2, 1, 1])
>>>>>>:7> (true,+U[shop-2, 2, 3])
>>>>>>:1> (true,+U[shop-1, 2, 3])
>>>>>>:1> (false,-U[shop-1, 2, 3])
>>>>>>:1> (true,+U[shop-1, 3, 4])
>>>>>>:1> (false,-U[shop-1, 3, 4])
>>>>>>:1> (true,+U[shop-1, 2, 3])
>>>>>>:1> (false,-U[shop-1, 2, 3])
>>>>>>:1> (true,+U[shop-1, 3, 5])
>>>>>>:1> (false,-U[shop-1, 3, 5])
>>>>>>:1> (true,+U[shop-1, 2, 3])
>>>>>>:1> (false,-U[shop-1, 2, 3])
>>>>>>:1> (true,+U[shop-1, 3, 6])
>>>>>>:1> (false,-U[shop-1, 3, 6])
>>>>>>:1> (true,+U[shop-1, 4, 7])
>>>>>>:1> (false,-U[shop-1, 4, 7])
>>>>>>:1> (true,+U[shop-1, 3, 6])
>>>>>>:1> (false,-U[shop-1, 3, 6])
>>>>>>:1> (true,+U[shop-1, 4, 8])

4.原理

Flink回撤流原理

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 【23.12.29期--Redis缓存篇】谈一谈Redis的集群模式
  • 鸿鹄电子招投标系统:基于Spring Boot、Mybatis、Redis和Layui的企业电子招采平台源码与立项流程
  • go-carbon v2.3.1 发布,轻量级、语义化、对开发者友好的 Golang 时间处理库
  • Python与ArcGIS系列(十七)GDAL之shp转geojson
  • 【HTML5】第1章 HTML5入门
  • QT UI自动化测试(1)
  • 从C到C++1
  • C++ 结构体(面向对象编程)
  • 【SpringCloud】-OpenFeign实战及源码解析、与Ribbon结合
  • Notepad++批量更改文件编码格式及文档格式
  • linux iptables简介
  • 14-网络安全框架及模型-分层防护模型
  • 第八周:AIPM面试准备
  • WPF+Halcon 培训项目实战(8):WPF+Halcon初次开发
  • 词法语法语义分析程序设计及实现,包含出错提示和错误恢复
  • 「前端早读君006」移动开发必备:那些玩转H5的小技巧
  • C++入门教程(10):for 语句
  • egg(89)--egg之redis的发布和订阅
  • java第三方包学习之lombok
  • Java基本数据类型之Number
  • js面向对象
  • learning koa2.x
  • pdf文件如何在线转换为jpg图片
  • React组件设计模式(一)
  • SpingCloudBus整合RabbitMQ
  • Vue学习第二天
  • webpack4 一点通
  • 第2章 网络文档
  • 服务器从安装到部署全过程(二)
  • 基于阿里云移动推送的移动应用推送模式最佳实践
  • 让你成为前端,后端或全栈开发程序员的进阶指南,一门学到老的技术
  • 小程序上传图片到七牛云(支持多张上传,预览,删除)
  • ​html.parser --- 简单的 HTML 和 XHTML 解析器​
  • #!/usr/bin/python与#!/usr/bin/env python的区别
  • (5)STL算法之复制
  • (delphi11最新学习资料) Object Pascal 学习笔记---第5章第5节(delphi中的指针)
  • (PHP)设置修改 Apache 文件根目录 (Document Root)(转帖)
  • (附源码)计算机毕业设计SSM保险客户管理系统
  • (贪心) LeetCode 45. 跳跃游戏 II
  • (一)、软硬件全开源智能手表,与手机互联,标配多表盘,功能丰富(ZSWatch-Zephyr)
  • (转)创业家杂志:UCWEB天使第一步
  • (转)关于pipe()的详细解析
  • (转)人的集合论——移山之道
  • .Net Core/.Net6/.Net8 ,启动配置/Program.cs 配置
  • .Net Core中的内存缓存实现——Redis及MemoryCache(2个可选)方案的实现
  • .NET程序集编辑器/调试器 dnSpy 使用介绍
  • .NET下的多线程编程—1-线程机制概述
  • .NET中使用Protobuffer 实现序列化和反序列化
  • @RequestBody与@ModelAttribute
  • [2669]2-2 Time类的定义
  • [ai笔记3] ai春晚观后感-谈谈ai与艺术
  • [ARC066F]Contest with Drinks Hard
  • [BZOJ 3531][Sdoi2014]旅行(树链剖分+线段树)
  • [C#]winform部署yolov9的onnx模型
  • [C++]拼图游戏