当前位置: 首页 > news >正文

Spark入Hbase的四种方式效率对比

一、方式介绍

本次测试一种采用了四种方式进行了对比,分别是:1.在RDD内部调用java API。2、调用saveAsNewAPIHadoopDataset()接口。3、saveAsHadoopDataset()。4、BulkLoad方法。

测试使用的大数据版本如下(均为单机版):Hadoop2.7.4、Hbase1.0.2、Spark2.1.0

二、测试

本次测试采用10W条单一列簇单一字段固定值进行测试。

以下是测试结果:

1.JAVA API

  10W条数据:1000ms、944ms

  100w条数据:6308ms、6725ms

2.saveAsNewAPIHadoopDataset()接口

  10W条数据:2585ms、3125ms

  100w条数据:13833ms、14880ms

3.saveAsHadoopDataset()接口

       10W条数据:2623ms、2596ms

  100w条数据:14929ms、13753ms

4.BulkLoad方法(此方法是导入大量数据最好的选择!!!

    10W条数据:9351ms、9364ms

  100w条数据:9342ms、9403ms

       1000w条数据:9690ms、9609ms

三、代码

pom引用

<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase</artifactId>
<version>1.2.6</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<version>1.0.2</version>
</dependency>

1)javaAPI代码
-------------------------------------
package cn.piesat.app

import java.text.DecimalFormat
import java.util.{ArrayList, List, Random}

import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client._

object SparkJavaApi {
val ZOOKEEPER_ADDRESS = "hadoop01"
val ZOOKEEPER_PORT = "2181"
val df2: DecimalFormat = new DecimalFormat("00")

def main(args: Array[String]) = {
val tableName: String = "test01"
val conn = getConn
val admin = conn.getAdmin
val putList = getPutList()
if (!admin.tableExists(TableName.valueOf(tableName))) {
createTable(admin, tableName, Array("cf"))
}
val start: Long = System.currentTimeMillis
insertBatchData(conn,tableName,admin,putList)
val end: Long = System.currentTimeMillis
System.out.println("用时:" + (end - start))
}

def getConn(): Connection = {
val conf = HBaseConfiguration.create
conf.set("hbase.zookeeper.quorum", ZOOKEEPER_ADDRESS)
conf.set("hbase.zookeeper.property.clientPort", ZOOKEEPER_PORT)
ConnectionFactory.createConnection(conf)
}

def insertBatchData(conn: Connection, tableName: String, admin: Admin, puts:List[Put]) = try {
val tableNameObj = TableName.valueOf(tableName)
if (admin.tableExists(tableNameObj)) {
val table = conn.getTable(tableNameObj)
table.put(puts)
table.close()
admin.close()
}
} catch {
case e: Exception =>
e.printStackTrace()
}

def createTable(admin: Admin, tableName: String, colFamiles: Array[String]) = try {
val tableNameObj = TableName.valueOf(tableName)
if (!admin.tableExists(TableName.valueOf(tableName))) {
val desc = new HTableDescriptor(tableNameObj)
for (colFamily <- colFamiles) {
desc.addFamily(new HColumnDescriptor(colFamily))
}
admin.createTable(desc)
admin.close()
}
} catch {
case e: Exception =>
e.printStackTrace()
}

def getPutList(): List[Put] = {
val random: Random = new Random
val putlist = new ArrayList[Put]();
for (i <- 0 until 100000) {
val rowkey: String = df2.format(random.nextInt(99)) + i
val put: Put = new Put(rowkey.getBytes)
put.add("cf".getBytes, "field".getBytes, "a".getBytes)
putlist.add(put)
}
putlist
}

}
-------------------------------------

2)saveAsNewAPIHadoopDataset()接口
-------------------------------------
package cn.piesat.app

import java.text.DecimalFormat

import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase._
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.sql.SparkSession

import scala.collection.mutable.ListBuffer

//10W用了2585ms
//100W用了13833ms、14880ms
object SparkToHbaseNewAPI {
val tableName = "test01"
val cf = "cf"
val num=1000000
val df2 = new DecimalFormat("00000000")
def main(args: Array[String]) = {
val sc = getSparkSession().sparkContext
val hbaseConf = HBaseConfiguration.create()
hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "hadoop01:2181")
val hbaseConn = ConnectionFactory.createConnection(hbaseConf)
val admin = hbaseConn.getAdmin
val jobConf = new JobConf(hbaseConf, this.getClass)
// 设置表名
jobConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)

// 如果表不存在则创建表
if (!admin.tableExists(TableName.valueOf(tableName))) {
val desc = new HTableDescriptor(TableName.valueOf(tableName))
val hcd = new HColumnDescriptor(cf)
desc.addFamily(hcd)
admin.createTable(desc)
}

