PostgreSQL 使用指南

使用用户名和密码进行身份验证

配方源码: postgresql_authenticate.py

要连接到 PostgreSQL 数据库,必须在 URI 中提供用户名和密码。例如,

postgresql://username:password@hostname:port/dbname

详细信息请参阅 PostgreSQL 文档

32import os
33
34import adbc_driver_postgresql.dbapi
35
36uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
37conn = adbc_driver_postgresql.dbapi.connect(uri)
38
39with conn.cursor() as cur:
40    cur.execute("SELECT 1")
41    print(cur.fetchone())
42    # Output: (1,)
43
44conn.close()
标准输出
(1,)

启用自动提交模式

配方源码: postgresql_autocommit.py

您可以通过将 autocommit 参数传递给 connect 函数来启用自动提交模式。启用自动提交后,每条语句都会自动提交,无需显式调用 commit()。这对于无法在事务内运行的操作,或者当您希望立即提交每条语句时非常有用。

27import os
28
29import adbc_driver_postgresql.dbapi
30
31uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
32
33# Enable autocommit mode
34conn = adbc_driver_postgresql.dbapi.connect(uri, autocommit=True)
35
36with conn.cursor() as cur:
37    # In autocommit mode, this statement is automatically committed
38    cur.execute("CREATE TEMP TABLE IF NOT EXISTS autocommit_test (id INTEGER)")
39    cur.execute("INSERT INTO autocommit_test VALUES (1)")
40
41# Verify the data was committed
42with conn.cursor() as cur:
43    cur.execute("SELECT * FROM autocommit_test")
44    assert cur.fetchone() == (1,)
45
46conn.close()

从 Arrow 数据集创建/追加到表

配方源码: postgresql_create_dataset_table.py

ADBC 可以轻松地将 PyArrow 数据集加载到您的数据存储中。

24import os
25import tempfile
26from pathlib import Path
27
28import pyarrow
29import pyarrow.csv
30import pyarrow.dataset
31import pyarrow.feather
32import pyarrow.parquet
33
34import adbc_driver_postgresql.dbapi
35
36uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
37conn = adbc_driver_postgresql.dbapi.connect(uri)

为了进行测试,我们首先确保即将使用的表不存在。

41with conn.cursor() as cur:
42    cur.execute("DROP TABLE IF EXISTS csvtable")
43    cur.execute("DROP TABLE IF EXISTS ipctable")
44    cur.execute("DROP TABLE IF EXISTS pqtable")
45    cur.execute("DROP TABLE IF EXISTS csvdataset")
46    cur.execute("DROP TABLE IF EXISTS ipcdataset")
47    cur.execute("DROP TABLE IF EXISTS pqdataset")
48
49conn.commit()

生成示例数据

54tempdir = tempfile.TemporaryDirectory(
55    prefix="adbc-docs-",
56    ignore_cleanup_errors=True,
57)
58root = Path(tempdir.name)
59table = pyarrow.table(
60    [
61        [1, 1, 2],
62        ["foo", "bar", "baz"],
63    ],
64    names=["ints", "strs"],
65)

首先,我们将写入单个文件。

69csv_file = root / "example.csv"
70pyarrow.csv.write_csv(table, csv_file)
71
72ipc_file = root / "example.arrow"
73pyarrow.feather.write_feather(table, ipc_file)
74
75parquet_file = root / "example.parquet"
76pyarrow.parquet.write_table(table, parquet_file)

我们还将生成一些分区数据集。

 80csv_dataset = root / "csv_dataset"
 81pyarrow.dataset.write_dataset(
 82    table,
 83    csv_dataset,
 84    format="csv",
 85    partitioning=["ints"],
 86)
 87
 88ipc_dataset = root / "ipc_dataset"
 89pyarrow.dataset.write_dataset(
 90    table,
 91    ipc_dataset,
 92    format="feather",
 93    partitioning=["ints"],
 94)
 95
 96parquet_dataset = root / "parquet_dataset"
 97pyarrow.dataset.write_dataset(
 98    table,
 99    parquet_dataset,
100    format="parquet",
101    partitioning=["ints"],
102)

