PostgreSQL 食譜

使用使用者名稱與密碼驗證

食譜來源: postgresql_authenticate.py

若要連線到 PostgreSQL 資料庫,必須在 URI 中提供使用者名稱與密碼。例如,

postgresql://username:password@hostname:port/dbname

請參閱 PostgreSQL 文件 以取得完整詳細資訊。

30import os
31
32import adbc_driver_postgresql.dbapi
33
34uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
35conn = adbc_driver_postgresql.dbapi.connect(uri)
36
37with conn.cursor() as cur:
38    cur.execute("SELECT 1")
39    assert cur.fetchone() == (1,)
40
41conn.close()

從 Arrow 資料集建立/附加到資料表

食譜來源: postgresql_create_dataset_table.py

ADBC 讓您可以輕鬆地將 PyArrow 資料集載入到您的資料儲存區。

22import os
23import tempfile
24from pathlib import Path
25
26import pyarrow
27import pyarrow.csv
28import pyarrow.dataset
29import pyarrow.feather
30import pyarrow.parquet
31
32import adbc_driver_postgresql.dbapi
33
34uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
35conn = adbc_driver_postgresql.dbapi.connect(uri)

為了測試目的,我們先確保我們即將使用的資料表不存在。

39with conn.cursor() as cur:
40    cur.execute("DROP TABLE IF EXISTS csvtable")
41    cur.execute("DROP TABLE IF EXISTS ipctable")
42    cur.execute("DROP TABLE IF EXISTS pqtable")
43    cur.execute("DROP TABLE IF EXISTS csvdataset")
44    cur.execute("DROP TABLE IF EXISTS ipcdataset")
45    cur.execute("DROP TABLE IF EXISTS pqdataset")
46
47conn.commit()

產生範例資料

52tempdir = tempfile.TemporaryDirectory(
53    prefix="adbc-docs-",
54    ignore_cleanup_errors=True,
55)
56root = Path(tempdir.name)
57table = pyarrow.table(
58    [
59        [1, 1, 2],
60        ["foo", "bar", "baz"],
61    ],
62    names=["ints", "strs"],
63)

首先,我們會寫入單一檔案。

67csv_file = root / "example.csv"
68pyarrow.csv.write_csv(table, csv_file)
69
70ipc_file = root / "example.arrow"
71pyarrow.feather.write_feather(table, ipc_file)
72
73parquet_file = root / "example.parquet"
74pyarrow.parquet.write_table(table, parquet_file)

我們也會產生一些分割的資料集。

 78csv_dataset = root / "csv_dataset"
 79pyarrow.dataset.write_dataset(
 80    table,
 81    csv_dataset,
 82    format="csv",
 83    partitioning=["ints"],
 84)
 85
 86ipc_dataset = root / "ipc_dataset"
 87pyarrow.dataset.write_dataset(
 88    table,
 89    ipc_dataset,
 90    format="feather",
 91    partitioning=["ints"],
 92)
 93
 94parquet_dataset = root / "parquet_dataset"
 95pyarrow.dataset.write_dataset(
 96    table,
 97    parquet_dataset,
 98    format="parquet",
 99    partitioning=["ints"],
100)

將 CSV 檔案載入到 PostgreSQL

我們可以將 pyarrow.RecordBatchReader (來自 open_csv) 直接傳遞給 adbc_ingest。我們也可以傳遞 pyarrow.dataset.Dataset,或 pyarrow.dataset.Scanner

110with conn.cursor() as cur:
111    reader = pyarrow.csv.open_csv(csv_file)
112    cur.adbc_ingest("csvtable", reader, mode="create")
113
114    reader = pyarrow.dataset.dataset(
115        csv_dataset,
116        format="csv",
117        partitioning=["ints"],
118    )
119    cur.adbc_ingest("csvdataset", reader, mode="create")
120
121conn.commit()
122
123with conn.cursor() as cur:
124    cur.execute("SELECT ints, strs FROM csvtable ORDER BY ints, strs ASC")
125    assert cur.fetchall() == [(1, "bar"), (1, "foo"), (2, "baz")]
126
127    cur.execute("SELECT ints, strs FROM csvdataset ORDER BY ints, strs ASC")
128    assert cur.fetchall() == [(1, "bar"), (1, "foo"), (2, "baz")]

