当前位置: 首页 > news >正文

记录使用FlinkSql进行实时工作流开发

使用FlinkSql进行实时工作流开发

  • 引言
  • Flink SQL实战
    • 常用的Connector
      • 1. MySQL-CDC 连接器配置
      • 2. Kafka 连接器配置
      • 3. JDBC 连接器配置
      • 4. RabbitMQ 连接器配置
      • 5. REST Lookup 连接器配置
      • 6. HDFS 连接器配置
    • FlinkSql数据类型
      • 1. 基本数据类型
      • 2. 字符串数据类型
      • 3. 日期和时间数据类型
      • 4. 复杂数据类型
      • 5. 特殊数据类型
      • 数据类型的使用示例

引言

在这里插入图片描述

在大数据时代,实时数据分析和处理变得越来越重要。Apache Flink,作为流处理领域的佼佼者,提供了一套强大的工具集来处理无界和有界数据流。其中,Flink SQL是其生态系统中一个重要的组成部分,允许用户以SQL语句的形式执行复杂的数据流操作,极大地简化了实时数据处理的开发流程。

什么是Apache Flink?

Apache Flink是一个开源框架,用于处理无边界(无尽)和有边界(有限)数据流。它提供了低延迟、高吞吐量和状态一致性,使开发者能够构建复杂的实时应用和微服务。Flink的核心是流处理引擎,它支持事件时间处理、窗口操作以及精确一次的状态一致性。

为什么选择Flink SQL?

易用性:Flink SQL使得非专业程序员也能快速上手,使用熟悉的SQL语法进行实时数据查询和处理。
灵活性:可以无缝地将SQL与Java/Scala API结合使用,为用户提供多种编程模型的选择。
性能:利用Flink的高性能流处理引擎,Flink SQL能够实现实时响应和低延迟处理。
集成能力:支持多种数据源和数据接收器,如Kafka、JDBC、HDFS等,易于集成到现有的数据生态系统中。

Flink SQL实战

常用的Connector

在配置FlinkSQL实时开发时,使用mysql-cdc、Kafka、jdbc和rabbitmq作为连接器是一个很常见的场景。以下是详细的配置说明,你可以基于这些信息来撰写你的博客:

1. MySQL-CDC 连接器配置

MySQL-CDC(Change Data Capture)连接器用于捕获MySQL数据库中的变更数据。配置示例如下:

CREATE TABLE mysql_table (-- 定义表结构id INT,name STRING,-- 其他列
) WITH ('connector' = 'mysql-cdc',  			-- 使用mysql-cdc连接器'hostname' = 'mysql-host',  			-- MySQL服务器主机名'port' = '3306',            			-- MySQL端口号'username' = 'user',        			-- MySQL用户名'password' = 'password',    			-- MySQL密码'database-name' = 'db',     			-- 数据库名'table-name' = 'table'      			-- 表名'server-time-zone' = 'GMT+8',           -- 服务器时区'debezium.snapshot.mode' = 'initial',  	-- 初始快照模式,initial表示从头开始读取所有数据;latest-offset表示从最近的偏移量开始读取;timestamp则可以指定一个时间戳,从该时间戳之后的数据开始读取。'scan.incremental.snapshot.enabled' = 'true'	-- 可选,设置为true时,Flink会尝试维护一个数据库表的增量快照。这意味着Flink不会每次都重新读取整个表,而是只读取自上次读取以来发生变化的数据。这样可以显著提高读取效率,尤其是在处理大量数据且频繁更新的场景下。'scan.incremental.snapshot.chunk.size' = '1024'  -- 可选, 增量快照块大小'debezium.snapshot.locking.mode' = 'none', 		 -- 可选,控制在快照阶段锁定表的方式,以防止数据冲突。none表示不锁定,lock-tables表示锁定整个表,transaction表示使用事务来锁定。'debezium.properties.include-schema-changes' = 'true',  -- 可选,如果设置为true,则在CDC事件中会包含模式变更信息。'debezium.properties.table.whitelist' = 'mydatabase.mytable',  -- 可选,指定要监控的表的白名单。如果table-name未设置,可以通过这个属性来指定。'debezium.properties.database.history' = 'io.debezium.relational.history.FileDatabaseHistory'  -- 可选,设置数据库历史记录的实现类,通常使用FileDatabaseHistory来保存历史记录,以便在重启后能恢复状态。
);

