7

phoenix-queryserver.git

 2 years ago
source link: https://git-wip-us.apache.org/repos/asf?p=phoenix-queryserver.git%3Ba%3Dblob%3Bf%3Dpython-phoenixdb%2Fphoenixdb%2Favatica%2Fclient.py
Follow the source link to view the original file, which may include images, newer updates, and better formatting. If the link is broken, click the button below to view the snapshot taken at that time.
neoserver,ios ssh client

ASF Git Repos - phoenix-queryserver.git/blob - python-phoenixdb/phoenixdb/avatica/client.py

Search:

re

48b6406670e326084e8e7546f1305fbb5969d01d
1 # Copyright 2015 Lukas Lalinsky
2 #
3 # Licensed to the Apache Software Foundation (ASF) under one or more
4 # contributor license agreements. See the NOTICE file distributed with
5 # this work for additional information regarding copyright ownership.
6 # The ASF licenses this file to You under the Apache License, Version 2.0
7 # (the "License"); you may not use this file except in compliance with
8 # the License. You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
18 """Implementation of the PROTOBUF-over-HTTP RPC protocol used by Avatica."""
20 import logging
21 import math
22 import pprint
23 import re
24 import time
26 from phoenixdb import errors
27 from phoenixdb.avatica.proto import common_pb2, requests_pb2, responses_pb2
29 import requests
31 try:
32 import urlparse
33 except ImportError:
34 import urllib.parse as urlparse
36 try:
37 from HTMLParser import HTMLParser
38 except ImportError:
39 from html.parser import HTMLParser
# Public API of this module: only the client class is exported by "import *".
__all__ = ['AvaticaClient']
# Module-level logger; requests, responses and retries are logged at DEBUG level.
logger = logging.getLogger(__name__)
class JettyErrorPageParser(HTMLParser):
    """Extracts the title and message text from a Jetty HTML error page.

    Text found directly inside ``html > body > h2`` is collected (stripped)
    into :attr:`title`. Text inside ``html > body > p > pre`` is collected
    into :attr:`message`.
    """

    # HTML void elements never get a closing tag. If they were pushed onto
    # the element path, every element after them would appear one level too
    # deep, and the h2/pre text would be missed.
    VOID_ELEMENTS = frozenset([
        'area', 'base', 'br', 'col', 'embed', 'hr', 'img', 'input',
        'link', 'meta', 'param', 'source', 'track', 'wbr',
    ])

    def __init__(self):
        HTMLParser.__init__(self)
        self.path = []     # stack of currently open element names
        self.title = []    # stripped text chunks from html > body > h2
        self.message = []  # stripped text chunks from html > body > p > pre

    def handle_starttag(self, tag, attrs):
        if tag not in self.VOID_ELEMENTS:
            self.path.append(tag)

    def handle_endtag(self, tag):
        # Unwind to the most recently opened element with this name. This
        # tolerates unclosed children and ignores stray closing tags (including
        # the synthetic end of a self-closing void element) instead of raising
        # IndexError on an empty stack.
        for index in range(len(self.path) - 1, -1, -1):
            if self.path[index] == tag:
                del self.path[index:]
                return

    def handle_data(self, data):
        path = self.path
        if len(path) > 2 and path[0] == 'html' and path[1] == 'body':
            if len(path) == 3 and path[2] == 'h2':
                self.title.append(data.strip())
            elif len(path) == 4 and path[2] == 'p' and path[3] == 'pre':
                self.message.append(data.strip())
def parse_url(url):
    """Parses a server address into a ``ParseResult``.

    A full URL containing ``//`` (e.g. ``http://host:8765/``) is returned as
    parsed. A bare ``host`` or ``host:port`` is turned into an ``http`` URL.
    Port 8765 is added when none is given, and the path defaults to ``/``.

    :param url:
        URL or ``host[:port]`` string.

    :returns:
        A ``urlparse.ParseResult``.
    """
    parsed = urlparse.urlparse(url)
    if parsed.netloc or not url or '//' in url:
        return parsed

    # Without "//" urlparse never fills in netloc. On Python 3.9+ it even
    # reads "host:port" as scheme "host" with path "port". Re-parse the text
    # as a network-path reference so host and port end up in netloc.
    authority = urlparse.urlparse('//' + url)
    netloc = authority.netloc
    if not netloc:
        return parsed

    # Look for an explicit port only after an IPv6 literal's closing bracket,
    # so "[::1]" still receives the default port.
    if ':' not in netloc.rsplit(']', 1)[-1]:
        netloc = '{}:8765'.format(netloc)
    return authority._replace(scheme='http', netloc=netloc, path=authority.path or '/')
