流式读取配置选项
概述
在流模式下从 MongoDB 读取数据时,您可以配置以下属性。
注意
如果您使用SparkConf
来设置连接器的读取配置,请在每个属性前添加前缀 spark.mongodb.read.
。
属性名称 | 描述 | ||
---|---|---|---|
connection.uri | 必需。 连接字符串配置键。 默认值: mongodb://localhost:27017/ | ||
database | 必需。 数据库名配置。 | ||
collection | |||
注释 | |||
mode | 处理与预期模式不匹配的文档时使用的解析策略。此选项接受以下值
默认值: ReadConfig.ParseMode.FAILFAST | ||
columnNameOfCorruptRecord | 如果您将 mode 选项设置为ReadConfig.ParseMode.PERMISSIVE ,则此选项指定用于存储无效文档(作为扩展JSON)的新列的名称。如果您使用的是显式模式,它必须包含新列的名称。如果您使用的是推断模式,Spark连接器将新列添加到模式的末尾。默认值:无 | ||
mongoClientFactory | MongoClientFactory配置键。 您可以为自定义实现指定,该实现必须实现 com.mongodb.spark.sql.connector.connection.MongoClientFactory 接口。默认值: com.mongodb.spark.sql.connector.connection.DefaultMongoClientFactory | ||
aggregation.pipeline | 指定在将数据发送到Spark之前应用于集合的自定义聚合管道。 值必须是扩展JSON单个文档或文档列表。 单个文档类似于以下内容
文档列表类似于以下内容
自定义聚合管道必须与分区策略兼容。例如, | ||
aggregation.allowDiskUse | 指定在运行聚合时是否允许存储到磁盘。 默认值: true | ||
change.stream. | 更改流配置前缀。 有关更改流的更多信息,请参阅更改流配置部分。 | ||
outputExtendedJson | 当设置为 true 时,连接器将Spark不支持的数据类型转换为扩展JSON字符串。当设置为false 时,连接器使用原始的宽松JSON格式来处理不支持的数据类型。默认值: false | ||
schemaHint |
更改流配置
从MongoDB读取更改流时,您可以配置以下属性
属性名称 | 描述 |
---|---|
change.stream.lookup.full.document | 确定更改流在更新操作中返回的值。 默认设置返回原始文档和更新文档之间的差异。
有关此更改流选项如何工作的更多信息,请参阅MongoDB服务器手册指南 更新操作的全文档查找。 默认: "default" |
change.stream.micro.batch.max.partition.count | Spark Connector将每个微批划分为的最大分区数。Spark工作进程可以并行处理这些分区。 此设置仅适用于使用微批流。 默认: 1 警告:指定大于 |
change.stream.publish.full.document.only | 指定是否发布已更改的文档或完整的更改流文档。 当此设置为 false 时,您必须指定一个模式。该模式必须包括您想从更改流中读取的所有字段。您可以使用可选字段确保模式对所有更改流事件都有效。当此设置为 true 时,连接器表现如下
此设置覆盖了 默认: |
change.stream.startup.mode |
在connection.uri
中指定属性
如果您使用SparkConf来指定任何之前的设置,您可以将它们包含在connection.uri
设置中,也可以单独列出。
以下代码示例展示了如何将数据库、集合和读取偏好作为connection.uri
设置的一部分进行指定
spark.mongodb.read.connection.uri=mongodb://127.0.0.1/myDB.myCollection?readPreference=primaryPreferred
为了使connection.uri
更短并使设置更易于阅读,您可以单独指定它们
spark.mongodb.read.connection.uri=mongodb://127.0.0.1/ spark.mongodb.read.database=myDB spark.mongodb.read.collection=myCollection spark.mongodb.read.readPreference.name=primaryPreferred
重要
如果您在connection.uri
和单独的一行中都指定了设置,则connection.uri
设置将具有优先级。例如,在下面的配置中,连接数据库是foobar
,因为它是connection.uri
设置中的值。
spark.mongodb.read.connection.uri=mongodb://127.0.0.1/foobar spark.mongodb.read.database=bar
在collection
属性中指定多个集合
您可以通过用逗号分隔集合名称来在collection
更改流配置属性中指定多个集合。除非空格是集合名称的一部分,否则请勿在集合之间添加空格。
如以下示例所示指定多个集合
... .option("spark.mongodb.collection", "collectionOne,collectionTwo")
如果集合名称是"*"或名称包含逗号或反斜杠(\),则必须按以下方式转义字符
如果您的
collection
配置选项中使用的集合名称包含逗号,则Spark连接器将其视为两个不同的集合。为了避免这种情况,您必须通过在逗号之前加上反斜杠(\)来转义逗号。按以下方式转义名为"my,collection"的集合"my\,collection" 如果您的
collection
配置选项中使用的集合名称是"*",则Spark连接器将其解释为扫描所有集合的指定。为了避免这种情况,您必须通过在前面加上反斜杠(\)来转义星号。按以下方式转义名为"*"的集合"\*" 如果您的
collection
配置选项中使用的集合名称包含反斜杠(\),则Spark连接器将其视为转义字符,这可能会改变它对值的解释。为了避免这种情况,您必须通过在前面加上另一个反斜杠来转义反斜杠。按以下方式转义名为"\collection"的集合"\\collection" 注意
当在Java中将集合名称指定为字符串字面量时,您必须进一步通过在前面加上另一个反斜杠来转义每个反斜杠。例如,按以下方式转义名为"\collection"的集合
"\\\\collection"
您可以通过传递一个字符串"*"作为集合名称来从数据库中的所有集合进行流式传输。
如以下示例所示指定所有集合
... .option("spark.mongodb.collection", "*")
当从所有集合进行流式传输时创建集合,则新集合将自动包含在流中。
您可以在从多个集合进行流式传输的同时随时删除集合。
重要
使用多个集合推断模式
如果您将 change.stream.publish.full.document.only
选项设置为 true
,Spark 连接器将使用扫描文档的模式来推断 DataFrame
的模式。
模式推断发生在流开始的时刻,不会考虑在流过程中创建的集合。
当从多个集合中流式传输并推断模式时,连接器会按顺序采样每个集合。从大量集合中流式传输可能会导致模式推断的性能明显变慢。这种性能影响仅发生在推断模式时。