Merge pull request #50 from kubernetes-incubator/o6

Kube config loader improvement
This commit is contained in:
mbohlool 2016-12-01 16:11:30 -08:00 committed by GitHub
commit 5677578313
7 changed files with 826 additions and 163 deletions

62
examples/example4.py Normal file
View File

@ -0,0 +1,62 @@
# Copyright 2016 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from kubernetes import client, config
from kubernetes.client import configuration
def main():
config_file = os.path.join(os.path.expanduser('~'), '.kube', 'config')
contexts, active_context = config.list_kube_config_contexts(config_file)
if not contexts:
print("Cannot find any context in kube-config file.")
return
contexts = [context['name'] for context in contexts]
active_context = active_context['name']
for i, context in enumerate(contexts):
format_str = "%d. %s"
if context == active_context:
format_str = "* " + format_str
print(format_str % (i, context))
context = input("Enter context number: ")
context = int(context)
if context not in range(len(contexts)):
print(
"Number out of range. Using default context %s." %
active_context)
context_name = active_context
else:
context_name = contexts[context]
# Configs can be set in Configuration class directly or using helper
# utility
config.load_kube_config(config_file, context_name)
print("Active host is %s" % configuration.host)
v1 = client.CoreV1Api()
print("Listing pods with their IPs:")
ret = v1.list_pod_for_all_namespaces(watch=False)
for item in ret.items:
print(
"%s\t%s\t%s" %
(item.status.pod_ip,
item.metadata.namespace,
item.metadata.name))
if __name__ == '__main__':
main()

View File

@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from .kube_config import load_kube_config
from .config_exception import ConfigException
from .incluster_config import load_incluster_config
from .incluster_config import ConfigException
from .kube_config import list_kube_config_contexts
from .kube_config import load_kube_config

View File

@ -0,0 +1,17 @@
# Copyright 2016 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
class ConfigException(Exception):
pass

View File