2. Kafka 连接器配置

Kafka连接器用于读写Kafka主题中的数据。配置示例如下:

CREATE TABLE kafka_table (-- 定义表结构id INT,name STRING,-- 其他列
) WITH ('connector' = 'kafka',      -- 使用kafka连接器'topic' = 'topic_name',     -- Kafka主题名'properties.bootstrap.servers' = 'kafka-broker:9092',  -- Kafka服务器地址'format' = 'json'           -- 数据格式,例如json'properties.group.id' = 'flink-consumer-group',  -- 消费者组ID'scan.startup.mode' = 'earliest-offset',  -- 启动模式(earliest-offset, latest-offset, specific-offset, timestamp)'format' = 'json',  -- 数据格式'json.fail-on-missing-field' = 'false',  -- 是否在字段缺失时失败'json.ignore-parse-errors' = 'true',     -- 是否忽略解析错误'properties.security.protocol' = 'SASL_SSL', -- 安全协议(可选)'properties.sasl.mechanism' = 'PLAIN',       -- SASL机制(可选)'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="user" password="password";'  -- SASL配置(可选)
);

3. JDBC 连接器配置

JDBC连接器用于与其他关系型数据库进行交互。配置示例如下:

CREATE TABLE jdbc_table (-- 定义表结构id INT,name STRING,-- 其他列
) WITH ('connector' = 'jdbc',       -- 使用jdbc连接器'url' = 'jdbc:mysql://mysql-host:3306/db',  -- JDBC连接URL'table-name' = 'table_name', -- 数据库表名'username' = 'user',        -- 数据库用户名'password' = 'password'     -- 数据库密码'driver' = 'com.mysql.cj.jdbc.Driver',   -- JDBC驱动类'lookup.cache.max-rows' = '5000',        -- 可选,查找缓存的最大行数'lookup.cache.ttl' = '10min',            -- 可选,查找缓存的TTL(时间到期)'lookup.max-retries' = '3',              -- 可选,查找的最大重试次数'sink.buffer-flush.max-rows' = '1000',   -- 可选,缓冲区刷新最大行数'sink.buffer-flush.interval' = '2s'      -- 可选,缓冲区刷新间隔
);

4. RabbitMQ 连接器配置

RabbitMQ连接器用于与RabbitMQ消息队列进行交互。配置示例如下:

CREATE TABLE rabbitmq_table (-- 定义表结构id INT,name STRING,-- 其他列
) WITH ('connector' = 'rabbitmq',   -- 使用rabbitmq连接器'host' = 'rabbitmq-host',   -- RabbitMQ主机名'port' = '5672',            -- RabbitMQ端口号'username' = 'user',        -- RabbitMQ用户名'password' = 'password',    -- RabbitMQ密码'queue' = 'queue_name',     -- RabbitMQ队列名'exchange' = 'exchange_name' -- RabbitMQ交换机名'routing-key' = 'routing_key',   -- 路由键'delivery-mode' = '2',           -- 投递模式(2表示持久)'format' = 'json',               -- 数据格式'json.fail-on-missing-field' = 'false',  -- 是否在字段缺失时失败'json.ignore-parse-errors' = 'true'      -- 是否忽略解析错误
);

5. REST Lookup 连接器配置

REST Lookup 连接器允许在 SQL 查询过程中,通过 REST API 进行查找操作。

CREATE TABLE rest_table (id INT,name STRING,price DECIMAL(10, 2),PRIMARY KEY (id) NOT ENFORCED
) WITH ('connector' = 'rest-lookup','url' = 'http://api.example.com/user/{id}',  -- REST API URL,使用占位符 {product_id}'lookup-method' = 'POST'	-- 'GET' 或 'POST''format' = 'json',  -- 数据格式'asyncPolling' = 'false'	-- 可选,指定查找操作是否使用异步轮询模式。默认值为 'false'。当设置为 'true' 时,查找操作会以异步方式执行,有助于提高性能。'gid.connector.http.source.lookup.header.Content-Type' = 'application/json'	-- 可选,设置 Content-Type 请求头。用于指定请求体的媒体类型。例如,设置为 application/json 表示请求体是 JSON 格式。'gid.connector.http.source.lookup.header.Origin' = '*'	-- 可选,设置 Origin 请求头。通常用于跨域请求。'gid.connector.http.source.lookup.header.X-Content-Type-Options' = 'nosniff'	-- 可选,设置 X-Content-Type-Options 请求头。用于防止 MIME 类型混淆攻击。'json.fail-on-missing-field' = 'false',  -- 可选,是否在字段缺失时失败'json.ignore-parse-errors' = 'true'  -- 可选,是否忽略解析错误'lookup.cache.max-rows' = '5000',  -- 可选,查找缓存的最大行数'lookup.cache.ttl' = '10min',  -- 可选,查找缓存的TTL(时间到期)'lookup.max-retries' = '3'  -- 可选,查找的最大重试次数
);

