PostgreSQL 食谱

使用用户名和密码进行身份验证

食谱来源: postgresql_authenticate.py

要连接到 PostgreSQL 数据库,必须在 URI 中提供用户名和密码。例如,

postgresql://username:password@hostname:port/dbname

有关详细信息,请参见 PostgreSQL 文档

30import os
31
32import adbc_driver_postgresql.dbapi
33
34uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
35conn = adbc_driver_postgresql.dbapi.connect(uri)
36
37with conn.cursor() as cur:
38    cur.execute("SELECT 1")
39    assert cur.fetchone() == (1,)
40
41conn.close()

从 Arrow 数据集创建/追加到表

食谱来源: postgresql_create_dataset_table.py

ADBC 使得将 PyArrow 数据集加载到您的数据存储中变得容易。

22import os
23import tempfile
24from pathlib import Path
25
26import pyarrow
27import pyarrow.csv
28import pyarrow.dataset
29import pyarrow.feather
30import pyarrow.parquet
31
32import adbc_driver_postgresql.dbapi
33
34uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
35conn = adbc_driver_postgresql.dbapi.connect(uri)

为了测试目的,我们首先要确保我们要使用的表不存在。

39with conn.cursor() as cur:
40    cur.execute("DROP TABLE IF EXISTS csvtable")
41    cur.execute("DROP TABLE IF EXISTS ipctable")
42    cur.execute("DROP TABLE IF EXISTS pqtable")
43    cur.execute("DROP TABLE IF EXISTS csvdataset")
44    cur.execute("DROP TABLE IF EXISTS ipcdataset")
45    cur.execute("DROP TABLE IF EXISTS pqdataset")
46
47conn.commit()

生成示例数据

52tempdir = tempfile.TemporaryDirectory(
53    prefix="adbc-docs-",
54    ignore_cleanup_errors=True,
55)
56root = Path(tempdir.name)
57table = pyarrow.table(
58    [
59        [1, 1, 2],
60        ["foo", "bar", "baz"],
61    ],
62    names=["ints", "strs"],
63)

首先,我们将写入单个文件。

67csv_file = root / "example.csv"
68pyarrow.csv.write_csv(table, csv_file)
69
70ipc_file = root / "example.arrow"
71pyarrow.feather.write_feather(table, ipc_file)
72
73parquet_file = root / "example.parquet"
74pyarrow.parquet.write_table(table, parquet_file)

我们还将生成一些分区数据集。

 78csv_dataset = root / "csv_dataset"
 79pyarrow.dataset.write_dataset(
 80    table,
 81    csv_dataset,
 82    format="csv",
 83    partitioning=["ints"],
 84)
 85
 86ipc_dataset = root / "ipc_dataset"
 87pyarrow.dataset.write_dataset(
 88    table,
 89    ipc_dataset,
 90    format="feather",
 91    partitioning=["ints"],
 92)
 93
 94parquet_dataset = root / "parquet_dataset"
 95pyarrow.dataset.write_dataset(
 96    table,
 97    parquet_dataset,
 98    format="parquet",
 99    partitioning=["ints"],
100)

将 CSV 文件加载到 PostgreSQL

我们可以直接将 pyarrow.RecordBatchReader(来自 open_csv)传递给 adbc_ingest。我们也可以传递 pyarrow.dataset.Datasetpyarrow.dataset.Scanner

110with conn.cursor() as cur:
111    reader = pyarrow.csv.open_csv(csv_file)
112    cur.adbc_ingest("csvtable", reader, mode="create")
113
114    reader = pyarrow.dataset.dataset(
115        csv_dataset,
116        format="csv",
117        partitioning=["ints"],
118    )
119    cur.adbc_ingest("csvdataset", reader, mode="create")
120
121conn.commit()
122
123with conn.cursor() as cur:
124    cur.execute("SELECT ints, strs FROM csvtable ORDER BY ints, strs ASC")
125    assert cur.fetchall() == [(1, "bar"), (1, "foo"), (2, "baz")]
126
127    cur.execute("SELECT ints, strs FROM csvdataset ORDER BY ints, strs ASC")
128    assert cur.fetchall() == [(1, "bar"), (1, "foo"), (2, "baz")]