将 CSV 文件加载到 PostgreSQL

我们可以直接将 pyarrow.RecordBatchReader(来自 open_csv)传递给 adbc_ingest。我们也可以传递 pyarrow.dataset.Datasetpyarrow.dataset.Scanner

112with conn.cursor() as cur:
113    reader = pyarrow.csv.open_csv(csv_file)
114    cur.adbc_ingest("csvtable", reader, mode="create")
115
116    reader = pyarrow.dataset.dataset(
117        csv_dataset,
118        format="csv",
119        partitioning=["ints"],
120    )
121    cur.adbc_ingest("csvdataset", reader, mode="create")
122
123conn.commit()
124
125with conn.cursor() as cur:
126    cur.execute("SELECT ints, strs FROM csvtable ORDER BY ints, strs ASC")
127    assert cur.fetchall() == [(1, "bar"), (1, "foo"), (2, "baz")]
128
129    cur.execute("SELECT ints, strs FROM csvdataset ORDER BY ints, strs ASC")
130    assert cur.fetchall() == [(1, "bar"), (1, "foo"), (2, "baz")]

将 Arrow IPC (Feather) 文件加载到 PostgreSQL

135with conn.cursor() as cur:
136    reader = pyarrow.ipc.RecordBatchFileReader(ipc_file)

由于 PyArrow API 的特性,我们必须先将文件读入内存。

139    cur.adbc_ingest("ipctable", reader.read_all(), mode="create")

不过,Dataset API 会将数据流式传输到内存中,然后再加载到 PostgreSQL 中。

143    reader = pyarrow.dataset.dataset(
144        ipc_dataset,
145        format="feather",
146        partitioning=["ints"],
147    )
148    cur.adbc_ingest("ipcdataset", reader, mode="create")
149
150conn.commit()
151
152with conn.cursor() as cur:
153    cur.execute("SELECT ints, strs FROM ipctable ORDER BY ints, strs ASC")
154    assert cur.fetchall() == [(1, "bar"), (1, "foo"), (2, "baz")]
155
156    cur.execute("SELECT ints, strs FROM ipcdataset ORDER BY ints, strs ASC")
157    assert cur.fetchall() == [(1, "bar"), (1, "foo"), (2, "baz")]

将 Parquet 文件加载到 PostgreSQL

162with conn.cursor() as cur:
163    reader = pyarrow.parquet.ParquetFile(parquet_file)
164    cur.adbc_ingest("pqtable", reader.iter_batches(), mode="create")
165
166    reader = pyarrow.dataset.dataset(
167        parquet_dataset,
168        format="parquet",
169        partitioning=["ints"],
170    )
171    cur.adbc_ingest("pqdataset", reader, mode="create")
172
173conn.commit()
174
175with conn.cursor() as cur:
176    cur.execute("SELECT ints, strs FROM pqtable ORDER BY ints, strs ASC")
177    assert cur.fetchall() == [(1, "bar"), (1, "foo"), (2, "baz")]
178
179    cur.execute("SELECT ints, strs FROM pqdataset ORDER BY ints, strs ASC")
180    assert cur.fetchall() == [(1, "bar"), (1, "foo"), (2, "baz")]

清理

185conn.close()
186tempdir.cleanup()

从 Arrow 表创建/追加到表

配方源码: postgresql_create_append_table.py

ADBC 允许使用 Arrow 表来创建数据库表或向其追加数据。

24import os
25
26import pyarrow
27
28import adbc_driver_postgresql.dbapi
29
30uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
31conn = adbc_driver_postgresql.dbapi.connect(uri)

为了进行测试,我们首先确保即将使用的表不存在。

