文档菜单
文档首页
/
Spark 连接器
/

在批量模式下从 MongoDB 读取

本页内容

  • 概述
  • 模式推断
  • 过滤器
  • SQL 查询
  • API 文档

要从 MongoDB 读取数据,请在您的 SparkSession 对象上调用 read() 方法。此方法返回一个 DataFrameReader 对象,您可以使用它来指定批量读取操作的数据格式和其他配置设置。dataFrame.read.format()

您必须指定以下配置设置才能从 MongoDB 读取

设置
描述
dataFrame.read.format()
指定底层输入数据源的格式。使用 mongodb 从 MongoDB 读取。
dataFrame.read.option()

使用 option 方法配置批量读取设置,包括 MongoDB 部署连接字符串、MongoDB 数据库和集合以及分区器配置。

有关批量读取配置选项的列表,请参阅批读取配置选项指南。

以下代码示例展示了如何使用前面的配置设置从MongoDB中的people.contacts读取数据

Dataset<Row> dataFrame = spark.read()
.format("mongodb")
.option("database", "people")
.option("collection", "contacts")
.load();

提示

DataFrame类型

DataFrame在Java API中不是一个类。使用Dataset<Row>来引用DataFrame。

要从MongoDB读取数据,请在您的SparkSession对象上调用read函数。此函数返回一个DataFrameReader对象,您可以使用它来指定批读取操作的模式和其他配置设置。

您必须指定以下配置设置才能从 MongoDB 读取

设置
描述
dataFrame.read.format()
指定底层输入数据源的格式。使用 mongodb 从 MongoDB 读取。
dataFrame.read.option()

使用option方法配置批读取设置,包括MongoDB部署连接字符串、MongoDB数据库和集合以及分区器配置。

有关批读取配置选项的列表,请参阅批读取配置选项指南。

以下代码示例展示了如何使用前面的配置设置从MongoDB中的people.contacts读取数据

dataFrame = spark.read
.format("mongodb")
.option("database", "people")
.option("collection", "contacts")
.load()

要从MongoDB读取数据,请在您的SparkSession对象上调用read方法。此方法返回一个DataFrameReader对象,您可以使用它来指定批读取操作的模式和其他配置设置。

您必须指定以下配置设置才能从 MongoDB 读取

设置
描述
dataFrame.read.format()
指定底层输入数据源的格式。使用 mongodb 从 MongoDB 读取。
dataFrame.read.option()

使用option方法配置批读取设置,包括MongoDB部署连接字符串、MongoDB数据库和集合以及分区器配置。

有关批读取配置选项的列表,请参阅批读取配置选项指南。

以下代码示例展示了如何使用前面的配置设置从MongoDB中的people.contacts读取数据

val dataFrame = spark.read
.format("mongodb")
.option("database", "people")
.option("collection", "contacts")
.load()

提示

DataFrame类型

DataFrame由一个DatasetRow对象表示。《DataFrame》类型是Dataset[Row]的别名。

当您加载一个没有模式的Dataset或DataFrame时,Spark会采样记录来推断集合的模式。

假设MongoDB集合people.contacts包含以下文档

