当前位置: 首页 > news >正文

Flink 流处理API

目录

Environment

 Source

从集合读取数据

从文件读取数据

以Kafka消息队列的数据作为来源


流处理流程

Environment

getExecutionEnvironment

创建一个执行环境,表示当前执行程序的上下文。如果程序是独立调用的,则此方法返回本地执行环境;如果从命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境,也就是说,getExecutionEnvironment会根据查询运行的方法决定返回什么样的运行环境,是常用的一种创建执行环境的方式。

//创建一个执行环境
val env=StreamExecutionEnvironment.getExecutionEnvironment

如果没有设置并行度,会以 flink-conf.yaml 中的配置为准,默认是 1

 Source

从集合读取数据

测试代码

package com.atguigu.apitest

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._

import java.util.Properties
//
case class SensorReading( id: String, timestamp: Long, temperature: Double)

object SourceTest {
  def main(args: Array[String]): Unit = {
    //创建一个执行环境
    val env=StreamExecutionEnvironment.getExecutionEnvironment
    //从集合中获取数据
    val dataList = List(
      SensorReading("sensor_1", 1547718199, 35.8),
      SensorReading("sensor_6", 1547718201, 15.4),
      SensorReading("sensor_7", 1547718202, 6.7),
      SensorReading("sensor_10", 1547718205, 38.1)
    )

    val stream1=env.fromCollection(dataList)

    //打印输出
    stream1.print()
    //执行
    env.execute("source test")

  }
}

测试结果 

从文件读取数据

测试代码

package com.atguigu.wc

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object WordCount {
  def main(args: Array[String]): Unit = {

     //创建一个批处理的执行环境
    val env:ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment

    //从文件中读取数据
    val inputPath = "D:\\HYF\\FlinkTutorial\\src\\main\\resources\\sensor.txt"
    val stream2 = env.readTextFile(inputPath)

    //打印输出
    stream2.print()

   //执行
    env.execute("source test")

  }
}

测试结果

以Kafka消息队列的数据作为来源

需要pom.xml文件中引入 kafka 连接器的依赖:


<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.12</artifactId>
<version>1.10.1</version>
</dependency>

测试环境

1、先打开zookeeper

2、再打开Kafka

3、jpsall查看是否启动成功

4、先完全关闭Kafka才能关闭zookeeper

测试代码

package com.atguigu.apitest

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

import java.util.Properties

case class SensorReading( id: String, timestamp: Long, temperature: Double)

object SourceTest {
  def main(args: Array[String]): Unit = {
    //创建一个执行环境
    val env=StreamExecutionEnvironment.getExecutionEnvironment
    //从集合中获取数据
    val dataList = List(
      SensorReading("sensor_1", 1547718199, 35.8),
      SensorReading("sensor_6", 1547718201, 15.4),
      SensorReading("sensor_7", 1547718202, 6.7),
      SensorReading("sensor_10", 1547718205, 38.1)
    )

    val stream1=env.fromCollection(dataList)

    //打印输出
    //stream1.print()

    //从文件中读取数据
    val inputPath = "D:\\HYF\\FlinkTutorial\\src\\main\\resources\\sensor.txt"
    val stream2 = env.readTextFile(inputPath)

    //打印输出
    //stream2.print()

    //从kafka读取数据
    val properties = new Properties()
    properties.setProperty("bootstrap.servers","hadoop102:9092")
    properties.setProperty("group.id","consumer-group")
    val stream3 = env.addSource( new FlinkKafkaConsumer011[String]("sensor",new 
    SimpleStringSchema(),properties) )
    stream3.print()
    
    //执行
    env.execute("source test")

  }
}

测试结果

相关文章:

  • 技术实践|开门见山谈Prometheus
  • npm包版本不一致的问题
  • JVM内存溢出问题排查
  • java计算机毕业设计门诊药品管理系统源码+数据库+系统+lw文档+mybatis+运行部署
  • 真知灼见|客户视图与工作台:金融行业呼叫中心领域驱动设计
  • spring-task进行任务调度
  • npm实现格式化时间---就是实现时间按照要求输出--moment包
  • webdriver API进阶
  • 除自身以外数组的乘积、找到所有数组中消失的数字、两数之和
  • 四川农信分布式核心设计及验证项目成果专家评审会召开
  • 快速知识蒸馏的视觉框架-来自卡耐基梅隆大学等单位
  • c++ 11 线程支持 (std::promise)
  • 一篇文章带你看清C语言中的类型转换规则
  • 单海军:行业AI平台赋能金融企业数智化转型
  • Jmeter接口自动化(十)断言
  • 《网管员必读——网络组建》(第2版)电子课件下载
  • 【编码】-360实习笔试编程题(二)-2016.03.29
  • 8年软件测试工程师感悟——写给还在迷茫中的朋友
  • Angular 响应式表单 基础例子
  • Babel配置的不完全指南
  • Electron入门介绍
  • js递归,无限分级树形折叠菜单
  • k个最大的数及变种小结
  • mysql 5.6 原生Online DDL解析
  • Perseus-BERT——业内性能极致优化的BERT训练方案
  • Python学习笔记 字符串拼接
  • vue学习系列(二)vue-cli
  • 从0实现一个tiny react(三)生命周期
  • 第2章 网络文档
  • 聚簇索引和非聚簇索引
  • 前嗅ForeSpider采集配置界面介绍
  • 如何将自己的网站分享到QQ空间,微信,微博等等
  • 如何胜任知名企业的商业数据分析师?
  • 如何正确配置 Ubuntu 14.04 服务器?
  • 深入 Nginx 之配置篇
  • 微信端页面使用-webkit-box和绝对定位时,元素上移的问题
  • 源码之下无秘密 ── 做最好的 Netty 源码分析教程
  • 怎么把视频里的音乐提取出来
  • Java性能优化之JVM GC(垃圾回收机制)
  • kubernetes资源对象--ingress
  • 说说我为什么看好Spring Cloud Alibaba
  • 选择阿里云数据库HBase版十大理由
  • ​2020 年大前端技术趋势解读
  • ​ssh-keyscan命令--Linux命令应用大词典729个命令解读
  • #我与Java虚拟机的故事#连载15:完整阅读的第一本技术书籍
  • $(selector).each()和$.each()的区别
  • $emit传递多个参数_PPC和MIPS指令集下二进制代码中函数参数个数的识别方法
  • (+3)1.3敏捷宣言与敏捷过程的特点
  • (WSI分类)WSI分类文献小综述 2024
  • (附源码)springboot码头作业管理系统 毕业设计 341654
  • (附源码)计算机毕业设计SSM智能化管理的仓库管理
  • (规划)24届春招和25届暑假实习路线准备规划
  • (七)微服务分布式云架构spring cloud - common-service 项目构建过程
  • (算法)Game
  • (五)c52学习之旅-静态数码管