Projection and Filter Pushdown with Apache Spark DataFrames and Datasets

Projection and filter pushdown improve query performance. When you call the select and filter methods on DataFrames and Datasets, the HPE Ezmeral Data Fabric Database OJAI Connector for Apache Spark pushes these operations down to HPE Ezmeral Data Fabric Database where possible.

Projection Pushdown

Projection pushdown minimizes data transfer between HPE Ezmeral Data Fabric Database and the Apache Spark engine by omitting unnecessary fields from table scans. It is especially beneficial when a table contains many columns.
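
The following toy model is written in plain Python rather than against the connector. It illustrates why projection pushdown saves transfer by counting the field values that leave a simulated document store with and without a pushed-down projection. The sample documents and the `scan` and `fields_transferred` helpers are hypothetical. In the real connector, HPE Ezmeral Data Fabric Database applies the projection during the table scan.

```python
# Toy model of projection pushdown. The "table" is a list of dicts standing in
# for JSON documents; none of these names belong to the connector's API.
TABLE = [
    {"_id": "u1", "first_name": "Bill", "last_name": "Smith",
     "address": {"city": "Austin"}, "interests": ["golf", "chess"]},
    {"_id": "u2", "first_name": "David", "last_name": "Jones",
     "address": {"city": "Boston"}, "interests": ["sailing"]},
]


def scan(table, columns=None):
    """Yield documents from the store, keeping only `columns` if given.

    Passing `columns` models a pushed-down projection: the store drops the
    unneeded fields before anything is sent to the Spark engine.
    """
    for doc in table:
        if columns is None:
            yield dict(doc)
        else:
            yield {c: doc[c] for c in columns if c in doc}


def fields_transferred(rows):
    """Count the top-level field values that crossed the store/engine boundary."""
    return sum(len(row) for row in rows)


wanted = ["_id", "first_name", "last_name"]

# Without pushdown: the engine receives whole documents, then projects them.
full = list(scan(TABLE))
projected_late = [{c: r[c] for c in wanted} for r in full]

# With pushdown: the store returns only the requested fields.
projected_early = list(scan(TABLE, columns=wanted))

print(fields_transferred(full), fields_transferred(projected_early))  # 10 6
```

Both paths produce the same rows. Only the amount of data shipped to the engine differs, and the gap grows with the number of fields that the query does not select.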

When you call select on a DataFrame, as in the following example, the connector pushes the projection down:

Scala
import org.apache.spark.sql.SparkSession
import com.mapr.db.spark.sql._

val df = sparkSession.loadFromMapRDB("/tmp/user_profiles")
df.select("_id", "first_name", "last_name")
Java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import com.mapr.db.spark.sql.api.java.MapRDBJavaSession;

MapRDBJavaSession maprSession = new MapRDBJavaSession(sparkSession);
Dataset<Row> df = maprSession.loadFromMapRDB("/tmp/user_profiles");
df.select("_id", "first_name", "last_name");
Python
from pyspark.sql import SparkSession

df = spark_session.loadFromMapRDB("/tmp/user_profiles")
df.select("_id", "first_name", "last_name")

The equivalent example using Datasets is as follows:

Scala
import org.apache.spark.sql.SparkSession
import com.mapr.db.spark.sql._
import sparkSession.implicits._  // provides the Encoder that .as[Person] requires

val ds = sparkSession.loadFromMapRDB[Person]("/tmp/user_profiles").as[Person]
ds.select("_id", "first_name", "last_name")
Java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import com.mapr.db.spark.sql.api.java.MapRDBJavaSession;

MapRDBJavaSession maprSession = new MapRDBJavaSession(sparkSession);
Dataset<Row> ds = maprSession.loadFromMapRDB("/tmp/user_profiles", Person.class);
ds.select("_id", "first_name", "last_name");

Filter Pushdown

Filter pushdown improves performance by reducing the amount of data passed between HPE Ezmeral Data Fabric Database and the Apache Spark engine when filtering data.
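
As with projection, a toy model in plain Python shows where the saving comes from: when the predicate is evaluated inside the store, non-matching rows are never shipped to the engine. The sample rows, `scan`, and `is_bill` are hypothetical stand-ins, not connector APIs.

```python
# Toy model of filter pushdown; all names here are hypothetical.
TABLE = [
    {"_id": "u1", "first_name": "Bill", "last_name": "Smith"},
    {"_id": "u2", "first_name": "David", "last_name": "Jones"},
    {"_id": "u3", "first_name": "Peter", "last_name": "Brown"},
    {"_id": "u4", "first_name": "Bill", "last_name": "Jones"},
]


def scan(table, predicate=None):
    """Yield documents, evaluating `predicate` inside the store if given."""
    for doc in table:
        if predicate is None or predicate(doc):
            yield doc


def is_bill(doc):
    # Equivalent of the filter first_name = 'Bill'
    return doc.get("first_name") == "Bill"


# Without pushdown: every row is shipped, then the engine filters it.
shipped = list(scan(TABLE))
late = [r for r in shipped if is_bill(r)]

# With pushdown: the store evaluates the predicate and ships only matches.
early = list(scan(TABLE, predicate=is_bill))

print(len(shipped), len(early))  # 4 2
```

The query result is identical either way; pushdown changes only how many rows cross the boundary between the database and Spark.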

Consider the following example:

Scala
import org.apache.spark.sql.SparkSession
import com.mapr.db.spark.sql._

val df = sparkSession.loadFromMapRDB("/tmp/user_profiles")
df.filter("first_name = 'Bill'")
Java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import com.mapr.db.spark.sql.api.java.MapRDBJavaSession;

MapRDBJavaSession maprSession = new MapRDBJavaSession(sparkSession);
Dataset<Row> df = maprSession.loadFromMapRDB("/tmp/user_profiles");
df.filter("first_name = 'Bill'");
Python
from pyspark.sql import SparkSession

df = spark_session.loadFromMapRDB("/tmp/user_profiles")
df.filter("first_name = 'Bill'")

The HPE Ezmeral Data Fabric Database OJAI Connector for Apache Spark pushes the filter first_name = 'Bill' down to HPE Ezmeral Data Fabric Database.

The equivalent example using Datasets is as follows:

Scala
import org.apache.spark.sql.SparkSession
import com.mapr.db.spark.sql._
import sparkSession.implicits._  // provides the Person encoder and the $"..." column syntax

val ds = sparkSession.loadFromMapRDB[Person]("/tmp/user_profiles").as[Person]
ds.filter($"first_name" === "Bill")
Java
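import static org.apache.spark.sql.functions.col;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import com.mapr.db.spark.sql.api.java.MapRDBJavaSession;

MapRDBJavaSession maprSession = new MapRDBJavaSession(sparkSession);
Dataset<Person> ds = maprSession.loadFromMapRDB("/tmp/user_profiles").as(Encoders.bean(Person.class));
ds.filter(col("first_name").equalTo("Bill"));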

The following filter keeps the rows in which first_name is either "David" or "Peter":

Scala
df.filter($"first_name" === "David" || $"first_name" === "Peter")
Java
df.filter(col("first_name").equalTo("David").or(col("first_name").equalTo("Peter")))
Python
df.filter((col("first_name") == "David") | (col("first_name") == "Peter"))

The following filter retrieves only the rows in which first_name is "David" and last_name is "Jones":

Scala
df.filter($"first_name" === "David" && $"last_name" === "Jones")
Java
df.filter(col("first_name").equalTo("David").and(col("last_name").equalTo("Jones")))
Python
df.filter((col("first_name") == "David") & (col("last_name") == "Jones"))

The following uses a not condition to return rows in which first_name is not "David" and last_name is not "Peter":

Scala
df.filter(not($"first_name" === "David" || $"last_name" === "Peter"))
Java
df.filter(not(col("first_name").equalTo("David").or(col("last_name").equalTo("Peter"))))
Python
df.filter(~((col("first_name") == "David") | (col("last_name") == "Peter")))
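
The not example relies on De Morgan's law: NOT (A OR B) is equivalent to (NOT A) AND (NOT B). The short Python check below confirms that both forms keep the same rows over a hypothetical set of sample names. It is a plain-Python sketch, not Spark code, and it ignores null values, which Spark SQL handles with three-valued logic.

```python
from itertools import product

# Hypothetical sample rows covering every combination of the two conditions.
names = ["David", "Peter", "Anna"]
rows = [{"first_name": f, "last_name": l} for f, l in product(names, repeat=2)]


def not_or(r):
    # Mirrors ~((first_name == "David") | (last_name == "Peter"))
    return not (r["first_name"] == "David" or r["last_name"] == "Peter")


def and_not(r):
    # Mirrors (first_name != "David") & (last_name != "Peter")
    return r["first_name"] != "David" and r["last_name"] != "Peter"


kept = [r for r in rows if not_or(r)]
assert kept == [r for r in rows if and_not(r)]
print(len(rows), len(kept))  # 9 4
```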

The HPE Ezmeral Data Fabric Database OJAI Connector pushes down all of the filters shown in the earlier examples. It can push down the following types of filters, provided that the field is not an Array or Map:

Restrictions

Pushdowns with DataFrames and Datasets are not supported in the following scenarios:

These limitations do not apply to pushdowns on RDDs. As a workaround, apply the filter or projection to an RDD, where pushdown is supported, and then convert the RDD to a DataFrame.
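
The type restriction stated earlier, that filters are pushed down only when the field is not an Array or Map, can be pictured as a planning step that splits a query's filters into a pushed set and a residual set. The sketch below is a toy model: the `FIELD_TYPES` schema, the `EqualTo` class, and `split_filters` are hypothetical and are not part of the connector's API. It follows the general Spark data source contract, in which a source can report the filters it does not handle (for example through `BaseRelation.unhandledFilters`) and Spark evaluates those filters itself after the scan. Whether this connector uses that exact mechanism is an assumption.

```python
from dataclasses import dataclass

# Hypothetical field types for /tmp/user_profiles; a real table's types would
# come from its inferred schema.
FIELD_TYPES = {
    "_id": "string",
    "first_name": "string",
    "last_name": "string",
    "interests": "array",
    "address": "map",
}

NON_PUSHABLE_TYPES = {"array", "map"}


@dataclass(frozen=True)
class EqualTo:
    """Hypothetical equality filter on a single field."""
    field: str
    value: object


def split_filters(filters, field_types):
    """Split filters into (pushed, residual) by the type of the field they test."""
    pushed, residual = [], []
    for f in filters:
        ftype = field_types.get(f.field)
        if ftype is None or ftype in NON_PUSHABLE_TYPES:
            residual.append(f)  # evaluated by the Spark engine after the scan
        else:
            pushed.append(f)    # evaluated by the database during the scan
    return pushed, residual


filters = [EqualTo("first_name", "Bill"), EqualTo("interests", ["golf"])]
pushed, residual = split_filters(filters, FIELD_TYPES)
print([f.field for f in pushed], [f.field for f in residual])
# ['first_name'] ['interests']
```

Treating unknown fields as residual is a conservative choice: a filter that is not pushed down still yields correct results, only with more data transferred to Spark.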

Note: HPE Ezmeral Data Fabric Database 6.0 introduces support for Secondary Indexes, but the HPE Ezmeral Data Fabric Database OJAI Connector for Spark does not currently use them.