1 Overview
Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput(高吞吐), fault-tolerant(容错机制) stream processing of live data streams(实时流处理). Data can be ingested from many sources like Kafka, Flume, Kinesis, or TCP sockets, and can be processed using complex algorithms expressed with high-level functions like map, reduce, join and window. Finally, processed data can be pushed out to filesystems, databases, and live dashboards(现场仪表盘). In fact, you can apply Spark’s machine learning and graph processing algorithms on data streams.
Internally, it works as follows. Spark Streaming receives live input data streams and divides the data into batches, which are then processed by the Spark engine to generate the final stream of results in batches.(Spark Streaming在内部的处理机制是,接收实时流的数据,并根据一定的时间间隔拆分成一批批的数据,然后通过Spark Engine处理这些批数据,最终得到处理后的一批批结果数据。)
Spark Streaming provides a high-level abstraction called discretized stream or DStream, which represents a continuous stream of data. DStreams can be created either from input data streams from sources such as Kafka, Flume, and Kinesis, or by applying high-level operations on other DStreams. Internally, a DStream is represented as a sequence of RDDs.see Dstream.
2 A Quick Example
18 // scalastyle:off println
19 package org.apache.spark.examples.streaming
20
21 import org.apache.spark.SparkConf
22 import org.apache.spark.storage.StorageLevel
23 import org.apache.spark.streaming.{Seconds, StreamingContext}
24
25 /**
26 * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
27 *
28 * Usage: NetworkWordCount <hostname> <port>
29 * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
30 *
31 * To run this on your local machine, you need to first run a Netcat server
32 * `$ nc -lk 9999`
33 * and then run the example
34 * `$ bin/run-example org.apache.spark.examples.streaming.NetworkWordCount localhost 9999`
35 */
36 object NetworkWordCount {
37 def main(args: Array[String]) {
38 if (args.length < 2) {
39 System.err.println("Usage: NetworkWordCount <hostname> <port>")
40 System.exit(1)
41 }
42
43 StreamingExamples.setStreamingLogLevels()
44
45 // Create the context with a 1 second batch size
46 val sparkConf = new SparkConf().setAppName("NetworkWordCount")
47 val ssc = new StreamingContext(sparkConf, Seconds(1))
48
49 // Create a socket stream on target ip:port and count the
50 // words in input stream of \n delimited text (eg. generated by 'nc')
51 // Note that no duplication in storage level only for running locally.
52 // Replication necessary in distributed scenario for fault tolerance.
53 val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
54 val words = lines.flatMap(_.split(" "))
55 val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
56 wordCounts.print()
57 ssc.start()
58 ssc.awaitTermination()
59 }
60 }
61 // scalastyle:on println
[hadoop@NN01 ~]$ nc -lp 9999
Hello world
This is david
stop now
[hadoop@NN01 spark]$ ./bin/run-example streaming.NetworkWordCount NN01.HadoopVM 9999
-------------------------------------------
Time: 1473325234000 ms
-------------------------------------------
(is,1)
(Hello,1)
(This,1)
(david,1)
(world,1)
...
Time: 1473325243000 ms
-------------------------------------------
(stop,1)
(now,1)
3 Basic Concepts
3.1 Linking
Similar to Spark, Spark Streaming is available through Maven Central. To write your own Spark Streaming program, you will have to add the following dependency to your SBT or Maven project.
# maven
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.0.0</version>
</dependency>
# SBT
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.0.0"
For ingesting data from sources like Kafka, Flume, and Kinesis that are not present in the Spark Streaming core API, you will have to add the corresponding artifact spark-streaming-xyz_2.11 to the dependencies. For example, some of the common ones are as follows.
Source | Artifact |
Kafka | spark-streaming-kafka-0-8_2.11 |
Flume | spark-streaming-flume_2.11 |
Kinesis | spark-streaming-kinesis-asl_2.11 [Amazon Software License] |
For an up-to-date list, please refer to the Maven repository for the full list of supported sources and artifacts.
3.2 Initializing StreamingContext
To initialize a Spark Streaming program, a StreamingContext object(StreamingContext对象) has to be created which is the main entry point of all Spark Streaming functionality.
scala
A StreamingContext object can be created from a SparkConf object.
import org.apache.spark._
import org.apache.spark.streaming._
val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))
python
A StreamingContext object can be created from a SparkContext object.
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
sc = SparkContext(master, appName)
ssc = StreamingContext(sc, 1)
The appName parameter is a name for your application to show on the cluster UI. master is a Spark, Mesos or YARN cluster URL, or a special “local[*]” string to run in local mode. In practice, when running on a cluster, you will not want to hardcode master in the program, but rather launch the application with spark-submit and receive it there. However, for local testing and unit tests, you can pass “local[*]” to run Spark Streaming in-process (detects the number of cores in the local system). Note that this internally creates a SparkContext (starting point of all Spark functionality) which can be accessed as ssc.sparkContext.
- The batch interval must be set based on the latency requirements of your application and available cluster resources. See the Performance Tuning section for more details.
- A StreamingContext object can also be created from an existing SparkContext object.
-
After a context is defined, you have to do the following.
- 1.Define the input sources by creating input DStreams.
- 2.Define the streaming computations by applying transformation and output operations to DStreams.
- 3.Start receiving data and processing it using streamingContext.start().
- 4.Wait for the processing to be stopped (manually or due to any error) using streamingContext.awaitTermination().
- 5.The processing can be manually stopped using streamingContext.stop().
Points to remember
- Once a context has been started, no new streaming computations can be set up or added to it.
- Once a context has been stopped, it cannot be restarted.
- Only one StreamingContext can be active in a JVM at the same time.
- stop() on StreamingContext also stops the SparkContext. To stop only the StreamingContext, set the optional parameter of stop() called stopSparkContext to false.
- A SparkContext can be re-used to create multiple StreamingContexts, as long as the previous StreamingContext is stopped (without stopping the SparkContext) before the next StreamingContext is created.
import org.apache.spark.streaming._
val sc = ... // existing SparkContext
val ssc = new StreamingContext(sc, Seconds(1))
3.3 Discretized Streams(DStreams)
Discretized Stream or DStream is the basic abstraction provided by Spark Streaming. It represents a continuous stream of data, either the input data stream received from source, or the processed data stream generated by transforming the input stream. Internally, a DStream is represented by a continuous series of RDDs, which is Spark’s abstraction of an immutable, distributed dataset (see Spark Programming Guide for more details). Each RDD in a DStream contains data from a certain interval, as shown in the following figure.
Any operation applied on a DStream translates to operations on the underlying RDDs. For example, in the earlier example of converting a stream of lines to words, the flatMap operation is applied on each RDD in the lines DStream to generate the RDDs of the words DStream. This is shown in the following figure.
These underlying RDD transformations are computed by the Spark engine. The DStream operations hide most of these details and provide the developer with a higher-level API for convenience. These operations are discussed in detail in later sections.
3.4 Input Dstreams and Receivers
Input DStreams are DStreams representing the stream of input data received from streaming sources. In the quick example, lines was an input DStream as it represented the stream of data received from the netcat server. Every input DStream (except file stream, discussed later in this section) is associated with a Receiver (Scala doc, Java doc) object which receives the data from a source and stores it in Spark’s memory for processing.一个input DStream是一个特殊的DStream,将Spark Streaming连接到一个外部数据源来读取数据。
Spark Streaming provides two categories of built-in streaming sources.
- Basic sources: Sources directly available in the StreamingContext API. Examples: file systems, and socket connections.
- Advanced sources: Sources like Kafka, Flume, Kinesis, etc. are available through extra utility classes. These require linking against extra dependencies as discussed in the linking section.
File Streams
# scala
streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
# python
streamingContext.textFileStream(dataDirectory)
Advanced Sources
- Kafka: Spark Streaming 2.0.0 is compatible with Kafka 0.8.2.1. See the Kafka Integration Guide for more details.
- Flume: Spark Streaming 2.0.0 is compatible with Flume 1.6.0. See the Flume Integration Guide for more details.
- Kinesis: Spark Streaming 2.0.0 is compatible with Kinesis Client Library 1.2.1. See the Kinesis Integration Guide for more details.
3.5 Transformations on DStreams
Similar to that of RDDs, transformations allow the data from the input DStream to be modified. DStreams support many of the transformations available on normal Spark RDD’s. Some of the common ones are as follows.
Transformation | Meaning |
map(func) | Return a new DStream by passing each element of the source DStream through a function func. |
flatMap(func) | Similar to map, but each input item can be mapped to 0 or more output items. |
filter(func) | Return a new DStream by selecting only the records of the source DStream on which func returns true. |
repartition(numPartitions) | Changes the level of parallelism in this DStream by creating more or fewer partitions. |
union(otherStream) | Return a new DStream that contains the union of the elements in the source DStream and otherDStream. |
count() | Return a new DStream of single-element RDDs by counting the number of elements in each RDD of the source DStream. |
reduce(func) | Return a new DStream of single-element RDDs by aggregating the elements in each RDD of the source DStream using a function func (which takes two arguments and returns one). The function should be associative and commutative so that it can be computed in parallel. |
countByValue() | When called on a DStream of elements of type K, return a new DStream of (K, Long) pairs where the value of each key is its frequency in each RDD of the source DStream. |
reduceByKey(func, [numTasks]) | When called on a DStream of (K, V) pairs, return a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function. Note: By default, this uses Spark’s default number of parallel tasks (2 for local mode, and in cluster mode the number is determined by the config property spark.default.parallelism) to do the grouping. You can pass an optional numTasks argument to set a different number of tasks. |
join(otherStream, [numTasks]) | When called on two DStreams of (K, V) and (K, W) pairs, return a new DStream of (K, (V, W)) pairs with all pairs of elements for each key. |
cogroup(otherStream, [numTasks]) | When called on a DStream of (K, V) and (K, W) pairs, return a new DStream of (K, Seq[V], Seq[W]) tuples. |
transform(func) | Return a new DStream by applying a RDD-to-RDD function to every RDD of the source DStream. This can be used to do arbitrary RDD operations on the DStream. |
updateStateByKey(func) | Return a new “state” DStream where the state for each key is updated by applying the given function on the previous state of the key and the new values for the key. This can be used to maintain arbitrary state data for each key. |
UpdateStateByKey Operation
The updateStateByKey operation allows you to maintain arbitrary state while continuously updating it with new information. To use this, you will have to do two steps.该 updateStateByKey 操作可以让你保持任意状态,同时不断有新的信息进行更新。
- 1.Define the state - The state can be an arbitrary data type.
- 2.Define the state update function - Specify with a function how to update the state using the previous state and the new values from an input stream.
In every batch, Spark will apply the state update function for all existing keys, regardless of whether they have new data in a batch or not. If the update function returns None then the key-value pair will be eliminated.
Let’s illustrate this with an example. Say you want to maintain a running count of each word seen in a text data stream. Here, the running count is the state and it is an integer. We define the update function as:
def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
val newCount = ... // add the new values with the previous running count to get the new count
Some(newCount)
}
# This is applied on a DStream containing words (say, the pairs DStream containing (word, 1) pairs in the earlier example).
val runningCounts = pairs.updateStateByKey[Int](updateFunction _)
def updateFunction(newValues, runningCount):
if runningCount is None:
runningCount = 0
return sum(newValues, runningCount) # add the new values with the previous running count to get the new count
# This is applied on a DStream containing words (say, the pairs DStream containing (word, 1) pairs in the earlier example).
runningCounts = pairs.updateStateByKey(updateFunction)
The update function will be called for each word, with newValues having a sequence of 1’s (from the (word, 1) pairs) and the runningCount having the previous count. For the complete Python code, take a look at the example stateful_network_wordcount.py.
Note that using updateStateByKey requires the checkpoint directory to be configured, which is discussed in detail in the checkpointing section.
Transform Operation
The transform operation (along with its variations like transformWith) allows arbitrary RDD-to-RDD functions to be applied on a DStream. It can be used to apply any RDD operation that is not exposed in the DStream API. For example, the functionality of joining every batch in a data stream with another dataset is not directly exposed in the DStream API. However, you can easily use transform to do this. This enables very powerful possibilities. For example, one can do real-time data cleaning by joining the input data stream with precomputed spam information (maybe generated with Spark as well) and then filtering based on it. 该transform操作(转换操作)连同其其类似的 transformWith操作允许DStream 上应用任意RDD-to-RDD函数。它可以被应用于未在 DStream API 中暴露任何的RDD操作。例如,在每批次的数据流与另一数据集的连接功能不直接暴露在DStream API 中,但可以轻松地使用transform操作来做到这一点,这使得DStream的功能非常强大。例如,你可以通过连接预先计算的垃圾邮件信息的输入数据流(可能也有Spark生成的),然后基于此做实时数据清理的筛选,如下面官方提供的伪代码所示。事实上,也可以在transform方法中使用机器学习和图形计算的算法。
val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information
val cleanedDStream = wordCounts.transform(rdd => {
rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
...
})
spamInfoRDD = sc.pickleFile(...) # RDD containing spam information
# join data stream with spam information to do data cleaning
cleanedDStream = wordCounts.transform(lambda rdd: rdd.join(spamInfoRDD).filter(...))
Window Operations
Spark Streaming also provides windowed computations, which allow you to apply transformations over a sliding window of data(通过滑动窗口对数据进行转换). The following figure illustrates this sliding window.
As shown in the figure, every time the window slides over a source DStream, the source RDDs that fall within the window are combined and operated upon to produce the RDDs of the windowed DStream. In this specific case, the operation is applied over the last 3 time units of data, and slides by 2 time units. This shows that any window operation needs to specify two parameters.
- window length - The duration of the window (3 in the figure).
- sliding interval - The interval at which the window operation is performed (2 in the figure).
在Spark Streaming中,数据处理是按批进行的,而数据采集是逐条进行的,因此在Spark Streaming中会先设置好批处理间隔(batch duration),当超过批处理间隔的时候就会把采集到的数据汇总起来成为一批数据交给系统去处理。
对于窗口操作而言,在其窗口内部会有N个批处理数据,批处理数据的大小由窗口间隔(window duration)决定,而窗口间隔指的就是窗口的持续时间,在窗口操作中,只有窗口的长度满足了才会触发批数据的处理。除了窗口的长度,窗口操作还有另一个重要的参数就是滑动间隔(slide duration),它指的是经过多长时间窗口滑动一次形成新的窗口,滑动窗口默认情况下和批次间隔的相同,而窗口间隔一般设置的要比它们两个大。在这里必须注意的一点是滑动间隔和窗口间隔的大小一定得设置为批处理间隔的整数倍。
如批处理间隔示意图所示,批处理间隔是1个时间单位,窗口间隔是3个时间单位,滑动间隔是2个时间单位。对于初始的窗口time 1-time 3,只有窗口间隔满足了才触发数据的处理。这里需要注意的一点是,初始的窗口有可能流入的数据没有撑满,但是随着时间的推进,窗口最终会被撑满。当每个2个时间单位,窗口滑动一次后,会有新的数据流入窗口,这时窗口会移去最早的两个时间单位的数据,而与最新的两个时间单位的数据进行汇总形成新的窗口(time3-time5)。
对于窗口操作,批处理间隔、窗口间隔和滑动间隔是非常重要的三个时间概念,是理解窗口操作的关键所在。
Let’s illustrate the window operations with an example. Say, you want to extend the earlier example by generating word counts over the last 30 seconds of data, every 10 seconds. To do this, we have to apply the reduceByKey operation on the pairs DStream of (word, 1) pairs over the last 30 seconds of data. This is done using the operation reduceByKeyAndWindow.
// Reduce last 30 seconds of data, every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))
# Reduce last 30 seconds of data, every 10 seconds
windowedWordCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 30, 10)
Some of the common window operations are as follows. All of these operations take the said two parameters - windowLength and slideInterval.
Transformation | Meaning |
window(windowLength, slideInterval) | Return a new DStream which is computed based on windowed batches of the source DStream. |
countByWindow(windowLength, slideInterval) | Return a sliding window count of elements in the stream. |
reduceByWindow(func, windowLength, slideInterval) | Return a new single-element stream, created by aggregating elements in the stream over a sliding interval using func. The function should be associative and commutative so that it can be computed correctly in parallel. |
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) | When called on a DStream of (K, V) pairs, returns a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function func over batches in a sliding window. Note: By default, this uses Spark’s default number of parallel tasks (2 for local mode, and in cluster mode the number is determined by the config property spark.default.parallelism) to do the grouping. You can pass an optional numTasks argument to set a different number of tasks. |
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) | A more efficient version of the above reduceByKeyAndWindow() where the reduce value of each window is calculated incrementally using the reduce values of the previous window. This is done by reducing the new data that enters the sliding window, and “inverse reducing” the old data that leaves the window. An example would be that of “adding” and “subtracting” counts of keys as the window slides. However, it is applicable only to “invertible reduce functions”, that is, those reduce functions which have a corresponding “inverse reduce” function (taken as parameter invFunc). Like in reduceByKeyAndWindow, the number of reduce tasks is configurable through an optional argument. Note that checkpointing must be enabled for using this operation. |
countByValueAndWindow(windowLength, slideInterval, [numTasks]) | When called on a DStream of (K, V) pairs, returns a new DStream of (K, Long) pairs where the value of each key is its frequency within a sliding window. Like in reduceByKeyAndWindow, the number of reduce tasks is configurable through an optional argument. |
Join Operations
Stream-stream joins
Streams can be very easily joined with other streams.
val stream1: DStream[String, String] = ...
val stream2: DStream[String, String] = ...
val joinedStream = stream1.join(stream2)
stream1 = ...
stream2 = ...
joinedStream = stream1.join(stream2)
Here, in each batch interval, the RDD generated by stream1 will be joined with the RDD generated by stream2. You can also do leftOuterJoin, rightOuterJoin, fullOuterJoin. Furthermore, it is often very useful to do joins over windows of the streams. That is pretty easy as well.
val windowedStream1 = stream1.window(Seconds(20))
val windowedStream2 = stream2.window(Minutes(1))
val joinedStream = windowedStream1.join(windowedStream2)
windowedStream1 = stream1.window(20)
windowedStream2 = stream2.window(60)
joinedStream = windowedStream1.join(windowedStream2)
Stream-dataset joins
val dataset: RDD[String, String] = ...
val windowedStream = stream.window(Seconds(20))...
val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }
dataset = ... # some RDD
windowedStream = stream.window(20)
joinedStream = windowedStream.transform(lambda rdd: rdd.join(dataset))
Output Operations on DStreams
Spark Streaming允许DStream的数据被输出到外部系统,如数据库或文件系统。由于输出操作实际上使transformation操作后的数据可以通过外部系统被使用,同时输出操作触发所有DStream的transformation操作的实际执行(类似于RDD action)。以下表列出了目前主要的输出操作:
Output Operation | Meaning |
print() | Prints the first ten elements of every batch of data in a DStream on the driver node running the streaming application. This is useful for development and debugging. |
saveAsTextFiles(prefix, [suffix]) | Save this DStream’s contents as text files. The file name at each batch interval is generated based on prefix and suffix: “prefix-TIME_IN_MS[.suffix]”. |
saveAsObjectFiles(prefix, [suffix]) | Save this DStream’s contents as SequenceFiles of serialized Java objects. The file name at each batch interval is generated based on prefix and suffix: “prefix-TIME_IN_MS[.suffix]”. |
saveAsHadoopFiles(prefix, [suffix]) | Save this DStream’s contents as Hadoop files. The file name at each batch interval is generated based on prefix and suffix: “prefix-TIME_IN_MS[.suffix]”. |
foreachRDD(func) | The most generic output operator that applies a function, func, to each RDD generated from the stream. This function should push the data in each RDD to an external system, such as saving the RDD to files, or writing it over the network to a database. Note that the function func is executed in the driver process running the streaming application, and will usually have RDD actions in it that will force the computation of the streaming RDDs. |
Design Patterns for using foreachRDD
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
// ConnectionPool is a static, lazily initialized pool of connections
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => connection.send(record))
ConnectionPool.returnConnection(connection) // return to the pool for future reuse
}
}
def sendPartition(iter):
# ConnectionPool is a static, lazily initialized pool of connections
connection = ConnectionPool.getConnection()
for record in iter:
connection.send(record)
# return to the pool for future reuse
ConnectionPool.returnConnection(connection)
dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))
通常情况下,创建一个连接对象有时间和资源开销。因此,创建和销毁的每条记录的连接对象可能招致不必要的资源开销,并显著降低系统整体的吞吐量 。一个更好的解决方案是使用rdd.foreachPartition方法创建一个单独的连接对象,然后使用该连接对象输出的所有RDD分区中的数据到外部系统。
这缓解了创建多条记录连接的开销。最后,还可以进一步通过在多个RDDs/ batches上重用连接对象进行优化。一个保持连接对象的静态池可以重用在多个批处理的RDD上将其输出到外部系统,从而进一步降低了开销。
需要注意的是,在静态池中的连接应该按需延迟创建,这样可以更有效地把数据发送到外部系统。另外需要要注意的是:DStreams延迟执行的,就像RDD的操作是由actions触发一样。默认情况下,输出操作会按照它们在Streaming应用程序中定义的顺序一个个执行。
3.6 Accumulators and Broadcast Variables
我们传递给Spark的函数,如map(),或者filter()的判断条件函数,能够利用定义在函数之外的变量,但是集群中的每一个task都会得到变量的一个副本,并且task在对变量进行的更新不会被返回给driver。而Spark的两种共享变量:累加器(accumulator)和广播变量(broadcast variable),在广播和结果聚合这两种常见类型的通信模式上放宽了这种限制。 Spark 共享变量——累加器(accumulator)与广播变量(broadcast variable)
Accumulators累加器
使用累加器可以很简便地对各个worker返回给driver的值进行聚合。累加器最常见的用途之一就是对一个job执行期间发生的事件进行计数。例如,当我们统计输入文件信息时,有时需要统计空白行的数量。下面的程序描述了这个过程。
scala> val file = sc.textFile("README.md")
file: org.apache.spark.rdd.RDD[String] = README.md MapPart12] at textFile at <console>:24
scala> val blankLines = sc.accumulator(0)
warning: there were two deprecation warnings; re-run with on for details
blankLines: org.apache.spark.Accumulator[Int] = 0
scala> val callSigns = file.flatMap(line => {
| if (line == "") {
| blankLines += 1
| }
| line.split(" ")
| })
callSigns: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[13] at flatMap at <console>:28
scala> callSigns.saveAsTextFile("Output.txt")
[Stage 0:> [Stage 0:> [Stage 0:=============================>
scala> println("Blank Lines: " + blankLines.value)
Blank Lines: 2
>>> file = sc.textFile("README.md")
>>> # create Accumulator[Int] initialized to 0
...
>>> blankLines = sc.accumulator(0)
>>> def extractCallSigns(line):
... global blankLines # make the global variable
... if (line == ""):
... blankLines += 1
... return line.split(" ")
...
>>> callSigns = file.flatMap(extractCallSigns)
>>> callSigns.saveAsTextFile("output" + "/callsigns")
[Stage 0:> [Stage 0:> (0 + 2) / 2]
[Stage 0:=============================>
>>> print "Blank Lines: %d" % blankLines.value
Blank Lines: 2
在上面的例子中,我们创建了一个名为blankLines的整型累加器(Accumulator[Int]),初始化为0,然后再每次读到一个空白行的时候blankLines加一。因此,累加器使我们可以用一种更简便的方式,在一个RDD的转换过程中对值进行聚合,而不用额外使用一个filter()或reduce()操作。 需要注意的是,由于Spark的lazy机制,只有在saveAsTestFile这个action算子执行后我们才能得到blankLines的正确结果。
Broadcast Variables 广播变量
Spark的另一种共享变量是广播变量。通常情况下,当一个RDD的很多操作都需要使用driver中定义的变量时,每次操作,driver都要把变量发送给worker节点一次,如果这个变量中的数据很大的话,会产生很高的传输负载,导致执行效率降低。使用广播变量可以使程序高效地将一个很大的只读数据发送给多个worker节点,而且对每个worker节点只需要传输一次,每次操作时executor可以直接获取本地保存的数据副本,不需要多次传输。
Broadcast variables let programmer keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. For example, to give every node a copy of a large input dataset efficiently. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.
3.7 DataFrame and SQL Operations
You can easily use DataFrames and SQL operations on streaming data. You have to create a SparkSession using the SparkContext that the StreamingContext is using. Furthermore this has to done such that it can be restarted on driver failures. This is done by creating a lazily instantiated singleton instance of SparkSession. This is shown in the following example. It modifies the earlier word count example to generate word counts using DataFrames and SQL. Each RDD is converted to a DataFrame, registered as a temporary table and then queried using SQL.
/** DataFrame operations inside your streaming program */
val words: DStream[String] = ...
words.foreachRDD { rdd =>
// Get the singleton instance of SparkSession
val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
import spark.implicits._
// Convert RDD[String] to DataFrame
val wordsDataFrame = rdd.toDF("word")
// Create a temporary view
wordsDataFrame.createOrReplaceTempView("words")
// Do word count on DataFrame using SQL and print it
val wordCountsDataFrame =
spark.sql("select word, count(*) as total from words group by word")
wordCountsDataFrame.show()
}
# Lazily instantiated global instance of SparkSession
def getSparkSessionInstance(sparkConf):
if ('sparkSessionSingletonInstance' not in globals()):
globals()['sparkSessionSingletonInstance'] = SparkSession\
.builder\
.config(conf=sparkConf)\
.getOrCreate()
return globals()['sparkSessionSingletonInstance']
...
# DataFrame operations inside your streaming program
words = ... # DStream of strings
def process(time, rdd):
print("========= %s =========" % str(time))
try:
# Get the singleton instance of SparkSession
spark = getSparkSessionInstance(rdd.context.getConf())
# Convert RDD[String] to RDD[Row] to DataFrame
rowRdd = rdd.map(lambda w: Row(word=w))
wordsDataFrame = spark.createDataFrame(rowRdd)
# Creates a temporary view using the DataFrame
wordsDataFrame.createOrReplaceTempView("words")
# Do word count on table using SQL and print it
wordCountsDataFrame = spark.sql("select word, count(*) as total from words group by word")
wordCountsDataFrame.show()
except:
pass
words.foreachRDD(process)
3.8 MLlib Operations
You can also easily use machine learning algorithms provided by MLlib. First of all, there are streaming machine learning algorithms (e.g. Streaming Linear Regression, Streaming KMeans, etc.) which can simultaneously learn from the streaming data as well as apply the model on the streaming data. Beyond these, for a much larger class of machine learning algorithms, you can learn a learning model offline (i.e. using historical data) and then apply the model online on streaming data. See the MLlib guide for more details.
3.9 Caching / Persistence
Similar to RDDs, DStreams also allow developers to persist the stream’s data in memory. That is, using the persist() method on a DStream will automatically persist every RDD of that DStream in memory. This is useful if the data in the DStream will be computed multiple times (e.g., multiple operations on the same data). For window-based operations like reduceByWindow and reduceByKeyAndWindow and state-based operations like updateStateByKey, this is implicitly true. Hence, DStreams generated by window-based operations are automatically persisted in memory, without the developer calling persist().
For input streams that receive data over the network (such as, Kafka, Flume, sockets, etc.), the default persistence level is set to replicate the data to two nodes for fault-tolerance.
Note that, unlike RDDs, the default persistence level of DStreams keeps the data serialized in memory. This is further discussed in the Performance Tuning section. More information on different persistence levels can be found in the Spark Programming Guide.
3.10 Checkpointing
A streaming application must operate 24/7 and hence must be resilient to failures unrelated to the application logic (e.g., system failures, JVM crashes, etc.). For this to be possible, Spark Streaming needs to checkpoint enough information to a fault- tolerant storage system such that it can recover from failures. There are two types of data that are checkpointed.
Metadata checkpointing - Saving of the information defining the streaming computation to fault-tolerant storage like HDFS. This is used to recover from failure of the node running the driver of the streaming application (discussed in detail later). Metadata includes:
- Configuration - The configuration that was used to create the streaming application.
- DStream operations - The set of DStream operations that define the streaming application.
- Incomplete batches - Batches whose jobs are queued but have not completed yet.
Data checkpointing - Saving of the generated RDDs to reliable storage. This is necessary in some stateful transformations that combine data across multiple batches. In such transformations, the generated RDDs depend on RDDs of previous batches, which causes the length of the dependency chain to keep increasing with time. To avoid such unbounded increases in recovery time (proportional to dependency chain), intermediate RDDs of stateful transformations are periodically checkpointed to reliable storage (e.g. HDFS) to cut off the dependency chains.
To summarize, metadata checkpointing is primarily needed for recovery from driver failures, whereas data or RDD checkpointing is necessary even for basic functioning if stateful transformations are used.
How to configure Checkpointing
Ref: nc input & output
[hadoop@NN01 ~]$ nc -lp 9999
hhh # input
this is a test # input
hello #output
[hadoop@NN01 ~]$ nc 127.0.0.1 9999
hhh
this is a test
hello
参考资料: