在流模式下写入 MongoDB
要将数据写入 MongoDB,请在您的 Dataset<Row>
对象上调用writeStream()
方法。此方法返回一个 DataStreamWriter
对象,您可以使用该对象指定流写入操作的格式和其他配置设置。
您必须指定以下配置设置才能写入MongoDB
设置 | 描述 |
---|---|
writeStream.format() | 指定底层输出数据源的格式。使用 mongodb 将数据写入MongoDB。 |
writeStream.option() | |
writeStream.outputMode() | 指定如何将流DataFrame的数据写入流接收器。要查看所有支持输出模式的列表,请参阅Java outputMode文档。 |
writeStream.trigger() | 指定Spark连接器将结果写入流接收器的频率。在从配置的 要使用连续处理,将 要查看所有支持的处理策略列表,请参阅Java触发器文档。 |
以下代码示例展示了如何使用前面的配置设置将数据流式传输到MongoDB
<streaming DataFrame>.writeStream() .format("mongodb") .option("spark.mongodb.connection.uri", <mongodb-connection-string>) .option("spark.mongodb.database", <database-name>) .option("spark.mongodb.collection", <collection-name>) .outputMode("append");
关于方法的完整列表,请参阅Java Structured Streaming 参考文档。
要将数据写入MongoDB,请调用您的DataFrame对象的writeStream
函数。此函数返回一个DataStreamWriter
对象,您可以使用它来指定流式写入操作的格式和其他配置设置。
您必须指定以下配置设置才能写入MongoDB
设置 | 描述 |
---|---|
writeStream.format() | 指定底层输出数据源的格式。使用 mongodb 将数据写入MongoDB。 |
writeStream.option() | 指定流设置,包括MongoDB部署连接字符串、MongoDB数据库和集合以及检查点目录。 有关写入流配置选项的列表,请参阅流式写入配置选项指南。 |
writeStream.outputMode() | 指定Spark连接器如何将流式DataFrame写入流式接收器。要查看所有受支持输出模式的列表,请参阅pyspark outputMode 文档。 |
writeStream.trigger() | 指定Spark连接器将结果写入流接收器的频率。在从配置的 要使用连续处理,请使用 要查看所有支持的处理策略列表,请参阅pyspark 触发器文档。 |
以下代码示例展示了如何使用前面的配置设置将数据流式传输到MongoDB
<streaming DataFrame>.writeStream \ .format("mongodb") \ .option("spark.mongodb.connection.uri", <mongodb-connection-string>) \ .option("spark.mongodb.database", <database-name>) \ .option("spark.mongodb.collection", <collection-name>) \ .outputMode("append")
要查看函数的完整列表,请参阅pyspark 结构化流参考。
要将数据写入MongoDB,请在您的DataFrame
对象上调用write
方法。此方法返回一个DataStreamWriter
对象,您可以使用它来指定流式写入操作的格式和其他配置设置。
您必须指定以下配置设置才能写入MongoDB
设置 | 描述 |
---|---|
writeStream.format() | 指定底层输出数据源的格式。使用 mongodb 将数据写入MongoDB。 |
writeStream.option() | 指定流设置,包括MongoDB部署连接字符串、MongoDB数据库和集合以及检查点目录。 有关写入流配置选项的列表,请参阅流式写入配置选项指南。 |
writeStream.outputMode() | 指定Spark连接器如何将流式DataFrame写入流式接收器。要查看所有支持的输出模式列表,请参阅Scala outputMode文档。 |
writeStream.trigger() | 指定Spark连接器将结果写入流接收器的频率。在从配置的 要使用连续处理,将 要查看所有支持的处理策略的列表,请参阅 Scala 触发器文档。 |
以下代码示例展示了如何使用前面的配置设置将数据流式传输到MongoDB
<streaming DataFrame>.writeStream .format("mongodb") .option("spark.mongodb.connection.uri", <mongodb-connection-string>) .option("spark.mongodb.database", <database-name>) .option("spark.mongodb.collection", <collection-name>) .outputMode("append")
有关方法列表的完整信息,请参阅 Scala 结构化流参考。
示例
以下示例展示了如何从CSV 文件中流式传输数据到 MongoDB
创建一个从 CSV 文件读取的
DataStreamReader
对象。要创建一个
DataStreamWriter
对象,请在您使用DataStreamReader
创建的流Dataset<Row>
上调用writeStream()
方法。使用format()
方法指定mongodb
作为底层数据格式。在DataStreamWriter对象上调用
start()
方法以启动流。
当连接器从CSV文件读取数据时,它根据您指定的outputMode
将数据添加到MongoDB。
// create a local SparkSession SparkSession spark = SparkSession.builder() .appName("writeExample") .master("spark://spark-master:<port>") .config("spark.jars", "<mongo-spark-connector-JAR-file-name>") .getOrCreate(); // define a streaming query DataStreamWriter<Row> dataStreamWriter = spark.readStream() .format("csv") .option("header", "true") .schema("<csv-schema>") .load("<csv-file-name>") // manipulate your streaming data .writeStream() .format("mongodb") .option("checkpointLocation", "/tmp/") .option("forceDeleteTempCheckpointLocation", "true") .option("spark.mongodb.connection.uri", "<mongodb-connection-string>") .option("spark.mongodb.database", "<database-name>") .option("spark.mongodb.collection", "<collection-name>") .outputMode("append"); // run the query StreamingQuery query = dataStreamWriter.start();
创建一个从 CSV 文件读取的
DataStreamReader
对象。要创建DataStreamWriter对象,请在使用DataStreamReader创建的流DataFrame上调用
writeStream
函数。使用format()
函数指定底层数据格式为mongodb
。在DataStreamWriter实例上调用
start()
函数以启动流。
当连接器从CSV文件读取数据时,它根据您指定的outputMode
将数据添加到MongoDB。
# create a local SparkSession spark = SparkSession.builder \ .appName("writeExample") \ .master("spark://spark-master:<port>") \ .config("spark.jars", "<mongo-spark-connector-JAR-file-name>") \ .getOrCreate() # define a streaming query dataStreamWriter = (spark.readStream .format("csv") .option("header", "true") .schema(<csv-schema>) .load(<csv-file-name>) # manipulate your streaming data .writeStream .format("mongodb") .option("checkpointLocation", "/tmp/pyspark/") .option("forceDeleteTempCheckpointLocation", "true") .option("spark.mongodb.connection.uri", <mongodb-connection-string>) .option("spark.mongodb.database", <database-name>) .option("spark.mongodb.collection", <collection-name>) .outputMode("append") ) # run the query query = dataStreamWriter.start()
创建一个从 CSV 文件读取的
DataStreamReader
对象。要创建DataStreamWriter对象,请在使用DataStreamReader创建的流DataFrame上调用
writeStream
方法。使用format()
方法指定底层数据格式为mongodb
。在DataStreamWriter实例上调用
start()
方法以启动流。
当连接器从CSV文件读取数据时,它根据您指定的outputMode
将数据添加到MongoDB。
// create a local SparkSession val spark = SparkSession.builder .appName("writeExample") .master("spark://spark-master:<port>") .config("spark.jars", "<mongo-spark-connector-JAR-file-name>") .getOrCreate() // define a streaming query val dataStreamWriter = spark.readStream .format("csv") .option("header", "true") .schema(<csv-schema>) .load(<csv-file-name>) // manipulate your streaming data .writeStream .format("mongodb") .option("checkpointLocation", "/tmp/") .option("forceDeleteTempCheckpointLocation", "true") .option("spark.mongodb.connection.uri", <mongodb-connection-string>) .option("spark.mongodb.database", <database-name>) .option("spark.mongodb.collection", <collection-name>) .outputMode("append") // run the query val query = dataStreamWriter.start()
API文档
有关这些示例中使用的类型的更多信息,请参阅以下Apache Spark API文档