Spark Connect (Spark 3.4+)#

Spark Connect is a client-server architecture introduced in Spark 3.4. It lets applications run Spark's DataFrame API against a remote cluster through a thin client.

Traditional Approach vs Spark Connect#

flowchart TB
    subgraph Traditional["Traditional Approach"]
        App1[Application]
        Driver1[Driver + SparkContext]
        Exec1[Executors]
        App1 --> Driver1
        Driver1 --> Exec1
    end

    subgraph Connect["Spark Connect"]
        App2[Thin Client]
        Server[Spark Connect Server]
        Driver2[Driver]
        Exec2[Executors]
        App2 -->|gRPC| Server
        Server --> Driver2
        Driver2 --> Exec2
    end

Key Differences#

| Aspect | Traditional Approach | Spark Connect |
|---|---|---|
| Architecture | Monolithic (driver embedded) | Client-server separation |
| Client size | Hundreds of MB (full Spark) | A few MB (gRPC client) |
| Language support | JVM required | Language-independent (gRPC) |
| Resource isolation | Driver resources needed on the client | Resources used only on the server |
| Upgrades | Client redeployment required | Only the server needs upgrading |
| Stability | Job stops if the client crashes | Server keeps running independently |

Spark Connect Advantages#

1. Lightweight Client#

<!-- Traditional approach: spark-core + spark-sql (~200MB) -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.13</artifactId>
    <version>3.5.1</version>
</dependency>

<!-- Spark Connect: lightweight client (~10MB) -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-connect-client-jvm_2.13</artifactId>
    <version>3.5.1</version>
</dependency>

2. Isolated Execution Environment#

  • Client memory exhaustion does not affect running Spark jobs
  • Multiple clients can share the same cluster
  • The server keeps running even if a client fails

3. Language Independence#

  • gRPC-based connectivity from various languages
  • Supports Python, Scala, Java, Go, etc.
  • More languages planned for future support

Server Setup#

Starting Spark Connect Server#

# Start standalone server
./sbin/start-connect-server.sh \
    --packages org.apache.spark:spark-connect_2.13:3.5.1 \
    --conf spark.connect.grpc.binding.port=15002

# Or start with spark-submit
spark-submit \
    --class org.apache.spark.sql.connect.service.SparkConnectServer \
    --packages org.apache.spark:spark-connect_2.13:3.5.1 \
    --conf spark.connect.grpc.binding.port=15002 \
    local:///dev/null

Running Server with Docker#

# docker-compose.yml
version: '3.8'
services:
  spark-connect:
    image: apache/spark:3.5.1
    command: >
      /opt/spark/sbin/start-connect-server.sh
      --packages org.apache.spark:spark-connect_2.13:3.5.1
    ports:
      - "15002:15002"  # gRPC port
      - "4040:4040"    # Spark UI
    environment:
      - SPARK_CONNECT_GRPC_BINDING_PORT=15002
    volumes:
      - ./data:/opt/spark/data

Server Configuration Options#

# spark-defaults.conf
spark.connect.grpc.binding.port=15002
# Maximum Arrow batch size in bytes (4 MiB)
spark.connect.grpc.arrow.maxBatchSize=4194304
# Maximum inbound gRPC message size in bytes (128 MiB)
spark.connect.grpc.maxInboundMessageSize=134217728
# Comma-separated lists of server-side extension plugin classes (empty by default)
spark.connect.extensions.relation.classes=
spark.connect.extensions.expression.classes=
spark.connect.extensions.command.classes=

Java Client Usage#

Dependency Setup#

// build.gradle.kts
dependencies {
    // Spark Connect client (lightweight)
    implementation("org.apache.spark:spark-connect-client-jvm_2.13:3.5.1")
}

Connection and Usage#

import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import java.util.List;
import static org.apache.spark.sql.functions.*;

public class SparkConnectExample {
    public static void main(String[] args) {
        // Connect to Spark Connect server
        SparkSession spark = SparkSession.builder()
                .remote("sc://localhost:15002")  // Spark Connect URL
                .build();

        try {
            // Use existing DataFrame API identically
            Dataset<Row> df = spark.read()
                    .option("header", "true")
                    .option("inferSchema", "true")
                    .csv("data/sales.csv");

            Dataset<Row> result = df
                    .filter(col("amount").gt(100))
                    .groupBy("category")
                    .agg(
                        sum("amount").alias("total"),
                        count("*").alias("count")
                    )
                    .orderBy(col("total").desc());

            result.show();

            // Collect results
            List<Row> rows = result.collectAsList();
            for (Row row : rows) {
                System.out.println(row.getString(0) + ": " + row.getDouble(1));
            }

        } finally {
            spark.stop();
        }
    }
}

Connection URL Format#

// Basic connection
SparkSession.builder().remote("sc://hostname:port")

// Token authentication
SparkSession.builder().remote("sc://hostname:port;token=<auth_token>")

// User ID specification
SparkSession.builder().remote("sc://hostname:port;user_id=my_user")

// SSL/TLS
SparkSession.builder().remote("sc://hostname:port;use_ssl=true")
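These options can be combined by chaining `;key=value` pairs onto a single URL. A small sketch of assembling such a URL from parts (the host, port, and parameter values are placeholders):

```java
public class ConnectUrlBuilder {
    // Builds a Spark Connect URL of the form sc://host:port;key=value;key=value
    public static String build(String host, int port, String... params) {
        StringBuilder sb = new StringBuilder("sc://").append(host).append(':').append(port);
        for (String param : params) {
            sb.append(';').append(param);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        String url = build("spark-connect.internal", 15002,
                "use_ssl=true", "token=abc123", "user_id=my_user");
        System.out.println(url);
        // sc://spark-connect.internal:15002;use_ssl=true;token=abc123;user_id=my_user
    }
}
```

The resulting string is passed straight to `SparkSession.builder().remote(...)`.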

Supported Features#

Fully Supported#

| Feature | Status | Notes |
|---|---|---|
| DataFrame API | ✅ Supported | select, filter, groupBy, etc. |
| SQL queries | ✅ Supported | spark.sql() |
| UDFs (user-defined functions) | ✅ Supported | Must be registered on the server |
| File read/write | ✅ Supported | CSV, JSON, Parquet |
| Aggregate functions | ✅ Supported | sum, avg, count, etc. |
| Window functions | ✅ Supported | rank, row_number, etc. |
| Joins | ✅ Supported | inner, left, right, etc. |
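Because UDFs must be registered on the server, one approach is to run a small registration step in server-side initialization code. A sketch (the function name and logic are illustrative, not part of Spark):

```java
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;

public class ServerUdfSetup {
    // Runs on the server (e.g., at startup), so remote clients can call the
    // function by name via expr("normalize_category(category)") or selectExpr
    // without shipping any code over the wire.
    public static void register(SparkSession spark) {
        spark.udf().register("normalize_category",
                (UDF1<String, String>) s -> s == null ? null : s.trim().toLowerCase(),
                DataTypes.StringType);
    }
}
```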

Limitations#

| Feature | Status | Alternative |
|---|---|---|
| RDD API | ❌ Not supported | Use the DataFrame API |
| Streaming | ⚠️ Partial support | Execute on the server side |
| MLlib | ⚠️ Partial support | Basic features only |
| GraphX | ❌ Not supported | Use GraphFrames |
| Local file access | ❌ Not supported | Use server paths or cloud storage |
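For existing RDD code, the migration path is to express the same logic with DataFrame operations. A hedged sketch of a word-count-style rewrite (the path and column names are illustrative):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import static org.apache.spark.sql.functions.*;

public class RddToDataFrame {
    public static Dataset<Row> wordCounts(SparkSession spark, String path) {
        // RDD style (not available over Spark Connect):
        //   sc.textFile(path).flatMap(...).mapToPair(...).reduceByKey(...)
        // DataFrame equivalent, which Spark Connect supports:
        return spark.read().text(path)                              // one column: "value"
                .select(explode(split(col("value"), "\\s+")).alias("word"))
                .groupBy("word")
                .count()
                .orderBy(col("count").desc());
    }
}
```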

Spring Boot Integration#

package com.example.config;

import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import jakarta.annotation.PreDestroy;

@Configuration
public class SparkConnectConfig {

    @Value("${spark.connect.url:sc://localhost:15002}")
    private String sparkConnectUrl;

    private SparkSession spark;

    @Bean
    @Profile("!local")  // Use in non-local environments
    public SparkSession sparkSession() {
        this.spark = SparkSession.builder()
                .remote(sparkConnectUrl)
                .build();
        return spark;
    }

    @Bean
    @Profile("local")  // For local development
    public SparkSession localSparkSession() {
        this.spark = SparkSession.builder()
                .appName("local-dev")
                .master("local[*]")
                .getOrCreate();
        return spark;
    }

    @PreDestroy
    public void cleanup() {
        if (spark != null) {
            spark.stop();
        }
    }
}

# application.yml
spring:
  profiles:
    active: local

---
spring:
  config:
    activate:
      on-profile: production

spark:
  connect:
    url: sc://spark-connect.internal:15002;token=${SPARK_TOKEN}
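With the profiles above in place, application code can inject the `SparkSession` bean without caring whether it is remote or local. A minimal sketch of such a service (the class, bucket, and column names are illustrative):

```java
package com.example.service;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.springframework.stereotype.Service;

@Service
public class SalesReportService {

    private final SparkSession spark;

    public SalesReportService(SparkSession spark) {
        this.spark = spark;  // remote or local session, depending on active profile
    }

    public long countLargeSales(double threshold) {
        Dataset<Row> df = spark.read().parquet("s3://bucket/sales/");
        return df.filter(df.col("amount").gt(threshold)).count();
    }
}
```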

Best Practices#

1. Connection Pooling#

@Component
public class SparkConnectionPool {
    private final SparkSession spark;

    public SparkConnectionPool(SparkSession spark) {
        this.spark = spark;
    }

    // SparkSession is thread-safe so it can be shared
    public SparkSession getSession() {
        return spark;
    }
}

2. Large Result Handling#

// ❌ Collect all - memory risk
List<Row> allRows = df.collectAsList();

// ✅ Collect limited results only
List<Row> topRows = df.limit(1000).collectAsList();

// ✅ Save to file on server then download
df.write().parquet("s3://bucket/results/");
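When the full result is needed on the client but will not fit in memory at once, it can be fetched in pages with `offset`/`limit` (`Dataset.offset` exists since Spark 3.4). Note that each page re-executes the query plan, so this trades compute for client memory; a sketch (method name and page size are illustrative):

```java
import java.util.List;
import java.util.function.Consumer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class PagedFetch {
    // Streams an ordered result to the handler in fixed-size pages so the
    // client never holds more than pageSize rows at once. The dataset must
    // have a stable ordering for pages to be consistent across executions.
    public static void process(Dataset<Row> ordered, int pageSize, Consumer<Row> handler) {
        int offset = 0;
        while (true) {
            List<Row> page = ordered.offset(offset).limit(pageSize).collectAsList();
            if (page.isEmpty()) break;
            page.forEach(handler);
            offset += pageSize;
        }
    }
}
```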

3. Error Handling#

import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import org.apache.spark.sql.AnalysisException;

try {
    Dataset<Row> result = spark.sql("SELECT * FROM table");
    result.show();
} catch (StatusRuntimeException e) {
    if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) {
        logger.error("Spark Connect server connection failed");
        // Reconnection logic
    } else {
        throw e;
    }
} catch (AnalysisException e) {
    logger.error("SQL analysis error: {}", e.getMessage());
}

Migration Guide#

Migrating from Existing Code to Spark Connect#

// Before: Traditional approach
SparkSession spark = SparkSession.builder()
        .appName("MyApp")
        .master("spark://master:7077")
        .config("spark.executor.memory", "4g")
        .getOrCreate();

// After: Spark Connect (cluster resource settings such as executor
// memory are now configured on the server, not in client code)
SparkSession spark = SparkSession.builder()
        .remote("sc://spark-connect:15002")
        .build();

// DataFrame API is used identically
Dataset<Row> df = spark.read().parquet("data.parquet");

Considerations#

  1. No local file access: Use cloud storage instead of client local files
  2. RDD API not supported: Convert to DataFrame/Dataset API
  3. UDF server registration: User-defined functions must be pre-registered on server
  4. Network latency: remote calls add round-trip overhead compared to an embedded driver