將 Arrow IPC (Feather) 檔案載入到 PostgreSQL

133with conn.cursor() as cur:
134    reader = pyarrow.ipc.RecordBatchFileReader(ipc_file)

由於 PyArrow API 中的怪異之處,我們必須將檔案讀取到記憶體中。

137    cur.adbc_ingest("ipctable", reader.read_all(), mode="create")

但是,Dataset API 會將資料串流到記憶體中,然後再進入 PostgreSQL。

141    reader = pyarrow.dataset.dataset(
142        ipc_dataset,
143        format="feather",
144        partitioning=["ints"],
145    )
146    cur.adbc_ingest("ipcdataset", reader, mode="create")
147
148conn.commit()
149
150with conn.cursor() as cur:
151    cur.execute("SELECT ints, strs FROM ipctable ORDER BY ints, strs ASC")
152    assert cur.fetchall() == [(1, "bar"), (1, "foo"), (2, "baz")]
153
154    cur.execute("SELECT ints, strs FROM ipcdataset ORDER BY ints, strs ASC")
155    assert cur.fetchall() == [(1, "bar"), (1, "foo"), (2, "baz")]

將 Parquet 檔案載入到 PostgreSQL

160with conn.cursor() as cur:
161    reader = pyarrow.parquet.ParquetFile(parquet_file)
162    cur.adbc_ingest("pqtable", reader.iter_batches(), mode="create")
163
164    reader = pyarrow.dataset.dataset(
165        parquet_dataset,
166        format="parquet",
167        partitioning=["ints"],
168    )
169    cur.adbc_ingest("pqdataset", reader, mode="create")
170
171conn.commit()
172
173with conn.cursor() as cur:
174    cur.execute("SELECT ints, strs FROM pqtable ORDER BY ints, strs ASC")
175    assert cur.fetchall() == [(1, "bar"), (1, "foo"), (2, "baz")]
176
177    cur.execute("SELECT ints, strs FROM pqdataset ORDER BY ints, strs ASC")
178    assert cur.fetchall() == [(1, "bar"), (1, "foo"), (2, "baz")]

清理

183conn.close()
184tempdir.cleanup()

從 Arrow 資料表建立/附加到資料表

食譜來源: postgresql_create_append_table.py

ADBC 允許使用 Arrow 資料表來建立及附加到資料庫資料表。

22import os
23
24import pyarrow
25
26import adbc_driver_postgresql.dbapi
27
28uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
29conn = adbc_driver_postgresql.dbapi.connect(uri)

為了測試目的,我們先確保我們即將使用的資料表不存在。

33with conn.cursor() as cur:
34    cur.execute("DROP TABLE IF EXISTS example")
35    cur.execute("DROP TABLE IF EXISTS example2")

現在我們可以建立資料表。

38with conn.cursor() as cur:
39    data = pyarrow.table(
40        [
41            [1, 2, None, 4],
42        ],
43        schema=pyarrow.schema(
44            [
45                ("ints", "int32"),
46            ]
47        ),
48    )
49    cur.adbc_ingest("example", data, mode="create")
50
51conn.commit()

在擷取之後,我們可以取得結果。

54with conn.cursor() as cur:
55    cur.execute("SELECT * FROM example")
56    assert cur.fetchone() == (1,)
57    assert cur.fetchone() == (2,)
58
59    cur.execute("SELECT COUNT(*) FROM example")
60    assert cur.fetchone() == (4,)

如果我們再次嘗試擷取,則會失敗,因為資料表已存在。

64with conn.cursor() as cur:
65    try:
66        cur.adbc_ingest("example", data, mode="create")
67    except conn.ProgrammingError:
68        pass
69    else:
70        raise RuntimeError("Should have failed!")
71
72conn.rollback()

相反地,我們可以附加到資料表。

75with conn.cursor() as cur:
76    cur.adbc_ingest("example", data, mode="append")
77
78    cur.execute("SELECT COUNT(*) FROM example")
79    assert cur.fetchone() == (8,)

我們也可以選擇在資料表不存在時建立資料表,否則就附加。

84with conn.cursor() as cur:
85    cur.adbc_ingest("example2", data, mode="create_append")
86
87    cur.execute("SELECT COUNT(*) FROM example2")
88    assert cur.fetchone() == (4,)
89
90    cur.adbc_ingest("example2", data, mode="create_append")
91
92    cur.execute("SELECT COUNT(*) FROM example2")
93    assert cur.fetchone() == (8,)

最後,我們可以取代資料表。

 97with conn.cursor() as cur:
 98    cur.adbc_ingest("example", data.slice(0, 2), mode="replace")
 99
100    cur.execute("SELECT COUNT(*) FROM example")
101    assert cur.fetchone() == (2,)
102
103conn.close()

建立/附加到暫時資料表

食譜來源: postgresql_create_temp_table.py

ADBC 也允許建立及附加到暫時資料表。

21import os
22
23import pyarrow
24
25import adbc_driver_postgresql.dbapi
26
27uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
28conn = adbc_driver_postgresql.dbapi.connect(uri)

為了測試目的,我們先確保我們即將使用的資料表不存在。

32with conn.cursor() as cur:
33    cur.execute("DROP TABLE IF EXISTS example")

若要建立暫時資料表,只需指定選項 “temporary”。

36data = pyarrow.table(
37    [
38        [1, 2, None, 4],
39    ],
40    schema=pyarrow.schema(
41        [
42            ("ints", "int32"),
43        ]
44    ),
45)
46
47with conn.cursor() as cur:
48    cur.adbc_ingest("example", data, mode="create", temporary=True)
49
50conn.commit()

在擷取之後,我們可以取得結果。

53with conn.cursor() as cur:
54    cur.execute("SELECT * FROM example")
55    assert cur.fetchone() == (1,)
56    assert cur.fetchone() == (2,)
57
58    cur.execute("SELECT COUNT(*) FROM example")
59    assert cur.fetchone() == (4,)

暫時資料表與一般資料表是分開的,即使它們具有相同的名稱。

64with conn.cursor() as cur:
65    cur.adbc_ingest("example", data.slice(0, 2), mode="create", temporary=False)
66
67conn.commit()
68
69with conn.cursor() as cur:

因為我們有兩個名稱相同的資料表,所以我們必須在此明確參考一般的暫時資料表。

72    cur.execute("SELECT COUNT(*) FROM public.example")
73    assert cur.fetchone() == (2,)
74
75    cur.execute("SELECT COUNT(*) FROM example")
76    assert cur.fetchone() == (4,)
77
78conn.close()

關閉連線後,暫時資料表會隱含地被捨棄。如果我們重新連線,資料表將不存在;我們只會看到「一般」資料表。

83with adbc_driver_postgresql.dbapi.connect(uri) as conn:
84    with conn.cursor() as cur:
85        cur.execute("SELECT COUNT(*) FROM example")
86        assert cur.fetchone() == (2,)

所有一般的擷取選項也適用於暫時資料表。請參閱 從 Arrow 資料集建立/附加到資料表 以取得更多範例。

使用繫結參數執行陳述式

食譜來源: postgresql_execute_bind.py

ADBC 允許使用 Python 和 Arrow 值作為繫結參數。目前,PostgreSQL 驅動程式僅支援用於不產生結果集的查詢的繫結參數。

24import os
25
26import pyarrow
27
28import adbc_driver_postgresql.dbapi
29
30uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
31conn = adbc_driver_postgresql.dbapi.connect(uri)

我們將建立一個範例資料表來測試。

34with conn.cursor() as cur:
35    cur.execute("DROP TABLE IF EXISTS example")
36    cur.execute("CREATE TABLE example (ints INT, bigints BIGINT)")
37
38conn.commit()

我們可以繫結 Python 值

41with conn.cursor() as cur:
42    cur.executemany("INSERT INTO example VALUES ($1, $2)", [(1, 2), (3, 4)])
43
44    cur.execute("SELECT SUM(ints) FROM example")
45    assert cur.fetchone() == (4,)

注意

如果您習慣使用格式字串樣式 %s 語法 (例如 psycopg 等程式庫用於繫結參數),請注意這不受支援 — 僅支援 PostgreSQL 原生的 $1 語法。

我們也可以繫結 Arrow 值

