当前位置: 首页 > news >正文

Spark2.1.1spark写入Hbase的三种方法性能对比

测试条件

以下是我的PC信息


依赖:

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>1.3.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>1.3.1</version>
    </dependency>
</dependencies>1234567891011121314151617复制代码

1. 第一种方法
每次写进一条,调用API

  /**
   * Puts some data in the table.
   * 
   * @param put The data to put.
   * @throws IOException if a remote or network exception occurs.
   * @since 0.20.0
   */
  void put(Put put) throws IOException12345678复制代码

我的代码

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}


object Word {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("hbase"))
    val rdd = sc.makeRDD(Array(1)).flatMap(_ => 0 to 1000000)
    rdd.foreachPartition(x => {
      val hbaseConf = HBaseConfiguration.create()
      hbaseConf.set("hbase.zookeeper.quorum", "172.17.11.85,172.17.11.86,172.17.11.87")
      hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
      hbaseConf.set("hbase.defaults.for.version.skip", "true")
      val hbaseConn = ConnectionFactory.createConnection(hbaseConf)
      val table = hbaseConn.getTable(TableName.valueOf("word"))
      x.foreach(value => {
        var put = new Put(Bytes.toBytes(value.toString))
        put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("c1"), Bytes.toBytes(value.toString))
        table.put(put)
      })
    })
  }
}
123456789101112131415161718192021222324252627282930复制代码

第一条的时间戳:1497973306787


最后一条的时间戳:1497973505273


时间戳之差1497973505273-1497973306787=198486


2.第二种方法 

批量写入Hbase,使用的API:

  /**
   * {@inheritDoc}
   * @throws IOException
   */
  @Override
  public void put(final List<Put> puts) throws IOException {
    getBufferedMutator().mutate(puts);
    if (autoFlush) {
      flushCommits();
    }
  }1234567891011复制代码

我的代码:

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, HTable, Put}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}


object Word {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("hbase"))
    val rdd = sc.makeRDD(Array(1)).flatMap(_ => 0 to 1000000)
    rdd.map(value => {
      var put = new Put(Bytes.toBytes(value.toString))
      put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("c1"), Bytes.toBytes(value.toString))
      put
    }).foreachPartition(iterator => {
      var jobConf = new JobConf(HBaseConfiguration.create())
      jobConf.set("hbase.zookeeper.quorum", "172.17.11.85,172.17.11.86,172.17.11.87")
      jobConf.set("zookeeper.znode.parent", "/hbase")
      jobConf.setOutputFormat(classOf[TableOutputFormat])
      val table = new HTable(jobConf, TableName.valueOf("word"))
      import scala.collection.JavaConversions._
      table.put(seqAsJavaList(iterator.toSeq))
    })
  }
}
123456789101112131415161718192021222324252627282930复制代码

第一条数据的时间戳是

 0                               column=f1:c1, timestamp=1498013677545, value=0                                              
 1                               column=f1:c1, timestamp=1498013677545, value=1                                              
 10                              column=f1:c1, timestamp=1498013677545, value=10                                             
 100                             column=f1:c1, timestamp=1498013677545, value=100                                            
 1000                            column=f1:c1, timestamp=1498013677545, value=1000  12345复制代码

第9999条数据写进Hbase的时间戳是:

 108993                          column=f1:c1, timestamp=1498013680244, value=108993                                         
 108994                          column=f1:c1, timestamp=1498013680244, value=108994                                         
 108995                          column=f1:c1, timestamp=1498013680244, value=108995                                         
9999 row(s) in 1.2730 seconds1234复制代码

时间戳之差t=1498013680244-1498013677545=2699


3.第三种方法

将写进Hbase转换为Mapreduce任务

我的代码:

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}


object Word {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("hbase"))
    val conf = HBaseConfiguration.create()
    var jobConf = new JobConf(conf)
    jobConf.set("hbase.zookeeper.quorum", "172.17.11.85,172.17.11.86,172.17.11.87")
    jobConf.set("zookeeper.znode.parent", "/hbase")
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, "word")
    jobConf.setOutputFormat(classOf[TableOutputFormat])

    val rdd = sc.makeRDD(Array(1)).flatMap(_ => 0 to 1000000)
    rdd.map(x => {
      var put = new Put(Bytes.toBytes(x.toString))
      put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("c1"), Bytes.toBytes(x.toString))
      (new ImmutableBytesWritable, put)
    }).saveAsHadoopDataset(jobConf)
  }
}12345678910111213141516171819202122232425262728复制代码

