# 🔍 Description This is the follow-up of #5686, renaming `./pyhive` to `./python` and also temporarily adding `**/python/*` to the RAT exclusion list. "PyHive" may not be a suitable name after becoming part of Apache Kyuubi, so let's use the generic dir name `python` and discuss the official name later (we will probably keep the code at `./python` eventually). ## Types of changes 🔖 - [ ] Bugfix (non-breaking change which fixes an issue) - [ ] New feature (non-breaking change which adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to change) ## Test Plan 🧪 RAT check recovered. --- # Checklist 📝 - [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html) **Be nice. Be informative.** Closes #6279 from pan3793/pyhive-1. Closes #5686 42d338e71 [Cheng Pan] [KYUUBI #5686][FOLLOWUP] Rename pyhive to python Authored-by: Cheng Pan <chengpan@apache.org> Signed-off-by: Cheng Pan <chengpan@apache.org>
368 lines
15 KiB
Python
368 lines
15 KiB
Python
"""DB-API implementation backed by Presto
|
|
|
|
See http://www.python.org/dev/peps/pep-0249/
|
|
|
|
Many docstrings in this file are based on the PEP, which is in the public domain.
|
|
"""
|
|
|
|
from __future__ import absolute_import
|
|
from __future__ import unicode_literals
|
|
|
|
from builtins import object
|
|
from decimal import Decimal
|
|
|
|
from pyhive import common
|
|
from pyhive.common import DBAPITypeObject
|
|
# Make all exceptions visible in this module per DB-API
|
|
from pyhive.exc import * # noqa
|
|
import base64
|
|
import getpass
|
|
import datetime
|
|
import logging
|
|
import requests
|
|
from requests.auth import HTTPBasicAuth
|
|
import os
|
|
|
|
try: # Python 3
|
|
import urllib.parse as urlparse
|
|
except ImportError: # Python 2
|
|
import urlparse
|
|
|
|
|
|
# Module globals mandated by PEP 249.
apilevel = '2.0'
# Threads may share the module and connections.
threadsafety = 2
# Python extended format codes, e.g. ...WHERE name=%(name)s
paramstyle = 'pyformat'

_logger = logging.getLogger(__name__)

# Post-processing applied to result values, keyed by the lowercase base type name
# (the part of the Presto type string before any "(").
TYPES_CONVERTER = dict(
    decimal=Decimal,
    # As of Presto 0.69, binary data is returned as the varbinary type in base64 format
    varbinary=base64.b64decode,
)
|
|
|
|
class PrestoParamEscaper(common.ParamEscaper):
    """Parameter escaper that prefixes dates and timestamps with their Presto type keyword."""

    def escape_datetime(self, item, format):
        # Delegate the actual formatting to the base escaper; the extra ``3`` argument is
        # presumably a fractional-second precision -- confirm against pyhive.common.
        formatted = super(PrestoParamEscaper, self).escape_datetime(item, format, 3)
        # datetime.datetime is checked explicitly; anything else is rendered as a date literal.
        if isinstance(item, datetime.datetime):
            literal_type = "timestamp"
        else:
            literal_type = "date"
        return "{} {}".format(literal_type, formatted)


# Shared module-level escaper used when interpolating query parameters.
_escaper = PrestoParamEscaper()
|
|
|
|
|
|
def connect(*args, **kwargs):
    """Create a connection to Presto.

    Every positional and keyword argument is forwarded unchanged; see class
    :py:class:`Connection` for the supported arguments.

    :returns: a :py:class:`Connection` object.
    """
    connection = Connection(*args, **kwargs)
    return connection
|
|
|
|
|
|
class Connection(object):
    """Lightweight handle for talking to Presto.

    Presto has no notion of a persistent connection, so this object merely remembers the
    arguments it was built with and replays them into every :py:class:`Cursor` it creates;
    the cursors do all the real work.
    """

    def __init__(self, *args, **kwargs):
        # Kept verbatim and handed to each new Cursor.
        self._args, self._kwargs = args, kwargs

    def close(self):
        """Do nothing: Presto keeps no connection state to release."""
        # TODO cancel outstanding queries?

    def commit(self):
        """Do nothing: Presto does not support transactions."""

    def cursor(self):
        """Return a new :py:class:`Cursor` built from this connection's arguments."""
        return Cursor(*self._args, **self._kwargs)

    def rollback(self):
        """Always fail, because Presto has no transactions to roll back.

        :raises NotSupportedError: unconditionally.
        """
        raise NotSupportedError("Presto does not have transactions")  # pragma: no cover
