Add list / multi resource support

This commit is contained in:
micw523 2019-03-19 00:36:32 -05:00
parent 4d4ce21265
commit c0f71c3ba6
14 changed files with 659 additions and 108 deletions

View File

@ -23,7 +23,8 @@ def main():
# default location.
config.load_kube_config()
k8s_client = client.ApiClient()
k8s_api = utils.create_from_yaml(k8s_client, "nginx-deployment.yaml")
utils.create_from_yaml(k8s_client, "nginx-deployment.yaml")
k8s_api = client.ExtensionsV1beta1Api(k8s_client)
deps = k8s_api.read_namespaced_deployment("nginx-deployment", "default")
print("Deployment {0} created".format(deps.metadata.name))

View File

@ -17,100 +17,303 @@ import unittest
from kubernetes import utils, client
from kubernetes.e2e_test import base
class TestUtils(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.config = base.get_e2e_configuration()
cls.path_prefix = "kubernetes/e2e_test/test_yaml/"
def test_app_yaml(self):
# Tests for creating individual API objects
def test_create_apps_deployment_from_yaml(self):
"""
Should be able to create an apps/v1beta1 deployment.
"""
k8s_client = client.api_client.ApiClient(configuration=self.config)
k8s_api = utils.create_from_yaml(k8s_client,
"kubernetes/e2e_test/test_yaml/apps-deployment.yaml")
self.assertEqual("apps/v1beta1",
k8s_api.get_api_resources().group_version)
dep = k8s_api.read_namespaced_deployment(name="nginx-app",
namespace="default")
utils.create_from_yaml(
k8s_client, self.path_prefix + "apps-deployment.yaml")
app_api = client.AppsV1beta1Api(k8s_client)
dep = app_api.read_namespaced_deployment(name="nginx-app",
namespace="default")
self.assertIsNotNone(dep)
resp = k8s_api.delete_namespaced_deployment(
name="nginx-app", namespace="default",
app_api.delete_namespaced_deployment(
name="nginx-app", namespace="default",
body={})
def test_extension_yaml(self):
def test_create_extensions_deployment_from_yaml(self):
"""
Should be able to create an extensions/v1beta1 deployment.
"""
k8s_client = client.api_client.ApiClient(configuration=self.config)
k8s_api = utils.create_from_yaml(k8s_client,
"kubernetes/e2e_test/test_yaml/extensions-deployment.yaml")
self.assertEqual("extensions/v1beta1",
k8s_api.get_api_resources().group_version)
dep = k8s_api.read_namespaced_deployment(name="nginx-deployment",
namespace="default")
utils.create_from_yaml(
k8s_client, self.path_prefix + "extensions-deployment.yaml")
ext_api = client.ExtensionsV1beta1Api(k8s_client)
dep = ext_api.read_namespaced_deployment(name="nginx-deployment",
namespace="default")
self.assertIsNotNone(dep)
resp = k8s_api.delete_namespaced_deployment(
name="nginx-deployment", namespace="default",
ext_api.delete_namespaced_deployment(
name="nginx-deployment", namespace="default",
body={})
def test_core_pod_yaml(self):
def test_create_pod_from_yaml(self):
"""
Should be able to create a pod.
"""
k8s_client = client.api_client.ApiClient(configuration=self.config)
k8s_api = utils.create_from_yaml(k8s_client,
"kubernetes/e2e_test/test_yaml/core-pod.yaml")
self.assertEqual("v1",
k8s_api.get_api_resources().group_version)
pod = k8s_api.read_namespaced_pod(name="myapp-pod",
namespace="default")
utils.create_from_yaml(
k8s_client, self.path_prefix + "core-pod.yaml")
core_api = client.CoreV1Api(k8s_client)
pod = core_api.read_namespaced_pod(name="myapp-pod",
namespace="default")
self.assertIsNotNone(pod)
resp = k8s_api.delete_namespaced_pod(
core_api.delete_namespaced_pod(
name="myapp-pod", namespace="default",
body={})
def test_core_service_yaml(self):
def test_create_service_from_yaml(self):
"""
Should be able to create a service.
"""
k8s_client = client.api_client.ApiClient(configuration=self.config)
k8s_api = utils.create_from_yaml(k8s_client,
"kubernetes/e2e_test/test_yaml/core-service.yaml")
self.assertEqual("v1",
k8s_api.get_api_resources().group_version)
svc = k8s_api.read_namespaced_service(name="my-service",
namespace="default")
utils.create_from_yaml(
k8s_client, self.path_prefix + "core-service.yaml")
core_api = client.CoreV1Api(k8s_client)
svc = core_api.read_namespaced_service(name="my-service",
namespace="default")
self.assertIsNotNone(svc)
resp = k8s_api.delete_namespaced_service(
core_api.delete_namespaced_service(
name="my-service", namespace="default",
body={})
def test_core_namespace_yaml(self):
k8s_client = client.api_client.ApiClient(configuration=self.config)
k8s_api = utils.create_from_yaml(k8s_client,
"kubernetes/e2e_test/test_yaml/core-namespace.yaml")
self.assertEqual("v1",
k8s_api.get_api_resources().group_version)
nmsp = k8s_api.read_namespace(name="development")
self.assertIsNotNone(nmsp)
resp = k8s_api.delete_namespace(name="development", body={})
def test_deployment_in_namespace(self):
def test_create_namespace_from_yaml(self):
"""
Should be able to create a namespace.
"""
k8s_client = client.api_client.ApiClient(configuration=self.config)
utils.create_from_yaml(
k8s_client, self.path_prefix + "core-namespace.yaml")
core_api = client.CoreV1Api(k8s_client)
nmsp = core_api.read_namespace(name="development")
self.assertIsNotNone(nmsp)
core_api.delete_namespace(name="development", body={})
def test_create_deployment_non_default_namespace_from_yaml(self):
"""
Should be able to create a namespace "dep",
and then create a deployment in the just-created namespace.
"""
k8s_client = client.ApiClient(configuration=self.config)
core_api = utils.create_from_yaml(k8s_client,
"kubernetes/e2e_test/test_yaml/core-namespace-dep.yaml")
self.assertEqual("v1",
core_api.get_api_resources().group_version)
utils.create_from_yaml(
k8s_client, self.path_prefix + "dep-namespace.yaml")
utils.create_from_yaml(
k8s_client, self.path_prefix + "dep-deployment.yaml")
core_api = client.CoreV1Api(k8s_client)
ext_api = client.ExtensionsV1beta1Api(k8s_client)
nmsp = core_api.read_namespace(name="dep")
self.assertIsNotNone(nmsp)
dep_api = utils.create_from_yaml(k8s_client,
"kubernetes/e2e_test/test_yaml/extensions-deployment-dep.yaml")
dep = dep_api.read_namespaced_deployment(name="nginx-deployment",
namespace="dep")
dep = ext_api.read_namespaced_deployment(name="nginx-deployment",
namespace="dep")
self.assertIsNotNone(dep)
resp = dep_api.delete_namespaced_deployment(
name="nginx-deployment", namespace="dep",
ext_api.delete_namespaced_deployment(
name="nginx-deployment", namespace="dep",
body={})
resp = core_api.delete_namespace(name="dep", body={})
def test_api_service(self):
core_api.delete_namespace(name="dep", body={})
def test_create_apiservice_from_yaml_with_conflict(self):
"""
Should be able to create an API service.
Should verify that creating the same API service should
fail due to conflict.
"""
k8s_client = client.api_client.ApiClient(configuration=self.config)
k8s_api = utils.create_from_yaml(k8s_client,
"kubernetes/e2e_test/test_yaml/api-service.yaml")
self.assertEqual("apiregistration.k8s.io/v1beta1",
k8s_api.get_api_resources().group_version)
svc = k8s_api.read_api_service(
utils.create_from_yaml(
k8s_client, self.path_prefix + "api-service.yaml")
reg_api = client.ApiregistrationV1beta1Api(k8s_client)
svc = reg_api.read_api_service(
name="v1alpha1.wardle.k8s.io")
self.assertIsNotNone(svc)
resp = k8s_api.delete_api_service(
name="v1alpha1.wardle.k8s.io", body={})
with self.assertRaises(utils.FailToCreateError) as cm:
utils.create_from_yaml(
k8s_client, "kubernetes/e2e_test/test_yaml/api-service.yaml")
exp_error = ('Error from server (Conflict): '
'{"kind":"Status","apiVersion":"v1","metadata":{},'
'"status":"Failure",'
'"message":"apiservices.apiregistration.k8s.io '
'\\"v1alpha1.wardle.k8s.io\\" already exists",'
'"reason":"AlreadyExists",'
'"details":{"name":"v1alpha1.wardle.k8s.io",'
'"group":"apiregistration.k8s.io","kind":"apiservices"},'
'"code":409}\n'
)
self.assertEqual(exp_error, str(cm.exception))
reg_api.delete_api_service(
name="v1alpha1.wardle.k8s.io", body={})
# Tests for creating API objects from lists
def test_create_general_list_from_yaml(self):
"""
Should be able to create a service and a deployment
from a kind: List yaml file
"""
k8s_client = client.api_client.ApiClient(configuration=self.config)
utils.create_from_yaml(
k8s_client, self.path_prefix + "list.yaml")
core_api = client.CoreV1Api(k8s_client)
ext_api = client.ExtensionsV1beta1Api(k8s_client)
svc = core_api.read_namespaced_service(name="list-service-test",
namespace="default")
self.assertIsNotNone(svc)
dep = ext_api.read_namespaced_deployment(name="list-deployment-test",
namespace="default")
self.assertIsNotNone(dep)
core_api.delete_namespaced_service(name="list-service-test",
namespace="default", body={})
ext_api.delete_namespaced_deployment(name="list-deployment-test",
namespace="default", body={})
def test_create_namespace_list_from_yaml(self):
"""
Should be able to create two namespaces
from a kind: NamespaceList yaml file
"""
k8s_client = client.api_client.ApiClient(configuration=self.config)
utils.create_from_yaml(
k8s_client, self.path_prefix + "namespace-list.yaml")
core_api = client.CoreV1Api(k8s_client)
nmsp_1 = core_api.read_namespace(name="mock-1")
self.assertIsNotNone(nmsp_1)
nmsp_2 = core_api.read_namespace(name="mock-2")
self.assertIsNotNone(nmsp_2)
core_api.delete_namespace(name="mock-1", body={})
core_api.delete_namespace(name="mock-2", body={})
def test_create_implicit_service_list_from_yaml_with_conflict(self):
"""
Should be able to create two services from a kind: ServiceList
json file that implicitly indicates the kind of individual objects
"""
k8s_client = client.api_client.ApiClient(configuration=self.config)
with self.assertRaises(utils.FailToCreateError):
utils.create_from_yaml(
k8s_client, self.path_prefix + "implicit-svclist.json")
core_api = client.CoreV1Api(k8s_client)
svc_3 = core_api.read_namespaced_service(name="mock-3",
namespace="default")
self.assertIsNotNone(svc_3)
svc_4 = core_api.read_namespaced_service(name="mock-4",
namespace="default")
self.assertIsNotNone(svc_4)
core_api.delete_namespaced_service(name="mock-3",
namespace="default", body={})
core_api.delete_namespaced_service(name="mock-4",
namespace="default", body={})
# Tests for multi-resource yaml objects
def test_create_from_multi_resource_yaml(self):
"""
Should be able to create a service and a replication controller
from a multi-resource yaml file
"""
k8s_client = client.api_client.ApiClient(configuration=self.config)
utils.create_from_yaml(
k8s_client, self.path_prefix + "multi-resource.yaml")
core_api = client.CoreV1Api(k8s_client)
svc = core_api.read_namespaced_service(name="mock",
namespace="default")
self.assertIsNotNone(svc)
ctr = core_api.read_namespaced_replication_controller(
name="mock", namespace="default")
self.assertIsNotNone(ctr)
core_api.delete_namespaced_replication_controller(
name="mock", namespace="default", body={})
core_api.delete_namespaced_service(name="mock",
namespace="default", body={})
def test_create_from_list_in_multi_resource_yaml(self):
"""
Should be able to create the items in the PodList and a deployment
specified in the multi-resource file
"""
k8s_client = client.api_client.ApiClient(configuration=self.config)
utils.create_from_yaml(
k8s_client, self.path_prefix + "multi-resource-with-list.yaml")
core_api = client.CoreV1Api(k8s_client)
app_api = client.AppsV1beta1Api(k8s_client)
pod_0 = core_api.read_namespaced_pod(
name="mock-pod-0", namespace="default")
self.assertIsNotNone(pod_0)
pod_1 = core_api.read_namespaced_pod(
name="mock-pod-1", namespace="default")
self.assertIsNotNone(pod_1)
dep = app_api.read_namespaced_deployment(
name="mock", namespace="default")
self.assertIsNotNone(dep)
core_api.delete_namespaced_pod(
name="mock-pod-0", namespace="default", body={})
core_api.delete_namespaced_pod(
name="mock-pod-1", namespace="default", body={})
app_api.delete_namespaced_deployment(
name="mock", namespace="default", body={})
def test_create_from_multi_resource_yaml_with_conflict(self):
"""
Should be able to create a service from the first yaml file.
Should fail to create the same service from the second yaml file
and create a replication controller.
Should raise an exception for failure to create the same service twice.
"""
k8s_client = client.api_client.ApiClient(configuration=self.config)
utils.create_from_yaml(
k8s_client, self.path_prefix + "yaml-conflict-first.yaml")
core_api = client.CoreV1Api(k8s_client)
svc = core_api.read_namespaced_service(name="mock-2",
namespace="default")
self.assertIsNotNone(svc)
with self.assertRaises(utils.FailToCreateError) as cm:
utils.create_from_yaml(
k8s_client, self.path_prefix + "yaml-conflict-multi.yaml")
exp_error = ('Error from server (Conflict): {"kind":"Status",'
'"apiVersion":"v1","metadata":{},"status":"Failure",'
'"message":"services \\"mock-2\\" already exists",'
'"reason":"AlreadyExists","details":{"name":"mock-2",'
'"kind":"services"},"code":409}\n'
)
self.assertEqual(exp_error, str(cm.exception))
ctr = core_api.read_namespaced_replication_controller(
name="mock-2", namespace="default")
self.assertIsNotNone(ctr)
core_api.delete_namespaced_replication_controller(
name="mock-2", namespace="default", body={})
core_api.delete_namespaced_service(name="mock-2",
namespace="default", body={})
def test_create_from_multi_resource_yaml_with_multi_conflicts(self):
"""
Should create an extensions/v1beta1 deployment
and fail to create the same deployment twice.
Should raise an exception that contains two error messages.
"""
k8s_client = client.api_client.ApiClient(configuration=self.config)
with self.assertRaises(utils.FailToCreateError) as cm:
utils.create_from_yaml(
k8s_client, self.path_prefix + "triple-nginx.yaml")
exp_error = ('Error from server (Conflict): {"kind":"Status",'
'"apiVersion":"v1","metadata":{},"status":"Failure",'
'"message":"deployments.extensions \\"triple-nginx\\" '
'already exists","reason":"AlreadyExists",'
'"details":{"name":"triple-nginx","group":"extensions",'
'"kind":"deployments"},"code":409}\n'
)
exp_error += exp_error
self.assertEqual(exp_error, str(cm.exception))
ext_api = client.ExtensionsV1beta1Api(k8s_client)
dep = ext_api.read_namespaced_deployment(name="triple-nginx",
namespace="default")
self.assertIsNotNone(dep)
ext_api.delete_namespaced_deployment(
name="triple-nginx", namespace="default",
body={})

View File

@ -0,0 +1,60 @@
{
"kind":"ServiceList",
"apiVersion":"v1",
"items":[
{
"metadata":{
"name":"mock-3",
"labels":{
"app":"mock-3"
}
},
"spec":{
"ports": [{
"protocol": "TCP",
"port": 99,
"targetPort": 9949
}],
"selector":{
"app":"mock-3"
}
}
},
{
"metadata":{
"name":"mock-3",
"labels":{
"app":"mock-3"
}
},
"spec":{
"ports": [{
"protocol": "TCP",
"port": 99,
"targetPort": 9949
}],
"selector":{
"app":"mock-3"
}
}
},
{
"metadata":{
"name":"mock-4",
"labels":{
"app":"mock-4"
}
},
"spec":{
"ports": [{
"protocol": "TCP",
"port": 99,
"targetPort": 9949
}],
"selector":{
"app":"mock-4"
}
}
}
]
}

View File

@ -0,0 +1,29 @@
apiVersion: v1
kind: List
items:
- apiVersion: v1
kind: Service
metadata:
name: list-service-test
spec:
ports:
- protocol: TCP
port: 80
selector:
app: list-deployment-test
- apiVersion: extensions/v1beta1
kind: Deployment
metadata:
name: list-deployment-test
labels:
app: list-deployment-test
spec:
replicas: 1
template:
metadata:
labels:
app: list-deployment-test
spec:
containers:
- name: nginx
image: nginx

View File

@ -0,0 +1,47 @@
apiVersion: v1
kind: PodList
items:
- apiVersion: v1
kind: Pod
metadata:
name: mock-pod-0
labels:
app: mock-pod-0
spec:
containers:
- name: mock-pod-0
image: busybox
command: ['sh', '-c', 'echo Hello Kubernetes! && sleep 3600']
- apiVersion: v1
kind: Pod
metadata:
name: mock-pod-1
labels:
app: mock-pod-1
spec:
containers:
- name: mock-pod-1
image: busybox
command: ['sh', '-c', 'echo Hello Kubernetes! && sleep 3600']
---
apiVersion: apps/v1beta1
kind: Deployment
metadata:
name: mock
labels:
app: mock
spec:
replicas: 3
selector:
matchLabels:
app: mock
template:
metadata:
labels:
app: mock
spec:
containers:
- name: mock
image: nginx:1.15.4
ports:
- containerPort: 80

View File

@ -0,0 +1,33 @@
apiVersion: v1
kind: Service
metadata:
name: mock
labels:
app: mock
spec:
ports:
- port: 99
protocol: TCP
targetPort: 9949
selector:
app: mock
---
apiVersion: v1
kind: ReplicationController
metadata:
name: mock
spec:
replicas: 1
selector:
app: mock
template:
metadata:
labels:
app: mock
spec:
containers:
- name: mock-container
image: k8s.gcr.io/pause:2.0
ports:
- containerPort: 9949
protocol: TCP

View File

@ -0,0 +1,15 @@
apiVersion: v1
kind: NamespaceList
items:
- apiVersion: v1
kind: Namespace
metadata:
name: mock-1
labels:
name: mock-1
- apiVersion: v1
kind: Namespace
metadata:
name: mock-2
labels:
name: mock-2

View File

@ -0,0 +1,50 @@
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
name: triple-nginx
spec:
replicas: 3
template:
metadata:
labels:
app: nginx
spec:
containers:
- name: nginx
image: nginx:1.7.9
ports:
- containerPort: 80
---
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
name: triple-nginx
spec:
replicas: 3
template:
metadata:
labels:
app: nginx
spec:
containers:
- name: nginx
image: nginx:1.7.9
ports:
- containerPort: 80
---
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
name: triple-nginx
spec:
replicas: 3
template:
metadata:
labels:
app: nginx
spec:
containers:
- name: nginx
image: nginx:1.7.9
ports:
- containerPort: 80

View File

@ -0,0 +1,13 @@
apiVersion: v1
kind: Service
metadata:
name: mock-2
labels:
app: mock-2
spec:
ports:
- port: 99
protocol: TCP
targetPort: 9949
selector:
app: mock-2

View File

@ -0,0 +1,33 @@
apiVersion: v1
kind: Service
metadata:
name: mock-2
labels:
app: mock-2
spec:
ports:
- port: 99
protocol: TCP
targetPort: 9949
selector:
app: mock-2
---
apiVersion: v1
kind: ReplicationController
metadata:
name: mock-2
spec:
replicas: 1
selector:
app: mock-2
template:
metadata:
labels:
app: mock-2
spec:
containers:
- name: mock-container
image: k8s.gcr.io/pause:2.0
ports:
- containerPort: 9949
protocol: TCP

View File

@ -12,4 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from .create_from_yaml import create_from_yaml
from __future__ import absolute_import
from .create_from_yaml import FailToCreateError, create_from_yaml

View File

@ -14,60 +14,125 @@
import re
import sys
from os import path
import yaml
from six import iteritems
from kubernetes import client
def create_from_yaml(k8s_client, yaml_file, verbose=False, **kwargs):
def create_from_yaml(
k8s_client,
yaml_file,
verbose=False,
**kwargs):
"""
Perform an action from a yaml file. Pass True for verbose to
print confirmation information.
Input:
yaml_file: string. Contains the path to yaml file.
k8s_cline: an ApiClient object, initialized with the client args.
k8s_client: an ApiClient object, initialized with the client args.
verbose: If True, print confirmation from the create action.
Default is False.
Available parameters for performing the subsequent action:
Returns:
An k8s api object or list of apis objects created from YAML.
When a single object is generated, return type is dependent
on output_list.
Throws a FailToCreateError exception if creation of any object
fails with helpful messages from the server.
Available parameters for creating <kind>:
:param async_req bool
:param bool include_uninitialized: If true, partially initialized resources are included in the response.
:param bool include_uninitialized: If true, partially initialized
resources are included in the response.
:param str pretty: If 'true', then the output is pretty printed.
:param str dry_run: When present, indicates that modifications should not be persisted. An invalid or unrecognized dryRun directive will result in an error response and no further processing of the request. Valid values are: - All: all dry run stages will be processed
:param str dry_run: When present, indicates that modifications
should not be persisted. An invalid or unrecognized dryRun
directive will result in an error response and no further
processing of the request.
Valid values are: - All: all dry run stages will be processed
"""
with open(path.abspath(yaml_file)) as f:
yml_object = yaml.load(f)
# TODO: case of yaml file containing multiple objects
group, _, version = yml_object["apiVersion"].partition("/")
if version == "":
version = group
group = "core"
# Take care for the case e.g. api_type is "apiextensions.k8s.io"
# Only replace the last instance
group = "".join(group.rsplit(".k8s.io", 1))
fcn_to_call = "{0}{1}Api".format(group.capitalize(),
version.capitalize())
k8s_api = getattr(client, fcn_to_call)(k8s_client)
# Replace CamelCased action_type into snake_case
kind = yml_object["kind"]
kind = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', kind)
kind = re.sub('([a-z0-9])([A-Z])', r'\1_\2', kind).lower()
# Decide which namespace we are going to put the object in,
# if any
if "namespace" in yml_object["metadata"]:
namespace = yml_object["metadata"]["namespace"]
else:
namespace = "default"
# Expect the user to create namespaced objects more often
if hasattr(k8s_api, "create_namespaced_{0}".format(kind)):
resp = getattr(k8s_api, "create_namespaced_{0}".format(kind))(
body=yml_object, namespace=namespace, **kwargs)
else:
resp = getattr(k8s_api, "create_{0}".format(kind))(
body=yml_object, **kwargs)
if verbose:
print("{0} created. status='{1}'".format(kind, str(resp.status)))
return k8s_api
yml_document_all = yaml.safe_load_all(f)
api_exceptions = []
# Load all documents from a single YAML file
for yml_document in yml_document_all:
# If it is a list type, will need to iterate its items
if "List" in yml_document["kind"]:
# Could be "List" or "Pod/Service/...List"
# This is a list type. iterate within its items
kind = yml_document["kind"].replace("List", "")
for yml_object in yml_document["items"]:
# Mitigate cases when server returns a xxxList object
# See kubernetes-client/python#586
if kind is not "":
yml_object["apiVersion"] = yml_document["apiVersion"]
yml_object["kind"] = kind
try:
create_from_yaml_single_item(
k8s_client, yml_object, verbose, **kwargs)
except client.rest.ApiException as api_exception:
api_exceptions.append(api_exception)
else:
# This is a single object. Call the single item method
try:
create_from_yaml_single_item(
k8s_client, yml_document, verbose, **kwargs)
except client.rest.ApiException as api_exception:
api_exceptions.append(api_exception)
# In case we have exceptions waiting for us, raise them
if api_exceptions:
raise FailToCreateError(api_exceptions)
def create_from_yaml_single_item(
k8s_client, yml_object, verbose=False, **kwargs):
group, _, version = yml_object["apiVersion"].partition("/")
if version == "":
version = group
group = "core"
# Take care for the case e.g. api_type is "apiextensions.k8s.io"
# Only replace the last instance
group = "".join(group.rsplit(".k8s.io", 1))
fcn_to_call = "{0}{1}Api".format(group.capitalize(),
version.capitalize())
k8s_api = getattr(client, fcn_to_call)(k8s_client)
# Replace CamelCased action_type into snake_case
kind = yml_object["kind"]
kind = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', kind)
kind = re.sub('([a-z0-9])([A-Z])', r'\1_\2', kind).lower()
# Decide which namespace we are going to put the object in,
# if any
if "namespace" in yml_object["metadata"]:
namespace = yml_object["metadata"]["namespace"]
else:
namespace = "default"
# Expect the user to create namespaced objects more often
if hasattr(k8s_api, "create_namespaced_{0}".format(kind)):
resp = getattr(k8s_api, "create_namespaced_{0}".format(kind))(
body=yml_object, namespace=namespace, **kwargs)
else:
resp = getattr(k8s_api, "create_{0}".format(kind))(
body=yml_object, **kwargs)
if verbose:
print("{0} created. status='{1}'".format(kind, str(resp.status)))
class FailToCreateError(Exception):
"""
An exception class for handling error if an error occurred when
handling a yaml file.
"""
def __init__(self, api_exceptions):
self.api_exceptions = api_exceptions
def __str__(self):
msg = ""
for api_exception in self.api_exceptions:
msg += "Error from server ({0}): {1}".format(
api_exception.reason, api_exception.body)
return msg