Resolving cert files relative path using kube-config file path.
This commit is contained in:
parent
0b6fb4cf25
commit
a429afe91d
@ -28,8 +28,13 @@ _temp_files = {}
|
||||
|
||||
|
||||
def _cleanup_temp_files():
|
||||
global _temp_files
|
||||
for temp_file in _temp_files.values():
|
||||
os.remove(temp_file)
|
||||
try:
|
||||
os.remove(temp_file)
|
||||
except OSError:
|
||||
pass
|
||||
_temp_files = {}
|
||||
|
||||
|
||||
def _create_temp_file_with_content(content):
|
||||
@ -54,7 +59,8 @@ class FileOrData(object):
|
||||
obj['%data_key_name'] is not set or empty. Assumption is file content is
|
||||
raw data and data field is base64 string."""
|
||||
|
||||
def __init__(self, obj, file_key_name, data_key_name=None):
|
||||
def __init__(self, obj, file_key_name, data_key_name=None,
|
||||
file_base_path=""):
|
||||
if not data_key_name:
|
||||
data_key_name = file_key_name + "-data"
|
||||
self._file = None
|
||||
@ -62,7 +68,8 @@ class FileOrData(object):
|
||||
if data_key_name in obj:
|
||||
self._data = obj[data_key_name]
|
||||
elif file_key_name in obj:
|
||||
self._file = obj[file_key_name]
|
||||
self._file = os.path.normpath(
|
||||
os.path.join(file_base_path, obj[file_key_name]))
|
||||
|
||||
def as_file(self):
|
||||
"""If obj[%data_key_name] exists, return name of a file with base64
|
||||
@ -71,6 +78,8 @@ class FileOrData(object):
|
||||
if use_data_if_no_file:
|
||||
self._file = _create_temp_file_with_content(
|
||||
base64.decodestring(self._data.encode()))
|
||||
if not os.path.isfile(self._file):
|
||||
raise ConfigException("File does not exists: %s" % self._file)
|
||||
return self._file
|
||||
|
||||
def as_data(self):
|
||||
@ -87,12 +96,14 @@ class FileOrData(object):
|
||||
class KubeConfigLoader(object):
|
||||
|
||||
def __init__(self, config_dict, active_context=None,
|
||||
get_google_credentials=None, client_configuration=None):
|
||||
get_google_credentials=None, client_configuration=None,
|
||||
config_base_path=""):
|
||||
self._config = ConfigNode('kube-config', config_dict)
|
||||
self._current_context = None
|
||||
self._user = None
|
||||
self._cluster = None
|
||||
self.set_active_context(active_context)
|
||||
self._config_base_path = config_base_path
|
||||
if get_google_credentials:
|
||||
self._get_google_credentials = get_google_credentials
|
||||
else:
|
||||
@ -151,7 +162,9 @@ class KubeConfigLoader(object):
|
||||
return self.token
|
||||
|
||||
def _load_user_token(self):
|
||||
token = FileOrData(self._user, 'tokenFile', 'token').as_data()
|
||||
token = FileOrData(
|
||||
self._user, 'tokenFile', 'token',
|
||||
file_base_path=self._config_base_path).as_data()
|
||||
if token:
|
||||
self.token = token
|
||||
return True
|
||||
@ -168,10 +181,14 @@ class KubeConfigLoader(object):
|
||||
self.host = self._cluster['server']
|
||||
if self.host.startswith("https"):
|
||||
self.ssl_ca_cert = FileOrData(
|
||||
self._cluster, 'certificate-authority').as_file()
|
||||
self._cluster, 'certificate-authority',
|
||||
file_base_path=self._config_base_path).as_file()
|
||||
self.cert_file = FileOrData(
|
||||
self._user, 'client-certificate').as_file()
|
||||
self.key_file = FileOrData(self._user, 'client-key').as_file()
|
||||
self._user, 'client-certificate',
|
||||
file_base_path=self._config_base_path).as_file()
|
||||
self.key_file = FileOrData(
|
||||
self._user, 'client-key',
|
||||
file_base_path=self._config_base_path).as_file()
|
||||
|
||||
def _set_config(self):
|
||||
if 'token' in self.__dict__:
|
||||
@ -244,9 +261,16 @@ class ConfigNode(object):
|
||||
'Expected object with name %s in %s list' % (name, self.name))
|
||||
|
||||
|
||||
def _get_kube_config_loader_for_yaml_file(filename, **kwargs):
|
||||
with open(filename) as f:
|
||||
return KubeConfigLoader(
|
||||
config_dict=yaml.load(f),
|
||||
config_base_path=os.path.abspath(os.path.dirname(filename)),
|
||||
**kwargs)
|
||||
|
||||
|
||||
def list_kube_config_contexts(config_file):
|
||||
with open(config_file) as f:
|
||||
loader = KubeConfigLoader(config_dict=yaml.load(f))
|
||||
loader = _get_kube_config_loader_for_yaml_file(config_file)
|
||||
return loader.list_contexts(), loader.current_context
|
||||
|
||||
|
||||
@ -259,6 +283,5 @@ def load_kube_config(config_file, context=None):
|
||||
from config file will be used.
|
||||
"""
|
||||
|
||||
with open(config_file) as f:
|
||||
KubeConfigLoader(
|
||||
config_dict=yaml.load(f), active_context=context).load_and_set()
|
||||
_get_kube_config_loader_for_yaml_file(
|
||||
config_file, active_context=context).load_and_set()
|
||||
|
||||
@ -14,12 +14,15 @@
|
||||
|
||||
import base64
|
||||
import os
|
||||
import shutil
|
||||
import tempfile
|
||||
import unittest
|
||||
|
||||
from .config_exception import ConfigException
|
||||
from .kube_config import (ConfigNode, FileOrData, KubeConfigLoader,
|
||||
_create_temp_file_with_content)
|
||||
_create_temp_file_with_content, _cleanup_temp_files)
|
||||
|
||||
NON_EXISTING_FILE = "zz_non_existing_file_472398324"
|
||||
|
||||
|
||||
def _base64(string):
|
||||
@ -67,6 +70,11 @@ class BaseTestCase(unittest.TestCase):
|
||||
os.close(handler)
|
||||
return name
|
||||
|
||||
def expect_exception(self, func, message_part):
|
||||
with self.assertRaises(ConfigException) as context:
|
||||
func()
|
||||
self.assertIn(message_part, str(context.exception))
|
||||
|
||||
|
||||
class TestFileOrData(BaseTestCase):
|
||||
|
||||
@ -76,9 +84,16 @@ class TestFileOrData(BaseTestCase):
|
||||
return f.read()
|
||||
|
||||
def test_file_given_file(self):
|
||||
obj = {TEST_FILE_KEY: TEST_FILENAME}
|
||||
temp_filename = _create_temp_file_with_content(TEST_DATA)
|
||||
obj = {TEST_FILE_KEY: temp_filename}
|
||||
t = FileOrData(obj=obj, file_key_name=TEST_FILE_KEY)
|
||||
self.assertEqual(TEST_FILENAME, t.as_file())
|
||||
self.assertEqual(TEST_DATA, self.get_file_content(t.as_file()))
|
||||
|
||||
def test_file_given_non_existing_file(self):
|
||||
temp_filename = NON_EXISTING_FILE
|
||||
obj = {TEST_FILE_KEY: temp_filename}
|
||||
t = FileOrData(obj=obj, file_key_name=TEST_FILE_KEY)
|
||||
self.expect_exception(t.as_file, "does not exists")
|
||||
|
||||
def test_file_given_data(self):
|
||||
obj = {TEST_DATA_KEY: TEST_DATA_BASE64}
|
||||
@ -116,10 +131,20 @@ class TestFileOrData(BaseTestCase):
|
||||
data_key_name=TEST_DATA_KEY)
|
||||
self.assertEqual(TEST_DATA, self.get_file_content(t.as_file()))
|
||||
|
||||
def test_file_with_custom_dirname(self):
|
||||
tempfile = self._create_temp_file(content=TEST_DATA)
|
||||
tempfile_dir = os.path.dirname(tempfile)
|
||||
tempfile_basename = os.path.basename(tempfile)
|
||||
obj = {TEST_FILE_KEY: tempfile_basename}
|
||||
t = FileOrData(obj=obj, file_key_name=TEST_FILE_KEY,
|
||||
file_base_path=tempfile_dir)
|
||||
self.assertEqual(TEST_DATA, self.get_file_content(t.as_file()))
|
||||
|
||||
def test_create_temp_file_with_content(self):
|
||||
self.assertEqual(TEST_DATA,
|
||||
self.get_file_content(
|
||||
_create_temp_file_with_content(TEST_DATA)))
|
||||
_cleanup_temp_files()
|
||||
|
||||
|
||||
class TestConfigNode(BaseTestCase):
|
||||
@ -163,11 +188,6 @@ class TestConfigNode(BaseTestCase):
|
||||
self.assertEqual("test_obj/with_names[name=test_name3]",
|
||||
node.get_with_name("test_name3").name)
|
||||
|
||||
def expect_exception(self, func, message_part):
|
||||
with self.assertRaises(ConfigException) as context:
|
||||
func()
|
||||
self.assertIn(message_part, str(context.exception))
|
||||
|
||||
def test_key_does_not_exists(self):
|
||||
self.expect_exception(lambda: self.node['not-exists-key'],
|
||||
"Expected key not-exists-key in test_obj")
|
||||
@ -281,6 +301,13 @@ class TestKubeConfigLoader(BaseTestCase):
|
||||
"user": "ssl-no_file"
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "ssl-local-file",
|
||||
"context": {
|
||||
"cluster": "ssl-local-file",
|
||||
"user": "ssl-local-file"
|
||||
}
|
||||
},
|
||||
],
|
||||
"clusters": [
|
||||
{
|
||||
@ -296,6 +323,13 @@ class TestKubeConfigLoader(BaseTestCase):
|
||||
"certificate-authority": TEST_CERTIFICATE_AUTH,
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "ssl-local-file",
|
||||
"cluster": {
|
||||
"server": TEST_SSL_HOST,
|
||||
"certificate-authority": "cert_test",
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "ssl",
|
||||
"cluster": {
|
||||
@ -340,6 +374,14 @@ class TestKubeConfigLoader(BaseTestCase):
|
||||
"client-key": TEST_CLIENT_KEY,
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "ssl-local-file",
|
||||
"user": {
|
||||
"tokenFile": "token_file",
|
||||
"client-certificate": "client_cert",
|
||||
"client-key": "client_key",
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "ssl",
|
||||
"user": {
|
||||
@ -420,11 +462,11 @@ class TestKubeConfigLoader(BaseTestCase):
|
||||
ssl_ca_cert=TEST_CERTIFICATE_AUTH
|
||||
)
|
||||
actual = FakeConfig()
|
||||
KubeConfigLoader(
|
||||
loader = KubeConfigLoader(
|
||||
config_dict=self.TEST_KUBE_CONFIG,
|
||||
active_context="ssl-no_file",
|
||||
client_configuration=actual).load_and_set()
|
||||
self.assertEqual(expected, actual)
|
||||
client_configuration=actual)
|
||||
self.expect_exception(loader.load_and_set, "does not exists")
|
||||
|
||||
def test_ssl(self):
|
||||
expected = FakeConfig(
|
||||
@ -464,6 +506,34 @@ class TestKubeConfigLoader(BaseTestCase):
|
||||
self.assertEqual(expected_contexts.get_with_name("ssl").value,
|
||||
loader.current_context)
|
||||
|
||||
def test_ssl_with_relative_ssl_files(self):
|
||||
expected = FakeConfig(
|
||||
host=TEST_SSL_HOST,
|
||||
token=TEST_DATA_BASE64,
|
||||
cert_file=self._create_temp_file(TEST_CLIENT_CERT),
|
||||
key_file=self._create_temp_file(TEST_CLIENT_KEY),
|
||||
ssl_ca_cert=self._create_temp_file(TEST_CERTIFICATE_AUTH)
|
||||
)
|
||||
try:
|
||||
temp_dir = tempfile.mkdtemp()
|
||||
actual = FakeConfig()
|
||||
with open(os.path.join(temp_dir, "cert_test"), "wb") as fd:
|
||||
fd.write(TEST_CERTIFICATE_AUTH.encode())
|
||||
with open(os.path.join(temp_dir, "client_cert"), "wb") as fd:
|
||||
fd.write(TEST_CLIENT_CERT.encode())
|
||||
with open(os.path.join(temp_dir, "client_key"), "wb") as fd:
|
||||
fd.write(TEST_CLIENT_KEY.encode())
|
||||
with open(os.path.join(temp_dir, "token_file"), "wb") as fd:
|
||||
fd.write(TEST_DATA.encode())
|
||||
KubeConfigLoader(
|
||||
config_dict=self.TEST_KUBE_CONFIG,
|
||||
active_context="ssl-local-file",
|
||||
config_base_path=temp_dir,
|
||||
client_configuration=actual).load_and_set()
|
||||
self.assertEqual(expected, actual)
|
||||
finally:
|
||||
shutil.rmtree(temp_dir)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
||||
Loading…
Reference in New Issue
Block a user