"""DB-API implementation backed by Trino See http://www.python.org/dev/peps/pep-0249/ Many docstrings in this file are based on the PEP, which is in the public domain. """ from __future__ import absolute_import from __future__ import unicode_literals import logging import requests # Make all exceptions visible in this module per DB-API from pyhive.common import DBAPITypeObject from pyhive.exc import * # noqa from pyhive.presto import Connection as PrestoConnection, Cursor as PrestoCursor, PrestoParamEscaper try: # Python 3 import urllib.parse as urlparse except ImportError: # Python 2 import urlparse # PEP 249 module globals apilevel = '2.0' threadsafety = 2 # Threads may share the module and connections. paramstyle = 'pyformat' # Python extended format codes, e.g. ...WHERE name=%(name)s _logger = logging.getLogger(__name__) class TrinoParamEscaper(PrestoParamEscaper): pass _escaper = TrinoParamEscaper() def connect(*args, **kwargs): """Constructor for creating a connection to the database. See class :py:class:`Connection` for arguments. :returns: a :py:class:`Connection` object. """ return Connection(*args, **kwargs) class Connection(PrestoConnection): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) def cursor(self): """Return a new :py:class:`Cursor` object using the connection.""" return Cursor(*self._args, **self._kwargs) class Cursor(PrestoCursor): """These objects represent a database cursor, which is used to manage the context of a fetch operation. Cursors are not isolated, i.e., any changes done to the database by a cursor are immediately visible by other cursors or connections. """ def execute(self, operation, parameters=None): """Prepare and execute a database operation (query or command). Return values are not defined. """ headers = { 'X-Trino-Catalog': self._catalog, 'X-Trino-Schema': self._schema, 'X-Trino-Source': self._source, 'X-Trino-User': self._username, } if self._session_props: headers['X-Trino-Session'] = ','.join( '{}={}'.format(propname, propval) for propname, propval in self._session_props.items() ) # Prepare statement if parameters is None: sql = operation else: sql = operation % _escaper.escape_args(parameters) self._reset_state() self._state = self._STATE_RUNNING url = urlparse.urlunparse(( self._protocol, '{}:{}'.format(self._host, self._port), '/v1/statement', None, None, None)) _logger.info('%s', sql) _logger.debug("Headers: %s", headers) response = self._requests_session.post( url, data=sql.encode('utf-8'), headers=headers, **self._requests_kwargs) self._process_response(response) def _process_response(self, response): """Given the JSON response from Trino's REST API, update the internal state with the next URI and any data from the response """ # TODO handle HTTP 503 if response.status_code != requests.codes.ok: fmt = "Unexpected status code {}\n{}" raise OperationalError(fmt.format(response.status_code, response.content)) response_json = response.json() _logger.debug("Got response %s", response_json) assert self._state == self._STATE_RUNNING, "Should be running if processing response" self._nextUri = response_json.get('nextUri') self._columns = response_json.get('columns') if 'id' in response_json: self.last_query_id = response_json['id'] if 'X-Trino-Clear-Session' in response.headers: propname = response.headers['X-Trino-Clear-Session'] self._session_props.pop(propname, None) if 'X-Trino-Set-Session' in response.headers: propname, propval = response.headers['X-Trino-Set-Session'].split('=', 1) self._session_props[propname] = propval if 'data' in response_json: assert self._columns new_data = response_json['data'] self._process_data(new_data) self._data += map(tuple, new_data) if 'nextUri' not in response_json: self._state = self._STATE_FINISHED if 'error' in response_json: raise DatabaseError(response_json['error']) # # Type Objects and Constructors # # See types in trino-main/src/main/java/com/facebook/trino/tuple/TupleInfo.java FIXED_INT_64 = DBAPITypeObject(['bigint']) VARIABLE_BINARY = DBAPITypeObject(['varchar']) DOUBLE = DBAPITypeObject(['double']) BOOLEAN = DBAPITypeObject(['boolean'])