文档菜单
文档首页
/
Spark 连接器
/

在流模式下写入 MongoDB

要将数据写入 MongoDB,请在您的 Dataset<Row> 对象上调用writeStream() 方法。此方法返回一个 DataStreamWriter 对象,您可以使用该对象指定流写入操作的格式和其他配置设置。

您必须指定以下配置设置才能写入MongoDB

设置
描述
writeStream.format()
指定底层输出数据源的格式。使用 mongodb 将数据写入MongoDB。
writeStream.option()

指定流设置,包括MongoDB部署连接字符串、MongoDB数据库和集合,以及检查点目录。

有关写入流配置选项的列表,请参阅流写入配置选项指南。

writeStream.outputMode()
指定如何将流DataFrame的数据写入流接收器。要查看所有支持输出模式的列表,请参阅Java outputMode文档。
writeStream.trigger()

指定Spark连接器将结果写入流接收器的频率。在从配置的DataStreamReader创建的DataStreamWriter对象上调用此方法。

要使用连续处理,将Trigger.Continuous(<time value>)作为参数传递,其中<time value>是Spark连接器异步检查点的频率。如果您传递Trigger类的任何其他静态方法,或者如果没有调用writeStream.trigger(),则Spark连接器使用微批处理。

要查看所有支持的处理策略列表,请参阅Java触发器文档。

以下代码示例展示了如何使用前面的配置设置将数据流式传输到MongoDB

<streaming DataFrame>.writeStream()
.format("mongodb")
.option("spark.mongodb.connection.uri", <mongodb-connection-string>)
.option("spark.mongodb.database", <database-name>)
.option("spark.mongodb.collection", <collection-name>)
.outputMode("append");

关于方法的完整列表,请参阅Java Structured Streaming 参考文档。

要将数据写入MongoDB,请调用您的DataFrame对象的writeStream函数。此函数返回一个DataStreamWriter对象,您可以使用它来指定流式写入操作的格式和其他配置设置。

您必须指定以下配置设置才能写入MongoDB

设置
描述
writeStream.format()
指定底层输出数据源的格式。使用 mongodb 将数据写入MongoDB。
writeStream.option()

指定流设置,包括MongoDB部署连接字符串、MongoDB数据库和集合以及检查点目录。

有关写入流配置选项的列表,请参阅流式写入配置选项指南

writeStream.outputMode()
指定Spark连接器如何将流式DataFrame写入流式接收器。要查看所有受支持输出模式的列表,请参阅pyspark outputMode 文档。
writeStream.trigger()

指定Spark连接器将结果写入流接收器的频率。在从配置的DataStreamReader创建的DataStreamWriter对象上调用此方法。

要使用连续处理,请使用continuous参数传递时间值给函数。如果您传递任何其他命名参数,或者没有调用writeStream.trigger(),Spark连接器将使用微批处理。

要查看所有支持的处理策略列表,请参阅pyspark 触发器文档。

以下代码示例展示了如何使用前面的配置设置将数据流式传输到MongoDB

<streaming DataFrame>.writeStream \
.format("mongodb") \
.option("spark.mongodb.connection.uri", <mongodb-connection-string>) \
.option("spark.mongodb.database", <database-name>) \
.option("spark.mongodb.collection", <collection-name>) \
.outputMode("append")

要查看函数的完整列表,请参阅pyspark 结构化流参考。

要将数据写入MongoDB,请在您的DataFrame对象上调用write方法。此方法返回一个DataStreamWriter对象,您可以使用它来指定流式写入操作的格式和其他配置设置。

您必须指定以下配置设置才能写入MongoDB

设置
描述
writeStream.format()
指定底层输出数据源的格式。使用 mongodb 将数据写入MongoDB。
writeStream.option()

指定流设置,包括MongoDB部署连接字符串、MongoDB数据库和集合以及检查点目录。

有关写入流配置选项的列表,请参阅流式写入配置选项指南

writeStream.outputMode()
指定Spark连接器如何将流式DataFrame写入流式接收器。要查看所有支持的输出模式列表,请参阅Scala outputMode文档。
writeStream.trigger()

指定Spark连接器将结果写入流接收器的频率。在从配置的DataStreamReader创建的DataStreamWriter对象上调用此方法。

要使用连续处理,将Trigger.Continuous(<time value>)作为参数传递,其中<time value>是Spark连接器异步检查点的频率。如果您传递Trigger类的任何其他静态方法,或者如果没有调用writeStream.trigger(),则Spark连接器使用微批处理。

