文档菜单
文档首页
/
Spark 连接器
/

在流模式下从 MongoDB 读取

本页内容

  • 概述
  • 示例
  • API 文档

当从 MongoDB 数据库读取流时,MongoDB Spark 连接器支持两种模式:**微批处理**和**连续处理**。微批处理是默认的处理引擎,能够实现端到端延迟低至 100 毫秒,并具有恰好一次的错误容错保证。连续处理是 Spark 版本 2.3 中引入的一个实验性功能,能够实现端到端延迟低至 1 毫秒,并至少一次保证。

要了解有关连续处理的更多信息,请参阅Spark 文档.

注意

连接器从您的 MongoDB 部署的更改流中读取。要生成更改流中的更改事件,请在您的数据库上执行更新操作。

要了解有关更改流的更多信息,请参阅 MongoDB 手册中的 更改流

要从MongoDB读取数据,请调用您的 SparkSession 对象上的 readStream() 方法。此方法返回一个 DataStreamReader 对象,您可以使用它来指定流式读取操作的格式和其他配置设置。

您必须指定以下配置设置才能从MongoDB读取

设置
描述
readStream.format()
指定底层输入数据源的格式。使用 mongodb 从MongoDB读取。
readStream.option()

指定流设置,包括MongoDB部署的 连接字符串、MongoDB数据库和集合以及聚合管道阶段。

有关读取流配置选项的列表,请参阅流式读取配置选项指南。

readStream.schema()
指定输入模式。

以下代码片段展示了如何使用前面的配置设置来持续处理从MongoDB流式传输的数据。连接器将所有新数据追加到现有数据中,并且每秒异步将检查点写入到 /tmp/checkpointDir。将 Trigger.Continuous 参数传递给 trigger() 方法可以启用连续处理。

import org.apache.spark.sql.streaming.Trigger;
Dataset<Row> streamingDataset = <local SparkSession>.readStream()
.format("mongodb")
.load();
DataStreamWriter<Row> dataStreamWriter = streamingDataset.writeStream()
.trigger(Trigger.Continuous("1 second"))
.format("memory")
.option("checkpointLocation", "/tmp/checkpointDir")
.outputMode("append");
StreamingQuery query = dataStreamWriter.start();

注意

Spark 不会开始流式传输,直到您在流查询上调用 start() 方法。

有关方法的完整列表,请参阅 Java 结构化流式传输参考。

要从MongoDB读取数据,请调用您的 SparkSession 对象上的 readStream 函数。此函数返回一个 DataStreamReader 对象,您可以使用它来指定流式读取操作的格式和其他配置设置。

您必须指定以下配置设置才能从MongoDB读取

设置
描述
readStream.format()
指定底层输入数据源的格式。使用 mongodb 从MongoDB读取。
readStream.option()

指定流设置,包括MongoDB部署的 连接字符串、MongoDB数据库和集合以及聚合管道阶段。

有关读取流配置选项的列表,请参阅 流式读取配置选项指南。

readStream.schema()
指定输入模式。

以下代码片段展示了如何使用前面的配置设置来持续处理从MongoDB流式传输的数据。连接器将所有新数据追加到现有数据中,并且每秒异步将检查点写入到 /tmp/checkpointDir。将 continuous 参数传递给 trigger() 方法可以启用连续处理。

streamingDataFrame = (<local SparkSession>.readStream
.format("mongodb")
.load()
)
dataStreamWriter = (streamingDataFrame.writeStream
.trigger(continuous="1 second")
.format("memory")
.option("checkpointLocation", "/tmp/checkpointDir")
.outputMode("append")
)
query = dataStreamWriter.start()

注意

Spark 不会开始流式传输,直到您在流查询上调用 start() 方法。

关于方法的完整列表,请参阅pyspark Structured Streaming参考。

要从MongoDB读取数据,请调用您的SparkSession对象的readStream方法。此方法返回一个DataStreamReader对象,您可以使用它来指定流式读取操作的格式和其他配置设置。

您必须指定以下配置设置才能从MongoDB读取

设置
描述
readStream.format()
指定底层输入数据源的格式。使用 mongodb 从MongoDB读取。
readStream.option()

指定流设置,包括MongoDB部署的 连接字符串、MongoDB数据库和集合以及聚合管道阶段。

有关读取流配置选项的列表,请参阅 流式读取配置选项指南。

readStream.schema()
指定输入模式。

以下代码片段展示了如何使用前面的配置设置来持续处理从MongoDB流式传输的数据。连接器将所有新数据追加到现有数据中,并且每秒异步将检查点写入到 /tmp/checkpointDir。将 Trigger.Continuous 参数传递给 trigger() 方法可以启用连续处理。

import org.apache.spark.sql.streaming.Trigger
val streamingDataFrame = <local SparkSession>.readStream
.format("mongodb")
.load()
val dataStreamWriter = streamingDataFrame.writeStream
.trigger(Trigger.Continuous("1 second"))
.format("memory")
.option("checkpointLocation", "/tmp/checkpointDir")
.outputMode("append")
val query = dataStreamWriter.start()

注意

Spark 不会开始流式传输,直到您在流查询上调用 start() 方法。

