Merge pull request #69 from mbohlool/c6

Bugfix: Python client does not resolve relative paths in kubeconfig
This commit is contained in:
mbohlool 2016-12-15 14:34:14 -08:00 committed by GitHub
commit 8d536e6642
3 changed files with 124 additions and 27 deletions

View File

@ -1,7 +1,11 @@
# current-version
# v1.0.0-beta1
- Add context switch to kube config loader
- Test coverage improvments
- Add context switch to kube config loader #46
- Add default kube config location #64
- Bugfix: Python client does not resolve relative paths in kubeconfig #68
- Bugfix: `read_namespaced_pod_log` get None response #57
- Improved test coverage #54
- Improved client generator #49
# v1.0.0-alpha2

View File

@ -28,8 +28,13 @@ _temp_files = {}
def _cleanup_temp_files():
global _temp_files
for temp_file in _temp_files.values():
os.remove(temp_file)
try:
os.remove(temp_file)
except OSError:
pass
_temp_files = {}
def _create_temp_file_with_content(content):
@ -54,7 +59,8 @@ class FileOrData(object):
obj['%data_key_name'] is not set or empty. Assumption is file content is
raw data and data field is base64 string."""
def __init__(self, obj, file_key_name, data_key_name=None):
def __init__(self, obj, file_key_name, data_key_name=None,
file_base_path=""):
if not data_key_name:
data_key_name = file_key_name + "-data"
self._file = None
@ -62,7 +68,8 @@ class FileOrData(object):
if data_key_name in obj:
self._data = obj[data_key_name]
elif file_key_name in obj:
self._file = obj[file_key_name]
self._file = os.path.normpath(
os.path.join(file_base_path, obj[file_key_name]))
def as_file(self):
"""If obj[%data_key_name] exists, return name of a file with base64
@ -71,6 +78,8 @@ class FileOrData(object):
if use_data_if_no_file:
self._file = _create_temp_file_with_content(
base64.decodestring(self._data.encode()))
if not os.path.isfile(self._file):
raise ConfigException("File does not exists: %s" % self._file)
return self._file
def as_data(self):
@ -87,12 +96,14 @@ class FileOrData(object):
class KubeConfigLoader(object):
def __init__(self, config_dict, active_context=None,
get_google_credentials=None, client_configuration=None):
get_google_credentials=None, client_configuration=None,
config_base_path=""):
self._config = ConfigNode('kube-config', config_dict)
self._current_context = None
self._user = None
self._cluster = None
self.set_active_context(active_context)
self._config_base_path = config_base_path
if get_google_credentials:
self._get_google_credentials = get_google_credentials
else:
@ -151,7 +162,9 @@ class KubeConfigLoader(object):
return self.token
def _load_user_token(self):
token = FileOrData(self._user, 'tokenFile', 'token').as_data()
token = FileOrData(
self._user, 'tokenFile', 'token',
file_base_path=self._config_base_path).as_data()
if token:
self.token = token
return True
@ -168,10 +181,14 @@ class KubeConfigLoader(object):
self.host = self._cluster['server']
if self.host.startswith("https"):
self.ssl_ca_cert = FileOrData(
self._cluster, 'certificate-authority').as_file()
self._cluster, 'certificate-authority',
file_base_path=self._config_base_path).as_file()
self.cert_file = FileOrData(
self._user, 'client-certificate').as_file()
self.key_file = FileOrData(self._user, 'client-key').as_file()
self._user, 'client-certificate',
file_base_path=self._config_base_path).as_file()
self.key_file = FileOrData(
self._user, 'client-key',
file_base_path=self._config_base_path).as_file()
def _set_config(self):
if 'token' in self.__dict__:
@ -244,9 +261,16 @@ class ConfigNode(object):
'Expected object with name %s in %s list' % (name, self.name))
def _get_kube_config_loader_for_yaml_file(filename, **kwargs):
with open(filename) as f:
return KubeConfigLoader(
config_dict=yaml.load(f),
config_base_path=os.path.abspath(os.path.dirname(filename)),
**kwargs)
def list_kube_config_contexts(config_file):
with open(config_file) as f:
loader = KubeConfigLoader(config_dict=yaml.load(f))
loader = _get_kube_config_loader_for_yaml_file(config_file)
return loader.list_contexts(), loader.current_context
@ -259,6 +283,5 @@ def load_kube_config(config_file, context=None):
from config file will be used.
"""
with open(config_file) as f:
KubeConfigLoader(
config_dict=yaml.load(f), active_context=context).load_and_set()
_get_kube_config_loader_for_yaml_file(
config_file, active_context=context).load_and_set()

View File

@ -14,12 +14,15 @@
import base64
import os
import shutil
import tempfile
import unittest
from .config_exception import ConfigException
from .kube_config import (ConfigNode, FileOrData, KubeConfigLoader,
_create_temp_file_with_content)
_create_temp_file_with_content, _cleanup_temp_files)
NON_EXISTING_FILE = "zz_non_existing_file_472398324"
def _base64(string):
@ -67,6 +70,11 @@ class BaseTestCase(unittest.TestCase):
os.close(handler)
return name
def expect_exception(self, func, message_part):
with self.assertRaises(ConfigException) as context:
func()
self.assertIn(message_part, str(context.exception))
class TestFileOrData(BaseTestCase):
@ -76,9 +84,16 @@ class TestFileOrData(BaseTestCase):
return f.read()
def test_file_given_file(self):
obj = {TEST_FILE_KEY: TEST_FILENAME}
temp_filename = _create_temp_file_with_content(TEST_DATA)
obj = {TEST_FILE_KEY: temp_filename}
t = FileOrData(obj=obj, file_key_name=TEST_FILE_KEY)
self.assertEqual(TEST_FILENAME, t.as_file())
self.assertEqual(TEST_DATA, self.get_file_content(t.as_file()))
def test_file_given_non_existing_file(self):
temp_filename = NON_EXISTING_FILE
obj = {TEST_FILE_KEY: temp_filename}
t = FileOrData(obj=obj, file_key_name=TEST_FILE_KEY)
self.expect_exception(t.as_file, "does not exists")
def test_file_given_data(self):
obj = {TEST_DATA_KEY: TEST_DATA_BASE64}
@ -116,10 +131,20 @@ class TestFileOrData(BaseTestCase):
data_key_name=TEST_DATA_KEY)
self.assertEqual(TEST_DATA, self.get_file_content(t.as_file()))
def test_file_with_custom_dirname(self):
tempfile = self._create_temp_file(content=TEST_DATA)
tempfile_dir = os.path.dirname(tempfile)
tempfile_basename = os.path.basename(tempfile)
obj = {TEST_FILE_KEY: tempfile_basename}
t = FileOrData(obj=obj, file_key_name=TEST_FILE_KEY,
file_base_path=tempfile_dir)
self.assertEqual(TEST_DATA, self.get_file_content(t.as_file()))
def test_create_temp_file_with_content(self):
self.assertEqual(TEST_DATA,
self.get_file_content(
_create_temp_file_with_content(TEST_DATA)))
_cleanup_temp_files()
class TestConfigNode(BaseTestCase):
@ -163,11 +188,6 @@ class TestConfigNode(BaseTestCase):
self.assertEqual("test_obj/with_names[name=test_name3]",
node.get_with_name("test_name3").name)
def expect_exception(self, func, message_part):
with self.assertRaises(ConfigException) as context:
func()
self.assertIn(message_part, str(context.exception))
def test_key_does_not_exists(self):
self.expect_exception(lambda: self.node['not-exists-key'],
"Expected key not-exists-key in test_obj")
@ -281,6 +301,13 @@ class TestKubeConfigLoader(BaseTestCase):
"user": "ssl-no_file"
}
},
{
"name": "ssl-local-file",
"context": {
"cluster": "ssl-local-file",
"user": "ssl-local-file"
}
},
],
"clusters": [
{
@ -296,6 +323,13 @@ class TestKubeConfigLoader(BaseTestCase):
"certificate-authority": TEST_CERTIFICATE_AUTH,
}
},
{
"name": "ssl-local-file",
"cluster": {
"server": TEST_SSL_HOST,
"certificate-authority": "cert_test",
}
},
{
"name": "ssl",
"cluster": {
@ -340,6 +374,14 @@ class TestKubeConfigLoader(BaseTestCase):
"client-key": TEST_CLIENT_KEY,
}
},
{
"name": "ssl-local-file",
"user": {
"tokenFile": "token_file",
"client-certificate": "client_cert",
"client-key": "client_key",
}
},
{
"name": "ssl",
"user": {
@ -420,11 +462,11 @@ class TestKubeConfigLoader(BaseTestCase):
ssl_ca_cert=TEST_CERTIFICATE_AUTH
)
actual = FakeConfig()
KubeConfigLoader(
loader = KubeConfigLoader(
config_dict=self.TEST_KUBE_CONFIG,
active_context="ssl-no_file",
client_configuration=actual).load_and_set()
self.assertEqual(expected, actual)
client_configuration=actual)
self.expect_exception(loader.load_and_set, "does not exists")
def test_ssl(self):
expected = FakeConfig(
@ -464,6 +506,34 @@ class TestKubeConfigLoader(BaseTestCase):
self.assertEqual(expected_contexts.get_with_name("ssl").value,
loader.current_context)
def test_ssl_with_relative_ssl_files(self):
expected = FakeConfig(
host=TEST_SSL_HOST,
token=TEST_DATA_BASE64,
cert_file=self._create_temp_file(TEST_CLIENT_CERT),
key_file=self._create_temp_file(TEST_CLIENT_KEY),
ssl_ca_cert=self._create_temp_file(TEST_CERTIFICATE_AUTH)
)
try:
temp_dir = tempfile.mkdtemp()
actual = FakeConfig()
with open(os.path.join(temp_dir, "cert_test"), "wb") as fd:
fd.write(TEST_CERTIFICATE_AUTH.encode())
with open(os.path.join(temp_dir, "client_cert"), "wb") as fd:
fd.write(TEST_CLIENT_CERT.encode())
with open(os.path.join(temp_dir, "client_key"), "wb") as fd:
fd.write(TEST_CLIENT_KEY.encode())
with open(os.path.join(temp_dir, "token_file"), "wb") as fd:
fd.write(TEST_DATA.encode())
KubeConfigLoader(
config_dict=self.TEST_KUBE_CONFIG,
active_context="ssl-local-file",
config_base_path=temp_dir,
client_configuration=actual).load_and_set()
self.assertEqual(expected, actual)
finally:
shutil.rmtree(temp_dir)
if __name__ == '__main__':
unittest.main()