Apache Beam (originally named Google DataFlow) is an Apache incubator project that Google contributed to the Apache Software Foundation in February 2016. It is regarded as another major contribution by Google to the open source community in the big data field, following MapReduce, GFS, and BigQuery. The main goal of Apache Beam is to unify the programming paradigms of batch processing and stream processing, providing a simple, flexible, feature-rich, and expressive SDK for processing unbounded, out-of-order, web-scale data sets. The Apache Beam project focuses on the programming paradigm and interface definitions for data processing and does not involve the implementation of any specific execution engine; the hope is that data processing programs written with Beam can run on any distributed computing engine. This article introduces the programming paradigm of Apache Beam, the Beam Model, and shows how to write distributed data processing business logic easily and flexibly with the Beam SDK. We hope it gives readers a preliminary understanding of Apache Beam, and of how a distributed data processing system deals with out-of-order, unbounded data streams.
Apache Beam basic architecture
As distributed data processing has developed, new techniques have kept appearing and more and more distributed data processing frameworks have emerged in the industry, from the earliest Hadoop MapReduce, to Apache Spark and Apache Storm, and more recently Apache Flink, Apache Apex, and others. A new framework can bring higher performance, richer functionality, and lower latency, but switching to it is expensive: users have to learn a new data processing framework and rewrite all of their business logic. Solving this problem involves two parts. First, a programming paradigm is needed that unifies and standardizes the requirements of distributed data processing, for example by covering both batch and stream processing. Second, the resulting data processing task should be able to run on different distributed execution engines, so that users can freely switch the execution engine and execution environment. Apache Beam was proposed to solve these problems.
Apache Beam consists mainly of the Beam SDK and the Beam Runners. The Beam SDK defines the API for developing the business logic of a distributed data processing task, and the resulting task, called a Pipeline, is handed to a specific Beam Runner for execution. The API currently supported by Apache Beam is implemented in Java, and a Python version is under development. The underlying execution engines supported by Apache Beam include Apache Flink, Apache Spark, and Google Cloud Platform; support for Apache Storm, Apache Hadoop, Apache Gearpump, and other execution engines is under discussion or development. The basic architecture is shown in the following figure:
Figure 1 Apache Beam architecture diagram
Note that although the Apache Beam community very much hopes that every Beam execution engine will support the complete feature set of the Beam SDK, this may not hold in practice. For example, a Runner based on MapReduce would clearly struggle to implement the features related to stream processing. At present, Google Cloud Dataflow is the execution engine with the most complete support for the Beam SDK feature set. Among the open source execution engines, Apache Flink has the most complete support.
The Beam Model is the programming paradigm of Beam, that is, the design thinking behind the Beam SDK. Before introducing the Beam Model, we briefly describe the problem domain it addresses and some basic concepts.
- Data. The data handled in distributed data processing falls broadly into two categories: bounded data sets and unbounded data streams. A bounded data set, such as a file in HDFS or an HBase table, already exists in advance, is generally persisted, and will not suddenly disappear. An unbounded data stream, such as system logs flowing through Kafka or a tweet stream from the Twitter API, arrives dynamically, never ends, and cannot be persisted in full. Generally speaking, batch frameworks are designed to process bounded data sets and stream processing frameworks are designed to process unbounded data streams. A bounded data set can be seen as a special case of an unbounded data stream, and from the point of view of the processing logic there is no difference between the two. For example, suppose micro-blog data contains a timestamp and a repost count, and the user wants the total repost count per hour. This business logic should run on both a bounded data set and an unbounded data stream, and a difference in data source should have no effect on how the business logic is implemented.
- Time. Process Time is the time at which data enters the distributed processing framework, while Event-Time is the time at which the data was produced. The two usually differ. For example, for a stream computing task that processes micro-blog data, a post published at 2016-06-01 12:00:00 might not enter the stream processing system until 2016-06-01 12:01:30 because of network transmission delay. Batch tasks usually compute over the full data set and pay little attention to the time attributes of the data. A stream processing task cannot compute over the full data, because the stream never ends, so it usually computes over the data in a window. For most stream processing tasks, dividing windows by time is probably the most common requirement.
- Disorder. In the data streams handled by a stream processing framework, data may not arrive strictly in Event-Time order. If the time window is defined on Process Time, the arrival order is by definition the order of the data, so there is no disorder problem. For a time window defined on Event-Time, however, a message with an earlier timestamp may arrive after a message with a later timestamp, which is likely to be very common with distributed data sources. In that case, how to decide which data is late and how to handle late data is usually a thorny problem.
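The difference between the two notions of time can be made concrete with a small sketch. It uses only the Java standard library, not the Beam SDK. The class `EventTimeSketch`, the method `windowStart`, the one-minute window size, and the UTC time zone are all assumptions made for illustration. It takes the micro-blog post from the text and shows that the same record falls into different one-minute windows depending on which time is used.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper for illustration only; not part of the Beam SDK.
class EventTimeSketch {

    // Start of the fixed, epoch-aligned window of the given size that contains t.
    static Instant windowStart(Instant t, Duration size) {
        long sizeMs = size.toMillis();
        long ms = t.toEpochMilli();
        return Instant.ofEpochMilli(ms - Math.floorMod(ms, sizeMs));
    }

    public static void main(String[] args) {
        Duration oneMinute = Duration.ofMinutes(1); // assumed window size
        // The post from the text: produced at 12:00:00, received at 12:01:30 (UTC assumed).
        Instant eventTime = Instant.parse("2016-06-01T12:00:00Z");
        Instant processingTime = Instant.parse("2016-06-01T12:01:30Z");

        System.out.println("event-time window starts at      " + windowStart(eventTime, oneMinute));
        System.out.println("processing-time window starts at " + windowStart(processingTime, oneMinute));
    }
}
```

With Event-Time windows the post is counted in the 12:00 window where it belongs, but the system has to cope with it arriving after that window has nominally ended; with Process Time windows it lands in the 12:01 window and no disorder arises.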
The target data of the Beam Model is an unbounded, out-of-order data stream. A stream with no ordering concerns, or a bounded data set, can be regarded as a special case of the unbounded out-of-order stream. The Beam Model summarizes the questions users need to consider in data processing along the following four dimensions:
- What. How is the data computed? For example, Sum, Join, or training a model in machine learning. In the Beam SDK this is specified by the operators in the Pipeline.
- Where. Over what range is the data computed? For example, Process-Time based time windows, Event-Time based time windows, sliding windows, and so on. In the Beam SDK this is specified by the windows in the Pipeline.
- When. When are the results output? For example, in a 1 hour Event-Time window, output the current window result every 1 minute. In the Beam SDK this is specified by the Watermark and the triggers in the Pipeline.
- How. How is late data handled? For example, output only the incremental result computed from the late data, or merge the late data with the result already computed for the window and output the full result. In the Beam SDK this is specified by Accumulation.
The Beam Model abstracts these four “WWWH” dimensions to form the Beam SDK. When users build data processing business logic with the Beam SDK, they only need to call the corresponding API at each step, following the four dimensions and their business requirements, to generate a distributed data processing Pipeline and submit it to a specific execution engine. The WWWH abstraction is concerned only with the business logic itself and has nothing to do with how the distributed task is executed.
Unlike Apache Flink or Apache Spark, the Beam SDK uses the same set of APIs to represent data sources, output targets, operators, and so on. Below are four data processing tasks built with the Beam SDK. Through them, readers can see how the Beam Model describes batch and stream processing tasks in a unified and flexible way. The four tasks address statistical requirements from the mobile gaming domain:
- User score. A batch task that computes each user's score from a bounded data set.
- Hourly team score. A batch task that computes each team's score per hour from a bounded data set.
- Leaderboard. A stream processing task with two statistics: each team's score per hour, and each user's running total score in real time.
- Game state. A stream processing task that computes each team's score per hour together with more complex hourly statistics, such as each user's online time per hour.
Note: the sample code comes from the Beam source code, at apache/incubator-beam. Parts of the analysis draw on the official Beam documentation; see the reference links for details.
The following sections analyze the business logic of each task along the four “WWWH” dimensions of the Beam Model, and use code to show how each dimension is implemented with the Beam SDK.
Computing each user's total historical score is a very simple task. Here we simply implement it as a batch task, and whenever up-to-date user scores are needed the batch task is run again. The “WWWH” analysis for the user score task is as follows:
Following the “WWWH” analysis, the batch processing code for the user score task, implemented with the Beam Java SDK, is as follows:
gameEvents
    [... input ...]
    [... parse ...]
    .apply("ExtractUserScore", new ExtractAndSumScore("user"))
    [... output ...];
ExtractAndSumScore implements the logic described in “What”: it groups the data by user and then sums the scores.
gameInfo
    .apply(MapElements
        .via((GameActionInfo gInfo) -> KV.of(gInfo.getKey(field), gInfo.getScore()))
        .withOutputType(
            TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers())))
    .apply(Sum.<String>integersPerKey());
MapElements sets the key and value to the user and the score respectively. Sum then groups by key and adds up the scores. Beam supports combining several operations on the data into a single composite operation, which makes the business logic clearer and also allows the combined logic to be reused in multiple places.
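To make the “What” step concrete without a Beam dependency, here is a minimal plain-Java sketch of the same idea: map each event to a (key, score) pair and sum the scores per key. `ScoreEvent` and `SumPerKeySketch` are hypothetical names introduced for illustration; `ScoreEvent` stands in for the article's `GameActionInfo` and models only two fields. This is not how the Beam runner executes `Sum.integersPerKey()`, only what it computes.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for GameActionInfo; only the user and score are modeled.
class ScoreEvent {
    final String user;
    final int score;

    ScoreEvent(String user, int score) {
        this.user = user;
        this.score = score;
    }
}

class SumPerKeySketch {

    // Group by user (the key) and add up the scores (the values).
    static Map<String, Integer> sumScoresPerUser(List<ScoreEvent> events) {
        Map<String, Integer> totals = new TreeMap<>();
        for (ScoreEvent e : events) {
            totals.merge(e.user, e.score, Integer::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        List<ScoreEvent> events = Arrays.asList(
            new ScoreEvent("alice", 10),
            new ScoreEvent("bob", 3),
            new ScoreEvent("alice", 5));
        System.out.println(sumScoresPerUser(events)); // prints {alice=15, bob=3}
    }
}
```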
Hourly team score
Each team's score is computed per hour, and the team with the highest score may be rewarded. This analysis task adds a window requirement, but it can still be implemented as a batch task. The “WWWH” analysis for this task is as follows:
Compared with the first task, user score, this task additionally answers the Where question: “over what range is the data computed?” In the What part, “how is the data computed?”, the grouping key changes from user to team. Both changes are reflected in the code.
gameEvents
    [... input ...]
    [... parse ...]
    .apply("AddEventTimestamps",
        WithTimestamps.of((GameActionInfo i) -> new Instant(i.getTimestamp())))
    .apply("FixedWindowsTeam", Window.<GameActionInfo>into(
        FixedWindows.of(Duration.standardMinutes(windowDuration))))
    .apply("ExtractTeamScore", new ExtractAndSumScore("team"))
    [... output ...];
“AddEventTimestamps” defines how to extract the Event-Time from the raw data, and “FixedWindowsTeam” defines a 1 hour fixed window. The ExtractAndSumScore class is then reused, with the grouping column changed from user to team. The hourly team score task introduces new business logic for the “Where” part, the window definition, but the code shows that the implementation of “Where” is completely independent of the implementation of “What”. The user needs only two lines of code for “Where”, which is very simple and clear.
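The independence of “Where” and “What” can be illustrated with a plain-Java sketch that does not use the Beam SDK. The names `TeamScoreEvent` and `HourlyTeamScoreSketch` are hypothetical, and windows are assumed to be fixed and aligned to the epoch. Each event is first assigned to a window by its event timestamp, and the same per-key sum is then applied inside each window.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for GameActionInfo with a team, a score, and an event timestamp.
class TeamScoreEvent {
    final String team;
    final int score;
    final long timestampMs; // event time in milliseconds since the epoch

    TeamScoreEvent(String team, int score, long timestampMs) {
        this.team = team;
        this.score = score;
        this.timestampMs = timestampMs;
    }
}

class HourlyTeamScoreSketch {
    static final long HOUR_MS = 60L * 60L * 1000L;
    static final long MINUTE_MS = 60L * 1000L;

    // "Where": assign each event to a fixed window by event time.
    // "What": sum the scores per team inside each window.
    static Map<Long, Map<String, Integer>> sumPerTeamPerWindow(
            List<TeamScoreEvent> events, long windowMs) {
        Map<Long, Map<String, Integer>> result = new TreeMap<>();
        for (TeamScoreEvent e : events) {
            long windowStart = e.timestampMs - Math.floorMod(e.timestampMs, windowMs);
            result.computeIfAbsent(windowStart, k -> new TreeMap<>())
                  .merge(e.team, e.score, Integer::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        // The last event arrives out of order but still belongs to the first hour.
        List<TeamScoreEvent> events = Arrays.asList(
            new TeamScoreEvent("red", 4, 10 * MINUTE_MS),
            new TeamScoreEvent("blue", 2, 50 * MINUTE_MS),
            new TeamScoreEvent("red", 1, 70 * MINUTE_MS),
            new TeamScoreEvent("red", 6, 20 * MINUTE_MS));
        System.out.println(sumPerTeamPerWindow(events, HOUR_MS));
        // prints {0={blue=2, red=10}, 3600000={red=1}}
    }
}
```

Because the window is derived from the event timestamp, the arrival order of the input does not change the result, which is why a batch run over a bounded data set gives a well-defined answer.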
The first two tasks are both batch tasks over bounded data sets. For the leaderboard we again need the user scores and the hourly team scores, but from a business point of view we want the data in real time. In Apache Beam, a batch task and a stream processing task with the same processing logic differ only in their input and output; the business logic Pipeline in the middle needs no change. For the leaderboard analysis task, we want it not only to satisfy the same business logic as the previous two examples but also to meet more customized business requirements, for example:
- An important property of stream processing tasks compared with batch tasks is that they can return results closer to real time. For example, when computing hourly team scores over a one hour window, the default is to output the final result once all data for that hour has arrived. A stream processing system should also support outputting partial results while only part of the data for the window has arrived, so that users get real-time analysis results.
- The results must be as correct as those of the batch task. Because data can arrive out of order, how do we determine that all data for a given window has arrived (the Watermark)? How is late data handled? How are results output: as totals, as increments, or side by side? A stream processing system should provide mechanisms that let users achieve low latency while still reaching eventually correct results.
These two requirements are expressed by answering the “When” and “How” questions. “When” depends on how often the user wants to receive results. The answer to “When” can be divided into four stages:
- Early. Before the window ends, decide when to output intermediate results.
- On-Time. When the window ends, output the result computed for the window. Because data can arrive out of order, deciding when the window has ended may rely on an estimate the user makes from additional knowledge, and data that belongs to the window is still allowed to arrive after the window end the user has set.
- Late. After the window ends, late data arrives; this stage decides when to output the updated result.
- Final. The maximum delay that can be tolerated, for example 1 hour. Once the final waiting time is reached, the final result is output, later data is no longer accepted, and the state of the window is cleaned up.
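The four stages can be read as a simple classification of where the watermark stands relative to a window. The sketch below is a deliberately simplified model in plain Java and does not reproduce Beam's exact lateness semantics or boundary conditions. `ArrivalStage`, `LatenessSketch`, and `classify` are hypothetical names, and all times are milliseconds on an assumed common clock.

```java
// Hypothetical classification for illustration; Beam's own rules are more detailed.
enum ArrivalStage {
    BEFORE_WINDOW_END, // watermark has not passed the window end: early and on-time output
    LATE_BUT_ACCEPTED, // window has ended but is still within the allowed lateness
    DROPPED            // allowed lateness has expired: window state is cleaned, data is discarded
}

class LatenessSketch {

    // Classify an arriving element of a window by the current watermark position.
    static ArrivalStage classify(long windowEndMs, long allowedLatenessMs, long watermarkMs) {
        if (watermarkMs < windowEndMs) {
            return ArrivalStage.BEFORE_WINDOW_END;
        }
        if (watermarkMs < windowEndMs + allowedLatenessMs) {
            return ArrivalStage.LATE_BUT_ACCEPTED;
        }
        return ArrivalStage.DROPPED;
    }

    public static void main(String[] args) {
        long hour = 60L * 60L * 1000L;
        long windowEnd = hour;          // window [0h, 1h)
        long allowedLateness = hour;    // tolerate up to 1 hour of lateness, as in the text

        System.out.println(classify(windowEnd, allowedLateness, hour / 2));     // BEFORE_WINDOW_END
        System.out.println(classify(windowEnd, allowedLateness, hour + 1000L)); // LATE_BUT_ACCEPTED
        System.out.println(classify(windowEnd, allowedLateness, 3 * hour));     // DROPPED
    }
}
```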
For the stream processing version of the hourly team score, the desired business logic is as follows. Scores are computed per team over 1 hour Event-Time windows. Within the one hour window, the current team score is output every 5 minutes. For late data, the current team score is output every 10 minutes. Data arriving more than 2 hours after the end of the window is considered very unlikely, and if it does appear it is discarded. Expressed in “WWWH” terms:
In the Beam SDK, business logic that the user has expressed with the “WWWH” Beam Model can be implemented directly, with each dimension independent of the others:
gameEvents
    [... input ...]
    .apply("LeaderboardTeamFixedWindows", Window
        .<GameActionInfo>into(FixedWindows.of(Duration.standardMinutes(60)))
        .triggering(AfterWatermark.pastEndOfWindow()
            .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardMinutes(5)))
            .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardMinutes(10))))
        .withAllowedLateness(Duration.standardMinutes(120))
        .accumulatingFiredPanes())
    .apply("ExtractTeamScore", new ExtractAndSumScore("team"))
    [... output ...];
LeaderboardTeamFixedWindows defines the window, which corresponds to “Where”. The Trigger defines the conditions for outputting results, which corresponds to “When”. Accumulation defines how results are output, which corresponds to “How”. ExtractTeamScore defines the computation logic, which corresponds to “What”.
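The “How” choice is easiest to see by comparing what each firing of a window emits under the two modes. The following plain-Java sketch does not use the Beam SDK; `AccumulationSketch` and `paneOutputs` are hypothetical names, and each inner list is assumed to hold the scores that arrived between two consecutive trigger firings. In accumulating mode every firing emits the full total so far; in the alternative, discarding mode, it emits only the increment since the previous firing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class AccumulationSketch {

    // Returns the value emitted at each trigger firing for one window and one key.
    static List<Integer> paneOutputs(List<List<Integer>> scoresPerFiring, boolean accumulating) {
        List<Integer> outputs = new ArrayList<>();
        int runningTotal = 0;
        for (List<Integer> batch : scoresPerFiring) {
            int paneSum = 0;
            for (int score : batch) {
                paneSum += score;
            }
            runningTotal += paneSum;
            // Accumulating: full result so far. Discarding: only the new increment.
            outputs.add(accumulating ? runningTotal : paneSum);
        }
        return outputs;
    }

    public static void main(String[] args) {
        // Two early firings, then one late firing with a single late score.
        List<List<Integer>> firings = Arrays.asList(
            Arrays.asList(3, 4),
            Arrays.asList(5),
            Arrays.asList(2));
        System.out.println(paneOutputs(firings, true));  // prints [7, 12, 14]
        System.out.println(paneOutputs(firings, false)); // prints [7, 5, 2]
    }
}
```

Accumulating output suits consumers that overwrite the previous value, such as a leaderboard display, while incremental output suits consumers that add up what they receive.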
The Beam Model of Apache Beam is a very elegant abstraction of data processing over unbounded, out-of-order data streams, and the four “WWWH” dimensions describe data processing clearly and sensibly. The Beam Model unifies the processing of unbounded data streams and bounded data sets, and at the same time makes the programming paradigm for unbounded streams explicit, which widens the range of business scenarios a stream processing system can serve, for example through support for Event-Time and Session windows and for out-of-order data. The API designs of Apache Flink, Apache Spark Streaming, and other projects increasingly borrow from or refer to the Apache Beam Model, and as Beam Runner implementations their compatibility with the Beam SDK keeps improving. This article introduced the Beam Model and showed how to design realistic data processing tasks with it, in the hope of giving readers a preliminary understanding of the Apache Beam project. Apache Beam is now in the Apache Incubator, so readers can follow its progress and status through the official website or the mailing list.