The HPE Ezmeral Data Fabric Database Binary Connector for Apache Spark supports different data formats such as Avro, JSON, and others. The following use case shows how Spark supports Avro. You can persist the Avro record into HPE Ezmeral Data Fabric Database binary tables directly. Internally, the Avro schema is converted to a native Spark Catalyst data type automatically. Note that both key-value parts in a HPE Ezmeral Data Fabric Database binary table can be defined in Avro format.
catalog for schema mapping. catalog is a
schema for a HPE Ezmeral Data Fabric Database binary table named Avrotable, a row key as
key, and one column col1. The rowkey also has to be
defined in details as a column (col0), which has a specific
cf
(rowkey).def catalog = s"""{
|"table":{"namespace":"default", "name":"/path_to/avro_table"},
|"rowkey":"key",
|"columns":{
|"col0":{"cf":"rowkey", "col":"key", "type":"string"},
|"col1":{"cf":"cf1", "col":"col1", "type":"binary"}
|}
|}""".stripMargin
schemaString is defined first. Then it is parsed to get
avroSchema. avroSchema is used to generate
AvroHBaseRecord. data prepared by users is a local Scala
collection that has 256 AvroHBaseRecord
objects.object AvroHBaseRecord {
val schemaString =
s"""{"namespace": "example.avro",
| "type": "record", "name": "User",
| "fields": [
| {"name": "name", "type": "string"},
| {"name": "favorite_number", "type": ["int", "null"]},
| {"name": "favorite_color", "type": ["string", "null"]},
| {"name": "favorite_array", "type": {"type": "array", "items": "string"}},
| {"name": "favorite_map", "type": {"type": "map", "values": "int"}}
| ] }""".stripMargin
val avroSchema: Schema = {
val p = new Schema.Parser
p.parse(schemaString)
}
def apply(i: Int): AvroHBaseRecord = {
val user = new GenericData.Record(avroSchema);
user.put("name", s"name")
user.put("favorite_number", i)
user.put("favorite_color", s"color")
val favoriteArray = new GenericData.Array[String](
2,
avroSchema.getField("favorite_array").schema())
favoriteArray.add(s"number")
favoriteArray.add(s"number1")
user.put("favorite_array", favoriteArray)
import collection.JavaConverters._
val favoriteMap = Map[String, Int](("key1" -> i), ("key2" -> (i+1))).asJava
user.put("favorite_map", favoriteMap)
val avroByte = AvroSedes.serialize(user, avroSchema)
AvroHBaseRecord(s"name", avroByte)
}
}
val data = (0 to 255).map { i =>
AvroHBaseRecord(i)
}
sc.parallelize(data).toDF.write.options(
Map(
HBaseTableCatalog.tableCatalog -> catalog,
HBaseTableCatalog.newTable -> "5")
).format("org.apache.spark.sql.execution.datasources.hbase")
.save()
withCatalog function, read
returns a DataFrameReader that can be used to read data in as a DataFrame. The
option function adds input options for the underlying data source to the
DataFrameReader. There are two options: one is to set avroSchema as
AvroHBaseRecord.schemaString. The other option is to set
HBaseTableCatalog.tableCatalog as avroCatalog. The
load() function loads input in as a DataFrame. The date frame df returned
by the withCatalog function can be used to access the HPE Ezmeral Data Fabric Database binary
table.def avroCatalog = s"""{
|"table":{"namespace":"default", "name":"avrotable"},
|"rowkey":"key",
|"columns":{
|"col0":{"cf":"rowkey", "col":"key", "type":"string"},
|"col1":{"cf":"cf1", "col":"col1", "avro":"avroSchema"}
|}
|}""".stripMargin
def withCatalog(cat: String): DataFrame = {
sqlContext
.read
.options(Map(
"avroSchema" -> AvroHBaseRecord.schemaString,
HBaseTableCatalog.tableCatalog -> avroCatalog)
).format("org.apache.spark.sql.execution.datasources.hbase")
.load()
}
val df = withCatalog(catalog)
df DataFrame, you can query data.
registerTempTable registers df DataFrame as a temporary
table using the table name avrotable. The sqlContext.sql
function allows you to execute SQL
queries.df.registerTempTable("avrotable")
val c = sqlContext.sql("select count(1) from avrotable"