Loading Data from HPE Ezmeral Data Fabric Database as an Apache Spark RDD

You can use the following API to load JSON-format data from an HPE Ezmeral Data Fabric Database table into an Apache Spark RDD of JSON documents:

Scala
For loading as an RDD, apply the following method on a SparkContext object:
def loadFromMapRDB[T](table: String): RDD[T]
Java
For loading as an RDD, apply the following method on a MapRDBJavaSparkContext object:
mapRDBSparkContext.loadFromMapRDB(tableName: String, clazz: Class)
Note: The only required parameter for these methods is tableName; all other parameters are optional.
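For example, the Scala method's type parameter lets you load each document into your own class instead of the default OJAIDocument. The following is a minimal sketch; the UserProfile case class and its fields are illustrative assumptions, not part of the connector:

Scala
import com.mapr.db.spark._

// Hypothetical case class; the field names are assumed to match the document fields in the table.
case class UserProfile(_id: String, first_name: String, last_name: String)

// Load each document as a UserProfile instead of the default OJAIDocument.
val typedRDD = sc.loadFromMapRDB[UserProfile]("/tmp/user_profiles")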

The following example creates a userprofilesRDD by calling loadFromMapRDB on the SparkContext (Scala) or MapRDBJavaSparkContext (Java) object and supplying the table path ("/tmp/user_profiles"):

Scala
import com.mapr.db.spark._
            
val userprofilesRDD = sc.loadFromMapRDB("/tmp/user_profiles")
Java
import com.mapr.db.spark.api.java.MapRDBJavaSparkContext;

MapRDBJavaSparkContext mapRDBSparkContext = new MapRDBJavaSparkContext(sc);
MapRDBJavaRDD<OJAIDocument> userprofilesRDD = mapRDBSparkContext.loadFromMapRDB("/tmp/user_profiles");

The following example creates a usersInfo RDD by calling loadFromMapRDB on the SparkContext (Scala) or MapRDBJavaSparkContext (Java) object and supplying the table path ("/tmp/UserInfo"):

Scala
import com.mapr.db.spark._
            
val usersInfo = sc.loadFromMapRDB("/tmp/UserInfo")
Java
import com.mapr.db.spark.api.java.MapRDBJavaSparkContext;
            
MapRDBJavaRDD<OJAIDocument> usersInfo = mapRDBSparkContext.loadFromMapRDB("/tmp/UserInfo");
In the previous example, the usersInfo RDD contains user documents. The following prints the fields of each document and shows the output for a sample user:

Scala
usersInfo.foreach(println(_))
Java
usersInfo.foreach(System.out::println);
{
  "address":
     {"Pin":95035,"city":"milpitas","street":"350 holger way"},
  "dob":"1947-11-29",
  "first_name":"David",
  "interests":["football","books","movies"],
  "last_name":"Jones"
}

The following example shows a join-style cogroup operation performed on two different JSON tables, using address.city as the join key:

Scala
import com.mapr.db.spark._
            
val maprd1 = sc.loadFromMapRDB("/tmp/user_profiles")
            
val maprd2 = sc.loadFromMapRDB("/tmp/user_income")
            
val collection = maprd1.map(a => (a.`address.city`[String], a))
  .cogroup(maprd2.map(a => (a.`address.city`[String], a)))
  .map(a => (a._1, a._2._1.size, a._2._2.size))
  .collect
Java
import com.mapr.db.spark.api.java.MapRDBJavaSparkContext;
import scala.Tuple2;
import scala.Tuple3;
import java.util.Collection;
import java.util.List;

MapRDBJavaRDD<OJAIDocument> maprd1 = mapRDBSparkContext.loadFromMapRDB("/tmp/user_profiles");
MapRDBJavaRDD<OJAIDocument> maprd2 = mapRDBSparkContext.loadFromMapRDB("/tmp/user_income");

List<Tuple3<String, Integer, Integer>> collection =
    maprd1.mapToPair(a -> new Tuple2<>(a.getString("address.city"), a))
          .cogroup(maprd2.mapToPair(a -> new Tuple2<>(a.getString("address.city"), a)))
          .map(a -> new Tuple3<>(a._1(), ((Collection<?>) a._2()._1()).size(), ((Collection<?>) a._2()._2()).size()))
          .collect();

The resulting collection contains, for each city, the number of matching user documents in the user_profiles and user_income HPE Ezmeral Data Fabric Database tables.
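As a quick check, you can print the per-city counts from the collected result. This is a minimal sketch of the Scala variant; the tuple layout (city, profile count, income count) follows the map step above:

Scala
// Each element is (city, number of user_profiles documents, number of user_income documents).
collection.foreach { case (city, profiles, incomes) =>
  println(s"$city: $profiles profile document(s), $incomes income document(s)")
}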

The following example adds a new field to each JSON document and saves the result to a new table:

Scala
import com.mapr.db.spark._
          
val maprd = sc.loadFromMapRDB("/tmp/user_profiles")
val documents = maprd.map(a => { a.`address.country` = "USA"; a })
documents.saveToMapRDB("/tmp/cleaned_user_profiles")
Java
import com.mapr.db.spark.api.java.MapRDBJavaSparkContext;
          
MapRDBJavaRDD<OJAIDocument> maprd = mapRDBSparkContext.loadFromMapRDB("/tmp/user_profiles");
JavaRDD<OJAIDocument> documents = maprd.map(a -> { a.set("address.country", "USA"); return a; });
mapRDBSparkContext.saveToMapRDB(documents, "/tmp/cleaned_user_profiles");

Improving Performance by Using Projection Pushdown and Filter Pushdown

To improve performance, you can supply a WHERE clause and projection fields to the loadFromMapRDB API. In the following example, a condition is supplied to the loadFromMapRDB function and only certain fields are specified in the SELECT clause:

Scala
import com.mapr.db.spark._
              
val userprofilesRDD = sc.loadFromMapRDB("/tmp/user_profiles")
                        .where([condition])
                        .select("address",
                                "first_name",
                                "_id",
                                "last_name")
Java
import com.mapr.db.spark.api.java.MapRDBJavaSparkContext;
import org.ojai.store.QueryCondition;
              
MapRDBJavaSparkContext mapRDBSparkContext = new MapRDBJavaSparkContext(spark.sparkContext());
MapRDBJavaRDD userprofilesRDD = mapRDBSparkContext.loadFromMapRDB("/tmp/user_profiles")
                                                  .where([condition])
                                                  .select("address",
                                                          "first_name",
                                                          "_id",
                                                          "last_name");          

The data is loaded based on the condition. The condition is pushed down to the server, and the server returns data based on the filtering. Only the fields specified in the SELECT clause are projected.
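Because the previous example leaves [condition] as a placeholder, the following sketch shows one concrete combination of filter and projection pushdown using the connector's Scala DSL. The address.city value "milpitas" is taken from the sample document shown earlier and is illustrative only:

Scala
import com.mapr.db.spark._

// Only documents matching the condition are returned by the server,
// and only the projected fields are loaded into the RDD.
val filteredProfilesRDD = sc.loadFromMapRDB("/tmp/user_profiles")
                            .where(field("address.city") === "milpitas")
                            .select("_id", "first_name", "last_name", "address")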

In the following example, the WHERE clause is used as a filter condition:

Scala
import com.mapr.db.spark._
              
val userprofilesRDD = sc.loadFromMapRDB("/tmp/user_profiles")
                      .where(field("salary") >= 100)
Java
import com.mapr.db.MapRDB;
import com.mapr.db.spark.api.java.MapRDBJavaSparkContext;
import org.ojai.store.QueryCondition;

MapRDBJavaSparkContext mapRDBSparkContext = new MapRDBJavaSparkContext(spark.sparkContext());
MapRDBJavaRDD userprofilesRDD = mapRDBSparkContext.loadFromMapRDB("/tmp/user_profiles")
    .where(MapRDB.newCondition().is("salary", QueryCondition.Op.GREATER_OR_EQUAL, 100).build());

The userprofilesRDD includes only those documents with a salary field greater than or equal to 100.

By specifying the _id field, you can retrieve the document for a given key:

Scala
import com.mapr.db.spark._
              
val userprofilesRDD = sc.loadFromMapRDB("/tmp/user_profiles")
                      .where(field("_id") === "k2")
Java
import com.mapr.db.MapRDB;
import com.mapr.db.spark.api.java.MapRDBJavaSparkContext;
import org.ojai.store.QueryCondition;

MapRDBJavaSparkContext mapRDBSparkContext = new MapRDBJavaSparkContext(spark.sparkContext());
MapRDBJavaRDD userprofilesRDD = mapRDBSparkContext.loadFromMapRDB("/tmp/user_profiles")
    .where(MapRDB.newCondition().is("_id", QueryCondition.Op.EQUAL, "k2").build());

WHERE Clause Semantics

The loadFromMapRDB API supports a WHERE clause to push down the filter to the JSON document API, ensuring that only relevant documents are propagated to the RDD.

You can use two options to provide the filter condition: the Scala DSL for specifying filter conditions, or the OJAI QueryCondition API.

The following is an example of using loadFromMapRDB and supplying a condition by using the Scala DSL:

Scala
val isDoe = field("last_name") === "Doe"
val userprofilesRDD = sc.loadFromMapRDB("/tmp/user_profiles").where(isDoe)

For more information about using the Scala DSL, see Scala DSL for Specifying Filter Conditions.

The following is an example of passing the condition by using the QueryCondition API:

Scala
val maprd = sc.loadFromMapRDB(tableName)
             .where(MapRDB.newCondition()
             .is("_id", QueryCondition.Op.EQUAL, "k2")
             .build())
Java
MapRDBJavaRDD rdd = mapRDBJavaSparkContext.loadFromMapRDB(tableName)
            .where(MapRDB.newCondition().is("_id", QueryCondition.Op.EQUAL, "k2").build());

For more information about QueryCondition, see Querying with Conditions.

Note: For additional information, see Java Examples in the source code.