PostgreSQL 食譜¶
使用使用者名稱與密碼驗證¶
食譜來源: postgresql_authenticate.py
若要連線到 PostgreSQL 資料庫,必須在 URI 中提供使用者名稱與密碼。例如,
postgresql://username:password@hostname:port/dbname
請參閱 PostgreSQL 文件 以取得完整詳細資訊。
30import os
31
32import adbc_driver_postgresql.dbapi
33
34uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
35conn = adbc_driver_postgresql.dbapi.connect(uri)
36
37with conn.cursor() as cur:
38 cur.execute("SELECT 1")
39 assert cur.fetchone() == (1,)
40
41conn.close()
從 Arrow 資料集建立/附加到資料表¶
食譜來源: postgresql_create_dataset_table.py
ADBC 讓您可以輕鬆地將 PyArrow 資料集載入到您的資料儲存區。
22import os
23import tempfile
24from pathlib import Path
25
26import pyarrow
27import pyarrow.csv
28import pyarrow.dataset
29import pyarrow.feather
30import pyarrow.parquet
31
32import adbc_driver_postgresql.dbapi
33
34uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
35conn = adbc_driver_postgresql.dbapi.connect(uri)
為了測試目的,我們先確保我們即將使用的資料表不存在。
39with conn.cursor() as cur:
40 cur.execute("DROP TABLE IF EXISTS csvtable")
41 cur.execute("DROP TABLE IF EXISTS ipctable")
42 cur.execute("DROP TABLE IF EXISTS pqtable")
43 cur.execute("DROP TABLE IF EXISTS csvdataset")
44 cur.execute("DROP TABLE IF EXISTS ipcdataset")
45 cur.execute("DROP TABLE IF EXISTS pqdataset")
46
47conn.commit()
產生範例資料¶
52tempdir = tempfile.TemporaryDirectory(
53 prefix="adbc-docs-",
54 ignore_cleanup_errors=True,
55)
56root = Path(tempdir.name)
57table = pyarrow.table(
58 [
59 [1, 1, 2],
60 ["foo", "bar", "baz"],
61 ],
62 names=["ints", "strs"],
63)
首先,我們會寫入單一檔案。
67csv_file = root / "example.csv"
68pyarrow.csv.write_csv(table, csv_file)
69
70ipc_file = root / "example.arrow"
71pyarrow.feather.write_feather(table, ipc_file)
72
73parquet_file = root / "example.parquet"
74pyarrow.parquet.write_table(table, parquet_file)
我們也會產生一些分割的資料集。
78csv_dataset = root / "csv_dataset"
79pyarrow.dataset.write_dataset(
80 table,
81 csv_dataset,
82 format="csv",
83 partitioning=["ints"],
84)
85
86ipc_dataset = root / "ipc_dataset"
87pyarrow.dataset.write_dataset(
88 table,
89 ipc_dataset,
90 format="feather",
91 partitioning=["ints"],
92)
93
94parquet_dataset = root / "parquet_dataset"
95pyarrow.dataset.write_dataset(
96 table,
97 parquet_dataset,
98 format="parquet",
99 partitioning=["ints"],
100)
將 CSV 檔案載入到 PostgreSQL¶
我們可以將 pyarrow.RecordBatchReader
(來自 open_csv
) 直接傳遞給 adbc_ingest
。我們也可以傳遞 pyarrow.dataset.Dataset
,或 pyarrow.dataset.Scanner
。
110with conn.cursor() as cur:
111 reader = pyarrow.csv.open_csv(csv_file)
112 cur.adbc_ingest("csvtable", reader, mode="create")
113
114 reader = pyarrow.dataset.dataset(
115 csv_dataset,
116 format="csv",
117 partitioning=["ints"],
118 )
119 cur.adbc_ingest("csvdataset", reader, mode="create")
120
121conn.commit()
122
123with conn.cursor() as cur:
124 cur.execute("SELECT ints, strs FROM csvtable ORDER BY ints, strs ASC")
125 assert cur.fetchall() == [(1, "bar"), (1, "foo"), (2, "baz")]
126
127 cur.execute("SELECT ints, strs FROM csvdataset ORDER BY ints, strs ASC")
128 assert cur.fetchall() == [(1, "bar"), (1, "foo"), (2, "baz")]
將 Arrow IPC (Feather) 檔案載入到 PostgreSQL¶
133with conn.cursor() as cur:
134 reader = pyarrow.ipc.RecordBatchFileReader(ipc_file)
由於 PyArrow API 中的怪異之處,我們必須將檔案讀取到記憶體中。
137 cur.adbc_ingest("ipctable", reader.read_all(), mode="create")
但是,Dataset API 會將資料串流到記憶體中,然後再進入 PostgreSQL。
141 reader = pyarrow.dataset.dataset(
142 ipc_dataset,
143 format="feather",
144 partitioning=["ints"],
145 )
146 cur.adbc_ingest("ipcdataset", reader, mode="create")
147
148conn.commit()
149
150with conn.cursor() as cur:
151 cur.execute("SELECT ints, strs FROM ipctable ORDER BY ints, strs ASC")
152 assert cur.fetchall() == [(1, "bar"), (1, "foo"), (2, "baz")]
153
154 cur.execute("SELECT ints, strs FROM ipcdataset ORDER BY ints, strs ASC")
155 assert cur.fetchall() == [(1, "bar"), (1, "foo"), (2, "baz")]
將 Parquet 檔案載入到 PostgreSQL¶
160with conn.cursor() as cur:
161 reader = pyarrow.parquet.ParquetFile(parquet_file)
162 cur.adbc_ingest("pqtable", reader.iter_batches(), mode="create")
163
164 reader = pyarrow.dataset.dataset(
165 parquet_dataset,
166 format="parquet",
167 partitioning=["ints"],
168 )
169 cur.adbc_ingest("pqdataset", reader, mode="create")
170
171conn.commit()
172
173with conn.cursor() as cur:
174 cur.execute("SELECT ints, strs FROM pqtable ORDER BY ints, strs ASC")
175 assert cur.fetchall() == [(1, "bar"), (1, "foo"), (2, "baz")]
176
177 cur.execute("SELECT ints, strs FROM pqdataset ORDER BY ints, strs ASC")
178 assert cur.fetchall() == [(1, "bar"), (1, "foo"), (2, "baz")]
清理¶
183conn.close()
184tempdir.cleanup()
從 Arrow 資料表建立/附加到資料表¶
食譜來源: postgresql_create_append_table.py
ADBC 允許使用 Arrow 資料表來建立及附加到資料庫資料表。
22import os
23
24import pyarrow
25
26import adbc_driver_postgresql.dbapi
27
28uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
29conn = adbc_driver_postgresql.dbapi.connect(uri)
為了測試目的,我們先確保我們即將使用的資料表不存在。
33with conn.cursor() as cur:
34 cur.execute("DROP TABLE IF EXISTS example")
35 cur.execute("DROP TABLE IF EXISTS example2")
現在我們可以建立資料表。
38with conn.cursor() as cur:
39 data = pyarrow.table(
40 [
41 [1, 2, None, 4],
42 ],
43 schema=pyarrow.schema(
44 [
45 ("ints", "int32"),
46 ]
47 ),
48 )
49 cur.adbc_ingest("example", data, mode="create")
50
51conn.commit()
在擷取之後,我們可以取得結果。
54with conn.cursor() as cur:
55 cur.execute("SELECT * FROM example")
56 assert cur.fetchone() == (1,)
57 assert cur.fetchone() == (2,)
58
59 cur.execute("SELECT COUNT(*) FROM example")
60 assert cur.fetchone() == (4,)
如果我們再次嘗試擷取,則會失敗,因為資料表已存在。
64with conn.cursor() as cur:
65 try:
66 cur.adbc_ingest("example", data, mode="create")
67 except conn.ProgrammingError:
68 pass
69 else:
70 raise RuntimeError("Should have failed!")
71
72conn.rollback()
相反地,我們可以附加到資料表。
75with conn.cursor() as cur:
76 cur.adbc_ingest("example", data, mode="append")
77
78 cur.execute("SELECT COUNT(*) FROM example")
79 assert cur.fetchone() == (8,)
我們也可以選擇在資料表不存在時建立資料表,否則就附加。
84with conn.cursor() as cur:
85 cur.adbc_ingest("example2", data, mode="create_append")
86
87 cur.execute("SELECT COUNT(*) FROM example2")
88 assert cur.fetchone() == (4,)
89
90 cur.adbc_ingest("example2", data, mode="create_append")
91
92 cur.execute("SELECT COUNT(*) FROM example2")
93 assert cur.fetchone() == (8,)
最後,我們可以取代資料表。
97with conn.cursor() as cur:
98 cur.adbc_ingest("example", data.slice(0, 2), mode="replace")
99
100 cur.execute("SELECT COUNT(*) FROM example")
101 assert cur.fetchone() == (2,)
102
103conn.close()
建立/附加到暫時資料表¶
食譜來源: postgresql_create_temp_table.py
ADBC 也允許建立及附加到暫時資料表。
21import os
22
23import pyarrow
24
25import adbc_driver_postgresql.dbapi
26
27uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
28conn = adbc_driver_postgresql.dbapi.connect(uri)
為了測試目的,我們先確保我們即將使用的資料表不存在。
32with conn.cursor() as cur:
33 cur.execute("DROP TABLE IF EXISTS example")
若要建立暫時資料表,只需指定選項 “temporary”。
36data = pyarrow.table(
37 [
38 [1, 2, None, 4],
39 ],
40 schema=pyarrow.schema(
41 [
42 ("ints", "int32"),
43 ]
44 ),
45)
46
47with conn.cursor() as cur:
48 cur.adbc_ingest("example", data, mode="create", temporary=True)
49
50conn.commit()
在擷取之後,我們可以取得結果。
53with conn.cursor() as cur:
54 cur.execute("SELECT * FROM example")
55 assert cur.fetchone() == (1,)
56 assert cur.fetchone() == (2,)
57
58 cur.execute("SELECT COUNT(*) FROM example")
59 assert cur.fetchone() == (4,)
暫時資料表與一般資料表是分開的,即使它們具有相同的名稱。
64with conn.cursor() as cur:
65 cur.adbc_ingest("example", data.slice(0, 2), mode="create", temporary=False)
66
67conn.commit()
68
69with conn.cursor() as cur:
因為我們有兩個名稱相同的資料表,所以我們必須在此明確參考一般的暫時資料表。
72 cur.execute("SELECT COUNT(*) FROM public.example")
73 assert cur.fetchone() == (2,)
74
75 cur.execute("SELECT COUNT(*) FROM example")
76 assert cur.fetchone() == (4,)
77
78conn.close()
關閉連線後,暫時資料表會隱含地被捨棄。如果我們重新連線,資料表將不存在;我們只會看到「一般」資料表。
83with adbc_driver_postgresql.dbapi.connect(uri) as conn:
84 with conn.cursor() as cur:
85 cur.execute("SELECT COUNT(*) FROM example")
86 assert cur.fetchone() == (2,)
所有一般的擷取選項也適用於暫時資料表。請參閱 從 Arrow 資料集建立/附加到資料表 以取得更多範例。
使用繫結參數執行陳述式¶
食譜來源: postgresql_execute_bind.py
ADBC 允許使用 Python 和 Arrow 值作為繫結參數。目前,PostgreSQL 驅動程式僅支援用於不產生結果集的查詢的繫結參數。
24import os
25
26import pyarrow
27
28import adbc_driver_postgresql.dbapi
29
30uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
31conn = adbc_driver_postgresql.dbapi.connect(uri)
我們將建立一個範例資料表來測試。
34with conn.cursor() as cur:
35 cur.execute("DROP TABLE IF EXISTS example")
36 cur.execute("CREATE TABLE example (ints INT, bigints BIGINT)")
37
38conn.commit()
我們可以繫結 Python 值
41with conn.cursor() as cur:
42 cur.executemany("INSERT INTO example VALUES ($1, $2)", [(1, 2), (3, 4)])
43
44 cur.execute("SELECT SUM(ints) FROM example")
45 assert cur.fetchone() == (4,)
注意
如果您習慣使用格式字串樣式 %s
語法 (例如 psycopg 等程式庫用於繫結參數),請注意這不受支援 — 僅支援 PostgreSQL 原生的 $1
語法。
我們也可以繫結 Arrow 值
52with conn.cursor() as cur:
53 data = pyarrow.record_batch(
54 [
55 [5, 6],
56 [7, 8],
57 ],
58 names=["$1", "$2"],
59 )
60 cur.executemany("INSERT INTO example VALUES ($1, $2)", data)
61
62 cur.execute("SELECT SUM(ints) FROM example")
63 assert cur.fetchone() == (15,)
64
65conn.close()
取得資料表的 Arrow 綱要¶
食譜來源: postgresql_get_table_schema.py
ADBC 讓您可以取得資料表的綱要作為 Arrow 綱要。
22import os
23
24import pyarrow
25
26import adbc_driver_postgresql.dbapi
27
28uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
29conn = adbc_driver_postgresql.dbapi.connect(uri)
我們將建立一些範例資料表來測試。
32with conn.cursor() as cur:
33 cur.execute("DROP TABLE IF EXISTS example")
34 cur.execute("CREATE TABLE example (ints INT, bigints BIGINT)")
35
36 cur.execute("CREATE SCHEMA IF NOT EXISTS other_schema")
37 cur.execute("DROP TABLE IF EXISTS other_schema.example")
38 cur.execute("CREATE TABLE other_schema.example (strings TEXT, values INT)")
39
40conn.commit()
預設情況下,會假設「作用中」的目錄/綱要。
43assert conn.adbc_get_table_schema("example") == pyarrow.schema(
44 [
45 ("ints", "int32"),
46 ("bigints", "int64"),
47 ]
48)
我們可以明確指定 PostgreSQL 綱要,以取得不同命名空間中資料表的 Arrow 綱要。
注意
在 PostgreSQL 中,您只能查詢您連線的資料庫 (目錄)。因此我們無法在此指定目錄 (或者,這樣做沒有意義)。
請注意,NUMERIC 欄位會讀取為字串,因為 PostgreSQL 小數點不會對應到 Arrow 小數點。
59assert conn.adbc_get_table_schema(
60 "example",
61 db_schema_filter="other_schema",
62) == pyarrow.schema(
63 [
64 ("strings", "string"),
65 ("values", "int32"),
66 ]
67)
68
69conn.close()
取得查詢的 Arrow 綱要¶
食譜來源: postgresql_get_query_schema.py
ADBC 讓您可以取得結果集的綱要,而無需執行查詢。
22import os
23
24import pyarrow
25
26import adbc_driver_postgresql.dbapi
27
28uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
29conn = adbc_driver_postgresql.dbapi.connect(uri)
我們將建立一個範例資料表來測試。
32with conn.cursor() as cur:
33 cur.execute("DROP TABLE IF EXISTS example")
34 cur.execute("CREATE TABLE example (ints INT, bigints BIGINT)")
35
36conn.commit()
37
38expected = pyarrow.schema(
39 [
40 ("ints", "int32"),
41 ("bigints", "int64"),
42 ]
43)
44
45with conn.cursor() as cur:
46 assert cur.adbc_execute_schema("SELECT * FROM example") == expected
PostgreSQL 在此處不知道類型,因此它只會傳回一個猜測。
49 assert cur.adbc_execute_schema("SELECT $1 AS res") == pyarrow.schema(
50 [
51 ("res", "string"),
52 ]
53 )
54
55conn.close()
列出目錄、綱要和資料表¶
食譜來源: postgresql_list_catalogs.py
ADBC 允許列出資料庫中的資料表、目錄和綱要。
22import os
23
24import adbc_driver_postgresql.dbapi
25
26uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
27conn = adbc_driver_postgresql.dbapi.connect(uri)
我們將建立一個範例資料表來尋找。
30with conn.cursor() as cur:
31 cur.execute("DROP TABLE IF EXISTS example")
32 cur.execute("CREATE TABLE example (ints INT, bigints BIGINT)")
33
34conn.commit()
資料以 PyArrow RecordBatchReader 的形式提供。
37objects = conn.adbc_get_objects(depth="all").read_all()
為了方便起見,我們會將其轉換為純 Python 資料。
40objects = objects.to_pylist()
41catalog = objects[0]
42assert catalog["catalog_name"] == "postgres"
43
44db_schema = catalog["catalog_db_schemas"][0]
45assert db_schema["db_schema_name"] == "public"
46
47tables = db_schema["db_schema_tables"]
48example = [table for table in tables if table["table_name"] == "example"]
49assert len(example) == 1
50example = example[0]
51
52assert example["table_columns"][0]["column_name"] == "ints"
53assert example["table_columns"][1]["column_name"] == "bigints"
54
55conn.close()
使用 SQLAlchemy 的連線池¶
食譜來源: postgresql_pool.py
ADBC 未實作連線池,因為這通常不是 DBAPI 驅動程式的功能。相反地,請使用第三方連線池,例如內建於 SQLAlchemy 中的連線池。
26import os
27
28import sqlalchemy.pool
29
30import adbc_driver_postgresql.dbapi
31
32uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
33
34source = adbc_driver_postgresql.dbapi.connect(uri)
adbc_driver_manager.dbapi.Connection.adbc_clone()
從現有的連線開啟新的連線,並在可能的情況下共用內部資源。例如,PostgreSQL 驅動程式將共用內部 OID 快取,從而節省一些連線的額外負擔。
39pool = sqlalchemy.pool.QueuePool(source.adbc_clone, max_overflow=1, pool_size=2)
我們現在可以從池中取得連線;SQLAlchemy 會覆寫 close()
以將連線傳回池中。
注意
與底層 ADBC 連線不同,SQLAlchemy 的包裝器不支援內容管理器協定。
47conn = pool.connect()
48
49assert pool.checkedin() == 0
50assert pool.checkedout() == 1
51
52with conn.cursor() as cur:
53 cur.execute("SELECT 1")
54 assert cur.fetchone() == (1,)
55
56conn.close()
57
58assert pool.checkedin() == 1
59assert pool.checkedout() == 0
60
61source.close()
使用 Pandas 和 ADBC¶
食譜來源: postgresql_pandas.py
ADBC 已整合到 pandas (一個熱門的資料框架程式庫) 中。Pandas 可以使用 ADBC 與 PostgreSQL 和其他資料庫交換資料。與使用 SQLAlchemy 或其他選項相比,將 ADBC 與 pandas 一起使用可以獲得更好的效能,例如避免過多轉換為和從 Python 物件轉換。
28import os
29
30import pandas as pd
31
32import adbc_driver_postgresql.dbapi
33
34uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
35conn = adbc_driver_postgresql.dbapi.connect(uri)
我們將使用 pd.DataFrame.to_sql
來建立範例資料表。
40data = pd.DataFrame(
41 {
42 "ints": [1, 2, None, 4],
43 "strs": ["a", "b", "c", "d"],
44 }
45)
46data.to_sql("example", conn, if_exists="replace")
47conn.commit()
建立資料表後,我們可以將 ADBC 連線和 SQL 查詢傳遞給 pd.read_sql
,以取得作為 pandas DataFrame 的結果集。
53df = pd.read_sql("SELECT * FROM example WHERE ints > 1", conn)
54
55assert len(df) == 2
56
57conn.close()
與 ADBC 介面相比,pandas 提供了更方便且更高階的 API,特別是對於那些已經在使用 pandas 的人。
使用 Polars 和 ADBC¶
食譜來源: postgresql_polars.py
ADBC 可以與 Polars (一個以 Rust 撰寫的資料框架程式庫) 一起使用。根據其文件
如果後端支援直接傳回 Arrow 資料,則將使用此功能有效率地實例化 DataFrame;否則,DataFrame 會從逐列資料初始化。
顯然,ADBC 直接傳回 Arrow 資料,使 ADBC 和 Polars 自然地契合在一起。
32import os
33
34import polars as pl
35
36uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
我們將使用 Polars 和 polars.DataFrame.write_database()
來建立範例資料表。我們不需要使用 Polars 自己開啟 ADBC 連線。
42data = pl.DataFrame(
43 {
44 "ints": [1, 2, None, 4],
45 "strs": ["a", "b", "c", "d"],
46 }
47)
48data.write_database("example", uri, engine="adbc", if_table_exists="replace")
建立資料表後,我們可以使用 polars.read_database_uri()
來擷取結果。同樣地,我們可以只傳遞 URI 並告知 Polars 為我們管理 ADBC。
54df = pl.read_database_uri("SELECT * FROM example WHERE ints > 1", uri, engine="adbc")
55
56assert len(df) == 2