Merge remote-tracking branch 'upstream/master' into fix_base64_padding_for_kconfig

This commit is contained in:
Ben Picolo 2019-03-30 09:35:41 -04:00
commit 72a02cca50
25 changed files with 1184 additions and 88 deletions

View File

@ -1,6 +1,6 @@
# ref: https://docs.travis-ci.com/user/languages/python
language: python
dist: trusty
dist: xenial
sudo: required
matrix:
@ -10,7 +10,7 @@ matrix:
- python: 2.7
env: TOXENV=py27-functional
- python: 2.7
env: TOXENV=update-pep8
env: TOXENV=update-pycodestyle
- python: 2.7
env: TOXENV=docs
- python: 2.7
@ -25,10 +25,15 @@ matrix:
env: TOXENV=py36
- python: 3.6
env: TOXENV=py36-functional
- python: 3.7
env: TOXENV=py37
- python: 3.7
env: TOXENV=py37-functional
install:
- pip install tox
script:
- ./run_tox.sh tox
- ./hack/verify-boilerplate.sh

2
OWNERS
View File

@ -1,3 +1,5 @@
# See the OWNERS docs at https://go.k8s.io/owners
approvers:
- mbohlool
- yliaog

View File

@ -1,3 +1,5 @@
#!/usr/bin/env python
# Copyright 2016 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");

View File

@ -1,3 +1,5 @@
#!/usr/bin/env python
# Copyright 2016 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");

View File

@ -1,3 +1,5 @@
#!/usr/bin/env python
# Copyright 2017 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");

View File

@ -1,3 +1,5 @@
#!/usr/bin/env python
# Copyright 2016 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");

97
config/exec_provider.py Normal file
View File

@ -0,0 +1,97 @@
#!/usr/bin/env python
# Copyright 2018 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import os
import subprocess
import sys
from .config_exception import ConfigException
class ExecProvider(object):
"""
Implementation of the proposal for out-of-tree client
authentication providers as described here --
https://github.com/kubernetes/community/blob/master/contributors/design-proposals/auth/kubectl-exec-plugins.md
Missing from implementation:
* TLS cert support
* caching
"""
def __init__(self, exec_config):
"""
exec_config must be of type ConfigNode because we depend on
safe_get(self, key) to correctly handle optional exec provider
config parameters.
"""
for key in ['command', 'apiVersion']:
if key not in exec_config:
raise ConfigException(
'exec: malformed request. missing key \'%s\'' % key)
self.api_version = exec_config['apiVersion']
self.args = [exec_config['command']]
if exec_config.safe_get('args'):
self.args.extend(exec_config['args'])
self.env = os.environ.copy()
if exec_config.safe_get('env'):
additional_vars = {}
for item in exec_config['env']:
name = item['name']
value = item['value']
additional_vars[name] = value
self.env.update(additional_vars)
def run(self, previous_response=None):
kubernetes_exec_info = {
'apiVersion': self.api_version,
'kind': 'ExecCredential',
'spec': {
'interactive': sys.stdout.isatty()
}
}
if previous_response:
kubernetes_exec_info['spec']['response'] = previous_response
self.env['KUBERNETES_EXEC_INFO'] = json.dumps(kubernetes_exec_info)
process = subprocess.Popen(
self.args,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=self.env,
universal_newlines=True)
(stdout, stderr) = process.communicate()
exit_code = process.wait()
if exit_code != 0:
msg = 'exec: process returned %d' % exit_code
stderr = stderr.strip()
if stderr:
msg += '. %s' % stderr
raise ConfigException(msg)
try:
data = json.loads(stdout)
except ValueError as de:
raise ConfigException(
'exec: failed to decode process output: %s' % de)
for key in ('apiVersion', 'kind', 'status'):
if key not in data:
raise ConfigException(
'exec: malformed response. missing key \'%s\'' % key)
if data['apiVersion'] != self.api_version:
raise ConfigException(
'exec: plugin api version %s does not match %s' %
(data['apiVersion'], self.api_version))
return data['status']

View File

@ -0,0 +1,147 @@
#!/usr/bin/env python
# Copyright 2018 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import unittest
import mock
from .config_exception import ConfigException
from .exec_provider import ExecProvider
from .kube_config import ConfigNode
class ExecProviderTest(unittest.TestCase):
def setUp(self):
self.input_ok = ConfigNode('test', {
'command': 'aws-iam-authenticator',
'args': ['token', '-i', 'dummy'],
'apiVersion': 'client.authentication.k8s.io/v1beta1',
'env': None
})
self.output_ok = """
{
"apiVersion": "client.authentication.k8s.io/v1beta1",
"kind": "ExecCredential",
"status": {
"token": "dummy"
}
}
"""
def test_missing_input_keys(self):
exec_configs = [ConfigNode('test1', {}),
ConfigNode('test2', {'command': ''}),
ConfigNode('test3', {'apiVersion': ''})]
for exec_config in exec_configs:
with self.assertRaises(ConfigException) as context:
ExecProvider(exec_config)
self.assertIn('exec: malformed request. missing key',
context.exception.args[0])
@mock.patch('subprocess.Popen')
def test_error_code_returned(self, mock):
instance = mock.return_value
instance.wait.return_value = 1
instance.communicate.return_value = ('', '')
with self.assertRaises(ConfigException) as context:
ep = ExecProvider(self.input_ok)
ep.run()
self.assertIn('exec: process returned %d' %
instance.wait.return_value, context.exception.args[0])
@mock.patch('subprocess.Popen')
def test_nonjson_output_returned(self, mock):
instance = mock.return_value
instance.wait.return_value = 0
instance.communicate.return_value = ('', '')
with self.assertRaises(ConfigException) as context:
ep = ExecProvider(self.input_ok)
ep.run()
self.assertIn('exec: failed to decode process output',
context.exception.args[0])
@mock.patch('subprocess.Popen')
def test_missing_output_keys(self, mock):
instance = mock.return_value
instance.wait.return_value = 0
outputs = [
"""
{
"kind": "ExecCredential",
"status": {
"token": "dummy"
}
}
""", """
{
"apiVersion": "client.authentication.k8s.io/v1beta1",
"status": {
"token": "dummy"
}
}
""", """
{
"apiVersion": "client.authentication.k8s.io/v1beta1",
"kind": "ExecCredential"
}
"""
]
for output in outputs:
instance.communicate.return_value = (output, '')
with self.assertRaises(ConfigException) as context:
ep = ExecProvider(self.input_ok)
ep.run()
self.assertIn('exec: malformed response. missing key',
context.exception.args[0])
@mock.patch('subprocess.Popen')
def test_mismatched_api_version(self, mock):
instance = mock.return_value
instance.wait.return_value = 0
wrong_api_version = 'client.authentication.k8s.io/v1'
output = """
{
"apiVersion": "%s",
"kind": "ExecCredential",
"status": {
"token": "dummy"
}
}
""" % wrong_api_version
instance.communicate.return_value = (output, '')
with self.assertRaises(ConfigException) as context:
ep = ExecProvider(self.input_ok)
ep.run()
self.assertIn(
'exec: plugin api version %s does not match' %
wrong_api_version,
context.exception.args[0])
@mock.patch('subprocess.Popen')
def test_ok_01(self, mock):
instance = mock.return_value
instance.wait.return_value = 0
instance.communicate.return_value = (self.output_ok, '')
ep = ExecProvider(self.input_ok)
result = ep.run()
self.assertTrue(isinstance(result, dict))
self.assertTrue('token' in result)
if __name__ == '__main__':
unittest.main()

