There are two options for bulk loading data into HBase with Spark: the basic hbaseBulkLoad function and the thin-record variant, hbaseBulkLoadThinRows.
Both implementations work more or less like the MapReduce bulk load process. A partitioner partitions the RowKeys based on region splits, and the RowKeys are sent to the reducers in order, so that HFiles can be written directly from the reduce phase.
In Spark terms, the bulk load is implemented around a Spark repartitionAndSortWithinPartitions followed by a Spark foreachPartition, as the sketch below illustrates.
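The following is a simplified, hypothetical sketch of that shuffle pattern, not the hbase-spark implementation: the RegionSplitPartitioner class and its split points are invented for illustration, and sc is the SparkContext provided by the Spark shell.

import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD

// Hypothetical partitioner: route each RowKey to the region whose start key
// is the greatest split point <= the RowKey.
class RegionSplitPartitioner(splits: Array[String]) extends Partitioner {
  override def numPartitions: Int = splits.length
  override def getPartition(key: Any): Int = {
    val rowKey = key.asInstanceOf[(String, String)]._1
    math.max(splits.lastIndexWhere(_ <= rowKey), 0)
  }
}

// Records keyed by (RowKey, qualifier), with the cell value as the value.
val records: RDD[((String, String), String)] =
  sc.parallelize(Seq((("1", "a"), "foo1"), (("3", "b"), "foo2.b")))

// Partition by region split and sort keys within each partition so cells
// arrive in HFile order; a real bulk load would write HFiles here, while
// this sketch only prints each partition's contents.
records
  .repartitionAndSortWithinPartitions(new RegionSplitPartitioner(Array("", "2")))
  .foreachPartition(iter => iter.foreach(println))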
Here is an example of using the basic bulk load functionality; enter it in the Spark shell's :paste mode:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.mapreduce.{LoadIncrementalHFiles, TableInputFormat}
import org.apache.hadoop.hbase.spark._
import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
import org.apache.hadoop.hbase.util.Bytes._
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.HConnectionManager
val tableName = "table1"
val stagingFolder = "/home/mapr"
val columnFamily1 = "cf1"
@transient val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, tableName)
conf.set("hbase.zookeeper.quorum", "node1.cluster.com")
conf.setInt("hbase.zookeeper.property.clientPort", 5181)
// Create the HBaseContext only after the configuration is fully set,
// because the context captures the configuration at construction time.
val hbaseContext = new HBaseContext(sc, conf)

// Sample records: (RowKey, (column family, qualifier, value))
val rdd = sc.parallelize(Array(
(toBytes("1"), (toBytes(columnFamily1), toBytes("a"), toBytes("foo1"))),
(toBytes("3"), (toBytes(columnFamily1), toBytes("b"), toBytes("foo2.b")))
))
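// Bulk load: convert each record to a (KeyFamilyQualifier, value) pair,
// shuffle into HFile order, and write HFiles to the staging folder.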
rdd.hbaseBulkLoad(hbaseContext,
TableName.valueOf(tableName),
t => {
val rowKey = t._1
val family: Array[Byte] = t._2._1
val qualifier = t._2._2
val value: Array[Byte] = t._2._3
val keyFamilyQualifier = new KeyFamilyQualifier(rowKey, family, qualifier)
Seq((keyFamilyQualifier, value)).iterator
},
stagingFolder)
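// Load the newly written HFiles into the running table.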
val connection = HConnectionManager.createConnection(conf)
val table = connection.getTable(TableName.valueOf(tableName))
val load = new LoadIncrementalHFiles(conf)
load.doBulkLoad(
new Path(stagingFolder),
connection.getAdmin,
table,
connection.getRegionLocator(TableName.valueOf(tableName)))
The hbaseBulkLoad function takes three required parameters:

1. The name of the table to load into.
2. A function that converts a record in the RDD into a tuple key-value pair, with the key being a KeyFamilyQualifier object and the value being the cell value. The KeyFamilyQualifier object holds the RowKey, Column Family, and Column Qualifier. The shuffle partitions on the RowKey but sorts by all three values. (A factored-out version of this function appears after this list.)
3. The staging path where the HFiles are written.

Following the bulk load, use HBase's LoadIncrementalHFiles object to load the newly created HFiles into HBase.
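For readability, the conversion function can be factored out; this sketch is equivalent to the inline function used above, and the name toKeyFamilyQualifier is ours rather than the library's:

// Converts one RDD record into the (KeyFamilyQualifier, value) pair that
// hbaseBulkLoad shuffles and sorts on.
def toKeyFamilyQualifier(t: (Array[Byte], (Array[Byte], Array[Byte], Array[Byte])))
    : Iterator[(KeyFamilyQualifier, Array[Byte])] = {
  val (rowKey, (family, qualifier, value)) = t
  Seq((new KeyFamilyQualifier(rowKey, family, qualifier), value)).iterator
}

// Usage:
// rdd.hbaseBulkLoad(hbaseContext, TableName.valueOf(tableName),
//   toKeyFamilyQualifier, stagingFolder)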
hbaseBulkLoad also takes optional parameters: a map of per-column-family write options (compression, bloom type, block size, and data block encoding, expressed as FamilyHFileWriteOptions), a flag that excludes the generated HFiles from compactions, and the maximum HFile size. Here is an example that uses these additional parameters; enter it in :paste mode:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.client.HConnectionManager
import org.apache.hadoop.hbase.mapreduce.{LoadIncrementalHFiles, TableInputFormat}
import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
import org.apache.hadoop.hbase.spark.{FamilyHFileWriteOptions, HBaseContext, KeyFamilyQualifier}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, TableName}
import org.apache.spark.sql.SparkSession
val tableName = "table2"
val stagingFolder = "/home/mapr"
val columnFamily1 = "cf1"
val sc = spark.sparkContext
@transient val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, tableName)
conf.set("hbase.zookeeper.quorum", "node1.cluster.com")
conf.setInt("hbase.zookeeper.property.clientPort", 5181)
val hbaseContext = new HBaseContext(sc, conf)
val rdd = sc.parallelize(Array(
(Bytes.toBytes("1"),
(Bytes.toBytes(columnFamily1),
Bytes.toBytes("a"), Bytes.toBytes("foo1"))),
(Bytes.toBytes("3"),
(Bytes.toBytes(columnFamily1),
Bytes.toBytes("b"),
Bytes.toBytes("foo2.b")))))
// Per-column-family write options: compression codec, bloom filter type,
// block size, and data block encoding.
val familyHBaseWriterOptions =
new java.util.HashMap[Array[Byte], FamilyHFileWriteOptions]
val f1Options = new FamilyHFileWriteOptions("GZ", "ROW", 128, "PREFIX")
familyHBaseWriterOptions.put(Bytes.toBytes(columnFamily1), f1Options)
rdd.hbaseBulkLoad(hbaseContext,
TableName.valueOf(tableName),
t => {
val rowKey = t._1
val family: Array[Byte] = t._2._1
val qualifier = t._2._2
val value = t._2._3
val keyFamilyQualifier = new KeyFamilyQualifier(rowKey, family, qualifier)
Seq((keyFamilyQualifier, value)).iterator
},
stagingFolder,
familyHBaseWriterOptions,
compactionExclude = false, // keep the generated HFiles eligible for compactions
HConstants.DEFAULT_MAX_FILE_SIZE) // maximum size per generated HFile
val connection = HConnectionManager.createConnection(conf)
val table = connection.getTable(TableName.valueOf(tableName))
val load = new LoadIncrementalHFiles(conf)
// Load the generated HFiles into the table.
load.doBulkLoad(
new Path(stagingFolder),
connection.getAdmin,
table,
connection.getRegionLocator(TableName.valueOf(tableName)))
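After the load completes, you can optionally spot-check the result by scanning the table; this snippet is a verification sketch of ours, not part of the bulk load API:

import org.apache.hadoop.hbase.client.Scan
import scala.collection.JavaConverters._

// Scan the table and print each RowKey to confirm the cells are visible.
val scanner = table.getScanner(new Scan())
scanner.asScala.foreach(result => println(Bytes.toString(result.getRow)))
scanner.close()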
The second option, hbaseBulkLoadThinRows, groups all the cells of a row into a single record, which reduces the load on the Spark shuffle for tables whose rows contain a modest number of columns. Here is an example; enter it in :paste mode:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.client.HConnectionManager
import org.apache.hadoop.hbase.mapreduce.{LoadIncrementalHFiles, TableInputFormat}
import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
import org.apache.hadoop.hbase.spark.{HBaseContext, _}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.spark.sql.SparkSession
val tableName = "table3"
val stagingFolder = "/home/mapr"
val columnFamily1 = "cf1"
@transient val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, tableName)
conf.set("hbase.zookeeper.quorum", "node1.cluster.com")
conf.setInt("hbase.zookeeper.property.clientPort", 5181)
// As before, create the HBaseContext only after the configuration is set.
val hbaseContext = new HBaseContext(sc, conf)

// Each record is (RowKey, List(family, qualifier, value)).
val rdd = sc.parallelize(Array(
("1", List(Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))),
("3", List(Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b")))))
rdd.hbaseBulkLoadThinRows(hbaseContext,
TableName.valueOf(tableName),
t => {
val rowKey = t._1
val familyQualifiersValues = new FamiliesQualifiersValues
val q = t._2
val family: Array[Byte] = q.head
val qualifier = q(1)
val value: Array[Byte] = q(2)
println(s"family: ${Bytes.toString(family)}")
println(s"qualifier: ${Bytes.toString(qualifier)}")
println(s"value: ${Bytes.toString(value)}")
familyQualifiersValues += (family, qualifier, value)
// Return the whole row as one record: (RowKey, all cells for the row).
(new ByteArrayWrapper(Bytes.toBytes(rowKey)), familyQualifiersValues)
},
stagingFolder,
new java.util.HashMap[Array[Byte], FamilyHFileWriteOptions],
compactionExclude = false,
20) // maximum HFile size
val connection = HConnectionManager.createConnection(conf)
val table = connection.getTable(TableName.valueOf(tableName))
val load = new LoadIncrementalHFiles(conf)
load.doBulkLoad(
new Path(stagingFolder),
connection.getAdmin,
table,
connection.getRegionLocator(TableName.valueOf(tableName)))

The big difference when bulk loading thin rows is that the function returns a tuple whose first value is the RowKey and whose second value is a FamiliesQualifiersValues object, which contains all the values for the row across all column families.
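To illustrate that structure, the following sketch builds a single thin-row record spanning two column families; cf2 here is hypothetical, and the += call is the same one used in the example above:

// One row ("5") carrying cells in two column families.
val wholeRow = new FamiliesQualifiersValues
wholeRow += (Bytes.toBytes("cf1"), Bytes.toBytes("a"), Bytes.toBytes("v1"))
wholeRow += (Bytes.toBytes("cf2"), Bytes.toBytes("b"), Bytes.toBytes("v2"))
val record = (new ByteArrayWrapper(Bytes.toBytes("5")), wholeRow)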