文档菜单
文档首页
/
Spark 连接器
/ /

批量读取配置选项

本页面

  • 概述
  • 分区器配置
  • 在 connection.uri 中指定属性connection.uri

在批量模式从 MongoDB 读取数据时,您可以配置以下属性。

注意

如果您使用SparkConf 设置连接器的读取配置,请将每个属性的 spark.mongodb.read. 前缀。

属性名称
描述
connection.uri
必需。
连接字符串配置键。

默认: mongodb://localhost:27017/
database
必需。
数据库名配置。
collection
必需。
集合名称配置。
comment
附加到读取操作的注释。注释显示在数据库分析器的输出中。

默认: None
mode
处理不符合预期模式的文档时要使用的解析策略。此选项接受以下值
  • ReadConfig.ParseMode.FAILFAST:当解析不匹配模式的文档时抛出异常。

  • ReadConfig.ParseMode.PERMISSIVE:当数据类型不匹配模式时将字段设置为null。要将每个无效文档作为扩展JSON字符串存储,请将此值与columnNameOfCorruptRecord选项组合。

  • ReadConfig.ParseMode.DROPMALFORMED:忽略任何不匹配模式的文档。


默认值: ReadConfig.ParseMode.FAILFAST
columnNameOfCorruptRecord
如果您将mode选项设置为ReadConfig.ParseMode.PERMISSIVE,则此选项指定了存储无效文档(作为扩展JSON)的新列的名称。如果您使用显式模式,则必须包含新列的名称。如果您使用推断模式,则Spark连接器将新列添加到模式的末尾。

默认: None
mongoClientFactory
MongoClientFactory配置键。
您可以指定一个自定义实现,该实现必须实现com.mongodb.spark.sql.connector.connection.MongoClientFactory接口。

默认值: com.mongodb.spark.sql.connector.connection.DefaultMongoClientFactory
partitioner
分区器完整类名。
您可以指定一个自定义实现,该实现必须实现com.mongodb.spark.sql.connector.read.partitioner.Partitioner接口。
有关分区器的更多信息,请参阅分区器配置部分。

默认值: com.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner
partitioner.options.
分区器配置前缀。
有关分区器的更多信息,请参阅分区器配置部分。
sampleSize
推断模式时从集合中采样文档的数量。
以推断模式。

默认值: 1000
sql.inferSchema.mapTypes.enabled
是否启用推断模式时的Map类型。
启用时,大型兼容的结构类型被推断为MapType

默认值: true
sql.inferSchema.mapTypes.minimum.key.size
在推断为MapType之前StructType的最小大小。

默认值: 250
aggregation.pipeline
指定在将数据发送到Spark之前应用于集合的自定义聚合管道。
值必须是扩展JSON单个文档或文档列表。
单个文档如下所示
{"$match": {"closed": false}}
文档列表如下所示
[{"$match": {"closed": false}}, {"$project": {"status": 1, "name": 1, "description": 1}}]

重要:自定义聚合管道必须与分区策略兼容。例如,像$group这样的聚合阶段不能与创建多个分区的任何分区器一起使用。

aggregation.allowDiskUse
指定在运行聚合时是否允许存储到磁盘。

默认值: true
outputExtendedJson
true时,连接器将Spark不支持的原生BSON类型转换为扩展JSON字符串。当false时,连接器使用原始的宽松JSON格式来处理不支持的类型。

默认值: false
schemaHint
指定已知字段类型的不完整模式,在推断集合的模式时使用。有关schemaHint选项的更多信息,请参阅使用模式提示指定已知字段部分。

默认: None

分区器会改变使用Spark连接器的批量读取的读取行为。通过将数据划分为分区,可以在并行中运行转换。

本节包含以下分区器的配置信息

注意

仅批量读取

因为数据流处理引擎生成单个数据流,分区器不会影响流式读取。

SamplePartitioner 是默认的分区器配置。此配置允许您指定分区字段、分区大小和每个分区的样本数。

要使用此配置,请将 partitioner 配置选项设置为 com.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner

属性名称
描述
partitioner.options.partition.field

用于分区的字段,必须是一个唯一的字段。

默认: _id

partitioner.options.partition.size

