
Flink SQL Redis Connector

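Anyone who develops with Flink regularly knows that connecting Flink to Redis is tedious, and parsing the Redis data is even more so. It would be far more convenient if Redis could be connected through Flink SQL like an ordinary database, with the data presented as a table.

After many days of work, I finally finished a Flink SQL Redis connector and have tested that it can parse data with SQL. Below I show the finished code and the results of running it. The complete code is on my GitHub: https://github.com/niuhu3/flink_sql_redis_connector.git

The connector has been submitted to Flink; see [FLINK-35588] flink sql redis connector - ASF JIRA (apache.org).

I hope you will fork and star the repository. I will keep updating this connector, and everyone is welcome to try it. If you run into problems, send me feedback or report them in the community, and feel free to contact me if you have good ideas.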

1. Usage examples and explanation

1. Reading data

CREATE TABLE orders (
    `order_id` STRING,
    `price` STRING,
    `order_time` STRING,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'redis',
    'mode' = 'single',
    'single.host' = '192.168.10.101',
    'single.port' = '6379',
    'password' = 'xxxxxx',
    'command' = 'hgetall',
    'key' = 'orders'
);

SELECT * FROM orders;

-- cluster mode
CREATE TABLE redis_sink (
    site_id STRING,
    inverter_id STRING,
    start_time STRING,
    PRIMARY KEY (site_id) NOT ENFORCED
) WITH (
    'connector' = 'redis',
    'mode' = 'cluster',
    'cluster.nodes' = 'test3:7001,test3:7002,test3:7003,test3:8001,test3:8002,test3:8003',
    'password' = '123123',
    'command' = 'hgetall',
    'key' = 'site_inverter'
);

cluster.nodes defines the host and port of each cluster node, for example: host1:p1,host2:p2,host3:p3
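
To make the cluster.nodes format concrete, here is a minimal sketch of how such a string could be split into host and port pairs. It assumes a comma between nodes and a colon between host and port, as in the example above; the class name ClusterNodesParser and its Node type are hypothetical and are not part of the connector.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not the connector's own code.
final class ClusterNodesParser {

    /** One host:port pair taken from the cluster.nodes option. */
    static final class Node {
        final String host;
        final int port;

        Node(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    /** Splits "host1:p1,host2:p2" on commas, then each entry on the colon. */
    static List<Node> parse(String nodes) {
        List<Node> result = new ArrayList<>();
        for (String entry : nodes.split(",")) {
            String[] parts = entry.trim().split(":");
            if (parts.length != 2) {
                throw new IllegalArgumentException("expected host:port but got: " + entry);
            }
            result.add(new Node(parts[0], Integer.parseInt(parts[1])));
        }
        return result;
    }

    public static void main(String[] args) {
        for (Node node : parse("test3:7001,test3:7002,test3:7003")) {
            System.out.println(node.host + " -> " + node.port);
        }
    }
}
```

Rejecting malformed entries early gives a clearer error than an ArrayIndexOutOfBoundsException deep inside the connection setup.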

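Note: a Redis table must define a primary key. It can be a single-column key or a composite key.

Result of the SQL read: the Redis data is parsed directly into the table form we need.

2. Writing data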

-- 1. generate source data
CREATE TABLE order_source (
    `order_number` BIGINT,
    `price` DECIMAL(32,2),
    `order_time` TIMESTAMP(3),
    PRIMARY KEY (order_number) NOT ENFORCED
) WITH (
    'connector' = 'datagen',
    'number-of-rows' = '5',
    'fields.order_number.min' = '1',
    'fields.order_number.max' = '20',
    'fields.price.min' = '1001',
    'fields.price.max' = '1100'
);

-- 2. define redis sink table
CREATE TABLE orders (
    `order_number` STRING,
    `price` STRING,
    `order_time` STRING,
    PRIMARY KEY (order_number) NOT ENFORCED
) WITH (
    'connector' = 'redis',
    'mode' = 'single',
    'single.host' = '192.168.10.101',
    'single.port' = '6379',
    'password' = 'xxxxxx',
    'command' = 'hmset',
    'key' = 'orders'
);

-- 3. insert data into the redis sink table (cast data types to STRING)
INSERT INTO orders
SELECT
    CAST(order_number AS STRING) AS order_number,
    CAST(price AS STRING) AS price,
    CAST(order_time AS STRING) AS order_time
FROM order_source;

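A Redis table does not store data types, so values must be cast to strings before they are written to Redis. The result of the write is shown below. The Redis key is built by concatenating the configured key, the primary key column name, and the primary key value, which keeps every record unique. This is also why a Redis table must define a primary key.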
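
The key layout described above can be sketched as follows. This is only an illustration under the assumption that the parts are joined with a colon (the sink code further down uses a separator constant whose value is not shown in this post); the class RedisKeySketch and the sample values are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not the connector's own code.
final class RedisKeySketch {

    // Assumed separator; the connector reads it from RedisSplitSymbol.
    static final String SEPARATOR = ":";

    /** Builds tableKey:pkName1:pkValue1[:pkName2:pkValue2...]. */
    static String buildKey(String tableKey, List<String> pkNames, List<String> pkValues) {
        if (pkNames.size() != pkValues.size()) {
            throw new IllegalArgumentException("one value is needed per primary key column");
        }
        StringBuilder key = new StringBuilder(tableKey);
        for (int i = 0; i < pkNames.size(); i++) {
            key.append(SEPARATOR).append(pkNames.get(i));
            key.append(SEPARATOR).append(pkValues.get(i));
        }
        return key.toString();
    }

    public static void main(String[] args) {
        // prints orders:order_number:10
        System.out.println(buildKey("orders", Arrays.asList("order_number"), Arrays.asList("10")));
    }
}
```

Because the primary key value is part of the Redis key, two rows with different primary keys end up in different hashes instead of overwriting each other.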

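3. Currently supported features

1. The connector currently supports several read and write commands:

        Read:   get    hget     hgetall     hscan   lrange    smembers    zrange

        Write:   set   hset      hmset      lpush    rpush     sadd

2. For the most commonly used hash type, fuzzy matching is supported: entering only the table name queries the data of the whole table.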
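
Judging from the source function shown later in this post, an hgetall read appears to choose between three lookups depending on the shape of the key option. The sketch below models only that decision. It assumes a comma separates several keys and a colon separates the parts of one key; HashKeyLookupSketch and its Strategy enum are hypothetical names.

```java
// Hypothetical model of the lookup decision, not the connector's own code.
final class HashKeyLookupSketch {

    enum Strategy { MULTIPLE_KEYS, EXACT_KEY, FUZZY_MATCH }

    /** Decides how a configured key would be read for a table with the given number of primary key columns. */
    static Strategy choose(String key, int primaryKeyCount) {
        if (key.split(",").length > 1) {
            return Strategy.MULTIPLE_KEYS;   // several full keys listed explicitly
        }
        if (key.split(":").length == primaryKeyCount * 2 + 1) {
            return Strategy.EXACT_KEY;       // table:pkName:pkValue, one row
        }
        return Strategy.FUZZY_MATCH;         // only the table name, read every row
    }

    /** Pattern for the fuzzy case: every key that starts with the table name. */
    static String fuzzyPattern(String key) {
        return key + "*";
    }

    public static void main(String[] args) {
        System.out.println(choose("orders", 1) + " " + fuzzyPattern("orders"));
        System.out.println(choose("orders:order_id:10", 1));
    }
}
```

Note that the fuzzy case relies on the Redis KEYS command, which walks the whole keyspace and can be slow on a large database.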

4. Connection options

Option                      Required  Default  Type     Description
connector                   required  none     String   connector name
mode                        required  none     String   Redis deployment mode (single or cluster)
single.host                 optional  none     String   host of the Redis server in single mode
single.port                 optional  none     Integer  port of the Redis server in single mode
cluster.nodes               optional  none     String   cluster nodes as host:port pairs, e.g. host1:p1,host2:p2,host3:p3
password                    optional  none     String   Redis database password
command                     required  none     String   Redis command used to write or read data
key                         required  none     String   Redis key
expire                      optional  none     Integer  key TTL
field                       optional  none     String   field whose value is fetched when using the hget command
cursor                      optional  none     Integer  cursor when using the hscan command (e.g. 1, 2)
start                       optional  0        Integer  start index when using the lrange command
end                         optional  10       Integer  end index when using the lrange command
connection.max.wait-mills   optional  none     Integer  Redis connection parameter
connection.timeout-ms       optional  none     Integer  Redis connection parameter
connection.max-total        optional  none     Integer  Redis connection parameter
connection.max-idle         optional  none     Integer  Redis connection parameter
connection.test-on-borrow   optional  none     Boolean  Redis connection parameter
connection.test-on-return   optional  none     Boolean  Redis connection parameter
connection.test-while-idle  optional  none     Boolean  Redis connection parameter
so.timeout-ms               optional  none     Integer  Redis connection parameter
max.attempts                optional  none     Integer  Redis connection parameter
2. Factory class for dynamic reads and writes

import org.apache.flink.common.RedisOptions;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.sink.RedisDynamicTableSink;
import org.apache.flink.source.RedisDynamicTableSource;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RedisSourceSinkFactory implements DynamicTableSinkFactory, DynamicTableSourceFactory {

    private ReadableConfig options;

    public RedisSourceSinkFactory() {}

    public RedisSourceSinkFactory(ReadableConfig options) {
        this.options = options;
    }

    // Method of DynamicTableSourceFactory; this interface must be implemented to read data with Flink SQL
    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        helper.validate();
        options = helper.getOptions();
        ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
        List<Column> columns = schema.getColumns();
        ArrayList<String> columnNames = new ArrayList<>();
        columns.forEach(column -> columnNames.add(column.getName()));
        List<String> primaryKey = schema.getPrimaryKey().get().getColumns();
        return new RedisDynamicTableSource(options, columnNames, primaryKey);
    }

    // Method of DynamicTableSinkFactory; this must also be implemented to write data to Redis with Flink SQL
    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        helper.validate();
        ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
        List<Column> columns = schema.getColumns();
        ArrayList<String> columnNames = new ArrayList<>();
        columns.forEach(column -> columnNames.add(column.getName()));
        List<String> primaryKey = schema.getPrimaryKey().get().getColumns();
        ReadableConfig options = helper.getOptions();
        return new RedisDynamicTableSink(options, columnNames, primaryKey);
    }

    @Override
    public String factoryIdentifier() {
        return "redis";
    }

    // Required options of the SQL connector
    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        HashSet<ConfigOption<?>> options = new HashSet<>();
        options.add(RedisOptions.PASSWORD);
        options.add(RedisOptions.KEY);
        options.add(RedisOptions.MODE);
        return options;
    }

    // Optional options of the SQL connector
    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        HashSet<ConfigOption<?>> options = new HashSet<>();
        options.add(RedisOptions.SINGLE_HOST);
        options.add(RedisOptions.SINGLE_PORT);
        options.add(RedisOptions.CLUSTER_NODES);
        options.add(RedisOptions.FIELD);
        options.add(RedisOptions.CURSOR);
        options.add(RedisOptions.EXPIRE);
        options.add(RedisOptions.COMMAND);
        options.add(RedisOptions.START);
        options.add(RedisOptions.END);
        options.add(RedisOptions.CONNECTION_MAX_TOTAL);
        options.add(RedisOptions.CONNECTION_MAX_IDLE);
        options.add(RedisOptions.CONNECTION_TEST_WHILE_IDLE);
        options.add(RedisOptions.CONNECTION_TEST_ON_BORROW);
        options.add(RedisOptions.CONNECTION_TEST_ON_RETURN);
        options.add(RedisOptions.CONNECTION_TIMEOUT_MS);
        options.add(RedisOptions.TTL_SEC);
        options.add(RedisOptions.LOOKUP_ADDITIONAL_KEY);
        options.add(RedisOptions.LOOKUP_CACHE_MAX_ROWS);
        options.add(RedisOptions.LOOKUP_CACHE_TTL_SEC);
        return options;
    }
}
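
The factory declares which options are required and which are optional, and helper.validate() checks the table's WITH clause against those two sets. The sketch below is a deliberately simplified model of that idea using plain maps and sets: it is not Flink's FactoryUtil, and the class OptionValidationSketch and its error messages are hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Simplified, hypothetical model of required/optional option checking.
final class OptionValidationSketch {

    /** Fails if a required option is missing or if an undeclared option is supplied. */
    static void validate(Map<String, String> provided, Set<String> required, Set<String> optional) {
        for (String name : required) {
            if (!provided.containsKey(name)) {
                throw new IllegalArgumentException("Missing required option: " + name);
            }
        }
        for (String name : provided.keySet()) {
            if (!required.contains(name) && !optional.contains(name)) {
                throw new IllegalArgumentException("Unsupported option: " + name);
            }
        }
    }

    public static void main(String[] args) {
        Set<String> required = new HashSet<>(Arrays.asList("password", "key", "mode"));
        Set<String> optional = new HashSet<>(Arrays.asList("single.host", "single.port", "command"));

        Map<String, String> withClause = new HashMap<>();
        withClause.put("mode", "single");
        withClause.put("key", "orders");
        withClause.put("password", "xxxxxx");
        withClause.put("command", "hgetall");

        validate(withClause, required, optional);
        System.out.println("options accepted");
    }
}
```

In this model, an option that the connector reads but never declares in either set would be rejected as soon as a user sets it, so the two sets need to cover everything the source and sink functions look up.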

3. Redis source reader classes

import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.util.Preconditions;

import java.util.List;

public class RedisDynamicTableSource implements ScanTableSource {

    private ReadableConfig options;
    private List<String> primaryKey;
    private List<String> columns;

    public RedisDynamicTableSource(ReadableConfig options, List<String> columns, List<String> primaryKey) {
        this.options = Preconditions.checkNotNull(options);
        this.columns = Preconditions.checkNotNull(columns);
        this.primaryKey = Preconditions.checkNotNull(primaryKey);
    }

    @Override
    public DynamicTableSource copy() {
        return new RedisDynamicTableSource(this.options, this.columns, this.primaryKey);
    }

    @Override
    public String asSummaryString() {
        return "redis table source";
    }

    @Override
    public ChangelogMode getChangelogMode() {
        return ChangelogMode.all();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
        RedisSourceFunction redisSourceFunction = new RedisSourceFunction(this.options, this.columns, this.primaryKey);
        return SourceFunctionProvider.of(redisSourceFunction, false);
    }
}

The source function below reads Redis string, list, set, zset, and hash data and parses it into RowData that is passed to Flink.
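
On the read side, the primary key values have to be recovered from the Redis key itself. The following is a minimal sketch of that step, assuming the colon-separated layout tableKey:pkName:pkValue[:pkName:pkValue...] described earlier in this post. KeyParserSketch is a hypothetical helper written for illustration, not a copy of the connector's parsing code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not the connector's own code.
final class KeyParserSketch {

    /** Extracts the primary key values, in order, from a key such as orders:order_id:10. */
    static List<String> primaryKeyValues(String redisKey, int primaryKeyCount) {
        String[] parts = redisKey.split(":");
        if (parts.length != primaryKeyCount * 2 + 1) {
            throw new IllegalArgumentException("unexpected key layout: " + redisKey);
        }
        List<String> values = new ArrayList<>();
        for (int i = 0; i < primaryKeyCount; i++) {
            // parts[0] is the table key; names sit at odd indexes, values at even ones
            values.add(parts[2 * (i + 1)]);
        }
        return values;
    }

    public static void main(String[] args) {
        System.out.println(primaryKeyValues("orders:order_id:10", 1));   // prints [10]
    }
}
```

A value that itself contains a colon would break this layout, so such keys would need escaping or a different separator.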

import org.apache.flink.common.RedisClusterMode;
import org.apache.flink.common.RedisCommandOptions;
import org.apache.flink.common.RedisOptions;
import org.apache.flink.common.RedisSplitSymbol;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryStringData;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.RedisUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.ScanResult;

import java.util.*;

public class RedisSourceFunction extends RichSourceFunction<RowData> {

    private static final Logger LOG = LoggerFactory.getLogger(RedisSourceFunction.class);

    private ReadableConfig options;
    private List<String> primaryKey;
    private List<String> columns;
    private Jedis jedis;
    private JedisCluster jedisCluster;
    private String value;
    private String field;
    private String[] fields;
    private String cursor;
    private Integer start;
    private Integer end;
    private String[] keySplit;
    private static int position = 1;
    private GenericRowData rowData;

    public RedisSourceFunction(ReadableConfig options, List<String> columns, List<String> primaryKey) {
        this.options = Preconditions.checkNotNull(options);
        this.columns = Preconditions.checkNotNull(columns);
        this.primaryKey = Preconditions.checkNotNull(primaryKey);
    }

    @Override
    public void run(SourceContext<RowData> ctx) throws Exception {
        String password = options.get(RedisOptions.PASSWORD);
        Preconditions.checkNotNull(password, "password is null,please set value for password");
        Integer expire = options.get(RedisOptions.EXPIRE);
        String key = options.get(RedisOptions.KEY);
        Preconditions.checkNotNull(key, "key is null,please set value for key");
        String[] keyArr = key.split(RedisSplitSymbol.CLUSTER_NODES_SPLIT);
        String command = options.get(RedisOptions.COMMAND);
        Preconditions.checkNotNull(command, "command is null,please set value for command");

        // If the command is a Redis write command there is nothing to read, so stop here
        List<String> sourceCommand = Arrays.asList(
                RedisCommandOptions.SET, RedisCommandOptions.HSET, RedisCommandOptions.HMSET,
                RedisCommandOptions.LPUSH, RedisCommandOptions.RPUSH, RedisCommandOptions.SADD);
        if (sourceCommand.contains(command.toUpperCase())) {
            return;
        }

        String mode = options.get(RedisOptions.MODE);
        Preconditions.checkNotNull(mode, "mode is null,please set value for mode");
        Integer maxIdle = options.get(RedisOptions.CONNECTION_MAX_IDLE);
        Integer maxTotal = options.get(RedisOptions.CONNECTION_MAX_TOTAL);
        Integer maxWaitMills = options.get(RedisOptions.CONNECTION_MAX_WAIT_MILLS);
        Boolean testOnBorrow = options.get(RedisOptions.CONNECTION_TEST_ON_BORROW);
        Boolean testOnReturn = options.get(RedisOptions.CONNECTION_TEST_ON_RETURN);
        Boolean testWhileIdle = options.get(RedisOptions.CONNECTION_TEST_WHILE_IDLE);

        if (mode.toUpperCase().equals(RedisClusterMode.SINGLE.name())) {
            String host = options.get(RedisOptions.SINGLE_HOST);
            Integer port = options.get(RedisOptions.SINGLE_PORT);
            JedisPool jedisPool = RedisUtil.getSingleJedisPool(mode, host, port, maxTotal,
                    maxIdle, maxWaitMills, testOnBorrow, testOnReturn, testWhileIdle);
            jedis = jedisPool.getResource();
            jedis.auth(password);

            switch (command.toUpperCase()) {
                case RedisCommandOptions.GET:
                    value = jedis.get(key);
                    rowData = new GenericRowData(2);
                    rowData.setField(0, BinaryStringData.fromString(key));
                    rowData.setField(1, BinaryStringData.fromString(value));
                    break;

                case RedisCommandOptions.HGET:
                    field = options.get(RedisOptions.FIELD);
                    value = jedis.hget(key, field);
                    rowData = new GenericRowData(3);
                    keySplit = key.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                    // key layout: table:pkName:pkValue[:pkName:pkValue...], so values sit at even indexes
                    for (int i = 0; i < primaryKey.size(); i++) {
                        rowData.setField(i, BinaryStringData.fromString(keySplit[2 * (i + 1)]));
                    }
                    rowData.setField(primaryKey.size(), BinaryStringData.fromString(value));
                    break;

                case RedisCommandOptions.HGETALL:
                    if (keyArr.length > 1) {
                        // Several full keys were listed; read one row per key
                        for (String str : keyArr) {
                            rowData = new GenericRowData(columns.size());
                            keySplit = str.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            for (int i = 0; i < primaryKey.size(); i++) {
                                rowData.setField(i, BinaryStringData.fromString(keySplit[2 * (i + 1)]));
                            }
                            for (int i = primaryKey.size(); i < columns.size(); i++) {
                                String value = jedis.hget(str, columns.get(i));
                                rowData.setField(i, BinaryStringData.fromString(value));
                            }
                            ctx.collect(rowData);
                        }
                    } else if (key.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT).length == (primaryKey.size() * 2 + 1)) {
                        // One full key was given; read a single row
                        rowData = new GenericRowData(columns.size());
                        keySplit = key.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                        for (int i = 0; i < primaryKey.size(); i++) {
                            rowData.setField(i, BinaryStringData.fromString(keySplit[2 * (i + 1)]));
                        }
                        for (int i = primaryKey.size(); i < columns.size(); i++) {
                            String value = jedis.hget(key, columns.get(i));
                            rowData.setField(i, BinaryStringData.fromString(value));
                        }
                        ctx.collect(rowData);
                    } else {
                        // Fuzzy matching: gets the data of the entire table
                        String fuzzyKey = new StringBuffer(key).append("*").toString();
                        Set<String> keys = jedis.keys(fuzzyKey);
                        for (String keyStr : keys) {
                            keySplit = keyStr.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            rowData = new GenericRowData(columns.size());
                            for (int i = 0; i < primaryKey.size(); i++) {
                                rowData.setField(i, BinaryStringData.fromString(keySplit[2 * (i + 1)]));
                            }
                            for (int i = primaryKey.size(); i < columns.size(); i++) {
                                String value = jedis.hget(keyStr, columns.get(i));
                                rowData.setField(i, BinaryStringData.fromString(value));
                            }
                            ctx.collect(rowData);
                        }
                    }
                    break;

                case RedisCommandOptions.HSCAN:
                    cursor = options.get(RedisOptions.CURSOR);
                    ScanResult<Map.Entry<String, String>> entries = jedis.hscan(key, cursor);
                    List<Map.Entry<String, String>> result = entries.getResult();
                    keySplit = key.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                    rowData = new GenericRowData(columns.size());
                    for (int i = 0; i < primaryKey.size(); i++) {
                        rowData.setField(i, BinaryStringData.fromString(keySplit[2 * (i + 1)]));
                    }
                    position = primaryKey.size();
                    for (int i = 0; i < result.size(); i++) {
                        value = result.get(i).getValue();
                        rowData.setField(position, BinaryStringData.fromString(value));
                        position++;
                    }
                    break;

                case RedisCommandOptions.LRANGE:
                    start = options.get(RedisOptions.START);
                    end = options.get(RedisOptions.END);
                    List<String> list = jedis.lrange(key, start, end);
                    rowData = new GenericRowData(list.size() + 1);
                    rowData.setField(0, BinaryStringData.fromString(key));
                    // field 0 holds the key, so list elements start at position 1
                    position = 1;
                    list.forEach(s -> {
                        rowData.setField(position, BinaryStringData.fromString(s));
                        position++;
                    });
                    break;

                case RedisCommandOptions.SMEMBERS:
                    Set<String> smembers = jedis.smembers(key);
                    rowData = new GenericRowData(smembers.size() + 1);
                    rowData.setField(0, BinaryStringData.fromString(key));
                    position = 1;
                    smembers.forEach(s -> {
                        rowData.setField(position, BinaryStringData.fromString(s));
                        position++;
                    });
                    break;

                case RedisCommandOptions.ZRANGE:
                    start = options.get(RedisOptions.START);
                    end = options.get(RedisOptions.END);
                    Set<String> sets = jedis.zrange(key, start, end);
                    rowData = new GenericRowData(sets.size() + 1);
                    rowData.setField(0, BinaryStringData.fromString(key));
                    position = 1;
                    sets.forEach(s -> {
                        rowData.setField(position, BinaryStringData.fromString(s));
                        position++;
                    });
                    break;

                default:
                    LOG.error("Cannot process such data type: {}", command);
                    break;
            }

            // HGETALL has already emitted its rows inside the switch
            if (!command.toUpperCase().equals(RedisCommandOptions.HGETALL)) {
                ctx.collect(rowData);
            }

        } else if (mode.toUpperCase().equals(RedisClusterMode.CLUSTER.name())) {
            String nodes = options.get(RedisOptions.CLUSTER_NODES);
            String[] hostAndPorts = nodes.split(RedisSplitSymbol.CLUSTER_NODES_SPLIT);
            String[] host = new String[hostAndPorts.length];
            int[] port = new int[hostAndPorts.length];
            for (int i = 0; i < hostAndPorts.length; i++) {
                String[] splits = hostAndPorts[i].split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                host[i] = splits[0];
                port[i] = Integer.parseInt(splits[1]);
            }
            Integer connTimeOut = options.get(RedisOptions.CONNECTION_TIMEOUT_MS);
            Integer soTimeOut = options.get(RedisOptions.SO_TIMEOUT_MS);
            Integer maxAttempts = options.get(RedisOptions.MAX_ATTEMPTS);
            jedisCluster = RedisUtil.getJedisCluster(mode, host, password, port, maxTotal,
                    maxIdle, maxWaitMills, connTimeOut, soTimeOut, maxAttempts,
                    testOnBorrow, testOnReturn, testWhileIdle);

            switch (command.toUpperCase()) {
                case RedisCommandOptions.GET:
                    value = jedisCluster.get(key);
                    rowData = new GenericRowData(2);
                    rowData.setField(0, BinaryStringData.fromString(key));
                    rowData.setField(1, BinaryStringData.fromString(value));
                    break;

                case RedisCommandOptions.HGET:
                    field = options.get(RedisOptions.FIELD);
                    value = jedisCluster.hget(key, field);
                    rowData = new GenericRowData(3);
                    keySplit = key.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                    for (int i = 0; i < primaryKey.size(); i++) {
                        rowData.setField(i, BinaryStringData.fromString(keySplit[2 * (i + 1)]));
                    }
                    rowData.setField(primaryKey.size(), BinaryStringData.fromString(value));
                    break;

                case RedisCommandOptions.HGETALL:
                    if (keyArr.length > 1) {
                        for (String str : keyArr) {
                            rowData = new GenericRowData(columns.size());
                            keySplit = str.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            for (int i = 0; i < primaryKey.size(); i++) {
                                rowData.setField(i, BinaryStringData.fromString(keySplit[2 * (i + 1)]));
                            }
                            for (int i = primaryKey.size(); i < columns.size(); i++) {
                                String value = jedisCluster.hget(str, columns.get(i));
                                rowData.setField(i, BinaryStringData.fromString(value));
                            }
                            ctx.collect(rowData);
                        }
                    } else if (key.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT).length == (primaryKey.size() * 2 + 1)) {
                        rowData = new GenericRowData(columns.size());
                        keySplit = key.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                        for (int i = 0; i < primaryKey.size(); i++) {
                            rowData.setField(i, BinaryStringData.fromString(keySplit[2 * (i + 1)]));
                        }
                        for (int i = primaryKey.size(); i < columns.size(); i++) {
                            String value = jedisCluster.hget(key, columns.get(i));
                            rowData.setField(i, BinaryStringData.fromString(value));
                        }
                        ctx.collect(rowData);
                    } else {
                        // Fuzzy matching: gets the data of the entire table
                        String fuzzyKey = new StringBuffer(key).append("*").toString();
                        Set<String> keys = jedisCluster.keys(fuzzyKey);
                        for (String keyStr : keys) {
                            keySplit = keyStr.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            rowData = new GenericRowData(columns.size());
                            for (int i = 0; i < primaryKey.size(); i++) {
                                rowData.setField(i, BinaryStringData.fromString(keySplit[2 * (i + 1)]));
                            }
                            for (int i = primaryKey.size(); i < columns.size(); i++) {
                                String value = jedisCluster.hget(keyStr, columns.get(i));
                                rowData.setField(i, BinaryStringData.fromString(value));
                            }
                            ctx.collect(rowData);
                        }
                    }
                    break;

                case RedisCommandOptions.HSCAN:
                    cursor = options.get(RedisOptions.CURSOR);
                    ScanResult<Map.Entry<String, String>> entries = jedisCluster.hscan(key, cursor);
                    List<Map.Entry<String, String>> result = entries.getResult();
                    keySplit = key.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                    rowData = new GenericRowData(columns.size());
                    for (int i = 0; i < primaryKey.size(); i++) {
                        rowData.setField(i, BinaryStringData.fromString(keySplit[2 * (i + 1)]));
                    }
                    position = primaryKey.size();
                    for (int i = 0; i < result.size(); i++) {
                        value = result.get(i).getValue();
                        rowData.setField(position, BinaryStringData.fromString(value));
                        position++;
                    }
                    break;

                case RedisCommandOptions.LRANGE:
                    start = options.get(RedisOptions.START);
                    end = options.get(RedisOptions.END);
                    List<String> list = jedisCluster.lrange(key, start, end);
                    rowData = new GenericRowData(list.size() + 1);
                    rowData.setField(0, BinaryStringData.fromString(key));
                    position = 1;
                    list.forEach(s -> {
                        rowData.setField(position, BinaryStringData.fromString(s));
                        position++;
                    });
                    break;

                case RedisCommandOptions.SMEMBERS:
                    Set<String> smembers = jedisCluster.smembers(key);
                    rowData = new GenericRowData(smembers.size() + 1);
                    rowData.setField(0, BinaryStringData.fromString(key));
                    position = 1;
                    smembers.forEach(s -> {
                        rowData.setField(position, BinaryStringData.fromString(s));
                        position++;
                    });
                    break;

                case RedisCommandOptions.ZRANGE:
                    start = options.get(RedisOptions.START);
                    end = options.get(RedisOptions.END);
                    Set<String> sets = jedisCluster.zrange(key, start, end);
                    rowData = new GenericRowData(sets.size() + 1);
                    rowData.setField(0, BinaryStringData.fromString(key));
                    position = 1;
                    sets.forEach(s -> {
                        rowData.setField(position, BinaryStringData.fromString(s));
                        position++;
                    });
                    break;

                default:
                    LOG.error("Cannot process such data type: {}", command);
                    break;
            }

            if (!command.toUpperCase().equals(RedisCommandOptions.HGETALL)) {
                ctx.collect(rowData);
            }

        } else {
            LOG.error("Unsupport such {} mode", mode);
        }
    }

    @Override
    public void cancel() {
        if (jedis != null) {
            jedis.close();
        }
        if (jedisCluster != null) {
            jedisCluster.close();
        }
    }
}
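
The source function returns immediately when the configured command is a write command, because one table definition can be used as either a source or a sink. A minimal sketch of that classification, using the command lists from the supported-features section, might look like this; CommandKindSketch is a hypothetical class, and the connector itself keeps these names in RedisCommandOptions.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Locale;
import java.util.Objects;
import java.util.Set;

// Hypothetical helper, not the connector's own code.
final class CommandKindSketch {

    static final Set<String> READ_COMMANDS = new HashSet<>(Arrays.asList(
            "GET", "HGET", "HGETALL", "HSCAN", "LRANGE", "SMEMBERS", "ZRANGE"));

    static final Set<String> WRITE_COMMANDS = new HashSet<>(Arrays.asList(
            "SET", "HSET", "HMSET", "LPUSH", "RPUSH", "SADD"));

    /** True if the command, in any letter case, is one of the supported read commands. */
    static boolean isRead(String command) {
        Objects.requireNonNull(command, "command is null, please set a value for command");
        return READ_COMMANDS.contains(command.toUpperCase(Locale.ROOT));
    }

    /** True if the command, in any letter case, is one of the supported write commands. */
    static boolean isWrite(String command) {
        Objects.requireNonNull(command, "command is null, please set a value for command");
        return WRITE_COMMANDS.contains(command.toUpperCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        System.out.println(isRead("hgetall"));   // prints true
        System.out.println(isWrite("hgetall"));  // prints false
    }
}
```

Checking the command for null before calling toUpperCase matters: the other order throws a NullPointerException before the more helpful message can be shown.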

4. Redis sink writer classes

package org.apache.flink.sink;

import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

public class RedisDynamicTableSink implements DynamicTableSink {

    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(RedisDynamicTableSink.class);

    private ReadableConfig options;
    private List<String> primaryKey;
    private List<String> columns;

    public RedisDynamicTableSink(ReadableConfig options, List<String> columns, List<String> primaryKey) {
        this.options = Preconditions.checkNotNull(options);
        this.columns = Preconditions.checkNotNull(columns);
        this.primaryKey = Preconditions.checkNotNull(primaryKey);
    }

    // Accept every kind of change row: inserts, deletes, and both halves of an update
    @Override
    public ChangelogMode getChangelogMode(ChangelogMode changelogMode) {
        return ChangelogMode.newBuilder()
                .addContainedKind(RowKind.INSERT)
                .addContainedKind(RowKind.DELETE)
                .addContainedKind(RowKind.UPDATE_BEFORE)
                .addContainedKind(RowKind.UPDATE_AFTER)
                .build();
    }

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        RedisSinkFunction myRedisSinkFunction = new RedisSinkFunction(this.options, this.columns, this.primaryKey);
        return SinkFunctionProvider.of(myRedisSinkFunction);
    }

    @Override
    public DynamicTableSink copy() {
        return new RedisDynamicTableSink(this.options, this.columns, this.primaryKey);
    }

    @Override
    public String asSummaryString() {
        return "redis table sink";
    }
}
package org.apache.flink.sink;

import org.apache.flink.common.RedisClusterMode;
import org.apache.flink.common.RedisCommandOptions;
import org.apache.flink.common.RedisOptions;
import org.apache.flink.common.RedisSplitSymbol;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.RedisUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPool;

import java.util.List;

public class RedisSinkFunction extends RichSinkFunction<RowData> {

    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(RedisSinkFunction.class);

    private ReadableConfig options;
    private List<String> primaryKey;
    private List<String> columns;
    private String fields;
    private Jedis jedis;
    private JedisCluster jedisCluster;
    private String[] fieldsArr;
    private StringBuffer redisTableKey;
    private String value;

    public RedisSinkFunction(ReadableConfig options, List<String> columns, List<String> primaryKey) {
        this.options = Preconditions.checkNotNull(options);
        this.columns = Preconditions.checkNotNull(columns);
        this.primaryKey = Preconditions.checkNotNull(primaryKey);
    }

    @Override
    public void invoke(RowData rowData, Context context) throws Exception {
        String password = options.get(RedisOptions.PASSWORD);
        Preconditions.checkNotNull(password, "password is null,please set value for password");
        Integer expire = options.get(RedisOptions.EXPIRE);
        String key = options.get(RedisOptions.KEY);
        Preconditions.checkNotNull(key, "key is null,please set value for key");
        String command = options.get(RedisOptions.COMMAND);
        Preconditions.checkNotNull(command, "command is null,please set value for command");
        String mode = options.get(RedisOptions.MODE);
        Preconditions.checkNotNull(mode, "mode is null,please set value for mode");
        Integer maxIdle = options.get(RedisOptions.CONNECTION_MAX_IDLE);
        Integer maxTotal = options.get(RedisOptions.CONNECTION_MAX_TOTAL);
        Integer maxWaitMills = options.get(RedisOptions.CONNECTION_MAX_WAIT_MILLS);
        Boolean testOnBorrow = options.get(RedisOptions.CONNECTION_TEST_ON_BORROW);
        Boolean testOnReturn = options.get(RedisOptions.CONNECTION_TEST_ON_RETURN);
        Boolean testWhileIdle = options.get(RedisOptions.CONNECTION_TEST_WHILE_IDLE);

        if (mode.toUpperCase().equals(RedisClusterMode.SINGLE.name())) {
            String host = options.get(RedisOptions.SINGLE_HOST);
            Integer port = options.get(RedisOptions.SINGLE_PORT);
            JedisPool jedisPool = RedisUtil.getSingleJedisPool(mode, host, port, maxTotal,
                    maxIdle, maxWaitMills, testOnBorrow, testOnReturn, testWhileIdle);
            jedis = jedisPool.getResource();
            jedis.auth(password);

            switch (command.toUpperCase()) {
                case RedisCommandOptions.SET:
                    value = rowData.getString(0).toString();
                    jedis.set(String.valueOf(key), String.valueOf(value));
                    break;

                case RedisCommandOptions.HSET: {
                    String field = columns.get(1);
                    // construct redis key: table_name:primary key col name:primary key value
                    redisTableKey = new StringBuffer(key).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                    for (int i = 0; i < primaryKey.size(); i++) {
                        if (primaryKey.size() <= 1) {
                            redisTableKey.append(primaryKey.get(i)).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            redisTableKey.append(rowData.getString(i).toString());
                            break;
                        } else {
                            redisTableKey.append(primaryKey.get(i)).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            redisTableKey.append(rowData.getString(i).toString());
                        }
                        redisTableKey.append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                    }
                    value = rowData.getString(1).toString();
                    jedis.hset(String.valueOf(redisTableKey), String.valueOf(field), String.valueOf(value));
                    // without this break, HSET would fall through into the HMSET branch
                    break;
                }

                case RedisCommandOptions.HMSET:
                    // construct redis key: table_name:primary key col name:primary key value
                    redisTableKey = new StringBuffer(key).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                    for (int i = 0; i < primaryKey.size(); i++) {
                        if (primaryKey.size() <= 1) {
                            redisTableKey.append(primaryKey.get(i)).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            redisTableKey.append(rowData.getString(i).toString());
                            break;
                        } else {
                            redisTableKey.append(primaryKey.get(i)).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            redisTableKey.append(rowData.getString(i).toString());
                        }
                        if (i != primaryKey.size() - 1) {
                            redisTableKey.append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                        }
                    }
                    // write every non-key column as a hash field
                    for (int i = 1; i < columns.size(); i++) {
                        if (!primaryKey.contains(columns.get(i))) {
                            value = rowData.getString(i).toString();
                            jedis.hset(String.valueOf(redisTableKey), String.valueOf(columns.get(i)), String.valueOf(value));
                        }
                    }
                    break;

                case RedisCommandOptions.LPUSH:
                    value = rowData.getString(0).toString();
                    jedis.lpush(key, value);
                    break;

                case RedisCommandOptions.RPUSH:
                    value = rowData.getString(0).toString();
                    jedis.rpush(key, value);
                    break;

                case RedisCommandOptions.SADD:
                    value = rowData.getString(0).toString();
                    jedis.sadd(key, value);
                    break;

                default:
                    LOG.error("Cannot process such data type: {}", command);
                    break;
            }

        } else if (mode.toUpperCase().equals(RedisClusterMode.CLUSTER.name())) {
            String nodes = options.get(RedisOptions.CLUSTER_NODES);
            String[] hostAndPorts = nodes.split(RedisSplitSymbol.CLUSTER_NODES_SPLIT);
            String[] host = new String[hostAndPorts.length];
            int[] port = new int[hostAndPorts.length];
            for (int i = 0; i < hostAndPorts.length; i++) {
                String[] splits = hostAndPorts[i].split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                host[i] = splits[0];
                port[i] = Integer.parseInt(splits[1]);
            }
            Integer connTimeOut = options.get(RedisOptions.CONNECTION_TIMEOUT_MS);
            Integer soTimeOut = options.get(RedisOptions.SO_TIMEOUT_MS);
            Integer maxAttempts = options.get(RedisOptions.MAX_ATTEMPTS);
            jedisCluster = RedisUtil.getJedisCluster(mode, host, password, port, maxTotal,
                    maxIdle, maxWaitMills, connTimeOut, soTimeOut, maxAttempts,
                    testOnBorrow, testOnReturn, testWhileIdle);

            switch (command.toUpperCase()) {
                case RedisCommandOptions.SET:
                    value = rowData.getString(0).toString();
                    jedisCluster.set(String.valueOf(key), String.valueOf(value));
                    break;

                case RedisCommandOptions.HSET: {
                    String field = columns.get(1);
                    // construct redis key: table_name:primary key col name:primary key value
                    redisTableKey = new StringBuffer(key).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                    for (int i = 0; i < primaryKey.size(); i++) {
                        if (primaryKey.size() <= 1) {
                            redisTableKey.append(primaryKey.get(i)).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            redisTableKey.append(rowData.getString(i).toString());
                            break;
                        } else {
                            redisTableKey.append(primaryKey.get(i)).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            redisTableKey.append(rowData.getString(i).toString());
                        }
                        redisTableKey.append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                    }
                    value = rowData.getString(1).toString();
                    jedisCluster.hset(String.valueOf(redisTableKey), String.valueOf(field), String.valueOf(value));
                    // without this break, HSET would fall through into the HMSET branch
                    break;
                }

                case RedisCommandOptions.HMSET:
                    // construct redis key: table_name:primary key col name:primary key value
                    redisTableKey = new StringBuffer(key).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                    for (int i = 0; i < primaryKey.size(); i++) {
                        if (primaryKey.size() <= 1) {
                            redisTableKey.append(primaryKey.get(i)).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            redisTableKey.append(rowData.getString(i).toString());
                            break;
                        } else {
                            redisTableKey.append(primaryKey.get(i)).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            redisTableKey.append(rowData.getString(i).toString());
                        }
                        redisTableKey.append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                    }
                    for (int i = 1; i < columns.size(); i++) {
                        value = rowData.getString(i).toString();
                        jedisCluster.hset(String.valueOf(redisTableKey), String.valueOf(columns.get(i)), String.valueOf(value));
                    }
                    break;

                case RedisCommandOptions.LPUSH:
                    value = rowData.getString(0).toString();
                    jedisCluster.lpush(key, value);
                    break;

                case RedisCommandOptions.RPUSH:
                    value = rowData.getString(0).toString();
                    jedisCluster.rpush(key, value);
                    break;

                case RedisCommandOptions.SADD:
                    value = rowData.getString(0).toString();
                    jedisCluster.sadd(key, value);
                    break;

                default:
                    LOG.error("Cannot process such data type: {}", command);
                    break;
            }

        } else {
            LOG.error("Unsupport such {} mode", mode);
        }
    }

    @Override
    public void close() throws Exception {
        if (jedis != null) {
            jedis.close();
        }
        if (jedisCluster != null) {
            jedisCluster.close();
        }
    }
}

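If it is unclear why the code above is written this way, see my previous post:

Flink SQL: User-defined Sources & Sinks (source and sink tables), CSDN blog

Finally, I hope once again that you will show your support on GitHub or in the community so that this connector can be officially open-sourced.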