@ -16,10 +16,12 @@ import os
from kubernetes.client import configuration
_SERVICE_HOST_ENV_NAME = "KUBERNETES_SERVICE_HOST"
_SERVICE_PORT_ENV_NAME = "KUBERNETES_SERVICE_PORT"
_SERVICE_TOKEN_FILENAME = "/var/run/secrets/kubernetes.io/serviceaccount/token"
_SERVICE_CERT_FILENAME = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
from .config_exception import ConfigException
SERVICE_HOST_ENV_NAME = "KUBERNETES_SERVICE_HOST"
SERVICE_PORT_ENV_NAME = "KUBERNETES_SERVICE_PORT"
SERVICE_TOKEN_FILENAME = "/var/run/secrets/kubernetes.io/serviceaccount/token"
SERVICE_CERT_FILENAME = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
def _join_host_port(host, port):
@ -31,16 +33,10 @@ def _join_host_port(host, port):
return template % (host, port)
class ConfigException(Exception):
pass
class InClusterConfigLoader(object):
def __init__(self, host_env_name, port_env_name, token_filename,
def __init__(self, token_filename,
cert_filename, environ=os.environ):
self._host_env_name = host_env_name
self._port_env_name = port_env_name
self._token_filename = token_filename
self._cert_filename = cert_filename
self._environ = environ
@ -50,24 +46,34 @@ class InClusterConfigLoader(object):
self._set_config()
def _load_config(self):
if (self._host_env_name not in self._environ or
self._port_env_name not in self._environ):
if (SERVICE_HOST_ENV_NAME not in self._environ or
SERVICE_PORT_ENV_NAME not in self._environ):
raise ConfigException("Service host/port is not set.")
if (not self._environ[SERVICE_HOST_ENV_NAME] or
not self._environ[SERVICE_PORT_ENV_NAME]):
raise ConfigException("Service host/port is set but empty.")
self.host = (
"https://" + _join_host_port(self._environ[self._host_env_name],
self._environ[self._port_env_name]))
"https://" + _join_host_port(self._environ[SERVICE_HOST_ENV_NAME],
self._environ[SERVICE_PORT_ENV_NAME]))
if not os.path.isfile(self._token_filename):
raise ConfigException("Service token file does not exists.")
with open(self._token_filename) as f:
self.token = f.read()
if not self.token:
raise ConfigException("Token file exists but empty.")
if not os.path.isfile(self._cert_filename):
raise ConfigException(
"Service certification file does not exists.")
with open(self._cert_filename) as f:
if not f.read():
raise ConfigException("Cert file exists but empty.")
self.ssl_ca_cert = self._cert_filename
def _set_config(self):
@ -81,7 +87,5 @@ def load_incluster_config():
cluster. It's intended for clients that expect to be running inside a pod
running on kubernetes. It will raise an exception if called from a process
not running in a kubernetes environment."""
InClusterConfigLoader(host_env_name=_SERVICE_HOST_ENV_NAME,
port_env_name=_SERVICE_PORT_ENV_NAME,
token_filename=_SERVICE_TOKEN_FILENAME,
cert_filename=_SERVICE_CERT_FILENAME).load_and_set()
InClusterConfigLoader(token_filename=SERVICE_TOKEN_FILENAME,
cert_filename=SERVICE_CERT_FILENAME).load_and_set()

View File

@ -16,19 +16,22 @@ import os
import tempfile
import unittest
from kubernetes.client import configuration
from .incluster_config import (_SERVICE_HOST_ENV_NAME, _SERVICE_PORT_ENV_NAME,
ConfigException, InClusterConfigLoader)
from .config_exception import ConfigException
from .incluster_config import (SERVICE_HOST_ENV_NAME, SERVICE_PORT_ENV_NAME,
InClusterConfigLoader, _join_host_port)
_TEST_TOKEN = "temp_token"
_TEST_CERT = "temp_cert"
_TEST_HOST = "127.0.0.1"
_TEST_IPV6_HOST = "::1"
_TEST_PORT = "80"
_TEST_ENVIRON = {_SERVICE_HOST_ENV_NAME: _TEST_HOST,
_SERVICE_PORT_ENV_NAME: _TEST_PORT}
_TEST_IPV6_ENVIRON = {_SERVICE_HOST_ENV_NAME: _TEST_IPV6_HOST,
_SERVICE_PORT_ENV_NAME: _TEST_PORT}
_TEST_HOST_PORT = "127.0.0.1:80"
_TEST_IPV6_HOST = "::1"
_TEST_IPV6_HOST_PORT = "[::1]:80"
_TEST_ENVIRON = {SERVICE_HOST_ENV_NAME: _TEST_HOST,
SERVICE_PORT_ENV_NAME: _TEST_PORT}
_TEST_IPV6_ENVIRON = {SERVICE_HOST_ENV_NAME: _TEST_IPV6_HOST,
SERVICE_PORT_ENV_NAME: _TEST_PORT}
class InClusterConfigTest(unittest.TestCase):
@ -49,38 +52,29 @@ class InClusterConfigTest(unittest.TestCase):
def get_test_loader(
self,
host_env_name=_SERVICE_HOST_ENV_NAME,
port_env_name=_SERVICE_PORT_ENV_NAME,
token_filename=None,
cert_filename=None,
environ=_TEST_ENVIRON):
if not token_filename:
token_filename = self._create_file_with_temp_content(_TEST_TOKEN)
if not cert_filename:
cert_filename = self._create_file_with_temp_content()
cert_filename = self._create_file_with_temp_content(_TEST_CERT)
return InClusterConfigLoader(
host_env_name=host_env_name,
port_env_name=port_env_name,
token_filename=token_filename,
cert_filename=cert_filename,
environ=environ)
def test_join_host_port(self):
self.assertEqual(_TEST_HOST_PORT,
_join_host_port(_TEST_HOST, _TEST_PORT))
self.assertEqual(_TEST_IPV6_HOST_PORT,
_join_host_port(_TEST_IPV6_HOST, _TEST_PORT))
def test_load_config(self):
cert_filename = self._create_file_with_temp_content()
cert_filename = self._create_file_with_temp_content(_TEST_CERT)
loader = self.get_test_loader(cert_filename=cert_filename)
loader._load_config()
self.assertEqual("https://%s:%s" % (_TEST_HOST, _TEST_PORT),
loader.host)
self.assertEqual(cert_filename, loader.ssl_ca_cert)
self.assertEqual(_TEST_TOKEN, loader.token)
def test_load_config_with_bracketed_hostname(self):
cert_filename = self._create_file_with_temp_content()
loader = self.get_test_loader(cert_filename=cert_filename,
environ=_TEST_IPV6_ENVIRON)
loader._load_config()
self.assertEqual("https://[%s]:%s" % (_TEST_IPV6_HOST, _TEST_PORT),
loader.host)
self.assertEqual("https://" + _TEST_HOST_PORT, loader.host)
self.assertEqual(cert_filename, loader.ssl_ca_cert)
self.assertEqual(_TEST_TOKEN, loader.token)
@ -93,21 +87,45 @@ class InClusterConfigTest(unittest.TestCase):
pass
def test_no_port(self):
loader = self.get_test_loader(port_env_name="not_exists_port")
loader = self.get_test_loader(
environ={SERVICE_HOST_ENV_NAME: _TEST_HOST})
self._should_fail_load(loader, "no port specified")
def test_empty_port(self):
loader = self.get_test_loader(
environ={SERVICE_HOST_ENV_NAME: _TEST_HOST,
SERVICE_PORT_ENV_NAME: ""})
self._should_fail_load(loader, "empty port specified")
def test_no_host(self):
loader = self.get_test_loader(host_env_name="not_exists_host")
loader = self.get_test_loader(
environ={SERVICE_PORT_ENV_NAME: _TEST_PORT})
self._should_fail_load(loader, "no host specified")
def test_empty_host(self):
loader = self.get_test_loader(
environ={SERVICE_HOST_ENV_NAME: "",
SERVICE_PORT_ENV_NAME: _TEST_PORT})
self._should_fail_load(loader, "empty host specified")
def test_no_cert_file(self):
loader = self.get_test_loader(cert_filename="not_exists_file_1123")
self._should_fail_load(loader, "cert file does not exists")
def test_empty_cert_file(self):
loader = self.get_test_loader(
cert_filename=self._create_file_with_temp_content())
self._should_fail_load(loader, "empty cert file provided")
def test_no_token_file(self):
loader = self.get_test_loader(token_filename="not_exists_file_1123")
self._should_fail_load(loader, "token file does not exists")
def test_empty_token_file(self):
loader = self.get_test_loader(
token_filename=self._create_file_with_temp_content())
self._should_fail_load(loader, "empty token file provided")
if __name__ == '__main__':
unittest.main()

View File

@ -22,151 +22,243 @@ import yaml
from kubernetes.client import configuration
from oauth2client.client import GoogleCredentials
_temp_files = []
from .config_exception import ConfigException
_temp_files = {}
def _cleanup_temp_files():
for f in _temp_files:
os.remove(f)
for temp_file in _temp_files.values():
os.remove(temp_file)
def _create_temp_file_with_content(content):
if len(_temp_files) == 0:
atexit.register(_cleanup_temp_files)
# Because we may change context several times, try to remember files we
# created and reuse them at a small memory cost.
content_key = str(content)
if content_key in _temp_files:
return _temp_files[content_key]
_, name = tempfile.mkstemp()
_temp_files.append(name)
if isinstance(content, str):
content = content.encode('utf8')
_temp_files[content_key] = name
with open(name, 'wb') as fd:
fd.write(base64.decodestring(content))
fd.write(content.encode() if isinstance(content, str) else content)
return name
def _file_from_file_or_data(o, file_key_name, data_key_name=None):
if not data_key_name:
data_key_name = file_key_name + "-data"
if data_key_name in o:
return _create_temp_file_with_content(o[data_key_name])
if file_key_name in o:
return o[file_key_name]
class FileOrData(object):
"""Utility class to read content of obj[%data_key_name] or file's
content of obj[%file_key_name] and represent it as file or data.
Note that the data is preferred. The obj[%file_key_name] will be used iff
obj['%data_key_name'] is not set or empty. Assumption is file content is
raw data and data field is base64 string."""
def __init__(self, obj, file_key_name, data_key_name=None):
if not data_key_name:
data_key_name = file_key_name + "-data"
self._file = None
self._data = None
if data_key_name in obj:
self._data = obj[data_key_name]
elif file_key_name in obj:
self._file = obj[file_key_name]
def as_file(self):
"""If obj[%data_key_name] exists, return name of a file with base64
decoded obj[%data_key_name] content otherwise obj[%file_key_name]."""
use_data_if_no_file = not self._file and self._data
if use_data_if_no_file:
self._file = _create_temp_file_with_content(
base64.decodestring(self._data.encode()))
return self._file
def as_data(self):
"""If obj[%data_key_name] exists, Return obj[%data_key_name] otherwise
base64 encoded string of obj[%file_key_name] file content."""
use_file_if_no_data = not self._data and self._file
if use_file_if_no_data:
with open(self._file) as f:
self._data = bytes.decode(
base64.encodestring(str.encode(f.read())))
return self._data
def _data_from_file_or_data(o, file_key_name, data_key_name=None):
if not data_key_name:
data_key_name = file_key_name + "_data"
if data_key_name in o:
return o[data_key_name]
if file_key_name in o:
with open(o[file_key_name], 'r') as f:
data = f.read()
return data
class KubeConfigLoader(object):
def __init__(self, config_dict, active_context=None,
get_google_credentials=None, client_configuration=None):
self._config = ConfigNode('kube-config', config_dict)
self._current_context = None
self._user = None
self._cluster = None
self.set_active_context(active_context)
if get_google_credentials:
self._get_google_credentials = get_google_credentials
else:
self._get_google_credentials = lambda: (
GoogleCredentials.get_application_default()
.get_access_token().access_token)
if client_configuration:
self._client_configuration = client_configuration
else:
self._client_configuration = configuration
def set_active_context(self, context_name=None):
if context_name is None:
context_name = self._config['current-context']
self._current_context = self._config['contexts'].get_with_name(
context_name)
if self._current_context['context'].safe_get('user'):
self._user = self._config['users'].get_with_name(
self._current_context['context']['user'])['user']
else:
self._user = None
self._cluster = self._config['clusters'].get_with_name(
self._current_context['context']['cluster'])['cluster']
def _load_authentication(self):
"""Read authentication from kube-config user section if exists.
This function goes through various authentication methods in user
section of kube-config and stops if it finds a valid authentication
method. The order of authentication methods is:
1. GCP auth-provider
2. token_data
3. token field (point to a token file)
4. username/password
"""
if not self._user:
return
if self._load_gcp_token():
return
if self._load_user_token():
return
self._load_user_pass_token()
def _load_gcp_token(self):
if 'auth-provider' not in self._user:
return
if 'name' not in self._user['auth-provider']:
return
if self._user['auth-provider']['name'] != 'gcp':
return
# Ignore configs in auth-provider and rely on GoogleCredentials
# caching and refresh mechanism.
# TODO: support gcp command based token ("cmd-path" config).
self.token = self._get_google_credentials()
return self.token
def _load_user_token(self):
token = FileOrData(self._user, 'tokenFile', 'token').as_data()
if token:
self.token = token
return True
def _load_user_pass_token(self):
if 'username' in self._user and 'password' in self._user:
self.token = urllib3.util.make_headers(
basic_auth=(self._user['username'] + ':' +
self._user['password'])).get('authorization')
return True
def _load_cluster_info(self):
if 'server' in self._cluster:
self.host = self._cluster['server']
if self.host.startswith("https"):
self.ssl_ca_cert = FileOrData(
self._cluster, 'certificate-authority').as_file()
self.cert_file = FileOrData(
self._user, 'client-certificate').as_file()
self.key_file = FileOrData(self._user, 'client-key').as_file()
def _set_config(self):
if 'token' in self.__dict__:
self._client_configuration.api_key['authorization'] = self.token
# copy these keys directly from self to configuration object
keys = ['host', 'ssl_ca_cert', 'cert_file', 'key_file']
for key in keys:
if key in self.__dict__:
setattr(self._client_configuration, key, getattr(self, key))
def load_and_set(self):
self._load_authentication()
self._load_cluster_info()
self._set_config()
def list_contexts(self):
return [context.value for context in self._config['contexts']]
@property
def current_context(self):
return self._current_context.value
def _load_gcp_token(user):
if 'auth-provider' not in user:
return
if 'name' not in user['auth-provider']:
return
if user['auth-provider']['name'] != 'gcp':
return
# Ignore configs in auth-provider and rely on GoogleCredentials
# caching and refresh mechanism.
# TODO: support gcp command based token ("cmd-path" config).
return (GoogleCredentials
.get_application_default()
.get_access_token()
.access_token)
def _load_authentication(user):
"""Read authentication from kube-config user section.
This function goes through various authetication methods in user section of
kubeconfig and stops if it founds a valid authentication method. The order
of authentication methods is:
1. GCP auth-provider
2. token_data
3. token field (point to a token file)
4. username/password
"""
# Read authentication
token = _load_gcp_token(user)
if not token:
token = _data_from_file_or_data(user, 'tokenFile', 'token')
if token:
configuration.api_key['authorization'] = "bearer " + token
else:
if 'username' in user and 'password' in user:
configuration.api_key['authorization'] = urllib3.util.make_headers(
basic_auth=user['username'] + ':' +
user['password']).get('authorization')
def _load_cluster_info(cluster, user):
"""Loads cluster information from kubeconfig such as host and SSL certs."""
if 'server' in cluster:
configuration.host = cluster['server']
if configuration.host.startswith("https"):
configuration.ssl_ca_cert = _file_from_file_or_data(
cluster, 'certificate-authority')
configuration.cert_file = _file_from_file_or_data(
user, 'client-certificate')
configuration.key_file = _file_from_file_or_data(
user, 'client-key')
class _node:
"""Remembers each key's path and construct a relevant exception message
in case of missing keys."""
class ConfigNode(object):
"""Remembers each config key's path and construct a relevant exception
message in case of missing keys. The assumption is all access keys are
present in a well-formed kube-config."""
def __init__(self, name, value):
self._name = name
self._value = value
self.name = name
self.value = value
def __contains__(self, key):
return key in self._value
return key in self.value
def __len__(self):
return len(self.value)
def safe_get(self, key):
if (isinstance(self.value, list) and isinstance(key, int) or
key in self.value):
return self.value[key]
def __getitem__(self, key):
if key in self._value:
v = self._value[key]
if isinstance(v, dict) or isinstance(v, list):
return _node('%s/%s' % (self._name, key), v)
else:
return v
raise Exception(
'Invalid kube-config file. Expected key %s in %s'
% (key, self._name))
v = self.safe_get(key)
if not v:
raise ConfigException(
'Invalid kube-config file. Expected key %s in %s'
% (key, self.name))
if isinstance(v, dict) or isinstance(v, list):
return ConfigNode('%s/%s' % (self.name, key), v)
else:
return v
def get_with_name(self, name):
if not isinstance(self._value, list):
raise Exception(
if not isinstance(self.value, list):
raise ConfigException(
'Invalid kube-config file. Expected %s to be a list'
% self._name)
for v in self._value:
% self.name)
for v in self.value:
if 'name' not in v:
raise Exception(
raise ConfigException(
'Invalid kube-config file. '
'Expected all values in %s list to have \'name\' key'
% self._name)
% self.name)
if v['name'] == name:
return _node('%s[name=%s]' % (self._name, name), v)
raise Exception(
"Cannot find object with name %s in %s list" % (name, self._name))
return ConfigNode('%s[name=%s]' % (self.name, name), v)
raise ConfigException(
'Invalid kube-config file. '
'Expected object with name %s in %s list' % (name, self.name))
def load_kube_config(config_file):
def list_kube_config_contexts(config_file):
with open(config_file) as f:
loader = KubeConfigLoader(config_dict=yaml.load(f))
return loader.list_contexts(), loader.current_context
def load_kube_config(config_file, context=None):
"""Loads authentication and cluster information from kube-config file
and store them in kubernetes.client.configuration."""
and stores them in kubernetes.client.configuration.
:param config_file: Name of the kube-config file.
:param context: set the active context. If is set to None, current_context
from config file will be used.
"""
with open(config_file) as f:
config = _node('kube-config', yaml.load(f))
current_context = config['contexts'].get_with_name(
config['current-context'])['context']
user = config['users'].get_with_name(current_context['user'])['user']
cluster = config['clusters'].get_with_name(
current_context['cluster'])['cluster']
_load_cluster_info(cluster, user)
_load_authentication(user)
KubeConfigLoader(
config_dict=yaml.load(f), active_context=context).load_and_set()