将 Arrow IPC(Feather)文件加载到 PostgreSQL

133with conn.cursor() as cur:
134    reader = pyarrow.ipc.RecordBatchFileReader(ipc_file)

由于 PyArrow API 中的怪癖,我们必须将文件读入内存。

137    cur.adbc_ingest("ipctable", reader.read_all(), mode="create")

但是,数据集 API 会将数据流式传输到内存,然后流式传输到 PostgreSQL。

141    reader = pyarrow.dataset.dataset(
142        ipc_dataset,
143        format="feather",
144        partitioning=["ints"],
145    )
146    cur.adbc_ingest("ipcdataset", reader, mode="create")
147
148conn.commit()
149
150with conn.cursor() as cur:
151    cur.execute("SELECT ints, strs FROM ipctable ORDER BY ints, strs ASC")
152    assert cur.fetchall() == [(1, "bar"), (1, "foo"), (2, "baz")]
153
154    cur.execute("SELECT ints, strs FROM ipcdataset ORDER BY ints, strs ASC")
155    assert cur.fetchall() == [(1, "bar"), (1, "foo"), (2, "baz")]

将 Parquet 文件加载到 PostgreSQL

160with conn.cursor() as cur:
161    reader = pyarrow.parquet.ParquetFile(parquet_file)
162    cur.adbc_ingest("pqtable", reader.iter_batches(), mode="create")
163
164    reader = pyarrow.dataset.dataset(
165        parquet_dataset,
166        format="parquet",
167        partitioning=["ints"],
168    )
169    cur.adbc_ingest("pqdataset", reader, mode="create")
170
171conn.commit()
172
173with conn.cursor() as cur:
174    cur.execute("SELECT ints, strs FROM pqtable ORDER BY ints, strs ASC")
175    assert cur.fetchall() == [(1, "bar"), (1, "foo"), (2, "baz")]
176
177    cur.execute("SELECT ints, strs FROM pqdataset ORDER BY ints, strs ASC")
178    assert cur.fetchall() == [(1, "bar"), (1, "foo"), (2, "baz")]

清理

183conn.close()
184tempdir.cleanup()

从 Arrow 表创建/追加到表

食谱来源: postgresql_create_append_table.py

ADBC 允许使用 Arrow 表创建和追加到数据库表。

22import os
23
24import pyarrow
25
26import adbc_driver_postgresql.dbapi
27
28uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
29conn = adbc_driver_postgresql.dbapi.connect(uri)

为了测试目的,我们首先要确保我们要使用的表不存在。

33with conn.cursor() as cur:
34    cur.execute("DROP TABLE IF EXISTS example")
35    cur.execute("DROP TABLE IF EXISTS example2")

现在我们可以创建表了。

38with conn.cursor() as cur:
39    data = pyarrow.table(
40        [
41            [1, 2, None, 4],
42        ],
43        schema=pyarrow.schema(
44            [
45                ("ints", "int32"),
46            ]
47        ),
48    )
49    cur.adbc_ingest("example", data, mode="create")
50
51conn.commit()

摄取后,我们可以获取结果。

54with conn.cursor() as cur:
55    cur.execute("SELECT * FROM example")
56    assert cur.fetchone() == (1,)
57    assert cur.fetchone() == (2,)
58
59    cur.execute("SELECT COUNT(*) FROM example")
60    assert cur.fetchone() == (4,)

如果我们尝试再次摄取,它会失败,因为表已存在。

64with conn.cursor() as cur:
65    try:
66        cur.adbc_ingest("example", data, mode="create")
67    except conn.ProgrammingError:
68        pass
69    else:
70        raise RuntimeError("Should have failed!")
71
72conn.rollback()

相反,我们可以追加到表。

75with conn.cursor() as cur:
76    cur.adbc_ingest("example", data, mode="append")
77
78    cur.execute("SELECT COUNT(*) FROM example")
79    assert cur.fetchone() == (8,)

我们也可以选择在表不存在时创建它,否则追加。