# Maps SQLSTATE prefixes to DB-API exception classes.
# Defined in phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
# raise_sql_error() uses the first entry whose prefix matches, so more specific
# prefixes (e.g. '22018') must come before more general ones (e.g. '22').
SQLSTATE_ERROR_CLASSES = [
    ('08', errors.OperationalError),  # Connection Exception
    ('22018', errors.IntegrityError),  # Constraint violation.
    ('22', errors.DataError),  # Data Exception
    ('23', errors.IntegrityError),  # Constraint Violation
    ('24', errors.InternalError),  # Invalid Cursor State
    ('25', errors.InternalError),  # Invalid Transaction State
    ('42', errors.ProgrammingError),  # Syntax Error or Access Rule Violation
    ('XLC', errors.OperationalError),  # Execution exceptions
    ('INT', errors.InternalError),  # Phoenix internal error
]
def raise_sql_error(code, sqlstate, message):
    """Raises the DB-API exception that corresponds to *sqlstate*.

    ``SQLSTATE_ERROR_CLASSES`` is scanned in order, and the first prefix that
    *sqlstate* starts with selects the exception class. ``errors.InternalError``
    is the fallback. The exception is built as ``cls(message, code, sqlstate)``.
    This function never returns normally.
    """
    candidates = (
        error_class
        for prefix, error_class in SQLSTATE_ERROR_CLASSES
        if sqlstate.startswith(prefix)
    )
    chosen = next(candidates, errors.InternalError)
    raise chosen(message, code, sqlstate)
# Matches Phoenix error text of the form
# "<ExceptionClass>: ERROR <code> (<SQLSTATE>): <message> ->".
_SQL_ERROR_PATTERN = re.compile(r'(?:([^ ]+): )?ERROR (\d+) \(([0-9A-Z]{5})\): (.*?) ->')


def parse_and_raise_sql_error(message):
    """Raises a DB-API exception if *message* embeds a Phoenix SQL error.

    Only the first match is used; its numeric code, SQLSTATE and message are
    passed to :func:`raise_sql_error`. Returns ``None`` if nothing matches.
    """
    match = _SQL_ERROR_PATTERN.search(message)
    if match is not None:
        _exception_name, code, sqlstate, error_message = match.groups()
        raise_sql_error(int(code), sqlstate, error_message)
def parse_error_page(html):
    """Raises an exception describing a Jetty HTML error page.

    For a page whose title is exactly ``HTTP ERROR: 500``, the joined message
    text is first checked for an embedded Phoenix SQL error (raised as the
    matching DB-API exception). Otherwise that text is raised as
    ``errors.InternalError``. Any other page is ignored and ``None`` is
    returned.
    """
    page = JettyErrorPageParser()
    page.feed(html)
    if page.title != ['HTTP ERROR: 500']:
        return None
    text = ' '.join(page.message).strip()
    parse_and_raise_sql_error(text)
    raise errors.InternalError(text)
def parse_error_protobuf(text):
    """Raises the SQL error carried by a protobuf ``ErrorResponse`` body.

    *text* is decoded as a ``WireMessage`` whose wrapped payload is parsed as
    an ``ErrorResponse``. If decoding fails, or the payload parse reports
    nothing read, ``None`` is returned so the caller can fall back to a generic
    error. Otherwise a DB-API exception is always raised: first from the
    message text, then from the response's code and SQLSTATE.
    """
    try:
        wire = common_pb2.WireMessage()
        wire.ParseFromString(text)
        error_response = responses_pb2.ErrorResponse()
        consumed = error_response.ParseFromString(wire.wrapped_message)
    except Exception:
        # Not a protobuf error, fall through
        return
    if not consumed:
        return

    parse_and_raise_sql_error(error_response.error_message)
    raise_sql_error(error_response.error_code, error_response.sql_state, error_response.error_message)
