# Copyright 2016 The Kubernetes Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """ Creates, updates, and deletes a job object. """ from os import path from time import sleep import yaml from kubernetes import client, config JOB_NAME = "pi" def create_job_object(): # Configureate Pod template container container = client.V1Container( name="pi", image="perl", command=["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"]) # Create and configure a spec section template = client.V1PodTemplateSpec( metadata=client.V1ObjectMeta(labels={"app": "pi"}), spec=client.V1PodSpec(restart_policy="Never", containers=[container])) # Create the specification of deployment spec = client.V1JobSpec( template=template, backoff_limit=4) # Instantiate the job object job = client.V1Job( api_version="batch/v1", kind="Job", metadata=client.V1ObjectMeta(name=JOB_NAME), spec=spec) return job def create_job(api_instance, job): api_response = api_instance.create_namespaced_job( body=job, namespace="default") print("Job created. status='%s'" % str(api_response.status)) get_job_status(api_instance) def get_job_status(api_instance): job_completed = False while not job_completed: api_response = api_instance.read_namespaced_job_status( name=JOB_NAME, namespace="default") if api_response.status.succeeded is not None or \ api_response.status.failed is not None: job_completed = True sleep(1) print("Job status='%s'" % str(api_response.status)) def update_job(api_instance, job): # Update container image job.spec.template.spec.containers[0].image = "perl" api_response = api_instance.patch_namespaced_job( name=JOB_NAME, namespace="default", body=job) print("Job updated. status='%s'" % str(api_response.status)) def delete_job(api_instance): api_response = api_instance.delete_namespaced_job( name=JOB_NAME, namespace="default", body=client.V1DeleteOptions( propagation_policy='Foreground', grace_period_seconds=5)) print("Job deleted. status='%s'" % str(api_response.status)) def main(): # Configs can be set in Configuration class directly or using helper # utility. If no argument provided, the config will be loaded from # default location. config.load_kube_config() batch_v1 = client.BatchV1Api() # Create a job object with client-python API. The job we # created is same as the `pi-job.yaml` in the /examples folder. job = create_job_object() create_job(batch_v1, job) update_job(batch_v1, job) delete_job(batch_v1) if __name__ == '__main__': main()