84with conn.cursor() as cur:
85    cur.adbc_ingest("example2", data, mode="create_append")
86
87    cur.execute("SELECT COUNT(*) FROM example2")
88    assert cur.fetchone() == (4,)
89
90    cur.adbc_ingest("example2", data, mode="create_append")
91
92    cur.execute("SELECT COUNT(*) FROM example2")
93    assert cur.fetchone() == (8,)

最后,我们可以替换表。

 97with conn.cursor() as cur:
 98    cur.adbc_ingest("example", data.slice(0, 2), mode="replace")
 99
100    cur.execute("SELECT COUNT(*) FROM example")
101    assert cur.fetchone() == (2,)
102
103conn.close()

创建/追加到临时表

食谱来源: postgresql_create_temp_table.py

ADBC 也允许创建和追加到临时表。

21import os
22
23import pyarrow
24
25import adbc_driver_postgresql.dbapi
26
27uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
28conn = adbc_driver_postgresql.dbapi.connect(uri)

为了测试目的,我们首先要确保我们要使用的表不存在。

32with conn.cursor() as cur:
33    cur.execute("DROP TABLE IF EXISTS example")

要创建临时表,只需指定选项“temporary”。

36data = pyarrow.table(
37    [
38        [1, 2, None, 4],
39    ],
40    schema=pyarrow.schema(
41        [
42            ("ints", "int32"),
43        ]
44    ),
45)
46
47with conn.cursor() as cur:
48    cur.adbc_ingest("example", data, mode="create", temporary=True)
49
50conn.commit()

摄取后,我们可以获取结果。

53with conn.cursor() as cur:
54    cur.execute("SELECT * FROM example")
55    assert cur.fetchone() == (1,)
56    assert cur.fetchone() == (2,)
57
58    cur.execute("SELECT COUNT(*) FROM example")
59    assert cur.fetchone() == (4,)

临时表与常规表分开,即使它们具有相同的名称。

64with conn.cursor() as cur:
65    cur.adbc_ingest("example", data.slice(0, 2), mode="create", temporary=False)
66
67conn.commit()
68
69with conn.cursor() as cur:

因为我们有两个具有相同名称的表,所以我们必须在此处显式引用正常的临时表。

72    cur.execute("SELECT COUNT(*) FROM public.example")
73    assert cur.fetchone() == (2,)
74
75    cur.execute("SELECT COUNT(*) FROM example")
76    assert cur.fetchone() == (4,)
77
78conn.close()

关闭连接后,临时表会隐式删除。如果我们重新连接,表将不存在;我们将只看到“normal”表。

83with adbc_driver_postgresql.dbapi.connect(uri) as conn:
84    with conn.cursor() as cur:
85        cur.execute("SELECT COUNT(*) FROM example")
86        assert cur.fetchone() == (2,)

所有常规摄取选项也适用于临时表。有关更多示例,请参见 从 Arrow 数据集创建/追加到表

使用绑定参数执行语句

食谱来源: postgresql_execute_bind.py

ADBC 允许使用 Python 和 Arrow 值作为绑定参数。目前,PostgreSQL 驱动程序仅支持对不生成结果集的查询的绑定参数。

24import os
25
26import pyarrow
27
28import adbc_driver_postgresql.dbapi
29
30uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
31conn = adbc_driver_postgresql.dbapi.connect(uri)

我们将创建一个示例表进行测试。

34with conn.cursor() as cur:
35    cur.execute("DROP TABLE IF EXISTS example")
36    cur.execute("CREATE TABLE example (ints INT, bigints BIGINT)")
37
38conn.commit()

我们可以绑定 Python 值

41with conn.cursor() as cur:
42    cur.executemany("INSERT INTO example VALUES ($1, $2)", [(1, 2), (3, 4)])
43
44    cur.execute("SELECT SUM(ints) FROM example")
45    assert cur.fetchone() == (4,)

注意

如果您习惯于 psycopg 等库为绑定参数使用的格式字符串样式 %s 语法,请注意,不支持此语法 - 仅支持 PostgreSQL 本地的 $1 语法。

我们也可以绑定 Arrow 值