6. HDFS 连接器配置

HDFS connector用于读取或写入Hadoop分布式文件系统中的数据。

创建HDFS Source

CREATE TABLE hdfsSource (line STRING
) WITH ('connector' = 'filesystem','path' = 'hdfs://localhost:9000/data/input',		-- HDFS上的路径。'format' = 'csv'		-- 文件格式。
);

创建HDFS Sink

CREATE TABLE hdfsSink (line STRING
) WITH ('connector' = 'filesystem','path' = 'hdfs://localhost:9000/data/output','format' = 'csv'
);

FlinkSql数据类型

在FlinkSQL中,数据类型的选择和定义是非常重要的,因为它们直接影响数据的存储和处理方式。FlinkSQL提供了多种数据类型,可以满足各种业务需求。以下是FlinkSQL中的常见数据类型及其详细介绍:

1. 基本数据类型

  • BOOLEAN: 布尔类型,表示TRUEFALSE

    CREATE TABLE example_table (is_active BOOLEAN
    );
    
  • TINYINT: 8位带符号整数,范围是-128127

    CREATE TABLE example_table (tiny_value TINYINT
    );
    
  • SMALLINT: 16位带符号整数,范围是-3276832767

    CREATE TABLE example_table (small_value SMALLINT
    );
    
  • INT: 32位带符号整数,范围是-21474836482147483647

    CREATE TABLE example_table (int_value INT
    );
    
  • BIGINT: 64位带符号整数,范围是-92233720368547758089223372036854775807

    CREATE TABLE example_table (big_value BIGINT
    );
    
  • FLOAT: 单精度浮点数。

    CREATE TABLE example_table (float_value FLOAT
    );
    
  • DOUBLE: 双精度浮点数。

    CREATE TABLE example_table (double_value DOUBLE
    );
    
  • DECIMAL(p, s): 精确数值类型,p表示总精度,s表示小数位数。

    CREATE TABLE example_table (decimal_value DECIMAL(10, 2)
    );
    

2. 字符串数据类型

  • CHAR(n): 定长字符串,n表示字符串的长度。

    CREATE TABLE example_table (char_value CHAR(10)
    );
    
  • VARCHAR(n): 可变长字符串,n表示最大长度。

    CREATE TABLE example_table (varchar_value VARCHAR(255)
    );
    
  • STRING: 可变长字符串,无长度限制。

    CREATE TABLE example_table (string_value STRING
    );
    

3. 日期和时间数据类型

  • DATE: 日期类型,格式为YYYY-MM-DD

    CREATE TABLE example_table (date_value DATE
    );
    
  • TIME§: 时间类型,格式为HH:MM:SSp表示秒的小数位精度。

    CREATE TABLE example_table (time_value TIME(3)
    );
    
  • TIMESTAMP§: 时间戳类型,格式为YYYY-MM-DD HH:MM:SS.sssp表示秒的小数位精度。

    CREATE TABLE example_table (timestamp_value TIMESTAMP(3)
    );
    
  • TIMESTAMP§ WITH LOCAL TIME ZONE: 带有本地时区的时间戳类型。

    CREATE TABLE example_table (local_timestamp_value TIMESTAMP(3) WITH LOCAL TIME ZONE
    );
    

4. 复杂数据类型

  • ARRAY: 数组类型,T表示数组中的元素类型。

    CREATE TABLE example_table (array_value ARRAY<INT>
    );
    
  • MAP<K, V>: 键值对映射类型,K表示键的类型,V表示值的类型。

    CREATE TABLE example_table (map_value MAP<STRING, INT>
    );
    
  • ROW<…>: 行类型,可以包含多个字段,每个字段可以有不同的类型。

    CREATE TABLE example_table (row_value ROW<name STRING, age INT>
    );
    

