feat(docs): KEP- Spark Client for Kubeflow SDK #163
Conversation
| # Custom backend implementation | ||
| from kubeflow.spark.backends.base import SparkBackend | ||
|
|
||
| class CustomBackend(SparkBackend): |
Users can extend the backend if they want any specific changes or a different way to connect or submit a Spark job.
andreyvelich
left a comment
Thanks for this great effort @Shekharrajak!
I left my initial thoughts.
| ### Goals | ||
|
|
||
| - Design a unified, Pythonic SDK for managing Spark applications on Kubernetes | ||
| - Support multiple backends (Kubernetes Operator, REST Gateway, Spark Connect) following the Trainer pattern |
For Trainer, backends represent various job submission mechanisms (local subprocess, container, and Kubernetes). I am not sure if we can replicate that for Spark.
Here we have job submission using the K8s Operator backend, the Spark Connect backend, and the Gateway backend (not implemented yet - we only have the abstract class).
I am wondering what is the main motivation to separate SessionClient() and BatchClient()?
Alternatively, we could just have a unified SparkClient() that exposes both session and batch APIs:
submit_job() <-- to create a SparkApplication and submit a batch job
connect() <-- to create a session and connect to an existing Spark cluster
- BatchSparkClient users never see the connect() and create_session() methods
- SparkSessionClient users never see the submit_application() and wait_for_job_status() methods
- This prevents runtime errors such as calling wait_for_job_status() on a session object (see the sketch below)
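For illustration, a minimal sketch of the split (method names are taken from this thread; the signatures are placeholders, not the final API):

```python
from typing import Iterator, Optional


class BatchSparkClient:
    """Batch-only surface: submit and track Spark applications."""

    def submit_application(self, app_name: str, main_application_file: str) -> str: ...
    def wait_for_job_status(self, name: str, timeout: int = 600): ...
    def get_job_logs(self, name: str) -> Iterator[str]: ...


class SparkSessionClient:
    """Session-only surface: create or attach to Spark Connect sessions."""

    def create_session(self, app_name: Optional[str] = None): ...
    def connect(self, url: str): ...
```

With this split, an IDE only offers the methods that are valid for the client object the user is holding.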
This helps users clearly see which methods are available:
client = SparkClient()
# User sees BOTH batch AND session methods
client.submit_job(...) # For batch
client.connect(...) # For session
client.get_job(...) # Works with connect() or batch() ?
# even if we take an arg in the config:
client = SparkClient(mode="batch")
client.create_session(...)  # the IDE will not show an error, but it fails at runtime
Do we know how the Spark session management will work with the new CRD: SparkCluster proposed by @ChenYi015 here: kubeflow/spark-operator#2744 ?
Added my view: kubeflow/spark-operator#2744 (comment)
If we want to create the Spark cluster through the SDK, then we need an API like:
from kubeflow.spark import SparkClusterManager
manager = SparkClusterManager()
# Create persistent cluster
cluster = manager.create_cluster(
name="prod",
spark_version="4.0.1",
init_workers=5,
min_workers=2,
max_workers=10,
enable_hpa=True,
)
# Cluster url to use for spark job submission: cluster.master_url
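And, purely as an illustration of how the two pieces could fit together, a batch submission could then target that cluster. The spark_conf parameter below is hypothetical and not part of the proposed submit_job() signature:

```python
from kubeflow.spark import SparkClient

client = SparkClient()
client.submit_job(
    main_file="local:///opt/spark/examples/src/main/python/pi.py",
    name="pi-on-prod-cluster",
    # Hypothetical: route the job to the persistent cluster created above.
    spark_conf={"spark.master": cluster.master_url},
)
```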
| ┌───────────┴─────────────┬──────────────────┬────────────────┐ | ||
| ▼ ▼ ▼ ▼ | ||
| ┌──────────────────┐ ┌──────────────────┐ ┌──────────────────┐ ┌──────────────────┐ | ||
| │OperatorBackend │ │ GatewayBackend │ │ ConnectBackend │ │ LocalBackend │ | ||
| │(Spark Operator │ │ (REST Gateway) │ │ (Spark Connect/ │ │ (Future) │ | ||
| │ CRDs on K8s) │ │ │ │ Interactive) │ │ │ | ||
| └──────────────────┘ └──────────────────┘ └──────────────────┘ └──────────────────┘ | ||
| Batch Jobs Batch Jobs Sessions Local Dev |
Can you explain the reason for creating various backends? Can we just have an API, SparkClient().connect(), which creates a SparkConnect CR and connects to the existing cluster, as we discussed?
The batch backend will have APIs like submit_application, wait_for_completion, get_logs, etc., where the user just submits the job and can check the logs/results.
example: https://github.com/kubeflow/sdk/pull/158/files#diff-e692a5819ee6b1dc00cba3b58e91f058c0022d3ca9aa6f3ee468126f245eef89R89
But with the interactive-session Spark client, the user will be able to run interactive SQL queries and DataFrame operations.
example: https://github.com/kubeflow/sdk/pull/158/files#diff-a5011f48c9d6d16ff6ddd65588f65a7c78abf5fbeb121cccb693c0892ce3a5aeR275
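To make the contrast concrete, here is a rough sketch of the two flows (method names mirror the ones mentioned above; the exact signatures are still under discussion):

```python
from kubeflow.spark import SparkClient

# Batch flow: submit a predefined application, wait for it, then read the logs.
batch_client = SparkClient()  # e.g. backed by the Spark Operator
job = batch_client.submit_application(
    app_name="nightly-etl",
    main_application_file="local:///opt/spark/jobs/etl.py",
)
batch_client.wait_for_completion(job)
for line in batch_client.get_logs(job):
    print(line)

# Interactive flow: open a session and run ad-hoc queries against it.
session_client = SparkClient()
session = session_client.create_session(app_name="exploration")
session.sql("SELECT count(*) FROM events").show()
```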
| # Submit a Spark application | ||
| response = client.submit_application( | ||
| app_name="spark-pi", | ||
| main_application_file="local:///opt/spark/examples/src/main/python/pi.py", |
I am wondering if there is a way to allow users to pass a function to SparkApplication, similar to the Trainer API: https://github.com/kubeflow/sdk?tab=readme-ov-file#run-your-first-pytorch-distributed-job
It might be interesting to explore how we can allow submitting a SparkApplication without building an image.
Are we thinking of an API like this?
def my_job(input_path: str, output_path: str):
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.getOrCreate()
    df = spark.read.csv(input_path)
    df.groupBy("col").count().write.parquet(output_path)
    spark.stop()

client.submit_function(
    func=my_job,
    func_args={"input_path": "s3://in", "output_path": "s3://out"},
)
Yeah, that might be interesting to explore.
We can also extend the submit_job() API to accept either main_application_file or a function.
Let me add it to the KEP and work on it.
Spark requires pre-allocated cluster resources (driver/executor specifications), so I could not find such a way to run a Spark job. But we can explore it in the next version.
@vara-bonthu @ChenYi015 @nabuskey Do you have any ideas on how we can dynamically inject the user's code into the Spark driver without re-building an image or putting a Python file into the Pod's filesystem?
I have one idea: we can leverage the PodTemplate spec.
Let's say we define an initContainer that simply creates a file in the Pod's filesystem with the Spark code at a pre-defined location (e.g. local:///opt/spark/spark_job.py). This initContainer can share an emptyDir with the driver container, so the driver container can see the file. To create the file, we can use the same script as for Trainer:
container:
  command:
    - bash
    - -c
    - |
      read -r -d '' SCRIPT << EOM
      def my_job(input_path: str, output_path: str):
          from pyspark.sql import SparkSession
          spark = SparkSession.builder.getOrCreate()
          df = spark.read.csv(input_path)
          df.groupBy("col").count().write.parquet(output_path)
          spark.stop()
      EOM
      printf "%s" "$SCRIPT" > "/opt/spark/spark_job.py"

WDYT?
cc @shravan-achar @akshaychitneni @bigsur0 for feedback on this.
Sorry to interrupt. I implemented a decorator that uses inspect to get the function's body and form an executable file. It also parses the function's signature into submit parameters (they must be JSON-serializable). I could implement this if you're interested.
Example
@spark_submit
def my_task(first, second):
...
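For reference, a minimal sketch of what such a decorator could look like (helper and file names are illustrative; a real implementation would handle imports, closures, and error cases more carefully):

```python
import inspect
import json
import tempfile
from typing import Any, Callable


def spark_submit(func: Callable[..., Any]) -> Callable[..., str]:
    """Turn a Python function into a standalone PySpark script file (sketch)."""

    def wrapper(**kwargs: Any) -> str:
        # Grab the function source, dropping decorator lines so the generated
        # script does not depend on this module.
        lines = inspect.getsource(func).splitlines()
        body = "\n".join(l for l in lines if not l.lstrip().startswith("@"))

        # Arguments must be JSON-serializable, as noted above.
        args_json = json.dumps(kwargs)
        script = (
            f"{body}\n\n"
            "import json\n"
            f"{func.__name__}(**json.loads({args_json!r}))\n"
        )

        # Write an executable file that could be handed to submit_job(main_file=...).
        tmp = tempfile.NamedTemporaryFile("w", suffix=".py", delete=False)
        tmp.write(script)
        tmp.close()
        return tmp.name

    return wrapper
```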
@aagumin Yeah, this looks similar to how we do this in TrainerClient(): https://github.com/kubeflow/sdk/blob/main/kubeflow/trainer/backends/kubernetes/utils.py#L297-L301
I think having a consistent UX across Data Processing (e.g. SparkJobs) and Training (e.g. TrainJob) workloads would be really cool.
As long as SparkApplication CRD can support initContainer via PodTemplate, that should be relatively easy to implement.
cc @franciscojavierarceo @kubeflow/kubeflow-sdk-team
+1
| # Step 1: Interactive development with ConnectBackend | ||
| connect_config = ConnectBackendConfig(connect_url="sc://dev-cluster:15002") | ||
| dev_client = SparkClient(backend_config=connect_config) | ||
|
|
||
| with dev_client.create_session("dev") as session: | ||
| # Test and validate query | ||
| test_df = session.sql("SELECT * FROM data LIMIT 1000") | ||
| test_df.show() | ||
| # Iterate and refine... | ||
|
|
||
| # Step 2: Production batch job with OperatorBackend | ||
| prod_config = OperatorBackendConfig(namespace="production") | ||
| prod_client = SparkClient(backend_config=prod_config) |
This is a pretty interesting experience for the dev -> prod Spark lifecycle.
cc @shravan-achar @akshaychitneni @bigsur0 to explore.
| ) | ||
| ``` | ||
|
|
||
| #### Integration with Pipelines |
cc @kubeflow/wg-pipeline-leads to explore
| config = ConnectBackendConfig( | ||
| connect_url="sc://spark-cluster:15002", | ||
| use_ssl=True, | ||
| ) | ||
|
|
||
| # Create client and session | ||
| client = SparkSessionClient(backend_config=config) | ||
| session = client.create_session(app_name="analysis") |
How will this be better than a native SparkConnect call? I can see that you can’t replace a Connect client with a regular one without changing interfaces, considering that the options for configuring a Connect session are significantly more limited.
We will have more Kubeflow-compatible APIs and infrastructure.
|
|
||
| Implements the backend for managed Spark gateways: | ||
|
|
||
| - Submits applications via REST endpoints |
If a PR is added for gRPC submit support and the ability to implement a custom submit mechanism, then this becomes relevant.
andreyvelich
left a comment
Thanks for the update @Shekharrajak!
I left a few comments.
| client = ( | ||
| SparkClient.builder() |
I like the idea of builder(), but shall this come in the 2nd phase?
We have similar ideas with @kubeflow/kubeflow-sdk-team for TrainerClient and OptimizerClient, so maybe we can unify the experience?
Initially, we can just allow users to specify a minimal number of parameters in the connect() API.
But I am happy to debate it during our next SDK call on Dec 17th: https://docs.google.com/document/d/1jH2WAX2ePxOfI4JuiVK9nPlesDMiyg67xzLwhpR7wTQ/edit?tab=t.0#heading=h.jarjwwjkqmul
Agree, I'd like us to start with constructor and config objects aligning with existing clients and later expand it with builder approach
| ```python | ||
| class SparkClientBuilder: | ||
| """Builder for SparkClient configuration.""" |
Will SparkApplication and SparkConnect CRD always have the same APIs? I am wondering if we can re-use the same builder for them.
cc @ChenYi015
These would be internal changes; we can take care of them later as well.
| # Kubernetes settings | ||
| def namespace(self, ns: str) -> "SparkClientBuilder": | ||
| """Set Kubernetes namespace.""" |
I think namespace should be part of KubernetesBackendConfig in SparkClient to align with other clients: https://github.com/kubeflow/sdk/blob/main/kubeflow/trainer/backends/kubernetes/backend.py#L42-L43
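For illustration, aligning with the Trainer backend referenced above might look roughly like this (the field names are an assumption, not the final config):

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class KubernetesBackendConfig:
    """Sketch: namespace lives on the backend config rather than on the builder."""

    namespace: Optional[str] = None  # default: namespace of the current context / pod
    context: Optional[str] = None    # optional kubeconfig context


# client = SparkClient(backend_config=KubernetesBackendConfig(namespace="spark-jobs"))
```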
| from kfp import dsl | ||
| from kubeflow.spark.pipelines import SparkJobOp | ||
|
|
||
| @dsl.pipeline(name="ml-pipeline") | ||
| def ml_pipeline(): | ||
| # Spark ETL step | ||
| etl = SparkJobOp( | ||
| name="feature-etl", | ||
| main_file="s3a://ml/etl.py", | ||
| executor_instances=20, | ||
| executor_memory="8g", | ||
| ) | ||
|
|
||
| # Training step depends on ETL completion | ||
| train = TrainOp( | ||
| dataset_path=etl.outputs["output_path"], | ||
| ) | ||
| train.after(etl) |
We should decide whether we need to move it under kubeflow.pipelines.components.
We can also leverage KFP lightweight Python tasks to submit such pipelines.
cc @franciscojavierarceo @kubeflow/wg-pipeline-leads
|
|
||
| client = ( | ||
| SparkClient.builder() | ||
| .namespace("spark-jobs") |
Will this integrate well with our current multi-tenancy architecture? https://github.com/kubeflow/manifests#architecture
So if I execute this from a JupyterLab notebook, will it just use the current namespace?
Yes
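For context, the usual in-cluster default looks something like this (a sketch of the common pattern, not necessarily the SDK's exact implementation):

```python
from pathlib import Path

_SA_NAMESPACE = Path("/var/run/secrets/kubernetes.io/serviceaccount/namespace")


def current_namespace(default: str = "default") -> str:
    """Return the namespace of the pod we are running in (e.g. a JupyterLab pod)."""
    try:
        return _SA_NAMESPACE.read_text().strip()
    except OSError:
        return default
```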
| # Resource configuration | ||
| def driver(self, cores: int = 1, memory: str = "1g") -> "SparkClientBuilder": | ||
| """Configure driver resources.""" | ||
|
|
||
| def executor(self, cores: int = 1, memory: str = "1g") -> "SparkClientBuilder": | ||
| """Configure executor resources.""" | ||
|
|
I am missing the distinction between CPU and memory requests vs. limits a bit. I only see
.driver(cores=2, memory="4g")
.executor(cores=4, memory="8g")
which somehow mixes up requests with limits. Furthermore, we should make sure that it also works with the integrated SeaweedFS S3.
Thanks for pointing it out. There are a lot of configuration options in Spark, so we will need a wider key-value pair set anyway.
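As one possible direction (a sketch only): requests and limits could be kept explicit and mapped onto Spark's existing spark.kubernetes.{driver,executor}.request.cores / limit.cores settings, with the pod memory derived from memory plus overhead as Spark on Kubernetes already does. Field names here are illustrative:

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class PodResources:
    """Sketch: keep CPU/memory requests and limits distinct."""

    cores_request: float = 1.0
    cores_limit: Optional[float] = None
    memory: str = "1g"                   # becomes the memory request (plus overhead)
    memory_overhead: Optional[str] = None

    def to_spark_conf(self, role: str) -> Dict[str, str]:
        """Translate into Spark conf keys for role = "driver" or "executor"."""
        conf = {f"spark.kubernetes.{role}.request.cores": str(self.cores_request)}
        if self.cores_limit is not None:
            conf[f"spark.kubernetes.{role}.limit.cores"] = str(self.cores_limit)
        conf[f"spark.{role}.memory"] = self.memory
        if self.memory_overhead is not None:
            conf[f"spark.{role}.memoryOverhead"] = self.memory_overhead
        return conf
```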
kramaranya
left a comment
Thank you @Shekharrajak
I left a few comments
| SparkClient | ||
| │ | ||
| ├── session() ──► creates Spark Connect server (if needed) | ||
| │ │ | ||
| │ ▼ | ||
| │ SparkApplication CRD | ||
| │ │ | ||
| │ ▼ | ||
| │ Spark Driver Pod | ||
| │ (with Spark Connect enabled) | ||
| │ │ | ||
| └── connect() ──────► sc://server:15002 ──► SparkSession |
So how do batch jobs differ from interactive sessions?
Batch (SparkApplication CRD): run a predefined Spark application to completion (ETL pipelines, ML training, data processing).
Interactive sessions (SparkConnect CRD): a long-running server that accepts ad-hoc queries from clients (notebooks, BI tools, data exploration).
| def session(self, app_name: Optional[str] = None) -> SparkSession: | ||
| """Get or create SparkSession. Creates Spark Connect server if needed.""" | ||
|
|
||
| @property | ||
| def spark(self) -> SparkSession: | ||
| """Shortcut to session().""" |
Hmm, currently SparkClient is stateful and remembers the session: client.session() stores a reference, and then client.stop() knows which session to stop. But batch jobs work differently, since they're managed by name (client.delete_job(name)) and the client doesn't track them.
This means you can only have one session per client, right? And client.stop() is confusing when you have both sessions and jobs running.
What do you think about making the client stateless like TrainerClient? Then create_session() would return a session object that manages itself (so you can call session.stop()), and submit_job() would return a job name that you manage through the client:
session = client.create_session()
session.stop()
job_name = client.submit_job(...)
client.delete_job(job_name)

wdyt @Shekharrajak @andreyvelich ?
Pull request overview
This PR introduces KEP-107, a comprehensive enhancement proposal for a Spark Client SDK for Kubeflow. The proposal outlines a Python SDK that simplifies running Spark on Kubernetes by providing auto-provisioning of Spark Connect servers, connection management to existing servers, and batch job submission capabilities.
Key Changes:
- Introduces a new SparkClient API with interactive session management (create_session(), connect()) and batch job support (submit_job())
- Defines a builder pattern for advanced configuration with resource tuning, volumes, and custom Spark settings
- Proposes a pluggable backend architecture supporting KubernetesBackend, GatewayBackend, and LivyBackend
- Outlines integration points with Kubeflow Pipelines, Trainer, and Spark History MCP Server
| from kubeflow.spark import SparkClient | ||
|
|
||
| # Connect to existing Spark Connect server - returns SparkSession directly | ||
| spark = SparkClient.connect("sc://spark-server:15002") |
Copilot
AI
Dec 30, 2025
The documentation shows "s3a://bucket/data/" but this requires proper S3A configuration which isn't shown in the basic example. Consider adding a comment or note that S3 access requires appropriate Spark configuration (e.g., AWS credentials, endpoint configuration) to avoid confusion for users.
| spark = SparkClient.connect("sc://spark-server:15002") | |
| spark = SparkClient.connect("sc://spark-server:15002") | |
| # Note: Accessing S3 using s3a:// requires appropriate Spark/Hadoop S3A configuration | |
| # (e.g., AWS credentials, endpoint configuration) to be set up in your environment. |
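For reference, the S3A settings in question are ordinary Spark/Hadoop configuration keys. A hedged example of what that configuration might contain (values are placeholders; with Spark Connect these would typically be set on the server/application side rather than in the client):

```python
# Placeholder values; supply real credentials via secrets, not literals.
s3a_conf = {
    "spark.hadoop.fs.s3a.endpoint": "http://<s3-endpoint>:<port>",
    "spark.hadoop.fs.s3a.access.key": "<access-key>",
    "spark.hadoop.fs.s3a.secret.key": "<secret-key>",
    "spark.hadoop.fs.s3a.path.style.access": "true",
}
```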
| def submit_job( | ||
| self, | ||
| func: Optional[Callable[[SparkSession], Any]] = None, | ||
| func_args: Optional[Dict[str, Any]] = None, | ||
| main_file: Optional[str] = None, | ||
| main_class: Optional[str] = None, | ||
| arguments: Optional[List[str]] = None, | ||
| name: Optional[str] = None, | ||
| ) -> str: | ||
| """Submit a batch Spark job. | ||
| Supports two modes based on parameters: | ||
| - Function mode: Pass `func` to submit a Python function with Spark transformations | ||
| - File mode: Pass `main_file` to submit an existing Python/Jar file | ||
| Args: | ||
| func: Python function that receives SparkSession (function mode). | ||
| func_args: Arguments to pass to the function. | ||
| main_file: Path to Python/Jar file (file mode). | ||
| main_class: Main class for Jar files. | ||
| arguments: Command-line arguments for the job. | ||
| name: Optional job name. | ||
| Returns: | ||
| The job name (string) for tracking. | ||
| Raises: | ||
| ValueError: If neither `func` nor `main_file` is provided, or both are provided. | ||
| """ |
Copilot
AI
Dec 30, 2025
The submit_job method documentation states it "Raises: ValueError: If neither func nor main_file is provided, or both are provided" but this validation logic is not clear in the method signature. Consider adding a note in the docstring about mutual exclusivity being enforced at runtime, or better yet, use separate methods (submit_job_file and submit_job_func) to make the API more explicit and type-safe.
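If the single-method shape is kept, the mutual exclusivity could be enforced with a small runtime check like the sketch below (the helper name is hypothetical):

```python
from typing import Any, Callable, Optional


def _validate_submit_args(
    func: Optional[Callable[..., Any]],
    main_file: Optional[str],
) -> None:
    """Raise ValueError unless exactly one of `func` or `main_file` is given."""
    if (func is None) == (main_file is None):
        raise ValueError("Provide exactly one of `func` or `main_file`.")
```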
|
|
||
| ## Dependencies | ||
|
|
||
| - `pyspark>=3.4.0` (Spark Connect support) |
Copilot
AI
Dec 30, 2025
Incomplete documentation: The "Dependencies" section lists pyspark>=3.4.0 but the examples throughout the document reference Spark 3.5.0 (e.g., line 103, 157). Consider clarifying whether 3.4.0 is the minimum required version while 3.5.0 is recommended, or update the version constraint to match the examples.
| - `pyspark>=3.4.0` (Spark Connect support) | |
| - `pyspark>=3.4.0` (minimum for Spark Connect; examples use Spark 3.5.0+) |
| ) -> SparkJob: | ||
| """Wait for a job to reach desired status.""" | ||
|
|
||
| def delete_job(self, name: str) -> None: |
Copilot
AI
Dec 30, 2025
Missing return type annotation. The method should include -> None to be consistent with the project's type hinting requirements.
| """Add a volume (for driver and executors).""" | ||
|
|
Copilot
AI
Dec 30, 2025
The volume method uses **spec to accept arbitrary keyword arguments, but this API design is unclear. Consider specifying the volume spec structure more explicitly (e.g., using a TypedDict or Pydantic model) or documenting the expected keys in the docstring to improve API clarity and type safety.
| """Add a volume (for driver and executors).""" | |
| """Add a volume (for driver and executors). | |
| The ``spec`` argument describes the underlying Kubernetes volume and | |
| mount configuration. Common keys include: | |
| - ``type``: Volume type, e.g. ``"config_map"``, ``"secret"``, | |
| ``"persistent_volume_claim"``. | |
| - ``config_map``: Name of the ConfigMap (when ``type="config_map"``). | |
| - ``secret``: Name of the Secret (when ``type="secret"``). | |
| - ``persistent_volume_claim``: Name of the PVC | |
| (when ``type="persistent_volume_claim"``). | |
| - ``sub_path``: Optional sub-path within the volume to mount. | |
| - ``read_only``: Optional ``bool`` indicating if the mount is read-only. | |
| Implementations should translate these keys into the appropriate | |
| Kubernetes ``volume`` and ``volumeMount`` fields for both driver and | |
| executor pods. | |
| """ |
| follow: Whether to stream logs in realtime. | ||
| """ | ||
|
|
||
| def delete_session(self, name: str) -> None: |
Copilot
AI
Dec 30, 2025
Missing return type annotation in the API signature. The method should include -> None to maintain consistency with the coding standards requiring type hints and return types for all functions.
| def connect(cls, url: str, token: str = None, use_ssl: bool = False) -> SparkSession: | ||
| """Connect to an existing Spark Connect server. | ||
| Returns SparkSession directly (stateless). |
Copilot
AI
Dec 30, 2025
The connect() class method returns a SparkSession but doesn't indicate how cleanup should be handled. The documentation mentions "Disconnects only, doesn't delete shared server" at line 404, but this behavior should be clearly documented in the method's docstring to avoid confusion about resource lifecycle management.
| Returns SparkSession directly (stateless). | |
| The returned SparkSession is a direct client to a shared Spark Connect server | |
| and is not responsible for the server's lifecycle. | |
| Cleanup: | |
| Call ``spark.stop()`` on the returned SparkSession when you are done. | |
| This disconnects the client session only and does **not** delete or | |
| shut down the underlying shared Spark Connect server. | |
| Returns: | |
| SparkSession connected to the existing Spark Connect server. |
| def connect(cls, url: str, token: str = None, use_ssl: bool = False) -> SparkSession: | ||
| """Connect to an existing Spark Connect server. | ||
| Returns SparkSession directly (stateless). |
Copilot
AI
Dec 30, 2025
The docstring states "Returns SparkSession directly (stateless)" but this is misleading. A SparkSession itself is inherently stateful - it maintains connection state, configuration, and manages resources. Consider rephrasing to clarify that the client itself remains stateless, but the returned SparkSession is a stateful connection object.
| Returns SparkSession directly (stateless). | |
| Returns a stateful SparkSession connection object directly; the SparkClient itself remains stateless. |
| """List batch Spark jobs.""" | ||
|
|
||
| def get_job(self, name: str) -> SparkJob: | ||
| """Get a specific Spark job by name.""" | ||
|
|
||
| def get_job_logs( | ||
| self, | ||
| name: str, | ||
| container: str = "driver", | ||
| follow: bool = False, | ||
| ) -> Iterator[str]: | ||
| """Get logs from a Spark job (driver or executor).""" | ||
|
|
||
| def wait_for_job_status( | ||
| self, | ||
| name: str, | ||
| status: Set[SparkJobStatus] = {SparkJobStatus.COMPLETED}, | ||
| timeout: int = 600, | ||
| ) -> SparkJob: | ||
| """Wait for a job to reach desired status.""" | ||
|
|
||
| def delete_job(self, name: str) -> None: | ||
| """Delete a Spark job.""" |
Copilot
AI
Dec 30, 2025
The API documentation lacks information about error handling and potential exceptions. Consider adding a section or notes about what exceptions users should expect and handle (e.g., connection failures, timeout errors, resource conflicts) to help users write robust code.
| """List batch Spark jobs.""" | |
| def get_job(self, name: str) -> SparkJob: | |
| """Get a specific Spark job by name.""" | |
| def get_job_logs( | |
| self, | |
| name: str, | |
| container: str = "driver", | |
| follow: bool = False, | |
| ) -> Iterator[str]: | |
| """Get logs from a Spark job (driver or executor).""" | |
| def wait_for_job_status( | |
| self, | |
| name: str, | |
| status: Set[SparkJobStatus] = {SparkJobStatus.COMPLETED}, | |
| timeout: int = 600, | |
| ) -> SparkJob: | |
| """Wait for a job to reach desired status.""" | |
| def delete_job(self, name: str) -> None: | |
| """Delete a Spark job.""" | |
| """List batch Spark jobs. | |
| Args: | |
| status: Optional status filter to return only jobs in the given state. | |
| Raises: | |
| ConnectionError: If the client cannot communicate with the Kubernetes cluster | |
| or underlying Spark resources. | |
| TimeoutError: If listing jobs exceeds an internal client timeout. | |
| """ | |
| def get_job(self, name: str) -> SparkJob: | |
| """Get a specific Spark job by name. | |
| Args: | |
| name: Name of the job to retrieve. | |
| Raises: | |
| ConnectionError: If the client cannot communicate with the Kubernetes cluster | |
| or underlying Spark resources. | |
| TimeoutError: If retrieving the job exceeds an internal client timeout. | |
| KeyError: If no job with the given name exists. | |
| """ | |
| def get_job_logs( | |
| self, | |
| name: str, | |
| container: str = "driver", | |
| follow: bool = False, | |
| ) -> Iterator[str]: | |
| """Get logs from a Spark job (driver or executor). | |
| Args: | |
| name: Name of the job whose logs should be streamed or fetched. | |
| container: Container to read logs from (e.g., ``"driver"`` or an executor). | |
| follow: If True, stream logs until the job completes or the stream is closed. | |
| Raises: | |
| ConnectionError: If the client cannot connect to the log stream source. | |
| TimeoutError: If reading logs exceeds an internal client timeout. | |
| ValueError: If the requested container name is invalid for the given job. | |
| KeyError: If no job with the given name exists. | |
| """ | |
| def wait_for_job_status( | |
| self, | |
| name: str, | |
| status: Set[SparkJobStatus] = {SparkJobStatus.COMPLETED}, | |
| timeout: int = 600, | |
| ) -> SparkJob: | |
| """Wait for a job to reach desired status. | |
| Args: | |
| name: Name of the job to wait on. | |
| status: Set of desired terminal or intermediate statuses to wait for. | |
| timeout: Maximum time in seconds to wait before giving up. | |
| Raises: | |
| ConnectionError: If the client cannot communicate with the Kubernetes cluster | |
| or underlying Spark resources while polling status. | |
| TimeoutError: If the job does not reach a desired status within ``timeout`` | |
| seconds, or if a single poll exceeds an internal timeout. | |
| KeyError: If no job with the given name exists. | |
| """ | |
| def delete_job(self, name: str) -> None: | |
| """Delete a Spark job. | |
| Args: | |
| name: Name of the job to delete. | |
| Raises: | |
| ConnectionError: If the client cannot communicate with the Kubernetes cluster | |
| or underlying Spark resources. | |
| TimeoutError: If deleting the job exceeds an internal client timeout. | |
| KeyError: If no job with the given name exists. | |
| """ |
| def get_server_logs( | ||
| self, | ||
| name: str, | ||
| follow: bool = False, | ||
| ) -> Iterator[str]: | ||
| """Get logs from a Spark Connect server. | ||
| Args: | ||
| name: Name of the Spark Connect server. |
Copilot
AI
Dec 30, 2025
Inconsistent terminology: The method is named get_server_logs but operates on "sessions" (as indicated by the name parameter referring to a session name). This naming inconsistency between "server" and "session" could confuse users. Consider renaming to get_session_logs for consistency with other session-related methods.
| def get_server_logs( | |
| self, | |
| name: str, | |
| follow: bool = False, | |
| ) -> Iterator[str]: | |
| """Get logs from a Spark Connect server. | |
| Args: | |
| name: Name of the Spark Connect server. | |
| def get_session_logs( | |
| self, | |
| name: str, | |
| follow: bool = False, | |
| ) -> Iterator[str]: | |
| """Get logs from a Spark Connect session. | |
| Args: | |
| name: Name of the Spark Connect session. |
| from typing import Optional | ||
|
|
||
| @dataclass | ||
| class Resources: |
This is used for Driver#resources and Executor#resources_per_executor - we are following the composite design pattern (see the sketch below).
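A minimal sketch of that composition (field names follow the discussion; defaults are placeholders):

```python
from dataclasses import dataclass, field


@dataclass
class Resources:
    cores: int = 1
    memory: str = "1g"


@dataclass
class Driver:
    resources: Resources = field(default_factory=Resources)


@dataclass
class Executor:
    instances: int = 1
    resources_per_executor: Resources = field(default_factory=Resources)
```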
andreyvelich
left a comment
Thanks @Shekharrajak!
Mostly lgtm, I suggest that we move some topics to phase 2 as we discussed.
/assign @kramaranya @kubeflow/kubeflow-sdk-team for the final feedback.
| } | ||
| ``` | ||
|
|
||
| ### Structured Types |
I guess we can remove this from version 1, right?
And implement this in the 2nd phase if needed.
Yes +1
| ``` | ||
|
|
||
| ```python | ||
| class SparkClientBuilder: |
Can you move the Builder to the future plans, since we will implement it later if needed?
Yeah, we will start simpler in the first MVP.
| token: Optional authentication token for existing server. | ||
| use_ssl: Whether to use SSL for connection (only for existing servers). |
Does the SparkConnect CRD support SSL and auth?
There is no built-in SSL/TLS or authentication support in the SparkConnect CRD. I was thinking of external servers or Spark job submission. But let me remove it for now - we can think about this later.
Removed it for now a2a4837
| token: Optional authentication token for existing server. | ||
| use_ssl: Whether to use SSL for connection (only for existing servers). | ||
| name: Optional session name. Auto-generated if not provided (create mode only). | ||
| app_name: Optional Spark application name (create mode only). |
We don't need this, right?
This is if the user wants to provide a specific name - let's park it for phase 2.
| If provided, connects to existing server. If None, creates new session. | ||
| token: Optional authentication token for existing server. | ||
| use_ssl: Whether to use SSL for connection (only for existing servers). | ||
| name: Optional session name. Auto-generated if not provided (create mode only). |
Will name be equal to the SparkConnect CR name? Can we move it to options for now, to be consistent with Trainer?
| def delete_session(self, name: str) -> None: | ||
| """Delete a Spark Connect session.""" | ||
|
|
||
| def submit_job( |
Let's say that the job submission APIs will be implemented in phase 2.
I understand there will be a bunch of iterations on the API design and on improving the user experience. But we will definitely focus on keeping the first MVP as simple as possible, with a strong foundation, and implement features phase by phase. The more important point is to start the journey; otherwise we will see no progress.
andreyvelich
left a comment
Yeah, I agree with you.
Thank you for this work @Shekharrajak, excited to see this moving forward!
/lgtm
/approve
/hold in case @kramaranya @astefanutti @szaher @Fiona-Waters wanted to give more comments.
/assign @kramaranya @astefanutti @szaher @Fiona-Waters
|
[APPROVALNOTIFIER] This PR is APPROVED. This pull-request has been approved by: andreyvelich. The full list of commands accepted by this bot can be found here. The pull request process is described here.
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing ...
Ref https://docs.google.com/document/d/1l57bBlpxrW4gLgAGnoq9Bg7Shre7Cglv4OLCox7ER_s/edit?tab=t.0
PR: #158