Thursday, 24 April 2025

Inside Spark Joins: A Deep Dive into Physical Join Strategies

Joins are central to transforming and analyzing data. In Apache Spark—a distributed data processing engine—the efficiency of a join can vary dramatically based on the physical execution strategy chosen. This blog demystifies Spark's physical join strategies and offers real-world comparisons with SQL Server join techniques to help engineers design better-performing data pipelines.


Understanding Broadcast and Shuffle in Spark

Before diving into the physical join types, it's important to understand the two fundamental mechanisms Spark uses to move and access data:

Broadcast

Broadcasting sends a small table to all worker nodes, avoiding shuffle. Ideal for cases where one dataset is significantly smaller than the other. (Azure Synapse Parallel: In Synapse Dedicated SQL Pools, this is equivalent to using replicated tables to minimize data movement during joins.)

Shuffle

Shuffling is the redistribution of data across nodes based on join keys so that matching keys co-locate on the same partition. This process is expensive due to the network and disk I/O it requires.


Join Strategies in Apache Spark

Spark uses its Catalyst Optimizer to select an appropriate physical join strategy based on dataset size, partitioning, and statistics.

1️⃣ Broadcast Hash Join (BHJ)

A fast, memory-efficient join used when one of the tables is small enough to be broadcast across the cluster. This avoids the need to shuffle the larger table, making execution significantly faster.

When to Use:

  • When one side of the join is small (default: <10MB).

How It Works:

  • Spark broadcasts the smaller dataset to every executor.
  • Each executor builds a hash map from the broadcasted table.
  • The larger dataset is scanned locally against the hash map.

Performance:

  • Fastest join method.
  • Avoids shuffles but relies on memory availability.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
SELECT /*+ BROADCAST(dim_table) */ * FROM fact_table JOIN dim_table ON ...

SQL Server Analogy: Like an indexed nested loop join using an in-memory hash structure.

️2️⃣ Shuffle Hash Join (SHJ)

A suitable join strategy when both datasets are large and unsorted. Spark redistributes data by the join key and builds in-memory hash tables on each partition.

When to Use:

  • Both datasets are large.
  • Sorting is unnecessary or disabled.

How It Works:

  • Spark shuffles both datasets by the join key.
  • On each partition, it builds a hash table on one side and probes it with the other.

Performance:

  • Involves full cluster shuffle.
  • Risk of out-of-memory errors if hash tables grow too large.

SQL Server Analogy: Resembles a standard hash join but distributed.

3️⃣ Sort-Merge Join (SMJ)

An efficient join for large datasets that can be sorted. It relies on sorted inputs and merges them like a traditional merge-sort algorithm, making it suitable for massive volumes.

When to Use:

  • Both sides of the join are large and sortable.

How It Works:

  • Spark shuffles and sorts both sides by the join key.
  • Sorted datasets are merged similarly to merge-sort.

Performance:

  • CPU intensive due to sort operation.
  • More stable than SHJ for large datasets.
spark.conf.set("spark.sql.join.preferSortMergeJoin", true)

SQL Server Analogy: Like a merge join on sorted indexes.

4️⃣ Cartesian Join

A full cross-product join with no join condition provided. Every row from the first table is paired with every row from the second, generating a large result set.

When to Use:

  • When no join condition is specified.

How It Works:

  • Every row of the first dataset is joined with every row of the second.

Performance:

  • Extremely costly (O(n*m) complexity).
  • Use with caution.

SQL Server Analogy: Equivalent to CROSS JOIN.

 

Real-World Join Scenarios

  • BHJ: Lookup product details (small) for each sales transaction (large).

  • SHJ: Joining large raw logs from different systems without indexes.

  • SMJ: Merging time-series data from two high-volume sources.

  • Cartesian Join: Creating all pair combinations for recommender systems.


Common Developer Pitfalls

  • Skewed keys causing large shuffles → Use .repartition() or salting.

  • Unexpected Cartesian joins due to missing conditions → Always validate join keys.

  • Broadcast tables exceeding memory → Monitor sizes, set thresholds.

  • Poor stats lead to bad plans → Use ANALYZE TABLE regularly.


Delta Lake Join Optimizations

Delta Lake enhances Spark joins with features like:

  • Z-Ordering to cluster data for efficient filtering/joining:

OPTIMIZE sales ZORDER BY (customer_id)
  • OPTIMIZE: Compacts small files to reduce overhead.

  • VACUUM: Removes obsolete data:

VACUUM my_table RETAIN 168 HOURS;
  • Data Skipping: Automatically skips files that don’t match the filter/join key.


The Importance of Statistics

The Catalyst Optimizer uses table statistics to choose the best join plan.

Why it Matters:

  • Guides decisions like broadcast vs. shuffle.

  • Poor stats can result in inefficient execution.

ANALYZE TABLE my_table COMPUTE STATISTICS;
spark.conf.set("spark.sql.cbo.enabled", "true")


Optimization Tips

  • Use EXPLAIN to check your physical plan.

  • Reduce data skew with partitioning or salting.

  • Align partitions using .repartition().

  • Monitor joins using Spark UI to detect excessive shuffles.

Inside Spark Joins: A Deep Dive


Spark vs SQL Server – Reference Table

ConceptSQL ServerApache Spark
Indexed JoinNested LoopBroadcast Hash Join
Hash JoinHash JoinShuffle Hash Join
Sorted JoinMerge JoinSort-Merge Join
Cross ProductCartesian JoinCartesian Join
Strategy ControlOptimizer + StatsCatalyst + CBO + Hints


Conclusion

Choosing the right join in Apache Spark can drastically affect performance. By understanding how joins are executed and how Spark makes optimization decisions, developers can take control over performance-sensitive operations. Whether you come from a SQL Server background or are optimizing Spark pipelines directly, the key is in understanding your data and the available strategies.

Stay tuned for upcoming examples and walkthroughs of join tuning in Spark production environments!

Tuesday, 15 April 2025

From Synapse to Fabric: Embracing Cost Flexibility with Autoscale Billing for Spark

 

As a Principal Architect evaluating enterprise data platforms, one of the most appreciated features for customers using Azure Synapse Analytics has been the flexibility and cost efficiency of its Spark pools. With dynamic scaling and a true pay-as-you-go billing model, Synapse Spark enabled workloads to spin up only when needed—without any ongoing costs during idle periods.

This model provided the ideal balance for teams running bursty, exploratory, or scheduled Spark workloads—especially where usage patterns were unpredictable or seasonal.

However, when assessing a transition to Microsoft Fabric, many Synapse customers quickly ran into a cost-related roadblock: Fabric’s Capacity Unit (CU) model required reserving resources upfront, introducing a fixed cost—even if Spark was used occasionally. This shift challenged the financial efficiency Synapse Spark users had come to rely on.

At a recent Big Data conference, discussions with the Microsoft Fabric product team revealed that, at that time, managing costs in Fabric required implementing external processes to scale up and down or pause the Fabric Capacity Units (CUs) when not in use. While this approach could mitigate some costs, it introduced additional overhead and administrative complexity, making it less than ideal for certain operational models.


Comparative Analysis: Synapse Spark Pools vs. Fabric Shared Capacity



The Turning Point: Autoscale Billing for Spark

The introduction of Autoscale Billing for Spark in Microsoft Fabric marked a significant advancement. This feature reintroduced the flexibility found in Synapse by allowing Spark jobs to run on dedicated, serverless resources, billed independently from Fabric capacity. It effectively brought back the pay-as-you-go model, enabling dynamic scaling of Spark workloads without the constraints of reserved capacity.

Key Benefits:

  • Cost Efficiency: Pay only for the compute used during Spark job execution, eliminating idle costs.

  • Independent Scaling: Spark workloads scale separately from other Fabric services, ensuring optimal performance.

  • Resource Isolation: Dedicated serverless resources prevent resource contention with other workloads.

  • Quota Management: Set maximum CU limits to control budget and resource allocation.

This model aligns perfectly with our operational patterns, allowing us to run ad-hoc and bursty Spark jobs without overcommitting resources.


Implementing Autoscale Billing: A Step-by-Step Guide