val job = Job.getInstance(jobConf)
job.setOutputKeyClass(classOf[ImmutableBytesWritable])
job.setOutputValueClass(classOf[Put])
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
var list = ListBuffer[Put]()
println("数据准备中。。。。")
for (i <- 0 to num) {
val put = new Put(df2.format(i).getBytes())
put.addColumn(cf.getBytes(), "field".getBytes(), "abc".getBytes())
list.append(put)
}
println("数据准备完成!")
val data = sc.makeRDD(list.toList).map(x => {
(new ImmutableBytesWritable, x)
})
val start = System.currentTimeMillis()

data.saveAsNewAPIHadoopDataset(job.getConfiguration)
val end = System.currentTimeMillis()
println("入库用时:" + (end - start))
sc.stop()

}

def getSparkSession(): SparkSession = {
SparkSession.builder().
appName("SparkToHbase").
master("local[4]").
config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").
getOrCreate()
}
}
-------------------------------------
3)saveAsHadoopDataset()接口
-------------------------------------
package cn.piesat.app
import java.text.DecimalFormat

import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.sql.SparkSession

import scala.collection.mutable.ListBuffer
object SparkToHbaseOldAPI {
val tableName="test01"
val cf="cf"
val df2 = new DecimalFormat("00000000")
val num=1000000
//10W用时2623ms、2596ms
//100W用时14929ms、13753ms
def main(args: Array[String]): Unit = {
val sc = getSparkSession().sparkContext
val hbaseConf = HBaseConfiguration.create()
hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "hadoop01:2181")
val hbaseConn = ConnectionFactory.createConnection(hbaseConf)
val admin = hbaseConn.getAdmin
val jobConf = new JobConf(hbaseConf, this.getClass)
// 设置表名
jobConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
jobConf.setOutputFormat(classOf[TableOutputFormat])

// 如果表不存在则创建表
if (!admin.tableExists(TableName.valueOf(tableName))) {
val desc = new HTableDescriptor(TableName.valueOf(tableName))
val hcd = new HColumnDescriptor(cf)
desc.addFamily(hcd)
admin.createTable(desc)
}

var list = ListBuffer[Put]()
println("数据准备中。。。。")
for (i <- 0 to num) {
val put = new Put(df2.format(i).getBytes())
put.addColumn(cf.getBytes(), "field".getBytes(), "abc".getBytes())
list.append(put)
}
println("数据准备完成!")
val data = sc.makeRDD(list.toList).map(x => {
(new ImmutableBytesWritable, x)
})
val start=System.currentTimeMillis()
data.saveAsHadoopDataset(jobConf)
val end=System.currentTimeMillis()
println("入库用时:"+(end-start))
sc.stop()
}

def getSparkSession(): SparkSession = {
SparkSession.builder().
appName("SparkToHbase").
master("local[4]").
config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").
getOrCreate()
}
}
-------------------------------------
4)BulkLoad方法(需要事先准备好数据文件)
------------------------------------
package cn.piesat

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.client.{HTable, Table, _}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue, TableName}
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkConf, SparkContext}

object SparkHbaseBulkload {

def main(args: Array[String]) = {
val sc = new SparkContext("local[4]", "appName")
val columnFamily1 = "cf"
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.property.clientPort", "2181")
conf.set("hbase.zookeeper.quorum", "hadoop01")

val source=sc.textFile("file:///E:/student.txt").map{
x=>{
val splited=x.split(",")
val rowkey=splited(0)
val cf=splited(1)
val clomn=splited(2)
val value=splited(3)
(rowkey,cf,clomn,value)
}
}
val rdd = source.map(x => {
//将rdd转换成HFile需要的格式,我们上面定义了Hfile的key是ImmutableBytesWritable,那么我们定义的RDD也是要以ImmutableBytesWritable的实例为key
//KeyValue的实例为value
//rowkey
val rowKey = x._1
val family = x._2
val colum = x._3
val value = x._4
(new ImmutableBytesWritable(Bytes.toBytes(rowKey)), new KeyValue(Bytes.toBytes(rowKey), Bytes.toBytes(family), Bytes.toBytes(colum), Bytes.toBytes(value)))
})
//生成的HFile的临时保存路径
val stagingFolder = "hdfs://hadoop01:9000/data12"
//将日志保存到指定目录
rdd.saveAsNewAPIHadoopFile(stagingFolder,
classOf[ImmutableBytesWritable],
classOf[KeyValue],
classOf[HFileOutputFormat2],
conf)
//此处运行完成之后,在stagingFolder会有我们生成的Hfile文件

//开始即那个HFile导入到Hbase,此处都是hbase的api操作
val load = new LoadIncrementalHFiles(conf)
//hbase的表名
val tableName = "output_table"
//创建hbase的链接,利用默认的配置文件,实际上读取的hbase的master地址
val conn = ConnectionFactory.createConnection(conf)
//根据表名获取表
val table: Table = conn.getTable(TableName.valueOf(tableName))
try {
//创建一个hadoop的mapreduce的job
val job = Job.getInstance(conf)
//设置job名称
job.setJobName("DumpFile")
//此处最重要,需要设置文件输出的key,因为我们要生成HFil,所以outkey要用ImmutableBytesWritable
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
//输出文件的内容KeyValue
job.setMapOutputValueClass(classOf[KeyValue])
//配置HFileOutputFormat2的信息
HFileOutputFormat2.configureIncrementalLoadMap(job, table)
//开始导入
val start=System.currentTimeMillis()
load.doBulkLoad(new Path(stagingFolder), table.asInstanceOf[HTable])
val end=System.currentTimeMillis()
println("用时:"+(end-start)+"毫秒!")
} finally {
table.close()
conn.close()
}
}
}