5. 特殊数据类型

  • BINARY(n): 定长字节数组,n表示长度。

    CREATE TABLE example_table (binary_value BINARY(10)
    );
    
  • VARBINARY(n): 可变长字节数组,n表示最大长度。

    CREATE TABLE example_table (varbinary_value VARBINARY(255)
    );
    

数据类型的使用示例

以下是一个包含各种数据类型的表的定义示例:

CREATE TABLE example_table (id INT,name STRING,is_active BOOLEAN,salary DECIMAL(10, 2),birth_date DATE,join_time TIMESTAMP(3),preferences ARRAY<STRING>,attributes MAP<STRING, STRING>,address ROW<street STRING, city STRING, zip INT>
);

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 如何强化学习神经网络
  • 进程状态(一)---- 运行,阻塞,挂起
  • ESP8266+STM32+阿里云保姆级教程(AT指令+MQTT)
  • HBuilder在uni-admin实现unicloud-map中poi管理
  • linux 虚拟机解压arm-linux-gcc-4.6.4-arm-x86_64.tar.bz2并arm-linux-gcc
  • openEuler使用mariadb
  • C# 组合CancellationTokenSource的使用
  • 搭建日志系统ELK(二)
  • arkhamintelligence 请求头加密 X-Payload 完整逆向分析+自动化解决方案
  • 【CTFWP】ctfshow-web42-52
  • python颠倒一下列表
  • C# LinqToExcel 读取Excel
  • 重生之我们在ES顶端相遇第9 章- 搜索框最常用的功能 - 搜索建议
  • 一个超强的Python机器学习超参优化库
  • QtQuick Text-对齐方式
  • [PHP内核探索]PHP中的哈希表
  • 分享一款快速APP功能测试工具
  • Electron入门介绍
  • Hibernate最全面试题
  • iOS小技巧之UIImagePickerController实现头像选择
  • JavaScript 基础知识 - 入门篇(一)
  • JavaScript 无符号位移运算符 三个大于号 的使用方法
  • Linux各目录及每个目录的详细介绍
  • 记一次删除Git记录中的大文件的过程
  • 精益 React 学习指南 (Lean React)- 1.5 React 与 DOM
  • 算法系列——算法入门之递归分而治之思想的实现
  • 通过来模仿稀土掘金个人页面的布局来学习使用CoordinatorLayout
  • 我从编程教室毕业
  • 2017年360最后一道编程题
  • 3月27日云栖精选夜读 | 从 “城市大脑”实践,瞭望未来城市源起 ...
  • Linux权限管理(week1_day5)--技术流ken
  • MPAndroidChart 教程:Y轴 YAxis
  • 我们雇佣了一只大猴子...
  • ​中南建设2022年半年报“韧”字当头,经营性现金流持续为正​
  • # C++之functional库用法整理
  • #HarmonyOS:基础语法
  • #pragma预处理命令
  • #在线报价接单​再坚持一下 明天是真的周六.出现货 实单来谈
  • (剑指Offer)面试题41:和为s的连续正数序列
  • (四)JPA - JQPL 实现增删改查
  • (转) Face-Resources
  • (转)ObjectiveC 深浅拷贝学习
  • (转)利用PHP的debug_backtrace函数,实现PHP文件权限管理、动态加载 【反射】...
  • .apk 成为历史!
  • .NET : 在VS2008中计算代码度量值
  • .net 流——流的类型体系简单介绍
  • .NET/C# 使用 #if 和 Conditional 特性来按条件编译代码的不同原理和适用场景
  • .Net程序猿乐Android发展---(10)框架布局FrameLayout
  • /使用匿名内部类来复写Handler当中的handlerMessage()方法
  • ?.的用法
  • [ 2222 ]http://e.eqxiu.com/s/wJMf15Ku
  • [C#]科学计数法(scientific notation)显示为正常数字
  • [C#]猫叫人醒老鼠跑 C#的委托及事件
  • [ChromeApp]指南!让你的谷歌浏览器好用十倍!
  • [DAU-FI Net开源 | Dual Attention UNet+特征融合+Sobel和Canny等算子解决语义分割痛点]