Fork me on GitHub

Spark笔记15-Spark数据源及操作

数据输入源

Spark Streaming中的数据来源主要是

  • 系统文件源
  • 套接字流
  • RDD对列流
  • 高级数据源Kafka

文件流

  • 交互式环境下执行
1
2
3
4
5
6
# 创建文件存放的目录
cd /usr/loca/spark/mycode
mkdir streaming
cd streaming
mkdir logfile
cd logfile # 对这个子目录进行数据监控
1
2
3
4
5
6
7
8
9
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
ssc = StreamingContext(sc, 10) # 每10秒监听;交互式环境下自带sc实例对象
lines = ssc.textFileStream(".../logfile") # 创建文件流,监控目录的全称地址
words = lines.flatMap(lambda line:line.split(' ')) # 通过flatMap操作将数据进行lambda操作,再进行拍平
wordCounts = words.map(lambda x:(x,1)).reduceByKey(lambda a,b: a+b)
wordCounts.pprint() # 在交互式环境下查看
ssc.start() # 启动流计算
ssc.awaitTermination() # 等待流计算结束
  • 编写独立的应用程序

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    # vim fileStreaming.py

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    # 如何创建sc对象
    conf = SparkConf()
    conf.setAppName("TestDStream")
    conf.setMaster('local[2]')
    sc = SparkContext(conf=conf)

    ssc = StreamingContext(sc, 10) # 每10秒监听;交互式环境下自带sc实例对象
    lines = ssc.textFileStream(".../logfile") # 创建文件流,监控目录的全称地址
    words = lines.flatMap(lambda line:line.split(' ')) # 通过flatMap操作将数据进行lambda操作,再进行拍平
    wordCounts = words.map(lambda x:(x,1)).reduceByKey(lambda a,b: a+b)
    wordCounts.pprint() # 在交互式环境下查看
    ssc.start() # 启动流计算
    ssc.awaitTermination() # 等待流计算结束

    # 执行
    cd /usr/local/spark/mycode/streaming/logfile
    /usr/local/spark/bin/spark-submit fileStreaming.py

套接字流

创建客户端和服务端

tcp编程包含客户端和服务端,通信过程:

  • 服务端先进行端口的绑定,再进入监听和阻塞状态,等待来自客户端的连接
  • 客户端发送请求,连接到指定的端口号,服务端收到请求,完成通信过程

KqufOK.png

SparkStreaming扮演的是客户端的角色,不断的发送数据。

1
2
3
4
5
6
# 创建文件存放的目录
cd /usr/loca/spark/mycode
mkdir streaming
cd streaming
mkdir socket
cd socket
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# vim NetworkWordCount.py:扮演的是客户端角色

from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
if len(sys.argv) != 3: # 第一个参数默认是self
print("Usage: NetworkWordCount.py<hostname><port>", file=sys.stderr)
exit(-1) # 参数长度不够,自动退出
sc = SparkContext(appName="pythonStreamingNetworkWordCount")
ssc = StreamingContext(sc, 1) # 流计算的指挥官
lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2])) # 定义套接字类型的输入源
counts = lines.flatMap(lambda line:line.split(" ").map(lambda word:(word,1)).reduceByKey(lambda a,b: a+b)

counts.pprint()
ssc.start()
ssc.awaitTermination()


# 服务端的角色
# 在linux中:nc -lk 9999
cd /usr/local/spark/mycode/streaming/socket
/usr/local/spark/bin/spark-submit NetworkWordCount.py localhost 9999
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 使用socket编程实现自定义数据源
# DataSourceSocket.py

import socket
server = socket.socket() # 生成对象
server.bind("localhose", 9999) # 设置监听的机器和端口号
server.listen(1)
while 1:
conn,addr = server.accept() # 使用两个值进行接受
print("connect success! connection is from %s" %addr[0])
print("sending data....")
conn.send("I love hadoop I love spark hadoop is good spark is fast".encode()) # 打印正在传输的数据
conn.close()
print("connection is broken.")
如何启动
1
2
3
4
5
6
cd /usr/local/spark/mycode/streaming/socket
/usr/local/spark/bin/spark-submit DataSourceSocket.py

# 启动客户端
cd /usr/local/spark/mycode/streaming/socket
/usr/local/spark/bin/spark-submit NetworkWordCount.py localhost 9999

RDD队列流

1
2
cd /usr/local/spark/mycode/streaming/rddqueue
/usr/local/spark/bin/spark-submit RDDQueueStream.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# RDDQueueStream.py

import time
from pyspark import SparkContext
from pyspark.Streaming import StreamingContext

if __name__ == "__main__":
sc = sparkContext(appName="pythonStreamingQueueStream")
ssc = StreamingContext(sc, 2) # 数据流指挥官的生成
rddQueue = []
for i in range(5):
rddQueue += [ssc.sparkContext.parallelize[j for j in range(1,1001)], 10] #. 创建RDD队列流
time.sleep(1)

inputStream = ssc.queueStream(rddQueue)
mappedStream = inputStream.map(lambda x:(x %10, 1))
reduceStream = mappedStream.reduceByKey(lambda a,b: a + b)
reduceStream.pprint()
ssc.start()
ssc.stop(stopSparkContext=True, stopGraceFully=True)

Kafka(Apache)

功能

不同类型的分布式系统(关系数据库、NoSQL数据库、流处理系统等)可以统一接入到Kafka,实现和Hadoop各个组件之间的不同类型数据的实现高效交换

信息传递的枢纽,主要功能是:

  • 高吞吐量的分布式发布订阅消息系统

  • 同时满足在线实时处理和批量离线处理

组件
  • Broker:一个或者多个服务器
  • Topic:每条消息发布到Kafka集群的消息都有一个类别,这个类别就是Topic
    • 不同的topic消息分开存储
    • 用户不必关心数据存放位置,只需要指定消息的topic即可产生或者消费数据
  • partition:每个topic分布在一个或者多个分区上
  • Producer:生产者,负责发布消息
  • Consumer:向Broker读取消息额客户端
  • Consumer Group:所属组

KqrXwD.png

Kafka的运行是依赖于Zookeeper

启动Kafka
  • 安装Kafka注意版本号

  • 启动Kafka:每个窗口都不能关闭

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    # 窗口1:启动zookeeper
    cd /usr/local/kafka
    ./bin/zookeeper-server-start.sh config/zookeeper.properites # 不能关闭这个窗口,否则ZK会停止运行

    # 窗口2:启动Kafka
    cd /usr/local/kafka
    ./bin/kafka-server-start.sh config/server.properites

    # 窗口3:测试
    cd /usr/local/kafka
    ./bin/kafka-topics.sh --create --zookeeper localhost:2181 -- replication-factor 1 --partitions 1 --topic wordsendertest # 创建名为wordsendertest的topic

    ./bin/kafka-topics.sh --list --zookeeper localhost:2181 # 列出所有创建的topic,显示出来则创建成功
    ./bin/kafka-console-producer.sh --broker-list localhose:9092 --topic wordsendertest # 用Producer来产生数据,测试Kafka是否正常

    # 窗口4:产生消费者
    cd /usr/local/kafka
    ./bin/kafka-console-consumer.sh --zookeeepr localhost:2181 --topic wordsendertest --from-beginning

spark 配置

先下载jar包:

Kq2GWQ.png

1
2
3
4
5
6
7
8
9
# 将下载解压后的jar包拷贝到spark的jars目录下
cd /usr/local/spark/jars
mkdir kafka
cd ~
cp ./spark-streaming-kafka-0.8_2.11-2.4.0.jar /usr/local/spark/jars/kafka

# 将Kafka安装目录下的libs目录下的所有文件复制到spark的jars目录下
cd /usr/local/kafka/libs
cp ./* /usr/local/spark/jars/kafka # 进入libs目录后,将当权目录下的所有文件进行拷贝
修改spark配置文件
1
2
cd /usr/local/spark/conf
vim spark-env.sh

KqWRIK.png

kafka数据源

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# kafkaWordCount.py

from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

if __name__ == "__main__":
if len(sys.argv) != 3: # 第一个参数默认是self
print("Usage: kafkaWordCount.py<hostname><port>", file=sys.stderr)
exit(-1) # 参数长度不够,自动退出
sc = SparkContext(appName="pythonStreamingKafkaWordCount")
ssc = StreamingContext(sc, 1) # 流计算的指挥官

zkQuorum,topic = sys.argv[1:]
kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic:1}) # 建立数据源

lines = kvs.map(lambda x:x[1])
counts = lines.flatMap(lambda line:line.split(" ").map(lambda word:(word,1)).reduceByKey(lambda a,b: a+b) # 第二个 map 函数的作用是形成键值对,因为 reduceByKeyd 的参数必须是键值对

counts.pprint()
ssc.start()
ssc.awaitTermination()

执行过程

1
2
cd /usr/local/spark/mycode/streaming/kafka
/usr/local/spark/bin/spark-submit ./kafkaWordCount.py locaohost:2181 wordsendertest # 2181是ZK服务器的地址,wordsendertest是topic的名称

本文标题:Spark笔记15-Spark数据源及操作

发布时间:2019年11月02日 - 15:11

原始链接:http://www.renpeter.cn/2019/11/02/Spark%E7%AC%94%E8%AE%B015-Spark%E6%95%B0%E6%8D%AE%E6%BA%90%E5%8F%8A%E6%93%8D%E4%BD%9C.html

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。

Coffee or Tea