|
|
|
|
|
|
class Cursor(common.DBAPICursor):
    """These objects represent a database cursor, which is used to manage the context of a fetch
    operation.

    Cursors are not isolated, i.e., any changes done to the database by a cursor are immediately
    visible by other cursors or connections.
    """

    def __init__(self, host, port='8080', username=None, principal_username=None, catalog='hive',
                 schema='default', poll_interval=1, source='pyhive', session_props=None,
                 protocol='http', password=None, requests_session=None, requests_kwargs=None,
                 KerberosRemoteServiceName=None, KerberosPrincipal=None,
                 KerberosConfigPath=None, KerberosKeytabPath=None,
                 KerberosCredentialCachePath=None, KerberosUseCanonicalHostname=None):
        """
        :param host: hostname to connect to, e.g. ``presto.example.com``
        :param port: int or str -- port, defaults to ``'8080'``
        :param username: string -- defaults to system user name
        :param principal_username: string -- defaults to ``username`` argument if it exists,
            else defaults to system user name
        :param catalog: string -- defaults to ``hive``
        :param schema: string -- defaults to ``default``
        :param poll_interval: float -- how often to ask the Presto REST interface for a progress
            update, defaults to a second
        :param source: string -- arbitrary identifier (shows up in the Presto monitoring page)
        :param session_props: dict -- session properties sent with every query in the
            ``X-Presto-Session`` header. A supplied dict is used as-is (not copied) and is
            updated in place when the server sets or clears session properties.
        :param protocol: string -- network protocol, valid options are ``http`` and ``https``.
            defaults to ``http``
        :param password: string -- Deprecated. Defaults to ``None``.
            Using BasicAuth, requires ``https``.
            Prefer ``requests_kwargs={'auth': HTTPBasicAuth(username, password)}``.
            May not be specified with ``requests_kwargs['auth']``.
            Ignored when Kerberos authentication is used.
        :param requests_session: a ``requests.Session`` object for advanced usage. If absent, this
            class will use the default requests behavior of making a new session per HTTP request.
            Caller is responsible for closing session.
        :param requests_kwargs: Additional ``**kwargs`` to pass to requests. May not contain
            ``method``, ``url``, ``data`` or ``headers``, which this class sets itself.
        :param KerberosRemoteServiceName: string -- Presto coordinator Kerberos service name.
            This parameter is required for Kerberos authentication. When set, any ``auth`` entry
            in ``requests_kwargs`` is replaced by Kerberos authentication.
        :param KerberosPrincipal: string -- The principal to use when authenticating to
            the Presto coordinator.
        :param KerberosConfigPath: string -- Kerberos configuration file.
            (default: /etc/krb5.conf)
        :param KerberosKeytabPath: string -- Kerberos keytab file.
        :param KerberosCredentialCachePath: string -- Kerberos credential cache.
        :param KerberosUseCanonicalHostname: boolean or string -- Use the canonical hostname of
            the Presto coordinator for the Kerberos service principal by first resolving the
            hostname to an IP address and then doing a reverse DNS lookup for that IP address.
            This is enabled by default; pass ``False`` or ``'false'`` (any case) to disable it.
        :raises ValueError: if ``protocol`` is not http/https, if ``requests_kwargs`` overrides a
            reserved argument, or if ``password`` conflicts with ``requests_kwargs['auth']`` or is
            used without ``https``.
        """
        super(Cursor, self).__init__(poll_interval)
        # Config
        self._host = host
        self._port = port
        # Presto User Impersonation:
        # https://docs.starburstdata.com/latest/security/impersonation.html
        #
        # User impersonation allows the execution of queries in Presto based on the
        # principal_username argument, instead of executing the query as the account which
        # authenticated against Presto (usually a service account).
        #
        # This allows a service account to authenticate with Presto, and then use
        # principal_username as the user Presto will execute the query as. This is required by
        # applications that use authentication methods like SAML, where the application has a
        # username but not a password, so that user-specific Presto Resource Groups and
        # Authorization rules still apply (they would not when only a shared service account is
        # used). It also allows auditing of who is executing a query in these environments,
        # instead of having all queries run by the shared service account.
        self._username = principal_username or username or getpass.getuser()
        self._catalog = catalog
        self._schema = schema
        self._arraysize = 1
        self._poll_interval = poll_interval
        self._source = source
        # NOTE: a caller-supplied dict is kept by reference, so Set/Clear-Session responses
        # mutate it (Connection passes the same kwargs to every cursor it creates).
        self._session_props = session_props if session_props is not None else {}
        self.last_query_id = None

        if protocol not in ('http', 'https'):
            raise ValueError("Protocol must be http/https, was {!r}".format(protocol))
        self._protocol = protocol

        # Without an explicit session, the top-level functions of the ``requests`` module
        # (post/get/delete) are called directly.
        self._requests_session = requests_session or requests

        # Copy so that adding ``auth`` below never mutates the caller's dict.
        requests_kwargs = dict(requests_kwargs) if requests_kwargs is not None else {}

        if KerberosRemoteServiceName is not None:
            from requests_kerberos import HTTPKerberosAuth, OPTIONAL

            # Reserved arguments would otherwise only fail later, inside execute().
            self._check_reserved_requests_kwargs(requests_kwargs)

            hostname_override = None
            # Accept a real boolean (as documented) as well as the string 'false' in any case;
            # calling .lower() directly would raise AttributeError on a bool.
            if KerberosUseCanonicalHostname is not None \
                    and str(KerberosUseCanonicalHostname).lower() == 'false':
                hostname_override = host
            # These are process-wide environment variables (os.environ), so they affect every
            # Kerberos user in this process.
            if KerberosConfigPath is not None:
                os.environ['KRB5_CONFIG'] = KerberosConfigPath
            if KerberosKeytabPath is not None:
                os.environ['KRB5_CLIENT_KTNAME'] = KerberosKeytabPath
            if KerberosCredentialCachePath is not None:
                os.environ['KRB5CCNAME'] = KerberosCredentialCachePath

            requests_kwargs['auth'] = HTTPKerberosAuth(mutual_authentication=OPTIONAL,
                                                       principal=KerberosPrincipal,
                                                       service=KerberosRemoteServiceName,
                                                       hostname_override=hostname_override)
        else:
            if password is not None and 'auth' in requests_kwargs:
                raise ValueError("Cannot use both password and requests_kwargs authentication")
            self._check_reserved_requests_kwargs(requests_kwargs)
            if password is not None:
                requests_kwargs['auth'] = HTTPBasicAuth(username, password)
                if protocol != 'https':
                    raise ValueError("Protocol must be https when passing a password")
        self._requests_kwargs = requests_kwargs

        self._reset_state()

    @staticmethod
    def _check_reserved_requests_kwargs(requests_kwargs):
        """Raise ``ValueError`` if ``requests_kwargs`` sets an argument this class builds itself."""
        for k in ('method', 'url', 'data', 'headers'):
            if k in requests_kwargs:
                raise ValueError("Cannot override requests argument {}".format(k))

    def _reset_state(self):
        """Reset state about the previous query in preparation for running another query"""
        super(Cursor, self)._reset_state()
        self._nextUri = None
        self._columns = None

    @property
    def description(self):
        """This read-only attribute is a sequence of 7-item sequences.

        Each of these sequences contains information describing one result column:

        - name
        - type_code
        - display_size (None in current implementation)
        - internal_size (None in current implementation)
        - precision (None in current implementation)
        - scale (None in current implementation)
        - null_ok (always True in current implementation)

        The ``type_code`` can be interpreted by comparing it to the Type Objects specified in the
        section below.
        """
        # Sleep until we're done or we got the columns
        self._fetch_while(
            lambda: self._columns is None and
            self._state not in (self._STATE_NONE, self._STATE_FINISHED)
        )
        if self._columns is None:
            return None
        return [
            # name, type_code, display_size, internal_size, precision, scale, null_ok
            (col['name'], col['type'], None, None, None, None, True)
            for col in self._columns
        ]

    def execute(self, operation, parameters=None):
        """Prepare and execute a database operation (query or command).

        :param operation: SQL text; when ``parameters`` is given it is interpolated with ``%``
            after escaping (``pyformat`` paramstyle).
        :param parameters: optional parameters for ``operation``.

        Return values are not defined.
        """
        headers = {
            'X-Presto-Catalog': self._catalog,
            'X-Presto-Schema': self._schema,
            'X-Presto-Source': self._source,
            'X-Presto-User': self._username,
        }

        if self._session_props:
            headers['X-Presto-Session'] = ','.join(
                '{}={}'.format(propname, propval)
                for propname, propval in self._session_props.items()
            )

        # Prepare statement
        if parameters is None:
            sql = operation
        else:
            sql = operation % _escaper.escape_args(parameters)

        self._reset_state()

        self._state = self._STATE_RUNNING
        url = urlparse.urlunparse((
            self._protocol,
            '{}:{}'.format(self._host, self._port), '/v1/statement', None, None, None))
        _logger.info('%s', sql)
        _logger.debug("Headers: %s", headers)
        response = self._requests_session.post(
            url, data=sql.encode('utf-8'), headers=headers, **self._requests_kwargs)
        self._process_response(response)

    def cancel(self):
        """Cancel the running query by sending DELETE to its next URI.

        Does nothing if the query has already finished.

        :raises ProgrammingError: if no query has been executed.
        :raises OperationalError: if the server does not answer 204 No Content.
        """
        if self._state == self._STATE_NONE:
            raise ProgrammingError("No query yet")
        if self._nextUri is None:
            assert self._state == self._STATE_FINISHED, "Should be finished if nextUri is None"
            return

        response = self._requests_session.delete(self._nextUri, **self._requests_kwargs)
        if response.status_code != requests.codes.no_content:
            fmt = "Unexpected status code after cancel {}\n{}"
            raise OperationalError(fmt.format(response.status_code, response.content))

        self._state = self._STATE_FINISHED
        self._nextUri = None

    def poll(self):
        """Poll for and return the raw status data provided by the Presto REST API.

        :returns: dict -- JSON status information or ``None`` if the query is done
        :raises: ``ProgrammingError`` when no query has been started

        .. note::
            This is not a part of DB-API.
        """
        if self._state == self._STATE_NONE:
            raise ProgrammingError("No query yet")
        if self._nextUri is None:
            assert self._state == self._STATE_FINISHED, "Should be finished if nextUri is None"
            return None
        response = self._requests_session.get(self._nextUri, **self._requests_kwargs)
        self._process_response(response)
        # Re-parse on purpose: _process_response converts row values in place, while this
        # method promises the raw payload.
        return response.json()

    def _fetch_more(self):
        """Fetch the next URI and update state"""
        self._process_response(self._requests_session.get(self._nextUri, **self._requests_kwargs))

    def _process_data(self, rows):
        """Convert, in place, the values of columns whose base type is in ``TYPES_CONVERTER``.

        :param rows: list of mutable row lists as decoded from the response JSON.
        """
        # Read the already-populated column metadata directly instead of going through the
        # ``description`` property, which may try to fetch more results.
        for i, column in enumerate(self._columns):
            convert = TYPES_CONVERTER.get(column['type'].split("(")[0].lower())
            if convert is None:
                continue
            for row in rows:
                if row[i] is not None:
                    row[i] = convert(row[i])

    @staticmethod
    def _split_clear_session_header(value):
        """Return the property names listed in an ``X-Presto-Clear-Session`` header value.

        requests joins repeated headers with ", ", so one value may name several properties.
        """
        return [name.strip() for name in value.split(',') if name.strip()]

    @staticmethod
    def _parse_set_session_header(value):
        """Return ``(name, value)`` pairs from an ``X-Presto-Set-Session`` header value.

        requests joins repeated headers with ", ", so one value may carry several ``name=value``
        entries. A comma-separated fragment without ``=`` is appended to the preceding value, so
        a single value that itself contains commas is kept intact.

        :raises ValueError: if the first entry contains no ``=``.
        """
        pairs = []
        for fragment in value.split(','):
            if '=' not in fragment and pairs:
                name, previous = pairs[-1]
                pairs[-1] = (name, previous + ',' + fragment)
            else:
                name, propval = fragment.split('=', 1)
                pairs.append((name.strip(), propval))
        return pairs

    def _process_response(self, response):
        """Given the JSON response from Presto's REST API, update the internal state with the next
        URI and any data from the response

        :raises OperationalError: if the HTTP status is not 200 OK.
        :raises DatabaseError: if the response JSON contains an ``error`` entry.
        """
        # TODO handle HTTP 503
        if response.status_code != requests.codes.ok:
            fmt = "Unexpected status code {}\n{}"
            raise OperationalError(fmt.format(response.status_code, response.content))

        response_json = response.json()
        _logger.debug("Got response %s", response_json)
        assert self._state == self._STATE_RUNNING, "Should be running if processing response"
        self._nextUri = response_json.get('nextUri')
        self._columns = response_json.get('columns')
        if 'id' in response_json:
            self.last_query_id = response_json['id']
        if 'X-Presto-Clear-Session' in response.headers:
            for propname in self._split_clear_session_header(
                    response.headers['X-Presto-Clear-Session']):
                self._session_props.pop(propname, None)
        if 'X-Presto-Set-Session' in response.headers:
            for propname, propval in self._parse_set_session_header(
                    response.headers['X-Presto-Set-Session']):
                self._session_props[propname] = propval
        if 'data' in response_json:
            assert self._columns
            new_data = response_json['data']
            self._process_data(new_data)
            self._data += map(tuple, new_data)
        if 'nextUri' not in response_json:
            self._state = self._STATE_FINISHED
        if 'error' in response_json:
            raise DatabaseError(response_json['error'])
|
|
|
|
|
|
#
# Type Objects and Constructors
#

# Type objects to compare against the ``type_code`` entry of ``Cursor.description``.
# Names follow the types in presto-main/src/main/java/com/facebook/presto/tuple/TupleInfo.java
# (presumably why ``varchar`` is exposed as VARIABLE_BINARY).
FIXED_INT_64, VARIABLE_BINARY, DOUBLE, BOOLEAN = (
    DBAPITypeObject(['bigint']),
    DBAPITypeObject(['varchar']),
    DBAPITypeObject(['double']),
    DBAPITypeObject(['boolean']),
)
|