35with conn.cursor() as cur:
36    cur.execute("DROP TABLE IF EXISTS example")
37    cur.execute("DROP TABLE IF EXISTS example2")

现在我们可以创建表了。

40with conn.cursor() as cur:
41    data = pyarrow.table(
42        [
43            [1, 2, None, 4],
44        ],
45        schema=pyarrow.schema(
46            [
47                ("ints", "int32"),
48            ]
49        ),
50    )
51    cur.adbc_ingest("example", data, mode="create")
52
53conn.commit()

摄取后,我们可以获取结果。

56with conn.cursor() as cur:
57    cur.execute("SELECT * FROM example")
58    assert cur.fetchone() == (1,)
59    assert cur.fetchone() == (2,)
60
61    cur.execute("SELECT COUNT(*) FROM example")
62    assert cur.fetchone() == (4,)

如果我们尝试再次摄取,它会失败,因为表已经存在。

66with conn.cursor() as cur:
67    try:
68        cur.adbc_ingest("example", data, mode="create")
69    except conn.ProgrammingError:
70        pass
71    else:
72        raise RuntimeError("Should have failed!")
73
74conn.rollback()

相反,我们可以追加到表中。

77with conn.cursor() as cur:
78    cur.adbc_ingest("example", data, mode="append")
79
80    cur.execute("SELECT COUNT(*) FROM example")
81    assert cur.fetchone() == (8,)

我们还可以选择:如果表不存在则创建它,否则追加数据。

86with conn.cursor() as cur:
87    cur.adbc_ingest("example2", data, mode="create_append")
88
89    cur.execute("SELECT COUNT(*) FROM example2")
90    assert cur.fetchone() == (4,)
91
92    cur.adbc_ingest("example2", data, mode="create_append")
93
94    cur.execute("SELECT COUNT(*) FROM example2")
95    assert cur.fetchone() == (8,)

最后,我们可以替换该表。

 99with conn.cursor() as cur:
100    cur.adbc_ingest("example", data.slice(0, 2), mode="replace")
101
102    cur.execute("SELECT COUNT(*) FROM example")
103    assert cur.fetchone() == (2,)
104
105conn.close()

创建/追加到临时表

配方源码: postgresql_create_temp_table.py

ADBC 也允许创建临时表并向其追加数据。

23import os
24
25import pyarrow
26
27import adbc_driver_postgresql.dbapi
28
29uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
30conn = adbc_driver_postgresql.dbapi.connect(uri)

为了进行测试,我们首先确保即将使用的表不存在。

34with conn.cursor() as cur:
35    cur.execute("DROP TABLE IF EXISTS example")

要创建临时表,只需指定 “temporary” 选项即可。

38data = pyarrow.table(
39    [
40        [1, 2, None, 4],
41    ],
42    schema=pyarrow.schema(
43        [
44            ("ints", "int32"),
45        ]
46    ),
47)
48
49with conn.cursor() as cur:
50    cur.adbc_ingest("example", data, mode="create", temporary=True)
51
52conn.commit()

摄取后,我们可以获取结果。

55with conn.cursor() as cur:
56    cur.execute("SELECT * FROM example")
57    assert cur.fetchone() == (1,)
58    assert cur.fetchone() == (2,)
59
60    cur.execute("SELECT COUNT(*) FROM example")
61    assert cur.fetchone() == (4,)

临时表与常规表是分开的,即使它们具有相同的名称。

66with conn.cursor() as cur:
67    cur.adbc_ingest("example", data.slice(0, 2), mode="create", temporary=False)
68
69conn.commit()
70
71with conn.cursor() as cur:

因为我们有两个同名的表,所以必须在这里明确引用正常的临时表。

74    cur.execute("SELECT COUNT(*) FROM public.example")
75    assert cur.fetchone() == (2,)
76
77    cur.execute("SELECT COUNT(*) FROM example")
78    assert cur.fetchone() == (4,)
79
80conn.close()

