当前位置: 首页 > news >正文

Flink SQL 基础操作

Flink SQL是建立在Apache Flink之上的SQL处理引擎,它允许用户以SQL的方式处理流数据和批数据。以下是一些Flink SQL的基础操作:

一、环境准备

1.启动flink集群

./start-cluster.sh
  1. 启动sql-client
./sql-client.sh

二、数据源定义

  1. 创建表(Source):
  • 使用CREATE TABLE语句定义输入数据源,包括其schema、存储格式(如CSV、JSON等)以及连接器的配置(如Kafka、FileSystem等)。
  • 示例:
CREATE TABLE students (  id STRING,  name STRING,  age INT,  sex STRING,  clazz STRING  
) WITH (  'connector' = 'kafka',  'topic' = 'students',  'properties.bootstrap.servers' = 'localhost:9092',  'format' = 'csv'  
);

三、数据处理

  1. 编写SQL查询:
  • 使用标准的SQL语句对数据进行查询、过滤、聚合等操作。
  • 示例:
SELECT id, name, age  
FROM students  
WHERE age > 18;

四、数据输出

  1. 创建表(Sink):
  • 使用CREATE TABLE语句定义输出数据源,用于将处理后的数据写入外部系统,如Kafka、数据库等。
  • 示例:
CREATE TABLE results (  id STRING,  name STRING,  age INT  
) WITH (  'connector' = 'kafka',  'topic' = 'results',  'properties.bootstrap.servers' = 'localhost:9092',  'format' = 'csv'  
);
  1. 插入数据:
  • 使用INSERT INTO语句将查询结果写入Sink表。
  • 示例:
INSERT INTO results  
SELECT id, name, age  
FROM students  
WHERE age > 18;

五、执行与监控

  1. 执行SQL语句:
  • 在Flink SQL客户端或程序中执行SQL语句。
  • 可以通过Flink的Dashboard或其他监控工具来查看作业的执行状态和性能指标。
  1. 结果展示:
  • Flink SQL客户端支持多种结果显示模式,如表格模式、变更日志模式和Tableau模式,可以根据需要设置。

六、其他操作

  1. 动态表:
  • Flink SQL中的表是动态表,支持对流数据的实时查询和处理。
  1. Join操作:
  • Flink SQL支持多种Join方式,包括Regular Joins、Interval Joins、Temporal Joins和Lookup Joins,用于处理表之间的关联查询。
  1. 窗口函数:
  • Flink SQL支持窗口函数,用于对时间序列数据进行分组和聚合操作。

注意事项

  • 在进行Flink SQL操作时,需要确保已经正确配置了Flink环境,并且已经添加了必要的依赖库。
  • Flink SQL的语法和功能可能会随着Flink版本的更新而发生变化,因此建议查阅最新的官方文档以获取准确的信息。

样例操作

1、 从csv中读取数据

CREATE TABLE well_casting_alarm (_id VARCHAR,comCode VARCHAR,wellCode VARCHAR,uuid VARCHAR,type INT,alarmType INT,alarmGrade INT,zp INT,startAlarmTime TIME,startAlarmValue DECIMAL,threshold INT,warnStatus INT,isDeal INT,createTime TIME,_class VARCHAR
) WITH ( 'connector' = 'filesystem','path' = '/wfg/data/sjzz.wellCastingAlarm0606.csv','format' = 'csv'
);

2、查看所有表

Flink SQL> show tables;
+----------------------+
|           table name |
+----------------------+
| employee_information |
|   well_casting_alarm |
+----------------------+
2 rows in set

3、删除表

DROP TABLE well_casting_alarm;

4、查询数据

select *from well_casting_alarm limit 1;

5、删除一条数据

DELETE FROM well_casting_alarm where '_id'='_id';

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 注解Spring @AliasFor使用笔记
  • 知识点——样本间独立性,传统表征学习,显式物理连接,隐含交互,噪声,类相关类无关
  • 从零开始的CPP(37)跳跃游戏,动态规划,贪心算法
  • 纷享销客CRM AI产品架构概览、产品特色
  • Github 2024-08-09 开源项目日报 Top10
  • git的一些操作指令
  • 工作随记:oracle中偶发遇到存储过程编辑,删除等卡死问题
  • 下一代 AI 搜索引擎 MindSearch:多智能体 + 系统2,模拟人类认知过程的 AI 搜索引擎
  • 在Ubuntu 18.04上安装和配置pgAdmin 4服务器模式的方法
  • Docker最佳实践(六):安装Nacos
  • 9.动态导航栏怎么做
  • uniapp微信小程序 canvas绘制圆形半透明阴影 createCircularGradient函数不支持透明度部分解决方案
  • 100道C/C++面试题
  • mysql8.4.2数据库做主从复制
  • 【Python基础】Python六种标准数据类型中哪些是可变数据,哪些是不可变数据
  • SegmentFault for Android 3.0 发布
  • [deviceone开发]-do_Webview的基本示例
  • [译]CSS 居中(Center)方法大合集
  • el-input获取焦点 input输入框为空时高亮 el-input值非法时
  • JavaScript设计模式之工厂模式
  • Less 日常用法
  • Mysql5.6主从复制
  • nodejs调试方法
  • opencv python Meanshift 和 Camshift
  • react-core-image-upload 一款轻量级图片上传裁剪插件
  • Stream流与Lambda表达式(三) 静态工厂类Collectors
  • VuePress 静态网站生成
  • 面试题:给你个id,去拿到name,多叉树遍历
  • 思考 CSS 架构
  • 小程序滚动组件,左边导航栏与右边内容联动效果实现
  • 异步
  • LIGO、Virgo第三轮探测告捷,同时探测到一对黑洞合并产生的引力波事件 ...
  • 阿里云API、SDK和CLI应用实践方案
  • ​LeetCode解法汇总1276. 不浪费原料的汉堡制作方案
  • # Kafka_深入探秘者(2):kafka 生产者
  • #etcd#安装时出错
  • (55)MOS管专题--->(10)MOS管的封装
  • (C#)if (this == null)?你在逗我,this 怎么可能为 null!用 IL 编译和反编译看穿一切
  • (SpringBoot)第七章:SpringBoot日志文件
  • (二开)Flink 修改源码拓展 SQL 语法
  • (附源码)php新闻发布平台 毕业设计 141646
  • (附源码)基于SSM多源异构数据关联技术构建智能校园-计算机毕设 64366
  • (附源码)计算机毕业设计SSM基于java的云顶博客系统
  • (企业 / 公司项目)前端使用pingyin-pro将汉字转成拼音
  • (四) Graphivz 颜色选择
  • (学习日记)2024.02.29:UCOSIII第二节
  • (转) ns2/nam与nam实现相关的文件
  • (转)scrum常见工具列表
  • (转载)PyTorch代码规范最佳实践和样式指南
  • *(长期更新)软考网络工程师学习笔记——Section 22 无线局域网
  • .NET C# 使用 SetWindowsHookEx 监听鼠标或键盘消息以及此方法的坑
  • .NET Micro Framework初体验
  • .NET 发展历程
  • .NET 跨平台图形库 SkiaSharp 基础应用
  • .net生成的类,跨工程调用显示注释