Flight SQL 食譜¶
其中一些食譜是針對由 SQLite 支援的示範 Flight SQL 服務編寫的。您可以自行執行,如下所示
$ go install github.com/apache/arrow/go/v${ARROW_MAJOR_VERSION}/arrow/flight/flightsql/example/cmd/sqlite_flightsql_server@latest
$ sqlite_flightsql_server -host 0.0.0.0 -port 8080
其他食譜使用 OSS 版本的 Dremio
$ docker run -p 9047:9047 -p 31010:31010 -p 45678:45678 dremio/dremio-oss
如果您已檢出 ADBC 儲存庫並安裝了 Docker Compose,則可以使用我們的設定來執行這兩項服務
$ docker compose up --detach --wait dremio dremio-init flightsql-sqlite-test
連線到未受保護的 Flight SQL 服務¶
食譜來源:flightsql_sqlite_connect.py
要連線到未受保護的 Flight SQL 服務,只需提供 URI 即可。
22import os
23
24import adbc_driver_flightsql.dbapi
25
26uri = os.environ["ADBC_SQLITE_FLIGHTSQL_URI"]
27conn = adbc_driver_flightsql.dbapi.connect(uri)
然後我們可以執行一個簡單的查詢。
31with conn.cursor() as cur:
32 cur.execute("SELECT 1")
33
34 assert cur.fetchone() == (1,)
35
36conn.close()
連線到具有使用者名稱和密碼的 Flight SQL 服務¶
食譜來源:flightsql_dremio_connect.py
Dremio 需要使用者名稱和密碼。要連線到具有身份驗證的 Flight SQL 服務,請在連線時提供選項。
23import os
24
25import adbc_driver_flightsql.dbapi
26import adbc_driver_manager
27
28uri = os.environ["ADBC_DREMIO_FLIGHTSQL_URI"]
29username = os.environ["ADBC_DREMIO_FLIGHTSQL_USER"]
30password = os.environ["ADBC_DREMIO_FLIGHTSQL_PASS"]
31conn = adbc_driver_flightsql.dbapi.connect(
32 uri,
33 db_kwargs={
34 adbc_driver_manager.DatabaseOptions.USERNAME.value: username,
35 adbc_driver_manager.DatabaseOptions.PASSWORD.value: password,
36 },
37)
然後我們可以執行一個簡單的查詢。
41with conn.cursor() as cur:
42 cur.execute("SELECT 1")
43
44 assert cur.fetchone() == (1,)
45
46conn.close()
設定逾時和其他選項¶
食譜來源:flightsql_sqlite_options.py
Flight SQL 驅動程式支援各種選項。
22import os
23
24import adbc_driver_flightsql.dbapi
25from adbc_driver_flightsql import ConnectionOptions, DatabaseOptions
26
27uri = os.environ["ADBC_SQLITE_FLIGHTSQL_URI"]
我們可以啟用 Cookie 支援,某些伺服器實作需要此支援。
29conn = adbc_driver_flightsql.dbapi.connect(
30 uri,
31 db_kwargs={DatabaseOptions.WITH_COOKIE_MIDDLEWARE.value: "true"},
32)
其他選項在連線或語句上設定。
例如,我們可以將自訂標頭新增至所有傳出的請求。
37custom_header = f"{ConnectionOptions.RPC_CALL_HEADER_PREFIX.value}x-custom-header"
38conn.adbc_connection.set_options(**{custom_header: "value"})
我們也可以設定逾時。這些以浮點秒為單位。
41conn.adbc_connection.set_options(
42 **{
43 ConnectionOptions.TIMEOUT_FETCH.value: 30.0,
44 ConnectionOptions.TIMEOUT_QUERY.value: 30.0,
45 ConnectionOptions.TIMEOUT_UPDATE.value: 30.0,
46 }
47)
這些選項將適用於我們建立的所有游標。
51with conn.cursor() as cur:
52 cur.execute("SELECT 1")
53
54 assert cur.fetchone() == (1,)
55
56conn.close()
設定最大 gRPC 訊息大小¶
食譜來源:flightsql_sqlite_max_msg_size.py
預設情況下,Flight SQL 驅動程式會限制傳入/傳出訊息的大小。如果超出這些限制,您可能會看到類似這樣的錯誤
INTERNAL: [FlightSQL] grpc: received message larger than max
可以調整這些限制以避免這種情況。
27import os
28
29import adbc_driver_flightsql.dbapi
30from adbc_driver_flightsql import DatabaseOptions
31
32uri = os.environ["ADBC_SQLITE_FLIGHTSQL_URI"]
此查詢每個批次產生約 16 MiB,這將觸發預設限制。
37query = """
38WITH RECURSIVE generate_series(value) AS (
39 SELECT 1
40 UNION ALL
41 SELECT value + 1 FROM generate_series
42 WHERE value + 1 <= 2048
43)
44SELECT printf('%.*c', 16384, 'x') FROM generate_series
45"""
當我們執行查詢時,會收到錯誤。
49conn = adbc_driver_flightsql.dbapi.connect(uri)
50with conn.cursor() as cur:
51 cur.execute(query)
52
53 try:
54 cur.fetchallarrow()
55 except adbc_driver_flightsql.dbapi.InternalError:
56 # This exception is expected.
57 pass
58 else:
59 assert False, "Did not raise expected exception"
60
61conn.close()
我們可以改為在連線時變更限制。
65conn = adbc_driver_flightsql.dbapi.connect(
66 uri,
67 db_kwargs={DatabaseOptions.WITH_MAX_MSG_SIZE.value: "2147483647"},
68)
69with conn.cursor() as cur:
70 cur.execute(query)
71
72 assert len(cur.fetchallarrow()) == 2048
73
74conn.close()