52with conn.cursor() as cur:
53    data = pyarrow.record_batch(
54        [
55            [5, 6],
56            [7, 8],
57        ],
58        names=["$1", "$2"],
59    )
60    cur.executemany("INSERT INTO example VALUES ($1, $2)", data)
61
62    cur.execute("SELECT SUM(ints) FROM example")
63    assert cur.fetchone() == (15,)
64
65conn.close()

获取表的 Arrow 模式

食谱来源: postgresql_get_table_schema.py

ADBC 允许您将表的模式作为 Arrow 模式获取。

22import os
23
24import pyarrow
25
26import adbc_driver_postgresql.dbapi
27
28uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
29conn = adbc_driver_postgresql.dbapi.connect(uri)

我们将创建一些示例表进行测试。

32with conn.cursor() as cur:
33    cur.execute("DROP TABLE IF EXISTS example")
34    cur.execute("CREATE TABLE example (ints INT, bigints BIGINT)")
35
36    cur.execute("CREATE SCHEMA IF NOT EXISTS other_schema")
37    cur.execute("DROP TABLE IF EXISTS other_schema.example")
38    cur.execute("CREATE TABLE other_schema.example (strings TEXT, values INT)")
39
40conn.commit()

默认情况下,将假定“active”目录/模式。

43assert conn.adbc_get_table_schema("example") == pyarrow.schema(
44    [
45        ("ints", "int32"),
46        ("bigints", "int64"),
47    ]
48)

我们可以显式指定 PostgreSQL 模式来获取不同命名空间中表的 Arrow 模式。

注意

在 PostgreSQL 中,您只能查询您连接到的数据库(目录)。因此,我们不能在此处指定目录(或者更确切地说,这样做没有意义)。

请注意,NUMERIC 列被读取为字符串,因为 PostgreSQL 小数不映射到 Arrow 小数。

59assert conn.adbc_get_table_schema(
60    "example",
61    db_schema_filter="other_schema",
62) == pyarrow.schema(
63    [
64        ("strings", "string"),
65        ("values", "int32"),
66    ]
67)
68
69conn.close()

获取查询的 Arrow 模式

食谱来源: postgresql_get_query_schema.py

ADBC 允许您获取结果集的模式,无需执行查询。

22import os
23
24import pyarrow
25
26import adbc_driver_postgresql.dbapi
27
28uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
29conn = adbc_driver_postgresql.dbapi.connect(uri)

我们将创建一个示例表进行测试。

32with conn.cursor() as cur:
33    cur.execute("DROP TABLE IF EXISTS example")
34    cur.execute("CREATE TABLE example (ints INT, bigints BIGINT)")
35
36conn.commit()
37
38expected = pyarrow.schema(
39    [
40        ("ints", "int32"),
41        ("bigints", "int64"),
42    ]
43)
44
45with conn.cursor() as cur:
46    assert cur.adbc_execute_schema("SELECT * FROM example") == expected

PostgreSQL 不知道此处的类型,因此它只返回一个猜测。

49    assert cur.adbc_execute_schema("SELECT $1 AS res") == pyarrow.schema(
50        [
51            ("res", "string"),
52        ]
53    )
54
55conn.close()

列出目录、模式和表

食谱来源: postgresql_list_catalogs.py

ADBC 允许在数据库中列出表、目录和模式。

22import os
23
24import adbc_driver_postgresql.dbapi
25
26uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
27conn = adbc_driver_postgresql.dbapi.connect(uri)

我们将创建一个示例表来查找。

30with conn.cursor() as cur:
31    cur.execute("DROP TABLE IF EXISTS example")
32    cur.execute("CREATE TABLE example (ints INT, bigints BIGINT)")
33
34conn.commit()

数据以 PyArrow RecordBatchReader 形式提供。

37objects = conn.adbc_get_objects(depth="all").read_all()

为了方便起见,我们将将其转换为纯 Python 数据。

40objects = objects.to_pylist()
41catalog = objects[0]
42assert catalog["catalog_name"] == "postgres"
43
44db_schema = catalog["catalog_db_schemas"][0]
45assert db_schema["db_schema_name"] == "public"
46
47tables = db_schema["db_schema_tables"]
48example = [table for table in tables if table["table_name"] == "example"]
49assert len(example) == 1
50example = example[0]
51
52assert example["table_columns"][0]["column_name"] == "ints"
53assert example["table_columns"][1]["column_name"] == "bigints"
54
55conn.close()

