Job Class¶
The Job class manages Kubernetes Jobs for batch processing workloads that run to completion.
Overview¶
from celestra import Job
# Basic usage
job = Job("data-migration").image("migrator:latest").command(["python", "migrate.py"])
Functions¶
image(image: str) -> Job¶
Set the container image for the job.
# Basic image
job = Job("migration").image("migrator:latest")
# Specific version
job = Job("backup").image("backup-tool:v1.2.0")
# Custom registry
job = Job("processor").image("gcr.io/myproject/processor:latest")
command(command: List[str]) -> Job¶
Set the command to run in the container.
# Simple command
job = Job("migration").command(["python", "migrate.py"])
# Complex command
job = Job("backup").command(["bash", "-c", "pg_dump | gzip > backup.sql.gz"])
# Shell command
job = Job("cleanup").command(["sh", "-c", "rm -rf /tmp/* && echo 'Cleanup complete'"])
args(args: List[str]) -> Job¶
Set the arguments for the command.
# Command with arguments
job = Job("migration").command(["python"]).args(["migrate.py", "--env", "production"])
# Multiple arguments
job = Job("backup").command(["pg_dump"]).args(["--host", "postgres", "--port", "5432", "myapp"])
environment(env_vars: Dict[str, str]) -> Job¶
Set multiple environment variables at once.
# Bulk environment variables
env_config = {
"DATABASE_URL": "postgres://localhost:5432/myapp",
"BACKUP_PATH": "/backups",
"LOG_LEVEL": "info"
}
job = Job("backup").environment(env_config)
env(key: str, value: str) -> Job¶
Add a single environment variable.
# Single environment variable
job = Job("migration").env("DATABASE_URL", "postgres://localhost:5432/myapp")
# Multiple individual variables
job = (Job("backup")
.env("BACKUP_PATH", "/backups")
.env("RETENTION_DAYS", "30")
.env("COMPRESSION", "gzip"))
resources(cpu: str = None, memory: str = None, cpu_limit: str = None, memory_limit: str = None, gpu: int = None) -> Job¶
Set resource requests and limits for the job.
# Basic resources
job = Job("migration").resources(cpu="500m", memory="1Gi")
# With limits
job = (Job("backup")
.resources(
cpu="1000m",
memory="2Gi",
cpu_limit="2000m",
memory_limit="4Gi"
))
# GPU-enabled job
job = Job("ml-training").resources(cpu="4", memory="16Gi", gpu=2)
parallelism(count: int) -> Job¶
Set the number of parallel pods to run.
# Single pod
job = Job("migration").parallelism(1)
# Parallel processing
job = Job("data-processing").parallelism(5)
# High parallelism
job = Job("batch-processing").parallelism(20)
completions(count: int) -> Job¶
Set the number of successful completions required.
# Single completion
job = Job("migration").completions(1)
# Multiple completions
job = Job("data-processing").completions(10)
# Large batch: 100 successful completions required
job = Job("batch-job").completions(100)
retry_limit(limit: int) -> Job¶
Set the number of retries for failed pods.
# No retries
job = Job("critical-job").retry_limit(0)
# Default retries
job = Job("migration").retry_limit(6)
# Many retries
job = Job("unreliable-job").retry_limit(10)
timeout(timeout: Union[str, int]) -> Job¶
Set the timeout for the job.
# Timeout in seconds
job = Job("migration").timeout(3600)
# Timeout as string
job = Job("backup").timeout("2h")
# Short timeout
job = Job("quick-job").timeout("5m")
# Long timeout
job = Job("long-job").timeout("24h")
restart_policy(policy: str) -> Job¶
Set the restart policy for the job's pods. Kubernetes Jobs accept only "OnFailure" and "Never"; "Always" is rejected by the API server for Job workloads.
# On failure (default)
job = Job("migration").restart_policy("OnFailure")
# Never restart (failed pods are kept for debugging)
job = Job("critical-job").restart_policy("Never")
port(port: int, name: str = "http", protocol: str = "TCP") -> Job¶
Add a port to the job.
# Basic port
job = Job("web-job").port(8080, "http")
# Multiple ports
job = (Job("api-job")
.port(8080, "http")
.port(8443, "https")
.port(9090, "metrics"))
add_port(port: int, name: str = "http", protocol: str = "TCP") -> Job¶
Add a port to the job (alias for port()).
# Add multiple ports
job = (Job("multi-port-job")
.add_port(8080, "http")
.add_port(8443, "https")
.add_port(9090, "metrics"))
ports(ports: List[Dict[str, Any]]) -> Job¶
Add multiple ports at once.
# Bulk port configuration
ports_config = [
{"port": 8080, "name": "http"},
{"port": 8443, "name": "https"},
{"port": 9090, "name": "metrics"}
]
job = Job("api-job").ports(ports_config)
Convenience Port Methods¶
Metrics Port¶
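Expose a metrics port in a single call. The complete example below uses metrics_port(9090), so the single-integer signature is confirmed; the default port name (presumably "metrics") is an assumption.
# Expose a Prometheus-style metrics port
job = Job("worker-job").metrics_port(9090)  # port name presumably defaults to "metrics"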
Web Port¶
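Expose an HTTP port; web_port(8080) also appears in the complete example below.
# Expose the main HTTP port
job = Job("web-job").web_port(8080)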
Status Port¶
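A status-port convenience method is presumably analogous. It is not used elsewhere on this page, so treat the method name and signature below as a sketch.
# Expose a status/health endpoint port (assumed method name and signature)
job = Job("batch-job").status_port(8081)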
add_secret(secret: Secret) -> Job¶
Add a secret to the job.
# Add database secret
db_secret = Secret("db-secret").add("password", "secret123")
job = Job("migration").add_secret(db_secret)
# Add API key secret
api_secret = Secret("api-keys").add("stripe_key", "sk_live_...")
job = Job("payment-job").add_secret(api_secret)
add_config(config_map: ConfigMap) -> Job¶
Add a ConfigMap to the job.
# Add configuration
app_config = ConfigMap("app-config").add("debug", "true")
job = Job("migration").add_config(app_config)
# Add configuration file
config_map = ConfigMap("migration-config").from_file("migrate.conf", "configs/migrate.conf")
job = Job("migration").add_config(config_map)
add_volume(name: str, volume_spec: Dict[str, Any]) -> Job¶
Add a volume to the job.
# Add persistent volume
job = Job("backup").add_volume("backup-storage", {
"persistentVolumeClaim": {
"claimName": "backup-pvc"
}
})
# Add empty directory
job = Job("temp-job").add_volume("temp-dir", {
"emptyDir": {}
})
mount_volume(volume_name: str, mount_path: str, read_only: bool = False) -> Job¶
Mount a volume in the job.
# Mount volume
job = Job("backup").mount_volume("backup-storage", "/backups")
# Read-only mount
job = Job("read-job").mount_volume("config-volume", "/config", read_only=True)
security_context(context: Dict[str, Any]) -> Job¶
Set security context for the job.
# Non-root user
job = Job("secure-job").security_context({
"runAsNonRoot": True,
"runAsUser": 1000,
"fsGroup": 1000
})
node_selector(selector: Dict[str, str]) -> Job¶
Set node selectors for pod placement.
# GPU nodes
job = Job("ml-job").node_selector({"accelerator": "gpu"})
# High-memory nodes
job = Job("memory-job").node_selector({"node-type": "high-memory"})
toleration(key: str, operator: str = "Equal", value: str = "", effect: str = "NoSchedule") -> Job¶
Add a toleration for node taints.
# Tolerate GPU taints
job = Job("ml-job").toleration("nvidia.com/gpu", "Equal", "present", "NoSchedule")
# Tolerate spot instances
job = Job("batch-job").toleration("spot", "Equal", "true", "NoSchedule")
affinity(affinity_spec: Dict[str, Any]) -> Job¶
Set pod affinity rules.
# Prefer nodes with SSD
job = Job("io-job").affinity({
"nodeAffinity": {
"preferredDuringSchedulingIgnoredDuringExecution": [{
"weight": 100,
"preference": {
"matchExpressions": [{
"key": "disk-type",
"operator": "In",
"values": ["ssd"]
}]
}
}]
}
})
Complete Example¶
#!/usr/bin/env python3
"""
Complete Job Example - Production Data Migration
"""
import os
from celestra import Job, Secret, ConfigMap, KubernetesOutput
def load_config(config_path: str) -> str:
"""Load configuration from external file."""
with open(f"configs/{config_path}", "r") as f:
return f.read()
def create_production_jobs():
"""Create production-ready jobs."""
# Load external configurations
migration_config = load_config("jobs/migration.conf")
# Database migration job
migration_job = (Job("database-migration")
.image("migrator:v2.1.0")
.command(["python", "migrate.py"])
.args(["--env", "production", "--dry-run", "false"])
.env("DATABASE_URL", "postgres://postgres-service:5432/myapp")
.env("MIGRATION_PATH", "/migrations")
.env("LOG_LEVEL", "info")
.resources(cpu="1000m", memory="2Gi", cpu_limit="2000m", memory_limit="4Gi")
.parallelism(1)
.completions(1)
.retry_limit(3)
.timeout("2h")
.restart_policy("OnFailure")
.add_secret(Secret("db-secret").add("password", "secure-password"))
.add_config(ConfigMap("migration-config").add_data("migration.conf", migration_config))
.security_context({
"runAsNonRoot": True,
"runAsUser": 1000,
"fsGroup": 1000
})
.node_selector({"node-type": "database"})
.toleration("database", "Equal", "true", "NoSchedule"))
# Data backup job
backup_job = (Job("data-backup")
.image("backup-tool:v1.0.0")
.command(["bash", "-c", "pg_dump $DATABASE_URL | gzip > /backups/backup-$(date +%Y%m%d).sql.gz"])
.env("DATABASE_URL", "postgres://postgres-service:5432/myapp")
.env("BACKUP_RETENTION", "30")
.resources(cpu="500m", memory="1Gi")
.parallelism(1)
.completions(1)
.retry_limit(5)
.timeout("1h")
.add_secret(Secret("backup-secret").add("password", "backup-password"))
.add_volume("backup-storage", {
"persistentVolumeClaim": {
"claimName": "backup-pvc"
}
})
.mount_volume("backup-storage", "/backups"))
# Batch processing job
batch_job = (Job("batch-processing")
.image("processor:v3.0.0")
.command(["python", "process.py"])
.args(["--input", "/data/input", "--output", "/data/output"])
.env("BATCH_SIZE", "1000")
.env("WORKERS", "4")
.resources(cpu="2000m", memory="4Gi", cpu_limit="4000m", memory_limit="8Gi")
.parallelism(5)
.completions(10)
.retry_limit(3)
.timeout("6h")
.add_volume("input-data", {
"persistentVolumeClaim": {
"claimName": "input-pvc"
}
})
.add_volume("output-data", {
"persistentVolumeClaim": {
"claimName": "output-pvc"
}
})
.mount_volume("input-data", "/data/input", read_only=True)
.mount_volume("output-data", "/data/output"))
# Monitoring job
monitoring_job = (Job("health-check")
.image("monitor:v1.0.0")
.command(["python", "health_check.py"])
.env("CHECK_INTERVAL", "30")
.env("TIMEOUT", "10")
.resources(cpu="100m", memory="256Mi")
.parallelism(1)
.completions(1)
.retry_limit(2)
.timeout("5m")
.restart_policy("Never")
.metrics_port(9090)
.web_port(8080))
return [migration_job, backup_job, batch_job, monitoring_job]
if __name__ == "__main__":
jobs = create_production_jobs()
# Generate Kubernetes resources
output = KubernetesOutput()
for job in jobs:
output.generate(job, "production-jobs/")
print("✅ Production jobs generated!")
print("🚀 Deploy: kubectl apply -f production-jobs/")
Generated Kubernetes Resources¶
The Job class generates the following Kubernetes resources:
- Job - Kubernetes Job with the specified configuration
- Secret - Secrets (if configured)
- ConfigMap - ConfigMaps (if configured)
- Volume - Volumes (if configured)
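For the basic Overview example, the emitted Job manifest looks roughly like the sketch below. This is a standard batch/v1 Job; the exact labels, defaults, and fields celestra emits may differ.
apiVersion: batch/v1
kind: Job
metadata:
  name: data-migration
spec:
  backoffLimit: 6                # set via retry_limit()
  template:
    spec:
      restartPolicy: OnFailure   # set via restart_policy()
      containers:
      - name: data-migration
        image: migrator:latest   # set via image()
        command: ["python", "migrate.py"]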
Usage Patterns¶
Database Migration Jobs¶
# Database migration
migration_job = (Job("db-migration")
.image("migrator:latest")
.command(["python", "migrate.py"])
.env("DATABASE_URL", "postgres://localhost:5432/myapp")
.resources(cpu="1000m", memory="2Gi")
.timeout("1h")
.retry_limit(3))
Backup Jobs¶
# Database backup
backup_job = (Job("db-backup")
.image("backup-tool:latest")
.command(["pg_dump", "myapp", "|", "gzip", ">", "/backups/backup.sql.gz"])
.env("DATABASE_URL", "postgres://localhost:5432/myapp")
.resources(cpu="500m", memory="1Gi")
.timeout("30m")
.add_volume("backup-storage", {"persistentVolumeClaim": {"claimName": "backup-pvc"}})
.mount_volume("backup-storage", "/backups"))
Batch Processing Jobs¶
# Batch data processing
batch_job = (Job("data-processing")
.image("processor:latest")
.command(["python", "process.py"])
.env("BATCH_SIZE", "1000")
.resources(cpu="2000m", memory="4Gi")
.parallelism(5)
.completions(10)
.timeout("6h"))
Monitoring Jobs¶
# Health check job
health_job = (Job("health-check")
.image("monitor:latest")
.command(["python", "health_check.py"])
.env("CHECK_INTERVAL", "30")
.resources(cpu="100m", memory="256Mi")
.timeout("5m")
.restart_policy("Never"))
ML Training Jobs¶
# Machine learning training
ml_job = (Job("ml-training")
.image("ml-trainer:latest")
.command(["python", "train.py"])
.env("MODEL_TYPE", "transformer")
.env("EPOCHS", "100")
.resources(cpu="4000m", memory="16Gi", gpu=2)
.timeout("24h")
.node_selector({"accelerator": "gpu"})
.toleration("nvidia.com/gpu", "Equal", "present", "NoSchedule"))
Best Practices¶
1. Set Appropriate Timeouts¶
# ✅ Good: Set reasonable timeouts
job = Job("migration").timeout("2h")
# ❌ Bad: No timeout (may run forever)
job = Job("migration") # No timeout
2. Use Resource Limits¶
# ✅ Good: Set resource limits
job = Job("processing").resources(cpu="1000m", memory="2Gi", cpu_limit="2000m", memory_limit="4Gi")
# ❌ Bad: No resource limits
job = Job("processing") # No resource limits
3. Set Retry Limits¶
# ✅ Good: Set retry limits
job = Job("migration").retry_limit(3)
# ❌ Bad: Too many retries
job = Job("migration").retry_limit(100) # Masks persistent failures and wastes resources
4. Use Appropriate Restart Policies¶
# ✅ Good: Use appropriate restart policy
job = Job("migration").restart_policy("OnFailure")
# ❌ Bad: "Always" is invalid for Jobs
job = Job("migration").restart_policy("Always") # Rejected by the API server; Jobs allow only OnFailure or Never