博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spark 读写 HBase 的两种方式(RDD、DataFrame)
阅读量:6329 次
发布时间:2019-06-22

本文共 7277 字,大约阅读时间需要 24 分钟。

hot3.png

使用 saveAsHadoopDataset 写入数据

import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}import org.apache.hadoop.hbase.client.{HBaseAdmin, Put, Result}import org.apache.hadoop.hbase.io.ImmutableBytesWritableimport org.apache.hadoop.hbase.mapreduce.TableInputFormat//import org.apache.hadoop.hbase.mapreduce.TableOutputFormatimport org.apache.hadoop.hbase.mapred.TableOutputFormatimport org.apache.hadoop.hbase.util.Bytesimport org.apache.hadoop.mapred.JobConf//import org.apache.hadoop.mapreduce.Jobimport org.apache.log4j.{Level, Logger}import org.apache.spark.sql.SparkSession/**  * Created by blockchain on 18-9-9 下午3:45 in Beijing.  */object SparkHBaseRDD {  def main(args: Array[String]) {    // 屏蔽不必要的日志显示在终端上    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)    val spark = SparkSession.builder().appName("SparkHBaseRDD").getOrCreate()    val sc = spark.sparkContext    val tablename = "SparkHBase"    val hbaseConf = HBaseConfiguration.create()    hbaseConf.set("hbase.zookeeper.quorum","localhost")  //设置zooKeeper集群地址,也可以通过将hbase-site.xml导入classpath,但是建议在程序里这样设置    hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")       //设置zookeeper连接端口,默认2181    hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, tablename)    // 初始化job,TableOutputFormat 是 org.apache.hadoop.hbase.mapred 包下的    val jobConf = new JobConf(hbaseConf)    jobConf.setOutputFormat(classOf[TableOutputFormat])    val indataRDD = sc.makeRDD(Array("2,jack,16", "1,Lucy,15", "5,mike,17", "3,Lily,14"))    val rdd = indataRDD.map(_.split(',')).map{ arr=>      /*一个Put对象就是一行记录,在构造方法中指定主键       * 所有插入的数据 须用 org.apache.hadoop.hbase.util.Bytes.toBytes 转换       * Put.addColumn 方法接收三个参数:列族,列名,数据*/      val put = new Put(Bytes.toBytes(arr(0)))      put.addColumn(Bytes.toBytes("cf1"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))      put.addColumn(Bytes.toBytes("cf1"),Bytes.toBytes("age"),Bytes.toBytes(arr(2)))      (new ImmutableBytesWritable, put)    }    rdd.saveAsHadoopDataset(jobConf)    spark.stop()  }}

在 中 查看写入的数据

hbase(main):005:0* scan 'SparkHBase'ROW                        COLUMN+CELL                                                                 1                         column=cf1:age, timestamp=1536494344379, value=15                           1                         column=cf1:name, timestamp=1536494344379, value=Lucy                        2                         column=cf1:age, timestamp=1536494344380, value=16                           2                         column=cf1:name, timestamp=1536494344380, value=jack                        3                         column=cf1:age, timestamp=1536494344379, value=14                           3                         column=cf1:name, timestamp=1536494344379, value=Lily                        5                         column=cf1:age, timestamp=1536494344380, value=17                           5                         column=cf1:name, timestamp=1536494344380, value=mike                       4 row(s) in 0.0940 secondshbase(main):006:0>

如上所示,写入成功。

使用 newAPIHadoopRDD 读取数据

import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}import org.apache.hadoop.hbase.client.{HBaseAdmin, Put, Result}import org.apache.hadoop.hbase.io.ImmutableBytesWritableimport org.apache.hadoop.hbase.mapreduce.TableInputFormat//import org.apache.hadoop.hbase.mapreduce.TableOutputFormatimport org.apache.hadoop.hbase.mapred.TableOutputFormatimport org.apache.hadoop.hbase.util.Bytesimport org.apache.hadoop.mapred.JobConf//import org.apache.hadoop.mapreduce.Jobimport org.apache.log4j.{Level, Logger}import org.apache.spark.sql.SparkSession/**  * Created by blockchain on 18-9-9 下午3:45 in Beijing.  */object SparkHBaseRDD {  def main(args: Array[String]) {    // 屏蔽不必要的日志显示在终端上    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)    val spark = SparkSession.builder().appName("SparkHBaseRDD").getOrCreate()    val sc = spark.sparkContext    val tablename = "SparkHBase"    val hbaseConf = HBaseConfiguration.create()    hbaseConf.set("hbase.zookeeper.quorum","localhost")  //设置zooKeeper集群地址,也可以通过将hbase-site.xml导入classpath,但是建议在程序里这样设置    hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")       //设置zookeeper连接端口,默认2181    hbaseConf.set(TableInputFormat.INPUT_TABLE, tablename)        // 如果表不存在,则创建表    val admin = new HBaseAdmin(hbaseConf)    if (!admin.isTableAvailable(tablename)) {      val tableDesc = new HTableDescriptor(TableName.valueOf(tablename))      admin.createTable(tableDesc)    }    //读取数据并转化成rdd TableInputFormat 是 org.apache.hadoop.hbase.mapreduce 包下的    val hBaseRDD = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],      classOf[ImmutableBytesWritable],      classOf[Result])    hBaseRDD.foreach{ case (_ ,result) =>      //获取行键      val key = Bytes.toString(result.getRow)      //通过列族和列名获取列      val name = Bytes.toString(result.getValue("cf1".getBytes,"name".getBytes))      val age = Bytes.toString(result.getValue("cf1".getBytes,"age".getBytes))      println("Row key:"+key+"\tcf1.Name:"+name+"\tcf1.Age:"+age)    }    admin.close()    spark.stop()  }}