View File

@ -1,3 +1,5 @@
#!/usr/bin/env python
# Copyright 2016 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@ -85,7 +87,8 @@ class InClusterConfigLoader(object):
def load_incluster_config():
"""Use the service account kubernetes gives to pods to connect to kubernetes
"""
Use the service account kubernetes gives to pods to connect to kubernetes
cluster. It's intended for clients that expect to be running inside a pod
running on kubernetes. It will raise an exception if called from a process
not running in a kubernetes environment."""

View File

@ -1,3 +1,5 @@
#!/usr/bin/env python
# Copyright 2016 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");

View File

@ -1,4 +1,6 @@
# Copyright 2016 The Kubernetes Authors.
#!/usr/bin/env python
# Copyright 2018 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -14,13 +16,15 @@
import atexit
import base64
import copy
import datetime
import json
import logging
import os
import platform
import tempfile
import time
import adal
import google.auth
import google.auth.transport.requests
import oauthlib.oauth2
@ -30,12 +34,19 @@ from requests_oauthlib import OAuth2Session
from six import PY3
from kubernetes.client import ApiClient, Configuration
from kubernetes.config.exec_provider import ExecProvider
from .config_exception import ConfigException
from .dateutil import UTC, format_rfc3339, parse_rfc3339
try:
import adal
except ImportError:
pass
EXPIRY_SKEW_PREVENTION_DELAY = datetime.timedelta(minutes=5)
KUBE_CONFIG_DEFAULT_LOCATION = os.environ.get('KUBECONFIG', '~/.kube/config')
ENV_KUBECONFIG_PATH_SEPARATOR = ';' if platform.system() == 'Windows' else ':'
_temp_files = {}
@ -98,8 +109,12 @@ class FileOrData(object):
use_data_if_no_file = not self._file and self._data
if use_data_if_no_file:
if self._base64_file_content:
if isinstance(self._data, str):
content = self._data.encode()
else:
content = self._data
self._file = _create_temp_file_with_content(
base64.decodestring(self._data.encode()))
base64.standard_b64decode(content))
else:
self._file = _create_temp_file_with_content(self._data)
if self._file and not os.path.isfile(self._file):
@ -114,7 +129,7 @@ class FileOrData(object):
with open(self._file) as f:
if self._base64_file_content:
self._data = bytes.decode(
base64.encodestring(str.encode(f.read())))
base64.standard_b64encode(str.encode(f.read())))
else:
self._data = f.read()
return self._data
@ -126,7 +141,12 @@ class KubeConfigLoader(object):
get_google_credentials=None,
config_base_path="",
config_persister=None):
self._config = ConfigNode('kube-config', config_dict)
if isinstance(config_dict, ConfigNode):
self._config = config_dict
else:
self._config = ConfigNode('kube-config', config_dict)
self._current_context = None
self._user = None
self._cluster = None
@ -172,11 +192,10 @@ class KubeConfigLoader(object):
section of kube-config and stops if it finds a valid authentication
method. The order of authentication methods is:
1. GCP auth-provider
2. token_data
3. token field (point to a token file)
4. oidc auth-provider
5. username/password
1. auth-provider (gcp, azure, oidc)
2. token field (point to a token file)
3. exec provided plugin
4. username/password
"""
if not self._user:
return
@ -184,6 +203,8 @@ class KubeConfigLoader(object):
return
if self._load_user_token():
return
if self._load_from_exec_plugin():
return
self._load_user_pass_token()
def _load_auth_provider_token(self):
@ -211,6 +232,9 @@ class KubeConfigLoader(object):
return self.token
def _refresh_azure_token(self, config):
if 'adal' not in globals():
raise ImportError('refresh token error, adal library not imported')
tenant = config['tenant-id']
authority = 'https://login.microsoftonline.com/{}'.format(tenant)
context = adal.AuthenticationContext(
@ -353,10 +377,24 @@ class KubeConfigLoader(object):
provider['config'].value['id-token'] = refresh['id_token']
provider['config'].value['refresh-token'] = refresh['refresh_token']
def _load_from_exec_plugin(self):
if 'exec' not in self._user:
return
try:
status = ExecProvider(self._user['exec']).run()
if 'token' not in status:
logging.error('exec: missing token field in plugin output')
return None
self.token = "Bearer %s" % status['token']
return True
except Exception as e:
logging.error(str(e))
def _load_user_token(self):
base_path = self._get_base_path(self._user.path)
token = FileOrData(
self._user, 'tokenFile', 'token',
file_base_path=self._config_base_path,
file_base_path=base_path,
base64_file_content=False).as_data()
if token:
self.token = "Bearer %s" % token
@ -369,24 +407,48 @@ class KubeConfigLoader(object):
self._user['password'])).get('authorization')
return True
def _get_base_path(self, config_path):
if self._config_base_path is not None:
return self._config_base_path
if config_path is not None:
return os.path.abspath(os.path.dirname(config_path))
return ""
def _load_cluster_info(self):
if 'server' in self._cluster:
self.host = self._cluster['server']
self.host = self._cluster['server'].rstrip('/')
if self.host.startswith("https"):
base_path = self._get_base_path(self._cluster.path)
self.ssl_ca_cert = FileOrData(
self._cluster, 'certificate-authority',
file_base_path=self._config_base_path).as_file()
file_base_path=base_path).as_file()
self.cert_file = FileOrData(
self._user, 'client-certificate',
file_base_path=self._config_base_path).as_file()
file_base_path=base_path).as_file()
self.key_file = FileOrData(
self._user, 'client-key',
file_base_path=self._config_base_path).as_file()
file_base_path=base_path).as_file()
if 'insecure-skip-tls-verify' in self._cluster:
self.verify_ssl = not self._cluster['insecure-skip-tls-verify']
def _using_gcp_auth_provider(self):
return self._user and \
'auth-provider' in self._user and \
'name' in self._user['auth-provider'] and \
self._user['auth-provider']['name'] == 'gcp'
def _set_config(self, client_configuration):
if self._using_gcp_auth_provider():
# GCP auth tokens must be refreshed regularly, but swagger expects
# a constant token. Replace the swagger-generated client config's
# get_api_key_with_prefix method with our own to allow automatic
# token refresh.
def _gcp_get_api_key(*args):
return self._load_gcp_token(self._user['auth-provider'])
client_configuration.get_api_key_with_prefix = _gcp_get_api_key
if 'token' in self.__dict__:
# Note: this line runs for GCP auth tokens as well, but this entry
# will not be updated upon GCP token refresh.
client_configuration.api_key['authorization'] = self.token
# copy these keys directly from self to configuration object
keys = ['host', 'ssl_ca_cert', 'cert_file', 'key_file', 'verify_ssl']
@ -412,9 +474,10 @@ class ConfigNode(object):
message in case of missing keys. The assumption is all access keys are
present in a well-formed kube-config."""
def __init__(self, name, value):
def __init__(self, name, value, path=None):
self.name = name
self.value = value
self.path = path
def __contains__(self, key):
return key in self.value
@ -434,7 +497,7 @@ class ConfigNode(object):
'Invalid kube-config file. Expected key %s in %s'
% (key, self.name))
if isinstance(v, dict) or isinstance(v, list):
return ConfigNode('%s/%s' % (self.name, key), v)
return ConfigNode('%s/%s' % (self.name, key), v, self.path)
else:
return v
@ -459,7 +522,12 @@ class ConfigNode(object):
'Expected only one object with name %s in %s list'
% (name, self.name))
if result is not None:
return ConfigNode('%s[name=%s]' % (self.name, name), result)
if isinstance(result, ConfigNode):
return result
else:
return ConfigNode(
'%s[name=%s]' %
(self.name, name), result, self.path)
if safe:
return None
raise ConfigException(
@ -467,18 +535,87 @@ class ConfigNode(object):
'Expected object with name %s in %s list' % (name, self.name))
def _get_kube_config_loader_for_yaml_file(filename, **kwargs):
with open(filename) as f:
return KubeConfigLoader(
config_dict=yaml.load(f),
config_base_path=os.path.abspath(os.path.dirname(filename)),
**kwargs)
class KubeConfigMerger:
"""Reads and merges configuration from one or more kube-config's.
The propery `config` can be passed to the KubeConfigLoader as config_dict.
It uses a path attribute from ConfigNode to store the path to kubeconfig.
This path is required to load certs from relative paths.
A method `save_changes` updates changed kubeconfig's (it compares current
state of dicts with).
"""
def __init__(self, paths):
self.paths = []
self.config_files = {}
self.config_merged = None
for path in paths.split(ENV_KUBECONFIG_PATH_SEPARATOR):
if path:
path = os.path.expanduser(path)
if os.path.exists(path):
self.paths.append(path)
self.load_config(path)
self.config_saved = copy.deepcopy(self.config_files)
@property
def config(self):
return self.config_merged
def load_config(self, path):
with open(path) as f:
config = yaml.safe_load(f)
if self.config_merged is None:
config_merged = copy.deepcopy(config)
for item in ('clusters', 'contexts', 'users'):
config_merged[item] = []
self.config_merged = ConfigNode(path, config_merged, path)
for item in ('clusters', 'contexts', 'users'):
self._merge(item, config[item], path)
self.config_files[path] = config
def _merge(self, item, add_cfg, path):
for new_item in add_cfg:
for exists in self.config_merged.value[item]:
if exists['name'] == new_item['name']:
break
else:
self.config_merged.value[item].append(ConfigNode(
'{}/{}'.format(path, new_item), new_item, path))
def save_changes(self):
for path in self.paths:
if self.config_saved[path] != self.config_files[path]:
self.save_config(path)
self.config_saved = copy.deepcopy(self.config_files)
def save_config(self, path):
with open(path, 'w') as f:
yaml.safe_dump(self.config_files[path], f,
default_flow_style=False)
def _get_kube_config_loader_for_yaml_file(
filename, persist_config=False, **kwargs):
kcfg = KubeConfigMerger(filename)
if persist_config and 'config_persister' not in kwargs:
kwargs['config_persister'] = kcfg.save_changes()
return KubeConfigLoader(
config_dict=kcfg.config,
config_base_path=None,
**kwargs)
def list_kube_config_contexts(config_file=None):
if config_file is None:
config_file = os.path.expanduser(KUBE_CONFIG_DEFAULT_LOCATION)
config_file = KUBE_CONFIG_DEFAULT_LOCATION
loader = _get_kube_config_loader_for_yaml_file(config_file)
return loader.list_contexts(), loader.current_context
@ -500,18 +637,12 @@ def load_kube_config(config_file=None, context=None,
"""
if config_file is None:
config_file = os.path.expanduser(KUBE_CONFIG_DEFAULT_LOCATION)
config_persister = None
if persist_config:
def _save_kube_config(config_map):
with open(config_file, 'w') as f:
yaml.safe_dump(config_map, f, default_flow_style=False)
config_persister = _save_kube_config
config_file = KUBE_CONFIG_DEFAULT_LOCATION
loader = _get_kube_config_loader_for_yaml_file(
config_file, active_context=context,
config_persister=config_persister)
persist_config=persist_config)
if client_configuration is None:
config = type.__call__(Configuration)
loader.load_and_set(config)
@ -524,9 +655,11 @@ def new_client_from_config(
config_file=None,
context=None,
persist_config=True):
"""Loads configuration the same as load_kube_config but returns an ApiClient
"""
Loads configuration the same as load_kube_config but returns an ApiClient
to be used with any API object. This will allow the caller to concurrently
talk with multiple clusters."""
talk with multiple clusters.
"""
client_config = type.__call__(Configuration)
load_kube_config(config_file=config_file, context=context,
client_configuration=client_config,

View File

@ -1,4 +1,6 @@
# Copyright 2016 The Kubernetes Authors.
#!/usr/bin/env python
# Copyright 2018 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -24,8 +26,11 @@ import mock
import yaml
from six import PY3, next
from kubernetes.client import Configuration
from .config_exception import ConfigException
from .kube_config import (ConfigNode, FileOrData, KubeConfigLoader,
from .kube_config import (ENV_KUBECONFIG_PATH_SEPARATOR, ConfigNode,
FileOrData, KubeConfigLoader, KubeConfigMerger,
_cleanup_temp_files, _create_temp_file_with_content,
list_kube_config_contexts, load_kube_config,
new_client_from_config)
@ -34,13 +39,15 @@ BEARER_TOKEN_FORMAT = "Bearer %s"
EXPIRY_DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
# should be less than kube_config.EXPIRY_SKEW_PREVENTION_DELAY
EXPIRY_TIMEDELTA = 2
PAST_EXPIRY_TIMEDELTA = 2
# should be more than kube_config.EXPIRY_SKEW_PREVENTION_DELAY
FUTURE_EXPIRY_TIMEDELTA = 60
NON_EXISTING_FILE = "zz_non_existing_file_472398324"
def _base64(string):
return base64.encodestring(string.encode()).decode()
return base64.standard_b64encode(string.encode()).decode()
def _urlsafe_unpadded_b64encode(string):
@ -51,9 +58,9 @@ def _format_expiry_datetime(dt):
return dt.strftime(EXPIRY_DATETIME_FORMAT)
def _get_expiry(loader):
def _get_expiry(loader, active_context):
expired_gcp_conf = (item for item in loader._config.value.get("users")
if item.get("name") == "expired_gcp")
if item.get("name") == active_context)
return next(expired_gcp_conf).get("user").get("auth-provider") \
.get("config").get("expiry")
@ -77,8 +84,11 @@ TEST_USERNAME = "me"
TEST_PASSWORD = "pass"
# token for me:pass
TEST_BASIC_TOKEN = "Basic bWU6cGFzcw=="
TEST_TOKEN_EXPIRY = _format_expiry_datetime(
datetime.datetime.utcnow() - datetime.timedelta(minutes=EXPIRY_TIMEDELTA))
DATETIME_EXPIRY_PAST = datetime.datetime.utcnow(
) - datetime.timedelta(minutes=PAST_EXPIRY_TIMEDELTA)
DATETIME_EXPIRY_FUTURE = datetime.datetime.utcnow(
) + datetime.timedelta(minutes=FUTURE_EXPIRY_TIMEDELTA)
TEST_TOKEN_EXPIRY_PAST = _format_expiry_datetime(DATETIME_EXPIRY_PAST)
TEST_SSL_HOST = "https://test-host"
TEST_CERTIFICATE_AUTH = "cert-auth"
@ -226,6 +236,18 @@ class TestFileOrData(BaseTestCase):
_create_temp_file_with_content(TEST_DATA)))
_cleanup_temp_files()
def test_file_given_data_bytes(self):
obj = {TEST_DATA_KEY: TEST_DATA_BASE64.encode()}
t = FileOrData(obj=obj, file_key_name=TEST_FILE_KEY,
data_key_name=TEST_DATA_KEY)
self.assertEqual(TEST_DATA, self.get_file_content(t.as_file()))
def test_file_given_data_bytes_no_base64(self):
obj = {TEST_DATA_KEY: TEST_DATA.encode()}
t = FileOrData(obj=obj, file_key_name=TEST_FILE_KEY,
data_key_name=TEST_DATA_KEY, base64_file_content=False)
self.assertEqual(TEST_DATA, self.get_file_content(t.as_file()))
class TestConfigNode(BaseTestCase):
@ -384,6 +406,13 @@ class TestKubeConfigLoader(BaseTestCase):
"user": "expired_gcp"
}
},
{
"name": "expired_gcp_refresh",
"context": {
"cluster": "default",
"user": "expired_gcp_refresh"
}
},
{
"name": "oidc",
"context": {
@ -463,6 +492,13 @@ class TestKubeConfigLoader(BaseTestCase):
"user": "non_existing_user"
}
},
{
"name": "exec_cred_user",
"context": {
"cluster": "default",
"user": "exec_cred_user"
}
},
],
"clusters": [
{
@ -531,7 +567,24 @@ class TestKubeConfigLoader(BaseTestCase):
"name": "gcp",
"config": {
"access-token": TEST_DATA_BASE64,
"expiry": TEST_TOKEN_EXPIRY, # always in past
"expiry": TEST_TOKEN_EXPIRY_PAST, # always in past
}
},
"token": TEST_DATA_BASE64, # should be ignored
"username": TEST_USERNAME, # should be ignored
"password": TEST_PASSWORD, # should be ignored
}
},
# Duplicated from "expired_gcp" so test_load_gcp_token_with_refresh
# is isolated from test_gcp_get_api_key_with_prefix.
{
"name": "expired_gcp_refresh",
"user": {
"auth-provider": {
"name": "gcp",
"config": {
"access-token": TEST_DATA_BASE64,
"expiry": TEST_TOKEN_EXPIRY_PAST, # always in past
}
},
"token": TEST_DATA_BASE64, # should be ignored
@ -646,6 +699,16 @@ class TestKubeConfigLoader(BaseTestCase):
"client-key-data": TEST_CLIENT_KEY_BASE64,
}
},
{
"name": "exec_cred_user",
"user": {
"exec": {
"apiVersion": "client.authentication.k8s.io/v1beta1",
"command": "aws-iam-authenticator",
"args": ["token", "-i", "dummy-cluster"]
}
}
},
]
}
@ -674,16 +737,20 @@ class TestKubeConfigLoader(BaseTestCase):
self.assertEqual(BEARER_TOKEN_FORMAT % TEST_DATA_BASE64, loader.token)
def test_gcp_no_refresh(self):
expected = FakeConfig(
host=TEST_HOST,
token=BEARER_TOKEN_FORMAT % TEST_DATA_BASE64)
actual = FakeConfig()
fake_config = FakeConfig()
# swagger-generated config has this, but FakeConfig does not.
self.assertFalse(hasattr(fake_config, 'get_api_key_with_prefix'))
KubeConfigLoader(
config_dict=self.TEST_KUBE_CONFIG,
active_context="gcp",
get_google_credentials=lambda: _raise_exception(
"SHOULD NOT BE CALLED")).load_and_set(actual)
self.assertEqual(expected, actual)
"SHOULD NOT BE CALLED")).load_and_set(fake_config)
# Should now be populated with a gcp token fetcher.
self.assertIsNotNone(fake_config.get_api_key_with_prefix)
self.assertEqual(TEST_HOST, fake_config.host)
# For backwards compatibility, authorization field should still be set.
self.assertEqual(BEARER_TOKEN_FORMAT % TEST_DATA_BASE64,
fake_config.api_key['authorization'])
def test_load_gcp_token_no_refresh(self):
loader = KubeConfigLoader(
@ -698,20 +765,48 @@ class TestKubeConfigLoader(BaseTestCase):
def test_load_gcp_token_with_refresh(self):
def cred(): return None
cred.token = TEST_ANOTHER_DATA_BASE64
cred.expiry = datetime.datetime.now()
cred.expiry = datetime.datetime.utcnow()
loader = KubeConfigLoader(
config_dict=self.TEST_KUBE_CONFIG,
active_context="expired_gcp",
get_google_credentials=lambda: cred)
original_expiry = _get_expiry(loader)
original_expiry = _get_expiry(loader, "expired_gcp")
self.assertTrue(loader._load_auth_provider_token())
new_expiry = _get_expiry(loader)
new_expiry = _get_expiry(loader, "expired_gcp")
# assert that the configs expiry actually updates
self.assertTrue(new_expiry > original_expiry)
self.assertEqual(BEARER_TOKEN_FORMAT % TEST_ANOTHER_DATA_BASE64,
loader.token)
def test_gcp_get_api_key_with_prefix(self):
class cred_old:
token = TEST_DATA_BASE64
expiry = DATETIME_EXPIRY_PAST
class cred_new:
token = TEST_ANOTHER_DATA_BASE64
expiry = DATETIME_EXPIRY_FUTURE
fake_config = FakeConfig()
_get_google_credentials = mock.Mock()
_get_google_credentials.side_effect = [cred_old, cred_new]
loader = KubeConfigLoader(
config_dict=self.TEST_KUBE_CONFIG,
active_context="expired_gcp_refresh",
get_google_credentials=_get_google_credentials)
loader.load_and_set(fake_config)
original_expiry = _get_expiry(loader, "expired_gcp_refresh")
# Call GCP token fetcher.
token = fake_config.get_api_key_with_prefix()
new_expiry = _get_expiry(loader, "expired_gcp_refresh")
self.assertTrue(new_expiry > original_expiry)
self.assertEqual(BEARER_TOKEN_FORMAT % TEST_ANOTHER_DATA_BASE64,
loader.token)
self.assertEqual(BEARER_TOKEN_FORMAT % TEST_ANOTHER_DATA_BASE64,
token)
def test_oidc_no_refresh(self):
loader = KubeConfigLoader(
config_dict=self.TEST_KUBE_CONFIG,
@ -897,14 +992,16 @@ class TestKubeConfigLoader(BaseTestCase):
def test_load_kube_config(self):
expected = FakeConfig(host=TEST_HOST,
token=BEARER_TOKEN_FORMAT % TEST_DATA_BASE64)
config_file = self._create_temp_file(yaml.dump(self.TEST_KUBE_CONFIG))
config_file = self._create_temp_file(
yaml.safe_dump(self.TEST_KUBE_CONFIG))
actual = FakeConfig()
load_kube_config(config_file=config_file, context="simple_token",
client_configuration=actual)
self.assertEqual(expected, actual)
def test_list_kube_config_contexts(self):
config_file = self._create_temp_file(yaml.dump(self.TEST_KUBE_CONFIG))
config_file = self._create_temp_file(
yaml.safe_dump(self.TEST_KUBE_CONFIG))
contexts, active_context = list_kube_config_contexts(
config_file=config_file)
self.assertDictEqual(self.TEST_KUBE_CONFIG['contexts'][0],
@ -917,7 +1014,8 @@ class TestKubeConfigLoader(BaseTestCase):
contexts)
def test_new_client_from_config(self):
config_file = self._create_temp_file(yaml.dump(self.TEST_KUBE_CONFIG))
config_file = self._create_temp_file(
yaml.safe_dump(self.TEST_KUBE_CONFIG))
client = new_client_from_config(
config_file=config_file, context="simple_token")
self.assertEqual(TEST_HOST, client.configuration.host)
@ -942,6 +1040,210 @@ class TestKubeConfigLoader(BaseTestCase):
active_context="non_existing_user").load_and_set(actual)
self.assertEqual(expected, actual)
@mock.patch('kubernetes.config.kube_config.ExecProvider.run')
def test_user_exec_auth(self, mock):
token = "dummy"
mock.return_value = {
"token": token
}
expected = FakeConfig(host=TEST_HOST, api_key={
"authorization": BEARER_TOKEN_FORMAT % token})
actual = FakeConfig()
KubeConfigLoader(
config_dict=self.TEST_KUBE_CONFIG,
active_context="exec_cred_user").load_and_set(actual)
self.assertEqual(expected, actual)
class TestKubernetesClientConfiguration(BaseTestCase):
# Verifies properties of kubernetes.client.Configuration.
# These tests guard against changes to the upstream configuration class,
# since GCP authorization overrides get_api_key_with_prefix to refresh its
# token regularly.
def test_get_api_key_with_prefix_exists(self):
self.assertTrue(hasattr(Configuration, 'get_api_key_with_prefix'))
def test_get_api_key_with_prefix_returns_token(self):
expected_token = 'expected_token'
config = Configuration()
config.api_key['authorization'] = expected_token
self.assertEqual(expected_token,
config.get_api_key_with_prefix('authorization'))
def test_auth_settings_calls_get_api_key_with_prefix(self):
expected_token = 'expected_token'
def fake_get_api_key_with_prefix(identifier):
self.assertEqual('authorization', identifier)
return expected_token
config = Configuration()
config.get_api_key_with_prefix = fake_get_api_key_with_prefix
self.assertEqual(expected_token,
config.auth_settings()['BearerToken']['value'])
class TestKubeConfigMerger(BaseTestCase):
TEST_KUBE_CONFIG_PART1 = {
"current-context": "no_user",
"contexts": [
{
"name": "no_user",
"context": {
"cluster": "default"
}
},
],
"clusters": [
{
"name": "default",
"cluster": {
"server": TEST_HOST
}
},
],
"users": []
}
TEST_KUBE_CONFIG_PART2 = {
"current-context": "",
"contexts": [
{
"name": "ssl",
"context": {
"cluster": "ssl",
"user": "ssl"
}
},
{
"name": "simple_token",
"context": {
"cluster": "default",
"user": "simple_token"
}
},
],
"clusters": [
{
"name": "ssl",
"cluster": {
"server": TEST_SSL_HOST,
"certificate-authority-data":
TEST_CERTIFICATE_AUTH_BASE64,
}
},
],
"users": [
{
"name": "ssl",
"user": {
"token": TEST_DATA_BASE64,
"client-certificate-data": TEST_CLIENT_CERT_BASE64,
"client-key-data": TEST_CLIENT_KEY_BASE64,
}
},
]
}
TEST_KUBE_CONFIG_PART3 = {
"current-context": "no_user",
"contexts": [
{
"name": "expired_oidc",
"context": {
"cluster": "default",
"user": "expired_oidc"
}
},
{
"name": "ssl",
"context": {
"cluster": "skipped-part2-defined-this-context",
"user": "skipped"
}
},
],
"clusters": [
],
"users": [
{
"name": "expired_oidc",
"user": {
"auth-provider": {
"name": "oidc",
"config": {
"client-id": "tectonic-kubectl",
"client-secret": "FAKE_SECRET",
"id-token": TEST_OIDC_EXPIRED_LOGIN,
"idp-certificate-authority-data": TEST_OIDC_CA,
"idp-issuer-url": "https://example.org/identity",
"refresh-token":
"lucWJjEhlxZW01cXI3YmVlcYnpxNGhzk"
}
}
}
},
{
"name": "simple_token",
"user": {
"token": TEST_DATA_BASE64,
"username": TEST_USERNAME, # should be ignored
"password": TEST_PASSWORD, # should be ignored
}
},
]
}
def _create_multi_config(self):
files = []
for part in (
self.TEST_KUBE_CONFIG_PART1,
self.TEST_KUBE_CONFIG_PART2,
self.TEST_KUBE_CONFIG_PART3):
files.append(self._create_temp_file(yaml.safe_dump(part)))
return ENV_KUBECONFIG_PATH_SEPARATOR.join(files)
def test_list_kube_config_contexts(self):
kubeconfigs = self._create_multi_config()
expected_contexts = [
{'context': {'cluster': 'default'}, 'name': 'no_user'},
{'context': {'cluster': 'ssl', 'user': 'ssl'}, 'name': 'ssl'},
{'context': {'cluster': 'default', 'user': 'simple_token'},
'name': 'simple_token'},
{'context': {'cluster': 'default', 'user': 'expired_oidc'}, 'name': 'expired_oidc'}]
contexts, active_context = list_kube_config_contexts(
config_file=kubeconfigs)
self.assertEqual(contexts, expected_contexts)
self.assertEqual(active_context, expected_contexts[0])
def test_new_client_from_config(self):
kubeconfigs = self._create_multi_config()
client = new_client_from_config(
config_file=kubeconfigs, context="simple_token")
self.assertEqual(TEST_HOST, client.configuration.host)
self.assertEqual(BEARER_TOKEN_FORMAT % TEST_DATA_BASE64,
client.configuration.api_key['authorization'])
def test_save_changes(self):
kubeconfigs = self._create_multi_config()
# load configuration, update token, save config
kconf = KubeConfigMerger(kubeconfigs)
user = kconf.config['users'].get_with_name('expired_oidc')['user']
provider = user['auth-provider']['config']
provider.value['id-token'] = "token-changed"
kconf.save_changes()
# re-read configuration
kconf = KubeConfigMerger(kubeconfigs)
user = kconf.config['users'].get_with_name('expired_oidc')['user']
provider = user['auth-provider']['config']
# new token
self.assertEqual(provider.value['id-token'], "token-changed")
if __name__ == '__main__':
unittest.main()

201
hack/boilerplate/boilerplate.py Executable file
View File

@ -0,0 +1,201 @@
#!/usr/bin/env python
# Copyright 2018 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import argparse
import datetime
import difflib
import glob
import os
import re
import sys
parser = argparse.ArgumentParser()
parser.add_argument(
"filenames",
help="list of files to check, all files if unspecified",
nargs='*')
rootdir = os.path.dirname(__file__) + "/../../"
rootdir = os.path.abspath(rootdir)
parser.add_argument(
"--rootdir", default=rootdir, help="root directory to examine")
default_boilerplate_dir = os.path.join(rootdir, "hack/boilerplate")
parser.add_argument(
"--boilerplate-dir", default=default_boilerplate_dir)
parser.add_argument(
"-v", "--verbose",
help="give verbose output regarding why a file does not pass",
action="store_true")
args = parser.parse_args()
verbose_out = sys.stderr if args.verbose else open("/dev/null", "w")
def get_refs():
refs = {}
for path in glob.glob(os.path.join(
args.boilerplate_dir, "boilerplate.*.txt")):
extension = os.path.basename(path).split(".")[1]
ref_file = open(path, 'r')
ref = ref_file.read().splitlines()
ref_file.close()
refs[extension] = ref
return refs
def file_passes(filename, refs, regexs):
try:
f = open(filename, 'r')
except Exception as exc:
print("Unable to open %s: %s" % (filename, exc), file=verbose_out)
return False
data = f.read()
f.close()
basename = os.path.basename(filename)
extension = file_extension(filename)
if extension != "":
ref = refs[extension]
else:
ref = refs[basename]
# remove extra content from the top of files
if extension == "sh":
p = regexs["shebang"]
(data, found) = p.subn("", data, 1)
data = data.splitlines()
# if our test file is smaller than the reference it surely fails!
if len(ref) > len(data):
print('File %s smaller than reference (%d < %d)' %
(filename, len(data), len(ref)),
file=verbose_out)
return False
# trim our file to the same number of lines as the reference file
data = data[:len(ref)]
p = regexs["year"]
for d in data:
if p.search(d):
print('File %s has the YEAR field, but missing the year of date' %
filename, file=verbose_out)
return False
# Replace all occurrences of regex "2014|2015|2016|2017|2018" with "YEAR"
p = regexs["date"]
for i, d in enumerate(data):
(data[i], found) = p.subn('YEAR', d)
if found != 0:
break
# if we don't match the reference at this point, fail
if ref != data:
print("Header in %s does not match reference, diff:" %
filename, file=verbose_out)
if args.verbose:
print(file=verbose_out)
for line in difflib.unified_diff(
ref, data, 'reference', filename, lineterm=''):
print(line, file=verbose_out)
print(file=verbose_out)
return False
return True
def file_extension(filename):
return os.path.splitext(filename)[1].split(".")[-1].lower()
# list all the files contain 'DO NOT EDIT', but are not generated
skipped_ungenerated_files = ['hack/boilerplate/boilerplate.py']
def normalize_files(files):
newfiles = []
for pathname in files:
newfiles.append(pathname)
for i, pathname in enumerate(newfiles):
if not os.path.isabs(pathname):
newfiles[i] = os.path.join(args.rootdir, pathname)
return newfiles
def get_files(extensions):
files = []
if len(args.filenames) > 0:
files = args.filenames
else:
for root, dirs, walkfiles in os.walk(args.rootdir):
for name in walkfiles:
pathname = os.path.join(root, name)
files.append(pathname)
files = normalize_files(files)
outfiles = []
for pathname in files:
basename = os.path.basename(pathname)
extension = file_extension(pathname)
if extension in extensions or basename in extensions:
outfiles.append(pathname)
return outfiles
def get_dates():
years = datetime.datetime.now().year
return '(%s)' % '|'.join((str(year) for year in range(2014, years+1)))
def get_regexs():
regexs = {}
# Search for "YEAR" which exists in the boilerplate,
# but shouldn't in the real thing
regexs["year"] = re.compile('YEAR')
# get_dates return 2014, 2015, 2016, 2017, or 2018 until the current year
# as a regex like: "(2014|2015|2016|2017|2018)";
# company holder names can be anything
regexs["date"] = re.compile(get_dates())
# strip #!.* from shell scripts
regexs["shebang"] = re.compile(r"^(#!.*\n)\n*", re.MULTILINE)
return regexs
def main():
regexs = get_regexs()
refs = get_refs()
filenames = get_files(refs.keys())
for filename in filenames:
if not file_passes(filename, refs, regexs):
print(filename, file=sys.stdout)
return 0
if __name__ == "__main__":
sys.exit(main())

View File

@ -0,0 +1,15 @@
#!/usr/bin/env python
# Copyright YEAR The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

View File

@ -0,0 +1,13 @@
# Copyright YEAR The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

35
hack/verify-boilerplate.sh Executable file
View File

@ -0,0 +1,35 @@
#!/usr/bin/env bash
# Copyright 2018 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
set -o errexit
set -o nounset
set -o pipefail
KUBE_ROOT=$(dirname "${BASH_SOURCE}")/..
boilerDir="${KUBE_ROOT}/hack/boilerplate"
boiler="${boilerDir}/boilerplate.py"
files_need_boilerplate=($(${boiler} "$@"))
# Run boilerplate check
if [[ ${#files_need_boilerplate[@]} -gt 0 ]]; then
for file in "${files_need_boilerplate[@]}"; do
echo "Boilerplate header is wrong for: ${file}" >&2
done
exit 1
fi

View File

@ -11,7 +11,7 @@
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# See the License for the specific language governing permissions and
# limitations under the License.
set -o errexit
@ -51,4 +51,3 @@ git status
echo "Running tox from the main repo on $TOXENV environment"
# Run the user-provided command.
"${@}"

View File

@ -1,3 +1,5 @@
#!/usr/bin/env python
# Copyright 2017 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");

View File

@ -1,14 +1,18 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#!/usr/bin/env python
# Copyright 2018 The Kubernetes Authors.
#
# http://www.apache.org/licenses/LICENSE-2.0
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from . import ws_client

View File

@ -1,14 +1,18 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#!/usr/bin/env python
# Copyright 2018 The Kubernetes Authors.
#
# http://www.apache.org/licenses/LICENSE-2.0
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from kubernetes.client.rest import ApiException
@ -49,7 +53,8 @@ class WSClient:
header.append("authorization: %s" % headers['authorization'])
if headers and 'sec-websocket-protocol' in headers:
header.append("sec-websocket-protocol: %s" % headers['sec-websocket-protocol'])
header.append("sec-websocket-protocol: %s" %
headers['sec-websocket-protocol'])
else:
header.append("sec-websocket-protocol: v4.channel.k8s.io")
@ -182,8 +187,8 @@ class WSClient:
data = data[1:]
if data:
if channel in [STDOUT_CHANNEL, STDERR_CHANNEL]:
# keeping all messages in the order they received for
# non-blocking call.
# keeping all messages in the order they received
# for non-blocking call.
self._all += data
if channel not in self._channels:
self._channels[channel] = data

View File

@ -1,4 +1,6 @@
# Copyright 2017 The Kubernetes Authors.
#!/usr/bin/env python
# Copyright 2018 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.

View File

@ -1,6 +1,6 @@
[tox]
skipsdist = True
envlist = py27, py34, py35, py36
envlist = py27, py34, py35, py36, py37
[testenv]
passenv = TOXENV CI TRAVIS TRAVIS_*

View File

@ -1,3 +1,5 @@
#!/usr/bin/env python
# Copyright 2016 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");

View File

@ -1,3 +1,5 @@
#!/usr/bin/env python
# Copyright 2016 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@ -18,6 +20,7 @@ import pydoc
from kubernetes import client
PYDOC_RETURN_LABEL = ":return:"
PYDOC_FOLLOW_PARAM = ":param bool follow:"
# Removing this suffix from return type name should give us event's object
# type. e.g., if list_namespaces() returns "NamespaceList" type,
@ -63,7 +66,7 @@ class Watch(object):
self._raw_return_type = return_type
self._stop = False
self._api_client = client.ApiClient()
self.resource_version = 0
self.resource_version = None
def stop(self):
self._stop = True
@ -76,8 +79,17 @@ class Watch(object):
return return_type[:-len(TYPE_LIST_SUFFIX)]
return return_type
def get_watch_argument_name(self, func):
if PYDOC_FOLLOW_PARAM in pydoc.getdoc(func):
return 'follow'
else:
return 'watch'
def unmarshal_event(self, data, return_type):
js = json.loads(data)
try:
js = json.loads(data)
except ValueError:
return data
js['raw_object'] = js['object']
if return_type:
obj = SimpleNamespace(data=json.dumps(js['raw_object']))
@ -120,8 +132,10 @@ class Watch(object):
self._stop = False
return_type = self.get_return_type(func)
kwargs['watch'] = True
kwargs[self.get_watch_argument_name(func)] = True
kwargs['_preload_content'] = False
if 'resource_version' in kwargs:
self.resource_version = kwargs['resource_version']
timeouts = ('timeout_seconds' in kwargs)
while True:
@ -132,9 +146,12 @@ class Watch(object):
if self._stop:
break
finally:
kwargs['resource_version'] = self.resource_version
resp.close()
resp.release_conn()
if self.resource_version is not None:
kwargs['resource_version'] = self.resource_version
else:
self._stop = True
if timeouts or self._stop:
break

View File

@ -1,3 +1,5 @@
#!/usr/bin/env python
# Copyright 2016 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@ -14,12 +16,15 @@
import unittest
from mock import Mock
from mock import Mock, call
from .watch import Watch
class WatchTests(unittest.TestCase):
def setUp(self):
# counter for a test that needs test global state
self.callcount = 0
def test_watch_with_decode(self):
fake_resp = Mock()
@ -62,6 +67,103 @@ class WatchTests(unittest.TestCase):
fake_resp.close.assert_called_once()
fake_resp.release_conn.assert_called_once()
def test_watch_for_follow(self):
fake_resp = Mock()
fake_resp.close = Mock()
fake_resp.release_conn = Mock()
fake_resp.read_chunked = Mock(
return_value=[
'log_line_1\n',
'log_line_2\n'])
fake_api = Mock()
fake_api.read_namespaced_pod_log = Mock(return_value=fake_resp)
fake_api.read_namespaced_pod_log.__doc__ = ':param bool follow:\n:return: str'
w = Watch()
count = 1
for e in w.stream(fake_api.read_namespaced_pod_log):
self.assertEqual("log_line_1", e)
count += 1
# make sure we can stop the watch and the last event with won't be
# returned
if count == 2:
w.stop()
fake_api.read_namespaced_pod_log.assert_called_once_with(
_preload_content=False, follow=True)
fake_resp.read_chunked.assert_called_once_with(decode_content=False)
fake_resp.close.assert_called_once()
fake_resp.release_conn.assert_called_once()
def test_watch_resource_version_set(self):
# https://github.com/kubernetes-client/python/issues/700
# ensure watching from a resource version does reset to resource
# version 0 after k8s resets the watch connection
fake_resp = Mock()
fake_resp.close = Mock()
fake_resp.release_conn = Mock()
values = [
'{"type": "ADDED", "object": {"metadata": {"name": "test1",'
'"resourceVersion": "1"}, "spec": {}, "status": {}}}\n',
'{"type": "ADDED", "object": {"metadata": {"name": "test2",'
'"resourceVersion": "2"}, "spec": {}, "sta',
'tus": {}}}\n'
'{"type": "ADDED", "object": {"metadata": {"name": "test3",'
'"resourceVersion": "3"}, "spec": {}, "status": {}}}\n'
]
# return nothing on the first call and values on the second
# this emulates a watch from a rv that returns nothing in the first k8s
# watch reset and values later
def get_values(*args, **kwargs):
self.callcount += 1
if self.callcount == 1:
return []
else:
return values
fake_resp.read_chunked = Mock(
side_effect=get_values)
fake_api = Mock()
fake_api.get_namespaces = Mock(return_value=fake_resp)
fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList'
w = Watch()
# ensure we keep our requested resource version or the version latest
# returned version when the existing versions are older than the
# requested version
# needed for the list existing objects, then watch from there use case
calls = []
iterations = 2
# first two calls must use the passed rv, the first call is a
# "reset" and does not actually return anything
# the second call must use the same rv but will return values
# (with a wrong rv but a real cluster would behave correctly)
# calls following that will use the rv from those returned values
calls.append(call(_preload_content=False, watch=True,
resource_version="5"))
calls.append(call(_preload_content=False, watch=True,
resource_version="5"))
for i in range(iterations):
# ideally we want 5 here but as rv must be treated as an
# opaque value we cannot interpret it and order it so rely
# on k8s returning the events completely and in order
calls.append(call(_preload_content=False, watch=True,
resource_version="3"))
for c, e in enumerate(w.stream(fake_api.get_namespaces,
resource_version="5")):
if c == len(values) * iterations:
w.stop()
# check calls are in the list, gives good error output
fake_api.get_namespaces.assert_has_calls(calls)
# more strict test with worse error message
self.assertEqual(fake_api.get_namespaces.mock_calls, calls)
def test_watch_stream_twice(self):
w = Watch(float)
for step in ['first', 'second']: