文档菜单
文档首页
/ / /
PyMongoArrow

快速入门

在本页

  • 先决条件
  • 扩展PyMongo
  • 测试数据
  • 定义模式
  • 查找操作
  • 聚合操作
  • 写入MongoDB
  • 忽略空值
  • 写入其他格式

本教程旨在作为使用 PyMongoArrow 的入门介绍。教程假设读者熟悉基本PyMongo 以及 MongoDB 概念。

确保您有 PyMongoArrow 发行版安装。在 Python 命令行中,以下命令应该能够正常运行而不会引发异常

>>> import pymongoarrow as pma

本教程还假设 MongoDB 实例正在默认主机和端口上运行。在您下载并安装 MongoDB 后,您可以按照以下代码示例启动它

$ mongod

pymongoarrow.monkey 模块提供了一种接口,用于就地修补 PyMongo,并将 PyMongoArrow 功能直接添加到 Collection 实例。

from pymongoarrow.monkey import patch_all
patch_all()

在运行 monkey.patch_all() 方法后,新的 Collection 类实例将包含 PyMongoArrow API -- 例如,pymongoarrow.api.find_pandas_all() 方法。

注意

您还可以通过从 pymongoarrow.api 模块导入来使用任何 PyMongoArrow API。如果这样做,您必须在调用 API 方法时将要在其上执行操作的 Collection 实例作为第一个参数传递。

以下代码使用 PyMongo 向您的集群添加示例数据

from datetime import datetime
from pymongo import MongoClient
client = MongoClient()
client.db.data.insert_many([
{'_id': 1, 'amount': 21, 'last_updated': datetime(2020, 12, 10, 1, 3, 1), 'account': {'name': 'Customer1', 'account_number': 1}, 'txns': ['A']},
{'_id': 2, 'amount': 16, 'last_updated': datetime(2020, 7, 23, 6, 7, 11), 'account': {'name': 'Customer2', 'account_number': 2}, 'txns': ['A', 'B']},
{'_id': 3, 'amount': 3, 'last_updated': datetime(2021, 3, 10, 18, 43, 9), 'account': {'name': 'Customer3', 'account_number': 3}, 'txns': ['A', 'B', 'C']},
{'_id': 4, 'amount': 0, 'last_updated': datetime(2021, 2, 25, 3, 50, 31), 'account': {'name': 'Customer4', 'account_number': 4}, 'txns': ['A', 'B', 'C', 'D']}])

PyMongoArrow 依赖于数据模式将查询结果集编入表格形式。如果您不提供此模式,PyMongoArrow 将从数据中推断出一个。您可以通过创建一个 Schema 对象并将字段名称映射到类型说明符来定义模式,如下面的示例所示

from pymongoarrow.api import Schema
schema = Schema({'_id': int, 'amount': float, 'last_updated': datetime})

MongoDB 使用嵌入式文档来表示嵌套数据。PyMongoArrow 为这些文档提供了一等支持

schema = Schema({'_id': int, 'amount': float, 'account': { 'name': str, 'account_number': int}})

PyMongoArrow 还支持列表和嵌套列表

from pyarrow import list_, string
schema = Schema({'txns': list_(string())})
polars_df = client.db.data.find_polars_all({'amount': {'$gt': 0}}, schema=schema)

提示

PyMongoArrow 为每个支持的 BSON 类型包含多个允许的类型标识符。有关这些数据类型及其关联的类型标识符的完整列表,请参阅 数据类型。

以下代码示例展示了如何将所有具有非零值 amount 字段的记录加载为一个 pandas.DataFrame 对象

df = client.db.data.find_pandas_all({'amount': {'$gt': 0}}, schema=schema)

您还可以将相同的结果集加载为一个 pyarrow.Table 实例

arrow_table = client.db.data.find_arrow_all({'amount': {'$gt': 0}}, schema=schema)

或者作为一个 polars.DataFrame 实例

df = client.db.data.find_polars_all({'amount': {'$gt': 0}}, schema=schema)

或者作为一个 NumPy arrays 对象

