当前位置: 首页 > news >正文

[Spark SQL]Spark SQL读取Kudu,写入Hive

SparkUnit

Function:用于获取Spark Session

package com.example.unitlimport org.apache.spark.sql.SparkSessionobject SparkUnit {def getLocal(appName: String): SparkSession = {SparkSession.builder().appName(appName).master("local[*]").getOrCreate()}def getLocal(appName: String, supportHive: Boolean): SparkSession = {if (supportHive) getLocal(appName,"local[*]",true)else getLocal(appName)}def getLocal(appName:String,master:String,supportHive:Boolean): SparkSession = {if (supportHive) SparkSession.builder().appName(appName).master(master).enableHiveSupport().getOrCreate()else  SparkSession.builder().appName(appName).master(master).getOrCreate()}def stopSs(ss:SparkSession): Unit ={if (ss != null) {ss.stop()}}
}

log4j.properties

Function:设置控制台输出级别

# Set everything to be logged to the console
log4j.rootCategory=ERROR, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n# Set the default spark-shell log level to WARN. When running the spark-shell, the
# log level for this class is used to overwrite the root logger's log level, so that
# the user can have different defaults for the shell and regular Spark apps.
log4j.logger.org.apache.spark.repl.Main=WARN# Settings to quiet third party logs that are too verbose
log4j.logger.org.spark_project.jetty=WARN
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR

KTV

Function:读取kudu,写入hive。Kudu_To_Hive,简称KTV

package com.example.daoimport com.example.unitl.SparkUnit
import org.apache.spark.sql.SparkSessionobject KTV {def getKuduTableDataFrame(ss: SparkSession): Unit = {// 读取kudu// 获取tb对象val kuduTb = ss.read.format("org.apache.kudu.spark.kudu").option("kudu.master", "10.168.1.12:7051").option("kudu.table", "impala::realtimedcs.bakup_db") // Tips:注意指定库.load()// create viewkuduTb.createTempView("v1")val kudu_unit1_df = ss.sql("""|SELECT * FROM `sources_tb1`|WHERE `splittime` = "2021-07-11"|""".stripMargin)// printkudu_unit1_df.printSchema()kudu_unit1_df.show()// load of memorykudu_unit1_df.createOrReplaceTempView("v2")}def insertHive(ss: SparkSession): Unit = {// create tabless.sql("""|USE `bakup_db`|""".stripMargin)ss.sql("""|  CREATE TABLE IF NOT EXISTS `bak_tb1`(|   `id` int,|   `packtimestr` string,|   `dcs_name` string,|   `dcs_type` string,|   `dcs_value` string,|   `dcs_as` string,|   `dcs_as2` string)| PARTITIONED BY (|   `splittime` string)|""".stripMargin)println("创建表成功!")// create viewss.sql("""|INSERT INTO `bakup_db`|SELECT * FROM bak_tb1|""".stripMargin)println("保存成功!")}def main(args: Array[String]): Unit = {//get ssval ss = SparkUnit.getLocal("KTV", true)// 做动态分区, 所以要先设定partition参数// default是false, 需要额外下指令打开这个开关ss.sqlContext.setConf("hive.exec.dynamic.partition;","true");ss.sqlContext.setConf("hive.exec.dynamic.partition.mode","nonstrict");// 调用方法getKuduTableDataFrame(ss)insertHive(ss)// 关闭连接SparkUnit.stopSs(ss)}
}

运行:

运行时请将hive的配置文件 hive-site.xml文件,复制到项目resource下。

hue查看写入的数据:

略

相关文章:

  • 第1篇:Mysql数据库表结构导出字段到Excel(一个sheet中)
  • react native 键盘事件
  • 机器学习——贝叶斯分类器(基础理论+编程)
  • DC电源模块的设计与调试技巧
  • 安卓studio连接手机之后,一两秒之后就自动断开了。问题解决。
  • LeetCode——贪心算法
  • C++初阶:STL容器list的使用与初版自实现
  • Python学习从0到1 day18 Python可视化基础综合案例 1.折线图
  • Spring Boot:基础配置
  • sentinel使用控制台实现
  • Qt 压缩/解压文件
  • stm32启动文件里面的__main和主函数main()
  • cad vba 打开excel并弹窗打开指定文件
  • C++项目——集群聊天服务器项目(三)muduo网络库
  • 操作系统是怎么发展来的
  • @jsonView过滤属性
  • [数据结构]链表的实现在PHP中
  • 【node学习】协程
  • 【从零开始安装kubernetes-1.7.3】2.flannel、docker以及Harbor的配置以及作用
  • 【译】理解JavaScript:new 关键字
  • 77. Combinations
  • CAP理论的例子讲解
  • java8-模拟hadoop
  • Laravel 实践之路: 数据库迁移与数据填充
  • Linux快速复制或删除大量小文件
  • node-glob通配符
  • PHP 程序员也能做的 Java 开发 30分钟使用 netty 轻松打造一个高性能 websocket 服务...
  • Python学习之路16-使用API
  • quasar-framework cnodejs社区
  • rabbitmq延迟消息示例
  • Rancher如何对接Ceph-RBD块存储
  • TCP拥塞控制
  • 记一次和乔布斯合作最难忘的经历
  • 前端技术周刊 2019-01-14:客户端存储
  • 首页查询功能的一次实现过程
  • Redis4.x新特性 -- 萌萌的MEMORY DOCTOR
  • 湖北分布式智能数据采集方法有哪些?
  • #define与typedef区别
  • #HarmonyOS:基础语法
  • #pragma multi_compile #pragma shader_feature
  • (2)(2.10) LTM telemetry
  • (delphi11最新学习资料) Object Pascal 学习笔记---第8章第5节(封闭类和Final方法)
  • (NO.00004)iOS实现打砖块游戏(九):游戏中小球与反弹棒的碰撞
  • (zt)最盛行的警世狂言(爆笑)
  • (编译到47%失败)to be deleted
  • (二)PySpark3:SparkSQL编程
  • (附源码)流浪动物保护平台的设计与实现 毕业设计 161154
  • (剑指Offer)面试题41:和为s的连续正数序列
  • (十) 初识 Docker file
  • (一)基于IDEA的JAVA基础1
  • .MyFile@waifu.club.wis.mkp勒索病毒数据怎么处理|数据解密恢复
  • .net core IResultFilter 的 OnResultExecuted和OnResultExecuting的区别
  • .net专家(高海东的专栏)
  • /proc/stat文件详解(翻译)
  • /使用匿名内部类来复写Handler当中的handlerMessage()方法