第一条的时间戳:

0                               column=f1:c1, timestamp=1498014877635, value=0    1复制代码

最后一条的时间戳

 108993                          column=f1:c1, timestamp=1498014879526, value=108993                                         
 108994                          column=f1:c1, timestamp=1498014879526, value=108994                                         
 108995                          column=f1:c1, timestamp=1498014879526, value=108995  123复制代码

时间戳之差t=1498014879526-1498014877635=1891

4.总结

通过以上对比可以看出,在其他条件相同的情况下

第三种方法(1498014879526-1498014877635=1891)>第二种方法(1498013680244-1498013677545=2699)>第一种方法(1497973505273-1497973306787=198486)

最优方法是第三种

                     长按识别关注我们,每天都有精彩内容分享哦 ~ ~

                         

相关文章:

  • javascript框架概览备忘
  • mybatis 逆向工程配置文件
  • O2O?啥是“呕吐呕”?
  • 快速搞定常用的ES6新特性
  • Excel 将换行符替换为空
  • android PhoneGap源码详解
  • 数据绑定(九)Binding的数据校验
  • 【转】oc中消息传递机制-附:对performSelector方法的扩充
  • 【WPF】SnapsToDevicePixels与UseLayoutRounding二者到底有什么区别?供参考
  • 每日一Lua(3)-函数
  • Python 学习日记7
  • js实现深拷贝
  • [转载]我再也不想在任何头文件中看到“using namespace xxx;”了
  • Mybatis架构设计及源码分析-SqlSession
  • C# 禁止windows程序重复运行的两种基本方法
  • es6(二):字符串的扩展
  • Eureka 2.0 开源流产,真的对你影响很大吗?
  • React中的“虫洞”——Context
  • 时间复杂度与空间复杂度分析
  • 试着探索高并发下的系统架构面貌
  • gunicorn工作原理
  • 如何在招聘中考核.NET架构师
  • 移动端高清、多屏适配方案
  • ​​​​​​​ubuntu16.04 fastreid训练过程
  • (C++)栈的链式存储结构(出栈、入栈、判空、遍历、销毁)(数据结构与算法)
  • (C语言)字符分类函数
  • (附源码)spring boot球鞋文化交流论坛 毕业设计 141436
  • (教学思路 C#之类三)方法参数类型(ref、out、parmas)
  • (六)c52学习之旅-独立按键
  • (强烈推荐)移动端音视频从零到上手(上)
  • (转)Linux整合apache和tomcat构建Web服务器
  • (转)负载均衡,回话保持,cookie
  • .naturalWidth 和naturalHeight属性,
  • .NET 4 并行(多核)“.NET研究”编程系列之二 从Task开始
  • .net 4.0发布后不能正常显示图片问题
  • .NET的微型Web框架 Nancy
  • .xml 下拉列表_RecyclerView嵌套recyclerview实现二级下拉列表,包含自定义IOS对话框...
  • /3GB和/USERVA开关
  • @AliasFor注解
  • @SuppressWarnings注解
  • [20171113]修改表结构删除列相关问题4.txt
  • [20190113]四校联考
  • [51nod1610]路径计数
  • [Android]使用Git将项目提交到GitHub
  • [BZOJ 2142]礼物(扩展Lucas定理)
  • [C#]winform部署PaddleOCRV3推理模型
  • [C/C++]_[初级]_[关于编译时出现有符号-无符号不匹配的警告-sizeof使用注意事项]
  • [C++] Boost智能指针——boost::scoped_ptr(使用及原理分析)
  • [COGS 622] [NOIP2011] 玛雅游戏 模拟
  • [HackMyVM]靶场 Wild
  • [Hive] CTE 通用表达式 WITH关键字
  • [hive小技巧]同一份数据多种处理
  • [HTML]HTML5实现可编辑表格
  • [Java][方法引用]构造方法的引用事例分析
  • [Neural Network] {Université de Sherbrooke} L2.9 Param Initialization