关闭连接后,临时表会被隐式删除。如果我们重新连接,该表将不存在;我们将只能看到“普通”表。

85with adbc_driver_postgresql.dbapi.connect(uri) as conn:
86    with conn.cursor() as cur:
87        cur.execute("SELECT COUNT(*) FROM example")
88        assert cur.fetchone() == (2,)

所有常规的摄取选项也适用于临时表。更多示例请参阅 从 Arrow 数据集创建/追加到表

执行带有绑定参数的语句

配方源码: postgresql_execute_bind.py

ADBC 允许使用 Python 和 Arrow 值作为绑定参数。目前,PostgreSQL 驱动程序仅支持对不生成结果集的查询使用绑定参数。

26import os
27
28import pyarrow
29
30import adbc_driver_postgresql.dbapi
31
32uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
33conn = adbc_driver_postgresql.dbapi.connect(uri)

我们将创建一个示例表来测试。

36with conn.cursor() as cur:
37    cur.execute("DROP TABLE IF EXISTS example")
38    cur.execute("CREATE TABLE example (ints INT, bigints BIGINT)")
39
40conn.commit()

我们可以绑定 Python 值。

43with conn.cursor() as cur:
44    cur.executemany("INSERT INTO example VALUES ($1, $2)", [(1, 2), (3, 4)])
45
46    cur.execute("SELECT SUM(ints) FROM example")
47    assert cur.fetchone() == (4,)

注意

如果您习惯了类似 psycopg 等库用于绑定参数的格式字符串样式 %s 语法,请注意这不受支持——仅支持 PostgreSQL 原生的 $1 语法。

我们也可以绑定 Arrow 值。

54with conn.cursor() as cur:
55    data = pyarrow.record_batch(
56        [
57            [5, 6],
58            [7, 8],
59        ],
60        names=["$1", "$2"],
61    )
62    cur.executemany("INSERT INTO example VALUES ($1, $2)", data)
63
64    cur.execute("SELECT SUM(ints) FROM example")
65    assert cur.fetchone() == (15,)
66
67conn.close()

在不使用 COPY 的情况下执行语句

配方源码: postgresql_execute_nocopy.py

ADBC 驱动程序默认尝试使用 COPY 执行查询,因为这对于大数据集速度更快。但是,PostgreSQL 并不支持对所有类型的查询使用 COPY。例如,SHOW 查询将无法工作。在这种情况下,您可以显式禁用 COPY 优化。

27import os
28
29import adbc_driver_postgresql.dbapi
30
31uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
32conn = adbc_driver_postgresql.dbapi.connect(uri)

该选项可以在创建游标时设置:

36with conn.cursor(
37    adbc_stmt_kwargs={
38        adbc_driver_postgresql.StatementOptions.USE_COPY.value: False,
39    }
40) as cur:
41    cur.execute("SHOW ALL")
42    print(cur.fetch_arrow_table().schema)

或者也可以在事后设置:

50with conn.cursor() as cur:
51    cur.adbc_statement.set_options(
52        **{
53            adbc_driver_postgresql.StatementOptions.USE_COPY.value: False,
54        }
55    )
56    cur.execute("SHOW ALL")
57    print(cur.fetch_arrow_table().schema)

如果不使用该选项,查询将失败,因为驱动程序会尝试使用 COPY 执行查询。

66with conn.cursor() as cur:
67    try:
68        cur.execute("SHOW ALL")
69    except conn.Error:
70        pass
71    else:
72        raise RuntimeError("Expected error")
73
74conn.close()
标准输出 (stdout)
name: string
setting: string
description: string
name: string
setting: string
description: string

获取表的 Arrow 模式 (Schema)

配方源码: postgresql_get_table_schema.py

ADBC 允许您获取表的模式,并将其作为 Arrow 模式。

