📦 Migrating from Distributed Data Pipelines to Microservices for Small-to-Medium-Sized Data Processing
A Cost-Effective and Performance-Driven Approach
TL;DR
For organizations managing data transformation pipelines, achieving over 95% faster processing, lower cloud costs, and a simpler, more maintainable architecture is a game-changer. In this project, we replaced a Spark-based Azure Data Factory Mapping Data Flow pipeline with DuckDB (a high-performance in-process SQL engine), leading to significant cost savings and better performance.
By moving away from distributed computing for a small-to-medium JSON workload, we reduced execution time from 3 minutes to just 1-2 seconds, while also making the system more cost-efficient and adaptable to future needs.
Key takeaways:
- Why change? Spark-based Data Flow was overkill for this use case, leading to unnecessary expenses and performance inefficiencies.
- What’s different? We moved transformation logic to SQL Server and DuckDB, eliminating the need for external distributed compute resources.
- The result? A leaner, faster, and more cost-effective data pipeline that’s easier to maintain.
For engineers, this case study highlights the importance of choosing the right tools for the job. If your workload doesn’t require distributed computing, reconsidering Data Flow could lead to substantial performance and cost improvements. 🚀
| Benefit Category | Azure Data Flow | DuckDB + Stored Proc |
|---|---|---|
| Execution Time | ~3 minutes | 1-2 seconds |
| Cost Efficiency | High compute cost | Lower compute cost |
| Maintainability | Complex pipeline | Simpler, more transparent |
| Scalability | Autoscaling | Bounded by single-machine resources |
| Processing Flexibility | Limited to Data Flow capabilities | SQL + Python flexibility |
A Closer Look at Migration
When I started working on migrating an Azure Data Factory Mapping Data Flow pipeline, the first issue I noticed was the long execution time. The pipeline was responsible for ingesting external API data from a vendor and loading it into an Azure MS SQL database. Since it ran once per day, overnight, and the JSON payload was already well-compressed, it became clear that Data Flow might not be the best fit for this use case.
The Inefficiencies
The pipeline had two main steps: flattening and exploding JSON data from the API and filtering records based on a CSV file stored in Azure Data Lake. While these transformations were necessary, they were being done in Data Flow, which introduced performance and cost inefficiencies.
The main issue was that the pipeline read data from SQL Server, processed it externally in Data Flow, and then wrote it back to SQL Server. Given that both the comparison dataset and the final destination table were already in SQL Server, it made much more sense to handle those operations inside SQL Server itself instead of using an external processing engine.
Another bottleneck was JSON processing. Initially, I attempted to flatten and explode JSON using Pandas, but performance was a major issue. That’s when I turned to DuckDB.
The Optimized Approach
Instead of using Data Flow for transformations, I moved filtering and merging operations directly into SQL Server. Since this wasn’t an analytical workload, avoiding Spark-based distributed processing was the right move. SQL Server can efficiently handle filtering and merging without unnecessary data movement.
For JSON processing, DuckDB turned out to be a game-changer. Unlike Pandas, which struggled with performance, DuckDB handled JSON unnesting operations quickly and efficiently using SQL. This allowed me to flatten and explode the necessary fields seamlessly, making the process both faster and more transparent.
The Results
By switching from Data Flow to SQL Server and DuckDB, I reduced execution time from 3 minutes to just 1-2 seconds. The new approach eliminated unnecessary compute costs, significantly improved performance, and simplified maintenance. Without the complexity of Data Flow, the new pipeline was easier to debug and maintain.
This migration reinforced a key takeaway: just because a service exists doesn’t mean it’s the right tool for every job. In cases like this, where the workload is relatively small and SQL Server can handle transformations efficiently, eliminating Spark-based distributed processing can lead to huge performance and cost benefits.
Code
Check out the sample code in our Git repo – https://github.com/Schemon-Inc/blogs/blob/master/duckdb/duckdb_use_case1.py
Step 1: Create a sample JSON data to mimic vendor API response
```python
import duckdb
import pandas as pd
import pyodbc

sample_json = {
    "customer_id": 123,
    "orders": [
        {
            "order_id": "A001",
            "items": [
                {"product_id": "P1001", "quantity": 2},
                {"product_id": "P1002", "quantity": 1},
            ],
        },
        {
            "order_id": "A002",
            "items": [
                {"product_id": "P1003", "quantity": 5},
                {"product_id": "P1004", "quantity": 3},
            ],
        },
    ],
}
```
Step 2: Convert JSON to DataFrame format
For demonstration purposes, this wraps the payload under a new root-level key, "json_data_col", which becomes the only column in the DataFrame. In real-world scenarios, json_normalize() will parse the available columns automatically.

```python
json_df = pd.json_normalize({"json_data_col": sample_json}, max_level=0)
```
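To see that automatic parsing in action, here is a small illustrative example (the nested record is invented): with its default settings, json_normalize() flattens nested keys into dotted column names on its own.

```python
import pandas as pd

# A nested record similar in shape to a vendor response (illustrative values)
record = {"customer_id": 123, "profile": {"name": "Ada", "tier": "gold"}}

# With the default max_level, nested dicts become dotted column names
df = pd.json_normalize(record)
print(df.columns.tolist())  # ['customer_id', 'profile.name', 'profile.tier']
```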
Step 3: Initialize DuckDB and register JSON Data as json_table
```python
# ===========================
# Step 3: Initialize DuckDB and register JSON Data as json_table
# ===========================
conn = duckdb.connect(database=":memory:")
conn.register("json_table", json_df)
```
Step 4: Flattening and Exploding JSON Using DuckDB
DuckDB’s SQL syntax makes it incredibly simple to flatten and explode JSON data.
The UNNEST() function explodes each list and produces an `unnest` struct column from which the nested keys can be extracted.
```python
# ===========================
# Step 4: Flattening and Exploding JSON Using DuckDB
# ===========================
query = """
SELECT
    json_data_col.customer_id AS customer_id
    ,order_exploded.unnest.order_id AS order_id
    ,item_exploded.unnest.product_id AS product_id
    ,item_exploded.unnest.quantity AS quantity
FROM json_table,
    LATERAL UNNEST(json_data_col.orders) AS order_exploded,
    LATERAL UNNEST(order_exploded.unnest.items) AS item_exploded
"""
result_df = conn.execute(query).fetchdf()
print(result_df)
```
The output should look like this:

```
   customer_id order_id product_id  quantity
0          123     A001      P1001         2
1          123     A001      P1002         1
2          123     A002      P1003         5
3          123     A002      P1004         3
```
Step 5: Load Data into SQL Server Staging Table for Further Processing
```python
# ===========================
# Step 5: Load Data into SQL Server Staging Table for further processing
# ===========================
# Connect to SQL Server (DRIVER, SERVER, DATABASE, USERNAME, and PASSWORD
# are connection settings defined elsewhere in the script)
conn_str = (
    f"DRIVER={DRIVER};SERVER={SERVER};DATABASE={DATABASE};UID={USERNAME};PWD={PASSWORD}"
)
sql_conn = pyodbc.connect(conn_str)
cursor = sql_conn.cursor()

# Ensure staging table exists
cursor.execute(
    """
    IF OBJECT_ID('dbo.stg_sale', 'U') IS NULL
    CREATE TABLE dbo.stg_sale (
        customer_id INT,
        order_id NVARCHAR(50),
        product_id NVARCHAR(50),
        quantity INT
    );
    """
)
sql_conn.commit()

# Truncate the staging table before loading
cursor.execute("TRUNCATE TABLE dbo.stg_sale")
sql_conn.commit()

# Insert data into staging table
for _, row in result_df.iterrows():
    cursor.execute(
        """
        INSERT INTO dbo.stg_sale (customer_id, order_id, product_id, quantity)
        VALUES (?, ?, ?, ?)
        """,
        row["customer_id"],
        row["order_id"],
        row["product_id"],
        row["quantity"],
    )
sql_conn.commit()
```
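The row-by-row loop above is fine for a handful of records. For larger batches, pyodbc's executemany() with fast_executemany enabled sends the whole batch in far fewer round trips. A sketch of the idea (the DataFrame below is a stand-in for result_df, and the commented lines assume the open cursor from Step 5):

```python
import pandas as pd

# Stand-in for result_df from Step 4 (illustrative values)
result_df = pd.DataFrame(
    {
        "customer_id": [123, 123],
        "order_id": ["A001", "A001"],
        "product_id": ["P1001", "P1002"],
        "quantity": [2, 1],
    }
)

# executemany expects a sequence of parameter tuples
params = list(result_df.itertuples(index=False, name=None))
print(params)

# With the open cursor from Step 5, the batched insert would be:
# cursor.fast_executemany = True  # one round trip per batch instead of per row
# cursor.executemany(
#     "INSERT INTO dbo.stg_sale (customer_id, order_id, product_id, quantity) "
#     "VALUES (?, ?, ?, ?)",
#     params,
# )
```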
Step 6: Process Data into Final Table using Stored Procedure
```python
# ===========================
# Step 6: Process Data into Final Table using Stored Procedure
# ===========================
# Ensure final table exists
cursor.execute(
    """
    IF OBJECT_ID('dbo.sale', 'U') IS NULL
    CREATE TABLE dbo.sale (
        customer_id INT NOT NULL,
        order_id NVARCHAR(50) NOT NULL,
        product_id NVARCHAR(50) NOT NULL,
        quantity INT NOT NULL,
        PRIMARY KEY (customer_id, order_id, product_id)
    );
    """
)
sql_conn.commit()

# Ensure the stored procedure exists
cursor.execute(
    """
    IF OBJECT_ID('dbo.MergeSalesData', 'P') IS NULL
    EXEC('
    CREATE PROCEDURE dbo.MergeSalesData AS
    BEGIN
        MERGE INTO dbo.sale AS target
        USING dbo.stg_sale AS source
        ON target.customer_id = source.customer_id
            AND target.order_id = source.order_id
            AND target.product_id = source.product_id
        WHEN MATCHED THEN
            UPDATE SET target.quantity = source.quantity
        WHEN NOT MATCHED THEN
            INSERT (customer_id, order_id, product_id, quantity)
            VALUES (source.customer_id, source.order_id, source.product_id, source.quantity);
    END;')
    """
)
sql_conn.commit()

# Execute the stored procedure
cursor.execute("EXEC dbo.MergeSalesData")
sql_conn.commit()

# Close the connections
cursor.close()
sql_conn.close()

print("Data processing completed successfully.")
```
You should now see the output:

```
Data processing completed successfully.
```
Conclusion
By migrating from Azure Data Factory Mapping Data Flow to a DuckDB and SQL Server-based solution, I achieved substantial performance improvements and cost savings.
If you’re dealing with small to medium-sized JSON payloads and a non-analytical workload, consider dropping Spark in favor of DuckDB (or another in-process engine) for faster, more efficient processing.
Transform Your Data Operations with Schemon
If you’re looking to optimize your data pipelines, reduce processing time, and cut unnecessary costs, Schemon can help you transform your data operations. Whether you’re building new data solutions on any cloud-based data platforms or transforming your existing data solutions, we provide expert guidance tailored to your business needs.
👉 Get in touch with us today to explore how Schemon can streamline your data workflows!
Stay tuned for more! 🚀