A Look at Flink's TableFactory

This article takes a look at Flink's TableFactory.

Example

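import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.table.factories.StreamTableSourceFactory;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;

class MySystemTableSourceFactory implements StreamTableSourceFactory<Row> {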

  @Override
  public Map<String, String> requiredContext() {
    Map<String, String> context = new HashMap<>();
    context.put("update-mode", "append");
    context.put("connector.type", "my-system");
    return context;
  }

  @Override
  public List<String> supportedProperties() {
    List<String> list = new ArrayList<>();
    list.add("connector.debug");
    return list;
  }

  @Override
  public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
    // a missing or non-"true" value yields false
    boolean isDebug = Boolean.parseBoolean(properties.get("connector.debug"));

    // additional validation of the passed properties can also happen here

    return new MySystemAppendTableSource(isDebug);
  }
}

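import java.util.HashMap;
import java.util.Map;

import org.apache.flink.table.descriptors.ConnectorDescriptor;

public class MySystemConnector extends ConnectorDescriptor {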

  public final boolean isDebug;

  public MySystemConnector(boolean isDebug) {
    super("my-system", 1, false);
    this.isDebug = isDebug;
  }

  @Override
  protected Map<String, String> toConnectorProperties() {
    Map<String, String> properties = new HashMap<>();
    properties.put("connector.debug", Boolean.toString(isDebug));
    return properties;
  }
}
  • This example defines MySystemTableSourceFactory. Its requiredContext is update-mode=append and connector.type=my-system, its supportedProperties is connector.debug, and its createStreamTableSource method creates a MySystemAppendTableSource. MySystemConnector extends ConnectorDescriptor; it sets connector.type to my-system, connector.property-version to 1, and formatNeeded to false, and its toConnectorProperties method supplies the value of connector.debug.

TableFactory

flink-table-common-1.7.1-sources.jar!/org/apache/flink/table/factories/TableFactory.java

@PublicEvolving
public interface TableFactory {

    Map<String, String> requiredContext();

    List<String> supportedProperties();
}
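  • TableFactory defines two methods, requiredContext and supportedProperties. requiredContext is used to match a factory against the given properties; supportedProperties lists the property keys the factory supports. If a property is passed in that the factory does not support, an exception is thrown.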
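How requiredContext takes part in matching can be sketched in plain Java, without any Flink dependency. The class and method names below (ContextMatchSketch, matchesContext) are made up for illustration and are not Flink APIs. The logic follows the filterByContext method of TableFactoryService quoted later in this article, with one simplification: every key ending in ".property-version" is ignored, whereas the real code removes four specific version constants.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;

class ContextMatchSketch {

    // Hypothetical helper: true if every required context entry (except version
    // keys) is present in the given properties with an equal value.
    static boolean matchesContext(Map<String, String> requiredContext,
                                  Map<String, String> properties) {
        Map<String, String> plain = new HashMap<>();
        // context keys are lower-cased before they are compared
        requiredContext.forEach((k, v) -> plain.put(k.toLowerCase(Locale.ROOT), v));
        // simplification: drop all "*.property-version" keys before matching
        plain.keySet().removeIf(k -> k.endsWith(".property-version"));

        for (Map.Entry<String, String> e : plain.entrySet()) {
            boolean same = properties.containsKey(e.getKey())
                    && Objects.equals(properties.get(e.getKey()), e.getValue());
            if (!same) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, String> required = new HashMap<>();
        required.put("update-mode", "append");
        required.put("connector.type", "my-system");
        required.put("connector.property-version", "1");

        Map<String, String> given = new HashMap<>();
        given.put("update-mode", "append");
        given.put("connector.type", "my-system");
        given.put("connector.debug", "true");

        System.out.println(matchesContext(required, given)); // true

        given.put("connector.type", "other-system");
        System.out.println(matchesContext(required, given)); // false
    }
}
```

Extra keys in the given properties (connector.debug here) do not affect context matching; they are only checked later against supportedProperties.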

BatchTableSourceFactory

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/factories/BatchTableSourceFactory.scala

trait BatchTableSourceFactory[T] extends TableFactory {

  def createBatchTableSource(properties: util.Map[String, String]): BatchTableSource[T]
}
  • BatchTableSourceFactory extends TableFactory and defines the createBatchTableSource method.

BatchTableSinkFactory

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/factories/BatchTableSinkFactory.scala

trait BatchTableSinkFactory[T] extends TableFactory {

  def createBatchTableSink(properties: util.Map[String, String]): BatchTableSink[T]
}
  • BatchTableSinkFactory extends TableFactory and defines the createBatchTableSink method.

StreamTableSourceFactory

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/factories/StreamTableSourceFactory.scala

trait StreamTableSourceFactory[T] extends TableFactory {

  def createStreamTableSource(properties: util.Map[String, String]): StreamTableSource[T]
}
  • StreamTableSourceFactory extends TableFactory and defines the createStreamTableSource method.

StreamTableSinkFactory

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/factories/StreamTableSinkFactory.scala

trait StreamTableSinkFactory[T] extends TableFactory {

  def createStreamTableSink(properties: util.Map[String, String]): StreamTableSink[T]
}
  • StreamTableSinkFactory extends TableFactory and defines the createStreamTableSink method.

ConnectorDescriptor

flink-table-common-1.7.1-sources.jar!/org/apache/flink/table/descriptors/ConnectorDescriptor.java

@PublicEvolving
public abstract class ConnectorDescriptor extends DescriptorBase implements Descriptor {

    private String type;

    private int version;

    private boolean formatNeeded;

    /**
     * Constructs a {@link ConnectorDescriptor}.
     *
     * @param type string that identifies this connector
     * @param version property version for backwards compatibility
     * @param formatNeeded flag for basic validation of a needed format descriptor
     */
    public ConnectorDescriptor(String type, int version, boolean formatNeeded) {
        this.type = type;
        this.version = version;
        this.formatNeeded = formatNeeded;
    }

    @Override
    public final Map<String, String> toProperties() {
        final DescriptorProperties properties = new DescriptorProperties();
        properties.putString(CONNECTOR_TYPE, type);
        properties.putLong(CONNECTOR_PROPERTY_VERSION, version);
        properties.putProperties(toConnectorProperties());
        return properties.asMap();
    }

    /**
     * Returns if this connector requires a format descriptor.
     */
    protected final boolean isFormatNeeded() {
        return formatNeeded;
    }

    /**
     * Converts this descriptor into a set of connector properties. Usually prefixed with
     * {@link FormatDescriptorValidator#FORMAT}.
     */
    protected abstract Map<String, String> toConnectorProperties();
}
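  • ConnectorDescriptor extends DescriptorBase and implements the Descriptor interface. It overrides Descriptor's toProperties method, which sets the built-in properties CONNECTOR_TYPE and CONNECTOR_PROPERTY_VERSION and then merges in the properties defined by the subclass through the abstract method toConnectorProperties.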
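To make the merge concrete, here is a minimal plain-Java sketch of what toProperties produces for the MySystemConnector from the example. DescriptorPropertiesSketch and its static toProperties method are hypothetical stand-ins, not Flink classes; the key names connector.type and connector.property-version are the ones named in the example above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class DescriptorPropertiesSketch {

    // Hypothetical stand-in for ConnectorDescriptor.toProperties(): the built-in
    // keys go in first, then whatever the subclass returns from toConnectorProperties().
    static Map<String, String> toProperties(String type, int version,
                                            Map<String, String> connectorProperties) {
        Map<String, String> properties = new LinkedHashMap<>();
        properties.put("connector.type", type);
        properties.put("connector.property-version", Integer.toString(version));
        properties.putAll(connectorProperties);
        return properties;
    }

    public static void main(String[] args) {
        // what MySystemConnector(true).toConnectorProperties() would contribute
        Map<String, String> custom = new LinkedHashMap<>();
        custom.put("connector.debug", Boolean.toString(true));

        System.out.println(toProperties("my-system", 1, custom));
        // {connector.type=my-system, connector.property-version=1, connector.debug=true}
    }
}
```

Note that update-mode is not among these keys. As far as I can tell, in the descriptor API it is contributed by the table descriptor (for example through inAppendMode()) rather than by the connector descriptor.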

TableFactoryUtil

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/factories/TableFactoryUtil.scala

object TableFactoryUtil {

  /**
    * Returns a table source for a table environment.
    */
  def findAndCreateTableSource[T](
      tableEnvironment: TableEnvironment,
      descriptor: Descriptor)
    : TableSource[T] = {

    val javaMap = descriptor.toProperties

    tableEnvironment match {
      case _: BatchTableEnvironment =>
        TableFactoryService
          .find(classOf[BatchTableSourceFactory[T]], javaMap)
          .createBatchTableSource(javaMap)

      case _: StreamTableEnvironment =>
        TableFactoryService
          .find(classOf[StreamTableSourceFactory[T]], javaMap)
          .createStreamTableSource(javaMap)

      case e@_ =>
        throw new TableException(s"Unsupported table environment: ${e.getClass.getName}")
    }
  }

  /**
    * Returns a table sink for a table environment.
    */
  def findAndCreateTableSink[T](
      tableEnvironment: TableEnvironment,
      descriptor: Descriptor)
    : TableSink[T] = {

    val javaMap = descriptor.toProperties

    tableEnvironment match {
      case _: BatchTableEnvironment =>
        TableFactoryService
          .find(classOf[BatchTableSinkFactory[T]], javaMap)
          .createBatchTableSink(javaMap)

      case _: StreamTableEnvironment =>
        TableFactoryService
          .find(classOf[StreamTableSinkFactory[T]], javaMap)
          .createStreamTableSink(javaMap)

      case e@_ =>
        throw new TableException(s"Unsupported table environment: ${e.getClass.getName}")
    }
  }
}
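  • TableFactoryUtil is a utility class that creates a TableSource or TableSink for a given TableEnvironment and Descriptor. Internally it calls TableFactoryService with descriptor.toProperties to find the matching TableFactory, choosing the batch or stream factory type according to the kind of TableEnvironment.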

TableFactoryService

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/factories/TableFactoryService.scala

object TableFactoryService extends Logging {

  private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactory])

  /**
    * Finds a table factory of the given class and descriptor.
    *
    * @param factoryClass desired factory class
    * @param descriptor descriptor describing the factory configuration
    * @tparam T factory class type
    * @return the matching factory
    */
  def find[T](factoryClass: Class[T], descriptor: Descriptor): T = {
    Preconditions.checkNotNull(descriptor)

    findInternal(factoryClass, descriptor.toProperties, None)
  }

  /**
    * Finds a table factory of the given class, descriptor, and classloader.
    *
    * @param factoryClass desired factory class
    * @param descriptor descriptor describing the factory configuration
    * @param classLoader classloader for service loading
    * @tparam T factory class type
    * @return the matching factory
    */
  def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: ClassLoader): T = {
    Preconditions.checkNotNull(descriptor)
    Preconditions.checkNotNull(classLoader)

    findInternal(factoryClass, descriptor.toProperties, Some(classLoader))
  }

  /**
    * Finds a table factory of the given class and property map.
    *
    * @param factoryClass desired factory class
    * @param propertyMap properties that describe the factory configuration
    * @tparam T factory class type
    * @return the matching factory
    */
  def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): T = {
    findInternal(factoryClass, propertyMap, None)
  }

  /**
    * Finds a table factory of the given class, property map, and classloader.
    *
    * @param factoryClass desired factory class
    * @param propertyMap properties that describe the factory configuration
    * @param classLoader classloader for service loading
    * @tparam T factory class type
    * @return the matching factory
    */
  def find[T](
      factoryClass: Class[T],
      propertyMap: JMap[String, String],
      classLoader: ClassLoader)
    : T = {
    Preconditions.checkNotNull(classLoader)

    findInternal(factoryClass, propertyMap, Some(classLoader))
  }

  /**
    * Finds a table factory of the given class, property map, and classloader.
    *
    * @param factoryClass desired factory class
    * @param propertyMap properties that describe the factory configuration
    * @param classLoader classloader for service loading
    * @tparam T factory class type
    * @return the matching factory
    */
  private def findInternal[T](
      factoryClass: Class[T],
      propertyMap: JMap[String, String],
      classLoader: Option[ClassLoader])
    : T = {

    Preconditions.checkNotNull(factoryClass)
    Preconditions.checkNotNull(propertyMap)

    val properties = propertyMap.asScala.toMap

    val foundFactories = discoverFactories(classLoader)

    val classFactories = filterByFactoryClass(
      factoryClass,
      properties,
      foundFactories)

    val contextFactories = filterByContext(
      factoryClass,
      properties,
      foundFactories,
      classFactories)

    filterBySupportedProperties(
      factoryClass,
      properties,
      foundFactories,
      contextFactories)
  }

  /**
    * Searches for factories using Java service providers.
    *
    * @return all factories in the classpath
    */
  private def discoverFactories[T](classLoader: Option[ClassLoader]): Seq[TableFactory] = {
    try {
      val iterator = classLoader match {
        case Some(customClassLoader) =>
          val customLoader = ServiceLoader.load(classOf[TableFactory], customClassLoader)
          customLoader.iterator()
        case None =>
          defaultLoader.iterator()
      }
      iterator.asScala.toSeq
    } catch {
      case e: ServiceConfigurationError =>
        LOG.error("Could not load service provider for table factories.", e)
        throw new TableException("Could not load service provider for table factories.", e)
    }
  }

  /**
    * Filters factories with matching context by factory class.
    */
  private def filterByFactoryClass[T](
      factoryClass: Class[T],
      properties: Map[String, String],
      foundFactories: Seq[TableFactory])
    : Seq[TableFactory] = {

    val classFactories = foundFactories.filter(f => factoryClass.isAssignableFrom(f.getClass))
    if (classFactories.isEmpty) {
      throw new NoMatchingTableFactoryException(
        s"No factory implements '${factoryClass.getCanonicalName}'.",
        factoryClass,
        foundFactories,
        properties)
    }
    classFactories
  }

  /**
    * Filters for factories with matching context.
    *
    * @return all matching factories
    */
  private def filterByContext[T](
      factoryClass: Class[T],
      properties: Map[String, String],
      foundFactories: Seq[TableFactory],
      classFactories: Seq[TableFactory])
    : Seq[TableFactory] = {

    val matchingFactories = classFactories.filter { factory =>
      val requestedContext = normalizeContext(factory)

      val plainContext = mutable.Map[String, String]()
      plainContext ++= requestedContext
      // we remove the version for now until we have the first backwards compatibility case
      // with the version we can provide mappings in case the format changes
      plainContext.remove(CONNECTOR_PROPERTY_VERSION)
      plainContext.remove(FORMAT_PROPERTY_VERSION)
      plainContext.remove(METADATA_PROPERTY_VERSION)
      plainContext.remove(STATISTICS_PROPERTY_VERSION)

      // check if required context is met
      plainContext.forall(e => properties.contains(e._1) && properties(e._1) == e._2)
    }

    if (matchingFactories.isEmpty) {
      throw new NoMatchingTableFactoryException(
        "No context matches.",
        factoryClass,
        foundFactories,
        properties)
    }

    matchingFactories
  }

  /**
    * Prepares the properties of a context to be used for match operations.
    */
  private def normalizeContext(factory: TableFactory): Map[String, String] = {
    val requiredContextJava = factory.requiredContext()
    if (requiredContextJava == null) {
      throw new TableException(
        s"Required context of factory '${factory.getClass.getName}' must not be null.")
    }
    requiredContextJava.asScala.map(e => (e._1.toLowerCase, e._2)).toMap
  }

  /**
    * Filters the matching class factories by supported properties.
    */
  private def filterBySupportedProperties[T](
      factoryClass: Class[T],
      properties: Map[String, String],
      foundFactories: Seq[TableFactory],
      classFactories: Seq[TableFactory])
    : T = {

    val plainGivenKeys = mutable.ArrayBuffer[String]()
    properties.keys.foreach { k =>
      // replace arrays with wildcard
      val key = k.replaceAll(".\\d+", ".#")
      // ignore duplicates
      if (!plainGivenKeys.contains(key)) {
        plainGivenKeys += key
      }
    }
    var lastKey: Option[String] = None
    val supportedFactories = classFactories.filter { factory =>
      val requiredContextKeys = normalizeContext(factory).keySet
      val (supportedKeys, wildcards) = normalizeSupportedProperties(factory)
      // ignore context keys
      val givenContextFreeKeys = plainGivenKeys.filter(!requiredContextKeys.contains(_))
      // perform factory specific filtering of keys
      val givenFilteredKeys = filterSupportedPropertiesFactorySpecific(
        factory,
        givenContextFreeKeys)

      givenFilteredKeys.forall { k =>
        lastKey = Option(k)
        supportedKeys.contains(k) || wildcards.exists(k.startsWith)
      }
    }

    if (supportedFactories.isEmpty && classFactories.length == 1 && lastKey.isDefined) {
      // special case: when there is only one matching factory but the last property key
      // was incorrect
      val factory = classFactories.head
      val (supportedKeys, _) = normalizeSupportedProperties(factory)
      throw new NoMatchingTableFactoryException(
        s"""
          |The matching factory '${factory.getClass.getName}' doesn't support '${lastKey.get}'.
          |
          |Supported properties of this factory are:
          |${supportedKeys.sorted.mkString("\n")}""".stripMargin,
        factoryClass,
        foundFactories,
        properties)
    } else if (supportedFactories.isEmpty) {
      throw new NoMatchingTableFactoryException(
        s"No factory supports all properties.",
        factoryClass,
        foundFactories,
        properties)
    } else if (supportedFactories.length > 1) {
      throw new AmbiguousTableFactoryException(
        supportedFactories,
        factoryClass,
        foundFactories,
        properties)
    }

    supportedFactories.head.asInstanceOf[T]
  }

  /**
    * Prepares the supported properties of a factory to be used for match operations.
    */
  private def normalizeSupportedProperties(factory: TableFactory): (Seq[String], Seq[String]) = {
    val supportedPropertiesJava = factory.supportedProperties()
    if (supportedPropertiesJava == null) {
      throw new TableException(
        s"Supported properties of factory '${factory.getClass.getName}' must not be null.")
    }
    val supportedKeys = supportedPropertiesJava.asScala.map(_.toLowerCase)

    // extract wildcard prefixes
    val wildcards = extractWildcardPrefixes(supportedKeys)

    (supportedKeys, wildcards)
  }

  /**
    * Converts the prefix of properties with wildcards (e.g., "format.*").
    */
  private def extractWildcardPrefixes(propertyKeys: Seq[String]): Seq[String] = {
    propertyKeys
      .filter(_.endsWith("*"))
      .map(s => s.substring(0, s.length - 1))
  }

  /**
    * Performs filtering for special cases (i.e. table format factories with schema derivation).
    */
  private def filterSupportedPropertiesFactorySpecific(
      factory: TableFactory,
      keys: Seq[String])
    : Seq[String] = factory match {

    case formatFactory: TableFormatFactory[_] =>
      val includeSchema = formatFactory.supportsSchemaDerivation()
      // ignore non-format (or schema) keys
      keys.filter { k =>
        if (includeSchema) {
          k.startsWith(SchemaValidator.SCHEMA + ".") ||
            k.startsWith(FormatDescriptorValidator.FORMAT + ".")
        } else {
          k.startsWith(FormatDescriptorValidator.FORMAT + ".")
        }
      }

    case _ =>
      keys
  }
}
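  • TableFactoryService looks up the TableFactory that matches a factoryClass and a Descriptor (or descriptor.toProperties). The matching logic lives mainly in filterByFactoryClass, filterByContext, and filterBySupportedProperties: filterByFactoryClass selects the classFactories that implement the given factoryClass; filterByContext narrows classFactories down to contextFactories by comparing each factory's requiredContext with descriptor.toProperties; filterBySupportedProperties then filters contextFactories by supportedProperties. If no factory is left at any stage, a NoMatchingTableFactoryException is thrown; if more than one is left at the end, an AmbiguousTableFactoryException is thrown.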
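The last stage, filtering by supported properties, is the least obvious one, so here is a rough plain-Java sketch of it. SupportedPropertiesSketch, normalizeKey, and supportsAll are illustrative names, not Flink APIs. The sketch covers only three steps of the quoted method: replacing array indices with "#", skipping keys that belong to the required context, and accepting keys covered by a "prefix.*" wildcard. It deliberately escapes the dot in the index pattern, which the quoted Scala code does not, and it leaves out the special handling of format factories.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;

class SupportedPropertiesSketch {

    // Replace array indices with a wildcard, e.g. "schema.0.name" -> "schema.#.name".
    static String normalizeKey(String key) {
        return key.replaceAll("\\.\\d+", ".#");
    }

    // True if every given key, apart from context keys, is listed in the
    // supported properties or starts with the prefix of a "prefix.*" entry.
    static boolean supportsAll(List<String> supported, Set<String> contextKeys,
                               Set<String> givenKeys) {
        List<String> exact = new ArrayList<>();
        List<String> prefixes = new ArrayList<>();
        for (String s : supported) {
            String key = s.toLowerCase(Locale.ROOT);
            exact.add(key);
            if (key.endsWith("*")) {
                prefixes.add(key.substring(0, key.length() - 1));
            }
        }
        for (String given : givenKeys) {
            String key = normalizeKey(given);
            if (contextKeys.contains(key)) {
                continue; // context keys were already checked in the previous stage
            }
            boolean ok = exact.contains(key) || prefixes.stream().anyMatch(key::startsWith);
            if (!ok) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> supported = Arrays.asList("connector.debug", "schema.#.name", "format.*");
        Set<String> contextKeys = new HashSet<>(Arrays.asList("update-mode", "connector.type"));
        Set<String> given = new HashSet<>(Arrays.asList(
                "update-mode", "connector.type", "connector.debug", "schema.0.name", "format.type"));

        System.out.println(supportsAll(supported, contextKeys, given)); // true

        given.add("connector.unknown");
        System.out.println(supportsAll(supported, contextKeys, given)); // false
    }
}
```

Reading the quoted code this way also suggests that a key such as connector.property-version, which the descriptor always emits, has to be covered either by requiredContext or by supportedProperties, otherwise the factory would be rejected at this stage.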

Summary

  • TableFactory defines two methods, requiredContext and supportedProperties. requiredContext is used to match a factory; supportedProperties lists the properties the factory supports, and passing a property the factory does not support raises an exception.
  • BatchTableSourceFactory, BatchTableSinkFactory, StreamTableSourceFactory, and StreamTableSinkFactory all extend TableFactory; they define createBatchTableSource, createBatchTableSink, createStreamTableSource, and createStreamTableSink respectively.
  • ConnectorDescriptor extends DescriptorBase and implements the Descriptor interface. Its toProperties method sets the built-in properties CONNECTOR_TYPE and CONNECTOR_PROPERTY_VERSION and then merges in the subclass's properties through the abstract method toConnectorProperties.
  • TableFactoryUtil is a utility class that creates a TableSource or TableSink for a given TableEnvironment and Descriptor; internally it calls TableFactoryService with descriptor.toProperties to find the matching TableFactory.
  • TableFactoryService finds the TableFactory that matches a factoryClass and a Descriptor (or descriptor.toProperties) in three steps: filterByFactoryClass selects classFactories by factoryClass, filterByContext narrows them down to contextFactories using descriptor.toProperties, and filterBySupportedProperties filters contextFactories by supportedProperties.

doc

  • Define a TableFactory
