Merge pull request #120 from dims/support-exec-calls
Implementation for /exec using websocket
This commit is contained in:
commit
cfd34ef280
@ -21,6 +21,7 @@ Copyright 2016 SmartBear Software
|
||||
from __future__ import absolute_import
|
||||
|
||||
from . import models
|
||||
from . import ws_client
|
||||
from .rest import RESTClientObject
|
||||
from .rest import ApiException
|
||||
|
||||
@ -343,6 +344,15 @@ class ApiClient(object):
|
||||
"""
|
||||
Makes the HTTP request using RESTClient.
|
||||
"""
|
||||
# FIXME(dims) : We need a better way to figure out which
|
||||
# calls end up using web sockets
|
||||
if url.endswith('/exec') and method == "GET":
|
||||
return ws_client.GET(self.config,
|
||||
url,
|
||||
query_params=query_params,
|
||||
_request_timeout=_request_timeout,
|
||||
headers=headers)
|
||||
|
||||
if method == "GET":
|
||||
return self.rest_client.GET(url,
|
||||
query_params=query_params,
|
||||
|
||||
114
kubernetes/client/ws_client.py
Normal file
114
kubernetes/client/ws_client.py
Normal file
@ -0,0 +1,114 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from .rest import ApiException
|
||||
|
||||
import certifi
|
||||
import collections
|
||||
import websocket
|
||||
import six
|
||||
import ssl
|
||||
from six.moves.urllib.parse import urlencode
|
||||
from six.moves.urllib.parse import quote_plus
|
||||
|
||||
|
||||
class WSClient:
|
||||
def __init__(self, configuration, url, headers):
|
||||
self.messages = []
|
||||
self.errors = []
|
||||
websocket.enableTrace(False)
|
||||
header = None
|
||||
|
||||
# We just need to pass the Authorization, ignore all the other
|
||||
# http headers we get from the generated code
|
||||
if 'Authorization' in headers:
|
||||
header = "Authorization: %s" % headers['Authorization']
|
||||
|
||||
self.ws = websocket.WebSocketApp(url,
|
||||
on_message=self.on_message,
|
||||
on_error=self.on_error,
|
||||
on_close=self.on_close,
|
||||
header=[header] if header else None)
|
||||
self.ws.on_open = self.on_open
|
||||
|
||||
if url.startswith('wss://') and configuration.verify_ssl:
|
||||
ssl_opts = {
|
||||
'cert_reqs': ssl.CERT_REQUIRED,
|
||||
'keyfile': configuration.key_file,
|
||||
'certfile': configuration.cert_file,
|
||||
'ca_certs': configuration.ssl_ca_cert or certifi.where(),
|
||||
}
|
||||
if configuration.assert_hostname is not None:
|
||||
ssl_opts['check_hostname'] = configuration.assert_hostname
|
||||
else:
|
||||
ssl_opts = {'cert_reqs': ssl.CERT_NONE}
|
||||
|
||||
self.ws.run_forever(sslopt=ssl_opts)
|
||||
|
||||
def on_message(self, ws, message):
|
||||
if message[0] == '\x01':
|
||||
message = message[1:]
|
||||
if message:
|
||||
if six.PY3 and isinstance(message, six.binary_type):
|
||||
message = message.decode('utf-8')
|
||||
self.messages.append(message)
|
||||
|
||||
def on_error(self, ws, error):
|
||||
self.errors.append(error)
|
||||
|
||||
def on_close(self, ws):
|
||||
pass
|
||||
|
||||
def on_open(self, ws):
|
||||
pass
|
||||
|
||||
|
||||
WSResponse = collections.namedtuple('WSResponse', ['data'])
|
||||
|
||||
|
||||
def GET(configuration, url, query_params, _request_timeout, headers):
|
||||
# switch protocols from http to websocket
|
||||
url = url.replace('http://', 'ws://')
|
||||
url = url.replace('https://', 'wss://')
|
||||
|
||||
# patch extra /
|
||||
url = url.replace('//api', '/api')
|
||||
|
||||
# Extract the command from the list of tuples
|
||||
commands = None
|
||||
for key, value in query_params:
|
||||
if key == 'command':
|
||||
commands = value
|
||||
break
|
||||
|
||||
# drop command from query_params as we will be processing it separately
|
||||
query_params = [(key, value) for key, value in query_params if
|
||||
key != 'command']
|
||||
|
||||
# if we still have query params then encode them
|
||||
if query_params:
|
||||
url += '?' + urlencode(query_params)
|
||||
|
||||
# tack on the actual command to execute at the end
|
||||
if isinstance(commands, list):
|
||||
for command in commands:
|
||||
url += "&command=%s&" % quote_plus(command)
|
||||
else:
|
||||
url += '&command=' + quote_plus(commands)
|
||||
|
||||
client = WSClient(configuration, url, headers)
|
||||
if client.errors:
|
||||
raise ApiException(
|
||||
status=0,
|
||||
reason='\n'.join([str(error) for error in client.errors])
|
||||
)
|
||||
return WSResponse('%s' % ''.join(client.messages))
|
||||
@ -12,6 +12,7 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import time
|
||||
import unittest
|
||||
import uuid
|
||||
|
||||
@ -34,22 +35,49 @@ class TestClient(unittest.TestCase):
|
||||
client = api_client.ApiClient(self.API_URL, config=self.config)
|
||||
api = core_v1_api.CoreV1Api(client)
|
||||
|
||||
name = 'test-' + str(uuid.uuid4())
|
||||
pod_manifest = {'apiVersion': 'v1',
|
||||
'kind': 'Pod',
|
||||
'metadata': {'color': 'blue', 'name': name},
|
||||
'spec': {'containers': [{'image': 'dockerfile/redis',
|
||||
'name': 'redis'}]}}
|
||||
name = 'busybox-test-' + str(uuid.uuid4())
|
||||
pod_manifest = {
|
||||
'apiVersion': 'v1',
|
||||
'kind': 'Pod',
|
||||
'metadata': {
|
||||
'name': name
|
||||
},
|
||||
'spec': {
|
||||
'containers': [{
|
||||
'image': 'busybox',
|
||||
'name': 'sleep',
|
||||
"args": [
|
||||
"/bin/sh",
|
||||
"-c",
|
||||
"while true;do date;sleep 5; done"
|
||||
]
|
||||
}]
|
||||
}
|
||||
}
|
||||
|
||||
resp = api.create_namespaced_pod(body=pod_manifest,
|
||||
namespace='default')
|
||||
self.assertEqual(name, resp.metadata.name)
|
||||
self.assertTrue(resp.status.phase)
|
||||
|
||||
resp = api.read_namespaced_pod(name=name,
|
||||
namespace='default')
|
||||
self.assertEqual(name, resp.metadata.name)
|
||||
self.assertTrue(resp.status.phase)
|
||||
while True:
|
||||
resp = api.read_namespaced_pod(name=name,
|
||||
namespace='default')
|
||||
self.assertEqual(name, resp.metadata.name)
|
||||
self.assertTrue(resp.status.phase)
|
||||
if resp.status.phase != 'Pending':
|
||||
break
|
||||
time.sleep(1)
|
||||
|
||||
exec_command = ['/bin/sh',
|
||||
'-c',
|
||||
'for i in $(seq 1 3); do date; sleep 1; done']
|
||||
resp = api.connect_get_namespaced_pod_exec(name, 'default',
|
||||
command=exec_command,
|
||||
stderr=False, stdin=False,
|
||||
stdout=True, tty=False)
|
||||
print('EXEC response : %s' % resp)
|
||||
self.assertEqual(3, len(resp.splitlines()))
|
||||
|
||||
number_of_pods = len(api.list_pod_for_all_namespaces().items)
|
||||
self.assertTrue(number_of_pods > 0)
|
||||
|
||||
@ -1,9 +1,9 @@
|
||||
certifi >= 14.05.14
|
||||
six == 1.8.0
|
||||
six>=1.9.0
|
||||
python_dateutil >= 2.5.3
|
||||
setuptools >= 21.0.0
|
||||
urllib3 >= 1.19.1
|
||||
pyyaml >= 3.12
|
||||
oauth2client >= 4.0.0
|
||||
ipaddress >= 1.0.17
|
||||
|
||||
websocket-client>=0.32.0
|
||||
|
||||
Loading…
Reference in New Issue
Block a user