52with conn.cursor() as cur:
53    data = pyarrow.record_batch(
54        [
55            [5, 6],
56            [7, 8],
57        ],
58        names=["$1", "$2"],
59    )
60    cur.executemany("INSERT INTO example VALUES ($1, $2)", data)
61
62    cur.execute("SELECT SUM(ints) FROM example")
63    assert cur.fetchone() == (15,)
64
65conn.close()

取得資料表的 Arrow 綱要

食譜來源: postgresql_get_table_schema.py

ADBC 讓您可以取得資料表的綱要作為 Arrow 綱要。

22import os
23
24import pyarrow
25
26import adbc_driver_postgresql.dbapi
27
28uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
29conn = adbc_driver_postgresql.dbapi.connect(uri)

我們將建立一些範例資料表來測試。

32with conn.cursor() as cur:
33    cur.execute("DROP TABLE IF EXISTS example")
34    cur.execute("CREATE TABLE example (ints INT, bigints BIGINT)")
35
36    cur.execute("CREATE SCHEMA IF NOT EXISTS other_schema")
37    cur.execute("DROP TABLE IF EXISTS other_schema.example")
38    cur.execute("CREATE TABLE other_schema.example (strings TEXT, values INT)")
39
40conn.commit()

預設情況下,會假設「作用中」的目錄/綱要。

43assert conn.adbc_get_table_schema("example") == pyarrow.schema(
44    [
45        ("ints", "int32"),
46        ("bigints", "int64"),
47    ]
48)

我們可以明確指定 PostgreSQL 綱要,以取得不同命名空間中資料表的 Arrow 綱要。

注意

在 PostgreSQL 中,您只能查詢您連線的資料庫 (目錄)。因此我們無法在此指定目錄 (或者,這樣做沒有意義)。

請注意,NUMERIC 欄位會讀取為字串,因為 PostgreSQL 小數點不會對應到 Arrow 小數點。

59assert conn.adbc_get_table_schema(
60    "example",
61    db_schema_filter="other_schema",
62) == pyarrow.schema(
63    [
64        ("strings", "string"),
65        ("values", "int32"),
66    ]
67)
68
69conn.close()

取得查詢的 Arrow 綱要

食譜來源: postgresql_get_query_schema.py

ADBC 讓您可以取得結果集的綱要,而無需執行查詢。

22import os
23
24import pyarrow
25
26import adbc_driver_postgresql.dbapi
27
28uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
29conn = adbc_driver_postgresql.dbapi.connect(uri)

我們將建立一個範例資料表來測試。

32with conn.cursor() as cur:
33    cur.execute("DROP TABLE IF EXISTS example")
34    cur.execute("CREATE TABLE example (ints INT, bigints BIGINT)")
35
36conn.commit()
37
38expected = pyarrow.schema(
39    [
40        ("ints", "int32"),
41        ("bigints", "int64"),
42    ]
43)
44
45with conn.cursor() as cur:
46    assert cur.adbc_execute_schema("SELECT * FROM example") == expected

PostgreSQL 在此處不知道類型,因此它只會傳回一個猜測。

49    assert cur.adbc_execute_schema("SELECT $1 AS res") == pyarrow.schema(
50        [
51            ("res", "string"),
52        ]
53    )
54
55conn.close()

列出目錄、綱要和資料表

食譜來源: postgresql_list_catalogs.py

ADBC 允許列出資料庫中的資料表、目錄和綱要。

22import os
23
24import adbc_driver_postgresql.dbapi
25
26uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
27conn = adbc_driver_postgresql.dbapi.connect(uri)

我們將建立一個範例資料表來尋找。

30with conn.cursor() as cur:
31    cur.execute("DROP TABLE IF EXISTS example")
32    cur.execute("CREATE TABLE example (ints INT, bigints BIGINT)")
33
34conn.commit()

資料以 PyArrow RecordBatchReader 的形式提供。

37objects = conn.adbc_get_objects(depth="all").read_all()

為了方便起見,我們會將其轉換為純 Python 資料。

40objects = objects.to_pylist()
41catalog = objects[0]
42assert catalog["catalog_name"] == "postgres"
43
44db_schema = catalog["catalog_db_schemas"][0]
45assert db_schema["db_schema_name"] == "public"
46
47tables = db_schema["db_schema_tables"]
48example = [table for table in tables if table["table_name"] == "example"]
49assert len(example) == 1
50example = example[0]
51
52assert example["table_columns"][0]["column_name"] == "ints"
53assert example["table_columns"][1]["column_name"] == "bigints"
54
55conn.close()