24import os
25
26import pyarrow
27
28import adbc_driver_postgresql.dbapi
29
30uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
31conn = adbc_driver_postgresql.dbapi.connect(uri)

我们将创建一些示例表进行测试。

34with conn.cursor() as cur:
35    cur.execute("DROP TABLE IF EXISTS example")
36    cur.execute("CREATE TABLE example (ints INT, bigints BIGINT)")
37
38    cur.execute("CREATE SCHEMA IF NOT EXISTS other_schema")
39    cur.execute("DROP TABLE IF EXISTS other_schema.example")
40    cur.execute("CREATE TABLE other_schema.example (strings TEXT, values INT)")
41
42conn.commit()

默认情况下,假定为“活动”目录/模式。

45assert conn.adbc_get_table_schema("example") == pyarrow.schema(
46    [
47        ("ints", "int32"),
48        ("bigints", "int64"),
49    ]
50)

我们可以明确指定 PostgreSQL 模式,以获取不同命名空间中表的 Arrow 模式。

注意

在 PostgreSQL 中,您只能查询您连接到的数据库(目录)。因此我们不能在此处指定目录(或者说,这样做没有意义)。

请注意,NUMERIC 列被读取为字符串,因为 PostgreSQL 的小数类型无法直接映射到 Arrow 的小数类型。

61assert conn.adbc_get_table_schema(
62    "example",
63    db_schema_filter="other_schema",
64) == pyarrow.schema(
65    [
66        ("strings", "string"),
67        ("values", "int32"),
68    ]
69)
70
71conn.close()

获取查询的 Arrow 模式

配方源码: postgresql_get_query_schema.py

ADBC 允许您在不执行查询的情况下获取结果集的模式。

24import os
25
26import pyarrow
27
28import adbc_driver_postgresql.dbapi
29
30uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
31conn = adbc_driver_postgresql.dbapi.connect(uri)

我们将创建一个示例表来测试。

34with conn.cursor() as cur:
35    cur.execute("DROP TABLE IF EXISTS example")
36    cur.execute("CREATE TABLE example (ints INT, bigints BIGINT)")
37
38conn.commit()
39
40expected = pyarrow.schema(
41    [
42        ("ints", "int32"),
43        ("bigints", "int64"),
44    ]
45)
46
47with conn.cursor() as cur:
48    assert cur.adbc_execute_schema("SELECT * FROM example") == expected

PostgreSQL 在此处不知道类型,所以它只是返回一个猜测值。

51    assert cur.adbc_execute_schema("SELECT $1 AS res") == pyarrow.schema(
52        [
53            ("res", "string"),
54        ]
55    )
56
57conn.close()

列出目录、模式和表

配方源码: postgresql_list_catalogs.py

ADBC 允许列出数据库中的表、目录和模式。

24import os
25
26import adbc_driver_postgresql.dbapi
27
28uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
29conn = adbc_driver_postgresql.dbapi.connect(uri)

我们将创建一个示例表来查找。

32with conn.cursor() as cur:
33    cur.execute("DROP TABLE IF EXISTS example")
34    cur.execute("CREATE TABLE example (ints INT, bigints BIGINT)")
35
36conn.commit()

数据以 PyArrow RecordBatchReader 的形式提供。

39objects = conn.adbc_get_objects(depth="all").read_all()

为了方便起见,我们将把它转换为普通的 Python 数据。

42objects = objects.to_pylist()
43catalog = objects[0]
44assert catalog["catalog_name"] == "postgres"
45
46db_schema = catalog["catalog_db_schemas"][0]
47assert db_schema["db_schema_name"] == "public"
48
49tables = db_schema["db_schema_tables"]
50example = [table for table in tables if table["table_name"] == "example"]
51assert len(example) == 1
52example = example[0]
53
54assert example["table_columns"][0]["column_name"] == "ints"
55assert example["table_columns"][1]["column_name"] == "bigints"
56
57conn.close()

