当前位置: 首页 > news >正文

Spark2.x 入门:DStream 输出操作

在Spark应用中,外部系统经常需要使用到Spark DStream处理后的数据,因此,需要采用输出操作把DStream的数据输出到数据库或者文件系统中。

这里以《Spark2.1.0入门:DStream输出操作》中介绍的NetworkWordCountStateful.scala为基础进行修改。

把DStream输出到文本文件中

NetworkWordCountStateful.scala

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.storage.StorageLevelobject NetworkWordCountStateful {def main(args: Array[String]) {//定义状态更新函数val updateFunc = (values: Seq[Int], state: Option[Int]) => {val currentCount = values.foldLeft(0)(_ + _)val previousCount = state.getOrElse(0)Some(currentCount + previousCount)}StreamingExamples.setStreamingLogLevels()  //设置log4j日志级别val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCountStateful")val sc = new StreamingContext(conf, Seconds(5))sc.checkpoint("file:///usr/local/spark/mycode/streaming/dstreamoutput/")    //设置检查点,检查点具有容错机制val lines = sc.socketTextStream("localhost", 9999)val words = lines.flatMap(_.split(" "))val wordDstream = words.map(x => (x, 1))val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)stateDstream.print()//下面是新增的语句,把DStream保存到文本文件中stateDstream.saveAsTextFiles("file:///usr/local/spark/mycode/streaming/dstreamoutput/output.txt")sc.start()sc.awaitTermination()}
}

把DStream写入到MySQL数据库中

mysql> use spark
mysql> create table wordcount (word char(20), count int(4));
mysql> select * from wordcount
//这个时候wordcount表是空的,没有任何记录

NetworkWordCountStateful.scala

import java.sql.{PreparedStatement, Connection, DriverManager}
import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.storage.StorageLevelobject NetworkWordCountStateful {def main(args: Array[String]) {//定义状态更新函数val updateFunc = (values: Seq[Int], state: Option[Int]) => {val currentCount = values.foldLeft(0)(_ + _)val previousCount = state.getOrElse(0)Some(currentCount + previousCount)}val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCountStateful")val sc = new StreamingContext(conf, Seconds(5))sc.checkpoint("file:///usr/local/spark/mycode/streaming/dstreamoutput/")    //设置检查点,检查点具有容错机制val lines = sc.socketTextStream("localhost", 9999)val words = lines.flatMap(_.split(" "))val wordDstream = words.map(x => (x, 1))val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)stateDstream.print()//下面是新增的语句,把DStream保存到MySQL数据库中     stateDstream.foreachRDD(rdd => {//内部函数def func(records: Iterator[(String,Int)]) {var conn: Connection = nullvar stmt: PreparedStatement = nulltry {val url = "jdbc:mysql://localhost:3306/spark"val user = "root"val password = "hadoop"  //笔者设置的数据库密码是hadoop,请改成你自己的mysql数据库密码conn = DriverManager.getConnection(url, user, password)records.foreach(p => {val sql = "insert into wordcount(word,count) values (?,?)"stmt = conn.prepareStatement(sql);stmt.setString(1, p._1.trim)stmt.setInt(2,p._2.toInt)stmt.executeUpdate()})} catch {case e: Exception => e.printStackTrace()} finally {if (stmt != null) {stmt.close()}if (conn != null) {conn.close()}}}val repartitionedRDD = rdd.repartition(3)repartitionedRDD.foreachPartition(func)})sc.start()sc.awaitTermination()}
}

对于stateDstream,为了把它保存到MySQL数据库中,我们采用了如下的形式:

stateDstream.foreachRDD(function)

其中,function就是一个RDD[T]=>Unit类型的函数,对于本程序而言,就是RDD[(String,Int)]=>Unit类型的函数,也就是说,stateDstream中的每个RDD都是RDD[(String,Int)]类型(想象一下,统计结果的形式是(“hadoop”,3))。这样,对stateDstream中的每个RDD都会执行function中的操作(即把该RDD保存到MySQL的操作)。

下面看function的处理逻辑,在function部分,函数体要执行的处理逻辑实际上是下面的形式:

 def func(records: Iterator[(String,Int)]){……}val repartitionedRDD = rdd.repartition(3)repartitionedRDD.foreachPartition(func) 

也就是说,这里定义了一个内部函数func,它的功能是,接收records,然后把records保存到MySQL中。到这里,你可能会有疑问?为什么不是把stateDstream中的每个RDD直接拿去保存到MySQL中,还要调用rdd.repartition(3)对这些RDD重新设置分区数为3呢?这是因为,每次保存RDD到MySQL中,都需要启动数据库连接,如果RDD分区数量太大,那么就会带来多次数据库连接开销,为了减少开销,就有必要把RDD的分区数量控制在较小的范围内,所以,这里就把RDD的分区数量重新设置为3。然后,对于每个RDD分区,就调用repartitionedRDD.foreachPartition(func),把每个分区的数据通过func保存到MySQL中,这时,传递给func的输入参数就是Iterator[(String,Int)]类型的records。如果你不好理解下面这种调用形式:

repartitionedRDD.foreachPartition(func) //这种形式func没有带任何参数,可能不太好理解,不是那么直观

实际上,这句语句和下面的语句是等价的,下面的语句形式你可能会更好理解:

repartitionedRDD.foreachPartition(records => func(records)) 

上面这种等价的形式比较直观,为func()函数传入了一个records参数,这就正好和 def func(records: Iterator[(String,Int)])定义对应起来了,方便理解。

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 鹏哥C语言自定义笔记重点(29-)
  • Oracle问题笔记
  • 跟李沐学AI:语义分割
  • Leetcode-day30-动态规划-不同路径
  • STM32G474的HRTIM用作时基定时器
  • R语言统计分析——回归分析的改进措施
  • 【机器学习】YOLO 关闭控制台推理日志
  • 2024前端面试题-js篇
  • ffmpeg6.1集成Plus-OpenGL-Patch滤镜
  • Java二十三种设计模式-解释器模式(23/23)
  • web开发html前端使用javascript脚本库JsBarcode生成条形码(条码)
  • Vue的生命周期了解
  • 数学基础 -- 线性代数之行列式不变性推导
  • linux文本分析工具grep、sed和awk打印输出文本的单双奇偶行(grep也可以打印奇偶行)以及熟悉的ssh命令却有你不知道的一些用法
  • 记录一下在IIS上部署服务器上遇到的一系列问题及解决方案
  • github从入门到放弃(1)
  • Java多线程(4):使用线程池执行定时任务
  • java小心机(3)| 浅析finalize()
  • Java知识点总结(JDBC-连接步骤及CRUD)
  • 简单易用的leetcode开发测试工具(npm)
  • 浏览器缓存机制分析
  • 前端学习笔记之观察者模式
  • 前端自动化解决方案
  • 如何解决微信端直接跳WAP端
  • 数据仓库的几种建模方法
  • 数组的操作
  • 写代码的正确姿势
  • 移动端 h5开发相关内容总结(三)
  • - 语言经验 - 《c++的高性能内存管理库tcmalloc和jemalloc》
  • 源码安装memcached和php memcache扩展
  • 小白应该如何快速入门阿里云服务器,新手使用ECS的方法 ...
  • ######## golang各章节终篇索引 ########
  • ###C语言程序设计-----C语言学习(3)#
  • #LLM入门|Prompt#1.8_聊天机器人_Chatbot
  • $.ajax中的eval及dataType
  • (3) cmake编译多个cpp文件
  • (C语言版)链表(三)——实现双向链表创建、删除、插入、释放内存等简单操作...
  • (DenseNet)Densely Connected Convolutional Networks--Gao Huang
  • (react踩过的坑)Antd Select(设置了labelInValue)在FormItem中initialValue的问题
  • (第61天)多租户架构(CDB/PDB)
  • (附源码)spring boot车辆管理系统 毕业设计 031034
  • (六)DockerCompose安装与配置
  • (三) diretfbrc详解
  • (十八)devops持续集成开发——使用docker安装部署jenkins流水线服务
  • (十八)用JAVA编写MP3解码器——迷你播放器
  • (原創) X61用戶,小心你的上蓋!! (NB) (ThinkPad) (X61)
  • (原創) 物件導向與老子思想 (OO)
  • (转)Linq学习笔记
  • (转)Mysql的优化设置
  • (转)母版页和相对路径
  • .NET Framework、.NET Core 、 .NET 5、.NET 6和.NET 7 和.NET8 简介及区别
  • .net mvc actionresult 返回字符串_.NET架构师知识普及
  • .pings勒索病毒的威胁:如何应对.pings勒索病毒的突袭?
  • ::before和::after 常见的用法
  • @EnableWebMvc介绍和使用详细demo