ndarrays = client.db.data.find_numpy_all({'amount': {'$gt': 0}}, schema=schema)

当使用 NumPy 时,返回值是一个字典,其中键是字段名称,值是相应的 numpy.ndarray 实例。

注意

在所有前面的示例中,您可以省略模式,如下例所示

arrow_table = client.db.data.find_arrow_all({'amount': {'$gt': 0}})

如果您省略了模式,PyMongoArrow 将尝试根据第一个批次中的数据自动应用模式。

执行聚合操作类似于执行查找操作,但它需要一系列操作来执行。

以下是一个简单的 aggregate_pandas_all() 方法示例,该方法输出一个新数据框,其中所有 _id 值都分组在一起,其 amount 值求和

df = client.db.data.aggregate_pandas_all([{'$group': {'_id': None, 'total_amount': { '$sum': '$amount' }}}])

您还可以对嵌入式文档执行聚合操作。以下示例展开嵌套的 txn 字段中的值,计算每个值的数量,然后以降序排列返回结果作为 NumPy ndarray 对象列表

pipeline = [{'$unwind': '$txns'}, {'$group': {'_id': '$txns', 'count': {'$sum': 1}}}, {'$sort': {"count": -1}}]
ndarrays = client.db.data.aggregate_numpy_all(pipeline)

提示

有关聚合管道的更多信息,请参阅 MongoDB 服务器文档。

您可以使用 write() 方法将以下类型的对象写入MongoDB

  • 箭头 Table

  • Pandas DataFrame

  • NumPy ndarray

  • Polars DataFrame

from pymongoarrow.api import write
from pymongo import MongoClient
coll = MongoClient().db.my_collection
write(coll, df)
write(coll, arrow_table)
write(coll, ndarrays)

注意

NumPy数组指定为 dict[str, ndarray]

write() 方法可选地接受一个布尔型 exclude_none 参数。如果您将此参数设置为 True,则驱动程序不会将空值写入数据库。如果您将此参数设置为 False 或将其保留为空白,则驱动程序将为每个空字段写入 None

以下示例代码将一个 Arrow Table 写入 MongoDB 两次。其中 'b' 字段中的一个值被设置为 None

第一次调用 write() 方法省略了 exclude_none 参数,因此默认为 False。该 Table 中的所有值,包括 None,都被写入数据库。第二次调用 write() 方法将 exclude_none 设置为 True,因此 'b' 字段中的空值被忽略。

data_a = [1, 2, 3]
data_b = [1, None, 3]
data = Table.from_pydict(
{
"a": data_a,
"b": data_b,
},
)
coll.drop()
write(coll, data)
col_data = list(coll.find({}))
coll.drop()
write(coll, data, exclude_none=True)
col_data_exclude_none = list(coll.find({}))
print(col_data)
print(col_data_exclude_none)
{'_id': ObjectId('...'), 'a': 1, 'b': 1}
{'_id': ObjectId('...'), 'a': 2, 'b': None}
{'_id': ObjectId('...'), 'a': 3, 'b': 3}
{'_id': ObjectId('...'), 'a': 1, 'b': 1}
{'_id': ObjectId('...'), 'a': 2}
{'_id': ObjectId('...'), 'a': 3, 'b': 3}

一旦加载了结果集,就可以将它们写入该包支持的任何格式。

例如,要将变量 arrow_table 指引用的表写入名为 example.parquet 的 Parquet 文件,请运行以下代码

import pyarrow.parquet as pq
pq.write_table(arrow_table, 'example.parquet')

Pandas 还支持将 DataFrame 实例写入各种格式,包括 CSV 和 HDF。要将变量 df 指引用的数据帧写入名为 out.csv 的 CSV 文件,请运行以下代码

df.to_csv('out.csv', index=False)

Polars API 是前两个示例的混合

import polars as pl
df = pl.DataFrame({"foo": [1, 2, 3, 4, 5]})
df.write_parquet('example.parquet')

注意

支持嵌套数据在 Parquet 读写操作中,但 Arrow 或 Pandas 对 CSV 读写操作的支持不佳。

返回

安装和升级

© . All rights reserved.