当前位置: 首页 > news >正文

spark+phoenix读取hbase

正常来说这个内容应该网上可参考的文章很多,但是我还是捣鼓了好久,现在记录下来,给自己个备忘录。

phoenix是操作hbase的皮肤,他可以轻松的使用sql语句来操作hbase,比直接用hbase的原语操作要友好的多。spark直接操作hbase也是通过hbase的原语操作,操作起来比较繁琐,下面就是将spark和phoenix相结合的方法步骤。

我用的是scala语言,首先pom.xml中添加依赖

         <dependency><groupId>org.apache.phoenix</groupId><artifactId>phoenix-spark</artifactId><version>5.0.0-HBase-2.0</version><scope>provided</scope></dependency><dependency><groupId>org.apache.phoenix</groupId><artifactId>phoenix-core</artifactId><version>5.0.0-HBase-2.0</version></dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>2.4.12</version></dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-server</artifactId><version>2.4.12</version></dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-common</artifactId><version>2.4.12</version></dependency>

这里添加的版本信息要和你要访问的hbase相一致!

接下来,到phoenix官网下载jar包,Overview | Apache Phoenix

然后解压缩,将里面的phoenix-server-hbase-2.4-5.1.3.jar(你的版本可能和我下载的不一致,这个根据hadoop上安装的hbase的版本来定)拷贝到hbase/lib/目录下,然后重启hbase。

然后将解压的phoenix-client-hbase-2.4-5.1.3.jar包拷贝到你的工程resources目录下,然后将hadoop中的配置文件也都放到resources/conf/这个目录下,接下来开始写代码。

import org.apache.spark.SparkContext
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.phoenix.spark.datasource.v2.PhoenixDataSourceval spark = SparkSession.builder().appName("phoenix-test").master("local").getOrCreate()// Load data from TABLE1
val df = spark.sqlContext.read.format("phoenix").options(Map("table" -> "TABLE1", PhoenixDataSource.ZOOKEEPER_URL -> "phoenix-server:2181")).loaddf.filter(df("COL1") === "test_row_1" && df("ID") === 1L).select(df("ID")).show

