Apache Iceberg
Apache Iceberg is an open table format for huge analytic datasets. It provides a high-performance table layer on top of distributed storage systems such as Apache Ozone. This integration allows users to combine Iceberg's powerful table management capabilities with Ozone's scalable and resilient storage.
How Iceberg Works with Storage
Iceberg's architecture relies on storage systems for several key components:
- Data Files: The actual data content stored in file formats like Parquet, ORC, or Avro
- Metadata Files: JSON files containing table schemas, partitioning information, and snapshots
- Manifest Files: Lists of data files that belong to a table snapshot
- Manifest Lists: Files that track table snapshots and their corresponding manifests
Iceberg requires storage systems to provide:
- File-level operations (create, read, update, delete)
- Atomic operations for metadata updates
- Consistent reads for reliable table access
- Support for both small metadata files and large data files
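As a rough sketch, an Iceberg table stored in an Ozone bucket might look like the listing below. The warehouse path matches the examples used later on this page; the individual file names are generated by Iceberg and will differ in practice:
# Illustrative layout of an Iceberg table under an Ozone warehouse path
hadoop fs -ls -R ofs://ozone1/vol1/bucket1/warehouse/iceberg/db/table
# .../db/table/metadata/v1.metadata.json              # table metadata: schema, partition spec, snapshots
# .../db/table/metadata/snap-<id>.avro                # manifest list for one snapshot
# .../db/table/metadata/<uuid>-m0.avro                # manifest listing data files
# .../db/table/data/ts_day=2022-01-01/<uuid>.parquet  # data file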
How Ozone Enables Iceberg Integration
Ozone provides native compatibility with Iceberg through the Ozone File System (OFS) interface, which implements the Hadoop Compatible File System (HCFS) API. This enables Iceberg to work with Ozone without code modifications.
The key enabler is the ofs:// protocol, which:
- Provides HDFS-like file system semantics
- Supports atomic directory operations with File System Optimized (FSO) buckets
- Maintains the hierarchical namespace required by Iceberg's table structure
- Delivers consistent reads and writes for reliable metadata operations
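An ofs:// path starts with the OM service ID (or a single OM host), followed by the volume, bucket, and key path. For example:
ofs://ozone1/vol1/bucket1/warehouse/iceberg
# ofs://<om-service-id>/<volume>/<bucket>/<path>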
Configuration Requirements
1. Add Ozone File System JAR to Classpath
Ensure the Ozone filesystem JAR is added to the classpath of all services that will use Iceberg with Ozone:
export HADOOP_CLASSPATH=/opt/ozone/share/ozone/lib/ozone-filesystem-hadoop3-*.jar:$HADOOP_CLASSPATH
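For Spark jobs, the JAR can alternatively be supplied at submit time. In the sketch below the job class and application JAR are placeholders, and the Ozone JAR path depends on your installation:
# Supply the Ozone filesystem JAR to a Spark job at submit time (names and paths are examples)
spark-submit \
  --jars /opt/ozone/share/ozone/lib/ozone-filesystem-hadoop3-*.jar \
  --class com.example.IcebergIngestJob \
  iceberg-ingest-job.jar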
2. Configure Hadoop Core Site
Add Ozone filesystem implementation to your Hadoop configuration:
<property>
  <name>fs.ofs.impl</name>
  <value>org.apache.hadoop.fs.ozone.RootedOzoneFileSystem</value>
</property>
<!-- For OM HA (Recommended for Production) -->
<property>
  <name>ozone.om.service.ids</name>
  <value>ozone1</value>
</property>
<property>
  <name>ozone.om.nodes.ozone1</name>
  <value>om1,om2,om3</value>
</property>
<property>
  <name>ozone.om.address.ozone1.om1</name>
  <value>om1.host:9862</value>
</property>
<property>
  <name>ozone.om.address.ozone1.om2</name>
  <value>om2.host:9862</value>
</property>
<property>
  <name>ozone.om.address.ozone1.om3</name>
  <value>om3.host:9862</value>
</property>
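A quick way to verify the configuration is to list a path through the ofs scheme, using whatever volume and bucket exist on your cluster:
# Should succeed and list bucket contents if fs.ofs.impl and the OM HA settings are correct
hadoop fs -ls ofs://ozone1/vol1/bucket1/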
3. Configure Iceberg Catalog
Set up an Iceberg catalog to use Ozone as the storage backend. Here's an example for a Hadoop catalog:
Spark Configuration
# In Spark configuration (e.g. spark-defaults.conf)
spark.sql.catalog.ozone_catalog = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.ozone_catalog.type = hadoop
spark.sql.catalog.ozone_catalog.warehouse = ofs://ozone1/vol1/bucket1/warehouse/iceberg
# Optional: set as default catalog
spark.sql.defaultCatalog = ozone_catalog
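The same settings can also be passed on the command line, for example when starting a spark-sql session. The iceberg-spark-runtime coordinates below are one example and must match your Spark, Scala, and Iceberg versions; the Ozone filesystem JAR from step 1 is assumed to be on the classpath:
spark-sql \
  --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0 \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
  --conf spark.sql.catalog.ozone_catalog=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.ozone_catalog.type=hadoop \
  --conf spark.sql.catalog.ozone_catalog.warehouse=ofs://ozone1/vol1/bucket1/warehouse/iceberg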
Flink Configuration
catalogs:
  - name: ozone_catalog
    type: iceberg
    catalog-type: hadoop
    warehouse: ofs://ozone1/vol1/bucket1/warehouse/iceberg
Trino Configuration
connector.name=iceberg
iceberg.catalog.type=hadoop
iceberg.catalog.warehouse=ofs://ozone1/vol1/bucket1/warehouse/iceberg
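With the connector configured (the catalog name comes from the properties file name, e.g. etc/catalog/iceberg.properties), tables can be queried from the Trino CLI; the coordinator URL below is an example:
# List Iceberg tables in schema 'db' through the Trino CLI
trino --server http://trino-coordinator:8080 --catalog iceberg --execute "SHOW TABLES FROM db"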
Bucket Layout Considerations
For Iceberg workloads, use File System Optimized (FSO) bucket layout, which:
- Maintains a hierarchical directory structure in Ozone
- Enables atomic operations needed for reliable metadata updates
- Provides optimal performance for table operations
To create an FSO bucket for Iceberg:
ozone sh bucket create --layout FILE_SYSTEM_OPTIMIZED /vol1/bucket1
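The volume must exist before the bucket can be created, and the layout can be verified afterwards:
# Create the volume first if it does not already exist
ozone sh volume create /vol1
# Confirm the bucket layout is FILE_SYSTEM_OPTIMIZED
ozone sh bucket info /vol1/bucket1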
Working with Iceberg Tables in Ozone
Creating Tables
-- Spark SQL example
CREATE TABLE ozone_catalog.db.table (
id bigint,
data string,
ts timestamp)
USING iceberg
PARTITIONED BY (days(ts));
Reading and Writing Data
-- Write data to the table
INSERT INTO ozone_catalog.db.table VALUES (1, 'a', timestamp('2022-01-01 00:00:00'));
-- Read data from the table
SELECT * FROM ozone_catalog.db.table;
Using Iceberg Table Features
Iceberg tables in Ozone support the standard Iceberg feature set, including:
-- Time travel queries (by snapshot ID)
SELECT * FROM ozone_catalog.db.table VERSION AS OF 1234;
-- Incremental reads between snapshots (in Spark these use the DataFrame API rather than SQL)
-- spark.read.format("iceberg")
--   .option("start-snapshot-id", "1234")
--   .option("end-snapshot-id", "5678")
--   .load("ozone_catalog.db.table")
-- Metadata queries
SELECT * FROM ozone_catalog.db.table.snapshots;
Performance Optimizations
Optimizing Read Performance
- Utilize Metadata Pruning: Iceberg's metadata pruning works effectively with Ozone.
-- Efficient pruning using partition and filter pushdown
SELECT * FROM ozone_catalog.db.table
WHERE ts > timestamp('2022-01-01') AND ts < timestamp('2022-02-01');
- Optimize for Concurrent Reads: Ozone's distributed nature supports high concurrency.
// Set higher parallelism for reads
spark.read
.option("read.split.target-size", "134217728") // 128 MB
.format("iceberg")
.load("ozone_catalog.db.table")
Optimizing Write Performance
- Tuning File Sizes: Optimize file sizes for Ozone's block size.
ALTER TABLE ozone_catalog.db.table SET
TBLPROPERTIES ('write.target-file-size-bytes'='134217728');
- Optimizing Partitioning: Choose partitioning schemes appropriate for your query patterns and Ozone's characteristics.
-- Partitioning by date with proper cardinality
CREATE TABLE ozone_catalog.db.table (
id bigint,
data string,
ts timestamp)
USING iceberg
PARTITIONED BY (days(ts));
Table Maintenance Operations
Regular table maintenance ensures optimal performance:
-- Compact small files
CALL ozone_catalog.system.rewrite_data_files('db.table');
-- Remove old snapshots
CALL ozone_catalog.system.expire_snapshots('db.table', TIMESTAMP '2022-01-01 00:00:00');
-- Remove orphan files no longer referenced by any table snapshot
CALL ozone_catalog.system.remove_orphan_files('db.table');
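These procedures can also be run as scheduled jobs from a spark-sql invocation with the catalog configured on the command line. The retention value below is illustrative, and the Iceberg runtime and Ozone filesystem JARs are assumed to be on the classpath:
# Example scheduled maintenance: expire old snapshots but keep the 10 most recent
spark-sql \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
  --conf spark.sql.catalog.ozone_catalog=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.ozone_catalog.type=hadoop \
  --conf spark.sql.catalog.ozone_catalog.warehouse=ofs://ozone1/vol1/bucket1/warehouse/iceberg \
  -e "CALL ozone_catalog.system.expire_snapshots(table => 'db.table', retain_last => 10)"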
Best Practices for Iceberg with Ozone
- Use File System Optimized (FSO) buckets for Iceberg workloads to enable the atomic operations needed for metadata consistency.
- Choose appropriate partition schemes to balance query performance with Ozone's data placement.
- Regularly maintain tables by compacting small files and expiring old snapshots to optimize read performance and storage usage.
- Configure the catalog for high availability by using Ozone's OM HA features with the ofs:// protocol.
- Tune file sizes based on your workload patterns and Ozone's block size.
- Use Ozone Recon to monitor storage usage and performance metrics.
- Ensure proper JAR inclusion in all services that interact with Iceberg tables in Ozone.
Limitations and Considerations
- Iceberg tables in Ozone require FSO bucket layout for atomic operations
- Performance characteristics may differ from HDFS or S3, particularly for metadata-heavy operations
- Thoroughly test table maintenance operations in your environment
- Ensure network connectivity between compute and storage is optimized
Open Data Lakehouse with Iceberg on Ozone
Apache Ozone with Iceberg provides an ideal foundation for building an open data lakehouse architecture that combines the best of data lakes and data warehouses.
Architecture Overview
A typical data lakehouse architecture with Iceberg on Ozone consists of:
- Storage Layer: Apache Ozone provides the scalable, reliable storage foundation
  - Multi-protocol access (S3, HDFS-compatible)
  - High durability through replication or erasure coding
  - Cost-effective compared to public cloud object stores
- Table Format Layer: Apache Iceberg manages table structure and metadata
  - Schema evolution
  - Partition evolution
  - Snapshot isolation
  - Time travel capabilities
- Compute Layer: Multiple engines can work on the same data
  - Apache Spark for batch and streaming
  - Apache Flink for streaming
  - Trino/Presto for interactive queries
  - Hive for batch processing
Implementation Patterns
A common pattern is building a multi-zone data lakehouse using Iceberg tables stored in Ozone:
- Landing Zone: Store raw data as-is in Ozone using the S3 API
# Ingest data through the S3 gateway
aws s3 cp --endpoint-url http://ozone-s3g:9878 data.csv s3://landing-bucket/data/
# Or using the OFS interface for direct ingestion
hadoop fs -put data.csv ofs://ozone1/vol1/landing-bucket/data/
- Bronze/Silver/Gold Architecture: Create Iceberg tables for different data refinement stages
-- Bronze: Raw data with minimal schema enforcement and validation
CREATE TABLE ozone_catalog.bronze.sales (
transaction_id STRING,
data STRING, -- JSON or raw format
ingest_time TIMESTAMP
) USING iceberg
PARTITIONED BY (days(ingest_time))
LOCATION 'ofs://ozone1/vol1/lakehouse/bronze/sales';
-- Silver: Cleansed and transformed data with proper schema
CREATE TABLE ozone_catalog.silver.sales (
transaction_id STRING,
product_id STRING,
customer_id STRING,
quantity INT,
price DECIMAL(10,2),
transaction_time TIMESTAMP,
process_time TIMESTAMP
) USING iceberg
PARTITIONED BY (days(transaction_time))
LOCATION 'ofs://ozone1/vol1/lakehouse/silver/sales';
-- Gold: Business-ready, aggregated data for analytics
CREATE TABLE ozone_catalog.gold.daily_sales_by_region (
date DATE,
region STRING,
product_category STRING,
total_sales DECIMAL(20,2),
transaction_count BIGINT
) USING iceberg
PARTITIONED BY (date)
LOCATION 'ofs://ozone1/vol1/lakehouse/gold/daily_sales_by_region';
- ETL Processes: Implement data transformation pipelines between zones
// Spark example: Bronze to Silver transformation
spark.sql("""
INSERT INTO ozone_catalog.silver.sales
SELECT
CAST(get_json_object(data, '$.transaction_id') AS STRING) as transaction_id,
CAST(get_json_object(data, '$.product_id') AS STRING) as product_id,
CAST(get_json_object(data, '$.customer_id') AS STRING) as customer_id,
CAST(get_json_object(data, '$.quantity') AS INT) as quantity,
CAST(get_json_object(data, '$.price') AS DECIMAL(10,2)) as price,
CAST(get_json_object(data, '$.transaction_time') AS TIMESTAMP) as transaction_time,
current_timestamp() as process_time
FROM ozone_catalog.bronze.sales
WHERE ingest_time > (SELECT max(process_time) FROM ozone_catalog.silver.sales)
""")
- Analytics and BI Tools: Use SQL engines to query the data
-- Business analytics on gold layer
SELECT
region,
SUM(total_sales) as quarterly_sales,
SUM(transaction_count) as quarterly_transactions
FROM ozone_catalog.gold.daily_sales_by_region
WHERE date BETWEEN '2023-01-01' AND '2023-03-31'
GROUP BY region
ORDER BY quarterly_sales DESC;
Advantages of This Architecture
This open data lakehouse approach with Iceberg on Ozone provides several key benefits:
- Open Format: No vendor lock-in; data stored in open formats (Parquet, Avro, ORC)
- Schema Evolution: Add, remove, or change columns without impacting operations
- ACID Transactions: Reliable concurrent reads and writes with consistent snapshots
- Time Travel: Query data as it existed at a specific point in time
- Cost Efficiency: Ozone provides cost-effective storage compared to cloud alternatives
- Multi-Engine Support: Use the right tool for each job (Spark, Flink, Trino, etc.)
- Separation of Storage and Compute: Scale each independently
Infrastructure Considerations
For optimal performance in a production environment:
- Network Topology: Ensure Ozone datanodes are on the same network as compute nodes
- Storage Configuration: Use high-performance storage (NVMe SSDs) for Ozone metadata (OM and SCM)
- Memory Allocation: Provide sufficient memory for RocksDB caches in Ozone Manager
- Compute Resources: Size Spark/Trino clusters appropriately for your workloads
- Data Placement: Consider rack awareness configuration to optimize data locality
This pattern leverages Ozone's scalability and multi-protocol access with Iceberg's table management capabilities for a complete, open data lakehouse solution that avoids vendor lock-in while providing advanced data management features.
Have questions about using Apache Iceberg with Ozone? Ask in the Apache Ozone GitHub Discussions.