class AvaticaClient(object):
    """Client for Avatica's RPC server.

    This exposes all low-level functionality that the Avatica
    server provides, using the native terminology. You most likely
    do not want to use this class directly, but rather connect
    to a server using :func:`phoenixdb.connect`.
    """

    def __init__(self, url, max_retries=None, auth=None, verify=None):
        """Constructs a new client object.

        :param url:
            URL of an Avatica RPC server, normalized with :func:`parse_url`.

        :param max_retries:
            Number of retries after an HTTP protocol error or a
            503 Service Unavailable response. Defaults to 3.

        :param auth:
            If not ``None``, assigned to the HTTP session's ``auth`` when the
            session is created.

        :param verify:
            If not ``None``, passed as the ``verify`` argument of every POST.
        """
        self.url = parse_url(url)
        self.max_retries = max_retries if max_retries is not None else 3
        self.auth = auth
        self.verify = verify
        self.session = None

    def __del__(self):
        """Finalizer. Calls close() to close any open session."""
        self.close()

    def connect(self):
        """Does nothing; the HTTP session is opened on the first request instead."""
        pass

    def close(self):
        """Closes the HTTP session, if one is open.

        Safe to call more than once. Also safe on an object whose ``__init__``
        raised before ``session`` was assigned, which is the case when the
        finalizer runs after a failed construction.
        """
        session = getattr(self, 'session', None)
        if session:
            session.close()
        self.session = None

    def _post_request(self, body, headers):
        """POSTs *body* to the server, retrying transient failures.

        :param body:
            Serialized request payload.

        :param headers:
            HTTP headers; they are only applied when the session is created.

        :returns:
            The ``requests.Response``. Once the retries are used up, a 503
            response is returned as-is.

        :raises errors.InterfaceError:
            If ``requests.HTTPError`` is still raised after all retries.
        """
        # Create the session if we haven't before
        if not self.session:
            logger.debug("Creating a new Session")
            self.session = requests.Session()
            self.session.headers.update(headers)
            self.session.stream = True
            if self.auth is not None:
                self.session.auth = self.auth

        retry_count = self.max_retries
        while True:
            logger.debug("POST %s %r %r", self.url.geturl(), body, self.session.headers)

            request_args = {'data': body}

            # Setting verify on the Session is not the same as setting it
            # as a request arg
            if self.verify is not None:
                request_args['verify'] = self.verify

            try:
                response = self.session.post(self.url.geturl(), **request_args)
            except requests.HTTPError as e:
                # NOTE(review): Session.post() does not raise HTTPError by itself
                # (raise_for_status() does), so this branch may never trigger.
                # Connection errors propagate as requests exceptions; confirm
                # whether those should be retried as well.
                if retry_count > 0:
                    delay = math.exp(-retry_count)
                    logger.debug("HTTP protocol error, will retry in %s seconds...", delay, exc_info=True)
                    time.sleep(delay)
                    retry_count -= 1
                    continue
                raise errors.InterfaceError('RPC request failed', cause=e)

            if response.status_code == requests.codes.service_unavailable and retry_count > 0:
                delay = math.exp(-retry_count)
                # No exception is being handled here, so exc_info=True would only
                # append a meaningless "NoneType: None" to the log record.
                logger.debug("Service unavailable, will retry in %s seconds...", delay)
                # The session streams responses. An unread body keeps its
                # connection checked out of the pool until it is closed.
                response.close()
                time.sleep(delay)
                retry_count -= 1
                continue
            return response

    def _apply(self, request_data, expected_response_type=None):
        """Sends one request to the server and returns the response payload.

        :param request_data:
            A ``requests_pb2`` request message. Its class name selects the
            Avatica request type.

        :param expected_response_type:
            Short name of the expected response message (e.g.
            ``'ResultSetResponse'``). Defaults to the request class name with
            ``Request`` replaced by ``Response``.

        :returns:
            The serialized ``wrapped_message`` bytes of the response.

        :raises errors.InterfaceError:
            If a non-200 status cannot be mapped to a SQL error, or if the
            response has an unexpected type.
        """
        logger.debug("Sending request\n%s", pprint.pformat(request_data))

        request_name = request_data.__class__.__name__
        message = common_pb2.WireMessage()
        message.name = 'org.apache.calcite.avatica.proto.Requests${}'.format(request_name)
        message.wrapped_message = request_data.SerializeToString()
        body = message.SerializeToString()
        headers = {'content-type': 'application/x-google-protobuf'}

        response = self._post_request(body, headers)
        # .content (unlike .raw.read()) undoes any Content-Encoding such as
        # gzip, which requests advertises by default. It also fully consumes
        # the streamed body so the connection can be reused.
        response_body = response.content

        if response.status_code != requests.codes.ok:
            logger.debug("Received response\n%s", response_body)
            if b'<html>' in response_body:
                # response.encoding is None when the server sent no usable charset.
                parse_error_page(response_body.decode(response.encoding or 'utf-8', 'replace'))
            else:
                # assume the response is in protobuf format
                parse_error_protobuf(response_body)
            raise errors.InterfaceError('RPC request returned invalid status code', response.status_code)

        message = common_pb2.WireMessage()
        message.ParseFromString(response_body)

        logger.debug("Received response\n%s", message)

        if expected_response_type is None:
            expected_response_type = request_name.replace('Request', 'Response')

        expected_response_type = 'org.apache.calcite.avatica.proto.Responses$' + expected_response_type
        if message.name != expected_response_type:
            raise errors.InterfaceError('unexpected response type "{}" expected "{}"'.format(message.name, expected_response_type))

        return message.wrapped_message

    def get_catalogs(self, connection_id):
        """Returns the catalogs as a ``ResultSetResponse``."""
        request = requests_pb2.CatalogsRequest()
        request.connection_id = connection_id
        response_data = self._apply(request, 'ResultSetResponse')
        response = responses_pb2.ResultSetResponse()
        response.ParseFromString(response_data)
        return response

    def get_schemas(self, connection_id, catalog=None, schemaPattern=None):
        """Returns schemas, optionally filtered, as a ``ResultSetResponse``."""
        request = requests_pb2.SchemasRequest()
        request.connection_id = connection_id
        if catalog is not None:
            request.catalog = catalog
        if schemaPattern is not None:
            request.schema_pattern = schemaPattern
        response_data = self._apply(request, 'ResultSetResponse')
        response = responses_pb2.ResultSetResponse()
        response.ParseFromString(response_data)
        return response

    def get_tables(self, connection_id, catalog=None, schemaPattern=None, tableNamePattern=None, typeList=None):
        """Returns tables, optionally filtered, as a ``ResultSetResponse``."""
        request = requests_pb2.TablesRequest()
        request.connection_id = connection_id
        if catalog is not None:
            request.catalog = catalog
        if schemaPattern is not None:
            request.schema_pattern = schemaPattern
        if tableNamePattern is not None:
            request.table_name_pattern = tableNamePattern
        if typeList is not None:
            request.type_list.extend(typeList)
        request.has_type_list = typeList is not None
        response_data = self._apply(request, 'ResultSetResponse')
        response = responses_pb2.ResultSetResponse()
        response.ParseFromString(response_data)
        return response

    def get_columns(self, connection_id, catalog=None, schemaPattern=None, tableNamePattern=None, columnNamePattern=None):
        """Returns columns, optionally filtered, as a ``ResultSetResponse``."""
        request = requests_pb2.ColumnsRequest()
        request.connection_id = connection_id
        if catalog is not None:
            request.catalog = catalog
        if schemaPattern is not None:
            request.schema_pattern = schemaPattern
        if tableNamePattern is not None:
            request.table_name_pattern = tableNamePattern
        if columnNamePattern is not None:
            request.column_name_pattern = columnNamePattern
        response_data = self._apply(request, 'ResultSetResponse')
        response = responses_pb2.ResultSetResponse()
        response.ParseFromString(response_data)
        return response

    def get_table_types(self, connection_id):
        """Returns the table types as a ``ResultSetResponse``."""
        request = requests_pb2.TableTypesRequest()
        request.connection_id = connection_id
        response_data = self._apply(request, 'ResultSetResponse')
        response = responses_pb2.ResultSetResponse()
        response.ParseFromString(response_data)
        return response

    def get_type_info(self, connection_id):
        """Returns the type information as a ``ResultSetResponse``."""
        request = requests_pb2.TypeInfoRequest()
        request.connection_id = connection_id
        response_data = self._apply(request, 'ResultSetResponse')
        response = responses_pb2.ResultSetResponse()
        response.ParseFromString(response_data)
        return response

    def get_sync_results(self, connection_id, statement_id, state):
        """Sends a ``SyncResultsRequest`` and returns the ``SyncResultsResponse``."""
        request = requests_pb2.SyncResultsRequest()
        request.connection_id = connection_id
        request.statement_id = statement_id
        request.state.CopyFrom(state)
        response_data = self._apply(request, 'SyncResultsResponse')
        syncResultResponse = responses_pb2.SyncResultsResponse()
        syncResultResponse.ParseFromString(response_data)
        return syncResultResponse

    def connection_sync_dict(self, connection_id, connProps=None):
        """Like :meth:`connection_sync`, but returns the properties as a dict."""
        conn_props = self.connection_sync(connection_id, connProps)
        return {
            'autoCommit': conn_props.auto_commit,
            'readOnly': conn_props.read_only,
            'transactionIsolation': conn_props.transaction_isolation,
            'catalog': conn_props.catalog,
            'schema': conn_props.schema}

    def connection_sync(self, connection_id, connProps=None):
        """Synchronizes connection properties with the server.

        :param connection_id:
            ID of the current connection.

        :param connProps:
            Dictionary with the properties that should be changed. It is not
            modified. Unknown keys are logged as a warning and ignored.

        :returns:
            A ``common_pb2.ConnectionProperties`` object.
        """
        if connProps:
            props = connProps.copy()
        else:
            props = {}

        request = requests_pb2.ConnectionSyncRequest()
        request.connection_id = connection_id
        request.conn_props.has_auto_commit = True
        request.conn_props.has_read_only = True
        if 'autoCommit' in props:
            request.conn_props.auto_commit = props.pop('autoCommit')
        if 'readOnly' in props:
            request.conn_props.read_only = props.pop('readOnly')
        if 'transactionIsolation' in props:
            request.conn_props.transaction_isolation = props.pop('transactionIsolation', None)
        if 'catalog' in props:
            request.conn_props.catalog = props.pop('catalog', None)
        if 'schema' in props:
            request.conn_props.schema = props.pop('schema', None)

        if props:
            # Lazy %-formatting: concatenating a str with a dict raised TypeError.
            logger.warning("Unhandled connection property: %s", props)

        response_data = self._apply(request)
        response = responses_pb2.ConnectionSyncResponse()
        response.ParseFromString(response_data)
        return response.conn_props

    def open_connection(self, connection_id, info=None):
        """Opens a new connection.

        :param connection_id:
            ID of the connection to open.

        :param info:
            Optional mapping of connection properties sent to the server.
        """
        request = requests_pb2.OpenConnectionRequest()
        request.connection_id = connection_id
        if info is not None:
            # Info is a list of repeated pairs, setting a dict directly fails
            for k, v in info.items():
                request.info[k] = v

        response_data = self._apply(request)
        response = responses_pb2.OpenConnectionResponse()
        response.ParseFromString(response_data)

    def close_connection(self, connection_id):
        """Closes a connection.

        :param connection_id:
            ID of the connection to close.
        """
        request = requests_pb2.CloseConnectionRequest()
        request.connection_id = connection_id
        self._apply(request)

    def create_statement(self, connection_id):
        """Creates a new statement.

        :param connection_id:
            ID of the current connection.

        :returns:
            New statement ID.
        """
        request = requests_pb2.CreateStatementRequest()
        request.connection_id = connection_id

        response_data = self._apply(request)
        response = responses_pb2.CreateStatementResponse()
        response.ParseFromString(response_data)
        return response.statement_id

    def close_statement(self, connection_id, statement_id):
        """Closes a statement.

        :param connection_id:
            ID of the current connection.

        :param statement_id:
            ID of the statement to close.
        """
        request = requests_pb2.CloseStatementRequest()
        request.connection_id = connection_id
        request.statement_id = statement_id

        self._apply(request)

    def prepare_and_execute(self, connection_id, statement_id, sql, max_rows_total=None, first_frame_max_size=None):
        """Prepares and immediately executes a statement.

        :param connection_id:
            ID of the current connection.

        :param statement_id:
            ID of the statement to prepare.

        :param sql:
            SQL query.

        :param max_rows_total:
            The maximum number of rows that will be allowed for this query.

        :param first_frame_max_size:
            The maximum number of rows that will be returned in the first Frame returned for this query.

        :returns:
            The ``results`` of the ``ExecuteResponse``. Each result carries the
            statement signature and the first frame of data.
        """
        request = requests_pb2.PrepareAndExecuteRequest()
        request.connection_id = connection_id
        request.statement_id = statement_id
        request.sql = sql
        if max_rows_total is not None:
            request.max_rows_total = max_rows_total
        if first_frame_max_size is not None:
            request.first_frame_max_size = first_frame_max_size

        response_data = self._apply(request, 'ExecuteResponse')
        response = responses_pb2.ExecuteResponse()
        response.ParseFromString(response_data)
        return response.results

    def prepare(self, connection_id, sql, max_rows_total=None):
        """Prepares a statement.

        :param connection_id:
            ID of the current connection.

        :param sql:
            SQL query.

        :param max_rows_total:
            The maximum number of rows that will be allowed for this query.

        :returns:
            Signature of the prepared statement.
        """
        request = requests_pb2.PrepareRequest()
        request.connection_id = connection_id
        request.sql = sql
        if max_rows_total is not None:
            request.max_rows_total = max_rows_total

        response_data = self._apply(request)
        response = responses_pb2.PrepareResponse()
        response.ParseFromString(response_data)
        return response.statement

    def execute(self, connection_id, statement_id, signature, parameter_values=None, first_frame_max_size=None):
        """Executes a prepared statement.

        :param connection_id:
            ID of the current connection.

        :param statement_id:
            ID of the statement to execute.

        :param signature:
            common_pb2.Signature object

        :param parameter_values:
            A list of parameter values, if statement is to be executed; otherwise ``None``.

        :param first_frame_max_size:
            The maximum number of rows that will be returned in the first Frame returned for this query.

        :returns:
            The ``results`` of the ``ExecuteResponse``.
        """
        request = requests_pb2.ExecuteRequest()
        request.statementHandle.id = statement_id
        request.statementHandle.connection_id = connection_id
        request.statementHandle.signature.CopyFrom(signature)
        if parameter_values is not None:
            request.parameter_values.extend(parameter_values)
            request.has_parameter_values = True
        if first_frame_max_size is not None:
            # Sent in both fields; presumably for servers that only know the
            # deprecated one -- TODO confirm against the Avatica protocol docs.
            request.deprecated_first_frame_max_size = first_frame_max_size
            request.first_frame_max_size = first_frame_max_size

        response_data = self._apply(request)
        response = responses_pb2.ExecuteResponse()
        response.ParseFromString(response_data)
        return response.results

    def execute_batch(self, connection_id, statement_id, rows):
        """Returns an array of update counts corresponding to each row written.

        :param connection_id:
            ID of the current connection.

        :param statement_id:
            ID of the statement to execute.

        :param rows:
            A list of lists corresponding to the columns to bind to the statement
            for many rows.

        :returns:
            Update counts for the writes.

        :raises errors.DatabaseError:
            If the server reports that the statement is missing.
        """
        request = requests_pb2.ExecuteBatchRequest()
        request.statement_id = statement_id
        request.connection_id = connection_id
        if rows is not None:
            for row in rows:
                batch = requests_pb2.UpdateBatch()
                for col in row:
                    batch.parameter_values.append(col)
                request.updates.append(batch)

        response_data = self._apply(request)
        response = responses_pb2.ExecuteBatchResponse()
        response.ParseFromString(response_data)
        if response.missing_statement:
            raise errors.DatabaseError('ExecuteBatch reported missing statement', -1)
        return response.update_counts

    def fetch(self, connection_id, statement_id, offset=0, frame_max_size=None):
        """Returns a frame of rows.

        The frame describes whether there may be another frame. If there is not
        another frame, the current iteration is done when we have finished the
        rows in this frame.

        :param connection_id:
            ID of the current connection.

        :param statement_id:
            ID of the statement to fetch rows from.

        :param offset:
            Zero-based offset of first row in the requested frame.

        :param frame_max_size:
            Maximum number of rows to return; negative means no limit.

        :returns:
            The ``frame`` of the ``FetchResponse``.
        """
        request = requests_pb2.FetchRequest()
        request.connection_id = connection_id
        request.statement_id = statement_id
        request.offset = offset
        if frame_max_size is not None:
            request.frame_max_size = frame_max_size

        response_data = self._apply(request)
        response = responses_pb2.FetchResponse()
        response.ParseFromString(response_data)
        return response.frame

    def commit(self, connection_id):
        """Commits the transaction on the given connection.

        :param connection_id:
            ID of the current connection.

        :returns:
            The serialized response payload.
        """
        request = requests_pb2.CommitRequest()
        request.connection_id = connection_id
        return self._apply(request)

    def rollback(self, connection_id):
        """Rolls back the transaction on the given connection.

        :param connection_id:
            ID of the current connection.

        :returns:
            The serialized response payload.
        """
        request = requests_pb2.RollbackRequest()
        request.connection_id = connection_id
        return self._apply(request)

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK