PostgreSQL 食谱¶
使用用户名和密码进行身份验证¶
食谱来源: postgresql_authenticate.py
要连接到 PostgreSQL 数据库,必须在 URI 中提供用户名和密码。 例如,
postgresql://username:password@hostname:port/dbname
有关完整详细信息,请参阅 PostgreSQL 文档。
32import os
33
34import adbc_driver_postgresql.dbapi
35
36uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
37conn = adbc_driver_postgresql.dbapi.connect(uri)
38
39with conn.cursor() as cur:
40 cur.execute("SELECT 1")
41 print(cur.fetchone())
42 # Output: (1,)
43
44conn.close()
(1,)
从 Arrow 数据集创建/追加到表¶
食谱来源: postgresql_create_dataset_table.py
ADBC 可以轻松地将 PyArrow 数据集加载到您的数据存储中。
24import os
25import tempfile
26from pathlib import Path
27
28import pyarrow
29import pyarrow.csv
30import pyarrow.dataset
31import pyarrow.feather
32import pyarrow.parquet
33
34import adbc_driver_postgresql.dbapi
35
36uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
37conn = adbc_driver_postgresql.dbapi.connect(uri)
为了测试,我们将首先确保要使用的表不存在。
41with conn.cursor() as cur:
42 cur.execute("DROP TABLE IF EXISTS csvtable")
43 cur.execute("DROP TABLE IF EXISTS ipctable")
44 cur.execute("DROP TABLE IF EXISTS pqtable")
45 cur.execute("DROP TABLE IF EXISTS csvdataset")
46 cur.execute("DROP TABLE IF EXISTS ipcdataset")
47 cur.execute("DROP TABLE IF EXISTS pqdataset")
48
49conn.commit()
生成示例数据¶
54tempdir = tempfile.TemporaryDirectory(
55 prefix="adbc-docs-",
56 ignore_cleanup_errors=True,
57)
58root = Path(tempdir.name)
59table = pyarrow.table(
60 [
61 [1, 1, 2],
62 ["foo", "bar", "baz"],
63 ],
64 names=["ints", "strs"],
65)
首先我们将写入单个文件。
69csv_file = root / "example.csv"
70pyarrow.csv.write_csv(table, csv_file)
71
72ipc_file = root / "example.arrow"
73pyarrow.feather.write_feather(table, ipc_file)
74
75parquet_file = root / "example.parquet"
76pyarrow.parquet.write_table(table, parquet_file)
我们还将生成一些分区数据集。
80csv_dataset = root / "csv_dataset"
81pyarrow.dataset.write_dataset(
82 table,
83 csv_dataset,
84 format="csv",
85 partitioning=["ints"],
86)
87
88ipc_dataset = root / "ipc_dataset"
89pyarrow.dataset.write_dataset(
90 table,
91 ipc_dataset,
92 format="feather",
93 partitioning=["ints"],
94)
95
96parquet_dataset = root / "parquet_dataset"
97pyarrow.dataset.write_dataset(
98 table,
99 parquet_dataset,
100 format="parquet",
101 partitioning=["ints"],
102)
将 CSV 文件加载到 PostgreSQL¶
我们可以直接将 pyarrow.RecordBatchReader
(来自 open_csv
) 传递给 adbc_ingest
。 我们也可以传递 pyarrow.dataset.Dataset
或 pyarrow.dataset.Scanner
。
112with conn.cursor() as cur:
113 reader = pyarrow.csv.open_csv(csv_file)
114 cur.adbc_ingest("csvtable", reader, mode="create")
115
116 reader = pyarrow.dataset.dataset(
117 csv_dataset,
118 format="csv",
119 partitioning=["ints"],
120 )
121 cur.adbc_ingest("csvdataset", reader, mode="create")
122
123conn.commit()
124
125with conn.cursor() as cur:
126 cur.execute("SELECT ints, strs FROM csvtable ORDER BY ints, strs ASC")
127 assert cur.fetchall() == [(1, "bar"), (1, "foo"), (2, "baz")]
128
129 cur.execute("SELECT ints, strs FROM csvdataset ORDER BY ints, strs ASC")
130 assert cur.fetchall() == [(1, "bar"), (1, "foo"), (2, "baz")]
将 Arrow IPC (Feather) 文件加载到 PostgreSQL¶
135with conn.cursor() as cur:
136 reader = pyarrow.ipc.RecordBatchFileReader(ipc_file)
由于 PyArrow API 中的怪癖,我们必须将文件读入内存。
139 cur.adbc_ingest("ipctable", reader.read_all(), mode="create")
但是,Dataset API 会将数据流式传输到内存中,然后再传输到 PostgreSQL 中。
143 reader = pyarrow.dataset.dataset(
144 ipc_dataset,
145 format="feather",
146 partitioning=["ints"],
147 )
148 cur.adbc_ingest("ipcdataset", reader, mode="create")
149
150conn.commit()
151
152with conn.cursor() as cur:
153 cur.execute("SELECT ints, strs FROM ipctable ORDER BY ints, strs ASC")
154 assert cur.fetchall() == [(1, "bar"), (1, "foo"), (2, "baz")]
155
156 cur.execute("SELECT ints, strs FROM ipcdataset ORDER BY ints, strs ASC")
157 assert cur.fetchall() == [(1, "bar"), (1, "foo"), (2, "baz")]
将 Parquet 文件加载到 PostgreSQL¶
162with conn.cursor() as cur:
163 reader = pyarrow.parquet.ParquetFile(parquet_file)
164 cur.adbc_ingest("pqtable", reader.iter_batches(), mode="create")
165
166 reader = pyarrow.dataset.dataset(
167 parquet_dataset,
168 format="parquet",
169 partitioning=["ints"],
170 )
171 cur.adbc_ingest("pqdataset", reader, mode="create")
172
173conn.commit()
174
175with conn.cursor() as cur:
176 cur.execute("SELECT ints, strs FROM pqtable ORDER BY ints, strs ASC")
177 assert cur.fetchall() == [(1, "bar"), (1, "foo"), (2, "baz")]
178
179 cur.execute("SELECT ints, strs FROM pqdataset ORDER BY ints, strs ASC")
180 assert cur.fetchall() == [(1, "bar"), (1, "foo"), (2, "baz")]
清理¶
185conn.close()
186tempdir.cleanup()
从 Arrow 表创建/追加到表¶
食谱来源: postgresql_create_append_table.py
ADBC 允许使用 Arrow 表创建和追加到数据库表。
24import os
25
26import pyarrow
27
28import adbc_driver_postgresql.dbapi
29
30uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
31conn = adbc_driver_postgresql.dbapi.connect(uri)
为了测试,我们将首先确保要使用的表不存在。
35with conn.cursor() as cur:
36 cur.execute("DROP TABLE IF EXISTS example")
37 cur.execute("DROP TABLE IF EXISTS example2")
现在我们可以创建表。
40with conn.cursor() as cur:
41 data = pyarrow.table(
42 [
43 [1, 2, None, 4],
44 ],
45 schema=pyarrow.schema(
46 [
47 ("ints", "int32"),
48 ]
49 ),
50 )
51 cur.adbc_ingest("example", data, mode="create")
52
53conn.commit()
摄取后,我们可以获取结果。
56with conn.cursor() as cur:
57 cur.execute("SELECT * FROM example")
58 assert cur.fetchone() == (1,)
59 assert cur.fetchone() == (2,)
60
61 cur.execute("SELECT COUNT(*) FROM example")
62 assert cur.fetchone() == (4,)
如果我们尝试再次摄取,它将失败,因为该表已存在。
66with conn.cursor() as cur:
67 try:
68 cur.adbc_ingest("example", data, mode="create")
69 except conn.ProgrammingError:
70 pass
71 else:
72 raise RuntimeError("Should have failed!")
73
74conn.rollback()
相反,我们可以追加到表。
77with conn.cursor() as cur:
78 cur.adbc_ingest("example", data, mode="append")
79
80 cur.execute("SELECT COUNT(*) FROM example")
81 assert cur.fetchone() == (8,)
我们也可以选择在表不存在时创建表,否则追加。
86with conn.cursor() as cur:
87 cur.adbc_ingest("example2", data, mode="create_append")
88
89 cur.execute("SELECT COUNT(*) FROM example2")
90 assert cur.fetchone() == (4,)
91
92 cur.adbc_ingest("example2", data, mode="create_append")
93
94 cur.execute("SELECT COUNT(*) FROM example2")
95 assert cur.fetchone() == (8,)
最后,我们可以替换表。
99with conn.cursor() as cur:
100 cur.adbc_ingest("example", data.slice(0, 2), mode="replace")
101
102 cur.execute("SELECT COUNT(*) FROM example")
103 assert cur.fetchone() == (2,)
104
105conn.close()
创建/追加到临时表¶
食谱来源: postgresql_create_temp_table.py
ADBC 允许创建和追加到临时表。
23import os
24
25import pyarrow
26
27import adbc_driver_postgresql.dbapi
28
29uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
30conn = adbc_driver_postgresql.dbapi.connect(uri)
为了测试,我们将首先确保要使用的表不存在。
34with conn.cursor() as cur:
35 cur.execute("DROP TABLE IF EXISTS example")
要创建临时表,只需指定选项“temporary”。
38data = pyarrow.table(
39 [
40 [1, 2, None, 4],
41 ],
42 schema=pyarrow.schema(
43 [
44 ("ints", "int32"),
45 ]
46 ),
47)
48
49with conn.cursor() as cur:
50 cur.adbc_ingest("example", data, mode="create", temporary=True)
51
52conn.commit()
摄取后,我们可以获取结果。
55with conn.cursor() as cur:
56 cur.execute("SELECT * FROM example")
57 assert cur.fetchone() == (1,)
58 assert cur.fetchone() == (2,)
59
60 cur.execute("SELECT COUNT(*) FROM example")
61 assert cur.fetchone() == (4,)
临时表与常规表是分开的,即使它们具有相同的名称。
66with conn.cursor() as cur:
67 cur.adbc_ingest("example", data.slice(0, 2), mode="create", temporary=False)
68
69conn.commit()
70
71with conn.cursor() as cur:
因为我们有两个具有相同名称的表,所以我们必须在此处显式引用正常的临时表。
74 cur.execute("SELECT COUNT(*) FROM public.example")
75 assert cur.fetchone() == (2,)
76
77 cur.execute("SELECT COUNT(*) FROM example")
78 assert cur.fetchone() == (4,)
79
80conn.close()
关闭连接后,临时表会被隐式删除。 如果我们重新连接,则该表将不存在; 我们只会看到“normal”表。
85with adbc_driver_postgresql.dbapi.connect(uri) as conn:
86 with conn.cursor() as cur:
87 cur.execute("SELECT COUNT(*) FROM example")
88 assert cur.fetchone() == (2,)
所有常规的摄取选项也适用于临时表。 有关更多示例,请参阅 从 Arrow 数据集创建/追加到表。
执行带有绑定参数的语句¶
食谱来源: postgresql_execute_bind.py
ADBC 允许使用 Python 和 Arrow 值作为绑定参数。 目前,PostgreSQL 驱动程序仅支持不生成结果集的查询的绑定参数。
26import os
27
28import pyarrow
29
30import adbc_driver_postgresql.dbapi
31
32uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
33conn = adbc_driver_postgresql.dbapi.connect(uri)
我们将创建一个示例表进行测试。
36with conn.cursor() as cur:
37 cur.execute("DROP TABLE IF EXISTS example")
38 cur.execute("CREATE TABLE example (ints INT, bigints BIGINT)")
39
40conn.commit()
我们可以绑定 Python 值
43with conn.cursor() as cur:
44 cur.executemany("INSERT INTO example VALUES ($1, $2)", [(1, 2), (3, 4)])
45
46 cur.execute("SELECT SUM(ints) FROM example")
47 assert cur.fetchone() == (4,)
注意
如果您习惯于格式字符串风格 %s
语法(例如 psycopg 等库用于绑定参数),请注意,这不受支持 - 仅支持 PostgreSQL 原生的 $1
语法。
我们还可以绑定 Arrow 值
54with conn.cursor() as cur:
55 data = pyarrow.record_batch(
56 [
57 [5, 6],
58 [7, 8],
59 ],
60 names=["$1", "$2"],
61 )
62 cur.executemany("INSERT INTO example VALUES ($1, $2)", data)
63
64 cur.execute("SELECT SUM(ints) FROM example")
65 assert cur.fetchone() == (15,)
66
67conn.close()
获取表的 Arrow 模式¶
食谱来源: postgresql_get_table_schema.py
ADBC 允许您获取表的模式作为 Arrow 模式。
24import os
25
26import pyarrow
27
28import adbc_driver_postgresql.dbapi
29
30uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
31conn = adbc_driver_postgresql.dbapi.connect(uri)
我们将创建一些示例表进行测试。
34with conn.cursor() as cur:
35 cur.execute("DROP TABLE IF EXISTS example")
36 cur.execute("CREATE TABLE example (ints INT, bigints BIGINT)")
37
38 cur.execute("CREATE SCHEMA IF NOT EXISTS other_schema")
39 cur.execute("DROP TABLE IF EXISTS other_schema.example")
40 cur.execute("CREATE TABLE other_schema.example (strings TEXT, values INT)")
41
42conn.commit()
默认情况下,假设“active”目录/模式。
45assert conn.adbc_get_table_schema("example") == pyarrow.schema(
46 [
47 ("ints", "int32"),
48 ("bigints", "int64"),
49 ]
50)
我们可以显式指定 PostgreSQL 模式,以获取不同命名空间中表的 Arrow 模式。
注意
在 PostgreSQL 中,您只能查询您连接到的数据库(目录)。 所以我们不能在这里指定目录(或者更确切地说,这样做没有意义)。
请注意,NUMERIC 列被读取为字符串,因为 PostgreSQL 十进制数不映射到 Arrow 十进制数。
61assert conn.adbc_get_table_schema(
62 "example",
63 db_schema_filter="other_schema",
64) == pyarrow.schema(
65 [
66 ("strings", "string"),
67 ("values", "int32"),
68 ]
69)
70
71conn.close()
获取查询的 Arrow 模式¶
食谱来源: postgresql_get_query_schema.py
ADBC 允许您获取结果集的模式,而无需执行查询。
24import os
25
26import pyarrow
27
28import adbc_driver_postgresql.dbapi
29
30uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
31conn = adbc_driver_postgresql.dbapi.connect(uri)
我们将创建一个示例表进行测试。
34with conn.cursor() as cur:
35 cur.execute("DROP TABLE IF EXISTS example")
36 cur.execute("CREATE TABLE example (ints INT, bigints BIGINT)")
37
38conn.commit()
39
40expected = pyarrow.schema(
41 [
42 ("ints", "int32"),
43 ("bigints", "int64"),
44 ]
45)
46
47with conn.cursor() as cur:
48 assert cur.adbc_execute_schema("SELECT * FROM example") == expected
PostgreSQL 在这里不知道类型,所以它只是返回一个猜测。
51 assert cur.adbc_execute_schema("SELECT $1 AS res") == pyarrow.schema(
52 [
53 ("res", "string"),
54 ]
55 )
56
57conn.close()
列出目录、模式和表¶
食谱来源: postgresql_list_catalogs.py
ADBC 允许列出数据库中的表、目录和模式。
24import os
25
26import adbc_driver_postgresql.dbapi
27
28uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
29conn = adbc_driver_postgresql.dbapi.connect(uri)
我们将创建一个示例表来查找。
32with conn.cursor() as cur:
33 cur.execute("DROP TABLE IF EXISTS example")
34 cur.execute("CREATE TABLE example (ints INT, bigints BIGINT)")
35
36conn.commit()
数据以 PyArrow RecordBatchReader 的形式给出。
39objects = conn.adbc_get_objects(depth="all").read_all()
为了方便起见,我们将其转换为纯 Python 数据。
42objects = objects.to_pylist()
43catalog = objects[0]
44assert catalog["catalog_name"] == "postgres"
45
46db_schema = catalog["catalog_db_schemas"][0]
47assert db_schema["db_schema_name"] == "public"
48
49tables = db_schema["db_schema_tables"]
50example = [table for table in tables if table["table_name"] == "example"]
51assert len(example) == 1
52example = example[0]
53
54assert example["table_columns"][0]["column_name"] == "ints"
55assert example["table_columns"][1]["column_name"] == "bigints"
56
57conn.close()
使用 SQLAlchemy 进行连接池¶
食谱来源: postgresql_pool.py
ADBC 不实现连接池,因为这通常不是 DBAPI 驱动程序的功能。 而是使用第三方连接池,例如内置于 SQLAlchemy 中的连接池。
28import os
29
30import sqlalchemy.pool
31
32import adbc_driver_postgresql.dbapi
33
34uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
35
36source = adbc_driver_postgresql.dbapi.connect(uri)
adbc_driver_manager.dbapi.Connection.adbc_clone()
从现有连接打开新连接,尽可能共享内部资源。 例如,PostgreSQL 驱动程序将共享内部 OID 缓存,从而节省一些连接开销。
41pool = sqlalchemy.pool.QueuePool(source.adbc_clone, max_overflow=1, pool_size=2)
我们现在可以从池中获取连接; SQLAlchemy 覆盖 close()
以将连接返回到池。
注意
与底层 ADBC 连接不同,SQLAlchemy 的包装器不支持上下文管理器协议。
49conn = pool.connect()
50
51assert pool.checkedin() == 0
52assert pool.checkedout() == 1
53
54with conn.cursor() as cur:
55 cur.execute("SELECT 1")
56 assert cur.fetchone() == (1,)
57
58conn.close()
59
60assert pool.checkedin() == 1
61assert pool.checkedout() == 0
62
63source.close()
使用 Pandas 和 ADBC¶
食谱来源: postgresql_pandas.py
ADBC 已集成到流行的 dataframe 库 pandas 中。 Pandas 可以使用 ADBC 与 PostgreSQL 和其他数据库交换数据。 与使用 SQLAlchemy 或其他选项相比,将 ADBC 与 pandas 结合使用可以获得更好的性能,例如避免过度转换为 Python 对象和从 Python 对象转换。
30import os
31
32import pandas as pd
33
34import adbc_driver_postgresql.dbapi
35
36uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
37conn = adbc_driver_postgresql.dbapi.connect(uri)
我们将使用 pd.DataFrame.to_sql
来创建一个示例表。
42data = pd.DataFrame(
43 {
44 "ints": [1, 2, None, 4],
45 "strs": ["a", "b", "c", "d"],
46 }
47)
48data.to_sql("example", conn, if_exists="replace")
49conn.commit()
创建表后,我们可以将 ADBC 连接和 SQL 查询传递给 pd.read_sql
以获取结果集作为 pandas DataFrame。
55df = pd.read_sql("SELECT * FROM example WHERE ints > 1", conn)
56
57assert len(df) == 2
58
59conn.close()
与 ADBC 接口相比,pandas 提供了更方便和更高层次的 API,特别是对于那些已经使用 pandas 的人。
使用 Polars 和 ADBC¶
食谱来源: postgresql_polars.py
ADBC 可以与 Polars 一起使用,Polars 是一个用 Rust 编写的 dataframe 库。 根据其文档
如果后端支持直接返回 Arrow 数据,则将使用此工具高效地实例化 DataFrame; 否则,将从逐行数据初始化 DataFrame。
显然,ADBC 直接返回 Arrow 数据,这使得 ADBC 和 Polars 成为彼此的天然搭配。
34import os
35
36import polars as pl
37
38uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
我们将使用 Polars 的 polars.DataFrame.write_database()
创建一个示例表。 我们不需要使用 Polars 自己打开 ADBC 连接。
44data = pl.DataFrame(
45 {
46 "ints": [1, 2, None, 4],
47 "strs": ["a", "b", "c", "d"],
48 }
49)
50data.write_database("example", uri, engine="adbc", if_table_exists="replace")
创建表后,我们可以使用 polars.read_database_uri()
获取结果。 同样,我们可以只传递 URI 并告诉 Polars 为我们管理 ADBC。
56df = pl.read_database_uri("SELECT * FROM example WHERE ints > 1", uri, engine="adbc")
57
58assert len(df) == 2