PySpark案例实战
前言介绍
Spark是什么
Apache Spark是用于大规模数据(large-scala data)处理的统一 (unified) 分析引擎。
简单来说,Spark是一款分布式的计算框架,用于调度成百上千的服务器集群,计算TB、PB乃至EB级别的海量数据。
Spark作为全球顶级的分布式计算框架,支持众多的编程语言进行开发
而Python语言,则是Spark重点支持的方向。
Spark对Python语言的支持,重点体现在Python第三方库:PySpark之上。
PySpark是由Spark官方开发的Python语言第三方库Python开发者可以使用pip程序快速的安装PySpark并像其它三方库那样直接使用。
Python应用场景和就业方向是十分丰富的,其中,最为亮点的方向为:
大数据开发和人工智能
基础准备
PySpark库的安装
同其它的Python第三方库一样PySpark同样可以使用pip程序进行安装。
在”CMD”命令提示符程序内,输入pip install pyspark
或使用国内代理镜像网站(清华大学源)
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark
构建PySpark执行环境入口对象
想要使用PySpark库完成数据处理首先需要构建一个执行环境入口对象PySpark的执行环境入口对象是:类SparkContext的类对象
"""演示获取PySpark的执行环境入库对象: SparkContext并通过SparkContext对象获取当前PySpark的版本"""# 导包from pyspark import SparkConf,SparkContext# 创建SparkConf类对象中conf = SparkConf().setSparkHome("local[*]").setAppName("test_spqrk_app")#基FSparkConf类对象创LSparkContext对象sc = SparkContext(conf = conf)#打印PySpark的运行版本print(sc.version)#停止SparkContext对象的运行(停止PySpark程序)sc.stop()
PySpark的编程模型
SparkContext类对象,是PySpark编程中一切功能的入口。PySpark的编程,主要分为如下三大步骤
通过SparkContext对象,完成数据输入
输入数据后得到RDD对象,对RDD对象进行选代计算
最终通过RDD对象的成员方法,完成数据输出工作
数据输入
只要数据输入到spark就一定是rdd
RDD对象
如图可见,PySpark支持多种数据的输入,在输入完成后,都会得到一个:RDD类的对象
RDD全称为:弹性分布式数据集(Resilient Distributed Datasets)PySpark针对数据的处理,都是以RDD对象作为载体,即:
数据存储在RDD内
各类数据的计算方法,也都是RDD的成员方法
RDD的数据计算方法,返回值依旧是RDD对象
Python数据容器转RDD对象
PySpark支持通过SparkContext对象的parallelize成员方法将
List
Tuple
Set
Dic
tstr
转换为PySpark的RDD对象
注意:
字符串会被拆分出1个个的字符,存入RDD对象
字典仅有key会被存入RDD对象
"""演示通过PySpark代码加载数据,即数据输入"""from pyspark import SparkContext,SparkConf#构建接口对象conf = SparkConf().setSparkHome("local[*]").setAppName("test_rdd")sc = SparkContext(conf=conf) #构建接口对象#构建实验数据list_1 = [1,2,3,4,5]turple_1 = (1,2,3,4)str_1 = ("asdoac")dict_1 = {"key1":"value1","key2":"value2"}set_1 = {1,2,3,4,5}#通过parallelize方法将Python对象加载到Spark内,成为RDD对象rdd1 = sc.parallelize(list_1) #列表转spark的rddrdd2 = sc.parallelize(turple_1) #元组转spark的rddrdd3 = sc.parallelize(str_1) #字符串转spark的rddrdd4 = sc.parallelize(dict_1) #字典转spark的rddrdd5 = sc.parallelize(set_1) #集合转spark的rddprint(rdd1.collect())print(rdd2.collect())print(rdd3.collect())print(rdd4.collect())print(rdd5.collect())sc.stop()
读取文件转RDD对象
PySpark也支持通过SparkContext入口对象来读取文件来构建出RDD对象
"""演示通过PySpark代码加载数据,即数据输入"""from pyspark import SparkContext,SparkConf#构建接口对象conf = SparkConf().setSparkHome("local[*]").setAppName("test_rdd")sc = SparkContext(conf=conf) #构建接口对象#用过textFile方法,读取文件数据加载到Spark内,成为RDD对象rdd = sc.textFile("D:/bill.txt")print(rdd.collect())sc.stop()
总结:
RDD对象是什么” />通过spark的contxt加载数据为rddRDD对象称之为分布式弹性数据集,是PySpark中数据计算的载体它可以:
提供数据存储
提供数据计算的各类方法
数据计算的方法,返回值依旧是RDD(RDD迭代计算)后续对数据进行各类计算,都是基于RDD对象进行
如何输入数据到Spark(即得到RDD对象)
通过SparkContext的parallelize成员方法,将Python数据容器转换为RDD对象
通过SparkContext的textFile成员方法读取文本文件得到RDD对象
数据计算
map方法
PySpark的数据计算,都是基于RDD对象来进行的那么如何进行呢?
自然是依赖RDD对象内置丰富的:成员方法(算子)
map算子
功能:map算子,是将RDD的数据一条条处理( 处理的逻基于ap算子中接收的处理函数 )返回新的RDD
语法:
(T)-> U :表示你传入一个参数T,有传出的东西,类型不限;
(T)-> T :表示的是传入参数T之后,传出的也是和T一样的数据类型
需要添加python.exe的位置
"""演示map算子"""from pyspark import SparkContext,SparkConfimport osos.environ['PYSPARK_PYTHON']="D:\dev\python\python3.10.4\python.exe"#构建接口对象conf = SparkConf().setSparkHome("local[*]").setAppName("test_rdd")sc = SparkContext(conf=conf) #构建接口对象#准备一个rddrdd = sc.parallelize([1,2,3,4,5,6])#使用map方法使得每个都乘10def func(data):return data * 10rdd2 = rdd.map(func) #调用这个func的函数去对参数进行操作print(rdd2.collect())
因为def的函数有些简单就一行,所以可以使用lamba匿名函数来优化
"""演示map算子"""from pyspark import SparkContext,SparkConfimport osos.environ['PYSPARK_PYTHON']="D:\dev\python\python3.10.4\python.exe"#构建接口对象conf = SparkConf().setSparkHome("local[*]").setAppName("test_rdd")sc = SparkContext(conf=conf) #构建接口对象#准备一个rddrdd = sc.parallelize([1,2,3,4,5,6])#使用map方法使得每个都乘10# def func(data):# return data * 10rdd2 = rdd.map(lambda x:x*10) #调用这个func的函数去对参数进行操作print(rdd2.collect())
Map调用之后,乘以10了,返回值依旧是rdd,那么如果还想对数据进行操作的话,那么就可以在后面继续加map+匿名函数(链式调用),但是匿名函数只限于函数语句少的,多的话还是def外部定义
"""演示map算子"""from pyspark import SparkContext,SparkConfimport osos.environ['PYSPARK_PYTHON']="D:\dev\python\python3.10.4\python.exe"#构建接口对象conf = SparkConf().setSparkHome("local[*]").setAppName("test_rdd")sc = SparkContext(conf=conf) #构建接口对象#准备一个rddrdd = sc.parallelize([1,2,3,4,5,6])#使用map方法使得每个都乘10# def func(data):# return data * 10rdd2 = rdd.map(lambda x:x*10).map(lambda x:x+5) #调用这个func的函数去对参数进行操作print(rdd2.collect())
总结:
map算子(成员方法)
接受一个处理函数可用lambda表达式快速编写
对RDD内的元素逐个处理,并返回一个新的RDD
链式调用
对于返回值是新RDD的算子,可以通过链式调用的方式多次调用算子
flatMap方法
flatMap算子
功能:对rdd执行map操作,然后进行 解除嵌套操作
"""演示flatmap算子"""from pyspark import SparkContext,SparkConfimport osos.environ['PYSPARK_PYTHON']="D:\dev\python\python3.10.4\python.exe"#构建接口对象conf = SparkConf().setSparkHome("local[*]").setAppName("test_rdd")sc = SparkContext(conf=conf) #构建接口对象#准备一个rddrdd = sc.parallelize(["master,servant,fate","saber,archer,lacher","basker,rider"])#需求,将RDD数据里面的一个个单词拿出来rdd2 = rdd.map(lambda x:x.split(" "))print(rdd2.collect())
"""演示flatmap算子"""from pyspark import SparkContext,SparkConfimport osos.environ['PYSPARK_PYTHON']="D:\dev\python\python3.10.4\python.exe"#构建接口对象conf = SparkConf().setSparkHome("local[*]").setAppName("test_rdd")sc = SparkContext(conf=conf) #构建接口对象#准备一个rddrdd = sc.parallelize(["master servant fate","saber archer lacher","basker rider"])#需求,将RDD数据里面的一个个单词拿出来rdd2 = rdd.flatMap(lambda x:x.split(" "))print(rdd2.collect())
总结:
flatMap算子
计算逻辑和map一样
可以比map多出,解除一层嵌套的功能
reduceByKey方法
功能: 针对KV型RDD,自动按照key分组,然后根据你提供的聚合逻辑,完成组内数据[value)的聚合操作.
两个传入参数,和返回的类型需要是一致的。
聚合逻辑
注意: reduceByKey中接收的函数 只负责聚合,不理会分组,分组是自动 by key来分组的。
这里的a+b是指代的传入的两个key的value的实现,比如a的value有两个,就是这两个value相加
"""演示reduceBYKey算子"""from pyspark import SparkContext,SparkConfimport osos.environ['PYSPARK_PYTHON']="D:\dev\python\python3.10.4\python.exe"#构建接口对象conf = SparkConf().setSparkHome("local[*]").setAppName("test_rdd")sc = SparkContext(conf=conf) #构建接口对象#准备一个rddrdd = sc.parallelize([('a',1),('a',2),('b',3),('b',4)])rdd2 = rdd.reduceByKey(lambda a,b : a+b)print(rdd2.collect())sc.stop()
总结:
练习案例1
读取文件,对文件内的单词进行计数
#1.构建执行环境入口对象from pyspark import SparkContext,SparkConfimport osos.environ['PYSPARK_PYTHON']="D:\dev\python\python3.10.4\python.exe"#构建接口对象conf = SparkConf().setSparkHome("local[*]").setAppName("test_rdd")sc = SparkContext(conf=conf) #构建接口对象#2.读取数据文件rdd = sc.textFile("D:/hello.txt")#3.取出全部单词world_rdd = rdd.flatMap(lambda a : a.split(" ")).map(lambda x:x.strip())# print(world_rdd.collect())#4.将所有单词都转换成二元元组,单词为Key,value设置为1,有几个就有几个1,相加word_with_one = world_rdd.map(lambda word:(word,1))# print(word_with_one.collect())#5.分组并求和result_rdd = word_with_one.reduceByKey(lambda a,b :a+b )#6.打印输出结果print(result_rdd.collect())
filter方法
功能:过滤想要的数据进行保留
"""演示RDD的filter成员方法的使用"""#1.构建执行环境入口对象from pyspark import SparkContext,SparkConfimport osos.environ['PYSPARK_PYTHON']="D:\dev\python\python3.10.4\python.exe"#构建接口对象conf = SparkConf().setSparkHome("local[*]").setAppName("test_rdd")sc = SparkContext(conf=conf) #构建接口对象#准备一个rddrdd = sc.parallelize([1,2,3,4,5,6,7,8])#对rdd的数据进行过滤rdd2 = rdd.filter(lambda num: num % 2 == 0)print(rdd2.collect())
接受一个处理函数,可用lambda快速编写
函数对RDD数据逐个处理,得到True的保留至返回值的RDD中
distinct方法
功能:对RDD数据进行去重,返回新RDD
"""演示RDD的distint成员方法的使用"""#1.构建执行环境入口对象from pyspark import SparkContext,SparkConfimport osos.environ['PYSPARK_PYTHON']="D:\dev\python\python3.10.4\python.exe"#构建接口对象conf = SparkConf().setSparkHome("local[*]").setAppName("test_rdd")sc = SparkContext(conf=conf) #构建接口对象#准备一个rddrdd = sc.parallelize([1,1,2,3,4,4,4,4,3,4,6,6,6,7])#对rdd的数据进行去重操作rdd2 = rdd.distinct()print(rdd2.collect())
sortBy方法
功能:对RDD数据进行排序基于你指定的排序依据
语法:
func: (T) – U: 告知按照rdd中的哪个数据进行排序,比如 Lambda x: x[1]表示rdd中的第二列进行排序
ascending =True升序 False 降序
numPartitions: 用多少分区排序
#1.构建执行环境入口对象from pyspark import SparkContext,SparkConfimport osos.environ['PYSPARK_PYTHON']="D:\dev\python\python3.10.4\python.exe"#构建接口对象conf = SparkConf().setSparkHome("local[*]").setAppName("test_rdd")sc = SparkContext(conf=conf) #构建接口对象#2.读取数据文件rdd = sc.textFile("D:/hello.txt")#3.取出全部单词world_rdd = rdd.flatMap(lambda a : a.split(" ")).map(lambda x:x.strip())# print(world_rdd.collect())#4.将所有单词都转换成二元元组,单词为Key,value设置为1,有几个就有几个1,相加word_with_one = world_rdd.map(lambda word:(word,1))# print(word_with_one.collect())#5.分组并求和result_rdd = word_with_one.reduceByKey(lambda a,b :a+b )#6.对结果进行排序# print(result_rdd.collect())final_rdd = result_rdd.sortBy(lambda x:x[1],ascending=False,numPartitions=1)print(final_rdd.collect())
接收一个处理函数,可用lambda快速编写
函数表示用来决定排序的依据
可以控制升序或降序
全局排序需要设置分区数为1
练习案例2
多个Json数据串联在一起的;
"""案例需求:各个城市销售额排名,从大到小全部城市,有哪些商品类别在售卖北京市有哪些商品类别在售卖"""from pyspark import SparkContext,SparkConfimport osimport jsonos.environ['PYSPARK_PYTHON']="D:\dev\python\python3.10.4\python.exe"#构建接口对象conf = SparkConf().setSparkHome("local[*]").setAppName("test_rdd")sc = SparkContext(conf=conf) #构建接口对象# TODO 而状1: 城市箭售额接名1.1/歌文件得到RDD# 1.2 取出一个个JSON字符串file_rdd = sc.textFile("D:/orders.txt")json_str_rdd = file_rdd.flatMap(lambda x :x.split("|"))"""map可以将数据一条一条的取出来,因为文本内一行内有多条json数据以为|隔开"""# 1.3 一个个JSON字符审转换为字典dict_rdd = json_str_rdd.map(lambda x :json.loads(x)) #将每条json数据转换为字典# 1.4取出城市和销售额数据,通过lamad函数去让它成为二元元组city_with_money_rdd = dict_rdd.map(lambda x:(x['areaName'],int(x['money']))) #取出城市和销售额数据# 1.5 技城市分组按销售额聚合city_result = city_with_money_rdd.reduceByKey(lambda a,b:a+b)# 1.6 按销售额聚合结果进行排序result_rdd = city_result.sortBy(lambda x:x[1],ascending=False,numPartitions=1)print(f"最后的结果是{result_rdd.collect()}")# TODO 需求2: 全部城市有哪些商品类别在售卖# 2.1 取出全部的商品类别,对全部商品类别进行去重category_rdd = dict_rdd.map(lambda x :x["category"]).distinct()print(f"全部售卖的结果去重后是{category_rdd.collect()}")# TODO 状3: 北京市有哪些商品类别在售灵#3.1 过滤北京市的数据beijing_rdd = dict_rdd.filter(lambda x :x['areaName'] == '北京')# 3.2 取出全部商品类别result3_rdd = beijing_rdd.map(lambda x :x['category']).distinct()print(f"北京的类别有{result3_rdd.collect()}")
数据输出(rdd转换为python数据)
collect算子
功能:将RDD各个分区内的数据,统一收集到Driver中,形成一个List对象
用法:
rdd.collect()
返回值是一个list
之前也使用过去print(rdd.collect())这是将rdd对象转变为一个python的list进行打印。
reduce算子
take算子
功能:取RDD的前N个元素组合成list返回给你
用法:
Take(5)表示取出前5个元素,组装为一个列表返回给我们。
count算子
功能: 计算RDD有多少条数据返回值是一个数字
用法:,
from pyspark import SparkContext,SparkConfimport osimport jsonos.environ['PYSPARK_PYTHON']="D:\dev\python\python3.10.4\python.exe"#构建接口对象conf = SparkConf().setSparkHome("local[*]").setAppName("test_rdd")sc = SparkContext(conf=conf) #构建接口对象rdd = sc.parallelize([1, 2, 3,4, 5])# coLLect算子,输出RDD为List对象# rdd_list = rdd.collect()# print(rdd_list)# print(type(rdd_list))# reduce算子,对RDD进行两两聚合# num = rdd.reduce(lambda a,b:a+b)# print(num)# take算了,取出RDDNN个元素,组成List返回# take_list = rdd.take(3)# print(take_list)# count,统irdd内有多少条数据,返回值为数字num_count = rdd.count()print(num_count)
总结
Spark的编程流程就是:
将数据加载为RDD(数据输入)
对RDD进行计算(数据计算)
将RDD转换为Python对象(数据输出)
数据输出的方法
collect: 将RDD内容转换为list
reduce:对RDD内容进行自定义聚合
take:取出RDD的前N个元素组成list
count:统计RDD元素个数数据输出
可用的方法是很多的,本小节简单的介绍了4个
数据输出之将rdd输出到文件
saveAsTextFile算子
功能:将RDD的数据写入文本文件中
支持本地写出,hdfs等文件系统
这个需要配置haddop依赖,才能正常进行。
准备完成后,进行写入文件,但是发现输出的内容有16个!
from pyspark import SparkContext,SparkConfimport osimport jsonos.environ['PYSPARK_PYTHON']="D:\dev\python\python3.10.4\python.exe"os.environ['HADOOP_HOME'] = "D:/dev/hadoop-3.0.0"#构建接口对象conf = SparkConf().setSparkHome("local[*]").setAppName("test_rdd")sc = SparkContext(conf=conf) #构建接口对象#准备RDD1rdd1 = sc.parallelize([1,2, 3, 4, 5])#淮备RDD2rdd2 = sc.parallelize([("Hello",3),("Spark",5),("Hi",7)])# 准备RDD3rdd3 = sc.parallelize([[1,3,5],[6,7,9],[11,13,11]])#输出到文本中rdd1.saveAsTextFile("D:/python/output1")rdd2.saveAsTextFile("D:/python/output2")rdd3.saveAsTextFile("D:/python/output3")
这是因为rdd是有多少分区,它就输出多少个文件,rdd有16个分区。
修改rdd分区为1个
这样也可以实现!
from pyspark import SparkContext,SparkConfimport osimport jsonos.environ['PYSPARK_PYTHON']="D:\dev\python\python3.10.4\python.exe"os.environ['HADOOP_HOME'] = "D:/dev/hadoop-3.0.0"#构建接口对象conf = SparkConf().setSparkHome("local[*]").setAppName("test_rdd")conf.set("spark.default.parallelism","1")sc = SparkContext(conf=conf) #构建接口对象#准备RDD1rdd1 = sc.parallelize([1,2, 3, 4, 5])#淮备RDD2rdd2 = sc.parallelize([("Hello",3),("Spark",5),("Hi",7)])# 准备RDD3rdd3 = sc.parallelize([[1,3,5],[6,7,9],[11,13,11]])#输出到文本中rdd1.saveAsTextFile("D:/python/output1")rdd2.saveAsTextFile("D:/python/output2")rdd3.saveAsTextFile("D:/python/output3")
总结:
RDD输出到文件的方法
rdd.saveAsTextFile(路径)
输出的结果是一个文件夹
有几个分区就输出多少个结果文件
如何修改RDD分区
SparkConf对象设置conf.set(“spark.default.parallelism”,”1”)
创建RDD的时候,scparallelize方法传入numSlices参数为1
综合案例
案例代码:
from pyspark import SparkContext,SparkConfimport osimport jsonos.environ['PYSPARK_PYTHON']="D:\dev\python\python3.10.4\python.exe"os.environ['HADOOP_HOME'] = "D:/dev/hadoop-3.0.0"#构建接口对象conf = SparkConf().setSparkHome("local[*]").setAppName("test_rdd")conf.set("spark.default.parallelism","1")sc = SparkContext(conf=conf) #构建接口对象#读取文件转换成RDDfile_rdd = sc.textFile("D:/search_log.txt")# TODO 需求1: 些门搜索时问段Top3《小时精度)# 1.1 取出全部的时向并转换为小时#取到小时的方面# 1.2 转换为(小时,1) 的-元元组# 1.3 Key分组聚合VaLue#1.4 持序(降序)#1.5取前3# 先将其数据划分为列表,split划分(map是一条一条的取数据) 第三个map是将数据取出来之后,都成为了一个(时间:1)的二元元组# 然后在用reducebykey去取出来,将对应时间的1依次叠加,然后成为次数#然后sortby去取这个新rdd的二元元组的1下标的值去排序;# file_rdd.map(lambda x : x.split("\t")).\# map(lambda x :x[0][:2]).\# map(lambda x :(x,1)).\# reduceByKey(lambda a,b :a+b).\# sortBy(lambda x:x[1],ascending=False,numPartitions=1).\# take(3)#优化写法:将map重叠为一个result_1 = file_rdd.map(lambda x : (x.split("\t")[0][:2],1)).\reduceByKey(lambda a,b :a+b).\sortBy(lambda x:x[1],ascending=False,numPartitions=1).\take(3)print(f"需求1的结果为{result_1}")# TODO 需求2: 热门搜索词Top3# 2.1 取出全部的搜索词# 2.2 (词,1) 二元元组# 2.3 分组聚合# 2.4 排序# 2.5 Top3#搜索词的下标为2,也成为二元元组去叠加result_2 = file_rdd.map(lambda x:(x.split("\t")[2],1)) .\reduceByKey(lambda a,b:a+b).\sortBy(lambda x:x[1],ascending=False,numPartitions=1).\take(3)print(f"需求2的结果为{result_2}")# TODO 需求3: 计黑马程序员关键字在什么时段数搜索的最多# 3.1 过滤内容,只保留黑马程序员关键词# 3.2 转换为(小时,1) 的-元元组# 3.3 Key分组聚合Value#3.4 排序(降序)#3.5取前1result_3 =file_rdd.map(lambda x:x.split("\t")) .\filter(lambda x:x[2] == "黑马程序员").\map(lambda x:(x[0][:2],1)).\reduceByKey(lambda a,b:a+b).\sortBy(lambda x:x[1],ascending=False,numPartitions=1).\take(1)print(f"需求3的结果为{result_3}")# TODO 需求4: 数据转换为JSON格式,写出到文件中# 4.1 转换为JSON格式的RDD# 4.2写出为文件#最好的python转换json就是将python数据变为字典#就是加一些字段名就可以了file_rdd.map(lambda x:x.split("\t")).\map(lambda x:{"time":x[0],"user_id":x[1],"key_word":x[2],"rank1":x[3],"rank2":x[4],"url":x[5]}).\saveAsTextFile("D:/python/output_json")#这个需要去该一开头的分区 conf.set("spark.default.parallelism","1")
"""演示map算子"""from pyspark import SparkContext,SparkConfimport osos.environ['PYSPARK_PYTHON']="D:\dev\python\python3.10.4\python.exe"#构建接口对象conf = SparkConf().setSparkHome("local[*]").setAppName("test_rdd")sc = SparkContext(conf=conf) #构建接口对象#准备一个rddrdd = sc.parallelize([1,2,3,4,5,6])#使用map方法使得每个都乘10def func(data):return data * 10rdd2 = rdd.map(func) #调用这个func的函数去对参数进行操作print(rdd2.collect())
"""演示map算子"""from pyspark import SparkContext,SparkConfimport osos.environ['PYSPARK_PYTHON']="D:\dev\python\python3.10.4\python.exe"#构建接口对象conf = SparkConf().setSparkHome("local[*]").setAppName("test_rdd")sc = SparkContext(conf=conf) #构建接口对象#准备一个rddrdd = sc.parallelize([1,2,3,4,5,6])#使用map方法使得每个都乘10# def func(data):# return data * 10rdd2 = rdd.map(lambda x:x*10) #调用这个func的函数去对参数进行操作print(rdd2.collect())
"""演示map算子"""from pyspark import SparkContext,SparkConfimport osos.environ['PYSPARK_PYTHON']="D:\dev\python\python3.10.4\python.exe"#构建接口对象conf = SparkConf().setSparkHome("local[*]").setAppName("test_rdd")sc = SparkContext(conf=conf) #构建接口对象#准备一个rddrdd = sc.parallelize([1,2,3,4,5,6])#使用map方法使得每个都乘10# def func(data):# return data * 10rdd2 = rdd.map(lambda x:x*10).map(lambda x:x+5) #调用这个func的函数去对参数进行操作print(rdd2.collect())
"""演示flatmap算子"""from pyspark import SparkContext,SparkConfimport osos.environ['PYSPARK_PYTHON']="D:\dev\python\python3.10.4\python.exe"#构建接口对象conf = SparkConf().setSparkHome("local[*]").setAppName("test_rdd")sc = SparkContext(conf=conf) #构建接口对象#准备一个rddrdd = sc.parallelize(["master,servant,fate","saber,archer,lacher","basker,rider"])#需求,将RDD数据里面的一个个单词拿出来rdd2 = rdd.map(lambda x:x.split(" "))print(rdd2.collect())
"""演示flatmap算子"""from pyspark import SparkContext,SparkConfimport osos.environ['PYSPARK_PYTHON']="D:\dev\python\python3.10.4\python.exe"#构建接口对象conf = SparkConf().setSparkHome("local[*]").setAppName("test_rdd")sc = SparkContext(conf=conf) #构建接口对象#准备一个rddrdd = sc.parallelize(["master servant fate","saber archer lacher","basker rider"])#需求,将RDD数据里面的一个个单词拿出来rdd2 = rdd.flatMap(lambda x:x.split(" "))print(rdd2.collect())
"""演示reduceBYKey算子"""from pyspark import SparkContext,SparkConfimport osos.environ['PYSPARK_PYTHON']="D:\dev\python\python3.10.4\python.exe"#构建接口对象conf = SparkConf().setSparkHome("local[*]").setAppName("test_rdd")sc = SparkContext(conf=conf) #构建接口对象#准备一个rddrdd = sc.parallelize([('a',1),('a',2),('b',3),('b',4)])rdd2 = rdd.reduceByKey(lambda a,b : a+b)print(rdd2.collect())sc.stop()
#1.构建执行环境入口对象from pyspark import SparkContext,SparkConfimport osos.environ['PYSPARK_PYTHON']="D:\dev\python\python3.10.4\python.exe"#构建接口对象conf = SparkConf().setSparkHome("local[*]").setAppName("test_rdd")sc = SparkContext(conf=conf) #构建接口对象#2.读取数据文件rdd = sc.textFile("D:/hello.txt")#3.取出全部单词world_rdd = rdd.flatMap(lambda a : a.split(" ")).map(lambda x:x.strip())# print(world_rdd.collect())#4.将所有单词都转换成二元元组,单词为Key,value设置为1,有几个就有几个1,相加word_with_one = world_rdd.map(lambda word:(word,1))# print(word_with_one.collect())#5.分组并求和result_rdd = word_with_one.reduceByKey(lambda a,b :a+b )#6.打印输出结果print(result_rdd.collect())
"""演示RDD的filter成员方法的使用"""#1.构建执行环境入口对象from pyspark import SparkContext,SparkConfimport osos.environ['PYSPARK_PYTHON']="D:\dev\python\python3.10.4\python.exe"#构建接口对象conf = SparkConf().setSparkHome("local[*]").setAppName("test_rdd")sc = SparkContext(conf=conf) #构建接口对象#准备一个rddrdd = sc.parallelize([1,2,3,4,5,6,7,8])#对rdd的数据进行过滤rdd2 = rdd.filter(lambda num: num % 2 == 0)print(rdd2.collect())
"""演示RDD的distint成员方法的使用"""#1.构建执行环境入口对象from pyspark import SparkContext,SparkConfimport osos.environ['PYSPARK_PYTHON']="D:\dev\python\python3.10.4\python.exe"#构建接口对象conf = SparkConf().setSparkHome("local[*]").setAppName("test_rdd")sc = SparkContext(conf=conf) #构建接口对象#准备一个rddrdd = sc.parallelize([1,1,2,3,4,4,4,4,3,4,6,6,6,7])#对rdd的数据进行去重操作rdd2 = rdd.distinct()print(rdd2.collect())
#1.构建执行环境入口对象from pyspark import SparkContext,SparkConfimport osos.environ['PYSPARK_PYTHON']="D:\dev\python\python3.10.4\python.exe"#构建接口对象conf = SparkConf().setSparkHome("local[*]").setAppName("test_rdd")sc = SparkContext(conf=conf) #构建接口对象#2.读取数据文件rdd = sc.textFile("D:/hello.txt")#3.取出全部单词world_rdd = rdd.flatMap(lambda a : a.split(" ")).map(lambda x:x.strip())# print(world_rdd.collect())#4.将所有单词都转换成二元元组,单词为Key,value设置为1,有几个就有几个1,相加word_with_one = world_rdd.map(lambda word:(word,1))# print(word_with_one.collect())#5.分组并求和result_rdd = word_with_one.reduceByKey(lambda a,b :a+b )#6.对结果进行排序# print(result_rdd.collect())final_rdd = result_rdd.sortBy(lambda x:x[1],ascending=False,numPartitions=1)print(final_rdd.collect())
"""案例需求:各个城市销售额排名,从大到小全部城市,有哪些商品类别在售卖北京市有哪些商品类别在售卖"""from pyspark import SparkContext,SparkConfimport osimport jsonos.environ['PYSPARK_PYTHON']="D:\dev\python\python3.10.4\python.exe"#构建接口对象conf = SparkConf().setSparkHome("local[*]").setAppName("test_rdd")sc = SparkContext(conf=conf) #构建接口对象# TODO 而状1: 城市箭售额接名1.1/歌文件得到RDD# 1.2 取出一个个JSON字符串file_rdd = sc.textFile("D:/orders.txt")json_str_rdd = file_rdd.flatMap(lambda x :x.split("|"))"""map可以将数据一条一条的取出来,因为文本内一行内有多条json数据以为|隔开"""# 1.3 一个个JSON字符审转换为字典dict_rdd = json_str_rdd.map(lambda x :json.loads(x)) #将每条json数据转换为字典# 1.4取出城市和销售额数据,通过lamad函数去让它成为二元元组city_with_money_rdd = dict_rdd.map(lambda x:(x['areaName'],int(x['money']))) #取出城市和销售额数据# 1.5 技城市分组按销售额聚合city_result = city_with_money_rdd.reduceByKey(lambda a,b:a+b)# 1.6 按销售额聚合结果进行排序result_rdd = city_result.sortBy(lambda x:x[1],ascending=False,numPartitions=1)print(f"最后的结果是{result_rdd.collect()}")# TODO 需求2: 全部城市有哪些商品类别在售卖# 2.1 取出全部的商品类别,对全部商品类别进行去重category_rdd = dict_rdd.map(lambda x :x["category"]).distinct()print(f"全部售卖的结果去重后是{category_rdd.collect()}")# TODO 状3: 北京市有哪些商品类别在售灵#3.1 过滤北京市的数据beijing_rdd = dict_rdd.filter(lambda x :x['areaName'] == '北京')# 3.2 取出全部商品类别result3_rdd = beijing_rdd.map(lambda x :x['category']).distinct()print(f"北京的类别有{result3_rdd.collect()}")
from pyspark import SparkContext,SparkConfimport osimport jsonos.environ['PYSPARK_PYTHON']="D:\dev\python\python3.10.4\python.exe"#构建接口对象conf = SparkConf().setSparkHome("local[*]").setAppName("test_rdd")sc = SparkContext(conf=conf) #构建接口对象rdd = sc.parallelize([1, 2, 3,4, 5])# coLLect算子,输出RDD为List对象# rdd_list = rdd.collect()# print(rdd_list)# print(type(rdd_list))# reduce算子,对RDD进行两两聚合# num = rdd.reduce(lambda a,b:a+b)# print(num)# take算了,取出RDDNN个元素,组成List返回# take_list = rdd.take(3)# print(take_list)# count,统irdd内有多少条数据,返回值为数字num_count = rdd.count()print(num_count)
from pyspark import SparkContext,SparkConfimport osimport jsonos.environ['PYSPARK_PYTHON']="D:\dev\python\python3.10.4\python.exe"os.environ['HADOOP_HOME'] = "D:/dev/hadoop-3.0.0"#构建接口对象conf = SparkConf().setSparkHome("local[*]").setAppName("test_rdd")sc = SparkContext(conf=conf) #构建接口对象#准备RDD1rdd1 = sc.parallelize([1,2, 3, 4, 5])#淮备RDD2rdd2 = sc.parallelize([("Hello",3),("Spark",5),("Hi",7)])# 准备RDD3rdd3 = sc.parallelize([[1,3,5],[6,7,9],[11,13,11]])#输出到文本中rdd1.saveAsTextFile("D:/python/output1")rdd2.saveAsTextFile("D:/python/output2")rdd3.saveAsTextFile("D:/python/output3")
from pyspark import SparkContext,SparkConfimport osimport jsonos.environ['PYSPARK_PYTHON']="D:\dev\python\python3.10.4\python.exe"os.environ['HADOOP_HOME'] = "D:/dev/hadoop-3.0.0"#构建接口对象conf = SparkConf().setSparkHome("local[*]").setAppName("test_rdd")conf.set("spark.default.parallelism","1")sc = SparkContext(conf=conf) #构建接口对象#准备RDD1rdd1 = sc.parallelize([1,2, 3, 4, 5])#淮备RDD2rdd2 = sc.parallelize([("Hello",3),("Spark",5),("Hi",7)])# 准备RDD3rdd3 = sc.parallelize([[1,3,5],[6,7,9],[11,13,11]])#输出到文本中rdd1.saveAsTextFile("D:/python/output1")rdd2.saveAsTextFile("D:/python/output2")rdd3.saveAsTextFile("D:/python/output3")
from pyspark import SparkContext,SparkConfimport osimport jsonos.environ['PYSPARK_PYTHON']="D:\dev\python\python3.10.4\python.exe"os.environ['HADOOP_HOME'] = "D:/dev/hadoop-3.0.0"#构建接口对象conf = SparkConf().setSparkHome("local[*]").setAppName("test_rdd")conf.set("spark.default.parallelism","1")sc = SparkContext(conf=conf) #构建接口对象#读取文件转换成RDDfile_rdd = sc.textFile("D:/search_log.txt")# TODO 需求1: 些门搜索时问段Top3《小时精度)# 1.1 取出全部的时向并转换为小时#取到小时的方面# 1.2 转换为(小时,1) 的-元元组# 1.3 Key分组聚合VaLue#1.4 持序(降序)#1.5取前3# 先将其数据划分为列表,split划分(map是一条一条的取数据) 第三个map是将数据取出来之后,都成为了一个(时间:1)的二元元组# 然后在用reducebykey去取出来,将对应时间的1依次叠加,然后成为次数#然后sortby去取这个新rdd的二元元组的1下标的值去排序;# file_rdd.map(lambda x : x.split("\t")).\# map(lambda x :x[0][:2]).\# map(lambda x :(x,1)).\# reduceByKey(lambda a,b :a+b).\# sortBy(lambda x:x[1],ascending=False,numPartitions=1).\# take(3)#优化写法:将map重叠为一个result_1 = file_rdd.map(lambda x : (x.split("\t")[0][:2],1)).\reduceByKey(lambda a,b :a+b).\sortBy(lambda x:x[1],ascending=False,numPartitions=1).\take(3)print(f"需求1的结果为{result_1}")# TODO 需求2: 热门搜索词Top3# 2.1 取出全部的搜索词# 2.2 (词,1) 二元元组# 2.3 分组聚合# 2.4 排序# 2.5 Top3#搜索词的下标为2,也成为二元元组去叠加result_2 = file_rdd.map(lambda x:(x.split("\t")[2],1)) .\reduceByKey(lambda a,b:a+b).\sortBy(lambda x:x[1],ascending=False,numPartitions=1).\take(3)print(f"需求2的结果为{result_2}")# TODO 需求3: 计黑马程序员关键字在什么时段数搜索的最多# 3.1 过滤内容,只保留黑马程序员关键词# 3.2 转换为(小时,1) 的-元元组# 3.3 Key分组聚合Value#3.4 排序(降序)#3.5取前1result_3 =file_rdd.map(lambda x:x.split("\t")) .\filter(lambda x:x[2] == "黑马程序员").\map(lambda x:(x[0][:2],1)).\reduceByKey(lambda a,b:a+b).\sortBy(lambda x:x[1],ascending=False,numPartitions=1).\take(1)print(f"需求3的结果为{result_3}")# TODO 需求4: 数据转换为JSON格式,写出到文件中# 4.1 转换为JSON格式的RDD# 4.2写出为文件#最好的python转换json就是将python数据变为字典#就是加一些字段名就可以了file_rdd.map(lambda x:x.split("\t")).\map(lambda x:{"time":x[0],"user_id":x[1],"key_word":x[2],"rank1":x[3],"rank2":x[4],"url":x[5]}).\saveAsTextFile("D:/python/output_json")#这个需要去该一开头的分区 conf.set("spark.default.parallelism","1")