{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 }
{ "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 }
{ "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 }
{ "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 }
{ "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 }
{ "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 }
{ "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 }
{ "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 }
{ "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 }
{ "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }

以下操作从people.contacts加载数据并推断DataFrame的模式

Dataset<Row> dataFrame = spark.read()
.format("mongodb")
.option("database", "people")
.option("collection", "contacts")
.load();

要查看推断出的模式,请在您的Dataset<Row>对象上使用printSchema()方法,如下面的示例所示

dataFrame.printSchema();
root
|-- _id: struct (nullable = true)
| |-- oid: string (nullable = true)
|-- age: integer (nullable = true)
|-- name: string (nullable = true)

要查看DataFrame中的数据,请在您的DataFrame对象上使用show()方法,如下面的示例所示

dataFrame.show();
+--------------------+----+-------------+
| _id| age| name|
+--------------------+----+-------------+
|[585024d558bef808...| 50|Bilbo Baggins|
|[585024d558bef808...|1000| Gandalf|
|[585024d558bef808...| 195| Thorin|
|[585024d558bef808...| 178| Balin|
|[585024d558bef808...| 77| Kíli|
|[585024d558bef808...| 169| Dwalin|
|[585024d558bef808...| 167| Óin|
|[585024d558bef808...| 158| Glóin|
|[585024d558bef808...| 82| Fíli|
|[585024d558bef808...|null| Bombur|
+--------------------+----+-------------+

当您加载一个没有模式的Dataset或DataFrame时,Spark会采样记录来推断集合的模式。

假设MongoDB集合people.contacts包含以下文档

{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 }
{ "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 }
{ "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 }
{ "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 }
{ "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 }
{ "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 }
{ "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 }
{ "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 }
{ "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 }
{ "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }

以下操作从people.contacts加载数据并推断DataFrame的模式

dataFrame = spark.read
.format("mongodb")
.option("database", "people")
.option("collection", "contacts")
.load()

要查看推断出的模式,请在您的DataFrame对象上使用printSchema()函数,如下面的示例所示

dataFrame.printSchema()
root
|-- _id: struct (nullable = true)
| |-- oid: string (nullable = true)
|-- age: integer (nullable = true)
|-- name: string (nullable = true)

要查看DataFrame中的数据,请在您的DataFrame对象上使用show()函数,如下面的示例所示

dataFrame.show()
+--------------------+----+-------------+
| _id| age| name|
+--------------------+----+-------------+
|[585024d558bef808...| 50|Bilbo Baggins|
|[585024d558bef808...|1000| Gandalf|
|[585024d558bef808...| 195| Thorin|
|[585024d558bef808...| 178| Balin|
|[585024d558bef808...| 77| Kíli|
|[585024d558bef808...| 169| Dwalin|
|[585024d558bef808...| 167| Óin|
|[585024d558bef808...| 158| Glóin|
|[585024d558bef808...| 82| Fíli|
|[585024d558bef808...|null| Bombur|
+--------------------+----+-------------+

当您加载一个没有模式的Dataset或DataFrame时,Spark会采样记录来推断集合的模式。

假设MongoDB集合people.contacts包含以下文档

{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 }
{ "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 }
{ "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 }
{ "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 }
{ "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 }
{ "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 }
{ "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 }
{ "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 }
{ "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 }
{ "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }

以下操作从people.contacts加载数据并推断DataFrame的模式

val dataFrame = spark.read()
.format("mongodb")
.option("database", "people")
.option("collection", "contacts")
.load()

要查看推断出的模式,请在您的DataFrame对象上使用printSchema()方法,如下面的示例所示

dataFrame.printSchema()
root
|-- _id: struct (nullable = true)
| |-- oid: string (nullable = true)
|-- age: integer (nullable = true)
|-- name: string (nullable = true)

要查看DataFrame中的数据,请在您的DataFrame对象上使用show()方法,如下面的示例所示

dataFrame.show()
+--------------------+----+-------------+
| _id| age| name|
+--------------------+----+-------------+
|[585024d558bef808...| 50|Bilbo Baggins|
|[585024d558bef808...|1000| Gandalf|
|[585024d558bef808...| 195| Thorin|
|[585024d558bef808...| 178| Balin|
|[585024d558bef808...| 77| Kíli|
|[585024d558bef808...| 169| Dwalin|
|[585024d558bef808...| 167| Óin|
|[585024d558bef808...| 158| Glóin|
|[585024d558bef808...| 82| Fíli|
|[585024d558bef808...|null| Bombur|
+--------------------+----+-------------+

您可以通过指定schemaHint配置选项来指定包含已知字段值的模式,以便在模式推断期间使用。您可以在以下任何Spark格式中指定schemaHint选项:

类型
格式
DDL
<field one name> <FIELD ONE TYPE>, <field two name> <FIELD TWO TYPE>
SQL DDL
STRUCT<<field one name>: <FIELD ONE TYPE>, <field two name>: <FIELD TWO TYPE>
JSON
{ "type": "struct", "fields": [
{ "name": "<field name>", "type": "<field type>", "nullable": <true/false> },
{ "name": "<field name>", "type": "<field type>", "nullable": <true/false> }]}

以下示例展示了如何使用Spark shell指定每种格式的schemaHint选项。示例指定了一个名为"value"的字符串值字段和一个名为"count"的整数值字段。

import org.apache.spark.sql.types._
val mySchema = StructType(Seq(
StructField("value", StringType),
StructField("count", IntegerType))
// Generate DDL format
mySchema.toDDL
// Generate SQL DDL format
mySchema.sql
// Generate Simple String DDL format
mySchema.simpleString
// Generate JSON format
mySchema.json

您还可以使用PySpark在简单的字符串DDL格式或JSON格式中指定schemaHint选项,如下例所示

from pyspark.sql.types import StructType, StructField, StringType, IntegerType
mySchema = StructType([
StructField('value', StringType(), True),
StructField('count', IntegerType(), True)])
# Generate Simple String DDL format
mySchema.simpleString()
# Generate JSON format
mySchema.json()

当使用DataFrames或Datasets与过滤器一起使用时,底层的MongoDB Connector代码会在将数据发送到Spark之前构建一个聚合管道以过滤MongoDB中的数据。这通过仅检索和处理所需的数据来提高Spark性能。

MongoDB Spark Connector将以下过滤器转换为聚合管道阶段

  • And

  • EqualNullSafe

  • EqualTo

  • GreaterThan

  • GreaterThanOrEqual

  • In

  • IsNull

  • LessThan

  • LessThanOrEqual

  • Not

  • Or

  • StringContains

  • StringEndsWith

  • StringStartsWith

使用filter()从MongoDB集合中读取数据子集。

考虑一个名为fruit的集合,它包含以下文档

{ "_id" : 1, "type" : "apple", "qty" : 5 }
{ "_id" : 2, "type" : "orange", "qty" : 10 }
{ "_id" : 3, "type" : "banana", "qty" : 15 }

首先,设置一个DataFrame对象以连接到默认的MongoDB数据源

df = spark.read.format("mongodb").load()

以下示例仅包含满足 qty 字段值大于或等于 10 的记录。

df.filter(df['qty'] >= 10).show()

操作将打印以下输出

+---+----+------+
|_id| qty| type|
+---+----+------+
|2.0|10.0|orange|
|3.0|15.0|banana|
+---+----+------+

当使用DataFrames或Datasets与过滤器一起使用时,底层的MongoDB Connector代码会在将数据发送到Spark之前构建一个聚合管道以过滤MongoDB中的数据。这通过仅检索和处理所需的数据来提高Spark性能。

MongoDB Spark Connector将以下过滤器转换为聚合管道阶段

  • And

  • EqualNullSafe

  • EqualTo

  • GreaterThan

  • GreaterThanOrEqual

  • In

  • IsNull

  • LessThan

  • LessThanOrEqual

  • Not

  • Or

  • StringContains

  • StringEndsWith

  • StringStartsWith

以下示例对年龄低于100岁的字符进行筛选和输出

df.filter(df("age") < 100).show()

操作输出以下内容

+--------------------+---+-------------+
| _id|age| name|
+--------------------+---+-------------+
|[5755d7b4566878c9...| 50|Bilbo Baggins|
|[5755d7b4566878c9...| 82| Fíli|
|[5755d7b4566878c9...| 77| Kíli|
+--------------------+---+-------------+

在您的数据集上运行SQL查询之前,您必须为数据集注册一个临时视图。

以下操作注册了一个名为 characters 的表,然后查询它以找到所有100岁或以上的角色

implicitDS.createOrReplaceTempView("characters");
Dataset<Row> centenarians = spark.sql("SELECT name, age FROM characters WHERE age >= 100");
centenarians.show();

centenarians.show() 输出以下内容

+-------+----+
| name| age|
+-------+----+
|Gandalf|1000|
| Thorin| 195|
| Balin| 178|
| Dwalin| 169|
| Óin| 167|
| Glóin| 158|
+-------+----+

在您可以针对您的DataFrame运行SQL查询之前,您需要注册一个临时表。

以下示例注册了一个名为 temp 的临时表,然后使用SQL查询其中 type 字段包含字母 e 的记录

df.createOrReplaceTempView("temp")
some_fruit = spark.sql("SELECT type, qty FROM temp WHERE type LIKE '%e%'")
some_fruit.show()

pyspark 命令行中,操作打印以下输出

+------+----+
| type| qty|
+------+----+
| apple| 5.0|
|orange|10.0|
+------+----+

在您的数据集上运行SQL查询之前,您必须为数据集注册一个临时视图。

以下操作注册了一个名为 characters 的表,然后查询它以找到所有100岁或以上的角色

val characters = spark.read.format("mongodb").as[Character]
characters.createOrReplaceTempView("characters")
val centenarians = spark.sql("SELECT name, age FROM characters WHERE age >= 100")
centenarians.show()

有关这些示例中使用的类型,请参阅以下Apache Spark API文档

返回

批量模式