View File

@ -0,0 +1,469 @@
# Copyright 2016 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import os
import tempfile
import unittest
from .config_exception import ConfigException
from .kube_config import (ConfigNode, FileOrData, KubeConfigLoader,
_create_temp_file_with_content)
def _base64(string):
return base64.encodestring(string.encode()).decode()
TEST_FILE_KEY = "file"
TEST_DATA_KEY = "data"
TEST_FILENAME = "test-filename"
TEST_DATA = "test-data"
TEST_DATA_BASE64 = _base64(TEST_DATA)
TEST_ANOTHER_DATA = "another-test-data"
TEST_ANOTHER_DATA_BASE64 = _base64(TEST_ANOTHER_DATA)
TEST_HOST = "test-host"
TEST_USERNAME = "me"
TEST_PASSWORD = "pass"
# token for me:pass
TEST_BASIC_TOKEN = "Basic bWU6cGFzcw=="
TEST_SSL_HOST = "https://test-host"
TEST_CERTIFICATE_AUTH = "cert-auth"
TEST_CERTIFICATE_AUTH_BASE64 = _base64(TEST_CERTIFICATE_AUTH)
TEST_CLIENT_KEY = "client-key"
TEST_CLIENT_KEY_BASE64 = _base64(TEST_CLIENT_KEY)
TEST_CLIENT_CERT = "client-cert"
TEST_CLIENT_CERT_BASE64 = _base64(TEST_CLIENT_CERT)
class BaseTestCase(unittest.TestCase):
def setUp(self):
self._temp_files = []
def tearDown(self):
for f in self._temp_files:
os.remove(f)
def _create_temp_file(self, content=""):
handler, name = tempfile.mkstemp()
self._temp_files.append(name)
os.write(handler, str.encode(content))
os.close(handler)
return name
class TestFileOrData(BaseTestCase):
@staticmethod
def get_file_content(filename):
with open(filename) as f:
return f.read()
def test_file_given_file(self):
obj = {TEST_FILE_KEY: TEST_FILENAME}
t = FileOrData(obj=obj, file_key_name=TEST_FILE_KEY)
self.assertEqual(TEST_FILENAME, t.as_file())
def test_file_given_data(self):
obj = {TEST_DATA_KEY: TEST_DATA_BASE64}
t = FileOrData(obj=obj, file_key_name=TEST_FILE_KEY,
data_key_name=TEST_DATA_KEY)
self.assertEqual(TEST_DATA, self.get_file_content(t.as_file()))
def test_data_given_data(self):
obj = {TEST_DATA_KEY: TEST_DATA_BASE64}
t = FileOrData(obj=obj, file_key_name=TEST_FILE_KEY,
data_key_name=TEST_DATA_KEY)
self.assertEqual(TEST_DATA_BASE64, t.as_data())
def test_data_given_file(self):
obj = {
TEST_FILE_KEY: self._create_temp_file(content=TEST_DATA)}
t = FileOrData(obj=obj, file_key_name=TEST_FILE_KEY)
self.assertEqual(TEST_DATA_BASE64, t.as_data())
def test_data_given_file_and_data(self):
obj = {
TEST_DATA_KEY: TEST_DATA_BASE64,
TEST_FILE_KEY: self._create_temp_file(
content=TEST_ANOTHER_DATA)}
t = FileOrData(obj=obj, file_key_name=TEST_FILE_KEY,
data_key_name=TEST_DATA_KEY)
self.assertEqual(TEST_DATA_BASE64, t.as_data())
def test_file_given_file_and_data(self):
obj = {
TEST_DATA_KEY: TEST_DATA_BASE64,
TEST_FILE_KEY: self._create_temp_file(
content=TEST_ANOTHER_DATA)}
t = FileOrData(obj=obj, file_key_name=TEST_FILE_KEY,
data_key_name=TEST_DATA_KEY)
self.assertEqual(TEST_DATA, self.get_file_content(t.as_file()))
def test_create_temp_file_with_content(self):
self.assertEqual(TEST_DATA,
self.get_file_content(
_create_temp_file_with_content(TEST_DATA)))
class TestConfigNode(BaseTestCase):
test_obj = {"key1": "test", "key2": ["a", "b", "c"],
"key3": {"inner_key": "inner_value"},
"with_names": [{"name": "test_name", "value": "test_value"},
{"name": "test_name2",
"value": {"key1", "test"}},
{"name": "test_name3", "value": [1, 2, 3]}]}
def setUp(self):
super(TestConfigNode, self).setUp()
self.node = ConfigNode("test_obj", self.test_obj)
def test_normal_map_array_operations(self):
self.assertEqual("test", self.node['key1'])
self.assertEqual(4, len(self.node))
self.assertEqual("test_obj/key2", self.node['key2'].name)
self.assertEqual(["a", "b", "c"], self.node['key2'].value)
self.assertEqual("b", self.node['key2'][1])
self.assertEqual(3, len(self.node['key2']))
self.assertEqual("test_obj/key3", self.node['key3'].name)
self.assertEqual({"inner_key": "inner_value"}, self.node['key3'].value)
self.assertEqual("inner_value", self.node['key3']["inner_key"])
self.assertEqual(1, len(self.node['key3']))
def test_get_with_name(self):
node = self.node["with_names"]
self.assertEqual(
"test_value",
node.get_with_name("test_name")["value"])
self.assertTrue(
isinstance(node.get_with_name("test_name2"), ConfigNode))
self.assertTrue(
isinstance(node.get_with_name("test_name3"), ConfigNode))
self.assertEqual("test_obj/with_names[name=test_name2]",
node.get_with_name("test_name2").name)
self.assertEqual("test_obj/with_names[name=test_name3]",
node.get_with_name("test_name3").name)
def expect_exception(self, func, message_part):
with self.assertRaises(ConfigException) as context:
func()
self.assertIn(message_part, str(context.exception))
def test_key_does_not_exists(self):
self.expect_exception(lambda: self.node['not-exists-key'],
"Expected key not-exists-key in test_obj")
self.expect_exception(lambda: self.node['key3']['not-exists-key'],
"Expected key not-exists-key in test_obj/key3")
def test_get_with_name_on_invalid_object(self):
self.expect_exception(
lambda: self.node['key2'].get_with_name('no-name'),
"Expected all values in test_obj/key2 list to have \'name\' key")
def test_get_with_name_on_non_list_object(self):
self.expect_exception(
lambda: self.node['key3'].get_with_name('no-name'),
"Expected test_obj/key3 to be a list")
def test_get_with_name_on_name_does_not_exists(self):
self.expect_exception(
lambda: self.node['with_names'].get_with_name('no-name'),
"Expected object with name no-name in test_obj/with_names list")
class FakeConfig:
FILE_KEYS = ["ssl_ca_cert", "key_file", "cert_file"]
def __init__(self, token=None, **kwargs):
self.api_key = {}
if token:
self.api_key['authorization'] = token
self.__dict__.update(kwargs)
def __eq__(self, other):
if len(self.__dict__) != len(other.__dict__):
return
for k, v in self.__dict__.items():
if k not in other.__dict__:
return
if k in self.FILE_KEYS:
try:
with open(v) as f1, open(other.__dict__[k]) as f2:
if f1.read() != f2.read():
return
except IOError:
# fall back to only compare filenames in case we are
# testing the passing of filenames to the config
if other.__dict__[k] != v:
return
else:
if other.__dict__[k] != v:
return
return True
def __repr__(self):
rep = "\n"
for k, v in self.__dict__.items():
val = v
if k in self.FILE_KEYS:
try:
with open(v) as f:
val = "FILE: %s" % str.decode(f.read())
except IOError as e:
val = "ERROR: %s" % str(e)
rep += "\t%s: %s\n" % (k, val)
return "Config(%s\n)" % rep
class TestKubeConfigLoader(BaseTestCase):
TEST_KUBE_CONFIG = {
"current-context": "no_user",
"contexts": [
{
"name": "no_user",
"context": {
"cluster": "default"
}
},
{
"name": "simple_token",
"context": {
"cluster": "default",
"user": "simple_token"
}
},
{
"name": "gcp",
"context": {
"cluster": "default",
"user": "gcp"
}
},
{
"name": "user_pass",
"context": {
"cluster": "default",
"user": "user_pass"
}
},
{
"name": "ssl",
"context": {
"cluster": "ssl",
"user": "ssl"
}
},
{
"name": "ssl-no_file",
"context": {
"cluster": "ssl-no_file",
"user": "ssl-no_file"
}
},
],
"clusters": [
{
"name": "default",
"cluster": {
"server": TEST_HOST
}
},
{
"name": "ssl-no_file",
"cluster": {
"server": TEST_SSL_HOST,
"certificate-authority": TEST_CERTIFICATE_AUTH,
}
},
{
"name": "ssl",
"cluster": {
"server": TEST_SSL_HOST,
"certificate-authority-data": TEST_CERTIFICATE_AUTH_BASE64,
}
},
],
"users": [
{
"name": "simple_token",
"user": {
"token": TEST_DATA_BASE64,
"username": TEST_USERNAME, # should be ignored
"password": TEST_PASSWORD, # should be ignored
}
},
{
"name": "gcp",
"user": {
"auth-provider": {
"name": "gcp",
"access_token": "not_used",
},
"token": TEST_DATA_BASE64, # should be ignored
"username": TEST_USERNAME, # should be ignored
"password": TEST_PASSWORD, # should be ignored
}
},
{
"name": "user_pass",
"user": {
"username": TEST_USERNAME, # should be ignored
"password": TEST_PASSWORD, # should be ignored
}
},
{
"name": "ssl-no_file",
"user": {
"token": TEST_DATA_BASE64,
"client-certificate": TEST_CLIENT_CERT,
"client-key": TEST_CLIENT_KEY,
}
},
{
"name": "ssl",
"user": {
"token": TEST_DATA_BASE64,
"client-certificate-data": TEST_CLIENT_CERT_BASE64,
"client-key-data": TEST_CLIENT_KEY_BASE64,
}
},
]
}
def test_no_user_context(self):
expected = FakeConfig(host=TEST_HOST)
actual = FakeConfig()
KubeConfigLoader(
config_dict=self.TEST_KUBE_CONFIG,
active_context="no_user",
client_configuration=actual).load_and_set()
self.assertEqual(expected, actual)
def test_simple_token(self):
expected = FakeConfig(host=TEST_HOST, token=TEST_DATA_BASE64)
actual = FakeConfig()
KubeConfigLoader(
config_dict=self.TEST_KUBE_CONFIG,
active_context="simple_token",
client_configuration=actual).load_and_set()
self.assertEqual(expected, actual)
def test_load_user_token(self):
loader = KubeConfigLoader(
config_dict=self.TEST_KUBE_CONFIG,
active_context="simple_token")
self.assertTrue(loader._load_user_token())
self.assertEqual(TEST_DATA_BASE64, loader.token)
def test_gcp(self):
expected = FakeConfig(host=TEST_HOST, token=TEST_ANOTHER_DATA_BASE64)
actual = FakeConfig()
KubeConfigLoader(
config_dict=self.TEST_KUBE_CONFIG,
active_context="gcp",
client_configuration=actual,
get_google_credentials=lambda: TEST_ANOTHER_DATA_BASE64) \
.load_and_set()
self.assertEqual(expected, actual)
def test_load_gcp_token(self):
loader = KubeConfigLoader(
config_dict=self.TEST_KUBE_CONFIG,
active_context="gcp",
get_google_credentials=lambda: TEST_ANOTHER_DATA_BASE64)
self.assertTrue(loader._load_gcp_token())
self.assertEqual(TEST_ANOTHER_DATA_BASE64, loader.token)
def test_user_pass(self):
expected = FakeConfig(host=TEST_HOST, token=TEST_BASIC_TOKEN)
actual = FakeConfig()
KubeConfigLoader(
config_dict=self.TEST_KUBE_CONFIG,
active_context="user_pass",
client_configuration=actual).load_and_set()
self.assertEqual(expected, actual)
def test_load_user_pass_token(self):
loader = KubeConfigLoader(
config_dict=self.TEST_KUBE_CONFIG,
active_context="user_pass")
self.assertTrue(loader._load_user_pass_token())
self.assertEqual(TEST_BASIC_TOKEN, loader.token)
def test_ssl_no_cert_files(self):
expected = FakeConfig(
host=TEST_SSL_HOST,
token=TEST_DATA_BASE64,
cert_file=TEST_CLIENT_CERT,
key_file=TEST_CLIENT_KEY,
ssl_ca_cert=TEST_CERTIFICATE_AUTH
)
actual = FakeConfig()
KubeConfigLoader(
config_dict=self.TEST_KUBE_CONFIG,
active_context="ssl-no_file",
client_configuration=actual).load_and_set()
self.assertEqual(expected, actual)
def test_ssl(self):
expected = FakeConfig(
host=TEST_SSL_HOST,
token=TEST_DATA_BASE64,
cert_file=self._create_temp_file(TEST_CLIENT_CERT),
key_file=self._create_temp_file(TEST_CLIENT_KEY),
ssl_ca_cert=self._create_temp_file(TEST_CERTIFICATE_AUTH)
)
actual = FakeConfig()
KubeConfigLoader(
config_dict=self.TEST_KUBE_CONFIG,
active_context="ssl",
client_configuration=actual).load_and_set()
self.assertEqual(expected, actual)
def test_list_contexts(self):
loader = KubeConfigLoader(
config_dict=self.TEST_KUBE_CONFIG,
active_context="no_user")
actual_contexts = loader.list_contexts()
expected_contexts = ConfigNode("", self.TEST_KUBE_CONFIG)['contexts']
for actual in actual_contexts:
expected = expected_contexts.get_with_name(actual['name'])
self.assertEqual(expected.value, actual)
def test_current_context(self):
loader = KubeConfigLoader(config_dict=self.TEST_KUBE_CONFIG)
expected_contexts = ConfigNode("", self.TEST_KUBE_CONFIG)['contexts']
self.assertEqual(expected_contexts.get_with_name("no_user").value,
loader.current_context)
def test_set_active_context(self):
loader = KubeConfigLoader(config_dict=self.TEST_KUBE_CONFIG)
loader.set_active_context("ssl")
expected_contexts = ConfigNode("", self.TEST_KUBE_CONFIG)['contexts']
self.assertEqual(expected_contexts.get_with_name("ssl").value,
loader.current_context)
if __name__ == '__main__':
unittest.main()