在流模式下从 MongoDB 读取
概述
当从 MongoDB 数据库读取流时,MongoDB Spark 连接器支持两种模式:**微批处理**和**连续处理**。微批处理是默认的处理引擎,能够实现端到端延迟低至 100 毫秒,并具有恰好一次的错误容错保证。连续处理是 Spark 版本 2.3 中引入的一个实验性功能,能够实现端到端延迟低至 1 毫秒,并至少一次保证。
要了解有关连续处理的更多信息,请参阅Spark 文档.
注意
连接器从您的 MongoDB 部署的更改流中读取。要生成更改流中的更改事件,请在您的数据库上执行更新操作。
要了解有关更改流的更多信息,请参阅 MongoDB 手册中的 更改流。
要从MongoDB读取数据,请调用您的 SparkSession
对象上的 readStream()
方法。此方法返回一个 DataStreamReader
对象,您可以使用它来指定流式读取操作的格式和其他配置设置。
您必须指定以下配置设置才能从MongoDB读取
设置 | 描述 |
---|---|
readStream.format() | 指定底层输入数据源的格式。使用 mongodb 从MongoDB读取。 |
readStream.option() | |
readStream.schema() | 指定输入模式。 |
以下代码片段展示了如何使用前面的配置设置来持续处理从MongoDB流式传输的数据。连接器将所有新数据追加到现有数据中,并且每秒异步将检查点写入到 /tmp/checkpointDir
。将 Trigger.Continuous
参数传递给 trigger()
方法可以启用连续处理。
import org.apache.spark.sql.streaming.Trigger; Dataset<Row> streamingDataset = <local SparkSession>.readStream() .format("mongodb") .load(); DataStreamWriter<Row> dataStreamWriter = streamingDataset.writeStream() .trigger(Trigger.Continuous("1 second")) .format("memory") .option("checkpointLocation", "/tmp/checkpointDir") .outputMode("append"); StreamingQuery query = dataStreamWriter.start();
注意
Spark 不会开始流式传输,直到您在流查询上调用 start()
方法。
有关方法的完整列表,请参阅 Java 结构化流式传输参考。
要从MongoDB读取数据,请调用您的 SparkSession
对象上的 readStream
函数。此函数返回一个 DataStreamReader
对象,您可以使用它来指定流式读取操作的格式和其他配置设置。
您必须指定以下配置设置才能从MongoDB读取
设置 | 描述 |
---|---|
readStream.format() | 指定底层输入数据源的格式。使用 mongodb 从MongoDB读取。 |
readStream.option() | |
readStream.schema() | 指定输入模式。 |
以下代码片段展示了如何使用前面的配置设置来持续处理从MongoDB流式传输的数据。连接器将所有新数据追加到现有数据中,并且每秒异步将检查点写入到 /tmp/checkpointDir
。将 continuous
参数传递给 trigger()
方法可以启用连续处理。
streamingDataFrame = (<local SparkSession>.readStream .format("mongodb") .load() ) dataStreamWriter = (streamingDataFrame.writeStream .trigger(continuous="1 second") .format("memory") .option("checkpointLocation", "/tmp/checkpointDir") .outputMode("append") ) query = dataStreamWriter.start()
注意
Spark 不会开始流式传输,直到您在流查询上调用 start()
方法。
关于方法的完整列表,请参阅pyspark Structured Streaming参考。
要从MongoDB读取数据,请调用您的SparkSession
对象的readStream
方法。此方法返回一个DataStreamReader
对象,您可以使用它来指定流式读取操作的格式和其他配置设置。
您必须指定以下配置设置才能从MongoDB读取
设置 | 描述 |
---|---|
readStream.format() | 指定底层输入数据源的格式。使用 mongodb 从MongoDB读取。 |
readStream.option() | |
readStream.schema() | 指定输入模式。 |
以下代码片段展示了如何使用前面的配置设置来持续处理从MongoDB流式传输的数据。连接器将所有新数据追加到现有数据中,并且每秒异步将检查点写入到 /tmp/checkpointDir
。将 Trigger.Continuous
参数传递给 trigger()
方法可以启用连续处理。
import org.apache.spark.sql.streaming.Trigger val streamingDataFrame = <local SparkSession>.readStream .format("mongodb") .load() val dataStreamWriter = streamingDataFrame.writeStream .trigger(Trigger.Continuous("1 second")) .format("memory") .option("checkpointLocation", "/tmp/checkpointDir") .outputMode("append") val query = dataStreamWriter.start()
注意
Spark 不会开始流式传输,直到您在流查询上调用 start()
方法。
关于方法的完整列表,请参阅Scala Structured Streaming参考。
示例
以下示例展示了如何将MongoDB中的数据流式传输到您的控制台。
创建一个从MongoDB读取数据的
DataStreamReader
对象。通过在您使用
DataStreamReader
创建的流式Dataset
对象上调用writeStream()
方法来创建一个DataStreamWriter
对象。使用format()
方法指定格式console
。在
DataStreamWriter
实例上调用start()
方法以开始流。
当新数据被插入 MongoDB 时,MongoDB 会根据您指定的 outputMode
将数据流式传输到您的控制台。
重要
请避免将大型数据集流式传输到您的控制台。流式传输到控制台对内存消耗很大,仅适用于测试目的。
// create a local SparkSession SparkSession spark = SparkSession.builder() .appName("readExample") .master("spark://spark-master:<port>") .config("spark.jars", "<mongo-spark-connector-JAR-file-name>") .getOrCreate(); // define the schema of the source collection StructType readSchema = new StructType() .add("company_symbol", DataTypes.StringType) .add("company_name", DataTypes.StringType) .add("price", DataTypes.DoubleType) .add("tx_time", DataTypes.TimestampType); // define a streaming query DataStreamWriter<Row> dataStreamWriter = spark.readStream() .format("mongodb") .option("spark.mongodb.connection.uri", "<mongodb-connection-string>") .option("spark.mongodb.database", "<database-name>") .option("spark.mongodb.collection", "<collection-name>") .schema(readSchema) .load() // manipulate your streaming data .writeStream() .format("console") .trigger(Trigger.Continuous("1 second")) .outputMode("append"); // run the query StreamingQuery query = dataStreamWriter.start();
创建一个从MongoDB读取数据的
DataStreamReader
对象。通过在您使用
DataStreamReader
创建的流式DataFrame
上调用writeStream()
方法来创建一个DataStreamWriter
对象。通过使用format()
方法指定格式console
。在
DataStreamWriter
实例上调用start()
方法以开始流。
当新数据被插入 MongoDB 时,MongoDB 会根据您指定的 outputMode
将数据流式传输到您的控制台。
重要
请避免将大型数据集流式传输到您的控制台。流式传输到控制台对内存消耗很大,仅适用于测试目的。
# create a local SparkSession spark = SparkSession.builder \ .appName("readExample") \ .master("spark://spark-master:<port>") \ .config("spark.jars", "<mongo-spark-connector-JAR-file-name>") \ .getOrCreate() # define the schema of the source collection readSchema = (StructType() .add('company_symbol', StringType()) .add('company_name', StringType()) .add('price', DoubleType()) .add('tx_time', TimestampType()) ) # define a streaming query dataStreamWriter = (spark.readStream .format("mongodb") .option("spark.mongodb.connection.uri", <mongodb-connection-string>) .option('spark.mongodb.database', <database-name>) .option('spark.mongodb.collection', <collection-name>) .schema(readSchema) .load() # manipulate your streaming data .writeStream .format("console") .trigger(continuous="1 second") .outputMode("append") ) # run the query query = dataStreamWriter.start()
创建一个从MongoDB读取数据的
DataStreamReader
对象。通过在您使用
DataStreamReader
创建的流式DataFrame
对象上调用writeStream()
方法来创建一个DataStreamWriter
对象。通过使用format()
方法指定格式console
。在
DataStreamWriter
实例上调用start()
方法以开始流。
当新数据被插入 MongoDB 时,MongoDB 会根据您指定的 outputMode
将数据流式传输到您的控制台。
重要
请避免将大型数据集流式传输到您的控制台。流式传输到控制台对内存消耗很大,仅适用于测试目的。
// create a local SparkSession val spark = SparkSession.builder .appName("readExample") .master("spark://spark-master:<port>") .config("spark.jars", "<mongo-spark-connector-JAR-file-name>") .getOrCreate() // define the schema of the source collection val readSchema = StructType() .add("company_symbol", StringType()) .add("company_name", StringType()) .add("price", DoubleType()) .add("tx_time", TimestampType()) // define a streaming query val dataStreamWriter = spark.readStream .format("mongodb") .option("spark.mongodb.connection.uri", <mongodb-connection-string>) .option("spark.mongodb.database", <database-name>) .option("spark.mongodb.collection", <collection-name>) .schema(readSchema) .load() // manipulate your streaming data .writeStream .format("console") .trigger(Trigger.Continuous("1 second")) .outputMode("append") // run the query val query = dataStreamWriter.start()
重要
变更流的模式推断
如果您将 change.stream.publish.full.document.only
选项设置为 true
,Spark 连接器将使用扫描文档的模式推断 DataFrame
的模式。如果将选项设置为 false
,则必须指定一个模式。
有关此设置的更多信息以及完整更改流配置选项列表,请参阅读取配置选项指南。
API 文档
有关这些示例中使用的类型的更多信息,请参阅以下 Apache Spark API 文档