Enabling Autoscale Billing for Spark in Microsoft Fabric is straightforward:

  1. Navigate to the Microsoft Fabric Admin Portal.

  2. Under Capacity settings, select your desired capacity.

  3. In the Autoscale Billing for Fabric Spark section, enable the toggle.

  4. Set the Maximum Capacity Units (CU) limit according to your requirements.

  5. Click Save to apply the settings.

Note: Enabling or adjusting Autoscale Billing settings will cancel all active Spark jobs running under Autoscale Billing to prevent billing overlaps.

Monitoring and Cost Management

Post-implementation, we utilized Azure's Cost Management tools to monitor compute usage effectively:

  • Access the Azure portal and navigate to Cost Analysis.

  • Filter by the meter "Autoscale for Spark Capacity Usage CU" to view real-time compute spend for Spark workloads.

This transparency allowed us to track expenses accurately and adjust our strategies as needed.

The introduction of Autoscale Billing for Spark in Microsoft Fabric addresses a critical concern for Synapse Spark customers—maintaining cost flexibility while transitioning to a modern, unified analytics platform. By allowing Spark jobs to run on dedicated serverless compute, billed independently from reserved Fabric capacity, it brings back the on-demand model that many teams have relied on for years.

This feature, currently in Preview, represents a major step forward in making Microsoft Fabric more accessible and cost-efficient for diverse Spark workloads. I’m looking forward to seeing this capability move into General Availability soon, unlocking its full potential for broader adoption in production-grade environments.

For a detailed walkthrough on configuring Autoscale Billing for Spark, refer to the official documentation here.

Tuesday, 21 February 2023

Powershell to Restore Azure SQL database PITR backup

Today there seems to be a bug on Portal resource Manager API not allowing any SQL databases pitr restores from portal and the only way to do it at this point is using scripts, The below script is an example to restore the database on the same server with Feb 20th 3 PM backup

Copyright © 2023 Vinoth N Manoharan.The information provided in this post is provided "as is" with no implied warranties or guarantees.

Power BI DAX for Start of Year To End of Prior Month Sales

We had a requirement for Power BI Measure Start of the year To End of Prior Month Sales sum for one of the report which was failed for January as the original Measure was just query Previous Month and going to a negative Data Diff value obviously for January Eg:DATEDIFF(1jan2023,31DEC2022). To avoid this we had to introduce a check on the measure if the month is january then we need SUM of Sales of January rather than previous Month. I used the below logic to solve this issue
Copyright © 2023 Vinoth N Manoharan.The information provided in this post is provided "as is" with no implied warranties or guarantees.

Bug In Azure API creating Data Factory Default AutoResolveIntegrationRuntime on Managed Virtual Network

Recently our terraform Infrastructure as code effort to create a datafactory resource with default AutoResolveIntegrationRuntime in Managed Virtual Network failed to create one.

We had managed_virtual_network_enabled flag on terraform and tried to use native azure cli too as a backup but both failed to create the default integration runtime on Managed Virtual Network and created a default public integration runtime. Only work around it was to create a Data Factory using ARM Template. I exported the ARM template and created the below Powershell to create the Data factory. I stored the Template and parameter file on a Fileshare and used it in the powershell to reuse and create a parameter file each time for different DF creation.
Copyright © 2023 Vinoth N Manoharan.The information provided in this post is provided "as is" with no implied warranties or guarantees.

Powershell Azure Devops REST API to create/update a variable group based on json variable input

 I had a requirement to automate creation and updating of Azure devops variable group based on a configuration variable json file proved to me by the application team for CICD process, the below script will look for the variable group if it exists with the project mentioned if not it will create a new variable group, If the variable group exists it will update the variable group with json provided. The idea is to maintain the json file on git for version and see the history of the changes on the variable group variables provided to the pipeline.

The below is the format of the Variable json passed on to the script

Copyright © 2023 Vinoth N Manoharan.The information provided in this post is provided "as is" with no implied warranties or guarantees.

Thursday, 3 November 2022

Work with Azure SQL with Python import result to pandas dataframe

I am going start a series of blogs working with python on Azure SQL, what we can do and limitations, In this first blog we look look into connecting to a database and importing a sql result to pandas dataframe. In the below example we can see we have imported the data and printed the last 10 rows and I have attached the screen shot of the result. In the future blogs we can se how we can work in pandas with the data.