关于方法的完整列表,请参阅Scala Structured Streaming参考。

以下示例展示了如何将MongoDB中的数据流式传输到您的控制台。

  1. 创建一个从MongoDB读取数据的DataStreamReader对象。

  2. 通过在您使用 DataStreamReader 创建的流式 Dataset 对象上调用 writeStream() 方法来创建一个 DataStreamWriter 对象。使用 format() 方法指定格式 console

  3. DataStreamWriter 实例上调用 start() 方法以开始流。

当新数据被插入 MongoDB 时,MongoDB 会根据您指定的 outputMode 将数据流式传输到您的控制台。

重要

请避免将大型数据集流式传输到您的控制台。流式传输到控制台对内存消耗很大,仅适用于测试目的。

// create a local SparkSession
SparkSession spark = SparkSession.builder()
.appName("readExample")
.master("spark://spark-master:<port>")
.config("spark.jars", "<mongo-spark-connector-JAR-file-name>")
.getOrCreate();
// define the schema of the source collection
StructType readSchema = new StructType()
.add("company_symbol", DataTypes.StringType)
.add("company_name", DataTypes.StringType)
.add("price", DataTypes.DoubleType)
.add("tx_time", DataTypes.TimestampType);
// define a streaming query
DataStreamWriter<Row> dataStreamWriter = spark.readStream()
.format("mongodb")
.option("spark.mongodb.connection.uri", "<mongodb-connection-string>")
.option("spark.mongodb.database", "<database-name>")
.option("spark.mongodb.collection", "<collection-name>")
.schema(readSchema)
.load()
// manipulate your streaming data
.writeStream()
.format("console")
.trigger(Trigger.Continuous("1 second"))
.outputMode("append");
// run the query
StreamingQuery query = dataStreamWriter.start();
  1. 创建一个从MongoDB读取数据的DataStreamReader对象。

  2. 通过在您使用 DataStreamReader 创建的流式 DataFrame 上调用 writeStream() 方法来创建一个 DataStreamWriter 对象。通过使用 format() 方法指定格式 console

  3. DataStreamWriter 实例上调用 start() 方法以开始流。

当新数据被插入 MongoDB 时,MongoDB 会根据您指定的 outputMode 将数据流式传输到您的控制台。

重要

请避免将大型数据集流式传输到您的控制台。流式传输到控制台对内存消耗很大,仅适用于测试目的。

# create a local SparkSession
spark = SparkSession.builder \
.appName("readExample") \
.master("spark://spark-master:<port>") \
.config("spark.jars", "<mongo-spark-connector-JAR-file-name>") \
.getOrCreate()
# define the schema of the source collection
readSchema = (StructType()
.add('company_symbol', StringType())
.add('company_name', StringType())
.add('price', DoubleType())
.add('tx_time', TimestampType())
)
# define a streaming query
dataStreamWriter = (spark.readStream
.format("mongodb")
.option("spark.mongodb.connection.uri", <mongodb-connection-string>)
.option('spark.mongodb.database', <database-name>)
.option('spark.mongodb.collection', <collection-name>)
.schema(readSchema)
.load()
# manipulate your streaming data
.writeStream
.format("console")
.trigger(continuous="1 second")
.outputMode("append")
)
# run the query
query = dataStreamWriter.start()
  1. 创建一个从MongoDB读取数据的DataStreamReader对象。

  2. 通过在您使用 DataStreamReader 创建的流式 DataFrame 对象上调用 writeStream() 方法来创建一个 DataStreamWriter 对象。通过使用 format() 方法指定格式 console

  3. DataStreamWriter 实例上调用 start() 方法以开始流。

当新数据被插入 MongoDB 时,MongoDB 会根据您指定的 outputMode 将数据流式传输到您的控制台。

重要

请避免将大型数据集流式传输到您的控制台。流式传输到控制台对内存消耗很大,仅适用于测试目的。

// create a local SparkSession
val spark = SparkSession.builder
.appName("readExample")
.master("spark://spark-master:<port>")
.config("spark.jars", "<mongo-spark-connector-JAR-file-name>")
.getOrCreate()
// define the schema of the source collection
val readSchema = StructType()
.add("company_symbol", StringType())
.add("company_name", StringType())
.add("price", DoubleType())
.add("tx_time", TimestampType())
// define a streaming query
val dataStreamWriter = spark.readStream
.format("mongodb")
.option("spark.mongodb.connection.uri", <mongodb-connection-string>)
.option("spark.mongodb.database", <database-name>)
.option("spark.mongodb.collection", <collection-name>)
.schema(readSchema)
.load()
// manipulate your streaming data
.writeStream
.format("console")
.trigger(Trigger.Continuous("1 second"))
.outputMode("append")
// run the query
val query = dataStreamWriter.start()

重要

变更流的模式推断

如果您将 change.stream.publish.full.document.only 选项设置为 true,Spark 连接器将使用扫描文档的模式推断 DataFrame 的模式。如果将选项设置为 false,则必须指定一个模式。

有关此设置的更多信息以及完整更改流配置选项列表,请参阅读取配置选项指南。

有关这些示例中使用的类型的更多信息,请参阅以下 Apache Spark API 文档

返回

流模式