文档菜单
文档首页
/
Spark 连接器

Spark 连接器入门指南

本页内容

  • 先决条件
  • 入门指南
  • 教程
  • 对 MongoDB 和 Apache Spark 的基本操作知识。请参考MongoDB 文档Spark 文档,以及这篇 MongoDB 白皮书 了解更多细节。

  • MongoDB 版本 4.0 或更高版本

  • Spark 版本 3.1 至 3.5

  • Java 8 或更高版本

重要

在连接器 10.0.0 版本及以后,使用以下格式mongodb 从 MongoDB 读取和写入

df = spark.read.format("mongodb").load()

将Spark Core、Spark SQL和MongoDB Spark Connector的依赖项提供给您的依赖管理工具。

从版本3.2.0开始,Apache Spark同时支持Scala 2.12和2.13。Spark 3.1.3及以下版本仅支持Scala 2.12。为了支持这两个Scala版本,Spark Connector的10.4.0版本产生两个工件

  • org.mongodb.spark:mongo-spark-connector_2.12:10.4.0 是针对Scala 2.12编译的,并支持Spark 3.1.x及以上版本。

  • org.mongodb.spark:mongo-spark-connector_2.13:10.4.0 是针对Scala 2.13编译的,并支持Spark 3.2.x及以上版本。

重要

使用与您的Scala和Spark版本兼容的Spark Connector工件。

以下是从Maven pom.xml文件中摘录的内容,展示了如何包含与Scala 2.12兼容的依赖项

<dependencies>
<dependency>
<groupId>org.mongodb.spark</groupId>
<artifactId>mongo-spark-connector_2.12</artifactId>
<version>10.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.3.1</version>
</dependency>
</dependencies>

当通过 SparkSession 指定Connector配置时,必须适当地添加设置前缀。有关详细信息和其他可用的MongoDB Spark Connector选项,请参阅配置Spark指南。

package com.mongodb.spark_examples;
import org.apache.spark.sql.SparkSession;
public final class GettingStarted {
public static void main(final String[] args) throws InterruptedException {
/* Create the SparkSession.
* If config arguments are passed from the command line using --conf,
* parse args for the values to set.
*/
SparkSession spark = SparkSession.builder()
.master("local")
.appName("MongoSparkConnectorIntro")
.config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1/test.myCollection")
.config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1/test.myCollection")
.getOrCreate();
// Application logic
}
}
  • spark.mongodb.read.connection.uri 指定了MongoDB服务器地址(127.0.0.1)、要连接的数据库(test)和读取数据的集合(myCollection),以及读取优先级。

  • spark.mongodb.write.connection.uri 指定了MongoDB服务器地址(127.0.0.1)、要连接的数据库(test)和写入数据的集合(myCollection)。

您可以使用 SparkSession 对象将数据写入MongoDB,从MongoDB读取数据,创建数据集,并执行SQL操作。

重要

在Connector的10.0.0及更高版本中,使用格式 mongodb 从MongoDB读取和写入数据

df = spark.read.format("mongodb").load()

本教程使用 pyspark 命令行,但代码同样适用于独立的 Python 应用程序。

启动 pyspark 命令行时,可以指定以下选项

  • 使用 --packages 选项下载 MongoDB Spark 连接器包。以下包可用

    • mongo-spark-connector

  • 使用 --conf 选项配置 MongoDB Spark 连接器。这些设置配置了 SparkConf 对象。

    注意

    如果您使用 SparkConf 来配置 Spark 连接器,您必须适当地为设置添加前缀。有关详细信息和其他可用的 MongoDB Spark 连接器选项,请参阅配置 Spark 指南。

以下示例从命令行启动 pyspark 命令行

./bin/pyspark --conf "spark.mongodb.read.connection.uri=mongodb://127.0.0.1/test.myCollection?readPreference=primaryPreferred" \
--conf "spark.mongodb.write.connection.uri=mongodb://127.0.0.1/test.myCollection" \
--packages org.mongodb.spark:mongo-spark-connector_2.12:10.4.0
  • spark.mongodb.read.connection.uri 指定 MongoDB 服务器地址(127.0.0.1)、要连接的数据库(test)、要从中读取数据的集合(myCollection)以及读取偏好。

  • spark.mongodb.write.connection.uri 指定 MongoDB 服务器地址(127.0.0.1)、要连接的数据库(test)以及要写入数据的集合(myCollection)。默认情况下通过端口 27017 连接。

  • packages 选项指定 Spark 连接器的 Maven 坐标,格式为 groupId:artifactId:version

