当前位置: 首页 > news >正文

FlinkSql一个简单的测试程序

FlinkSql一个简单的测试程序

以下是一个简单的 Flink SQL 示例,展示了如何使用 Flink Table API 和 Flink SQL 进行基本的数据流处理。


  1. 定义数据实体 CC :
    - CC 类表示数据流中的元素,包含两个字段: character (字符)和 count (计数)。
    - 提供了无参构造函数和带参构造函数,用于创建 CC 对象。
    // 1. 定义数据实体public static class CC {public String character;public long count;public CC() {}public CC(String character, long count) {this.character = character;this.count = count;}} 

  1. 创建执行环境并模拟数据流:
    - 创建了 Flink 执行环境 StreamExecutionEnvironment 和 StreamTableEnvironment 。
    - 创建了一个包含字符串元素的数据流 inputStream ,其中包括 “hello”, “world” 和 “!!!”。
        // 2. 创建执行环境并模拟数据流StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, environmentSettings);DataStream<String> inputStream = env.fromElements("hello","world","!!!").uid("source").name("source");

  1. 对数据流进行 flatMap 操作:
    - 使用 flatMap 对每个输入字符串进行拆分,并将每个字符映射为一个 CC 对象。
        // 3. 对数据流进行flatMap()操作SingleOutputStreamOperator<CC> streamOperator = inputStream.flatMap(new FlatMapFunction<String, CC>() {@Overridepublic void flatMap(String value, Collector<CC> out) throws Exception {for (char c : value.toCharArray()) {out.collect(new CC(c + "",1L));}}});

  1. 将数据流转为 Table :
    - 使用 tableEnv.fromDataStream 将 streamOperator 转换为一个 Table 对象。
        // 4. 将数据流转为TableTable table = tableEnv.fromDataStream(streamOperator);

  1. 使用 Table API 操作数据流:
    - 对 table 进行选择和过滤操作,保留字符不为空的记录。
    - 对过滤后的数据进行分组,并计算每个字符的计数总和,将结果存储在 result 中。
        // 5. 使用tableApi操作数据流,并输出结果Table filter = table.select($("character"), $("count")).filter($("character").isNotEqual(""));Table result = filter.groupBy($("character")).select($("character"), $("count").sum().as("character_count"));tableEnv.toRetractStream(result, Row.class).print();

  1. 使用 Flink SQL 操作数据流:
    - 将 table 注册为临时视图 “CC”。
    - 执行 SQL 查询,对 “CC” 进行分组,计算每个字符的计数总和,并将结果存储在 result2 中。
        // 6. 使用FlinkSql操作数据流,并输出结果tableEnv.createTemporaryView("CC", table);Table result2 = tableEnv.sqlQuery("SELECT `character`, SUM(`count`) FROM CC group by `character`");tableEnv.toRetractStream(result2, Row.class).print();

  1. 执行任务:
    - 使用 env.execute(“Flink Sql Test”) 启动 Flink 作业,处理数据流并输出结果。
        // 7.执行任务env.execute("Flink Sql Test");

  1. 执行结果:
(true,+I[h, 1])
(true,+I[e, 1])
(true,+I[l, 1])
(false,-U[l, 1])
(true,+U[l, 2])
(true,+I[o, 1])
(true,+I[w, 1])
(false,-U[o, 1])
(true,+U[o, 2])
(true,+I[r, 1])
(false,-U[l, 2])
(true,+U[l, 3])
(true,+I[d, 1])
(true,+I[!, 1])
(false,-U[!, 1])
(true,+U[!, 2])
(false,-U[!, 2])
(true,+U[!, 3])Process finished with exit code 0

通过这段代码,您可以了解如何使用 Flink Table API 和 Flink SQL 对数据流进行简单的处理和分析,包括数据拆分、选择、过滤、分组和计算。最后,通过 toRetractStream 方法将结果打印输出。

相关文章:

  • Docker中部署flink集群的两种方式
  • SQL字符集
  • Web 前端 UI 框架Bootstrap简介与基本使用
  • 手拉手Vite+Vue3+TinyVue+Echarts+TailwindCSS
  • 武汉AAA企业信用等级认证
  • 【MATLAB】 EWT信号分解+FFT傅里叶频谱变换组合算法
  • 【DAY03 软考中级备考笔记】存储系统,总线系统,输入输出系统和可靠性
  • verilog学习
  • vue 使用docx库生成word表格文档
  • 编程笔记 Golang基础 013 格式化输入输出
  • 企业级SAS盘SSDPM1643a PM1653 Nytro 2050 KPM71VUG3T20固态硬盘
  • Rust-知多少?
  • mysql-多表查询-内连接
  • java日志框架总结(六、logback日志框架 扩展)
  • 解决 PLC QModbusTcpClient 通信自动断开
  • Linux编程学习笔记 | Linux IO学习[1] - 文件IO
  • Spark in action on Kubernetes - Playground搭建与架构浅析
  • webpack入门学习手记(二)
  • 创建一个Struts2项目maven 方式
  • 大快搜索数据爬虫技术实例安装教学篇
  • 读懂package.json -- 依赖管理
  • 开发了一款写作软件(OSX,Windows),附带Electron开发指南
  • 聊一聊前端的监控
  • 模型微调
  • 你真的知道 == 和 equals 的区别吗?
  • 前端每日实战 2018 年 7 月份项目汇总(共 29 个项目)
  • 算法-插入排序
  • 体验javascript之美-第五课 匿名函数自执行和闭包是一回事儿吗?
  • Nginx实现动静分离
  • Prometheus VS InfluxDB
  • Salesforce和SAP Netweaver里数据库表的元数据设计
  • 关于Android全面屏虚拟导航栏的适配总结
  • ​猴子吃桃问题:每天都吃了前一天剩下的一半多一个。
  • (2)STL算法之元素计数
  • (33)STM32——485实验笔记
  • (52)只出现一次的数字III
  • (C++17) std算法之执行策略 execution
  • (附源码)ssm失物招领系统 毕业设计 182317
  • (附源码)计算机毕业设计ssm基于Internet快递柜管理系统
  • (使用vite搭建vue3项目(vite + vue3 + vue router + pinia + element plus))
  • (算法)N皇后问题
  • (已更新)关于Visual Studio 2019安装时VS installer无法下载文件,进度条为0,显示网络有问题的解决办法
  • (转)Linux NTP配置详解 (Network Time Protocol)
  • (转)淘淘商城系列——使用Spring来管理Redis单机版和集群版
  • .a文件和.so文件
  • .Net(C#)常用转换byte转uint32、byte转float等
  • .NET开发不可不知、不可不用的辅助类(一)
  • .NET学习教程二——.net基础定义+VS常用设置
  • /etc/sudoers (root权限管理)
  • :“Failed to access IIS metabase”解决方法
  • @EventListener注解使用说明
  • @kafkalistener消费不到消息_消息队列对战之RabbitMq 大战 kafka
  • @Responsebody与@RequestBody
  • [ C++ ] STL---仿函数与priority_queue
  • [ vulhub漏洞复现篇 ] JBOSS AS 4.x以下反序列化远程代码执行漏洞CVE-2017-7504