使用 SQLAlchemy 进行连接池管理

配方源码: postgresql_pool.py

ADBC 没有实现连接池,因为这通常不是 DBAPI 驱动程序的功能。相反,请使用第三方连接池,例如 SQLAlchemy 中内置的连接池。

28import os
29
30import sqlalchemy.pool
31
32import adbc_driver_postgresql.dbapi
33
34uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
35
36source = adbc_driver_postgresql.dbapi.connect(uri)

adbc_driver_manager.dbapi.Connection.adbc_clone() 从现有连接打开一个新连接,尽可能共享内部资源。例如,PostgreSQL 驱动程序将共享内部 OID 缓存,从而节省连接开销。

41pool = sqlalchemy.pool.QueuePool(source.adbc_clone, max_overflow=1, pool_size=2)

现在我们可以从池中获取连接;SQLAlchemy 会覆盖 close() 方法以将连接返回给池。

注意

与底层 ADBC 连接不同,SQLAlchemy 的包装器不支持上下文管理器协议。

49conn = pool.connect()
50
51assert pool.checkedin() == 0
52assert pool.checkedout() == 1
53
54with conn.cursor() as cur:
55    cur.execute("SELECT 1")
56    assert cur.fetchone() == (1,)
57
58conn.close()
59
60assert pool.checkedin() == 1
61assert pool.checkedout() == 0
62
63source.close()

结合使用 Pandas 和 ADBC

配方源码: postgresql_pandas.py

ADBC 集成到了流行的 dataframe 库 pandas 中。Pandas 可以使用 ADBC 与 PostgreSQL 及其他数据库交换数据。与使用 SQLAlchemy 或其他选项相比,将 ADBC 与 pandas 结合使用可以获得更好的性能,例如避免了与 Python 对象之间的过度转换。

30import os
31
32import pandas as pd
33
34import adbc_driver_postgresql.dbapi
35
36uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
37conn = adbc_driver_postgresql.dbapi.connect(uri)

我们将使用 pd.DataFrame.to_sql 创建一个示例表。

42data = pd.DataFrame(
43    {
44        "ints": [1, 2, None, 4],
45        "strs": ["a", "b", "c", "d"],
46    }
47)
48data.to_sql("example", conn, if_exists="replace")
49conn.commit()

创建表后,我们可以将 ADBC 连接和 SQL 查询传递给 pd.read_sql,以 pandas DataFrame 的形式获取结果集。

55df = pd.read_sql("SELECT * FROM example WHERE ints > 1", conn)
56
57assert len(df) == 2
58
59conn.close()

与 ADBC 接口相比,pandas 提供了更方便、更高级的 API,特别是对于那些已经在使用 pandas 的用户而言。

结合使用 Polars 和 ADBC

配方源码: postgresql_polars.py

ADBC 可以与使用 Rust 编写的 dataframe 库 Polars 结合使用。根据其文档,

如果后端支持直接返回 Arrow 数据,则将使用此设施高效地实例化 DataFrame;否则,将从行数据初始化 DataFrame。

显然,ADBC 直接返回 Arrow 数据,这使得 ADBC 和 Polars 成为天作之合。

34import os
35
36import polars as pl
37
38uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]

我们将使用 Polars 和 polars.DataFrame.write_database() 来创建示例表。在使用 Polars 时,我们无需自己打开 ADBC 连接。

44data = pl.DataFrame(
45    {
46        "ints": [1, 2, None, 4],
47        "strs": ["a", "b", "c", "d"],
48    }
49)
50data.write_database("example", uri, engine="adbc", if_table_exists="replace")

创建表后,我们可以使用 polars.read_database_uri() 来获取结果。同样,我们可以直接传入 URI 并告诉 Polars 为我们管理 ADBC。

56df = pl.read_database_uri("SELECT * FROM example WHERE ints > 1", uri, engine="adbc")
57
58assert len(df) == 2