
Conversation

@Shekharrajak (Member) commented Nov 17, 2025

@coveralls commented Nov 17, 2025

Pull Request Test Coverage Report for Build 20839855538

Details

  • 0 of 0 changed or added relevant lines in 0 files are covered.
  • No unchanged relevant lines lost coverage.
  • Overall coverage remained the same at 66.607%

Totals
  • Change from base Build 20780172530: 0.0%
  • Covered Lines: 2599
  • Relevant Lines: 3902

💛 - Coveralls

# Custom backend implementation
from kubeflow.spark.backends.base import SparkBackend

class CustomBackend(SparkBackend):
Member Author:

Users can extend the backend if they need specific changes or a different way to connect or submit Spark jobs.
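For illustration, here is a minimal sketch of what such an extension point could look like; SparkBackend is stubbed locally and the method names (submit_application, get_status) are assumptions rather than the final interface:

```python
from abc import ABC, abstractmethod


class SparkBackend(ABC):
    """Local stand-in for the proposed kubeflow.spark.backends.base.SparkBackend."""

    @abstractmethod
    def submit_application(self, app_spec: dict) -> str:
        """Submit a Spark application and return its name."""

    @abstractmethod
    def get_status(self, name: str) -> str:
        """Return the current status of a submitted application."""


class CustomBackend(SparkBackend):
    """Example: route submissions through an in-house REST gateway."""

    def __init__(self, endpoint: str):
        self.endpoint = endpoint

    def submit_application(self, app_spec: dict) -> str:
        # A real implementation would POST app_spec to self.endpoint here.
        print(f"submitting {app_spec['name']} to {self.endpoint}")
        return app_spec["name"]

    def get_status(self, name: str) -> str:
        # A real implementation would query the gateway for the job state.
        return "RUNNING"


backend = CustomBackend(endpoint="https://spark-gateway.internal/api")  # hypothetical URL
backend.submit_application({"name": "spark-pi", "main_file": "local:///opt/spark/examples/pi.py"})
```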

@andreyvelich (Member) left a comment:

Thanks for this great effort @Shekharrajak!
I left my initial thoughts.

### Goals

- Design a unified, Pythonic SDK for managing Spark applications on Kubernetes
- Support multiple backends (Kubernetes Operator, REST Gateway, Spark Connect) following the Trainer pattern
Member:

For Trainer, backends represent various job submission modes (local subprocess, container, and Kubernetes). I am not sure we can replicate that for Spark.

Member Author:

Here we have job submission via the Kubernetes Operator backend, the Spark Connect backend, and the Gateway backend (not implemented; we only have the abstract class).

Member:

I am wondering what the main motivation is to separate SessionClient() and BatchClient()?
Alternatively, we could have a unified SparkClient() with both session and batch APIs:

submit_job() <-- to create Spark Application and submit batch job
connect() <-- to create session and connect to existing Spark cluster

Member Author:

  • BatchSparkClient users never see the connect() or create_session() methods.
  • SparkSessionClient users never see the submit_application() or wait_for_job_status() methods.
  • This prevents runtime errors: you can't call wait_for_job_status() on a session object.

Member Author:

This helps the user clearly see which methods are available:

  client = SparkClient()
  # User sees BOTH batch AND session methods
  client.submit_job(...)        # For batch
  client.connect(...)           # For session
  client.get_job(...)           # Works with connect() or batch()?

  # even if we take an arg in config:
  client = SparkClient(mode="batch")
  client.create_session(...)    # IDE will not show an error, but it fails at runtime

Member:

Do we know how the Spark session management will work with the new CRD: SparkCluster proposed by @ChenYi015 here: kubeflow/spark-operator#2744 ?

Member Author:

If we want to create the Spark cluster through the SDK, then we need an API like:

 from kubeflow.spark import SparkClusterManager

 manager = SparkClusterManager()

 # Create persistent cluster
 cluster = manager.create_cluster(
     name="prod",
     spark_version="4.0.1",
     init_workers=5,
     min_workers=2,
     max_workers=10,
     enable_hpa=True,
 )

# Cluster url to use for spark job submission: cluster.master_url

Comment on lines 67 to 74
          ┌────────────────────┬─────────┴──────────┬────────────────────┐
          ▼                    ▼                    ▼                    ▼
┌──────────────────┐ ┌──────────────────┐ ┌──────────────────┐ ┌──────────────────┐
│ OperatorBackend  │ │ GatewayBackend   │ │ ConnectBackend   │ │ LocalBackend     │
│ (Spark Operator  │ │ (REST Gateway)   │ │ (Spark Connect/  │ │ (Future)         │
│  CRDs on K8s)    │ │                  │ │  Interactive)    │ │                  │
└──────────────────┘ └──────────────────┘ └──────────────────┘ └──────────────────┘
     Batch Jobs           Batch Jobs            Sessions            Local Dev
@andreyvelich (Member) commented Nov 23, 2025:

Can you explain the reasoning behind creating various backends? Can we just have an API, SparkClient().connect(), which creates the SparkConnect CR and connects to the existing cluster, as we discussed?

@Shekharrajak (Member Author) commented Nov 23, 2025:

The batch backend will have APIs like submit_application, wait_for_completion, get_logs, ... where the user just submits the job and can check the logs/results.
example: https://github.com/kubeflow/sdk/pull/158/files#diff-e692a5819ee6b1dc00cba3b58e91f058c0022d3ca9aa6f3ee468126f245eef89R89

But with the interactive session Spark client, the user will be able to run interactive SQL queries and DataFrame operations.
example: https://github.com/kubeflow/sdk/pull/158/files#diff-a5011f48c9d6d16ff6ddd65588f65a7c78abf5fbeb121cccb693c0892ce3a5aeR275

# Submit a Spark application
response = client.submit_application(
    app_name="spark-pi",
    main_application_file="local:///opt/spark/examples/src/main/python/pi.py",
Member:

I am wondering if there is a way to allow users to pass a function to SparkApplication, similar to the Trainer API: https://github.com/kubeflow/sdk?tab=readme-ov-file#run-your-first-pytorch-distributed-job

It might be interesting to explore how we can allow submitting a SparkApplication without building an image.

Member Author:

Are we thinking of an API like this?

def my_job(input_path: str, output_path: str):
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.getOrCreate()
    df = spark.read.csv(input_path)
    df.groupBy("col").count().write.parquet(output_path)
    spark.stop()

client.submit_function(
    func=my_job,
    func_args={"input_path": "s3://in", "output_path": "s3://out"}
)

Member:

Yeah, that might be interesting to explore.
We can also extend the submit_job() API to accept either a main_application_file or a function.

Member Author:

Let me add it to the KEP and I will work on it.

Member Author:

Spark requires pre-allocated cluster resources (driver/executor specifications), so I could not find such a way to run a Spark job. But we can explore it in the next version.

Member:

@vara-bonthu @ChenYi015 @nabuskey Do you have any ideas on how we can dynamically inject the user's code into the Spark driver without rebuilding an image or putting a Python file into the Pod's filesystem?

I have one idea: we can leverage the PodTemplate spec.
Let's say we define an initContainer that simply creates a file in the Pod's filesystem with the Spark code at a pre-defined location (e.g. local:///opt/spark/spark_job.py). This initContainer can share an emptyDir with the driver container, so the driver container can see the file. To create the file, we can use the same script as for Trainer:

container:
  command:
    - bash
    - -c
    - |
      read -r -d '' SCRIPT << EOM
      def my_job(input_path: str, output_path: str):                                                           
          from pyspark.sql import SparkSession
          spark = SparkSession.builder.getOrCreate()                                                         
          df = spark.read.csv(input_path)                                                                    
          df.groupBy("col").count().write.parquet(output_path)                                               
          spark.stop()    

      EOM
      printf "%s" "$SCRIPT" > "/opt/spark/spark_job.py"

WDYT ?

cc @shravan-achar @akshaychitneni @bigsur0 for feedback on this.
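For illustration, a stdlib-only sketch of how an SDK could render a user's function source into that kind of bash -c command for the initContainer (the helper name and target path are assumptions):

```python
import inspect
import textwrap


def build_init_command(func, target: str = "/opt/spark/spark_job.py") -> list[str]:
    """Render func's source into a bash command that writes it into the Pod's filesystem."""
    source = textwrap.dedent(inspect.getsource(func))
    script = (
        "read -r -d '' SCRIPT << EOM\n"
        f"{source}\n"
        "EOM\n"
        f'printf "%s" "$SCRIPT" > "{target}"\n'
    )
    return ["bash", "-c", script]


def my_job(input_path: str, output_path: str):
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    spark.read.csv(input_path).groupBy("col").count().write.parquet(output_path)
    spark.stop()


# The returned list would go into the initContainer's `command`, with an emptyDir
# volume shared between the initContainer and the driver container.
print(build_init_command(my_job))
```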

@aagumin commented:

@andreyvelich

Sorry to interrupt. I implemented a decorator that uses inspect to get the function's body and form an executable file. It also parses the function's signature into submit parameters (they must be JSON-serializable). I could implement this if you're interested.

Example

@spark_submit
def my_task(first, second):
    ...

Member:

@aagumin Yeah, this looks similar to how we do this in TrainerClient(): https://github.com/kubeflow/sdk/blob/main/kubeflow/trainer/backends/kubernetes/utils.py#L297-L301
I think having a consistent UX across Data Processing (e.g. SparkJobs) and Training (e.g. TrainJob) workloads would be really cool.
As long as the SparkApplication CRD supports initContainers via PodTemplate, this should be relatively easy to implement.

cc @franciscojavierarceo @kubeflow/kubeflow-sdk-team


+1

Comment on lines 476 to 488
# Step 1: Interactive development with ConnectBackend
connect_config = ConnectBackendConfig(connect_url="sc://dev-cluster:15002")
dev_client = SparkClient(backend_config=connect_config)

with dev_client.create_session("dev") as session:
    # Test and validate query
    test_df = session.sql("SELECT * FROM data LIMIT 1000")
    test_df.show()
    # Iterate and refine...

# Step 2: Production batch job with OperatorBackend
prod_config = OperatorBackendConfig(namespace="production")
prod_client = SparkClient(backend_config=prod_config)
Member:

This is a pretty interesting experience for the dev -> prod Spark lifecycle.
cc @shravan-achar @akshaychitneni @bigsur0 to explore.

)
```

#### Integration with Pipelines
Member:

cc @kubeflow/wg-pipeline-leads to explore

Comment on lines 295 to 302
config = ConnectBackendConfig(
    connect_url="sc://spark-cluster:15002",
    use_ssl=True,
)

# Create client and session
client = SparkSessionClient(backend_config=config)
session = client.create_session(app_name="analysis")

How will this be better than a native SparkConnect call? I can see that you can’t replace a Connect client with a regular one without changing interfaces, considering that the options for configuring a Connect session are significantly more limited.

Member Author:

We will have more Kubeflow-compatible APIs and infrastructure.


Implements the backend for managed Spark gateways:

- Submits applications via REST endpoints

If a PR is added for gRPC submit support and the ability to implement a custom submit mechanism, then this becomes relevant.

@andreyvelich (Member) left a comment:

Thanks for the update @Shekharrajak!
I left a few comments.

Comment on lines 86 to 87
client = (
SparkClient.builder()
Member:

I like the idea of builder(), but shall this come in the 2nd phase?
We have similar ideas with @kubeflow/kubeflow-sdk-team for TrainerClient and OptimizerClient, so maybe we can unify the experience?

Initially, we can just allow users to specify a minimal number of parameters in the connect() API.

But I am happy to debate it during our next SDK call on Dec 17th: https://docs.google.com/document/d/1jH2WAX2ePxOfI4JuiVK9nPlesDMiyg67xzLwhpR7wTQ/edit?tab=t.0#heading=h.jarjwwjkqmul

Contributor:

Agree, I'd like us to start with constructors and config objects, aligning with existing clients, and later expand with the builder approach.

Comment on lines 113 to 115
```python
class SparkClientBuilder:
"""Builder for SparkClient configuration."""
Member:

Will SparkApplication and SparkConnect CRD always have the same APIs? I am wondering if we can re-use the same builder for them.
cc @ChenYi015

Member Author:

These would be internal changes; we can take care of them later as well.

Comment on lines 117 to 119
# Kubernetes settings
def namespace(self, ns: str) -> "SparkClientBuilder":
"""Set Kubernetes namespace."""
Member:

I think namespace should be part of KubernetesBackendConfig in SparkClient to align with other clients: https://github.com/kubeflow/sdk/blob/main/kubeflow/trainer/backends/kubernetes/backend.py#L42-L43
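For illustration, a minimal sketch of what a Trainer-style config object could look like here; the field names are assumptions, not the final API:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class KubernetesBackendConfig:
    """Hypothetical backend config mirroring the Trainer client's shape."""

    namespace: Optional[str] = None  # fall back to the current context/namespace if None
    context: Optional[str] = None    # kubeconfig context to use
    image: str = "spark:3.5.0"       # default driver/executor image


config = KubernetesBackendConfig(namespace="spark-jobs")
# client = SparkClient(backend_config=config)  # proposed usage, per the KEP examples
```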

Comment on lines 541 to 813
from kfp import dsl
from kubeflow.spark.pipelines import SparkJobOp

@dsl.pipeline(name="ml-pipeline")
def ml_pipeline():
    # Spark ETL step
    etl = SparkJobOp(
        name="feature-etl",
        main_file="s3a://ml/etl.py",
        executor_instances=20,
        executor_memory="8g",
    )

    # Training step depends on ETL completion
    train = TrainOp(
        dataset_path=etl.outputs["output_path"],
    )
    train.after(etl)
Member:

We should discuss whether we need to move it under kubeflow.pipelines.components.
We can also leverage KFP lightweight Python tasks to submit such pipelines.

cc @franciscojavierarceo @kubeflow/wg-pipeline-leads



client = (
SparkClient.builder()
.namespace("spark-jobs")
Member:

Will this integrate well with our current multi-tenancy architecture? https://github.com/kubeflow/manifests#architecture

Member:

So if I execute this from a JupyterLab notebook, will it just use the current namespace?

Member Author:

Yes
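For context, a common way an in-cluster client (e.g. a notebook Pod) resolves the current namespace is the service account namespace file mounted into every Pod; a minimal sketch of that fallback:

```python
from pathlib import Path

NS_FILE = Path("/var/run/secrets/kubernetes.io/serviceaccount/namespace")


def current_namespace(default: str = "default") -> str:
    """Return the Pod's namespace when running in-cluster (e.g. from JupyterLab),
    otherwise fall back to a default or kubeconfig-provided value."""
    if NS_FILE.exists():
        return NS_FILE.read_text().strip()
    return default


print(current_namespace())
```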

Comment on lines 124 to 130
# Resource configuration
def driver(self, cores: int = 1, memory: str = "1g") -> "SparkClientBuilder":
    """Configure driver resources."""

def executor(self, cores: int = 1, memory: str = "1g") -> "SparkClientBuilder":
    """Configure executor resources."""

Member (@juliusvonkohout):

I am missing the distinction between CPU/memory requests and limits a bit. I only see

.driver(cores=2, memory="4g")
.executor(cores=4, memory="8g")

which somewhat mixes up requests with limits. Furthermore, we should make sure that it also works with the integrated SeaweedFS S3.

Member Author:

Thanks for pointing it out. There are a lot of configuration options in Spark, so we need a wider key-value pair set anyway.
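For reference, Spark on Kubernetes already separates CPU requests from limits via configuration keys, so a generic key-value pass-through would cover this; a sketch (the spark_conf pass-through argument is an assumption, the keys are standard Spark settings):

```python
# CPU request vs. limit are distinct Spark-on-Kubernetes settings; memory request/limit
# are both derived from spark.{driver,executor}.memory plus the configured overhead.
spark_conf = {
    "spark.kubernetes.driver.request.cores": "1",
    "spark.kubernetes.driver.limit.cores": "2",
    "spark.kubernetes.executor.request.cores": "2",
    "spark.kubernetes.executor.limit.cores": "4",
    "spark.driver.memory": "4g",
    "spark.executor.memory": "8g",
}

# client.submit_job(..., spark_conf=spark_conf)  # hypothetical pass-through argument
```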

@kramaranya (Contributor) left a comment:

Thank you @Shekharrajak
I left a few comments


Comment on lines 449 to 460
SparkClient
  ├── session() ──► creates Spark Connect server (if needed)
  │                            │
  │                            ▼
  │                   SparkApplication CRD
  │                            │
  │                            ▼
  │                     Spark Driver Pod
  │               (with Spark Connect enabled)
  │                            │
  └── connect() ──────► sc://server:15002 ──► SparkSession
Contributor:

So how do batch jobs differ from interactive sessions?

Member Author:

Batch (SparkApplication CRD): run a predefined Spark application to completion (ETL pipelines, ML training, data processing).

Interactive sessions (SparkConnect CRD): a long-running server that accepts ad-hoc queries from clients (notebooks, BI tools, data exploration).
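To make the contrast concrete, a sketch based on the API shapes quoted elsewhere in this review (submit_job, wait_for_job_status, get_job_logs, connect); the proposal is still evolving, so treat the exact names and signatures as provisional:

```python
from kubeflow.spark import SparkClient  # proposed package per this KEP, not yet published

client = SparkClient()

# Batch (SparkApplication CRD): submit a predefined application and track it by name.
job_name = client.submit_job(
    main_file="local:///opt/spark/examples/src/main/python/pi.py",
)
client.wait_for_job_status(job_name)
for line in client.get_job_logs(job_name):
    print(line)

# Interactive (SparkConnect CRD): connect to a long-running server for ad-hoc queries.
spark = SparkClient.connect("sc://spark-cluster:15002")
spark.sql("SELECT 1").show()
spark.stop()
```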

Comment on lines 190 to 195
def session(self, app_name: Optional[str] = None) -> SparkSession:
    """Get or create SparkSession. Creates Spark Connect server if needed."""

@property
def spark(self) -> SparkSession:
    """Shortcut to session()."""
Contributor:

Hmm, currently SparkClient is stateful and remembers the session, so client.session() stores a reference and client.stop() knows which session to stop. But batch jobs work differently, since they're managed by name (client.delete_job(name)) and the client doesn't track them.
This means you can only have one session per client, right? And client.stop() is confusing when you have both sessions and jobs running.

What do you think if we make the client stateless like TrainerClient? So create_session() would return a session object that manages itself (so you can call session.stop()), and submit_job() would return a job name that you manage through the client:

session = client.create_session()
session.stop()

job_name = client.submit_job(...)
client.delete_job(job_name)

wdyt @Shekharrajak @andreyvelich ?

Copilot AI review requested due to automatic review settings December 30, 2025 13:21
Copilot AI (Contributor) left a comment:

Pull request overview

This PR introduces KEP-107, a comprehensive enhancement proposal for a Spark Client SDK for Kubeflow. The proposal outlines a Python SDK that simplifies running Spark on Kubernetes by providing auto-provisioning of Spark Connect servers, connection management to existing servers, and batch job submission capabilities.

Key Changes:

  • Introduces a new SparkClient API with interactive session management (create_session(), connect()) and batch job support (submit_job())
  • Defines a builder pattern for advanced configuration with resource tuning, volumes, and custom Spark settings
  • Proposes a pluggable backend architecture supporting KubernetesBackend, GatewayBackend, and LivyBackend
  • Outlines integration points with Kubeflow Pipelines, Trainer, and Spark History MCP Server

from kubeflow.spark import SparkClient

# Connect to existing Spark Connect server - returns SparkSession directly
spark = SparkClient.connect("sc://spark-server:15002")
Copilot AI commented Dec 30, 2025:

The documentation shows "s3a://bucket/data/" but this requires proper S3A configuration which isn't shown in the basic example. Consider adding a comment or note that S3 access requires appropriate Spark configuration (e.g., AWS credentials, endpoint configuration) to avoid confusion for users.

Suggested change
spark = SparkClient.connect("sc://spark-server:15002")
spark = SparkClient.connect("sc://spark-server:15002")
# Note: Accessing S3 using s3a:// requires appropriate Spark/Hadoop S3A configuration
# (e.g., AWS credentials, endpoint configuration) to be set up in your environment.
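For illustration, the kind of S3A settings that note refers to; the keys are standard Hadoop S3A options (values are placeholders) and are typically configured on the Spark Connect server or in the application spec rather than per query:

```python
# Standard Hadoop S3A options (placeholder values) that must be visible to the
# Spark driver/executors before s3a:// paths can be read.
s3a_conf = {
    "spark.hadoop.fs.s3a.endpoint": "https://s3.example.internal",
    "spark.hadoop.fs.s3a.access.key": "<access-key>",
    "spark.hadoop.fs.s3a.secret.key": "<secret-key>",
    "spark.hadoop.fs.s3a.path.style.access": "true",
}
```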

Comment on lines +248 to +402
def submit_job(
    self,
    func: Optional[Callable[[SparkSession], Any]] = None,
    func_args: Optional[Dict[str, Any]] = None,
    main_file: Optional[str] = None,
    main_class: Optional[str] = None,
    arguments: Optional[List[str]] = None,
    name: Optional[str] = None,
) -> str:
    """Submit a batch Spark job.

    Supports two modes based on parameters:
    - Function mode: Pass `func` to submit a Python function with Spark transformations
    - File mode: Pass `main_file` to submit an existing Python/Jar file

    Args:
        func: Python function that receives SparkSession (function mode).
        func_args: Arguments to pass to the function.
        main_file: Path to Python/Jar file (file mode).
        main_class: Main class for Jar files.
        arguments: Command-line arguments for the job.
        name: Optional job name.

    Returns:
        The job name (string) for tracking.

    Raises:
        ValueError: If neither `func` nor `main_file` is provided, or both are provided.
    """
Copilot AI commented Dec 30, 2025:

The submit_job method documentation states it "Raises: ValueError: If neither func nor main_file is provided, or both are provided" but this validation logic is not clear in the method signature. Consider adding a note in the docstring about mutual exclusivity being enforced at runtime, or better yet, use separate methods (submit_job_file and submit_job_func) to make the API more explicit and type-safe.


## Dependencies

- `pyspark>=3.4.0` (Spark Connect support)
Copilot AI commented Dec 30, 2025:

Incomplete documentation: The "Dependencies" section lists pyspark>=3.4.0 but the examples throughout the document reference Spark 3.5.0 (e.g., line 103, 157). Consider clarifying whether 3.4.0 is the minimum required version while 3.5.0 is recommended, or update the version constraint to match the examples.

Suggested change
- `pyspark>=3.4.0` (Spark Connect support)
- `pyspark>=3.4.0` (minimum for Spark Connect; examples use Spark 3.5.0+)

) -> SparkJob:
"""Wait for a job to reach desired status."""

def delete_job(self, name: str) -> None:
Copilot AI commented Dec 30, 2025:

Missing return type annotation. The method should include -> None to be consistent with the project's type hinting requirements.

Comment on lines +168 to +269
"""Add a volume (for driver and executors)."""

Copilot AI commented Dec 30, 2025:

The volume method uses **spec to accept arbitrary keyword arguments, but this API design is unclear. Consider specifying the volume spec structure more explicitly (e.g., using a TypedDict or Pydantic model) or documenting the expected keys in the docstring to improve API clarity and type safety.

Suggested change
"""Add a volume (for driver and executors)."""
"""Add a volume (for driver and executors).
The ``spec`` argument describes the underlying Kubernetes volume and
mount configuration. Common keys include:
- ``type``: Volume type, e.g. ``"config_map"``, ``"secret"``,
``"persistent_volume_claim"``.
- ``config_map``: Name of the ConfigMap (when ``type="config_map"``).
- ``secret``: Name of the Secret (when ``type="secret"``).
- ``persistent_volume_claim``: Name of the PVC
(when ``type="persistent_volume_claim"``).
- ``sub_path``: Optional sub-path within the volume to mount.
- ``read_only``: Optional ``bool`` indicating if the mount is read-only.
Implementations should translate these keys into the appropriate
Kubernetes ``volume`` and ``volumeMount`` fields for both driver and
executor pods.
"""

follow: Whether to stream logs in realtime.
"""

def delete_session(self, name: str) -> None:
Copilot AI commented Dec 30, 2025:

Missing return type annotation in the API signature. The method should include -> None to maintain consistency with the coding standards requiring type hints and return types for all functions.

def connect(cls, url: str, token: str = None, use_ssl: bool = False) -> SparkSession:
"""Connect to an existing Spark Connect server.
Returns SparkSession directly (stateless).
Copilot AI commented Dec 30, 2025:

The connect() class method returns a SparkSession but doesn't indicate how cleanup should be handled. The documentation mentions "Disconnects only, doesn't delete shared server" at line 404, but this behavior should be clearly documented in the method's docstring to avoid confusion about resource lifecycle management.

Suggested change
Returns SparkSession directly (stateless).
The returned SparkSession is a direct client to a shared Spark Connect server
and is not responsible for the server's lifecycle.

Cleanup:
    Call ``spark.stop()`` on the returned SparkSession when you are done.
    This disconnects the client session only and does **not** delete or
    shut down the underlying shared Spark Connect server.

Returns:
    SparkSession connected to the existing Spark Connect server.

def connect(cls, url: str, token: str = None, use_ssl: bool = False) -> SparkSession:
"""Connect to an existing Spark Connect server.
Returns SparkSession directly (stateless).
Copilot AI commented Dec 30, 2025:

The docstring states "Returns SparkSession directly (stateless)" but this is misleading. A SparkSession itself is inherently stateful - it maintains connection state, configuration, and manages resources. Consider rephrasing to clarify that the client itself remains stateless, but the returned SparkSession is a stateful connection object.

Suggested change
Returns SparkSession directly (stateless).
Returns a stateful SparkSession connection object directly; the SparkClient itself remains stateless.

Comment on lines +282 to +430
"""List batch Spark jobs."""

def get_job(self, name: str) -> SparkJob:
"""Get a specific Spark job by name."""

def get_job_logs(
self,
name: str,
container: str = "driver",
follow: bool = False,
) -> Iterator[str]:
"""Get logs from a Spark job (driver or executor)."""

def wait_for_job_status(
self,
name: str,
status: Set[SparkJobStatus] = {SparkJobStatus.COMPLETED},
timeout: int = 600,
) -> SparkJob:
"""Wait for a job to reach desired status."""

def delete_job(self, name: str) -> None:
"""Delete a Spark job."""
Copilot AI commented Dec 30, 2025:

The API documentation lacks information about error handling and potential exceptions. Consider adding a section or notes about what exceptions users should expect and handle (e.g., connection failures, timeout errors, resource conflicts) to help users write robust code.

Suggested change
"""List batch Spark jobs."""
def get_job(self, name: str) -> SparkJob:
"""Get a specific Spark job by name."""
def get_job_logs(
self,
name: str,
container: str = "driver",
follow: bool = False,
) -> Iterator[str]:
"""Get logs from a Spark job (driver or executor)."""
def wait_for_job_status(
self,
name: str,
status: Set[SparkJobStatus] = {SparkJobStatus.COMPLETED},
timeout: int = 600,
) -> SparkJob:
"""Wait for a job to reach desired status."""
def delete_job(self, name: str) -> None:
"""Delete a Spark job."""
"""List batch Spark jobs.
Args:
status: Optional status filter to return only jobs in the given state.
Raises:
ConnectionError: If the client cannot communicate with the Kubernetes cluster
or underlying Spark resources.
TimeoutError: If listing jobs exceeds an internal client timeout.
"""
def get_job(self, name: str) -> SparkJob:
"""Get a specific Spark job by name.
Args:
name: Name of the job to retrieve.
Raises:
ConnectionError: If the client cannot communicate with the Kubernetes cluster
or underlying Spark resources.
TimeoutError: If retrieving the job exceeds an internal client timeout.
KeyError: If no job with the given name exists.
"""
def get_job_logs(
self,
name: str,
container: str = "driver",
follow: bool = False,
) -> Iterator[str]:
"""Get logs from a Spark job (driver or executor).
Args:
name: Name of the job whose logs should be streamed or fetched.
container: Container to read logs from (e.g., ``"driver"`` or an executor).
follow: If True, stream logs until the job completes or the stream is closed.
Raises:
ConnectionError: If the client cannot connect to the log stream source.
TimeoutError: If reading logs exceeds an internal client timeout.
ValueError: If the requested container name is invalid for the given job.
KeyError: If no job with the given name exists.
"""
def wait_for_job_status(
self,
name: str,
status: Set[SparkJobStatus] = {SparkJobStatus.COMPLETED},
timeout: int = 600,
) -> SparkJob:
"""Wait for a job to reach desired status.
Args:
name: Name of the job to wait on.
status: Set of desired terminal or intermediate statuses to wait for.
timeout: Maximum time in seconds to wait before giving up.
Raises:
ConnectionError: If the client cannot communicate with the Kubernetes cluster
or underlying Spark resources while polling status.
TimeoutError: If the job does not reach a desired status within ``timeout``
seconds, or if a single poll exceeds an internal timeout.
KeyError: If no job with the given name exists.
"""
def delete_job(self, name: str) -> None:
"""Delete a Spark job.
Args:
name: Name of the job to delete.
Raises:
ConnectionError: If the client cannot communicate with the Kubernetes cluster
or underlying Spark resources.
TimeoutError: If deleting the job exceeds an internal client timeout.
KeyError: If no job with the given name exists.
"""

Comment on lines 232 to 240
def get_server_logs(
    self,
    name: str,
    follow: bool = False,
) -> Iterator[str]:
    """Get logs from a Spark Connect server.
    Args:
        name: Name of the Spark Connect server.
Copilot AI commented Dec 30, 2025:

Inconsistent terminology: The method is named get_server_logs but operates on "sessions" (as indicated by the name parameter referring to a session name). This naming inconsistency between "server" and "session" could confuse users. Consider renaming to get_session_logs for consistency with other session-related methods.

Suggested change
def get_server_logs(
    self,
    name: str,
    follow: bool = False,
) -> Iterator[str]:
    """Get logs from a Spark Connect server.
    Args:
        name: Name of the Spark Connect server.
def get_session_logs(
    self,
    name: str,
    follow: bool = False,
) -> Iterator[str]:
    """Get logs from a Spark Connect session.
    Args:
        name: Name of the Spark Connect session.

from typing import Optional

@dataclass
class Resources:
Member Author:

This is used for Driver#resources and Executor#resources_per_executor - we are following the composite design pattern.
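A small self-contained sketch of that composite shape; the field names are illustrative, not the final spec:

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Resources:
    cores: int = 1
    memory: str = "1g"
    gpus: Optional[int] = None


@dataclass
class Driver:
    resources: Resources = field(default_factory=Resources)


@dataclass
class Executor:
    instances: int = 2
    resources_per_executor: Resources = field(default_factory=Resources)


executor = Executor(instances=5, resources_per_executor=Resources(cores=4, memory="8g"))
print(executor.resources_per_executor.memory)  # "8g"
```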

@Shekharrajak force-pushed the feat/kep-spark-client branch from 6d12de7 to c785109 on January 8, 2026 14:54
@andreyvelich (Member) left a comment:

Thanks @Shekharrajak!
Mostly LGTM; I suggest that we move some topics to phase 2, as we discussed.
/assign @kramaranya @kubeflow/kubeflow-sdk-team for the final feedback.

}
```

### Structured Types
Member:

I guess we can remove this from version 1, right?
And implement it in the 2nd phase if needed.

Member Author:

Yes +1

```

```python
class SparkClientBuilder:
Member:

Can you move the Builder to the future plans, since we will implement it later if needed?

Member Author:

Yeah, we will start simpler in the first MVP.

Comment on lines 323 to 324
token: Optional authentication token for existing server.
use_ssl: Whether to use SSL for connection (only for existing servers).
Member:

Does the SparkConnect CRD support SSL and auth?

Member Author:

There is no built-in SSL/TLS or authentication support in the SparkConnect CRD. I was thinking of external servers or submitting Spark jobs. But let me remove it for now - we can think about this later.

Member Author:

Removed it for now a2a4837

token: Optional authentication token for existing server.
use_ssl: Whether to use SSL for connection (only for existing servers).
name: Optional session name. Auto-generated if not provided (create mode only).
app_name: Optional Spark application name (create mode only).
Member:

We don't need this, right ?

Member Author:

If the user wants to provide a specific name - let's park it for phase 2.

If provided, connects to existing server. If None, creates new session.
token: Optional authentication token for existing server.
use_ssl: Whether to use SSL for connection (only for existing servers).
name: Optional session name. Auto-generated if not provided (create mode only).
Member:

Will name be equal to the SparkConnect CR name? Can we move it to options for now, to be consistent with Trainer?

def delete_session(self, name: str) -> None:
"""Delete a Spark Connect session."""

def submit_job(
Member:

Let's say that the job submission APIs will be implemented in phase 2.

@andreyvelich (Member):
/cc @ChenYi015 @nabuskey @vara-bonthu

@Shekharrajak (Member Author):
I understand there will be a bunch of iterations on the API design to improve the user experience. But we will definitely focus on keeping the first MVP as simple as possible, with a strong foundation, and implement features phase by phase.

The more important point is to start the journey; otherwise we will see no progress.

Shekharrajak and others added 2 commits January 9, 2026 08:25
Co-authored-by: Andrey Velichkevich <[email protected]>
Signed-off-by: Shekhar Prasad Rajak <[email protected]>
@andreyvelich (Member) left a comment:

Yeah, I agree with you.
Thank you for this work @Shekharrajak, excited to see this moving forward!
/lgtm
/approve
/hold in case @kramaranya @astefanutti @szaher @Fiona-Waters wanted to give more comments.
/assign @kramaranya @astefanutti @szaher @Fiona-Waters

@google-oss-prow (bot) commented:

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: andreyvelich

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Details: Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment
