数据输入源
Spark Streaming
中的数据来源主要是
- 系统文件源
- 套接字流
RDD
对列流- 高级数据源
Kafka
文件流
- 交互式环境下执行
1 | 创建文件存放的目录 |
1 | from pyspark import SparkContext |
-
编写独立的应用程序
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22# vim fileStreaming.py
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# 如何创建sc对象
conf = SparkConf()
conf.setAppName("TestDStream")
conf.setMaster('local[2]')
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 10) # 每10秒监听;交互式环境下自带sc实例对象
lines = ssc.textFileStream(".../logfile") # 创建文件流,监控目录的全称地址
words = lines.flatMap(lambda line:line.split(' ')) # 通过flatMap操作将数据进行lambda操作,再进行拍平
wordCounts = words.map(lambda x:(x,1)).reduceByKey(lambda a,b: a+b)
wordCounts.pprint() # 在交互式环境下查看
ssc.start() # 启动流计算
ssc.awaitTermination() # 等待流计算结束
# 执行
cd /usr/local/spark/mycode/streaming/logfile
/usr/local/spark/bin/spark-submit fileStreaming.py
套接字流
创建客户端和服务端
tcp编程包含客户端和服务端,通信过程:
- 服务端先进行端口的绑定,再进入监听和阻塞状态,等待来自客户端的连接
- 客户端发送请求,连接到指定的端口号,服务端收到请求,完成通信过程
SparkStreaming扮演的是客户端的角色,不断的发送数据。
1 | 创建文件存放的目录 |
1 | # vim NetworkWordCount.py:扮演的是客户端角色 |
1 | # 使用socket编程实现自定义数据源 |
如何启动
1 | cd /usr/local/spark/mycode/streaming/socket |
RDD队列流
1 | cd /usr/local/spark/mycode/streaming/rddqueue |
1 | # RDDQueueStream.py |
Kafka(Apache)
功能
不同类型的分布式系统(关系数据库、NoSQL数据库、流处理系统等)可以统一接入到Kafka,实现和Hadoop各个组件之间的不同类型数据的实现高效交换
信息传递的枢纽,主要功能是:
-
高吞吐量的分布式发布订阅消息系统
-
同时满足在线实时处理和批量离线处理
组件
- Broker:一个或者多个服务器
- Topic:每条消息发布到
Kafka
集群的消息都有一个类别,这个类别就是Topic
。- 不同的topic消息分开存储
- 用户不必关心数据存放位置,只需要指定消息的topic即可产生或者消费数据
- partition:每个topic分布在一个或者多个分区上
- Producer:生产者,负责发布消息
- Consumer:向Broker读取消息额客户端
- Consumer Group:所属组
Kafka的运行是依赖于Zookeeper
启动Kafka
-
安装
Kafka
注意版本号 -
启动
Kafka
:每个窗口都不能关闭1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18窗口1:启动zookeeper
cd /usr/local/kafka
./bin/zookeeper-server-start.sh config/zookeeper.properites # 不能关闭这个窗口,否则ZK会停止运行
窗口2:启动Kafka
cd /usr/local/kafka
./bin/kafka-server-start.sh config/server.properites
窗口3:测试
cd /usr/local/kafka
./bin/kafka-topics.sh --create --zookeeper localhost:2181 -- replication-factor 1 --partitions 1 --topic wordsendertest # 创建名为wordsendertest的topic
./bin/kafka-topics.sh --list --zookeeper localhost:2181 # 列出所有创建的topic,显示出来则创建成功
./bin/kafka-console-producer.sh --broker-list localhose:9092 --topic wordsendertest # 用Producer来产生数据,测试Kafka是否正常
窗口4:产生消费者
cd /usr/local/kafka
./bin/kafka-console-consumer.sh --zookeeepr localhost:2181 --topic wordsendertest --from-beginning
spark 配置
先下载jar
包:
1 | # 将下载解压后的jar包拷贝到spark的jars目录下 |
修改spark配置文件
1 | cd /usr/local/spark/conf |
kafka数据源
1 | # kafkaWordCount.py |
执行过程
1 | cd /usr/local/spark/mycode/streaming/kafka |