PostgreSQL 指南¶
使用用户名和密码进行身份验证¶
指南来源:postgresql_authenticate.py
要连接到 PostgreSQL 数据库,必须在 URI 中提供用户名和密码。例如,
postgresql://username:password@hostname:port/dbname
有关详细信息,请参阅 PostgreSQL 文档。
32import os
33
34import adbc_driver_postgresql.dbapi
35
36uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
37conn = adbc_driver_postgresql.dbapi.connect(uri)
38
39with conn.cursor() as cur:
40 cur.execute("SELECT 1")
41 print(cur.fetchone())
42 # Output: (1,)
43
44conn.close()
(1,)
从 Arrow 数据集创建/追加到表¶
指南来源:postgresql_create_dataset_table.py
ADBC 可以轻松地将 PyArrow 数据集加载到您的数据存储中。
24import os
25import tempfile
26from pathlib import Path
27
28import pyarrow
29import pyarrow.csv
30import pyarrow.dataset
31import pyarrow.feather
32import pyarrow.parquet
33
34import adbc_driver_postgresql.dbapi
35
36uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
37conn = adbc_driver_postgresql.dbapi.connect(uri)
为了测试目的,我们首先确保将要使用的表不存在。
41with conn.cursor() as cur:
42 cur.execute("DROP TABLE IF EXISTS csvtable")
43 cur.execute("DROP TABLE IF EXISTS ipctable")
44 cur.execute("DROP TABLE IF EXISTS pqtable")
45 cur.execute("DROP TABLE IF EXISTS csvdataset")
46 cur.execute("DROP TABLE IF EXISTS ipcdataset")
47 cur.execute("DROP TABLE IF EXISTS pqdataset")
48
49conn.commit()
生成示例数据¶
54tempdir = tempfile.TemporaryDirectory(
55 prefix="adbc-docs-",
56 ignore_cleanup_errors=True,
57)
58root = Path(tempdir.name)
59table = pyarrow.table(
60 [
61 [1, 1, 2],
62 ["foo", "bar", "baz"],
63 ],
64 names=["ints", "strs"],
65)
首先我们将写入单个文件。
69csv_file = root / "example.csv"
70pyarrow.csv.write_csv(table, csv_file)
71
72ipc_file = root / "example.arrow"
73pyarrow.feather.write_feather(table, ipc_file)
74
75parquet_file = root / "example.parquet"
76pyarrow.parquet.write_table(table, parquet_file)
我们还将生成一些分区数据集。
80csv_dataset = root / "csv_dataset"
81pyarrow.dataset.write_dataset(
82 table,
83 csv_dataset,
84 format="csv",
85 partitioning=["ints"],
86)
87
88ipc_dataset = root / "ipc_dataset"
89pyarrow.dataset.write_dataset(
90 table,
91 ipc_dataset,
92 format="feather",
93 partitioning=["ints"],
94)
95
96parquet_dataset = root / "parquet_dataset"
97pyarrow.dataset.write_dataset(
98 table,
99 parquet_dataset,
100 format="parquet",
101 partitioning=["ints"],
102)
将 CSV 文件加载到 PostgreSQL¶
我们可以直接将 pyarrow.RecordBatchReader(来自 open_csv)传递给 adbc_ingest。我们还可以传递 pyarrow.dataset.Dataset 或 pyarrow.dataset.Scanner。
112with conn.cursor() as cur:
113 reader = pyarrow.csv.open_csv(csv_file)
114 cur.adbc_ingest("csvtable", reader, mode="create")
115
116 reader = pyarrow.dataset.dataset(
117 csv_dataset,
118 format="csv",
119 partitioning=["ints"],
120 )
121 cur.adbc_ingest("csvdataset", reader, mode="create")
122
123conn.commit()
124
125with conn.cursor() as cur:
126 cur.execute("SELECT ints, strs FROM csvtable ORDER BY ints, strs ASC")
127 assert cur.fetchall() == [(1, "bar"), (1, "foo"), (2, "baz")]
128
129 cur.execute("SELECT ints, strs FROM csvdataset ORDER BY ints, strs ASC")
130 assert cur.fetchall() == [(1, "bar"), (1, "foo"), (2, "baz")]
将 Arrow IPC (Feather) 文件加载到 PostgreSQL¶
135with conn.cursor() as cur:
136 reader = pyarrow.ipc.RecordBatchFileReader(ipc_file)
由于 PyArrow API 的怪癖,我们必须将文件读入内存。
139 cur.adbc_ingest("ipctable", reader.read_all(), mode="create")
不过,数据集 API 会将数据流式传输到内存中,然后传输到 PostgreSQL。
143 reader = pyarrow.dataset.dataset(
144 ipc_dataset,
145 format="feather",
146 partitioning=["ints"],
147 )
148 cur.adbc_ingest("ipcdataset", reader, mode="create")
149
150conn.commit()
151
152with conn.cursor() as cur:
153 cur.execute("SELECT ints, strs FROM ipctable ORDER BY ints, strs ASC")
154 assert cur.fetchall() == [(1, "bar"), (1, "foo"), (2, "baz")]
155
156 cur.execute("SELECT ints, strs FROM ipcdataset ORDER BY ints, strs ASC")
157 assert cur.fetchall() == [(1, "bar"), (1, "foo"), (2, "baz")]
将 Parquet 文件加载到 PostgreSQL¶
162with conn.cursor() as cur:
163 reader = pyarrow.parquet.ParquetFile(parquet_file)
164 cur.adbc_ingest("pqtable", reader.iter_batches(), mode="create")
165
166 reader = pyarrow.dataset.dataset(
167 parquet_dataset,
168 format="parquet",
169 partitioning=["ints"],
170 )
171 cur.adbc_ingest("pqdataset", reader, mode="create")
172
173conn.commit()
174
175with conn.cursor() as cur:
176 cur.execute("SELECT ints, strs FROM pqtable ORDER BY ints, strs ASC")
177 assert cur.fetchall() == [(1, "bar"), (1, "foo"), (2, "baz")]
178
179 cur.execute("SELECT ints, strs FROM pqdataset ORDER BY ints, strs ASC")
180 assert cur.fetchall() == [(1, "bar"), (1, "foo"), (2, "baz")]
清理¶
185conn.close()
186tempdir.cleanup()
从 Arrow 表创建/追加到表¶
指南来源:postgresql_create_append_table.py
ADBC 允许使用 Arrow 表创建和追加到数据库表。
24import os
25
26import pyarrow
27
28import adbc_driver_postgresql.dbapi
29
30uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
31conn = adbc_driver_postgresql.dbapi.connect(uri)
为了测试目的,我们首先确保将要使用的表不存在。
35with conn.cursor() as cur:
36 cur.execute("DROP TABLE IF EXISTS example")
37 cur.execute("DROP TABLE IF EXISTS example2")
现在我们可以创建表了。
40with conn.cursor() as cur:
41 data = pyarrow.table(
42 [
43 [1, 2, None, 4],
44 ],
45 schema=pyarrow.schema(
46 [
47 ("ints", "int32"),
48 ]
49 ),
50 )
51 cur.adbc_ingest("example", data, mode="create")
52
53conn.commit()
摄取后,我们可以获取结果。
56with conn.cursor() as cur:
57 cur.execute("SELECT * FROM example")
58 assert cur.fetchone() == (1,)
59 assert cur.fetchone() == (2,)
60
61 cur.execute("SELECT COUNT(*) FROM example")
62 assert cur.fetchone() == (4,)
如果我们尝试再次摄取,它会失败,因为表已经存在。
66with conn.cursor() as cur:
67 try:
68 cur.adbc_ingest("example", data, mode="create")
69 except conn.ProgrammingError:
70 pass
71 else:
72 raise RuntimeError("Should have failed!")
73
74conn.rollback()
相反,我们可以追加到表中。
77with conn.cursor() as cur:
78 cur.adbc_ingest("example", data, mode="append")
79
80 cur.execute("SELECT COUNT(*) FROM example")
81 assert cur.fetchone() == (8,)
我们还可以选择在表不存在时创建表,否则追加。
86with conn.cursor() as cur:
87 cur.adbc_ingest("example2", data, mode="create_append")
88
89 cur.execute("SELECT COUNT(*) FROM example2")
90 assert cur.fetchone() == (4,)
91
92 cur.adbc_ingest("example2", data, mode="create_append")
93
94 cur.execute("SELECT COUNT(*) FROM example2")
95 assert cur.fetchone() == (8,)
最后,我们可以替换表。
99with conn.cursor() as cur:
100 cur.adbc_ingest("example", data.slice(0, 2), mode="replace")
101
102 cur.execute("SELECT COUNT(*) FROM example")
103 assert cur.fetchone() == (2,)
104
105conn.close()
创建/追加到临时表¶
指南来源:postgresql_create_temp_table.py
ADBC 也允许创建和追加到临时表。
23import os
24
25import pyarrow
26
27import adbc_driver_postgresql.dbapi
28
29uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
30conn = adbc_driver_postgresql.dbapi.connect(uri)
为了测试目的,我们首先确保将要使用的表不存在。
34with conn.cursor() as cur:
35 cur.execute("DROP TABLE IF EXISTS example")
要创建临时表,只需指定选项“temporary”。
38data = pyarrow.table(
39 [
40 [1, 2, None, 4],
41 ],
42 schema=pyarrow.schema(
43 [
44 ("ints", "int32"),
45 ]
46 ),
47)
48
49with conn.cursor() as cur:
50 cur.adbc_ingest("example", data, mode="create", temporary=True)
51
52conn.commit()
摄取后,我们可以获取结果。
55with conn.cursor() as cur:
56 cur.execute("SELECT * FROM example")
57 assert cur.fetchone() == (1,)
58 assert cur.fetchone() == (2,)
59
60 cur.execute("SELECT COUNT(*) FROM example")
61 assert cur.fetchone() == (4,)
临时表与常规表是分开的,即使它们具有相同的名称。
66with conn.cursor() as cur:
67 cur.adbc_ingest("example", data.slice(0, 2), mode="create", temporary=False)
68
69conn.commit()
70
71with conn.cursor() as cur:
由于我们有两个同名表,我们必须在此处明确引用普通的临时表。
74 cur.execute("SELECT COUNT(*) FROM public.example")
75 assert cur.fetchone() == (2,)
76
77 cur.execute("SELECT COUNT(*) FROM example")
78 assert cur.fetchone() == (4,)
79
80conn.close()
关闭连接后,临时表被隐式删除。如果重新连接,表将不存在;我们将只看到“普通”表。
85with adbc_driver_postgresql.dbapi.connect(uri) as conn:
86 with conn.cursor() as cur:
87 cur.execute("SELECT COUNT(*) FROM example")
88 assert cur.fetchone() == (2,)
所有常规摄取选项也适用于临时表。更多示例请参阅 从 Arrow 数据集创建/追加到表。
执行带绑定参数的语句¶
指南来源:postgresql_execute_bind.py
ADBC 允许使用 Python 和 Arrow 值作为绑定参数。目前,PostgreSQL 驱动程序仅支持不生成结果集的查询的绑定参数。
26import os
27
28import pyarrow
29
30import adbc_driver_postgresql.dbapi
31
32uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
33conn = adbc_driver_postgresql.dbapi.connect(uri)
我们将创建一个示例表进行测试。
36with conn.cursor() as cur:
37 cur.execute("DROP TABLE IF EXISTS example")
38 cur.execute("CREATE TABLE example (ints INT, bigints BIGINT)")
39
40conn.commit()
我们可以绑定 Python 值
43with conn.cursor() as cur:
44 cur.executemany("INSERT INTO example VALUES ($1, $2)", [(1, 2), (3, 4)])
45
46 cur.execute("SELECT SUM(ints) FROM example")
47 assert cur.fetchone() == (4,)
注意
如果您习惯于像 psycopg 等库用于绑定参数的格式字符串样式 %s 语法,请注意,此语法不支持——只支持 PostgreSQL 原生的 $1 语法。
我们还可以绑定 Arrow 值
54with conn.cursor() as cur:
55 data = pyarrow.record_batch(
56 [
57 [5, 6],
58 [7, 8],
59 ],
60 names=["$1", "$2"],
61 )
62 cur.executemany("INSERT INTO example VALUES ($1, $2)", data)
63
64 cur.execute("SELECT SUM(ints) FROM example")
65 assert cur.fetchone() == (15,)
66
67conn.close()
执行不带 COPY 的语句¶
指南来源:postgresql_execute_nocopy.py
ADBC 驱动程序默认尝试使用 COPY 执行查询,因为它对于大型结果集更快。但是,PostgreSQL 不支持所有类型的查询的 COPY。例如,SHOW 查询将不起作用。在这种情况下,您可以显式禁用 COPY 优化。
27import os
28
29import adbc_driver_postgresql.dbapi
30
31uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
32conn = adbc_driver_postgresql.dbapi.connect(uri)
可以在创建游标时设置该选项
36with conn.cursor(
37 adbc_stmt_kwargs={
38 adbc_driver_postgresql.StatementOptions.USE_COPY.value: False,
39 }
40) as cur:
41 cur.execute("SHOW ALL")
42 print(cur.fetch_arrow_table().schema)
或者可以在之后设置
50with conn.cursor() as cur:
51 cur.adbc_statement.set_options(
52 **{
53 adbc_driver_postgresql.StatementOptions.USE_COPY.value: False,
54 }
55 )
56 cur.execute("SHOW ALL")
57 print(cur.fetch_arrow_table().schema)
如果没有该选项,查询将失败,因为驱动程序尝试使用 COPY 执行查询
66with conn.cursor() as cur:
67 try:
68 cur.execute("SHOW ALL")
69 except conn.Error:
70 pass
71 else:
72 raise RuntimeError("Expected error")
73
74conn.close()
name: string
setting: string
description: string
name: string
setting: string
description: string
获取表的 Arrow 模式¶
指南来源:postgresql_get_table_schema.py
ADBC 允许您将表的模式作为 Arrow 模式获取。
24import os
25
26import pyarrow
27
28import adbc_driver_postgresql.dbapi
29
30uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
31conn = adbc_driver_postgresql.dbapi.connect(uri)
我们将创建一些示例表进行测试。
34with conn.cursor() as cur:
35 cur.execute("DROP TABLE IF EXISTS example")
36 cur.execute("CREATE TABLE example (ints INT, bigints BIGINT)")
37
38 cur.execute("CREATE SCHEMA IF NOT EXISTS other_schema")
39 cur.execute("DROP TABLE IF EXISTS other_schema.example")
40 cur.execute("CREATE TABLE other_schema.example (strings TEXT, values INT)")
41
42conn.commit()
默认情况下,假定“活动”目录/模式。
45assert conn.adbc_get_table_schema("example") == pyarrow.schema(
46 [
47 ("ints", "int32"),
48 ("bigints", "int64"),
49 ]
50)
我们可以显式指定 PostgreSQL 模式,以获取不同命名空间中表的 Arrow 模式。
注意
在 PostgreSQL 中,您只能查询连接到的数据库(目录)。因此我们不能在此处指定目录(或者说,这样做没有意义)。
请注意,NUMERIC 列被读取为字符串,因为 PostgreSQL 小数不映射到 Arrow 小数。
61assert conn.adbc_get_table_schema(
62 "example",
63 db_schema_filter="other_schema",
64) == pyarrow.schema(
65 [
66 ("strings", "string"),
67 ("values", "int32"),
68 ]
69)
70
71conn.close()
获取查询的 Arrow 模式¶
指南来源:postgresql_get_query_schema.py
ADBC 允许您获取结果集的模式,而无需执行查询。
24import os
25
26import pyarrow
27
28import adbc_driver_postgresql.dbapi
29
30uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
31conn = adbc_driver_postgresql.dbapi.connect(uri)
我们将创建一个示例表进行测试。
34with conn.cursor() as cur:
35 cur.execute("DROP TABLE IF EXISTS example")
36 cur.execute("CREATE TABLE example (ints INT, bigints BIGINT)")
37
38conn.commit()
39
40expected = pyarrow.schema(
41 [
42 ("ints", "int32"),
43 ("bigints", "int64"),
44 ]
45)
46
47with conn.cursor() as cur:
48 assert cur.adbc_execute_schema("SELECT * FROM example") == expected
PostgreSQL 在这里不知道类型,所以它只是返回一个猜测。
51 assert cur.adbc_execute_schema("SELECT $1 AS res") == pyarrow.schema(
52 [
53 ("res", "string"),
54 ]
55 )
56
57conn.close()
列出目录、模式和表¶
指南来源:postgresql_list_catalogs.py
ADBC 允许列出数据库中的表、目录和模式。
24import os
25
26import adbc_driver_postgresql.dbapi
27
28uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
29conn = adbc_driver_postgresql.dbapi.connect(uri)
我们将创建一个示例表来查找。
32with conn.cursor() as cur:
33 cur.execute("DROP TABLE IF EXISTS example")
34 cur.execute("CREATE TABLE example (ints INT, bigints BIGINT)")
35
36conn.commit()
数据以 PyArrow RecordBatchReader 的形式给出。
39objects = conn.adbc_get_objects(depth="all").read_all()
为了方便起见,我们将其转换为纯 Python 数据。
42objects = objects.to_pylist()
43catalog = objects[0]
44assert catalog["catalog_name"] == "postgres"
45
46db_schema = catalog["catalog_db_schemas"][0]
47assert db_schema["db_schema_name"] == "public"
48
49tables = db_schema["db_schema_tables"]
50example = [table for table in tables if table["table_name"] == "example"]
51assert len(example) == 1
52example = example[0]
53
54assert example["table_columns"][0]["column_name"] == "ints"
55assert example["table_columns"][1]["column_name"] == "bigints"
56
57conn.close()
使用 SQLAlchemy 进行连接池¶
指南来源:postgresql_pool.py
ADBC 不实现连接池,因为这通常不是 DBAPI 驱动程序的特性。相反,请使用第三方连接池,例如 SQLAlchemy 中内置的连接池。
28import os
29
30import sqlalchemy.pool
31
32import adbc_driver_postgresql.dbapi
33
34uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
35
36source = adbc_driver_postgresql.dbapi.connect(uri)
adbc_driver_manager.dbapi.Connection.adbc_clone() 从现有连接打开新连接,尽可能共享内部资源。例如,PostgreSQL 驱动程序将共享内部 OID 缓存,从而节省一些连接开销。
41pool = sqlalchemy.pool.QueuePool(source.adbc_clone, max_overflow=1, pool_size=2)
现在我们可以从连接池中获取连接;SQLAlchemy 覆盖了 close() 以将连接返回到连接池。
注意
与底层的 ADBC 连接不同,SQLAlchemy 的包装器不支持上下文管理器协议。
49conn = pool.connect()
50
51assert pool.checkedin() == 0
52assert pool.checkedout() == 1
53
54with conn.cursor() as cur:
55 cur.execute("SELECT 1")
56 assert cur.fetchone() == (1,)
57
58conn.close()
59
60assert pool.checkedin() == 1
61assert pool.checkedout() == 0
62
63source.close()
使用 Pandas 和 ADBC¶
指南来源:postgresql_pandas.py
ADBC 集成到 pandas 中,这是一个流行的 DataFrame 库。Pandas 可以使用 ADBC 与 PostgreSQL 和其他数据库交换数据。与使用 SQLAlchemy 或其他选项相比,将 ADBC 与 pandas 结合使用可以获得更好的性能,例如避免过多地转换为和从 Python 对象。
30import os
31
32import pandas as pd
33
34import adbc_driver_postgresql.dbapi
35
36uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
37conn = adbc_driver_postgresql.dbapi.connect(uri)
我们将使用 pd.DataFrame.to_sql 创建一个示例表。
42data = pd.DataFrame(
43 {
44 "ints": [1, 2, None, 4],
45 "strs": ["a", "b", "c", "d"],
46 }
47)
48data.to_sql("example", conn, if_exists="replace")
49conn.commit()
创建表后,我们可以将 ADBC 连接和 SQL 查询传递给 pd.read_sql,以将结果集作为 pandas DataFrame 获取。
55df = pd.read_sql("SELECT * FROM example WHERE ints > 1", conn)
56
57assert len(df) == 2
58
59conn.close()
与 ADBC 接口相比,pandas 提供了更方便、更高级别的 API,特别是对于那些已经使用 pandas 的人。
使用 Polars 和 ADBC¶
指南来源:postgresql_polars.py
ADBC 可以与 Polars 配合使用,这是一个用 Rust 编写的 DataFrame 库。根据其文档
如果后端支持直接返回 Arrow 数据,则将使用此功能高效地实例化 DataFrame;否则,DataFrame 将从行式数据初始化。
显然,ADBC 直接返回 Arrow 数据,这使得 ADBC 和 Polars 自然而然地相互匹配。
34import os
35
36import polars as pl
37
38uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
我们将使用 Polars 创建一个示例表,使用 polars.DataFrame.write_database()。我们不需要自己用 Polars 打开 ADBC 连接。
44data = pl.DataFrame(
45 {
46 "ints": [1, 2, None, 4],
47 "strs": ["a", "b", "c", "d"],
48 }
49)
50data.write_database("example", uri, engine="adbc", if_table_exists="replace")
创建表后,我们可以使用 polars.read_database_uri() 获取结果。同样,我们只需传递 URI 并告诉 Polars 为我们管理 ADBC。
56df = pl.read_database_uri("SELECT * FROM example WHERE ints > 1", uri, engine="adbc")
57
58assert len(df) == 2