$title =

DuckDB Use Case#1 – Processing Complex JSON Data

;

$content = [

📦Migrating from Distributed data pipelines to Microservices for small to medium sized data processing

A Cost-Effective and Performance-Driven Approach

TL;DR

For organizations managing data transformation pipelines, achieving 95% faster processing, lower cloud costs, and a simplified, maintainable architecture is a game-changer. In this project, we optimized a Spark-based Azure Data Factory Mapping Data Flow pipeline, replacing it with DuckDB (a high-performance in-process SQL engine), leading to significant cost savings and better performance.

By moving away from distributed computing for a small-to-medium JSON workload, we reduced execution time from 3 minutes to just 1-2 seconds, while also making the system more cost-efficient and adaptable to future needs.

Key takeaways:

  • Why change? Spark-based Data Flow was overkill for this use case, leading to unnecessary expenses and performance inefficiencies.
  • What’s different? We moved transformation logic to SQL Server and DuckDB, eliminating the need for external distributed compute resources.
  • The result? A leaner, faster, and more cost-effective data pipeline that’s easier to maintain.

For engineers, this case study highlights the importance of choosing the right tools for the job. If your workload doesn’t require distributed computing, reconsidering Data Flow could lead to substantial performance and cost improvements. 🚀

Benefit CategoryAzure Data FlowDuckDB + Stored Proc
Execution Time~3 minutes1-2 seconds
Cost EfficiencyHigh compute costLower compute cost
MaintainabilityComplex pipelineSimpler, more transparent
ScalabilityAutoscalingLimited to the data or hardware sizes
Processing FlexibilityLimited to Data Flow capabilitiesSQL + Python flexibility

A Closer Look at Migration

When I started working on migrating an Azure Data Factory Mapping Data Flow pipeline, the first issue I noticed was the long execution time. The pipeline was responsible for ingesting external API data from a vendor and loading it into an Azure MS SQL database. Since it ran once per day, overnight, and the JSON payload was already well-compressed, it became clear that Data Flow might not be the best fit for this use case.

The Inefficiencies

The pipeline had two main steps: flattening and exploding JSON data from the API and filtering records based on a CSV file stored in Azure Data Lake. While these transformations were necessary, they were being done in Data Flow, which introduced performance and cost inefficiencies.

The main issue was that the pipeline read data from SQL Server, processed it externally in Data Flow, and then wrote it back to SQL Server. Given that both the comparison dataset and the final destination table were already in SQL Server, it made much more sense to handle those operations inside SQL Server itself instead of using an external processing engine.

Another bottleneck was JSON processing. Initially, I attempted to flatten and explode JSON using Pandas, but performance was a major issue. That’s when I turned to DuckDB.

The Optimized Approach

Instead of using Data Flow for transformations, I moved filtering and merging operations directly into SQL Server. Since this wasn’t an analytical workload, avoiding Spark-based distributed processing was the right move. SQL Server can efficiently handle filtering and merging without unnecessary data movement.

For JSON processing, DuckDB turned out to be a game-changer. Unlike Pandas, which struggled with performance, DuckDB handled JSON unnesting operations quickly and efficiently using SQL. This allowed me to flatten and explode the necessary fields seamlessly, making the process both faster and more transparent.

The Results

By switching from Data Flow to SQL Server and DuckDB, I reduced execution time from 3 minutes to just 1-2 seconds. The new approach eliminated unnecessary compute costs, significantly improved performance, and simplified maintenance. Without the complexity of Data Flow, the new pipeline was easier to debug and maintain.

This migration reinforced a key takeaway: just because a service exists doesn’t mean it’s the right tool for every job. In cases like this, where the workload is relatively small and SQL Server can handle transformations efficiently, eliminating Spark-based distributed processing can lead to huge performance and cost benefits.


Code

Checkout the sample code from our git repo – https://github.com/Schemon-Inc/blogs/blob/master/duckdb/duckdb_use_case1.py

Step 1: Create a sample JSON data to mimic vendor API response

sample_json = {
    "customer_id": 123,
    "orders": [
        {
            "order_id": "A001",
            "items": [
                {"product_id": "P1001", "quantity": 2},
                {"product_id": "P1002", "quantity": 1},
            ],
        },
        {
            "order_id": "A002",
            "items": [
                {"product_id": "P1003", "quantity": 5},
                {"product_id": "P1004", "quantity": 3},
            ],
        },
    ],
}

Step 2: Convert JSON to DataFrame format

It adds a new root level key “json_data_col” as only column to the DataFrame for demonstration purposes.
In real-world scenarios, json_normalize() will parse the possible columns automatically.

json_df = pd.json_normalize({"json_data_col": sample_json}, max_level=0)

Step 3: Initialize DuckDB and register JSON Data as json_table

# ===========================
# Step 3: Initialize DuckDB and register JSON Data as json_table
# ===========================
conn = duckdb.connect(database=":memory:")
conn.register("json_table", json_df)

Step 4: Flattening and Exploding JSON Using DuckDB

DuckDB’s SQL syntax makes it incredibly simple to flatten and explode JSON data.

UNNEST() function explodes the list and generates unnest column to use to extract the keys.

# ===========================
# Step 4: Flattening and Exploding JSON Using DuckDB
# ===========================
query = """
SELECT 
    json_data_col.customer_id as customer_id
    ,order_exploded.unnest.order_id as order_id
    ,item_exploded.unnest.product_id as product_id
    ,item_exploded.unnest.quantity as quantity
FROM json_table,
LATERAL UNNEST(json_data_col.orders) AS order_exploded,
LATERAL UNNEST(order_exploded.unnest.items) AS item_exploded
"""

result_df = conn.execute(query).fetchdf()
print(result_df)

Now we have output as below

   customer_id order_id product_id  quantity
0          123     A001      P1001         2
1          123     A001      P1002         1
2          123     A002      P1003         5
3          123     A002      P1004         3

Step 5: Load Data into SQL Server Staging Table for Further Processing

# ===========================
# Step 5: Load Data into SQL Server Staging Table for further processing
# ===========================

# Connect to SQL Server
conn_str = (
    f"DRIVER={DRIVER};SERVER={SERVER};DATABASE={DATABASE};UID={USERNAME};PWD={PASSWORD}"
)
sql_conn = pyodbc.connect(conn_str)
cursor = sql_conn.cursor()

# Ensure staging table exists
cursor.execute(
    """
IF OBJECT_ID('dbo.stg_sale', 'U') IS NULL
CREATE TABLE dbo.stg_sale (
    customer_id INT,
    order_id NVARCHAR(50),
    product_id NVARCHAR(50),
    quantity INT
);
"""
)
sql_conn.commit()

# Truncate the staging table before loading
cursor.execute("TRUNCATE TABLE dbo.stg_sale")
sql_conn.commit()

# Insert data into staging table
for _, row in result_df.iterrows():
    cursor.execute(
        """
        INSERT INTO dbo.stg_sale (customer_id, order_id, product_id, quantity)
        VALUES (?, ?, ?, ?)
    """,
        row["customer_id"],
        row["order_id"],
        row["product_id"],
        row["quantity"],
    )

sql_conn.commit()

Step 6: Process Data into Final Table using Stored Procedure

# ===========================
# Step 6: Process Data into Final Table using Stored Procedure
# ===========================

# Ensure final table exists
cursor.execute(
    """
IF OBJECT_ID('dbo.sale', 'U') IS NULL
CREATE TABLE dbo.sale (
    customer_id INT NOT NULL,
    order_id NVARCHAR(50) NOT NULL,
    product_id NVARCHAR(50) NOT NULL,
    quantity INT NOT NULL,
    PRIMARY KEY (customer_id, order_id, product_id)
);
"""
)
sql_conn.commit()

# Ensure the stored procedure exists
cursor.execute(
    """
IF OBJECT_ID('dbo.MergeSalesData', 'P') IS NULL
EXEC('
    CREATE PROCEDURE dbo.MergeSalesData AS
    BEGIN
        MERGE INTO dbo.sale AS target
        USING dbo.stg_sale AS source
        ON target.customer_id = source.customer_id 
           AND target.order_id = source.order_id 
           AND target.product_id = source.product_id
        WHEN MATCHED THEN 
            UPDATE SET target.quantity = source.quantity
        WHEN NOT MATCHED THEN 
            INSERT (customer_id, order_id, product_id, quantity)
            VALUES (source.customer_id, source.order_id, source.product_id, source.quantity);
    END;')
"""
)
sql_conn.commit()

# Execute the stored procedure
cursor.execute("EXEC dbo.MergeSalesData")
sql_conn.commit()

# Close the connections
cursor.close()
sql_conn.close()

print("Data processing completed successfully.")

Now you should see an output as

Data processing completed successfully.

Conclusion

By migrating from Azure Data Factory Mapping Data Flow to a DuckDB solution, I achieved substantial performance improvements and cost savings.

If you’re dealing with small to medium-sized JSON payloads and a non-analytical workload, you should consider dropping Spark in favor of DuckDB (or other in-memory processing) for faster, more efficient processing.

Transform Your Data Operations with Schemon

If you’re looking to optimize your data pipelines, reduce processing time, and cut unnecessary costs, Schemon can help you transform your data operations. Whether you’re building new data solutions on any cloud-based data platforms or transforming your existing data solutions, we provide expert guidance tailored to your business needs.

👉 Get in touch with us today to explore how Schemon can streamline your data workflows!

Stay tuned for more! 🚀

];

$date =

;

$category =

,

;

$author =

;

$previous =

;

Discover more from schemon

Subscribe now to keep reading and get access to the full archive.

Continue reading