输出如下

Row key:1	cf1.Name:Lucy	cf1.Age:15Row key:2	cf1.Name:jack	cf1.Age:16Row key:3	cf1.Name:Lily	cf1.Age:14Row key:5	cf1.Name:mike	cf1.Age:17

Spark DataFrame 通过 Phoenix 读写 HBase

友情提示

部署Maven: 需要添加的依赖如下:

org.apache.phoenix
phoenix-core
${phoenix.version}
org.apache.phoenix
phoenix-spark
${phoenix.version}

下面老规矩,直接上代码。

import org.apache.log4j.{Level, Logger}import org.apache.spark.sql.{SaveMode, SparkSession}/**  * Created by blockchain on 18-9-9 下午8:33 in Beijing.  */object SparkHBaseDataFrame {  def main(args: Array[String]) {    // 屏蔽不必要的日志显示在终端上    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)    val spark = SparkSession.builder().appName("SparkHBaseDataFrame").getOrCreate()    val url = s"jdbc:phoenix:localhost:2181"    val dbtable = "PHOENIXTEST"    //spark 读取 phoenix 返回 DataFrame 的 第一种方式    val rdf = spark.read      .format("jdbc")      .option("driver", "org.apache.phoenix.jdbc.PhoenixDriver")      .option("url", url)      .option("dbtable", dbtable)      .load()    rdf.printSchema()    //spark 读取 phoenix 返回 DataFrame 的 第二种方式    val df = spark.read      .format("org.apache.phoenix.spark")      .options(Map("table" -> dbtable, "zkUrl" -> url))      .load()    df.printSchema()    //spark DataFrame 写入 phoenix,需要先建好表    df.write      .format("org.apache.phoenix.spark")      .mode(SaveMode.Overwrite)      .options(Map("table" -> "PHOENIXTESTCOPY", "zkUrl" -> url))      .save()    spark.stop()  }}

在 中查看写入的数据

0: jdbc:phoenix:localhost:2181> SELECT * FROM PHOENIXTEST ;+-----+----------+| PK  |   COL1   |+-----+----------+| 1   | Hello    || 2   | World    || 3   | HBase    || 4   | Phoenix  |+-----+----------+4 rows selected (0.049 seconds)0: jdbc:phoenix:localhost:2181> 0: jdbc:phoenix:localhost:2181> SELECT * FROM PHOENIXTESTCOPY ;+-----+----------+| PK  |   COL1   |+-----+----------+| 1   | Hello    || 2   | World    || 3   | HBase    || 4   | Phoenix  |+-----+----------+4 rows selected (0.03 seconds)0: jdbc:phoenix:localhost:2181>

如上所示,写入成功。


本文参考链接:

转载于:https://my.oschina.net/uchihamadara/blog/2032481

你可能感兴趣的文章
【原】东拼西凑PBR(1):PBR基础
查看>>
react 从零开始搭建开发环境
查看>>
scala recursive value x$5 needs type
查看>>
ps -ef |grep 输出的具体含义
查看>>
markdown编辑
查看>>
ASCII 在线转换器
查看>>
Linux内核同步:RCU
查看>>
Android逆向进阶——让你自由自在脱壳的热身运动(dex篇)
查看>>
Java设计模式之五大创建型模式(附实例和详解)
查看>>
60 Permutation Sequence
查看>>
主流的RPC框架有哪些
查看>>
Hive学习之路 (七)Hive的DDL操作
查看>>
[转]mysql使用关键字作为列名的处理方式
查看>>
awesome go library 库,推荐使用的golang库
查看>>
树形展示形式的论坛
查看>>
jdbcTemplate 调用存储过程。 入参 array 返回 cursor
查看>>
C++中的stack类、QT中的QStack类
查看>>
Linux常用基本命令[cp]
查看>>
CSS 相对|绝对(relative/absolute)定位系列(一)
查看>>
关于 Nginx 配置 WebSocket 400 问题
查看>>