Article from: https://www.cnblogs.com/xing901022/p/9125860.html

In recent years, more and more attention has been paid to big data computing engines. Spark, as the most popular big data computing framework, is constantly being improved. Spark 2.x introduced a new stream processing component built on an unbounded DataFrame: Structured Streaming, which is the protagonist of this series.

Simple introduction

Having developed against both the 1.6 Streaming and 2.x Streaming APIs, I found that using Structured Streaming is a completely different experience, especially in terms of code design.

When we used Spark Streaming in the past, the mental model was straightforward: each batch processes all of the data in that batch, and the job is done once the batch is handled. If you wanted statistics like PV/UV, you had to rely on stateful DStream operations, or on a distributed caching system such as Redis or Alluxio. The main concerns were to process each batch as quickly as possible and to keep the job running 7*24.

As you can see, doing something like a global group-by with the old Streaming API is quite inconvenient. Structured Streaming solves this problem elegantly.

In Structured Streaming, incoming data is "appended" or "updated" into an unbounded DataFrame according to a fixed model. The rest of the work is the same as with an ordinary DataFrame: you can map, filter, or groupBy().count(). You can even join the streaming DataFrame with other "static" DataFrames. In addition, window-based stream processing over time is also provided. In short, Structured Streaming offers fast, scalable, highly available, and reliable stream processing.
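For instance, the stream-static join mentioned above can be sketched as follows (an illustrative sketch, not from the original post; the port 9998 and the users table are assumptions):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local").appName("StreamStaticJoinSketch").getOrCreate()
import spark.implicits._

// A small static DataFrame of users (hypothetical data for illustration)
val users = Seq(("u1", "Alice"), ("u2", "Bob")).toDF("userId", "name")

// A streaming DataFrame of "userId,action" lines read from a socket
val events = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9998)
  .load()
  .as[String]
  .map { line => val parts = line.split(","); (parts(0), parts(1)) }
  .toDF("userId", "action")

// Stream-static join: every micro-batch of events is joined against the static users table
val enriched = events.join(users, "userId")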

In big data development, Word Count is the standard demonstration example, so here we also use the example from the official website.

Take a look at the complete example.

package xingoo.sstreaming

import org.apache.spark.sql.SparkSession

object WordCount {
  def main(args: Array[String]): Unit = {


    val spark = SparkSession
      .builder
      .master("local")
      .appName("StructuredNetworkWordCount")
      .getOrCreate()

    spark.sparkContext.setLogLevel("WARN")

    import spark.implicits._
    // Create a streaming DataFrame representing the stream of input lines from a connection to localhost:9999
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()

    // Split the lines into words
    val words = lines.as[String].flatMap(_.split(" "))

    // Generate running word count
    val wordCounts = words.groupBy("value").count()

    // Start running the query that prints the running counts to the console
    // The three output modes:
    // 1 complete - the entire updated result table is written out every trigger
    // 2 append   - only rows newly appended to the result table are written out
    // 3 update   - only rows updated since the last trigger are written out
    val query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .start()

    query.awaitTermination()
  }
}

To see the effect, run nc -lk 9999 in a console, then type some text; the console running the Spark job will print the corresponding results:
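For example, after typing hello world into the nc session, the console sink prints something roughly like this (illustrative output, not captured from the original post):

-------------------------------------------
Batch: 0
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
|hello|    1|
|world|    1|
+-----+-----+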

Now let's look at the code in detail:

val spark = SparkSession
      .builder
      .master("local")
      .appName("StructuredNetworkWordCount")
      .getOrCreate()

    spark.sparkContext.setLogLevel("WARN")

    import spark.implicits._

Not much explanation is needed here: create a local SparkSession and set the log level to WARN, otherwise the console output is too noisy. Then import the implicit conversions of Spark SQL; without import spark.implicits._, basic types cannot be converted directly into Datasets/DataFrames.
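A quick sketch of what those implicits enable (the value names here are just for demonstration):

val ds = lines.as[String]       // converting to a typed Dataset needs an implicit Encoder[String]
val nums = Seq(1, 2, 3).toDS()  // toDS()/toDF() on local collections also come from spark.implicits._
val col = $"value"              // as does the $-prefixed column syntax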

val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()

This creates a socket-based streaming source; load() returns the streaming DataFrame that represents the unbounded stream of input lines.
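Other sources follow the same readStream pattern. For example, a file-based text source can be sketched like this (the directory path is an assumption for illustration):

// Each new file dropped into the directory becomes part of the unbounded stream of lines
val fileLines = spark.readStream
  .format("text")
  .load("/tmp/streaming-input")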

val words = lines.as[String].flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()

First convert the DataFrame into a Dataset of Strings (a single value column), then split each line on spaces with flatMap, and finally group by the value column and count the occurrences of each word.
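The same aggregation can also be written with the typed Dataset API (an equivalent sketch, not from the original post):

// groupByKey keeps the typed API; count() yields a Dataset[(String, Long)]
val typedCounts = words.groupByKey(identity).count()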

val query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .start()

Call the writeStream method of the DataFrame to turn it into an output stream, set the output mode to "complete", specify "console" as the output sink, and then call start() to begin the computation. start() returns a StreamingQuery object that can be used to control and monitor the query.

Both the output mode and the output format (sink) will be covered in detail later.
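As a preview, here is a sketch of writing to a file sink instead of the console (the paths are assumptions; the file sink only supports append mode, so it is applied to the non-aggregated words stream rather than to wordCounts):

val fileQuery = words.writeStream
  .outputMode("append")
  .format("parquet")
  .option("checkpointLocation", "/tmp/wordcount-checkpoint")
  .start("/tmp/wordcount-output")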

query.awaitTermination()

Calling awaitTermination on the StreamingQuery object blocks the main thread, so the application keeps running and processing incoming data continuously.
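The StreamingQuery handle also exposes monitoring and control methods, for example (a brief sketch):

println(query.status)        // current status of the query
println(query.lastProgress)  // metrics of the most recent micro-batch
// query.stop()              // would stop the query gracefully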

Looking at the Spark UI, you can see that the program runs steadily.

Summary

This is the most basic word count example. Imagine doing it without Structured Streaming: a word count over the entire stream is still hard to implement (even with stateful streaming it is not really convenient). Structured Streaming makes it that easy.