要查看所有支持的处理策略的列表,请参阅 Scala 触发器文档。

以下代码示例展示了如何使用前面的配置设置将数据流式传输到MongoDB

<streaming DataFrame>.writeStream
.format("mongodb")
.option("spark.mongodb.connection.uri", <mongodb-connection-string>)
.option("spark.mongodb.database", <database-name>)
.option("spark.mongodb.collection", <collection-name>)
.outputMode("append")

有关方法列表的完整信息,请参阅 Scala 结构化流参考。

以下示例展示了如何从CSV 文件中流式传输数据到 MongoDB

  1. 创建一个从 CSV 文件读取的 DataStreamReader 对象。

  2. 要创建一个 DataStreamWriter 对象,请在您使用 DataStreamReader 创建的流 Dataset<Row> 上调用 writeStream() 方法。使用 format() 方法指定 mongodb 作为底层数据格式。

  3. 在DataStreamWriter对象上调用start()方法以启动流。

当连接器从CSV文件读取数据时,它根据您指定的outputMode将数据添加到MongoDB。

// create a local SparkSession
SparkSession spark = SparkSession.builder()
.appName("writeExample")
.master("spark://spark-master:<port>")
.config("spark.jars", "<mongo-spark-connector-JAR-file-name>")
.getOrCreate();
// define a streaming query
DataStreamWriter<Row> dataStreamWriter = spark.readStream()
.format("csv")
.option("header", "true")
.schema("<csv-schema>")
.load("<csv-file-name>")
// manipulate your streaming data
.writeStream()
.format("mongodb")
.option("checkpointLocation", "/tmp/")
.option("forceDeleteTempCheckpointLocation", "true")
.option("spark.mongodb.connection.uri", "<mongodb-connection-string>")
.option("spark.mongodb.database", "<database-name>")
.option("spark.mongodb.collection", "<collection-name>")
.outputMode("append");
// run the query
StreamingQuery query = dataStreamWriter.start();
  1. 创建一个从 CSV 文件读取的 DataStreamReader 对象。

  2. 要创建DataStreamWriter对象,请在使用DataStreamReader创建的流DataFrame上调用writeStream函数。使用format()函数指定底层数据格式为mongodb

  3. 在DataStreamWriter实例上调用start()函数以启动流。

当连接器从CSV文件读取数据时,它根据您指定的outputMode将数据添加到MongoDB。

# create a local SparkSession
spark = SparkSession.builder \
.appName("writeExample") \
.master("spark://spark-master:<port>") \
.config("spark.jars", "<mongo-spark-connector-JAR-file-name>") \
.getOrCreate()
# define a streaming query
dataStreamWriter = (spark.readStream
.format("csv")
.option("header", "true")
.schema(<csv-schema>)
.load(<csv-file-name>)
# manipulate your streaming data
.writeStream
.format("mongodb")
.option("checkpointLocation", "/tmp/pyspark/")
.option("forceDeleteTempCheckpointLocation", "true")
.option("spark.mongodb.connection.uri", <mongodb-connection-string>)
.option("spark.mongodb.database", <database-name>)
.option("spark.mongodb.collection", <collection-name>)
.outputMode("append")
)
# run the query
query = dataStreamWriter.start()
  1. 创建一个从 CSV 文件读取的 DataStreamReader 对象。

  2. 要创建DataStreamWriter对象,请在使用DataStreamReader创建的流DataFrame上调用writeStream方法。使用format()方法指定底层数据格式为mongodb

  3. 在DataStreamWriter实例上调用start()方法以启动流。

当连接器从CSV文件读取数据时,它根据您指定的outputMode将数据添加到MongoDB。

// create a local SparkSession
val spark = SparkSession.builder
.appName("writeExample")
.master("spark://spark-master:<port>")
.config("spark.jars", "<mongo-spark-connector-JAR-file-name>")
.getOrCreate()
// define a streaming query
val dataStreamWriter = spark.readStream
.format("csv")
.option("header", "true")
.schema(<csv-schema>)
.load(<csv-file-name>)
// manipulate your streaming data
.writeStream
.format("mongodb")
.option("checkpointLocation", "/tmp/")
.option("forceDeleteTempCheckpointLocation", "true")
.option("spark.mongodb.connection.uri", <mongodb-connection-string>)
.option("spark.mongodb.database", <database-name>)
.option("spark.mongodb.collection", <collection-name>)
.outputMode("append")
// run the query
val query = dataStreamWriter.start()

有关这些示例中使用的类型的更多信息,请参阅以下Apache Spark API文档

返回

配置