使用 SQLAlchemy 进行连接池

食谱来源: postgresql_pool.py

ADBC 没有实现连接池,因为这通常不是 DBAPI 驱动程序的功能。相反,请使用内置在 SQLAlchemy 中的第三方连接池。

26import os
27
28import sqlalchemy.pool
29
30import adbc_driver_postgresql.dbapi
31
32uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
33
34source = adbc_driver_postgresql.dbapi.connect(uri)

adbc_driver_manager.dbapi.Connection.adbc_clone() 从现有连接打开一个新连接,尽可能共享内部资源。例如,PostgreSQL 驱动程序将共享内部 OID 缓存,从而节省连接的一些开销。

39pool = sqlalchemy.pool.QueuePool(source.adbc_clone, max_overflow=1, pool_size=2)

我们现在可以从池中获取连接;SQLAlchemy 覆盖了 close() 以将连接返回到池中。

注意

与底层 ADBC 连接不同,SQLAlchemy 的包装器不支持上下文管理器协议。

47conn = pool.connect()
48
49assert pool.checkedin() == 0
50assert pool.checkedout() == 1
51
52with conn.cursor() as cur:
53    cur.execute("SELECT 1")
54    assert cur.fetchone() == (1,)
55
56conn.close()
57
58assert pool.checkedin() == 1
59assert pool.checkedout() == 0
60
61source.close()

使用 Pandas 和 ADBC

食谱来源: postgresql_pandas.py

ADBC 集成到 pandas 中,这是一个流行的数据框库。Pandas 可以使用 ADBC 与 PostgreSQL 和其他数据库交换数据。与使用 SQLAlchemy 或其他选项相比,使用 ADBC 与 pandas 可以获得更好的性能,例如通过避免不必要的 Python 对象的转换。

28import os
29
30import pandas as pd
31
32import adbc_driver_postgresql.dbapi
33
34uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
35conn = adbc_driver_postgresql.dbapi.connect(uri)

我们将使用 pd.DataFrame.to_sql 创建一个示例表。

40data = pd.DataFrame(
41    {
42        "ints": [1, 2, None, 4],
43        "strs": ["a", "b", "c", "d"],
44    }
45)
46data.to_sql("example", conn, if_exists="replace")
47conn.commit()

创建表后,我们可以将 ADBC 连接和 SQL 查询传递给 pd.read_sql 以将结果集作为 pandas DataFrame 获取。

53df = pd.read_sql("SELECT * FROM example WHERE ints > 1", conn)
54
55assert len(df) == 2
56
57conn.close()

与 ADBC 接口相比,pandas 提供了一个更方便和更高层次的 API,尤其是对于那些已经在使用 pandas 的人来说。

使用 Polars 和 ADBC

食谱来源: postgresql_polars.py

ADBC 可与 Polars 一起使用,Polars 是一个用 Rust 编写的 DataFrame 库。根据其文档

如果后端支持直接返回 Arrow 数据,则将使用此工具有效地实例化 DataFrame;否则,DataFrame 将从逐行数据初始化。

显然,ADBC 直接返回 Arrow 数据,这使得 ADBC 和 Polars 成为彼此的自然选择。

32import os
33
34import polars as pl
35
36uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]

我们将使用 Polars 使用 polars.DataFrame.write_database() 创建一个示例表。我们不需要使用 Polars 自己打开 ADBC 连接。

42data = pl.DataFrame(
43    {
44        "ints": [1, 2, None, 4],
45        "strs": ["a", "b", "c", "d"],
46    }
47)
48data.write_database("example", uri, engine="adbc", if_table_exists="replace")

创建表后,我们可以使用 polars.read_database_uri() 获取结果。同样,我们只需传递 URI 并告诉 Polars 为我们管理 ADBC。

54df = pl.read_database_uri("SELECT * FROM example WHERE ints > 1", uri, engine="adbc")
55
56assert len(df) == 2