本教程中的示例将使用此数据库和集合。

注意

当你启动 pyspark 时,默认会获得一个名为 sparkSparkSession 对象。在一个独立的 Python 应用程序中,你需要显式创建你的 SparkSession 对象,如下所示。

如果你在启动 pyspark 时指定了 spark.mongodb.read.connection.urispark.mongodb.write.connection.uri 配置选项,默认的 SparkSession 对象会使用它们。如果你想在 pyspark 内部创建自己的 SparkSession 对象,可以使用 SparkSession.builder 并指定不同的配置选项。

from pyspark.sql import SparkSession
my_spark = SparkSession \
.builder \
.appName("myApp") \
.config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1/test.coll") \
.config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1/test.coll") \
.getOrCreate()

你可以使用 SparkSession 对象将数据写入 MongoDB、从 MongoDB 读取数据、创建 DataFrames 以及执行 SQL 操作。

重要

在Connector的10.0.0及更高版本中,使用格式 mongodb 从MongoDB读取和写入数据

df = spark.read.format("mongodb").load()

在启动 Spark Shell 时,指定

  • 使用 --packages 选项下载 MongoDB Spark 连接器包。以下包可用

    • mongo-spark-connector

  • 使用 --conf 选项配置 MongoDB Spark 连接器。这些设置配置了 SparkConf 对象。

    注意

    如果您使用 SparkConf 来配置 Spark 连接器,您必须适当地为设置添加前缀。有关详细信息和其他可用的 MongoDB Spark 连接器选项,请参阅配置 Spark 指南。

例如,

./bin/spark-shell --conf "spark.mongodb.read.connection.uri=mongodb://127.0.0.1/test.myCollection?readPreference=primaryPreferred" \
--conf "spark.mongodb.write.connection.uri=mongodb://127.0.0.1/test.myCollection" \
--packages org.mongodb.spark:mongo-spark-connector_2.12:10.4.0
  • spark.mongodb.read.connection.uri 指定 MongoDB 服务器地址(127.0.0.1)、要连接的数据库(test)、要从中读取数据的集合(myCollection)以及读取偏好。

  • spark.mongodb.write.connection.uri 指定 MongoDB 服务器地址(127.0.0.1)、要连接的数据库(test)以及要写入数据的集合(myCollection)。默认情况下通过端口 27017 连接。

  • packages 选项指定 Spark 连接器的 Maven 坐标,格式为 groupId:artifactId:version

通过在 Spark Shell 中导入以下包来启用针对你的 SparkSessionDataset 对象的 MongoDB 连接器特定函数和隐式操作。

import com.mongodb.spark._

当数据集操作需要从MongoDB读取或向MongoDB写入时,将自动连接到MongoDB。

将Spark Core、Spark SQL和MongoDB Spark Connector的依赖项提供给您的依赖管理工具。

以下摘录演示了如何在 SBT build.scala 文件中包含这些依赖项

scalaVersion := "2.12",
libraryDependencies ++= Seq(
"org.mongodb.spark" %% "mongo-spark-connector_2.12" % "10.4.0",
"org.apache.spark" %% "spark-core" % "3.3.1",
"org.apache.spark" %% "spark-sql" % "3.3.1"
)

当通过 SparkSession 指定连接器配置时,必须适当地为设置添加前缀。有关详细信息和其他可用的 MongoDB Spark 连接器选项,请参阅 配置 Spark 指南。

package com.mongodb
object GettingStarted {
def main(args: Array[String]): Unit = {
/* Create the SparkSession.
* If config arguments are passed from the command line using --conf,
* parse args for the values to set.
*/
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.master("local")
.appName("MongoSparkConnectorIntro")
.config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1/test.myCollection")
.config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1/test.myCollection")
.getOrCreate()
}
}

如果你遇到 java.net.BindException: Can't assign requested address 错误,

  • 请检查是否已运行另一个 Spark shell。

  • 尝试设置 SPARK_LOCAL_IP 环境变量;例如。

    export SPARK_LOCAL_IP=127.0.0.1
  • 尝试在启动 Spark shell 时包含以下选项

    --driver-java-options "-Djava.net.preferIPv4Stack=true"

如果你在此教程的示例中遇到错误,你可能需要清除你的本地 Ivy 缓存(~/.ivy2/cache/org.mongodb.spark~/.ivy2/jars)。

返回

MongoDB Connector for Spark