使用 SQLAlchemy 的連線池

食譜來源: postgresql_pool.py

ADBC 未實作連線池,因為這通常不是 DBAPI 驅動程式的功能。相反地,請使用第三方連線池,例如內建於 SQLAlchemy 中的連線池。

26import os
27
28import sqlalchemy.pool
29
30import adbc_driver_postgresql.dbapi
31
32uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
33
34source = adbc_driver_postgresql.dbapi.connect(uri)

adbc_driver_manager.dbapi.Connection.adbc_clone() 從現有的連線開啟新的連線,並在可能的情況下共用內部資源。例如,PostgreSQL 驅動程式將共用內部 OID 快取,從而節省一些連線的額外負擔。

39pool = sqlalchemy.pool.QueuePool(source.adbc_clone, max_overflow=1, pool_size=2)

我們現在可以從池中取得連線;SQLAlchemy 會覆寫 close() 以將連線傳回池中。

注意

與底層 ADBC 連線不同,SQLAlchemy 的包裝器不支援內容管理器協定。

47conn = pool.connect()
48
49assert pool.checkedin() == 0
50assert pool.checkedout() == 1
51
52with conn.cursor() as cur:
53    cur.execute("SELECT 1")
54    assert cur.fetchone() == (1,)
55
56conn.close()
57
58assert pool.checkedin() == 1
59assert pool.checkedout() == 0
60
61source.close()

使用 Pandas 和 ADBC

食譜來源: postgresql_pandas.py

ADBC 已整合到 pandas (一個熱門的資料框架程式庫) 中。Pandas 可以使用 ADBC 與 PostgreSQL 和其他資料庫交換資料。與使用 SQLAlchemy 或其他選項相比,將 ADBC 與 pandas 一起使用可以獲得更好的效能,例如避免過多轉換為和從 Python 物件轉換。

28import os
29
30import pandas as pd
31
32import adbc_driver_postgresql.dbapi
33
34uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
35conn = adbc_driver_postgresql.dbapi.connect(uri)

我們將使用 pd.DataFrame.to_sql 來建立範例資料表。

40data = pd.DataFrame(
41    {
42        "ints": [1, 2, None, 4],
43        "strs": ["a", "b", "c", "d"],
44    }
45)
46data.to_sql("example", conn, if_exists="replace")
47conn.commit()

建立資料表後,我們可以將 ADBC 連線和 SQL 查詢傳遞給 pd.read_sql,以取得作為 pandas DataFrame 的結果集。

53df = pd.read_sql("SELECT * FROM example WHERE ints > 1", conn)
54
55assert len(df) == 2
56
57conn.close()

與 ADBC 介面相比,pandas 提供了更方便且更高階的 API,特別是對於那些已經在使用 pandas 的人。

使用 Polars 和 ADBC

食譜來源: postgresql_polars.py

ADBC 可以與 Polars (一個以 Rust 撰寫的資料框架程式庫) 一起使用。根據其文件

如果後端支援直接傳回 Arrow 資料,則將使用此功能有效率地實例化 DataFrame;否則,DataFrame 會從逐列資料初始化。

顯然,ADBC 直接傳回 Arrow 資料,使 ADBC 和 Polars 自然地契合在一起。

32import os
33
34import polars as pl
35
36uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]

我們將使用 Polars 和 polars.DataFrame.write_database() 來建立範例資料表。我們不需要使用 Polars 自己開啟 ADBC 連線。

42data = pl.DataFrame(
43    {
44        "ints": [1, 2, None, 4],
45        "strs": ["a", "b", "c", "d"],
46    }
47)
48data.write_database("example", uri, engine="adbc", if_table_exists="replace")

建立資料表後,我們可以使用 polars.read_database_uri() 來擷取結果。同樣地,我們可以只傳遞 URI 並告知 Polars 為我們管理 ADBC。

54df = pl.read_database_uri("SELECT * FROM example WHERE ints > 1", uri, engine="adbc")
55
56assert len(df) == 2