在批量模式下从 MongoDB 读取
概述
要从 MongoDB 读取数据,请在您的 SparkSession
对象上调用 read()
方法。此方法返回一个 DataFrameReader
对象,您可以使用它来指定批量读取操作的数据格式和其他配置设置。dataFrame.read.format()
您必须指定以下配置设置才能从 MongoDB 读取
设置 | 描述 |
---|---|
dataFrame.read.format() | 指定底层输入数据源的格式。使用 mongodb 从 MongoDB 读取。 |
dataFrame.read.option() |
以下代码示例展示了如何使用前面的配置设置从MongoDB中的people.contacts
读取数据
Dataset<Row> dataFrame = spark.read() .format("mongodb") .option("database", "people") .option("collection", "contacts") .load();
提示
DataFrame类型
DataFrame
在Java API中不是一个类。使用Dataset<Row>
来引用DataFrame。
要从MongoDB读取数据,请在您的SparkSession
对象上调用read
函数。此函数返回一个DataFrameReader
对象,您可以使用它来指定批读取操作的模式和其他配置设置。
您必须指定以下配置设置才能从 MongoDB 读取
设置 | 描述 |
---|---|
dataFrame.read.format() | 指定底层输入数据源的格式。使用 mongodb 从 MongoDB 读取。 |
dataFrame.read.option() |
以下代码示例展示了如何使用前面的配置设置从MongoDB中的people.contacts
读取数据
dataFrame = spark.read .format("mongodb") .option("database", "people") .option("collection", "contacts") .load()
要从MongoDB读取数据,请在您的SparkSession
对象上调用read
方法。此方法返回一个DataFrameReader
对象,您可以使用它来指定批读取操作的模式和其他配置设置。
您必须指定以下配置设置才能从 MongoDB 读取
设置 | 描述 |
---|---|
dataFrame.read.format() | 指定底层输入数据源的格式。使用 mongodb 从 MongoDB 读取。 |
dataFrame.read.option() |
以下代码示例展示了如何使用前面的配置设置从MongoDB中的people.contacts
读取数据
val dataFrame = spark.read .format("mongodb") .option("database", "people") .option("collection", "contacts") .load()
提示
DataFrame类型
DataFrame由一个Dataset
的Row
对象表示。《DataFrame》类型是Dataset[Row]
的别名。
模式推断
当您加载一个没有模式的Dataset或DataFrame时,Spark会采样记录来推断集合的模式。
假设MongoDB集合people.contacts
包含以下文档
{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 } { "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 } { "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 } { "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 } { "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 } { "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 } { "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 } { "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 } { "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 } { "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }
以下操作从people.contacts
加载数据并推断DataFrame的模式
Dataset<Row> dataFrame = spark.read() .format("mongodb") .option("database", "people") .option("collection", "contacts") .load();
要查看推断出的模式,请在您的Dataset<Row>
对象上使用printSchema()
方法,如下面的示例所示
dataFrame.printSchema();
root |-- _id: struct (nullable = true) | |-- oid: string (nullable = true) |-- age: integer (nullable = true) |-- name: string (nullable = true)
要查看DataFrame中的数据,请在您的DataFrame
对象上使用show()
方法,如下面的示例所示
dataFrame.show();
+--------------------+----+-------------+ | _id| age| name| +--------------------+----+-------------+ |[585024d558bef808...| 50|Bilbo Baggins| |[585024d558bef808...|1000| Gandalf| |[585024d558bef808...| 195| Thorin| |[585024d558bef808...| 178| Balin| |[585024d558bef808...| 77| Kíli| |[585024d558bef808...| 169| Dwalin| |[585024d558bef808...| 167| Óin| |[585024d558bef808...| 158| Glóin| |[585024d558bef808...| 82| Fíli| |[585024d558bef808...|null| Bombur| +--------------------+----+-------------+
当您加载一个没有模式的Dataset或DataFrame时,Spark会采样记录来推断集合的模式。
假设MongoDB集合people.contacts
包含以下文档
{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 } { "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 } { "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 } { "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 } { "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 } { "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 } { "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 } { "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 } { "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 } { "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }
以下操作从people.contacts
加载数据并推断DataFrame的模式
dataFrame = spark.read .format("mongodb") .option("database", "people") .option("collection", "contacts") .load()
要查看推断出的模式,请在您的DataFrame
对象上使用printSchema()
函数,如下面的示例所示
dataFrame.printSchema()
root |-- _id: struct (nullable = true) | |-- oid: string (nullable = true) |-- age: integer (nullable = true) |-- name: string (nullable = true)
要查看DataFrame中的数据,请在您的DataFrame
对象上使用show()
函数,如下面的示例所示
dataFrame.show()
+--------------------+----+-------------+ | _id| age| name| +--------------------+----+-------------+ |[585024d558bef808...| 50|Bilbo Baggins| |[585024d558bef808...|1000| Gandalf| |[585024d558bef808...| 195| Thorin| |[585024d558bef808...| 178| Balin| |[585024d558bef808...| 77| Kíli| |[585024d558bef808...| 169| Dwalin| |[585024d558bef808...| 167| Óin| |[585024d558bef808...| 158| Glóin| |[585024d558bef808...| 82| Fíli| |[585024d558bef808...|null| Bombur| +--------------------+----+-------------+
当您加载一个没有模式的Dataset或DataFrame时,Spark会采样记录来推断集合的模式。
假设MongoDB集合people.contacts
包含以下文档
{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 } { "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 } { "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 } { "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 } { "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 } { "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 } { "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 } { "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 } { "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 } { "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }
以下操作从people.contacts
加载数据并推断DataFrame的模式
val dataFrame = spark.read() .format("mongodb") .option("database", "people") .option("collection", "contacts") .load()
要查看推断出的模式,请在您的DataFrame
对象上使用printSchema()
方法,如下面的示例所示
dataFrame.printSchema()
root |-- _id: struct (nullable = true) | |-- oid: string (nullable = true) |-- age: integer (nullable = true) |-- name: string (nullable = true)
要查看DataFrame中的数据,请在您的DataFrame
对象上使用show()
方法,如下面的示例所示
dataFrame.show()
+--------------------+----+-------------+ | _id| age| name| +--------------------+----+-------------+ |[585024d558bef808...| 50|Bilbo Baggins| |[585024d558bef808...|1000| Gandalf| |[585024d558bef808...| 195| Thorin| |[585024d558bef808...| 178| Balin| |[585024d558bef808...| 77| Kíli| |[585024d558bef808...| 169| Dwalin| |[585024d558bef808...| 167| Óin| |[585024d558bef808...| 158| Glóin| |[585024d558bef808...| 82| Fíli| |[585024d558bef808...|null| Bombur| +--------------------+----+-------------+
使用模式提示指定已知字段
您可以通过指定schemaHint
配置选项来指定包含已知字段值的模式,以便在模式推断期间使用。您可以在以下任何Spark格式中指定schemaHint
选项:
类型 | 格式 | |||
---|---|---|---|---|
DDL | <field one name> <FIELD ONE TYPE>, <field two name> <FIELD TWO TYPE> | |||
SQL DDL | STRUCT<<field one name>: <FIELD ONE TYPE>, <field two name>: <FIELD TWO TYPE> | |||
JSON |
|
以下示例展示了如何使用Spark shell指定每种格式的schemaHint
选项。示例指定了一个名为"value"
的字符串值字段和一个名为"count"
的整数值字段。
import org.apache.spark.sql.types._ val mySchema = StructType(Seq( StructField("value", StringType), StructField("count", IntegerType)) // Generate DDL format mySchema.toDDL // Generate SQL DDL format mySchema.sql // Generate Simple String DDL format mySchema.simpleString // Generate JSON format mySchema.json
您还可以使用PySpark在简单的字符串DDL格式或JSON格式中指定schemaHint
选项,如下例所示
from pyspark.sql.types import StructType, StructField, StringType, IntegerType mySchema = StructType([ StructField('value', StringType(), True), StructField('count', IntegerType(), True)]) # Generate Simple String DDL format mySchema.simpleString() # Generate JSON format mySchema.json()
过滤器
当使用DataFrames或Datasets与过滤器一起使用时,底层的MongoDB Connector代码会在将数据发送到Spark之前构建一个聚合管道以过滤MongoDB中的数据。这通过仅检索和处理所需的数据来提高Spark性能。
MongoDB Spark Connector将以下过滤器转换为聚合管道阶段
And
EqualNullSafe
EqualTo
GreaterThan
GreaterThanOrEqual
In
IsNull
LessThan
LessThanOrEqual
Not
Or
StringContains
StringEndsWith
StringStartsWith
使用filter()
从MongoDB集合中读取数据子集。
考虑一个名为fruit
的集合,它包含以下文档
{ "_id" : 1, "type" : "apple", "qty" : 5 } { "_id" : 2, "type" : "orange", "qty" : 10 } { "_id" : 3, "type" : "banana", "qty" : 15 }
首先,设置一个DataFrame
对象以连接到默认的MongoDB数据源
df = spark.read.format("mongodb").load()
以下示例仅包含满足 qty
字段值大于或等于 10
的记录。
df.filter(df['qty'] >= 10).show()
操作将打印以下输出
+---+----+------+ |_id| qty| type| +---+----+------+ |2.0|10.0|orange| |3.0|15.0|banana| +---+----+------+
当使用DataFrames或Datasets与过滤器一起使用时,底层的MongoDB Connector代码会在将数据发送到Spark之前构建一个聚合管道以过滤MongoDB中的数据。这通过仅检索和处理所需的数据来提高Spark性能。
MongoDB Spark Connector将以下过滤器转换为聚合管道阶段
And
EqualNullSafe
EqualTo
GreaterThan
GreaterThanOrEqual
In
IsNull
LessThan
LessThanOrEqual
Not
Or
StringContains
StringEndsWith
StringStartsWith
以下示例对年龄低于100岁的字符进行筛选和输出
df.filter(df("age") < 100).show()
操作输出以下内容
+--------------------+---+-------------+ | _id|age| name| +--------------------+---+-------------+ |[5755d7b4566878c9...| 50|Bilbo Baggins| |[5755d7b4566878c9...| 82| Fíli| |[5755d7b4566878c9...| 77| Kíli| +--------------------+---+-------------+
SQL查询
在您的数据集上运行SQL查询之前,您必须为数据集注册一个临时视图。
以下操作注册了一个名为 characters
的表,然后查询它以找到所有100岁或以上的角色
implicitDS.createOrReplaceTempView("characters"); Dataset<Row> centenarians = spark.sql("SELECT name, age FROM characters WHERE age >= 100"); centenarians.show();
centenarians.show()
输出以下内容
+-------+----+ | name| age| +-------+----+ |Gandalf|1000| | Thorin| 195| | Balin| 178| | Dwalin| 169| | Óin| 167| | Glóin| 158| +-------+----+
在您可以针对您的DataFrame运行SQL查询之前,您需要注册一个临时表。
以下示例注册了一个名为 temp
的临时表,然后使用SQL查询其中 type
字段包含字母 e
的记录
df.createOrReplaceTempView("temp") some_fruit = spark.sql("SELECT type, qty FROM temp WHERE type LIKE '%e%'") some_fruit.show()
在 pyspark
命令行中,操作打印以下输出
+------+----+ | type| qty| +------+----+ | apple| 5.0| |orange|10.0| +------+----+
在您的数据集上运行SQL查询之前,您必须为数据集注册一个临时视图。
以下操作注册了一个名为 characters
的表,然后查询它以找到所有100岁或以上的角色
val characters = spark.read.format("mongodb").as[Character] characters.createOrReplaceTempView("characters") val centenarians = spark.sql("SELECT name, age FROM characters WHERE age >= 100") centenarians.show()
API文档
有关这些示例中使用的类型,请参阅以下Apache Spark API文档