Structured Streaming
概述
Structured Streaming
将实时数据视为一张正在不断添加数据的表。
可以把流计算等同于在一个静态表上的批处理查询,进行增量运算。
在无界表上对输入的查询将生成结果表,系统每隔一定的周期会触发对无界表的计算并且更新结果。
两种处理模式
-
1.微批处理模式(默认)
在微批处理之前,将待处理数据的偏移量写入预写日志中。 防止故障宕机等造成数据的丢失,无法恢复。
- 定期检查流数据源
- 对上一批次结束后到达的新数据进行批量查询
- 由于需要写日志,造成延迟。最快响应时间为100毫秒
-
2.持续处理模式
- 毫秒级响应
- 不再根据触发器来周期性启动任务
- 启动一系列的连续的读取、处理等长时间运行的任务
- 异步写日志,不需要等待
Spark Streaming 和Structured Streaming
类别 | Spark | Structured |
---|---|---|
数据源 | DStream,本质上是RDD | DF数据框 |
处理数据 | 只能处理静态数据 | 能够处理数据流 |
实时性 | 秒级响应 | 毫秒级响应 |
编写
1 | # StructuredNetWordCount.py |
启动执行
1 | 启动HDFS |
输入源
-
file源(文件源)
-
Kafka源
-
socket源
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19# spark_ss_kafka_producer.py
import string
import random
import time
from kafka import KafkaProduce
if __name__ == "__main__":
producer = KafkaProducer(bootstrap_servers =['localhost:9092'])
while True:
s2 = (random.choice(string.ascii_lowercase) for _ in range(2)
word = ''.join(s2)
value = bytearray(word,'utf-8')
producer.send("wordcount-topic", value=vlaue).get(timecount=10) # 选定主题发送内容value
time.sleep(0.1)1
2
3
4sudo agt-get install pip3
sudo pip3 install kafka-python
cd /usr/local/spark/mycode/structuredstreaming/kafka/
python3 spark_ss_kafka_producer.py1
2
3
4
5
6
7
8
9
10
11# spark_ss_kafka_consumer.py
from pyspark.sql import SparkSession
if __name__="__main__":
spark = SparkSession.builder.appName("StructuredKafkaWordCount").getOrCount()
spark.sparkContext.setLogLevel("WARN")
# 读取数据源
lines = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "local:9092").option("subcribe", "wordcount-topic").load().selectExpr("CAST(value ASSTRING)")
wordCounts = lines.groupBy("values").count()
输出
-
启动流计算
DF或者Dataset的.writeStream()方法将会返回DataStreamWriter接口,接口通过.start()真正启动流计算,接口的主要参数是:
- format:接收者类型
- outputMode:输出模式
- queryName:查询的名称,可选,用于标识查询的唯一名称
- trigger:触发间隔,可选
-
三种输出模式
append
complete
update
-
输出接收器
系统内置的接收起包含:
- file接收器
- Kafka接收器
- Foreach接收器
- Console接收器
- Memory接收器