文档菜单
文档首页
/
Spark 连接器
/ /

流式读取配置选项

本页内容

  • 概述
  • 更改流配置
  • connection.uri
  • 中指定多个集合的collection属性

在流模式下从 MongoDB 读取数据时,您可以配置以下属性。

注意

如果您使用SparkConf 来设置连接器的读取配置,请在每个属性前添加前缀 spark.mongodb.read.

属性名称
描述
connection.uri
必需。
连接字符串配置键。

默认值: mongodb://localhost:27017/
database
必需。
数据库名配置。
collection
必需。
集合名称配置。
您可以通过用逗号分隔集合名称来指定多个集合。

有关指定多个集合的更多信息,请参阅collection 属性中指定多个集合.
注释
要附加到读取操作的注释。注释会在数据库分析器的输出中显示。

默认值:
mode
处理与预期模式不匹配的文档时使用的解析策略。此选项接受以下值
  • ReadConfig.ParseMode.FAILFAST:解析不匹配模式的文档时抛出异常。

  • ReadConfig.ParseMode.PERMISSIVE:当数据类型不匹配模式时,将字段设置为null。要将无效文档存储为扩展JSON字符串,请将此值与columnNameOfCorruptRecord选项组合。

  • ReadConfig.ParseMode.DROPMALFORMED:忽略任何不匹配模式的文档。


默认值:ReadConfig.ParseMode.FAILFAST
columnNameOfCorruptRecord
如果您将mode选项设置为ReadConfig.ParseMode.PERMISSIVE,则此选项指定用于存储无效文档(作为扩展JSON)的新列的名称。如果您使用的是显式模式,它必须包含新列的名称。如果您使用的是推断模式,Spark连接器将新列添加到模式的末尾。

默认值:
mongoClientFactory
MongoClientFactory配置键。
您可以为自定义实现指定,该实现必须实现com.mongodb.spark.sql.connector.connection.MongoClientFactory接口。

默认值:com.mongodb.spark.sql.connector.connection.DefaultMongoClientFactory
aggregation.pipeline
指定在将数据发送到Spark之前应用于集合的自定义聚合管道。
值必须是扩展JSON单个文档或文档列表。
单个文档类似于以下内容
{"$match": {"closed": false}}
文档列表类似于以下内容
[{"$match": {"closed": false}}, {"$project": {"status": 1, "name": 1, "description": 1}}]

自定义聚合管道必须与分区策略兼容。例如,$group等聚合阶段不能与创建多个分区的任何分区器一起使用。

aggregation.allowDiskUse
指定在运行聚合时是否允许存储到磁盘。

默认值:true
change.stream.
更改流配置前缀。
有关更改流的更多信息,请参阅更改流配置部分。
outputExtendedJson
当设置为true时,连接器将Spark不支持的数据类型转换为扩展JSON字符串。当设置为false时,连接器使用原始的宽松JSON格式来处理不支持的数据类型。

默认值:false
schemaHint
指定用于推断集合模式时使用的已知字段类型的部分模式。有关schemaHint选项的更多信息,请参阅使用模式提示指定已知字段部分。

默认值:

从MongoDB读取更改流时,您可以配置以下属性

属性名称
描述
change.stream.lookup.full.document

确定更改流在更新操作中返回的值。

默认设置返回原始文档和更新文档之间的差异。

updateLookup 设置也返回原始文档和更新文档之间的差异,但它还包括整个更新文档的副本。

有关此更改流选项如何工作的更多信息,请参阅MongoDB服务器手册指南 更新操作的全文档查找。

默认: "default"

change.stream.micro.batch.max.partition.count
Spark Connector将每个微批划分为的最大分区数。Spark工作进程可以并行处理这些分区。

此设置仅适用于使用微批流。

默认: 1

警告:指定大于 1 的值可能会改变Spark Connector处理更改事件的顺序。如果顺序处理可能导致数据不一致,请避免此设置。

change.stream.publish.full.document.only
指定是否发布已更改的文档或完整的更改流文档。

当此设置为 false 时,您必须指定一个模式。该模式必须包括您想从更改流中读取的所有字段。您可以使用可选字段确保模式对所有更改流事件都有效。

当此设置为 true 时,连接器表现如下
  • 连接器过滤掉省略了 fullDocument 字段的消息,并仅发布该字段的值。

  • 如果您没有指定模式,连接器将从更改流文档中推断模式。

此设置覆盖了 change.stream.lookup.full.document 设置。

默认: false

change.stream.startup.mode
指定连接器在没有偏移量时如何启动。
此设置接受以下值

如果您使用SparkConf来指定任何之前的设置,您可以将它们包含在connection.uri设置中,也可以单独列出。

以下代码示例展示了如何将数据库、集合和读取偏好作为connection.uri设置的一部分进行指定

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/myDB.myCollection?readPreference=primaryPreferred

为了使connection.uri更短并使设置更易于阅读,您可以单独指定它们

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/
spark.mongodb.read.database=myDB
spark.mongodb.read.collection=myCollection
spark.mongodb.read.readPreference.name=primaryPreferred

重要

如果您在connection.uri和单独的一行中都指定了设置,则connection.uri设置将具有优先级。例如,在下面的配置中,连接数据库是foobar,因为它是connection.uri设置中的值。

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/foobar
spark.mongodb.read.database=bar

您可以通过用逗号分隔集合名称来在collection更改流配置属性中指定多个集合。除非空格是集合名称的一部分,否则请勿在集合之间添加空格。

如以下示例所示指定多个集合

...
.option("spark.mongodb.collection", "collectionOne,collectionTwo")

如果集合名称是"*"或名称包含逗号或反斜杠(\),则必须按以下方式转义字符

  • 如果您的collection配置选项中使用的集合名称包含逗号,则Spark连接器将其视为两个不同的集合。为了避免这种情况,您必须通过在逗号之前加上反斜杠(\)来转义逗号。按以下方式转义名为"my,collection"的集合

    "my\,collection"
  • 如果您的collection配置选项中使用的集合名称是"*",则Spark连接器将其解释为扫描所有集合的指定。为了避免这种情况,您必须通过在前面加上反斜杠(\)来转义星号。按以下方式转义名为"*"的集合

    "\*"
  • 如果您的collection配置选项中使用的集合名称包含反斜杠(\),则Spark连接器将其视为转义字符,这可能会改变它对值的解释。为了避免这种情况,您必须通过在前面加上另一个反斜杠来转义反斜杠。按以下方式转义名为"\collection"的集合

    "\\collection"

    注意

    当在Java中将集合名称指定为字符串字面量时,您必须进一步通过在前面加上另一个反斜杠来转义每个反斜杠。例如,按以下方式转义名为"\collection"的集合

    "\\\\collection"

您可以通过传递一个字符串"*"作为集合名称来从数据库中的所有集合进行流式传输。

如以下示例所示指定所有集合

...
.option("spark.mongodb.collection", "*")

当从所有集合进行流式传输时创建集合,则新集合将自动包含在流中。

您可以在从多个集合进行流式传输的同时随时删除集合。

重要

使用多个集合推断模式

如果您将 change.stream.publish.full.document.only 选项设置为 true,Spark 连接器将使用扫描文档的模式来推断 DataFrame 的模式。

模式推断发生在流开始的时刻,不会考虑在流过程中创建的集合。

当从多个集合中流式传输并推断模式时,连接器会按顺序采样每个集合。从大量集合中流式传输可能会导致模式推断的性能明显变慢。这种性能影响仅发生在推断模式时。

返回

读取