每个分区的尺寸(以MB为单位)。较小的分区大小会创建更多包含较少文档的分区。

默认: 64

partitioner.options.samples.per.partition

每个分区要提取的样本数。提取的样本总数是

samples per partition * ( count / number of documents per partition)

默认: 10

示例

对于包含640个文档且平均文档大小为0.5MB的集合,默认的 SamplePartitioner 配置创建5个分区,每个分区包含128个文档。

Spark连接器从50个文档中抽取样本(默认每个目标分区10个),并通过从样本文档中选择分区字段范围来定义5个分区。

ShardedPartitioner 配置会根据您的分片配置自动对数据进行分区。

要使用此配置,请将 partitioner 配置选项设置为 com.mongodb.spark.sql.connector.read.partitioner.ShardedPartitioner

重要

ShardedPartitioner 限制

  1. 在 MongoDB 服务器 v6.0 及更高版本中,分片操作会创建一个覆盖所有分片键值的大初始分片,这使得分片分区器效率低下。我们不推荐在连接到 MongoDB v6.0 及更高版本时使用分片分区器。

  2. 分片分区器与哈希分片键不兼容。

PaginateBySizePartitioner 配置通过使用平均文档大小将数据分页,将集合分成平均大小的块。

要使用此配置,请将 partitioner 配置选项设置为 com.mongodb.spark.sql.connector.read.partitioner.PaginateBySizePartitioner

属性名称
描述
partitioner.options.partition.field

用于分区的字段,必须是一个唯一的字段。

默认: _id

partitioner.options.partition.size

每个分区的尺寸(以 MB 计算)。更小的分区尺寸

创建包含更少文档的更多分区。

默认: 64

PaginateIntoPartitionsPartitioner》配置通过将集合中文档的数量除以允许的最大分区数来分页数据。

要使用此配置,将partitioner配置选项设置为com.mongodb.spark.sql.connector.read.partitioner.PaginateIntoPartitionsPartitioner

属性名称
描述
partitioner.options.partition.field

用于分区的字段,必须是一个唯一的字段。

默认: _id

partitioner.options.max.number.of.partitions

要创建的分区数。

默认: 64

SinglePartitionPartitioner》配置创建单个分区。

要使用此配置,将partitioner配置选项设置为com.mongodb.spark.sql.connector.read.partitioner.SinglePartitionPartitioner

AutoBucketPartitioner 配置类似于 SamplePartitioner 配置,但使用 $bucketAuto 聚合阶段来分页数据。使用此配置,您可以跨单个或多个字段分区数据,包括嵌套字段。

要使用此配置,请将 partitioner 配置选项设置为 com.mongodb.spark.sql.connector.read.partitioner.AutoBucketPartitioner

属性名称
描述
partitioner.options.partition.fieldList

用于分区的字段列表。值可以是单个字段名或逗号分隔的字段列表。

默认: _id

partitioner.options.partition.chunkSize

每个分区的平均大小(MB)。较小的分区大小会创建更多的分区,每个分区包含较少的文档。因为此配置使用平均文档大小来确定每个分区的文档数,所以分区的大小可能不均匀。

默认: 64

partitioner.options.partition.samplesPerPartition

每个分区中要抽取的样本数。

默认值: 100

partitioner.options.partition.partitionKeyProjectionField

用于包含用于分区集合的所有字段的投影字段的字段名。我们建议仅在每个文档已包含 __idx 字段时更改此属性的值。

默认值: __idx

如果您使用 SparkConf 指定任何前面的设置,您可以在 connection.uri 设置中包含它们或单独列出它们。

以下代码示例展示了如何将数据库、集合和读取偏好作为 connection.uri 设置的一部分进行指定

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/myDB.myCollection?readPreference=primaryPreferred

为了使connection.uri更短,并使设置更容易阅读,您可以选择单独指定它们

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/
spark.mongodb.read.database=myDB
spark.mongodb.read.collection=myCollection
spark.mongodb.read.readPreference.name=primaryPreferred

重要

如果您在connection.uri以及其单独的一行中指定了设置,则connection.uri设置优先。例如,在以下配置中,连接数据库是foobar,因为它是connection.uri设置中的值

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/foobar
spark.mongodb.read.database=bar

返回

读取