这是phoenix官网提供的代码,我执行没成功,显示org.apache.phoenix.spark.datasource.v2.PhoenixDataSource这个找不到,我不知道是我依赖包没引对还是其他原因,我的代码在上面的基础上做了一些改动。

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.jdbc._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Pathimport org.apache.log4j.Loggerobject SparkPhoenixHbase {@transient lazy val log = Logger.getLogger(this.getClass)def main(args: Array[String]): Unit = {readFromHBaseWithPhoenix()}def readFromHBaseWithPhoenix(): Unit = {val hadoopConf = new Configuration()hadoopConf.addResource(new Path("conf/core-site.xml"))hadoopConf.addResource(new Path("conf/hdfs-site.xml"))hadoopConf.addResource(new Path("conf/mapred-site.xml"))hadoopConf.addResource(new Path("conf/yarn-site.xml"))hadoopConf.addResource(new Path("conf/hbase-site.xml"))val conf = new SparkConf().setAppName("phoenix-spark-hdase").setMaster("local[*]")conf.set("spark.driver.extraClassPath","/resources/phoenix-client-hbase-2.4-5.1.3.jar")conf.set("spark.executor.extraClassPath","/resources/phoenix-client-hbase-2.4-5.1.3.jar")val it = hadoopConf.iterator()while (it.hasNext){val entry = it.next()conf.set(entry.getKey, entry.getValue)}val spark = SparkSession.builder().master("local").appName("phoenix-hbase").config(conf).getOrCreate()val phoenixConfig = Map("url" -> "jdbc:phoenix:10.12.4.51:2181",   //这里是你hadoop上安装的zookeeper的地址"driver" -> "org.apache.phoenix.jdbc.PhoenixDriver")val df = spark.read.format("jdbc").options(phoenixConfig).option("dbtable","student").load()df.show() spark.close()}
}

最好要在工程里配置上日志打印,不然执行过程中的错误信息是看不到的。

最后执行成功的结果如下所示

2024-01-18 08:53:52,487 INFO [org.apache.spark.executor.Executor] : Finished task 0.0 in stage 0.0 (TID 0). 1509 bytes result sent to driver
2024-01-18 08:53:52,493 INFO [org.apache.spark.scheduler.TaskSetManager] : Finished task 0.0 in stage 0.0 (TID 0) in 580 ms on DESKTOP-FT30H9D (executor driver) (1/1)
2024-01-18 08:53:52,494 INFO [org.apache.spark.scheduler.TaskSchedulerImpl] : Removed TaskSet 0.0, whose tasks have all completed, from pool 
2024-01-18 08:53:52,500 INFO [org.apache.spark.scheduler.DAGScheduler] : ResultStage 0 (show at SparkPhoenixHbase.scala:70) finished in 0.774 s
2024-01-18 08:53:52,502 INFO [org.apache.spark.scheduler.DAGScheduler] : Job 0 is finished. Cancelling potential speculative or zombie tasks for this job
2024-01-18 08:53:52,502 INFO [org.apache.spark.scheduler.TaskSchedulerImpl] : Killing all running tasks in stage 0: Stage finished
2024-01-18 08:53:52,504 INFO [org.apache.spark.scheduler.DAGScheduler] : Job 0 finished: show at SparkPhoenixHbase.scala:70, took 0.808840 s
2024-01-18 08:53:52,538 INFO [org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator] : Code generated in 14.3886 ms
+----+--------+---+-------+
|  ID|    NAME|AGE|   ADDR|
+----+--------+---+-------+
|1001|zhangsan| 10|tianjin|
+----+--------+---+-------+// 能看到这个就说明成功了,我的hbase student表里就这么一行信息2024-01-18 08:53:52,555 INFO [org.sparkproject.jetty.server.AbstractConnector] : Stopped Spark@4108fa66{HTTP/1.1, (http/1.1)}{0.0.0.0:4040}
2024-01-18 08:53:52,556 INFO [org.apache.spark.ui.SparkUI] : Stopped Spark web UI at http://DESKTOP-FT30H9D:4040
2024-01-18 08:53:52,566 INFO [org.apache.spark.MapOutputTrackerMasterEndpoint] : MapOutputTrackerMasterEndpoint stopped!
2024-01-18 08:53:52,581 INFO [org.apache.spark.storage.memory.MemoryStore] : MemoryStore cleared
2024-01-18 08:53:52,581 INFO [org.apache.spark.storage.BlockManager] : BlockManager stopped
2024-01-18 08:53:52,587 INFO [org.apache.spark.storage.BlockManagerMaster] : BlockManagerMaster stopped
2024-01-18 08:53:52,589 INFO [org.apache.spark.scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint] : OutputCommitCoordinator stopped!
2024-01-18 08:53:52,595 INFO [org.apache.spark.SparkContext] : Successfully stopped SparkContext
2024-01-18 08:53:59,207 INFO [org.apache.spark.util.ShutdownHookManager] : Shutdown hook called
2024-01-18 08:53:59,207 INFO [org.apache.spark.util.ShutdownHookManager] : Deleting directory C:\Users\shell\AppData\Local\Temp\spark-344ef832-7438-47dd-9126-725e6c2d8af4

相关文章:

  • gin中间件篇
  • SAP ABAP SUBMIT常用用法
  • Spring Boot 4.0:构建云原生Java应用的前沿工具
  • 爬虫模板(附完整代码+案例)
  • 常用的Qt开源库分享
  • 一、基础数据结构——2.队列——3.双端队列和单调队列1
  • 【Ant Design of Vue】Modal.confirm无法关闭的bug
  • 如何在Linux部署JumpServer堡垒机并实现远程访问本地服务
  • mybatis的缓存机制
  • vue中合并下载打包视频图片
  • Gitee Reward让开源作者不再为爱发电
  • 数组练习 Leetcode 566.重塑矩阵
  • Pytest插件pytest-django让Django测试更高效
  • Spring data都包含哪些内容
  • 100天精通Python(实用脚本篇)——第113天:基于Tesseract-OCR实现OCR图片文字识别实战
  • (十五)java多线程之并发集合ArrayBlockingQueue
  • 2017-09-12 前端日报
  • GDB 调试 Mysql 实战(三)优先队列排序算法中的行记录长度统计是怎么来的(上)...
  • JS变量作用域
  • MySQL的数据类型
  • OpenStack安装流程(juno版)- 添加网络服务(neutron)- controller节点
  • Vue ES6 Jade Scss Webpack Gulp
  • WebSocket使用
  • Zepto.js源码学习之二
  • 安装python包到指定虚拟环境
  • 大整数乘法-表格法
  • 浮动相关
  • 七牛云假注销小指南
  • 手机端车牌号码键盘的vue组件
  • 写代码的正确姿势
  • ​Spring Boot 分片上传文件
  • ​香农与信息论三大定律
  • # Python csv、xlsx、json、二进制(MP3) 文件读写基本使用
  • #宝哥教你#查看jquery绑定的事件函数
  • #传输# #传输数据判断#
  • $jQuery 重写Alert样式方法
  • (android 地图实战开发)3 在地图上显示当前位置和自定义银行位置
  • (C语言)深入理解指针2之野指针与传值与传址与assert断言
  • (MonoGame从入门到放弃-1) MonoGame环境搭建
  • (免费领源码)Java#Springboot#mysql农产品销售管理系统47627-计算机毕业设计项目选题推荐
  • (转)我也是一只IT小小鸟
  • .CSS-hover 的解释
  • .form文件_SSM框架文件上传篇
  • .Net Core与存储过程(一)
  • .Net 高效开发之不可错过的实用工具
  • .net 简单实现MD5
  • .NET开源项目介绍及资源推荐:数据持久层 (微软MVP写作)
  • .net最好用的JSON类Newtonsoft.Json获取多级数据SelectToken
  • @EnableAsync和@Async开始异步任务支持
  • [ Linux 长征路第五篇 ] make/Makefile Linux项目自动化创建工具
  • [AI]文心一言出圈的同时,NLP处理下的ChatGPT-4.5最新资讯
  • [ajaxupload] - 上传文件同时附件参数值
  • [iHooya]2023年1月30日作业解析
  • [LeetCode]Multiply Strings
  • [Linux] Ubuntu install Miniconda