
Conversation

@HKanoje commented Dec 4, 2025

What this PR does / why we need it

This PR implements dataset and model initializer support in the container backend, bringing it to feature parity with the Kubernetes backend. This addresses issue #171 by enabling users to automatically download and prepare datasets and models before training starts.

Solution Overview

This implementation adds full initializer support to the container backend by:

  1. Running initializer containers before training - Initializers execute sequentially (dataset first, then model) before any training containers start
  2. Using shared volumes - All containers (initializers and training nodes) share the same workspace directory on the host
  3. Proper lifecycle management - Initializers must complete successfully before training begins, with automatic cleanup on failures
  4. Comprehensive error handling - Clear error messages and proper resource cleanup when initialization fails

Detailed Changes

1. New Utility Functions (kubeflow/trainer/backends/container/utils.py)

build_initializer_command(initializer, init_type)

Builds the appropriate container command based on the initializer type; a sketch follows the list:

  • HuggingFace: Uses kubeflow.storage_initializer.hugging_face module
  • S3: Uses kubeflow.storage_initializer.s3 module
  • DataCache: Uses kubeflow.storage_initializer.datacache module
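A minimal sketch of that dispatch (illustrative only, not the PR's actual code; the HuggingFace type names appear elsewhere in this description, while the S3 and DataCache class names are assumptions):

def build_initializer_command(initializer, init_type: str) -> list[str]:
    """Pick the storage_initializer module to run based on the config type."""
    name = type(initializer).__name__
    if name in ("HuggingFaceDatasetInitializer", "HuggingFaceModelInitializer"):
        module = "kubeflow.storage_initializer.hugging_face"
    elif name in ("S3DatasetInitializer", "S3ModelInitializer"):  # assumed names
        module = "kubeflow.storage_initializer.s3"
    elif name == "DataCacheInitializer":  # assumed name
        module = "kubeflow.storage_initializer.datacache"
    else:
        raise ValueError(f"Unsupported {init_type} initializer type: {name!r}")
    return ["python", "-m", module]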

build_initializer_env(initializer, init_type)

Constructs environment variables from the initializer configuration; a sketch follows the list:

  • Sets STORAGE_URI from the initializer config
  • Sets OUTPUT_PATH to /workspace/dataset or /workspace/model based on type
  • Adds optional fields like ACCESS_TOKEN, ENDPOINT, REGION, etc.
  • Handles DataCache-specific variables like CLUSTER_SIZE, METADATA_LOC
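A sketch of the mapping (field names such as storage_uri and access_token match the usage example later in this description; everything else here is an assumption):

def build_initializer_env(initializer, init_type: str) -> dict[str, str]:
    """Translate an initializer config into the container's environment."""
    env = {
        "STORAGE_URI": initializer.storage_uri,
        "OUTPUT_PATH": f"/workspace/{init_type}",  # "dataset" or "model"
    }
    # Optional fields are only exported when set on the config.
    # DataCache-specific variables (CLUSTER_SIZE, METADATA_LOC) would be
    # handled the same way and are omitted here.
    for attr, var in (
        ("access_token", "ACCESS_TOKEN"),
        ("endpoint", "ENDPOINT"),
        ("region", "REGION"),
    ):
        value = getattr(initializer, attr, None)
        if value:
            env[var] = str(value)
    return env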

get_initializer_image()

Returns the initializer container image (kubeflow/training-operator:latest).
This can be made configurable via backend config in future iterations.

2. Enhanced ContainerBackend (kubeflow/trainer/backends/container/backend.py)

_run_initializers(job_name, initializer, workdir, network_id)

Orchestrates the initialization phase:

  • Pulls the initializer image if needed (respects pull_policy)
  • Runs dataset initializer if configured
  • Runs model initializer if configured
  • Ensures proper sequencing and error propagation

_run_single_initializer(job_name, initializer_config, init_type, image, workdir, network_id)

Executes a single initializer container (see the sketch after this list):

  • Creates container with proper labels for tracking
  • Mounts shared volume to /workspace
  • Monitors container status with configurable timeout (default 10 minutes)
  • Waits for successful completion (exit code 0)
  • Captures and reports logs on failure
  • Cleans up failed containers automatically
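The lifecycle is roughly the following. This is a sketch written against docker-py directly; the PR itself goes through the backend's Docker/Podman adapters, and the label key and default timeout below are assumptions:

import docker

def run_single_initializer(job_name: str, image: str, command: list[str],
                           env: dict[str, str], workdir: str,
                           timeout: int = 600) -> None:
    """Run one initializer container to completion, then clean it up."""
    client = docker.from_env()
    container = client.containers.run(
        image,
        command=command,
        environment=env,
        volumes={workdir: {"bind": "/workspace", "mode": "rw"}},
        labels={"trainer.kubeflow.org/job-name": job_name},  # assumed label key
        detach=True,
    )
    try:
        result = container.wait(timeout=timeout)  # blocks until the container exits
        if result.get("StatusCode", 1) != 0:
            logs = container.logs().decode(errors="replace")
            raise RuntimeError(f"Initializer container failed:\n{logs}")
    finally:
        container.remove(force=True)  # cleanup on both success and failure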

Updated train() method

  • Creates network and working directory first
  • Runs initializers before generating training script
  • Only proceeds to training if initialization succeeds
  • Maintains backward compatibility (initializers are optional)

Updated __get_trainjob_from_containers()

  • Correctly counts only training nodes for num_nodes (excludes initializers)
  • Includes initializer containers in the steps list
  • Properly tracks initializer status

Updated get_job_logs()

  • Supports fetching logs from specific initializer steps
  • When requesting node-0 logs (default), only shows training container logs
  • Can explicitly request initializer logs with step="dataset-initializer" or step="model-initializer" (see the example below)
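For example (a hypothetical job name; the exact return type and client-level signature are not shown in this description, so this only illustrates the step argument):

from kubeflow.trainer import TrainerClient

client = TrainerClient()

# Default: training logs only (node-0)
training_logs = client.get_job_logs("my-job")

# Explicitly request initializer logs by step name
dataset_logs = client.get_job_logs("my-job", step="dataset-initializer")
model_logs = client.get_job_logs("my-job", step="model-initializer")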

3. Comprehensive Test Coverage (kubeflow/trainer/backends/container/backend_test.py)

Added 11 new test cases covering:

Initialization Success Scenarios

  • HuggingFace dataset initializer - Tests storage_uri parsing and access_token handling
  • S3 dataset initializer - Tests endpoint, region, and credential configuration
  • HuggingFace model initializer - Tests model downloads with ignore_patterns
  • S3 model initializer - Tests S3-compatible storage with authentication
  • Both dataset and model - Tests sequential execution of both initializers
  • DataCache initializer - Tests distributed cache configuration with metadata_loc and cluster size

Log Retrieval

  • Tests getting logs from dataset-initializer step
  • Tests getting logs from model-initializer step
  • Tests that default node logs exclude initializer logs

Error Handling

  • Non-zero exit code - Verifies proper error reporting when initializer fails
  • Timeout handling - Ensures timeout errors are caught and reported
  • Resource cleanup - Confirms containers and networks are cleaned up on failure

Implementation Details

Initialization Flow

  1. User calls train() with initializer parameter

  2. ContainerBackend creates:

   ├── Working directory: ~/.kubeflow/trainer/containers/<job-name>/
   └── Network: <job-name>-net
  3. If initializer.dataset is set:
   ├── Create dataset-initializer container
   ├── Mount workdir to /workspace
   ├── Run initialization (downloads to /workspace/dataset)
   └── Wait for completion (exit code 0)
  4. If initializer.model is set:
   ├── Create model-initializer container
   ├── Mount workdir to /workspace
   ├── Run initialization (downloads to /workspace/model)
   └── Wait for completion (exit code 0)
  5. Create training containers:
   ├── Mount same workdir to /workspace
   └── Access data at /workspace/dataset and /workspace/model

Volume Layout

Host: ~/.kubeflow/trainer/containers/<job-name>/

├── dataset/ (from dataset-initializer, if configured)
│   └── <downloaded dataset files>
├── model/ (from model-initializer, if configured)
│   └── <downloaded model files>
└── outputs/ (accessible to all training nodes)
    └── <training outputs>

Container Mount: /workspace/

├── dataset/ (read by training code)
├── model/ (read by training code)
└── outputs/ (written by training code)
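For reference, a training function like the train_func referenced in the usage example below only needs to read and write these paths. A hypothetical sketch (not code from this PR):

import os

def train_func():
    # Paths follow the container mount layout described above.
    dataset_dir = "/workspace/dataset"   # populated by the dataset-initializer
    model_dir = "/workspace/model"       # populated by the model-initializer
    output_dir = "/workspace/outputs"    # shared with all training nodes

    print("dataset files:", os.listdir(dataset_dir))
    print("model files:", os.listdir(model_dir))

    os.makedirs(output_dir, exist_ok=True)
    with open(os.path.join(output_dir, "marker.txt"), "w") as f:
        f.write("training ran\n")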

Testing Results

All tests pass with no regressions:

  • 43/43 container backend tests passed
  • 173/173 trainer module tests passed
  • make verify (ruff lint + format) passed
  • pre-commit hooks passed

Usage Example

from kubeflow.trainer import TrainerClient
from kubeflow.trainer.types import (
    CustomTrainer,
    Initializer,
    HuggingFaceDatasetInitializer,
    HuggingFaceModelInitializer,
)

# Define initializers
initializer = Initializer(
    dataset=HuggingFaceDatasetInitializer(
        storage_uri="hf://username/my-dataset",
        access_token="hf_xxxxx",
    ),
    model=HuggingFaceModelInitializer(
        storage_uri="hf://username/my-model",
        access_token="hf_xxxxx",
    ),
)

# Define trainer
trainer = CustomTrainer(
    func=train_func,
    num_nodes=2,
)

# Train with initializers
client = TrainerClient()
job_name = client.train(
    trainer=trainer,
    initializer=initializer,  # <-- Now works with container backend!
)

# Data is automatically available at:
# - /workspace/dataset (in training containers)
# - /workspace/model (in training containers)

@google-oss-prow

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign astefanutti for approval. For more information see the Kubernetes Code Review Process.

The full list of commands accepted by this bot can be found here.

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@github-actions bot commented Dec 4, 2025

🎉 Welcome to the Kubeflow SDK! 🎉

Thanks for opening your first PR! We're happy to have you as part of our community 🚀

Here's what happens next:

  • If you haven't already, please check out our Contributing Guide for repo-specific guidelines and the Kubeflow Contributor Guide for general community standards
  • Our team will review your PR soon! cc @kubeflow/kubeflow-sdk-team

Join the community:

Feel free to ask questions in the comments if you need any help or clarification!
Thanks again for contributing to Kubeflow! 🙏

Contributor

@Fiona-Waters left a comment

Thanks @HKanoje for this PR! I've left some comments, ptal! Thanks

"""
# Use the training-operator image which contains initializer scripts
# This can be made configurable via backend config in the future
return "kubeflow/training-operator:latest"
Contributor

Should we make this configurable rather than hardcoding it?

Author

Yes, definitely. I've made this configurable via ContainerBackendConfig.initializer_image (default: kubeflow/training-operator:latest). Users can now customize it when creating the backend.

try:
    import time

    timeout = 600  # 10 minutes timeout for initialization
Contributor

Should this be configurable, or is 10 minutes always going to be enough time?

Author

Added ContainerBackendConfig.initializer_timeout (default: 600 seconds / 10 minutes). This gives users flexibility for large datasets/models that may take longer to download.
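Assuming the config is passed when constructing the client (the import path and constructor argument below are guesses; only the two field names come from this thread), usage could look like:

from kubeflow.trainer import TrainerClient
from kubeflow.trainer.backends.container.types import ContainerBackendConfig  # path assumed

config = ContainerBackendConfig(
    initializer_image="kubeflow/training-operator:latest",  # default per this thread
    initializer_timeout=1800,  # seconds; raise for large datasets/models
)
client = TrainerClient(backend_config=config)  # argument name assumed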

# Clean up the failed container
from contextlib import suppress

with suppress(Exception):
Contributor

As well as cleaning up when a failure occurs, should we clean up the initializer containers when they have been successful also?

Author

Implemented wait_for_container() in the adapter interface and both Docker/Podman adapters. This replaces the polling loop with a single blocking wait call - much more efficient.
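A sketch of what a blocking wait on the adapter interface could look like (the actual signature is not shown in this thread; the Docker variant below uses docker-py's Container.wait):

from abc import ABC, abstractmethod

import docker

class ContainerAdapter(ABC):  # stand-in for the SDK's adapter base class
    @abstractmethod
    def wait_for_container(self, container_id: str, timeout: int) -> int:
        """Block until the container exits and return its exit code."""

class DockerAdapter(ContainerAdapter):
    def __init__(self, client=None):
        self.client = client or docker.from_env()

    def wait_for_container(self, container_id: str, timeout: int) -> int:
        container = self.client.containers.get(container_id)
        result = container.wait(timeout=timeout)  # single blocking call, no polling
        return int(result.get("StatusCode", 1))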

logger.debug(f"Created network: {network_id}")

# Run initializers if configured
if initializer:
Contributor

If the initializer fails should we clean up the network we have created?

Author

Absolutely! Added cleanup for successful initializer containers after completion to prevent accumulation. Also added cleanup for timed-out containers.

Contributor

Great! Would it make sense to add a helper function for the cleanup logic to reduce duplication?

Author

Done! Added _cleanup_container_resources() helper method in commit 0b7a952 to consolidate the duplicated cleanup logic across exception handlers and delete_job().
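A sketch of such a helper (only the method name comes from the comment above; the container and network objects here are docker-py style, and the best-effort semantics are assumed):

from contextlib import suppress

def cleanup_container_resources(containers, network=None) -> None:
    """Best-effort teardown: stop and remove containers, then remove the job network."""
    for container in containers:
        with suppress(Exception):
            container.stop()
        with suppress(Exception):
            container.remove(force=True)
    if network is not None:
        with suppress(Exception):
            network.remove()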

if isinstance(
    initializer, (types.HuggingFaceDatasetInitializer, types.HuggingFaceModelInitializer)
)
else "python -m kubeflow.storage_initializer.datacache "
Contributor

Here we are setting datacache as the default/fallback; do we want to do this? In the Kubernetes backend we offer two options and raise a ValueError if the type is invalid.

Author

You're right - that was inconsistent with the kubernetes backend. Changed to raise ValueError with a clear message listing all supported types instead of defaulting to datacache.

import time

timeout = 600 # 10 minutes timeout for initialization
polling_interval = 2
Contributor

Could using the wait API be supported?

@kramaranya
Contributor

/ok-to-test

HKanoje added a commit to HKanoje/sdk that referenced this pull request Jan 5, 2026
- Make initializer image configurable via ContainerBackendConfig
- Make initializer timeout configurable (default 600 seconds)
- Implement wait API in adapters instead of polling
- Clean up successful initializer containers after completion
- Clean up network on initializer failure
- Raise ValueError for unsupported initializer types (no datacache fallback)

All tests passing (173/173). Addresses all feedback from PR kubeflow#188.
@kramaranya
Contributor

Hey @HKanoje, could you please sign your commits?

@coveralls commented Jan 5, 2026

Pull Request Test Coverage Report for Build 20747503876

Details

  • 207 of 241 (85.89%) changed or added relevant lines in 7 files are covered.
  • No unchanged relevant lines lost coverage.
  • Overall coverage increased (+1.3%) to 68.062%

Changes missing coverage:

  • kubeflow/trainer/backends/container/adapters/base.py: 2 of 3 changed/added lines covered (66.67%)
  • kubeflow/trainer/backends/container/backend.py: 73 of 77 changed/added lines covered (94.81%)
  • kubeflow/trainer/backends/container/utils.py: 43 of 48 changed/added lines covered (89.58%)
  • kubeflow/trainer/backends/container/backend_test.py: 85 of 91 changed/added lines covered (93.41%)
  • kubeflow/trainer/backends/container/adapters/podman.py: 1 of 9 changed/added lines covered (11.11%)
  • kubeflow/trainer/backends/container/adapters/docker.py: 1 of 11 changed/added lines covered (9.09%)

Totals:

  • Change from base Build 20719761526: 1.3%
  • Covered Lines: 2747
  • Relevant Lines: 4036

💛 - Coveralls

… backend

Add support for dataset and model initializers in the container backend
to bring it to feature parity with the Kubernetes backend.

Changes:
- Add utility functions for building initializer commands and environment variables
- Implement _run_initializers() and _run_single_initializer() methods in ContainerBackend
- Run initializers sequentially before training containers start
- Download datasets to /workspace/dataset and models to /workspace/model
- Track initializer containers as separate steps in TrainJob
- Support all initializer types: HuggingFace, S3, and DataCache
- Add comprehensive unit tests for all initializer configurations
- Handle initializer failures with proper cleanup and error messages

Fixes kubeflow#171

Signed-off-by: HKanoje <[email protected]>
- Make initializer image configurable via ContainerBackendConfig
- Make initializer timeout configurable (default 600 seconds)
- Implement wait API in adapters instead of polling
- Clean up successful initializer containers after completion
- Clean up network on initializer failure
- Raise ValueError for unsupported initializer types (no datacache fallback)

All tests passing (173/173). Addresses all feedback from PR kubeflow#188.

Signed-off-by: HKanoje <[email protected]>
@HKanoje force-pushed the feat/171-add-initializer-support-container-backend branch from 768a6a9 to 0dbb6b6 on January 5, 2026 16:49
@HKanoje commented Jan 5, 2026

@kramaranya Done! All commits are now signed.

Add _cleanup_container_resources() helper method to consolidate
duplicated cleanup logic for stopping/removing containers and
deleting networks. Refactor 5 locations across train(), initializer
handlers, and delete_job() to use this helper.

Signed-off-by: HKanoje <[email protected]>