快速入门
本教程旨在作为使用 PyMongoArrow 的入门介绍。教程假设读者熟悉基本PyMongo 以及 MongoDB 概念。
先决条件
确保您有 PyMongoArrow 发行版安装。在 Python 命令行中,以下命令应该能够正常运行而不会引发异常
import pymongoarrow as pma
本教程还假设 MongoDB 实例正在默认主机和端口上运行。在您下载并安装 MongoDB 后,您可以按照以下代码示例启动它
$ mongod
扩展 PyMongo
的pymongoarrow.monkey
模块提供了一种接口,用于就地修补 PyMongo,并将 PyMongoArrow 功能直接添加到 Collection
实例。
from pymongoarrow.monkey import patch_all patch_all()
在运行 monkey.patch_all()
方法后,新的 Collection
类实例将包含 PyMongoArrow API -- 例如,pymongoarrow.api.find_pandas_all()
方法。
注意
您还可以通过从 pymongoarrow.api
模块导入来使用任何 PyMongoArrow API。如果这样做,您必须在调用 API 方法时将要在其上执行操作的 Collection
实例作为第一个参数传递。
测试数据
以下代码使用 PyMongo 向您的集群添加示例数据
from datetime import datetime from pymongo import MongoClient client = MongoClient() client.db.data.insert_many([ {'_id': 1, 'amount': 21, 'last_updated': datetime(2020, 12, 10, 1, 3, 1), 'account': {'name': 'Customer1', 'account_number': 1}, 'txns': ['A']}, {'_id': 2, 'amount': 16, 'last_updated': datetime(2020, 7, 23, 6, 7, 11), 'account': {'name': 'Customer2', 'account_number': 2}, 'txns': ['A', 'B']}, {'_id': 3, 'amount': 3, 'last_updated': datetime(2021, 3, 10, 18, 43, 9), 'account': {'name': 'Customer3', 'account_number': 3}, 'txns': ['A', 'B', 'C']}, {'_id': 4, 'amount': 0, 'last_updated': datetime(2021, 2, 25, 3, 50, 31), 'account': {'name': 'Customer4', 'account_number': 4}, 'txns': ['A', 'B', 'C', 'D']}])
定义模式
PyMongoArrow 依赖于数据模式将查询结果集编入表格形式。如果您不提供此模式,PyMongoArrow 将从数据中推断出一个。您可以通过创建一个 Schema
对象并将字段名称映射到类型说明符来定义模式,如下面的示例所示
from pymongoarrow.api import Schema schema = Schema({'_id': int, 'amount': float, 'last_updated': datetime})
MongoDB 使用嵌入式文档来表示嵌套数据。PyMongoArrow 为这些文档提供了一等支持
schema = Schema({'_id': int, 'amount': float, 'account': { 'name': str, 'account_number': int}})
PyMongoArrow 还支持列表和嵌套列表
from pyarrow import list_, string schema = Schema({'txns': list_(string())}) polars_df = client.db.data.find_polars_all({'amount': {'$gt': 0}}, schema=schema)
提示
PyMongoArrow 为每个支持的 BSON 类型包含多个允许的类型标识符。有关这些数据类型及其关联的类型标识符的完整列表,请参阅 数据类型。
查找操作
以下代码示例展示了如何将所有具有非零值 amount
字段的记录加载为一个 pandas.DataFrame
对象
df = client.db.data.find_pandas_all({'amount': {'$gt': 0}}, schema=schema)
您还可以将相同的结果集加载为一个 pyarrow.Table
实例
arrow_table = client.db.data.find_arrow_all({'amount': {'$gt': 0}}, schema=schema)
或者作为一个 polars.DataFrame
实例
df = client.db.data.find_polars_all({'amount': {'$gt': 0}}, schema=schema)
或者作为一个 NumPy arrays
对象
ndarrays = client.db.data.find_numpy_all({'amount': {'$gt': 0}}, schema=schema)
当使用 NumPy 时,返回值是一个字典,其中键是字段名称,值是相应的 numpy.ndarray
实例。
注意
在所有前面的示例中,您可以省略模式,如下例所示
arrow_table = client.db.data.find_arrow_all({'amount': {'$gt': 0}})
如果您省略了模式,PyMongoArrow 将尝试根据第一个批次中的数据自动应用模式。
聚合操作
执行聚合操作类似于执行查找操作,但它需要一系列操作来执行。
以下是一个简单的 aggregate_pandas_all()
方法示例,该方法输出一个新数据框,其中所有 _id
值都分组在一起,其 amount
值求和
df = client.db.data.aggregate_pandas_all([{'$group': {'_id': None, 'total_amount': { '$sum': '$amount' }}}])
您还可以对嵌入式文档执行聚合操作。以下示例展开嵌套的 txn
字段中的值,计算每个值的数量,然后以降序排列返回结果作为 NumPy ndarray
对象列表
pipeline = [{'$unwind': '$txns'}, {'$group': {'_id': '$txns', 'count': {'$sum': 1}}}, {'$sort': {"count": -1}}] ndarrays = client.db.data.aggregate_numpy_all(pipeline)
提示
有关聚合管道的更多信息,请参阅 MongoDB 服务器文档。
写入MongoDB
您可以使用 write()
方法将以下类型的对象写入MongoDB
箭头
Table
Pandas
DataFrame
NumPy
ndarray
Polars
DataFrame
from pymongoarrow.api import write from pymongo import MongoClient coll = MongoClient().db.my_collection write(coll, df) write(coll, arrow_table) write(coll, ndarrays)
注意
NumPy数组指定为 dict[str, ndarray]
。
忽略空值
write()
方法可选地接受一个布尔型 exclude_none
参数。如果您将此参数设置为 True
,则驱动程序不会将空值写入数据库。如果您将此参数设置为 False
或将其保留为空白,则驱动程序将为每个空字段写入 None
。
以下示例代码将一个 Arrow Table
写入 MongoDB 两次。其中 'b'
字段中的一个值被设置为 None
。
第一次调用 write()
方法省略了 exclude_none
参数,因此默认为 False
。该 Table
中的所有值,包括 None
,都被写入数据库。第二次调用 write()
方法将 exclude_none
设置为 True
,因此 'b'
字段中的空值被忽略。
data_a = [1, 2, 3] data_b = [1, None, 3] data = Table.from_pydict( { "a": data_a, "b": data_b, }, ) coll.drop() write(coll, data) col_data = list(coll.find({})) coll.drop() write(coll, data, exclude_none=True) col_data_exclude_none = list(coll.find({})) print(col_data) print(col_data_exclude_none)
{'_id': ObjectId('...'), 'a': 1, 'b': 1} {'_id': ObjectId('...'), 'a': 2, 'b': None} {'_id': ObjectId('...'), 'a': 3, 'b': 3} {'_id': ObjectId('...'), 'a': 1, 'b': 1} {'_id': ObjectId('...'), 'a': 2} {'_id': ObjectId('...'), 'a': 3, 'b': 3}
将数据写入其他格式
一旦加载了结果集,就可以将它们写入该包支持的任何格式。
例如,要将变量 arrow_table
指引用的表写入名为 example.parquet
的 Parquet 文件,请运行以下代码
import pyarrow.parquet as pq pq.write_table(arrow_table, 'example.parquet')
Pandas 还支持将 DataFrame
实例写入各种格式,包括 CSV 和 HDF。要将变量 df
指引用的数据帧写入名为 out.csv
的 CSV 文件,请运行以下代码
df.to_csv('out.csv', index=False)
Polars API 是前两个示例的混合
import polars as pl df = pl.DataFrame({"foo": [1, 2, 3, 4, 5]}) df.write_parquet('example.parquet')
注意
支持嵌套数据在 Parquet 读写操作中,但 Arrow 或 Pandas 对 CSV 读写操作的支持不佳。