------------------------------------

转载于:https://www.cnblogs.com/runnerjack/p/10480468.html

相关文章:

  • 如何用30分钟快速优化家中Wi-Fi?阿里工程师有绝招
  • Notepad++ 7.6.4 发布,不会再有可信任的 UAC 弹窗
  • SAP 联合忽米网、重庆高新区,共建「重庆中小企业智能化赋能中心」
  • Elasticsearch在后台启动
  • 小程序开发-8-流行页面编码与组件的细节知识
  • 向量的基本运算
  • 计算几何函数库(转)
  • java并发多线程显式锁Condition条件简介分析与监视器 多线程下篇(四)
  • 2019阿里云峰会-边缘计算专场,邀您共话大连接低时延场景下的技术探索与实践...
  • RPM 包的构建 - 实例
  • macOS Mojave 无法运行未签名程序的解决方案
  • js常见算法题
  • Jenkins控制台显示乱码
  • 代码整洁之道-第9章-单元测试-读书笔记
  • 系列教程丨用 Docker 探索开源软件 —— PostgreSQL(二)
  • (三)从jvm层面了解线程的启动和停止
  • @jsonView过滤属性
  • [笔记] php常见简单功能及函数
  • 【5+】跨webview多页面 触发事件(二)
  • Angular6错误 Service: No provider for Renderer2
  • Java知识点总结(JDBC-连接步骤及CRUD)
  • js中的正则表达式入门
  • Mithril.js 入门介绍
  • redis学习笔记(三):列表、集合、有序集合
  • Spark VS Hadoop:两大大数据分析系统深度解读
  • webpack入门学习手记(二)
  • 从tcpdump抓包看TCP/IP协议
  • 机器学习中为什么要做归一化normalization
  • 使用API自动生成工具优化前端工作流
  • 使用docker-compose进行多节点部署
  • 适配mpvue平台的的微信小程序日历组件mpvue-calendar
  • 数组的操作
  • 为视图添加丝滑的水波纹
  • 携程小程序初体验
  • 智能合约Solidity教程-事件和日志(一)
  • 06-01 点餐小程序前台界面搭建
  • hi-nginx-1.3.4编译安装
  • 大数据全解:定义、价值及挑战
  • 正则表达式-基础知识Review
  • (Demo分享)利用原生JavaScript-随机数-实现做一个烟花案例
  • (javascript)再说document.body.scrollTop的使用问题
  • (poj1.3.2)1791(构造法模拟)
  • (zt)基于Facebook和Flash平台的应用架构解析
  • (zz)子曾经曰过:先有司,赦小过,举贤才
  • (附源码)springboot家庭装修管理系统 毕业设计 613205
  • (附源码)计算机毕业设计大学生兼职系统
  • (九)One-Wire总线-DS18B20
  • (学习日记)2024.04.10:UCOSIII第三十八节:事件实验
  • (幽默漫画)有个程序员老公,是怎样的体验?
  • (转)IOS中获取各种文件的目录路径的方法
  • (转)机器学习的数学基础(1)--Dirichlet分布
  • (转)微软牛津计划介绍——屌爆了的自然数据处理解决方案(人脸/语音识别,计算机视觉与语言理解)...
  • ... 是什么 ?... 有什么用处?
  • .Net Attribute详解(上)-Attribute本质以及一个简单示例
  • .Net CoreRabbitMQ消息存储可靠机制