当前位置: 首页 > news >正文

Spark+实例解读

第一部分 Spark入门

学习教程:Spark 教程 | Spark 教程

Spark 集成了许多大数据工具,例如 Spark 可以处理任何 Hadoop 数据源,也能在 Hadoop 集群上执行。大数据业内有个共识认为,Spark 只是Hadoop MapReduce 的扩展(事实并非如此),如Hadoop MapReduce 中没有的迭代查询和流处理。然而Spark并不需要依赖于 Hadoop,它有自己的集群管理系统。更重要的是,同样数据量,同样集群配置,Spark 的数据处理速度要比 Hadoop MapReduce 快10倍左右。

Spark 的一个关键的特性是数据可以在内存中迭代计算,提高数据处理的速度。虽然Spark是用 Scala开发的,但是它对 Java、Scala、Python 和 R 等高级编程语言提供了开发接口。

第二部分 SparkCore

2 RDD

2.1 转换算子-map

map是将RDD的数据一条条处理,返回新的RDD

# 定义方法
def add(data):return data*10
print(rdd.map(add).collect)
# 定义lamabda表达式
rdd.map(lambda data:data*10)
2.2 转换算子-flatMap

flatMap对RDD执行map操作,然后执行解除嵌套操作

rdd = sc.parallelize([('a',1),('a',11)])
# 将二元元组的所有value都*10进行处理
rdd.mapValues(lambda x:x*10)
    data.map { case (label, feature) => ((feature, label), 1)}.reduceByKey(_ + _).map { case ((feature, label), num) =>(feature, List((label, num))) //feature,label,cnt}.reduceByKey(_ ::: _).mapValues { x =>val size_entro = x.map(_._2).sumval res = x.map(_._2.toDouble / size_entro).map { t =>-t * (Math.log(t) / Math.log(2))}.sumsize_entro * res}.mapValues { x => x / size }.map(_._2).sum

2.3转换算子-reduceByKey

针对KV型RRDD自动按照key进行分组,然后按照提供的聚合逻辑,对组内数据value完成聚合操作

rdd.reduceByKey(func)

      val clickStat = joinDf.where(F.col("active_type")==="click").rdd.map(row => {val mapInfo = Option(row.getMap[String,Double](row.fieldIndex(feat)))mapInfo match {case Some(x) => xcase _ => null}}).filter(_!=null).flatMap(x=>x).reduceByKey(_+_)
2.4 转换算子-mapValues

针对二元元组RDD,对其内部的二元元组的value进行map操作

rdd = sc.parallelize([('a',1),('a',11)])
# 将二元元组的所有value都*10进行处理
rdd.mapValues(lambda x:x*10)
    data.map { case (label, feature) => ((feature, label), 1)}.reduceByKey(_ + _).map { case ((feature, label), num) =>(feature, List((label, num))) //feature,label,cnt}.reduceByKey(_ ::: _).mapValues { x =>val size_entro = x.map(_._2).sumval res = x.map(_._2.toDouble / size_entro).map { t =>-t * (Math.log(t) / Math.log(2))}.sumsize_entro * res}.mapValues { x => x / size }.map(_._2).sum
2.5 转换算子-groupBy

将RDD的数据进行分组

rdd.groupBy(func)

rdd = sc.parallelize([('a',1),('a',11),('b',1)])
# 通过这个函数确认按照谁来分组(返回谁即可)
print(rdd.groupBy(lambda x:x[0]).collect())
print(rdd.groupBy(lambda x:x[0]).collect())
# 结果为:
​
    val userContentListHis = spark.thriftSequenceFile(inpath_his, classOf[LongVideoUserContentStat]).map(l=>{(l.getUid,l.getContent_properties.get(0).getId)}).toDF("uid", "docid").groupBy($"uid")
2.6 转换算子-filter

过滤想要的数据进行保存

rdd = sc.parallelize([1,2,3,4,5,6])
rdd.filter(lamdba x:x%2 == 1) # 只保留奇数
    val treatmentUser = spark.read.option("header", false).option("sep", "\t").csv(inpath).select("_c0").withColumnRenamed("_c0", "userid").withColumn("flow", getexpId($"userid")).filter($"flow" >= start and $"flow" <= end).select("userid").dropDuplicates()
2.7 转换算子-其他算子
distinct算子
rdd.distinct() 一般不写去重分区val userContentHis = hisPathList.map(path =>{val hisData = spark.thriftSequenceFile(path, classOf[LongVideoUserContentStat])println(s"hisData ==>${hisData.count()}")hisData}).reduce(_ union _).distinct().repartition(partition)
union算子 
2个rdd合并成一个rdd:rdd.union(other_rdd)
只合并不去重  rdd的类型不同也是可以合并的
rdd1 = sc.parallelize([1,2,3])
rdd2 = sc.parallelize([1,2,3,4])
rdd3 = rdd1.union(rdd2)
2.8 算子面试题
1.groupByKey和reduceByKey的区别:
groupByKey仅仅有分组功能而已,reduceByKey除了分组还有聚合作用,是一个分组+聚合一体化的算子. 分组前先聚合再shuffle,预聚合,被shuffle的数据极大的减少,提升了性能.数据量越大,reduceByKey的性能优势也就越大.
​
2.rdd的分区数怎么查看? 
通过getNumPartitions API查看,返回int
​
3.Transformation和Action的区别:
转换算子的返回值100%是rdd,而Action算子不一定.转换算子是懒加载的,只有遇到Action才会执行
​
4.哪两个算子不经过Driver直接输出?
foreach 和 saveAsTextFile

3 RDD的持久化

3.1 RDD的持久化

rdd是过程数据 rdd进行相互迭代计算,执行开启时,新的RDD生成,老的RDD消失

3.2 RDD的缓存

val rawLog = profilePushLogReader(spark, date, span).persist()
3.3 RDD的checkPoint

也是将RDD的数据保存起来,仅支持磁盘存储,被认为是安全的, 不保留血缘关系

3.4 缓存面试题

4 案例

4.1 搜素引擎日志分析案例
4.2
4.3 ....
4.4 计算资源面试题
1.如何尽量提升任务计算的资源?
计算cpu核心和内存量,通过--executor-memory指定executor内存,通过--executor-cores指定executor的核心数
​

5 广播变量 累加器

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 安全服务面试
  • java调用WebService接口
  • centos7 docker空间不足
  • 项目经理面试总结
  • JDK8新特性之Lambda表达式快速入门
  • 2024 Java 高分面试宝典 一站式搞定技术面
  • yum install git
  • golang JSON序列化
  • 【Unity2D 2022:UI】TextMeshPro组件无法显示中文
  • Spring Boot中的 6 种API请求参数读取方式
  • 核心线程创建之后是否受keepAliveTime影响?
  • SRv6 和IGP/BGP协议区别
  • 自制熊猫烧香进阶
  • centos安装python 3.9
  • 【diffusers极速入门(五)】扩散模型中的 Scheduler(noise_scheduler)的作用是什么?
  • [译]如何构建服务器端web组件,为何要构建?
  • CSS 提示工具(Tooltip)
  • If…else
  • Javascript Math对象和Date对象常用方法详解
  • java正则表式的使用
  • mongodb--安装和初步使用教程
  • php ci框架整合银盛支付
  • React-redux的原理以及使用
  • Spring Cloud中负载均衡器概览
  • 对话 CTO〡听神策数据 CTO 曹犟描绘数据分析行业的无限可能
  • 基于OpenResty的Lua Web框架lor0.0.2预览版发布
  • 爬虫进阶 -- 神级程序员:让你的爬虫就像人类的用户行为!
  • 如何合理的规划jvm性能调优
  • 设计模式走一遍---观察者模式
  • 深入 Nginx 之配置篇
  • 怎么将电脑中的声音录制成WAV格式
  • 测评:对于写作的人来说,Markdown是你最好的朋友 ...
  • 正则表达式-基础知识Review
  • (1)SpringCloud 整合Python
  • (16)Reactor的测试——响应式Spring的道法术器
  • (175)FPGA门控时钟技术
  • (笔试题)分解质因式
  • (附源码)node.js知识分享网站 毕业设计 202038
  • (附源码)springboot家庭财务分析系统 毕业设计641323
  • (转)从零实现3D图像引擎:(8)参数化直线与3D平面函数库
  • (转载)利用webkit抓取动态网页和链接
  • (转载)在C#用WM_COPYDATA消息来实现两个进程之间传递数据
  • ..thread“main“ com.fasterxml.jackson.databind.JsonMappingException: Jackson version is too old 2.3.1
  • ./include/caffe/util/cudnn.hpp: In function ‘const char* cudnnGetErrorString(cudnnStatus_t)’: ./incl
  • .mkp勒索病毒解密方法|勒索病毒解决|勒索病毒恢复|数据库修复
  • .NET Core6.0 MVC+layui+SqlSugar 简单增删改查
  • .NET Micro Framework初体验
  • .net MVC中使用angularJs刷新页面数据列表
  • .net MySql
  • .NET NPOI导出Excel详解
  • .Net程序猿乐Android发展---(10)框架布局FrameLayout
  • .NET开发不可不知、不可不用的辅助类(三)(报表导出---终结版)
  • .net开发日常笔记(持续更新)
  • .NET与java的MVC模式(2):struts2核心工作流程与原理
  • ::前边啥也没有