Spark 连接器入门指南
先决条件
对 MongoDB 和 Apache Spark 的基本操作知识。请参考MongoDB 文档,Spark 文档,以及这篇 MongoDB 白皮书 了解更多细节。
MongoDB 版本 4.0 或更高版本
Spark 版本 3.1 至 3.5
Java 8 或更高版本
入门
重要
在连接器 10.0.0 版本及以后,使用以下格式mongodb
从 MongoDB 读取和写入
df = spark.read.format("mongodb").load()
依赖管理
将Spark Core、Spark SQL和MongoDB Spark Connector的依赖项提供给您的依赖管理工具。
从版本3.2.0开始,Apache Spark同时支持Scala 2.12和2.13。Spark 3.1.3及以下版本仅支持Scala 2.12。为了支持这两个Scala版本,Spark Connector的10.4.0版本产生两个工件
org.mongodb.spark:mongo-spark-connector_2.12:10.4.0
是针对Scala 2.12编译的,并支持Spark 3.1.x及以上版本。org.mongodb.spark:mongo-spark-connector_2.13:10.4.0
是针对Scala 2.13编译的,并支持Spark 3.2.x及以上版本。
重要
使用与您的Scala和Spark版本兼容的Spark Connector工件。
以下是从Maven pom.xml
文件中摘录的内容,展示了如何包含与Scala 2.12兼容的依赖项
<dependencies> <dependency> <groupId>org.mongodb.spark</groupId> <artifactId>mongo-spark-connector_2.12</artifactId> <version>10.4.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.12</artifactId> <version>3.3.1</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.12</artifactId> <version>3.3.1</version> </dependency> </dependencies>
配置
当通过 SparkSession
指定Connector配置时,必须适当地添加设置前缀。有关详细信息和其他可用的MongoDB Spark Connector选项,请参阅配置Spark指南。
package com.mongodb.spark_examples; import org.apache.spark.sql.SparkSession; public final class GettingStarted { public static void main(final String[] args) throws InterruptedException { /* Create the SparkSession. * If config arguments are passed from the command line using --conf, * parse args for the values to set. */ SparkSession spark = SparkSession.builder() .master("local") .appName("MongoSparkConnectorIntro") .config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1/test.myCollection") .config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1/test.myCollection") .getOrCreate(); // Application logic } }
spark.mongodb.read.connection.uri
指定了MongoDB服务器地址(127.0.0.1
)、要连接的数据库(test
)和读取数据的集合(myCollection
),以及读取优先级。spark.mongodb.write.connection.uri
指定了MongoDB服务器地址(127.0.0.1
)、要连接的数据库(test
)和写入数据的集合(myCollection
)。
您可以使用 SparkSession
对象将数据写入MongoDB,从MongoDB读取数据,创建数据集,并执行SQL操作。
重要
在Connector的10.0.0及更高版本中,使用格式 mongodb
从MongoDB读取和写入数据
df = spark.read.format("mongodb").load()
Python Spark Shell
本教程使用 pyspark
命令行,但代码同样适用于独立的 Python 应用程序。
启动 pyspark
命令行时,可以指定以下选项
使用
--packages
选项下载 MongoDB Spark 连接器包。以下包可用mongo-spark-connector
使用
--conf
选项配置 MongoDB Spark 连接器。这些设置配置了SparkConf
对象。注意
如果您使用
SparkConf
来配置 Spark 连接器,您必须适当地为设置添加前缀。有关详细信息和其他可用的 MongoDB Spark 连接器选项,请参阅配置 Spark 指南。
以下示例从命令行启动 pyspark
命令行
./bin/pyspark --conf "spark.mongodb.read.connection.uri=mongodb://127.0.0.1/test.myCollection?readPreference=primaryPreferred" \ --conf "spark.mongodb.write.connection.uri=mongodb://127.0.0.1/test.myCollection" \ --packages org.mongodb.spark:mongo-spark-connector_2.12:10.4.0
spark.mongodb.read.connection.uri
指定 MongoDB 服务器地址(127.0.0.1
)、要连接的数据库(test
)、要从中读取数据的集合(myCollection
)以及读取偏好。spark.mongodb.write.connection.uri
指定 MongoDB 服务器地址(127.0.0.1
)、要连接的数据库(test
)以及要写入数据的集合(myCollection
)。默认情况下通过端口27017
连接。packages
选项指定 Spark 连接器的 Maven 坐标,格式为groupId:artifactId:version
。
本教程中的示例将使用此数据库和集合。
创建 SparkSession
对象
注意
当你启动 pyspark
时,默认会获得一个名为 spark
的 SparkSession
对象。在一个独立的 Python 应用程序中,你需要显式创建你的 SparkSession
对象,如下所示。
如果你在启动 pyspark
时指定了 spark.mongodb.read.connection.uri
和 spark.mongodb.write.connection.uri
配置选项,默认的 SparkSession
对象会使用它们。如果你想在 pyspark
内部创建自己的 SparkSession
对象,可以使用 SparkSession.builder
并指定不同的配置选项。
from pyspark.sql import SparkSession my_spark = SparkSession \ .builder \ .appName("myApp") \ .config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1/test.coll") \ .config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1/test.coll") \ .getOrCreate()
你可以使用 SparkSession
对象将数据写入 MongoDB、从 MongoDB 读取数据、创建 DataFrames 以及执行 SQL 操作。
重要
在Connector的10.0.0及更高版本中,使用格式 mongodb
从MongoDB读取和写入数据
df = spark.read.format("mongodb").load()
Spark Shell
在启动 Spark Shell 时,指定
使用
--packages
选项下载 MongoDB Spark 连接器包。以下包可用mongo-spark-connector
使用
--conf
选项配置 MongoDB Spark 连接器。这些设置配置了SparkConf
对象。注意
如果您使用
SparkConf
来配置 Spark 连接器,您必须适当地为设置添加前缀。有关详细信息和其他可用的 MongoDB Spark 连接器选项,请参阅配置 Spark 指南。
例如,
./bin/spark-shell --conf "spark.mongodb.read.connection.uri=mongodb://127.0.0.1/test.myCollection?readPreference=primaryPreferred" \ --conf "spark.mongodb.write.connection.uri=mongodb://127.0.0.1/test.myCollection" \ --packages org.mongodb.spark:mongo-spark-connector_2.12:10.4.0
spark.mongodb.read.connection.uri
指定 MongoDB 服务器地址(127.0.0.1
)、要连接的数据库(test
)、要从中读取数据的集合(myCollection
)以及读取偏好。spark.mongodb.write.connection.uri
指定 MongoDB 服务器地址(127.0.0.1
)、要连接的数据库(test
)以及要写入数据的集合(myCollection
)。默认情况下通过端口27017
连接。packages
选项指定 Spark 连接器的 Maven 坐标,格式为groupId:artifactId:version
。
导入 MongoDB 连接器包
通过在 Spark Shell 中导入以下包来启用针对你的 SparkSession
和 Dataset
对象的 MongoDB 连接器特定函数和隐式操作。
import com.mongodb.spark._
连接到MongoDB
当数据集操作需要从MongoDB读取或向MongoDB写入时,将自动连接到MongoDB。
自包含的Scala应用程序
依赖管理
将Spark Core、Spark SQL和MongoDB Spark Connector的依赖项提供给您的依赖管理工具。
以下摘录演示了如何在 SBT build.scala
文件中包含这些依赖项
scalaVersion := "2.12", libraryDependencies ++= Seq( "org.mongodb.spark" %% "mongo-spark-connector_2.12" % "10.4.0", "org.apache.spark" %% "spark-core" % "3.3.1", "org.apache.spark" %% "spark-sql" % "3.3.1" )
配置
当通过 SparkSession
指定连接器配置时,必须适当地为设置添加前缀。有关详细信息和其他可用的 MongoDB Spark 连接器选项,请参阅 配置 Spark 指南。
package com.mongodb object GettingStarted { def main(args: Array[String]): Unit = { /* Create the SparkSession. * If config arguments are passed from the command line using --conf, * parse args for the values to set. */ import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .master("local") .appName("MongoSparkConnectorIntro") .config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1/test.myCollection") .config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1/test.myCollection") .getOrCreate() } }
故障排除
如果你遇到 java.net.BindException: Can't assign requested address
错误,
请检查是否已运行另一个 Spark shell。
尝试设置
SPARK_LOCAL_IP
环境变量;例如。export SPARK_LOCAL_IP=127.0.0.1 尝试在启动 Spark shell 时包含以下选项
--driver-java-options "-Djava.net.preferIPv4Stack=true"
如果你在此教程的示例中遇到错误,你可能需要清除你的本地 Ivy 缓存(~/.ivy2/cache/org.mongodb.spark
和 ~/.ivy2/jars
)。