批量读取配置选项
概述
在批量模式从 MongoDB 读取数据时,您可以配置以下属性。
注意
如果您使用SparkConf
设置连接器的读取配置,请将每个属性的 spark.mongodb.read.
前缀。
属性名称 | 描述 | ||
---|---|---|---|
connection.uri | 必需。 连接字符串配置键。 默认: mongodb://localhost:27017/ | ||
database | 必需。 数据库名配置。 | ||
collection | 必需。 集合名称配置。 | ||
comment | |||
mode | 处理不符合预期模式的文档时要使用的解析策略。此选项接受以下值
默认值: ReadConfig.ParseMode.FAILFAST | ||
columnNameOfCorruptRecord | 如果您将 mode 选项设置为ReadConfig.ParseMode.PERMISSIVE ,则此选项指定了存储无效文档(作为扩展JSON)的新列的名称。如果您使用显式模式,则必须包含新列的名称。如果您使用推断模式,则Spark连接器将新列添加到模式的末尾。默认: None | ||
mongoClientFactory | MongoClientFactory配置键。 您可以指定一个自定义实现,该实现必须实现 com.mongodb.spark.sql.connector.connection.MongoClientFactory 接口。默认值: com.mongodb.spark.sql.connector.connection.DefaultMongoClientFactory | ||
partitioner | 分区器完整类名。 您可以指定一个自定义实现,该实现必须实现 com.mongodb.spark.sql.connector.read.partitioner.Partitioner 接口。有关分区器的更多信息,请参阅分区器配置部分。 默认值: com.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner | ||
partitioner.options. | 分区器配置前缀。 有关分区器的更多信息,请参阅分区器配置部分。 | ||
sampleSize | 推断模式时从集合中采样文档的数量。 以推断模式。 默认值: 1000 | ||
sql.inferSchema.mapTypes.enabled | 是否启用推断模式时的Map类型。 启用时,大型兼容的结构类型被推断为 MapType 。默认值: true | ||
sql.inferSchema.mapTypes.minimum.key.size | 在推断为 MapType 之前StructType 的最小大小。默认值: 250 | ||
aggregation.pipeline | 指定在将数据发送到Spark之前应用于集合的自定义聚合管道。 值必须是扩展JSON单个文档或文档列表。 单个文档如下所示
文档列表如下所示
重要:自定义聚合管道必须与分区策略兼容。例如,像 | ||
aggregation.allowDiskUse | 指定在运行聚合时是否允许存储到磁盘。 默认值: true | ||
outputExtendedJson | 当 true 时,连接器将Spark不支持的原生BSON类型转换为扩展JSON字符串。当false 时,连接器使用原始的宽松JSON格式来处理不支持的类型。默认值: false | ||
schemaHint |
分区器配置
分区器会改变使用Spark连接器的批量读取的读取行为。通过将数据划分为分区,可以在并行中运行转换。
本节包含以下分区器的配置信息
注意
仅批量读取
因为数据流处理引擎生成单个数据流,分区器不会影响流式读取。
SamplePartitioner
配置
SamplePartitioner
是默认的分区器配置。此配置允许您指定分区字段、分区大小和每个分区的样本数。
要使用此配置,请将 partitioner
配置选项设置为 com.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner
。
属性名称 | 描述 | |
---|---|---|
partitioner.options.partition.field | 用于分区的字段,必须是一个唯一的字段。 默认: | |
partitioner.options.partition.size | 每个分区的尺寸(以MB为单位)。较小的分区大小会创建更多包含较少文档的分区。 默认: | |
partitioner.options.samples.per.partition | 每个分区要提取的样本数。提取的样本总数是
默认: |
示例
对于包含640个文档且平均文档大小为0.5MB的集合,默认的 SamplePartitioner
配置创建5个分区,每个分区包含128个文档。
Spark连接器从50个文档中抽取样本(默认每个目标分区10个),并通过从样本文档中选择分区字段范围来定义5个分区。
ShardedPartitioner
配置
ShardedPartitioner
配置会根据您的分片配置自动对数据进行分区。
要使用此配置,请将 partitioner
配置选项设置为 com.mongodb.spark.sql.connector.read.partitioner.ShardedPartitioner
。
重要
ShardedPartitioner 限制
在 MongoDB 服务器 v6.0 及更高版本中,分片操作会创建一个覆盖所有分片键值的大初始分片,这使得分片分区器效率低下。我们不推荐在连接到 MongoDB v6.0 及更高版本时使用分片分区器。
分片分区器与哈希分片键不兼容。
PaginateBySizePartitioner
配置
PaginateBySizePartitioner
配置通过使用平均文档大小将数据分页,将集合分成平均大小的块。
要使用此配置,请将 partitioner
配置选项设置为 com.mongodb.spark.sql.connector.read.partitioner.PaginateBySizePartitioner
。
属性名称 | 描述 |
---|---|
partitioner.options.partition.field | 用于分区的字段,必须是一个唯一的字段。 默认: |
partitioner.options.partition.size | 每个分区的尺寸(以 MB 计算)。更小的分区尺寸 创建包含更少文档的更多分区。 默认: |
PaginateIntoPartitionsPartitioner
配置
《PaginateIntoPartitionsPartitioner
》配置通过将集合中文档的数量除以允许的最大分区数来分页数据。
要使用此配置,将partitioner
配置选项设置为com.mongodb.spark.sql.connector.read.partitioner.PaginateIntoPartitionsPartitioner
。
属性名称 | 描述 |
---|---|
partitioner.options.partition.field | 用于分区的字段,必须是一个唯一的字段。 默认: |
partitioner.options.max.number.of.partitions | 要创建的分区数。 默认: |
SinglePartitionPartitioner
配置
《SinglePartitionPartitioner
》配置创建单个分区。
要使用此配置,将partitioner
配置选项设置为com.mongodb.spark.sql.connector.read.partitioner.SinglePartitionPartitioner
。
AutoBucketPartitioner
配置
AutoBucketPartitioner
配置类似于 SamplePartitioner 配置,但使用 $bucketAuto 聚合阶段来分页数据。使用此配置,您可以跨单个或多个字段分区数据,包括嵌套字段。
要使用此配置,请将 partitioner
配置选项设置为 com.mongodb.spark.sql.connector.read.partitioner.AutoBucketPartitioner
。
属性名称 | 描述 |
---|---|
partitioner.options.partition.fieldList | 用于分区的字段列表。值可以是单个字段名或逗号分隔的字段列表。 默认: |
partitioner.options.partition.chunkSize | 每个分区的平均大小(MB)。较小的分区大小会创建更多的分区,每个分区包含较少的文档。因为此配置使用平均文档大小来确定每个分区的文档数,所以分区的大小可能不均匀。 默认: |
partitioner.options.partition.samplesPerPartition | 每个分区中要抽取的样本数。 默认值: |
partitioner.options.partition.partitionKeyProjectionField | 用于包含用于分区集合的所有字段的投影字段的字段名。我们建议仅在每个文档已包含 默认值: |
在 connection.uri
中指定属性
如果您使用 SparkConf 指定任何前面的设置,您可以在 connection.uri
设置中包含它们或单独列出它们。
以下代码示例展示了如何将数据库、集合和读取偏好作为 connection.uri
设置的一部分进行指定
spark.mongodb.read.connection.uri=mongodb://127.0.0.1/myDB.myCollection?readPreference=primaryPreferred
为了使connection.uri
更短,并使设置更容易阅读,您可以选择单独指定它们
spark.mongodb.read.connection.uri=mongodb://127.0.0.1/ spark.mongodb.read.database=myDB spark.mongodb.read.collection=myCollection spark.mongodb.read.readPreference.name=primaryPreferred
重要
如果您在connection.uri
以及其单独的一行中指定了设置,则connection.uri
设置优先。例如,在以下配置中,连接数据库是foobar
,因为它是connection.uri
设置中的值
spark.mongodb.read.connection.uri=mongodb://127.0.0.1/foobar spark.mongodb.read.database=bar