kyuubi/python/pyhive/tests/test_sasl_compat.py
Cheng Pan f8c7b93f55
[KYUUBI #5686][FOLLOWUP] Rename pyhive to python
# 🔍 Description

This is the follow-up of #5686, renaming `./pyhive` to `./python`, and also adding `**/python/*` to RAT exclusion list temporarily.

"PyHive" may not be a suitable name after being part of Apache Kyuubi, let's use a generic dir name `python`, and discuss the official name later(we probably keep the code at `./python` eventually).

## Types of changes 🔖

- [ ] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

Recover RAT checked.

---

# Checklist 📝

- [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

**Be nice. Be informative.**

Closes #6279 from pan3793/pyhive-1.

Closes #5686

42d338e71 [Cheng Pan] [KYUUBI #5686][FOLLOWUP] Rename pyhive to python

Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Cheng Pan <chengpan@apache.org>
2024-04-09 20:30:02 +08:00

334 lines
14 KiB
Python

'''
http://www.opensource.org/licenses/mit-license.php
Copyright 2007-2011 David Alan Cridland
Copyright 2011 Lance Stout
Copyright 2012 Tyler L Hobbs
Permission is hereby granted, free of charge, to any person obtaining a copy of this
software and associated documentation files (the "Software"), to deal in the Software
without restriction, including without limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons
to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or
substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE
FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.
'''
# This file was derived from the unit tests in the pure-sasl repo (https://github.com/thobbs/pure-sasl/tree/master/tests/unit),
# refactored to cover the wrapper functions in sasl_compat.py, with added coverage for functions that exist only in sasl_compat.py.
import unittest
import base64
import hashlib
import hmac
import kerberos
from mock import patch
import six
import struct
from puresasl import SASLProtocolException, QOP
from puresasl.client import SASLError
from pyhive.sasl_compat import PureSASLClient, error_catcher
class TestPureSASLClient(unittest.TestCase):
    """Checks how PureSASLClient.start() selects a mechanism and reports errors."""

    # Error text the tests expect from getError() when no requested mechanism is usable.
    _NO_USABLE_MECHANISM = 'None of the mechanisms listed meet all required properties'

    def setUp(self):
        """Build a fresh client for 'localhost' with no extra keyword arguments."""
        self.sasl_kwargs = {}
        self.sasl = PureSASLClient('localhost', **self.sasl_kwargs)

    def test_start_no_mechanism(self):
        """start(mechanism=None) fails, echoes None back and records the error."""
        success, mechanism, response = self.sasl.start(mechanism=None)

        self.assertFalse(success)
        self.assertIsNone(mechanism)
        self.assertIsNone(response)
        self.assertEqual(self.sasl.getError(), self._NO_USABLE_MECHANISM)

    def test_start_wrong_mechanism(self):
        """A single unsupported mechanism name fails and is echoed back unchanged."""
        success, mechanism, response = self.sasl.start(mechanism='WRONG')

        self.assertFalse(success)
        self.assertEqual(mechanism, 'WRONG')
        self.assertIsNone(response)
        self.assertEqual(self.sasl.getError(), self._NO_USABLE_MECHANISM)

    def test_start_list_of_invalid_mechanisms(self):
        """A list containing only unsupported mechanisms records the same error."""
        unsupported = ['invalid1', 'invalid2']
        self.sasl.start(unsupported)
        self.assertEqual(self.sasl.getError(), self._NO_USABLE_MECHANISM)

    def test_start_list_of_valid_mechanisms(self):
        """From several supported mechanisms, DIGEST-MD5 is expected to be chosen."""
        supported = ['PLAIN', 'DIGEST-MD5', 'CRAM-MD5']
        self.sasl.start(supported)
        # The expected winner is the one selected by mechanism score.
        chosen = self.sasl._chosen_mech
        self.assertEqual(chosen.name, 'DIGEST-MD5')

    def test_error_catcher_no_error(self):
        """Inside error_catcher, a successful start leaves no error recorded."""
        with error_catcher(self.sasl):
            started, _, _ = self.sasl.start(mechanism='ANONYMOUS')

        # NOTE(review): assertions sit outside the `with` so the context manager
        # cannot intercept a failing assertion — confirm against error_catcher.
        self.assertEqual(self.sasl.getError(), None)
        self.assertEqual(started, True)

    def test_error_catcher_with_error(self):
        """Inside error_catcher, a failed start reports False and records the error."""
        with error_catcher(self.sasl):
            started, _, _ = self.sasl.start(mechanism='WRONG')

        self.assertEqual(started, False)
        self.assertEqual(self.sasl.getError(), self._NO_USABLE_MECHANISM)
"""Assuming Client initilization went well and a mechanism is chosen, Below are the test cases for different mechanims"""
class _BaseMechanismTests(unittest.TestCase):
    """Shared checks for a client that was constructed with a fixed mechanism.

    Subclasses in this file override ``mechanism`` (and usually ``sasl_kwargs``)
    and replace individual tests where the mechanism behaves differently.
    The test methods accept ``*args`` so that they also work when a subclass
    is decorated with class-level patches that pass mocks positionally.
    """
    mechanism = 'ANONYMOUS'
    sasl_kwargs = {}

    def setUp(self):
        """Create the client under test and remember the mechanism it chose."""
        client = PureSASLClient('localhost', mechanism=self.mechanism, **self.sasl_kwargs)
        self.sasl = client
        self.mechanism_class = client._chosen_mech

    def test_init_basic(self, *args):
        """The chosen mechanism must point back at the client that owns it."""
        client = PureSASLClient('localhost', mechanism=self.mechanism, **self.sasl_kwargs)
        self.assertIs(client._chosen_mech.sasl, client)

    def test_step_basic(self, *args):
        """A step with an arbitrary byte challenge succeeds and returns bytes."""
        challenge = six.b('string')
        success, response = self.sasl.step(challenge)
        self.assertTrue(success)
        self.assertIsInstance(response, six.binary_type)

    def test_decode_encode(self, *args):
        """encode() then decode() both report failure with an empty error text."""
        for operation in (self.sasl.encode, self.sasl.decode):
            self.assertEqual(operation('msg'), (False, None))
            self.assertEqual(self.sasl.getError(), '')
class AnonymousMechanismTest(_BaseMechanismTests):
    """Runs the shared mechanism checks against the ANONYMOUS mechanism."""

    mechanism = 'ANONYMOUS'
class PlainTextMechanismTest(_BaseMechanismTests):
    """Checks for the PLAIN mechanism using a fixed username/password pair."""
    mechanism = 'PLAIN'
    username = 'user'
    password = 'pass'
    sasl_kwargs = {'username': username, 'password': password}

    def test_step(self):
        """Whatever the challenge is, the response is NUL + user + NUL + password."""
        challenges = (None, '', b'asdf', u"\U0001F44D")
        for challenge in challenges:
            success, response = self.sasl.step(challenge)
            self.assertTrue(success)
            self.assertEqual(response, six.b(f'\x00{self.username}\x00{self.password}'))
            self.assertIsInstance(response, six.binary_type)

    def _assert_authzid_in_response(self, sasl_kwargs, expected_authzid):
        """Step a new client built from sasl_kwargs and check the response prefix."""
        sasl = PureSASLClient('localhost', mechanism=self.mechanism, **sasl_kwargs)
        success, response = sasl.step(u"\U0001F44D")
        self.assertTrue(success)
        self.assertEqual(response, six.b(f'{expected_authzid}\x00{self.username}\x00{self.password}'))
        self.assertIsInstance(response, six.binary_type)
        self.assertTrue(sasl.complete)

    def test_step_with_authorization_id_or_identity(self):
        """identity is sent when given; authorization_id wins when both are given."""
        identity = 'user2'
        auth_id = 'user3'

        # Only an identity is supplied: it becomes the first field of the response.
        with_identity = dict(self.sasl_kwargs, identity=identity)
        self._assert_authzid_in_response(with_identity, identity)

        # Both are supplied: the authorization_id takes priority over the identity.
        with_both = dict(with_identity, authorization_id=auth_id)
        self._assert_authzid_in_response(with_both, auth_id)

    def test_decode_encode(self):
        """decode() and encode() hand the message back unchanged."""
        payload = 'msg'
        expected = (True, payload)
        self.assertEqual(self.sasl.decode(payload), expected)
        self.assertEqual(self.sasl.encode(payload), expected)
class ExternalMechanismTest(_BaseMechanismTests):
    """Checks for the EXTERNAL mechanism."""
    mechanism = 'EXTERNAL'

    def test_step(self):
        """A step without a challenge succeeds with an empty byte response."""
        outcome = self.sasl.step()
        self.assertEqual(outcome, (True, b''))

    def test_decode_encode(self):
        """decode() and encode() hand the message back unchanged."""
        payload = 'msg'
        expected = (True, payload)
        self.assertEqual(self.sasl.decode(payload), expected)
        self.assertEqual(self.sasl.encode(payload), expected)
@patch('puresasl.mechanisms.kerberos.authGSSClientStep')
@patch('puresasl.mechanisms.kerberos.authGSSClientResponse', return_value=base64.b64encode(six.b('some\x00 response')))
class GSSAPIMechanismTest(_BaseMechanismTests):
    """Checks for the GSSAPI mechanism with the kerberos client calls patched.

    The class-level patches apply to every test method, so each method receives
    the created mocks as extra positional arguments: the authGSSClientResponse
    mock first, followed by the authGSSClientStep mock. Method-level patches
    are passed ahead of those.
    """
    mechanism = 'GSSAPI'
    service = 'GSSAPI'
    sasl_kwargs = {'service': service}

    @patch('puresasl.mechanisms.kerberos.authGSSClientWrap')
    @patch('puresasl.mechanisms.kerberos.authGSSClientUnwrap')
    def test_decode_encode(self, _inner1, _inner2, authGSSClientResponse, *args):
        """decode()/encode() pass data through for AUTH and use kerberos otherwise."""
        conf_target = 'puresasl.mechanisms.kerberos.authGSSClientResponseConf'
        payload = b'msg'

        # Set the QOP directly instead of running the step negotiation.
        self.mechanism_class.qop = QOP.AUTH
        self.assertEqual(self.sasl.decode(payload), (True, payload))
        self.assertEqual(self.sasl.encode(payload), (True, payload))

        # With integrity or confidentiality protection the result comes from
        # the (mocked) kerberos response instead of the input message.
        for protection in (QOP.AUTH_INT, QOP.AUTH_CONF):
            self.mechanism_class.qop = protection
            with patch(conf_target, return_value=1):
                self.assertEqual(self.sasl.decode(payload), (True, base64.b64decode(authGSSClientResponse.return_value)))
                self.assertEqual(self.sasl.encode(payload), (True, base64.b64decode(authGSSClientResponse.return_value)))
            if protection != QOP.AUTH_CONF:
                continue
            # Confidentiality was requested but the response reports it as not applied.
            with patch(conf_target, return_value=0):
                self.assertEqual(self.sasl.encode(payload), (False, None))
                self.assertEqual(self.sasl.getError(), 'Error: confidentiality requested, but not honored by the server.')

    def test_step_no_user(self, authGSSClientResponse, *args):
        """step() returns the decoded kerberos response and later learns the user."""
        challenge = six.b('whatever')
        empty = six.b('')

        # No user known yet: the decoded mocked response is returned.
        self.assertEqual(self.sasl.step(challenge), (True, base64.b64decode(authGSSClientResponse.return_value)))

        with patch('puresasl.mechanisms.kerberos.authGSSClientResponse', return_value=''):
            self.assertEqual(self.sasl.step(challenge), (True, empty))

        # This part has to come last because it sets the mechanism's user.
        username = 'username'
        step_patch = patch('puresasl.mechanisms.kerberos.authGSSClientStep', return_value=kerberos.AUTH_GSS_COMPLETE)
        user_patch = patch('puresasl.mechanisms.kerberos.authGSSClientUserName', return_value=six.b(username))
        with step_patch, user_patch:
            self.assertEqual(self.sasl.step(challenge), (True, empty))
            self.assertEqual(self.mechanism_class.user, six.b(username))

    @patch('puresasl.mechanisms.kerberos.authGSSClientUnwrap')
    def test_step_qop(self, *args):
        """step() negotiates each QOP bit and wraps the size word plus the user."""
        self.mechanism_class._have_negotiated_details = True
        self.mechanism_class.user = 'user'
        challenge = six.b('msg')

        # With the default mocked response the step is rejected.
        self.assertEqual(self.sasl.step(challenge), (False, None))
        self.assertEqual(self.sasl.getError(), 'Bad response from server')

        max_len = 100
        self.assertLess(max_len, self.sasl.max_buffer)

        for bit, qop in QOP.bit_map.items():
            # One big-endian int: QOP bit in the top byte, buffer size below it.
            qop_size = struct.pack('!i', (bit << 24) | max_len)
            response_patch = patch('puresasl.mechanisms.kerberos.authGSSClientResponse',
                                   return_value=base64.b64encode(qop_size))
            wrap_patch = patch('puresasl.mechanisms.kerberos.authGSSClientWrap')
            with response_patch, wrap_patch as authGSSClientWrap:
                self.mechanism_class.complete = False
                self.assertEqual(self.sasl.step(challenge), (True, qop_size))
                self.assertTrue(self.mechanism_class.complete)
                self.assertEqual(self.mechanism_class.qop, qop)
                self.assertEqual(self.mechanism_class.max_buffer, max_len)

                # The second positional argument given to the wrap call is the
                # base64 payload: the size word followed by the user name.
                wrap_call_args = authGSSClientWrap.call_args[0]
                wrapped = base64.b64decode(wrap_call_args[1])
                self.assertEqual(wrapped[:4], qop_size)
                self.assertEqual(wrapped[4:], six.b(self.mechanism_class.user))
class CramMD5MechanismTest(_BaseMechanismTests):
    """Checks for the CRAM-MD5 mechanism using a fixed username/password pair."""
    mechanism = 'CRAM-MD5'
    username = 'user'
    password = 'pass'
    sasl_kwargs = {'username': username, 'password': password}

    def test_step(self):
        """The response to a challenge carries the user name and the HMAC-MD5 digest."""
        # A step without a challenge succeeds but produces no response.
        success, response = self.sasl.step(None)
        self.assertTrue(success)
        self.assertIsNone(response)

        # Expected digest: HMAC-MD5 of the challenge, keyed with the password.
        challenge = six.b('msg')
        expected_mac = hmac.new(six.b(self.password), msg=challenge, digestmod=hashlib.md5)

        success, response = self.sasl.step(challenge)
        self.assertTrue(success)
        self.assertIn(six.b(self.username), response)
        self.assertIn(six.b(expected_mac.hexdigest()), response)
        self.assertIsInstance(response, six.binary_type)
        self.assertTrue(self.sasl.complete)

    def test_decode_encode(self):
        """decode() and encode() hand the message back unchanged."""
        payload = 'msg'
        expected = (True, payload)
        self.assertEqual(self.sasl.decode(payload), expected)
        self.assertEqual(self.sasl.encode(payload), expected)
class DigestMD5MechanismTest(_BaseMechanismTests):
    """Test case for the DIGEST-MD5 SASL mechanism."""
    mechanism = 'DIGEST-MD5'
    username = 'user'
    password = 'pass'
    sasl_kwargs = {'username': username, 'password': password}

    def test_decode_encode(self):
        """decode() and encode() hand the message back unchanged."""
        msg = 'msg'
        self.assertEqual(self.sasl.decode(msg), (True, msg))
        self.assertEqual(self.sasl.encode(msg), (True, msg))

    def test_step_basic(self, *args):
        """Disable the inherited generic step check; test_step covers stepping."""
        # NOTE(review): presumably the arbitrary byte string used by the base
        # test is not a usable DIGEST-MD5 challenge — confirm.
        pass

    def test_step(self):
        """Test a SASL step with dummy challenge for DIGEST-MD5 mechanism."""
        testChallenge = (
            b'nonce="rmD6R8aMYVWH+/ih9HGBr3xNGAR6o2DUxpKlgDz6gUQ=",r'
            b'ealm="example.org",qop="auth,auth-int,auth-conf",cipher="rc4-40,rc'
            b'4-56,rc4,des,3des",maxbuf=65536,charset=utf-8,algorithm=md5-sess'
        )
        result, response = self.sasl.step(testChallenge)
        self.assertTrue(result)
        self.assertIsNotNone(response)

    def test_step_server_answer(self):
        """Test a SASL step with a proper server answer for DIGEST-MD5 mechanism."""
        sasl_kwargs = {'username': "chris", 'password': "secret"}
        # A dedicated client is used here (different host, service and
        # credentials), so every check below must target `sasl`, not `self.sasl`.
        sasl = PureSASLClient('elwood.innosoft.com',
                              service="imap",
                              mechanism=self.mechanism,
                              mutual_auth=True,
                              **sasl_kwargs)
        testChallenge = (
            b'utf-8,username="chris",realm="elwood.innosoft.com",'
            b'nonce="OA6MG9tEQGm2hh",nc=00000001,cnonce="OA6MHXh6VqTrRk",'
            b'digest-uri="imap/elwood.innosoft.com",'
            b'response=d388dad90d4bbd760a152321f2143af7,qop=auth'
        )
        sasl.step(testChallenge)
        # Pin cnonce to the value used in the challenge above.
        # NOTE(review): presumably the mechanism otherwise generates it randomly — confirm.
        sasl._chosen_mech.cnonce = b"OA6MHXh6VqTrRk"

        serverResponse = (
            b'rspauth=ea40f60335c427b5527b84dbabcdfffd'
        )
        sasl.step(serverResponse)
        # Assert that step chooses the only supported QOP for DIGEST-MD5.
        # The original asserted on self.sasl, the fixture client that never
        # processed these challenges, so the check did not cover this exchange.
        self.assertEqual(sasl.qop, QOP.AUTH)