diff --git a/.travis.yml b/.travis.yml index 887d6647d..aa1d1d3cd 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,6 +1,6 @@ # ref: https://docs.travis-ci.com/user/languages/python language: python -dist: trusty +dist: xenial sudo: required matrix: @@ -10,7 +10,7 @@ matrix: - python: 2.7 env: TOXENV=py27-functional - python: 2.7 - env: TOXENV=update-pep8 + env: TOXENV=update-pycodestyle - python: 2.7 env: TOXENV=docs - python: 2.7 @@ -25,10 +25,15 @@ matrix: env: TOXENV=py36 - python: 3.6 env: TOXENV=py36-functional + - python: 3.7 + env: TOXENV=py37 + - python: 3.7 + env: TOXENV=py37-functional install: - pip install tox script: - ./run_tox.sh tox + - ./hack/verify-boilerplate.sh diff --git a/OWNERS b/OWNERS index 7a860ad20..cfec4b11e 100644 --- a/OWNERS +++ b/OWNERS @@ -1,3 +1,5 @@ +# See the OWNERS docs at https://go.k8s.io/owners + approvers: - mbohlool - yliaog diff --git a/config/__init__.py b/config/__init__.py index 3476ff714..02a7532d5 100644 --- a/config/__init__.py +++ b/config/__init__.py @@ -1,3 +1,5 @@ +#!/usr/bin/env python + # Copyright 2016 The Kubernetes Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/config/config_exception.py b/config/config_exception.py index 23fab022c..9bf049c69 100644 --- a/config/config_exception.py +++ b/config/config_exception.py @@ -1,3 +1,5 @@ +#!/usr/bin/env python + # Copyright 2016 The Kubernetes Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/config/dateutil.py b/config/dateutil.py index ed88cba8b..402751cd2 100644 --- a/config/dateutil.py +++ b/config/dateutil.py @@ -1,3 +1,5 @@ +#!/usr/bin/env python + # Copyright 2017 The Kubernetes Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/config/dateutil_test.py b/config/dateutil_test.py index deb0ea880..7a13fad04 100644 --- a/config/dateutil_test.py +++ b/config/dateutil_test.py @@ -1,3 +1,5 @@ +#!/usr/bin/env python + # Copyright 2016 The Kubernetes Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/config/exec_provider.py b/config/exec_provider.py new file mode 100644 index 000000000..a0348f1e9 --- /dev/null +++ b/config/exec_provider.py @@ -0,0 +1,97 @@ +#!/usr/bin/env python + +# Copyright 2018 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import json +import os +import subprocess +import sys + +from .config_exception import ConfigException + + +class ExecProvider(object): + """ + Implementation of the proposal for out-of-tree client + authentication providers as described here -- + https://github.com/kubernetes/community/blob/master/contributors/design-proposals/auth/kubectl-exec-plugins.md + + Missing from implementation: + + * TLS cert support + * caching + """ + + def __init__(self, exec_config): + """ + exec_config must be of type ConfigNode because we depend on + safe_get(self, key) to correctly handle optional exec provider + config parameters. + """ + for key in ['command', 'apiVersion']: + if key not in exec_config: + raise ConfigException( + 'exec: malformed request. missing key \'%s\'' % key) + self.api_version = exec_config['apiVersion'] + self.args = [exec_config['command']] + if exec_config.safe_get('args'): + self.args.extend(exec_config['args']) + self.env = os.environ.copy() + if exec_config.safe_get('env'): + additional_vars = {} + for item in exec_config['env']: + name = item['name'] + value = item['value'] + additional_vars[name] = value + self.env.update(additional_vars) + + def run(self, previous_response=None): + kubernetes_exec_info = { + 'apiVersion': self.api_version, + 'kind': 'ExecCredential', + 'spec': { + 'interactive': sys.stdout.isatty() + } + } + if previous_response: + kubernetes_exec_info['spec']['response'] = previous_response + self.env['KUBERNETES_EXEC_INFO'] = json.dumps(kubernetes_exec_info) + process = subprocess.Popen( + self.args, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + env=self.env, + universal_newlines=True) + (stdout, stderr) = process.communicate() + exit_code = process.wait() + if exit_code != 0: + msg = 'exec: process returned %d' % exit_code + stderr = stderr.strip() + if stderr: + msg += '. %s' % stderr + raise ConfigException(msg) + try: + data = json.loads(stdout) + except ValueError as de: + raise ConfigException( + 'exec: failed to decode process output: %s' % de) + for key in ('apiVersion', 'kind', 'status'): + if key not in data: + raise ConfigException( + 'exec: malformed response. missing key \'%s\'' % key) + if data['apiVersion'] != self.api_version: + raise ConfigException( + 'exec: plugin api version %s does not match %s' % + (data['apiVersion'], self.api_version)) + return data['status'] diff --git a/config/exec_provider_test.py b/config/exec_provider_test.py new file mode 100644 index 000000000..8b6517b01 --- /dev/null +++ b/config/exec_provider_test.py @@ -0,0 +1,147 @@ +#!/usr/bin/env python + +# Copyright 2018 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import unittest + +import mock + +from .config_exception import ConfigException +from .exec_provider import ExecProvider +from .kube_config import ConfigNode + + +class ExecProviderTest(unittest.TestCase): + + def setUp(self): + self.input_ok = ConfigNode('test', { + 'command': 'aws-iam-authenticator', + 'args': ['token', '-i', 'dummy'], + 'apiVersion': 'client.authentication.k8s.io/v1beta1', + 'env': None + }) + self.output_ok = """ + { + "apiVersion": "client.authentication.k8s.io/v1beta1", + "kind": "ExecCredential", + "status": { + "token": "dummy" + } + } + """ + + def test_missing_input_keys(self): + exec_configs = [ConfigNode('test1', {}), + ConfigNode('test2', {'command': ''}), + ConfigNode('test3', {'apiVersion': ''})] + for exec_config in exec_configs: + with self.assertRaises(ConfigException) as context: + ExecProvider(exec_config) + self.assertIn('exec: malformed request. missing key', + context.exception.args[0]) + + @mock.patch('subprocess.Popen') + def test_error_code_returned(self, mock): + instance = mock.return_value + instance.wait.return_value = 1 + instance.communicate.return_value = ('', '') + with self.assertRaises(ConfigException) as context: + ep = ExecProvider(self.input_ok) + ep.run() + self.assertIn('exec: process returned %d' % + instance.wait.return_value, context.exception.args[0]) + + @mock.patch('subprocess.Popen') + def test_nonjson_output_returned(self, mock): + instance = mock.return_value + instance.wait.return_value = 0 + instance.communicate.return_value = ('', '') + with self.assertRaises(ConfigException) as context: + ep = ExecProvider(self.input_ok) + ep.run() + self.assertIn('exec: failed to decode process output', + context.exception.args[0]) + + @mock.patch('subprocess.Popen') + def test_missing_output_keys(self, mock): + instance = mock.return_value + instance.wait.return_value = 0 + outputs = [ + """ + { + "kind": "ExecCredential", + "status": { + "token": "dummy" + } + } + """, """ + { + "apiVersion": "client.authentication.k8s.io/v1beta1", + "status": { + "token": "dummy" + } + } + """, """ + { + "apiVersion": "client.authentication.k8s.io/v1beta1", + "kind": "ExecCredential" + } + """ + ] + for output in outputs: + instance.communicate.return_value = (output, '') + with self.assertRaises(ConfigException) as context: + ep = ExecProvider(self.input_ok) + ep.run() + self.assertIn('exec: malformed response. missing key', + context.exception.args[0]) + + @mock.patch('subprocess.Popen') + def test_mismatched_api_version(self, mock): + instance = mock.return_value + instance.wait.return_value = 0 + wrong_api_version = 'client.authentication.k8s.io/v1' + output = """ + { + "apiVersion": "%s", + "kind": "ExecCredential", + "status": { + "token": "dummy" + } + } + """ % wrong_api_version + instance.communicate.return_value = (output, '') + with self.assertRaises(ConfigException) as context: + ep = ExecProvider(self.input_ok) + ep.run() + self.assertIn( + 'exec: plugin api version %s does not match' % + wrong_api_version, + context.exception.args[0]) + + @mock.patch('subprocess.Popen') + def test_ok_01(self, mock): + instance = mock.return_value + instance.wait.return_value = 0 + instance.communicate.return_value = (self.output_ok, '') + ep = ExecProvider(self.input_ok) + result = ep.run() + self.assertTrue(isinstance(result, dict)) + self.assertTrue('token' in result) + + +if __name__ == '__main__': + unittest.main() diff --git a/config/incluster_config.py b/config/incluster_config.py index 60fc0af82..c9bdc907d 100644 --- a/config/incluster_config.py +++ b/config/incluster_config.py @@ -1,3 +1,5 @@ +#!/usr/bin/env python + # Copyright 2016 The Kubernetes Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -85,7 +87,8 @@ class InClusterConfigLoader(object): def load_incluster_config(): - """Use the service account kubernetes gives to pods to connect to kubernetes + """ + Use the service account kubernetes gives to pods to connect to kubernetes cluster. It's intended for clients that expect to be running inside a pod running on kubernetes. It will raise an exception if called from a process not running in a kubernetes environment.""" diff --git a/config/incluster_config_test.py b/config/incluster_config_test.py index 622b31b37..3cb0abfc8 100644 --- a/config/incluster_config_test.py +++ b/config/incluster_config_test.py @@ -1,3 +1,5 @@ +#!/usr/bin/env python + # Copyright 2016 The Kubernetes Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/config/kube_config.py b/config/kube_config.py index 9f9dcf8a9..34826848d 100644 --- a/config/kube_config.py +++ b/config/kube_config.py @@ -1,4 +1,6 @@ -# Copyright 2016 The Kubernetes Authors. +#!/usr/bin/env python + +# Copyright 2018 The Kubernetes Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,13 +16,15 @@ import atexit import base64 +import copy import datetime import json +import logging import os +import platform import tempfile import time -import adal import google.auth import google.auth.transport.requests import oauthlib.oauth2 @@ -30,12 +34,19 @@ from requests_oauthlib import OAuth2Session from six import PY3 from kubernetes.client import ApiClient, Configuration +from kubernetes.config.exec_provider import ExecProvider from .config_exception import ConfigException from .dateutil import UTC, format_rfc3339, parse_rfc3339 +try: + import adal +except ImportError: + pass + EXPIRY_SKEW_PREVENTION_DELAY = datetime.timedelta(minutes=5) KUBE_CONFIG_DEFAULT_LOCATION = os.environ.get('KUBECONFIG', '~/.kube/config') +ENV_KUBECONFIG_PATH_SEPARATOR = ';' if platform.system() == 'Windows' else ':' _temp_files = {} @@ -98,8 +109,12 @@ class FileOrData(object): use_data_if_no_file = not self._file and self._data if use_data_if_no_file: if self._base64_file_content: + if isinstance(self._data, str): + content = self._data.encode() + else: + content = self._data self._file = _create_temp_file_with_content( - base64.decodestring(self._data.encode())) + base64.standard_b64decode(content)) else: self._file = _create_temp_file_with_content(self._data) if self._file and not os.path.isfile(self._file): @@ -114,7 +129,7 @@ class FileOrData(object): with open(self._file) as f: if self._base64_file_content: self._data = bytes.decode( - base64.encodestring(str.encode(f.read()))) + base64.standard_b64encode(str.encode(f.read()))) else: self._data = f.read() return self._data @@ -126,7 +141,12 @@ class KubeConfigLoader(object): get_google_credentials=None, config_base_path="", config_persister=None): - self._config = ConfigNode('kube-config', config_dict) + + if isinstance(config_dict, ConfigNode): + self._config = config_dict + else: + self._config = ConfigNode('kube-config', config_dict) + self._current_context = None self._user = None self._cluster = None @@ -172,11 +192,10 @@ class KubeConfigLoader(object): section of kube-config and stops if it finds a valid authentication method. The order of authentication methods is: - 1. GCP auth-provider - 2. token_data - 3. token field (point to a token file) - 4. oidc auth-provider - 5. username/password + 1. auth-provider (gcp, azure, oidc) + 2. token field (point to a token file) + 3. exec provided plugin + 4. username/password """ if not self._user: return @@ -184,6 +203,8 @@ class KubeConfigLoader(object): return if self._load_user_token(): return + if self._load_from_exec_plugin(): + return self._load_user_pass_token() def _load_auth_provider_token(self): @@ -211,6 +232,9 @@ class KubeConfigLoader(object): return self.token def _refresh_azure_token(self, config): + if 'adal' not in globals(): + raise ImportError('refresh token error, adal library not imported') + tenant = config['tenant-id'] authority = 'https://login.microsoftonline.com/{}'.format(tenant) context = adal.AuthenticationContext( @@ -353,10 +377,24 @@ class KubeConfigLoader(object): provider['config'].value['id-token'] = refresh['id_token'] provider['config'].value['refresh-token'] = refresh['refresh_token'] + def _load_from_exec_plugin(self): + if 'exec' not in self._user: + return + try: + status = ExecProvider(self._user['exec']).run() + if 'token' not in status: + logging.error('exec: missing token field in plugin output') + return None + self.token = "Bearer %s" % status['token'] + return True + except Exception as e: + logging.error(str(e)) + def _load_user_token(self): + base_path = self._get_base_path(self._user.path) token = FileOrData( self._user, 'tokenFile', 'token', - file_base_path=self._config_base_path, + file_base_path=base_path, base64_file_content=False).as_data() if token: self.token = "Bearer %s" % token @@ -369,24 +407,48 @@ class KubeConfigLoader(object): self._user['password'])).get('authorization') return True + def _get_base_path(self, config_path): + if self._config_base_path is not None: + return self._config_base_path + if config_path is not None: + return os.path.abspath(os.path.dirname(config_path)) + return "" + def _load_cluster_info(self): if 'server' in self._cluster: - self.host = self._cluster['server'] + self.host = self._cluster['server'].rstrip('/') if self.host.startswith("https"): + base_path = self._get_base_path(self._cluster.path) self.ssl_ca_cert = FileOrData( self._cluster, 'certificate-authority', - file_base_path=self._config_base_path).as_file() + file_base_path=base_path).as_file() self.cert_file = FileOrData( self._user, 'client-certificate', - file_base_path=self._config_base_path).as_file() + file_base_path=base_path).as_file() self.key_file = FileOrData( self._user, 'client-key', - file_base_path=self._config_base_path).as_file() + file_base_path=base_path).as_file() if 'insecure-skip-tls-verify' in self._cluster: self.verify_ssl = not self._cluster['insecure-skip-tls-verify'] + def _using_gcp_auth_provider(self): + return self._user and \ + 'auth-provider' in self._user and \ + 'name' in self._user['auth-provider'] and \ + self._user['auth-provider']['name'] == 'gcp' + def _set_config(self, client_configuration): + if self._using_gcp_auth_provider(): + # GCP auth tokens must be refreshed regularly, but swagger expects + # a constant token. Replace the swagger-generated client config's + # get_api_key_with_prefix method with our own to allow automatic + # token refresh. + def _gcp_get_api_key(*args): + return self._load_gcp_token(self._user['auth-provider']) + client_configuration.get_api_key_with_prefix = _gcp_get_api_key if 'token' in self.__dict__: + # Note: this line runs for GCP auth tokens as well, but this entry + # will not be updated upon GCP token refresh. client_configuration.api_key['authorization'] = self.token # copy these keys directly from self to configuration object keys = ['host', 'ssl_ca_cert', 'cert_file', 'key_file', 'verify_ssl'] @@ -412,9 +474,10 @@ class ConfigNode(object): message in case of missing keys. The assumption is all access keys are present in a well-formed kube-config.""" - def __init__(self, name, value): + def __init__(self, name, value, path=None): self.name = name self.value = value + self.path = path def __contains__(self, key): return key in self.value @@ -434,7 +497,7 @@ class ConfigNode(object): 'Invalid kube-config file. Expected key %s in %s' % (key, self.name)) if isinstance(v, dict) or isinstance(v, list): - return ConfigNode('%s/%s' % (self.name, key), v) + return ConfigNode('%s/%s' % (self.name, key), v, self.path) else: return v @@ -459,7 +522,12 @@ class ConfigNode(object): 'Expected only one object with name %s in %s list' % (name, self.name)) if result is not None: - return ConfigNode('%s[name=%s]' % (self.name, name), result) + if isinstance(result, ConfigNode): + return result + else: + return ConfigNode( + '%s[name=%s]' % + (self.name, name), result, self.path) if safe: return None raise ConfigException( @@ -467,18 +535,87 @@ class ConfigNode(object): 'Expected object with name %s in %s list' % (name, self.name)) -def _get_kube_config_loader_for_yaml_file(filename, **kwargs): - with open(filename) as f: - return KubeConfigLoader( - config_dict=yaml.load(f), - config_base_path=os.path.abspath(os.path.dirname(filename)), - **kwargs) +class KubeConfigMerger: + + """Reads and merges configuration from one or more kube-config's. + The propery `config` can be passed to the KubeConfigLoader as config_dict. + + It uses a path attribute from ConfigNode to store the path to kubeconfig. + This path is required to load certs from relative paths. + + A method `save_changes` updates changed kubeconfig's (it compares current + state of dicts with). + """ + + def __init__(self, paths): + self.paths = [] + self.config_files = {} + self.config_merged = None + + for path in paths.split(ENV_KUBECONFIG_PATH_SEPARATOR): + if path: + path = os.path.expanduser(path) + if os.path.exists(path): + self.paths.append(path) + self.load_config(path) + self.config_saved = copy.deepcopy(self.config_files) + + @property + def config(self): + return self.config_merged + + def load_config(self, path): + with open(path) as f: + config = yaml.safe_load(f) + + if self.config_merged is None: + config_merged = copy.deepcopy(config) + for item in ('clusters', 'contexts', 'users'): + config_merged[item] = [] + self.config_merged = ConfigNode(path, config_merged, path) + + for item in ('clusters', 'contexts', 'users'): + self._merge(item, config[item], path) + self.config_files[path] = config + + def _merge(self, item, add_cfg, path): + for new_item in add_cfg: + for exists in self.config_merged.value[item]: + if exists['name'] == new_item['name']: + break + else: + self.config_merged.value[item].append(ConfigNode( + '{}/{}'.format(path, new_item), new_item, path)) + + def save_changes(self): + for path in self.paths: + if self.config_saved[path] != self.config_files[path]: + self.save_config(path) + self.config_saved = copy.deepcopy(self.config_files) + + def save_config(self, path): + with open(path, 'w') as f: + yaml.safe_dump(self.config_files[path], f, + default_flow_style=False) + + +def _get_kube_config_loader_for_yaml_file( + filename, persist_config=False, **kwargs): + + kcfg = KubeConfigMerger(filename) + if persist_config and 'config_persister' not in kwargs: + kwargs['config_persister'] = kcfg.save_changes() + + return KubeConfigLoader( + config_dict=kcfg.config, + config_base_path=None, + **kwargs) def list_kube_config_contexts(config_file=None): if config_file is None: - config_file = os.path.expanduser(KUBE_CONFIG_DEFAULT_LOCATION) + config_file = KUBE_CONFIG_DEFAULT_LOCATION loader = _get_kube_config_loader_for_yaml_file(config_file) return loader.list_contexts(), loader.current_context @@ -500,18 +637,12 @@ def load_kube_config(config_file=None, context=None, """ if config_file is None: - config_file = os.path.expanduser(KUBE_CONFIG_DEFAULT_LOCATION) - - config_persister = None - if persist_config: - def _save_kube_config(config_map): - with open(config_file, 'w') as f: - yaml.safe_dump(config_map, f, default_flow_style=False) - config_persister = _save_kube_config + config_file = KUBE_CONFIG_DEFAULT_LOCATION loader = _get_kube_config_loader_for_yaml_file( config_file, active_context=context, - config_persister=config_persister) + persist_config=persist_config) + if client_configuration is None: config = type.__call__(Configuration) loader.load_and_set(config) @@ -524,9 +655,11 @@ def new_client_from_config( config_file=None, context=None, persist_config=True): - """Loads configuration the same as load_kube_config but returns an ApiClient + """ + Loads configuration the same as load_kube_config but returns an ApiClient to be used with any API object. This will allow the caller to concurrently - talk with multiple clusters.""" + talk with multiple clusters. + """ client_config = type.__call__(Configuration) load_kube_config(config_file=config_file, context=context, client_configuration=client_config, diff --git a/config/kube_config_test.py b/config/kube_config_test.py index 4ddc6f35b..d89a2a50a 100644 --- a/config/kube_config_test.py +++ b/config/kube_config_test.py @@ -1,4 +1,6 @@ -# Copyright 2016 The Kubernetes Authors. +#!/usr/bin/env python + +# Copyright 2018 The Kubernetes Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -24,8 +26,11 @@ import mock import yaml from six import PY3, next +from kubernetes.client import Configuration + from .config_exception import ConfigException -from .kube_config import (ConfigNode, FileOrData, KubeConfigLoader, +from .kube_config import (ENV_KUBECONFIG_PATH_SEPARATOR, ConfigNode, + FileOrData, KubeConfigLoader, KubeConfigMerger, _cleanup_temp_files, _create_temp_file_with_content, list_kube_config_contexts, load_kube_config, new_client_from_config) @@ -34,13 +39,15 @@ BEARER_TOKEN_FORMAT = "Bearer %s" EXPIRY_DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ" # should be less than kube_config.EXPIRY_SKEW_PREVENTION_DELAY -EXPIRY_TIMEDELTA = 2 +PAST_EXPIRY_TIMEDELTA = 2 +# should be more than kube_config.EXPIRY_SKEW_PREVENTION_DELAY +FUTURE_EXPIRY_TIMEDELTA = 60 NON_EXISTING_FILE = "zz_non_existing_file_472398324" def _base64(string): - return base64.encodestring(string.encode()).decode() + return base64.standard_b64encode(string.encode()).decode() def _urlsafe_unpadded_b64encode(string): @@ -51,9 +58,9 @@ def _format_expiry_datetime(dt): return dt.strftime(EXPIRY_DATETIME_FORMAT) -def _get_expiry(loader): +def _get_expiry(loader, active_context): expired_gcp_conf = (item for item in loader._config.value.get("users") - if item.get("name") == "expired_gcp") + if item.get("name") == active_context) return next(expired_gcp_conf).get("user").get("auth-provider") \ .get("config").get("expiry") @@ -77,8 +84,11 @@ TEST_USERNAME = "me" TEST_PASSWORD = "pass" # token for me:pass TEST_BASIC_TOKEN = "Basic bWU6cGFzcw==" -TEST_TOKEN_EXPIRY = _format_expiry_datetime( - datetime.datetime.utcnow() - datetime.timedelta(minutes=EXPIRY_TIMEDELTA)) +DATETIME_EXPIRY_PAST = datetime.datetime.utcnow( +) - datetime.timedelta(minutes=PAST_EXPIRY_TIMEDELTA) +DATETIME_EXPIRY_FUTURE = datetime.datetime.utcnow( +) + datetime.timedelta(minutes=FUTURE_EXPIRY_TIMEDELTA) +TEST_TOKEN_EXPIRY_PAST = _format_expiry_datetime(DATETIME_EXPIRY_PAST) TEST_SSL_HOST = "https://test-host" TEST_CERTIFICATE_AUTH = "cert-auth" @@ -226,6 +236,18 @@ class TestFileOrData(BaseTestCase): _create_temp_file_with_content(TEST_DATA))) _cleanup_temp_files() + def test_file_given_data_bytes(self): + obj = {TEST_DATA_KEY: TEST_DATA_BASE64.encode()} + t = FileOrData(obj=obj, file_key_name=TEST_FILE_KEY, + data_key_name=TEST_DATA_KEY) + self.assertEqual(TEST_DATA, self.get_file_content(t.as_file())) + + def test_file_given_data_bytes_no_base64(self): + obj = {TEST_DATA_KEY: TEST_DATA.encode()} + t = FileOrData(obj=obj, file_key_name=TEST_FILE_KEY, + data_key_name=TEST_DATA_KEY, base64_file_content=False) + self.assertEqual(TEST_DATA, self.get_file_content(t.as_file())) + class TestConfigNode(BaseTestCase): @@ -384,6 +406,13 @@ class TestKubeConfigLoader(BaseTestCase): "user": "expired_gcp" } }, + { + "name": "expired_gcp_refresh", + "context": { + "cluster": "default", + "user": "expired_gcp_refresh" + } + }, { "name": "oidc", "context": { @@ -463,6 +492,13 @@ class TestKubeConfigLoader(BaseTestCase): "user": "non_existing_user" } }, + { + "name": "exec_cred_user", + "context": { + "cluster": "default", + "user": "exec_cred_user" + } + }, ], "clusters": [ { @@ -531,7 +567,24 @@ class TestKubeConfigLoader(BaseTestCase): "name": "gcp", "config": { "access-token": TEST_DATA_BASE64, - "expiry": TEST_TOKEN_EXPIRY, # always in past + "expiry": TEST_TOKEN_EXPIRY_PAST, # always in past + } + }, + "token": TEST_DATA_BASE64, # should be ignored + "username": TEST_USERNAME, # should be ignored + "password": TEST_PASSWORD, # should be ignored + } + }, + # Duplicated from "expired_gcp" so test_load_gcp_token_with_refresh + # is isolated from test_gcp_get_api_key_with_prefix. + { + "name": "expired_gcp_refresh", + "user": { + "auth-provider": { + "name": "gcp", + "config": { + "access-token": TEST_DATA_BASE64, + "expiry": TEST_TOKEN_EXPIRY_PAST, # always in past } }, "token": TEST_DATA_BASE64, # should be ignored @@ -646,6 +699,16 @@ class TestKubeConfigLoader(BaseTestCase): "client-key-data": TEST_CLIENT_KEY_BASE64, } }, + { + "name": "exec_cred_user", + "user": { + "exec": { + "apiVersion": "client.authentication.k8s.io/v1beta1", + "command": "aws-iam-authenticator", + "args": ["token", "-i", "dummy-cluster"] + } + } + }, ] } @@ -674,16 +737,20 @@ class TestKubeConfigLoader(BaseTestCase): self.assertEqual(BEARER_TOKEN_FORMAT % TEST_DATA_BASE64, loader.token) def test_gcp_no_refresh(self): - expected = FakeConfig( - host=TEST_HOST, - token=BEARER_TOKEN_FORMAT % TEST_DATA_BASE64) - actual = FakeConfig() + fake_config = FakeConfig() + # swagger-generated config has this, but FakeConfig does not. + self.assertFalse(hasattr(fake_config, 'get_api_key_with_prefix')) KubeConfigLoader( config_dict=self.TEST_KUBE_CONFIG, active_context="gcp", get_google_credentials=lambda: _raise_exception( - "SHOULD NOT BE CALLED")).load_and_set(actual) - self.assertEqual(expected, actual) + "SHOULD NOT BE CALLED")).load_and_set(fake_config) + # Should now be populated with a gcp token fetcher. + self.assertIsNotNone(fake_config.get_api_key_with_prefix) + self.assertEqual(TEST_HOST, fake_config.host) + # For backwards compatibility, authorization field should still be set. + self.assertEqual(BEARER_TOKEN_FORMAT % TEST_DATA_BASE64, + fake_config.api_key['authorization']) def test_load_gcp_token_no_refresh(self): loader = KubeConfigLoader( @@ -698,20 +765,48 @@ class TestKubeConfigLoader(BaseTestCase): def test_load_gcp_token_with_refresh(self): def cred(): return None cred.token = TEST_ANOTHER_DATA_BASE64 - cred.expiry = datetime.datetime.now() + cred.expiry = datetime.datetime.utcnow() loader = KubeConfigLoader( config_dict=self.TEST_KUBE_CONFIG, active_context="expired_gcp", get_google_credentials=lambda: cred) - original_expiry = _get_expiry(loader) + original_expiry = _get_expiry(loader, "expired_gcp") self.assertTrue(loader._load_auth_provider_token()) - new_expiry = _get_expiry(loader) + new_expiry = _get_expiry(loader, "expired_gcp") # assert that the configs expiry actually updates self.assertTrue(new_expiry > original_expiry) self.assertEqual(BEARER_TOKEN_FORMAT % TEST_ANOTHER_DATA_BASE64, loader.token) + def test_gcp_get_api_key_with_prefix(self): + class cred_old: + token = TEST_DATA_BASE64 + expiry = DATETIME_EXPIRY_PAST + + class cred_new: + token = TEST_ANOTHER_DATA_BASE64 + expiry = DATETIME_EXPIRY_FUTURE + fake_config = FakeConfig() + _get_google_credentials = mock.Mock() + _get_google_credentials.side_effect = [cred_old, cred_new] + + loader = KubeConfigLoader( + config_dict=self.TEST_KUBE_CONFIG, + active_context="expired_gcp_refresh", + get_google_credentials=_get_google_credentials) + loader.load_and_set(fake_config) + original_expiry = _get_expiry(loader, "expired_gcp_refresh") + # Call GCP token fetcher. + token = fake_config.get_api_key_with_prefix() + new_expiry = _get_expiry(loader, "expired_gcp_refresh") + + self.assertTrue(new_expiry > original_expiry) + self.assertEqual(BEARER_TOKEN_FORMAT % TEST_ANOTHER_DATA_BASE64, + loader.token) + self.assertEqual(BEARER_TOKEN_FORMAT % TEST_ANOTHER_DATA_BASE64, + token) + def test_oidc_no_refresh(self): loader = KubeConfigLoader( config_dict=self.TEST_KUBE_CONFIG, @@ -897,14 +992,16 @@ class TestKubeConfigLoader(BaseTestCase): def test_load_kube_config(self): expected = FakeConfig(host=TEST_HOST, token=BEARER_TOKEN_FORMAT % TEST_DATA_BASE64) - config_file = self._create_temp_file(yaml.dump(self.TEST_KUBE_CONFIG)) + config_file = self._create_temp_file( + yaml.safe_dump(self.TEST_KUBE_CONFIG)) actual = FakeConfig() load_kube_config(config_file=config_file, context="simple_token", client_configuration=actual) self.assertEqual(expected, actual) def test_list_kube_config_contexts(self): - config_file = self._create_temp_file(yaml.dump(self.TEST_KUBE_CONFIG)) + config_file = self._create_temp_file( + yaml.safe_dump(self.TEST_KUBE_CONFIG)) contexts, active_context = list_kube_config_contexts( config_file=config_file) self.assertDictEqual(self.TEST_KUBE_CONFIG['contexts'][0], @@ -917,7 +1014,8 @@ class TestKubeConfigLoader(BaseTestCase): contexts) def test_new_client_from_config(self): - config_file = self._create_temp_file(yaml.dump(self.TEST_KUBE_CONFIG)) + config_file = self._create_temp_file( + yaml.safe_dump(self.TEST_KUBE_CONFIG)) client = new_client_from_config( config_file=config_file, context="simple_token") self.assertEqual(TEST_HOST, client.configuration.host) @@ -942,6 +1040,210 @@ class TestKubeConfigLoader(BaseTestCase): active_context="non_existing_user").load_and_set(actual) self.assertEqual(expected, actual) + @mock.patch('kubernetes.config.kube_config.ExecProvider.run') + def test_user_exec_auth(self, mock): + token = "dummy" + mock.return_value = { + "token": token + } + expected = FakeConfig(host=TEST_HOST, api_key={ + "authorization": BEARER_TOKEN_FORMAT % token}) + actual = FakeConfig() + KubeConfigLoader( + config_dict=self.TEST_KUBE_CONFIG, + active_context="exec_cred_user").load_and_set(actual) + self.assertEqual(expected, actual) + + +class TestKubernetesClientConfiguration(BaseTestCase): + # Verifies properties of kubernetes.client.Configuration. + # These tests guard against changes to the upstream configuration class, + # since GCP authorization overrides get_api_key_with_prefix to refresh its + # token regularly. + + def test_get_api_key_with_prefix_exists(self): + self.assertTrue(hasattr(Configuration, 'get_api_key_with_prefix')) + + def test_get_api_key_with_prefix_returns_token(self): + expected_token = 'expected_token' + config = Configuration() + config.api_key['authorization'] = expected_token + self.assertEqual(expected_token, + config.get_api_key_with_prefix('authorization')) + + def test_auth_settings_calls_get_api_key_with_prefix(self): + expected_token = 'expected_token' + + def fake_get_api_key_with_prefix(identifier): + self.assertEqual('authorization', identifier) + return expected_token + config = Configuration() + config.get_api_key_with_prefix = fake_get_api_key_with_prefix + self.assertEqual(expected_token, + config.auth_settings()['BearerToken']['value']) + + +class TestKubeConfigMerger(BaseTestCase): + TEST_KUBE_CONFIG_PART1 = { + "current-context": "no_user", + "contexts": [ + { + "name": "no_user", + "context": { + "cluster": "default" + } + }, + ], + "clusters": [ + { + "name": "default", + "cluster": { + "server": TEST_HOST + } + }, + ], + "users": [] + } + + TEST_KUBE_CONFIG_PART2 = { + "current-context": "", + "contexts": [ + { + "name": "ssl", + "context": { + "cluster": "ssl", + "user": "ssl" + } + }, + { + "name": "simple_token", + "context": { + "cluster": "default", + "user": "simple_token" + } + }, + ], + "clusters": [ + { + "name": "ssl", + "cluster": { + "server": TEST_SSL_HOST, + "certificate-authority-data": + TEST_CERTIFICATE_AUTH_BASE64, + } + }, + ], + "users": [ + { + "name": "ssl", + "user": { + "token": TEST_DATA_BASE64, + "client-certificate-data": TEST_CLIENT_CERT_BASE64, + "client-key-data": TEST_CLIENT_KEY_BASE64, + } + }, + ] + } + + TEST_KUBE_CONFIG_PART3 = { + "current-context": "no_user", + "contexts": [ + { + "name": "expired_oidc", + "context": { + "cluster": "default", + "user": "expired_oidc" + } + }, + { + "name": "ssl", + "context": { + "cluster": "skipped-part2-defined-this-context", + "user": "skipped" + } + }, + ], + "clusters": [ + ], + "users": [ + { + "name": "expired_oidc", + "user": { + "auth-provider": { + "name": "oidc", + "config": { + "client-id": "tectonic-kubectl", + "client-secret": "FAKE_SECRET", + "id-token": TEST_OIDC_EXPIRED_LOGIN, + "idp-certificate-authority-data": TEST_OIDC_CA, + "idp-issuer-url": "https://example.org/identity", + "refresh-token": + "lucWJjEhlxZW01cXI3YmVlcYnpxNGhzk" + } + } + } + }, + { + "name": "simple_token", + "user": { + "token": TEST_DATA_BASE64, + "username": TEST_USERNAME, # should be ignored + "password": TEST_PASSWORD, # should be ignored + } + }, + ] + } + + def _create_multi_config(self): + files = [] + for part in ( + self.TEST_KUBE_CONFIG_PART1, + self.TEST_KUBE_CONFIG_PART2, + self.TEST_KUBE_CONFIG_PART3): + files.append(self._create_temp_file(yaml.safe_dump(part))) + return ENV_KUBECONFIG_PATH_SEPARATOR.join(files) + + def test_list_kube_config_contexts(self): + kubeconfigs = self._create_multi_config() + expected_contexts = [ + {'context': {'cluster': 'default'}, 'name': 'no_user'}, + {'context': {'cluster': 'ssl', 'user': 'ssl'}, 'name': 'ssl'}, + {'context': {'cluster': 'default', 'user': 'simple_token'}, + 'name': 'simple_token'}, + {'context': {'cluster': 'default', 'user': 'expired_oidc'}, 'name': 'expired_oidc'}] + + contexts, active_context = list_kube_config_contexts( + config_file=kubeconfigs) + + self.assertEqual(contexts, expected_contexts) + self.assertEqual(active_context, expected_contexts[0]) + + def test_new_client_from_config(self): + kubeconfigs = self._create_multi_config() + client = new_client_from_config( + config_file=kubeconfigs, context="simple_token") + self.assertEqual(TEST_HOST, client.configuration.host) + self.assertEqual(BEARER_TOKEN_FORMAT % TEST_DATA_BASE64, + client.configuration.api_key['authorization']) + + def test_save_changes(self): + kubeconfigs = self._create_multi_config() + + # load configuration, update token, save config + kconf = KubeConfigMerger(kubeconfigs) + user = kconf.config['users'].get_with_name('expired_oidc')['user'] + provider = user['auth-provider']['config'] + provider.value['id-token'] = "token-changed" + kconf.save_changes() + + # re-read configuration + kconf = KubeConfigMerger(kubeconfigs) + user = kconf.config['users'].get_with_name('expired_oidc')['user'] + provider = user['auth-provider']['config'] + + # new token + self.assertEqual(provider.value['id-token'], "token-changed") + if __name__ == '__main__': unittest.main() diff --git a/hack/boilerplate/boilerplate.py b/hack/boilerplate/boilerplate.py new file mode 100755 index 000000000..61d4cb947 --- /dev/null +++ b/hack/boilerplate/boilerplate.py @@ -0,0 +1,201 @@ +#!/usr/bin/env python + +# Copyright 2018 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function + +import argparse +import datetime +import difflib +import glob +import os +import re +import sys + +parser = argparse.ArgumentParser() +parser.add_argument( + "filenames", + help="list of files to check, all files if unspecified", + nargs='*') + +rootdir = os.path.dirname(__file__) + "/../../" +rootdir = os.path.abspath(rootdir) +parser.add_argument( + "--rootdir", default=rootdir, help="root directory to examine") + +default_boilerplate_dir = os.path.join(rootdir, "hack/boilerplate") +parser.add_argument( + "--boilerplate-dir", default=default_boilerplate_dir) + +parser.add_argument( + "-v", "--verbose", + help="give verbose output regarding why a file does not pass", + action="store_true") + +args = parser.parse_args() + +verbose_out = sys.stderr if args.verbose else open("/dev/null", "w") + + +def get_refs(): + refs = {} + + for path in glob.glob(os.path.join( + args.boilerplate_dir, "boilerplate.*.txt")): + extension = os.path.basename(path).split(".")[1] + + ref_file = open(path, 'r') + ref = ref_file.read().splitlines() + ref_file.close() + refs[extension] = ref + + return refs + + +def file_passes(filename, refs, regexs): + try: + f = open(filename, 'r') + except Exception as exc: + print("Unable to open %s: %s" % (filename, exc), file=verbose_out) + return False + + data = f.read() + f.close() + + basename = os.path.basename(filename) + extension = file_extension(filename) + + if extension != "": + ref = refs[extension] + else: + ref = refs[basename] + + # remove extra content from the top of files + if extension == "sh": + p = regexs["shebang"] + (data, found) = p.subn("", data, 1) + + data = data.splitlines() + + # if our test file is smaller than the reference it surely fails! + if len(ref) > len(data): + print('File %s smaller than reference (%d < %d)' % + (filename, len(data), len(ref)), + file=verbose_out) + return False + + # trim our file to the same number of lines as the reference file + data = data[:len(ref)] + + p = regexs["year"] + for d in data: + if p.search(d): + print('File %s has the YEAR field, but missing the year of date' % + filename, file=verbose_out) + return False + + # Replace all occurrences of regex "2014|2015|2016|2017|2018" with "YEAR" + p = regexs["date"] + for i, d in enumerate(data): + (data[i], found) = p.subn('YEAR', d) + if found != 0: + break + + # if we don't match the reference at this point, fail + if ref != data: + print("Header in %s does not match reference, diff:" % + filename, file=verbose_out) + if args.verbose: + print(file=verbose_out) + for line in difflib.unified_diff( + ref, data, 'reference', filename, lineterm=''): + print(line, file=verbose_out) + print(file=verbose_out) + return False + + return True + + +def file_extension(filename): + return os.path.splitext(filename)[1].split(".")[-1].lower() + + +# list all the files contain 'DO NOT EDIT', but are not generated +skipped_ungenerated_files = ['hack/boilerplate/boilerplate.py'] + + +def normalize_files(files): + newfiles = [] + for pathname in files: + newfiles.append(pathname) + for i, pathname in enumerate(newfiles): + if not os.path.isabs(pathname): + newfiles[i] = os.path.join(args.rootdir, pathname) + return newfiles + + +def get_files(extensions): + files = [] + if len(args.filenames) > 0: + files = args.filenames + else: + for root, dirs, walkfiles in os.walk(args.rootdir): + for name in walkfiles: + pathname = os.path.join(root, name) + files.append(pathname) + + files = normalize_files(files) + outfiles = [] + for pathname in files: + basename = os.path.basename(pathname) + extension = file_extension(pathname) + if extension in extensions or basename in extensions: + outfiles.append(pathname) + return outfiles + + +def get_dates(): + years = datetime.datetime.now().year + return '(%s)' % '|'.join((str(year) for year in range(2014, years+1))) + + +def get_regexs(): + regexs = {} + # Search for "YEAR" which exists in the boilerplate, + # but shouldn't in the real thing + regexs["year"] = re.compile('YEAR') + # get_dates return 2014, 2015, 2016, 2017, or 2018 until the current year + # as a regex like: "(2014|2015|2016|2017|2018)"; + # company holder names can be anything + regexs["date"] = re.compile(get_dates()) + # strip #!.* from shell scripts + regexs["shebang"] = re.compile(r"^(#!.*\n)\n*", re.MULTILINE) + return regexs + + +def main(): + regexs = get_regexs() + refs = get_refs() + filenames = get_files(refs.keys()) + + for filename in filenames: + if not file_passes(filename, refs, regexs): + print(filename, file=sys.stdout) + + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/hack/boilerplate/boilerplate.py.txt b/hack/boilerplate/boilerplate.py.txt new file mode 100644 index 000000000..d781daf9e --- /dev/null +++ b/hack/boilerplate/boilerplate.py.txt @@ -0,0 +1,15 @@ +#!/usr/bin/env python + +# Copyright YEAR The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/hack/boilerplate/boilerplate.sh.txt b/hack/boilerplate/boilerplate.sh.txt new file mode 100644 index 000000000..34cb349c4 --- /dev/null +++ b/hack/boilerplate/boilerplate.sh.txt @@ -0,0 +1,13 @@ +# Copyright YEAR The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/hack/verify-boilerplate.sh b/hack/verify-boilerplate.sh new file mode 100755 index 000000000..2f54c8cc3 --- /dev/null +++ b/hack/verify-boilerplate.sh @@ -0,0 +1,35 @@ +#!/usr/bin/env bash + +# Copyright 2018 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -o errexit +set -o nounset +set -o pipefail + +KUBE_ROOT=$(dirname "${BASH_SOURCE}")/.. + +boilerDir="${KUBE_ROOT}/hack/boilerplate" +boiler="${boilerDir}/boilerplate.py" + +files_need_boilerplate=($(${boiler} "$@")) + +# Run boilerplate check +if [[ ${#files_need_boilerplate[@]} -gt 0 ]]; then + for file in "${files_need_boilerplate[@]}"; do + echo "Boilerplate header is wrong for: ${file}" >&2 + done + + exit 1 +fi diff --git a/run_tox.sh b/run_tox.sh index 557337855..4b5839248 100755 --- a/run_tox.sh +++ b/run_tox.sh @@ -11,7 +11,7 @@ # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and +# See the License for the specific language governing permissions and # limitations under the License. set -o errexit @@ -51,4 +51,3 @@ git status echo "Running tox from the main repo on $TOXENV environment" # Run the user-provided command. "${@}" - diff --git a/stream/__init__.py b/stream/__init__.py index e72d05836..e9b7d24ff 100644 --- a/stream/__init__.py +++ b/stream/__init__.py @@ -1,3 +1,5 @@ +#!/usr/bin/env python + # Copyright 2017 The Kubernetes Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/stream/stream.py b/stream/stream.py index 0412fc338..3eab0b9ab 100644 --- a/stream/stream.py +++ b/stream/stream.py @@ -1,14 +1,18 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at +#!/usr/bin/env python + +# Copyright 2018 The Kubernetes Authors. # -# http://www.apache.org/licenses/LICENSE-2.0 +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. from . import ws_client diff --git a/stream/ws_client.py b/stream/ws_client.py index 1cc56cddc..cf8a3fe99 100644 --- a/stream/ws_client.py +++ b/stream/ws_client.py @@ -1,14 +1,18 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at +#!/usr/bin/env python + +# Copyright 2018 The Kubernetes Authors. # -# http://www.apache.org/licenses/LICENSE-2.0 +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. from kubernetes.client.rest import ApiException @@ -49,7 +53,8 @@ class WSClient: header.append("authorization: %s" % headers['authorization']) if headers and 'sec-websocket-protocol' in headers: - header.append("sec-websocket-protocol: %s" % headers['sec-websocket-protocol']) + header.append("sec-websocket-protocol: %s" % + headers['sec-websocket-protocol']) else: header.append("sec-websocket-protocol: v4.channel.k8s.io") @@ -182,8 +187,8 @@ class WSClient: data = data[1:] if data: if channel in [STDOUT_CHANNEL, STDERR_CHANNEL]: - # keeping all messages in the order they received for - # non-blocking call. + # keeping all messages in the order they received + # for non-blocking call. self._all += data if channel not in self._channels: self._channels[channel] = data diff --git a/stream/ws_client_test.py b/stream/ws_client_test.py index e2eca96cc..756d95978 100644 --- a/stream/ws_client_test.py +++ b/stream/ws_client_test.py @@ -1,4 +1,6 @@ -# Copyright 2017 The Kubernetes Authors. +#!/usr/bin/env python + +# Copyright 2018 The Kubernetes Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/tox.ini b/tox.ini index f36f34786..f935a6cd2 100644 --- a/tox.ini +++ b/tox.ini @@ -1,6 +1,6 @@ [tox] skipsdist = True -envlist = py27, py34, py35, py36 +envlist = py27, py34, py35, py36, py37 [testenv] passenv = TOXENV CI TRAVIS TRAVIS_* diff --git a/watch/__init__.py b/watch/__init__.py index ca9ac0698..46a31ceda 100644 --- a/watch/__init__.py +++ b/watch/__init__.py @@ -1,3 +1,5 @@ +#!/usr/bin/env python + # Copyright 2016 The Kubernetes Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/watch/watch.py b/watch/watch.py index 21899dd80..5966eaceb 100644 --- a/watch/watch.py +++ b/watch/watch.py @@ -1,3 +1,5 @@ +#!/usr/bin/env python + # Copyright 2016 The Kubernetes Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -18,6 +20,7 @@ import pydoc from kubernetes import client PYDOC_RETURN_LABEL = ":return:" +PYDOC_FOLLOW_PARAM = ":param bool follow:" # Removing this suffix from return type name should give us event's object # type. e.g., if list_namespaces() returns "NamespaceList" type, @@ -63,7 +66,7 @@ class Watch(object): self._raw_return_type = return_type self._stop = False self._api_client = client.ApiClient() - self.resource_version = 0 + self.resource_version = None def stop(self): self._stop = True @@ -76,8 +79,17 @@ class Watch(object): return return_type[:-len(TYPE_LIST_SUFFIX)] return return_type + def get_watch_argument_name(self, func): + if PYDOC_FOLLOW_PARAM in pydoc.getdoc(func): + return 'follow' + else: + return 'watch' + def unmarshal_event(self, data, return_type): - js = json.loads(data) + try: + js = json.loads(data) + except ValueError: + return data js['raw_object'] = js['object'] if return_type: obj = SimpleNamespace(data=json.dumps(js['raw_object'])) @@ -120,8 +132,10 @@ class Watch(object): self._stop = False return_type = self.get_return_type(func) - kwargs['watch'] = True + kwargs[self.get_watch_argument_name(func)] = True kwargs['_preload_content'] = False + if 'resource_version' in kwargs: + self.resource_version = kwargs['resource_version'] timeouts = ('timeout_seconds' in kwargs) while True: @@ -132,9 +146,12 @@ class Watch(object): if self._stop: break finally: - kwargs['resource_version'] = self.resource_version resp.close() resp.release_conn() + if self.resource_version is not None: + kwargs['resource_version'] = self.resource_version + else: + self._stop = True if timeouts or self._stop: break diff --git a/watch/watch_test.py b/watch/watch_test.py index d1ec80a1c..ebc400af4 100644 --- a/watch/watch_test.py +++ b/watch/watch_test.py @@ -1,3 +1,5 @@ +#!/usr/bin/env python + # Copyright 2016 The Kubernetes Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -14,12 +16,15 @@ import unittest -from mock import Mock +from mock import Mock, call from .watch import Watch class WatchTests(unittest.TestCase): + def setUp(self): + # counter for a test that needs test global state + self.callcount = 0 def test_watch_with_decode(self): fake_resp = Mock() @@ -62,6 +67,103 @@ class WatchTests(unittest.TestCase): fake_resp.close.assert_called_once() fake_resp.release_conn.assert_called_once() + def test_watch_for_follow(self): + fake_resp = Mock() + fake_resp.close = Mock() + fake_resp.release_conn = Mock() + fake_resp.read_chunked = Mock( + return_value=[ + 'log_line_1\n', + 'log_line_2\n']) + + fake_api = Mock() + fake_api.read_namespaced_pod_log = Mock(return_value=fake_resp) + fake_api.read_namespaced_pod_log.__doc__ = ':param bool follow:\n:return: str' + + w = Watch() + count = 1 + for e in w.stream(fake_api.read_namespaced_pod_log): + self.assertEqual("log_line_1", e) + count += 1 + # make sure we can stop the watch and the last event with won't be + # returned + if count == 2: + w.stop() + + fake_api.read_namespaced_pod_log.assert_called_once_with( + _preload_content=False, follow=True) + fake_resp.read_chunked.assert_called_once_with(decode_content=False) + fake_resp.close.assert_called_once() + fake_resp.release_conn.assert_called_once() + + def test_watch_resource_version_set(self): + # https://github.com/kubernetes-client/python/issues/700 + # ensure watching from a resource version does reset to resource + # version 0 after k8s resets the watch connection + fake_resp = Mock() + fake_resp.close = Mock() + fake_resp.release_conn = Mock() + values = [ + '{"type": "ADDED", "object": {"metadata": {"name": "test1",' + '"resourceVersion": "1"}, "spec": {}, "status": {}}}\n', + '{"type": "ADDED", "object": {"metadata": {"name": "test2",' + '"resourceVersion": "2"}, "spec": {}, "sta', + 'tus": {}}}\n' + '{"type": "ADDED", "object": {"metadata": {"name": "test3",' + '"resourceVersion": "3"}, "spec": {}, "status": {}}}\n' + ] + # return nothing on the first call and values on the second + # this emulates a watch from a rv that returns nothing in the first k8s + # watch reset and values later + + def get_values(*args, **kwargs): + self.callcount += 1 + if self.callcount == 1: + return [] + else: + return values + + fake_resp.read_chunked = Mock( + side_effect=get_values) + + fake_api = Mock() + fake_api.get_namespaces = Mock(return_value=fake_resp) + fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList' + + w = Watch() + # ensure we keep our requested resource version or the version latest + # returned version when the existing versions are older than the + # requested version + # needed for the list existing objects, then watch from there use case + calls = [] + + iterations = 2 + # first two calls must use the passed rv, the first call is a + # "reset" and does not actually return anything + # the second call must use the same rv but will return values + # (with a wrong rv but a real cluster would behave correctly) + # calls following that will use the rv from those returned values + calls.append(call(_preload_content=False, watch=True, + resource_version="5")) + calls.append(call(_preload_content=False, watch=True, + resource_version="5")) + for i in range(iterations): + # ideally we want 5 here but as rv must be treated as an + # opaque value we cannot interpret it and order it so rely + # on k8s returning the events completely and in order + calls.append(call(_preload_content=False, watch=True, + resource_version="3")) + + for c, e in enumerate(w.stream(fake_api.get_namespaces, + resource_version="5")): + if c == len(values) * iterations: + w.stop() + + # check calls are in the list, gives good error output + fake_api.get_namespaces.assert_has_calls(calls) + # more strict test with worse error message + self.assertEqual(fake_api.get_namespaces.mock_calls, calls) + def test_watch_stream_twice(self): w = Watch(float) for step in ['first', 'second']: