# Using Apache Spark with Ozone
Apache Spark is a widely used unified analytics engine for large-scale data processing. Ozone can serve as a scalable storage layer for Spark applications, allowing you to read data from and write data to Ozone clusters using familiar Spark APIs.
## Overview
Spark interacts with Ozone primarily through Ozone's Hadoop-compatible filesystem connector, which allows access using the `ofs://` URI scheme. You can also use the older `o3fs://` scheme, though `ofs://` is generally recommended, especially in CDP environments.
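For example, the same key can be addressed through either scheme (the service ID, volume, bucket, and host names below are illustrative):

```
# ofs:// is rooted at the OM service ID and can span all volumes and buckets
ofs://ozone1/volume1/bucket1/dir1/key1

# o3fs:// is rooted at a single bucket, addressed as <bucket>.<volume>.<om-host>
o3fs://bucket1.volume1.om-host:9862/dir1/key1
```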
Key benefits include:
- Storing large datasets generated or consumed by Spark jobs directly in Ozone.
- Leveraging Ozone's scalability and object storage features for Spark workloads.
- Using standard Spark DataFrame and RDD APIs to interact with Ozone data.
## Prerequisites
- Ozone cluster: A running Ozone cluster.
- Ozone client JAR: The `hadoop-ozone-filesystem-hadoop3.jar` (or the `hadoop2` variant, depending on your Spark's Hadoop version) must be available on the Spark driver and executor classpaths.
- Configuration: Spark needs access to the Ozone configuration (`core-site.xml` and potentially `ozone-site.xml`) to connect to the Ozone cluster.
## Configuration
### 1. Core Site (`core-site.xml`)

Ensure your Hadoop `core-site.xml` (accessible by Spark) includes the necessary Ozone configurations:
```xml
<configuration>
  <property>
    <name>fs.ofs.impl</name>
    <value>org.apache.hadoop.fs.ozone.RootedOzoneFileSystem</value>
    <description>Ozone FileSystem implementation for the ofs:// scheme.</description>
  </property>
  <property>
    <name>fs.o3fs.impl</name>
    <value>org.apache.hadoop.fs.ozone.OzoneFileSystem</value>
    <description>Legacy Ozone FileSystem implementation for the o3fs:// scheme.</description>
  </property>
  <property>
    <name>ozone.om.service.ids</name>
    <value>ozone1</value> <!-- Replace with your OM Service ID -->
    <description>Logical identifier for the Ozone Manager service.</description>
  </property>
  <property>
    <name>ozone.om.nodes.ozone1</name> <!-- Use your OM Service ID -->
    <value>om1,om2,om3</value>
    <description>Logical node IDs of the Ozone Manager nodes.</description>
  </property>
  <property>
    <name>ozone.om.address.ozone1.om1</name> <!-- Repeat for om2 and om3 -->
    <value>om_host1:9862</value> <!-- Replace with your OM addresses -->
    <description>Address of each Ozone Manager node.</description>
  </property>
  <!-- Add other necessary Ozone client configurations -->
</configuration>
```
### 2. Spark Configuration (`spark-defaults.conf` or `--conf`)

While Spark often picks up settings from a `core-site.xml` on its classpath, explicitly setting the filesystem implementations can sometimes be necessary:

```properties
spark.hadoop.fs.ofs.impl=org.apache.hadoop.fs.ozone.RootedOzoneFileSystem
spark.hadoop.fs.o3fs.impl=org.apache.hadoop.fs.ozone.OzoneFileSystem
```
### 3. Client JAR Placement

Copy the `hadoop-ozone-filesystem-*.jar` to the `$SPARK_HOME/jars/` directory on all nodes where the Spark driver and executors run. Alternatively, provide it using the `--jars` option of `spark-submit`.
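For example, a submission that supplies the connector JAR explicitly might look like the following sketch (the JAR path and version, and the application details, are placeholders):

```bash
spark-submit \
  --master yarn \
  --jars /opt/ozone/share/ozone/lib/hadoop-ozone-filesystem-hadoop3-1.4.0.jar \
  --class com.example.MyApp \
  my-app.jar
```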
### 4. Security (Kerberos)

If your Ozone and Spark clusters are Kerberos-enabled, Spark needs permission to obtain delegation tokens for Ozone. Configure the following property in `spark-defaults.conf` or via `--conf`, specifying your Ozone filesystem URI:

```properties
# Spark 2.x on YARN
spark.yarn.access.hadoopFileSystems=ofs://ozone1/

# Spark 3.x (replaces the spark.yarn.* property above)
spark.kerberos.access.hadoopFileSystems=ofs://ozone1/
```

Replace `ozone1` with your OM Service ID. Ensure the user running the Spark job has a valid Kerberos ticket (`kinit`).
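Putting it together, a secure submission might look like this sketch (the keytab, principal, and application details are placeholders):

```bash
# Obtain a Kerberos ticket for the submitting user
kinit -kt /path/to/user.keytab user@EXAMPLE.COM

# Delegation tokens for the listed filesystem are requested at submit time
spark-submit \
  --master yarn \
  --conf spark.kerberos.access.hadoopFileSystems=ofs://ozone1/ \
  --class com.example.MyApp \
  my-app.jar
```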
## Usage Examples

You can read and write data using `ofs://` URIs like any other Hadoop-compatible filesystem.

URI format: `ofs://<om-service-id>/<volume>/<bucket>/<path/to/key>`
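Before running a job, you can sanity-check connectivity from a node that has the Ozone client JAR and configuration on its classpath (volume and bucket names are illustrative):

```bash
hadoop fs -ls ofs://ozone1/volume1/bucket1/
```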
### Reading Data (Scala)

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("Ozone Spark Read Example").getOrCreate()

// Read a CSV file from Ozone
val df = spark.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("ofs://ozone1/volume1/bucket1/input/data.csv")

df.show()
spark.stop()
```
### Writing Data (Scala)

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("Ozone Spark Write Example").getOrCreate()

// Create a small example DataFrame to write
val data = Seq(("Alice", 1), ("Bob", 2), ("Charlie", 3))
val df = spark.createDataFrame(data).toDF("name", "id")

// Write the DataFrame to Ozone as Parquet files
df.write.mode("overwrite")
  .parquet("ofs://ozone1/volume1/bucket1/output/users.parquet")

spark.stop()
```
### Reading Data (Python)

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Ozone Spark Read Example").getOrCreate()

# Read a CSV file from Ozone
df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("ofs://ozone1/volume1/bucket1/input/data.csv")

df.show()
spark.stop()
```
### Writing Data (Python)

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Ozone Spark Write Example").getOrCreate()

# Create a small example DataFrame to write
data = [("Alice", 1), ("Bob", 2), ("Charlie", 3)]
columns = ["name", "id"]
df = spark.createDataFrame(data, columns)

# Write the DataFrame to Ozone as Parquet files
df.write.mode("overwrite") \
    .parquet("ofs://ozone1/volume1/bucket1/output/users.parquet")

spark.stop()
```
## Spark on Kubernetes

To run Spark jobs on Kubernetes that access Ozone:

1. Build a custom Spark image: Create a Docker image based on your desired Spark version. Add the `hadoop-ozone-filesystem-*.jar` and the necessary Ozone configuration files (`core-site.xml`, `ozone-site.xml`) to the image (e.g., under `/opt/hadoop/conf`).
2. Configure `spark-submit`:
   - Set `--master k8s://<kubernetes-api-server>`.
   - Specify the custom image: `--conf spark.kubernetes.container.image=<your-repo>/spark-ozone:latest`.
   - Point to the config directory: `--conf spark.kubernetes.hadoop.configMapName=<configmap-name>` if using a ConfigMap (see the `kubectl` sketch after this list), or ensure the image contains the files and, if necessary, set `HADOOP_CONF_DIR`.
   - Include the JAR path if it is not baked into the default classpath: `--jars local:///path/in/container/to/hadoop-ozone-filesystem.jar`.
   - Add necessary Kubernetes configurations (`namespace`, `serviceAccountName`, etc.).
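If you use the ConfigMap approach mentioned above, one way to create it is from your existing client configuration files (a sketch; the ConfigMap name and namespace are illustrative):

```bash
kubectl create configmap ozone-hadoop-conf \
  --from-file=core-site.xml \
  --from-file=ozone-site.xml \
  -n your-namespace
```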
Example using `o3fs` (adapt for `ofs`):

```bash
./bin/spark-submit \
  --master k8s://https://<KUBERNETES_MASTER_IP>:<PORT> \
  --deploy-mode cluster \
  --name spark-ozone-test \
  --class org.apache.spark.examples.JavaWordCount \
  --conf spark.executor.instances=1 \
  --conf spark.kubernetes.container.image=<your-docker-repo>/spark-ozone:latest \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
  --conf spark.kubernetes.namespace=your-namespace \
  --conf spark.hadoop.fs.o3fs.impl=org.apache.hadoop.fs.ozone.OzoneFileSystem \
  --conf spark.driver.extraJavaOptions="-Dsun.security.krb5.rcache=/tmp/krb5cc_spark -Dsun.security.krb5.debug=true" \
  --conf spark.executor.extraJavaOptions="-Dsun.security.krb5.rcache=/tmp/krb5cc_spark -Dsun.security.krb5.debug=true" \
  local:///opt/spark/examples/jars/spark-examples_*.jar \
  o3fs://bucket1.volume1.ozone-om-host:9862/testinput.txt
```

(`JavaWordCount` expects an input file path as its argument, which makes it a suitable smoke test for Ozone access. Adapt the `o3fs` path and configuration for the `ofs` scheme as needed.)