? 作者:韩信子@ShowMeAI
? 大数据技术◉技能提升系列:https://www.showmeai.tech/tutorials/84
? 数据分析实战系列:https://www.showmeai.tech/tutorials/40
? 本文地址:https://www.showmeai.tech/article-detail/338
? 声明:版权所有,转载请联系平台与作者并注明出处
? 收藏ShowMeAI查看更多精彩内容
Pandas 是每位数据科学家和 Python 数据分析师都熟悉的工具库,它灵活且强大具备丰富的功能,但在处理大型数据集时,它是非常受限的。
这种情况下,我们会过渡到 PySpark,结合 Spark 生态强大的大数据处理能力,充分利用多机器并行的计算能力,可以加速计算。不过 PySpark 的语法和 Pandas 差异也比较大,很多开发人员会感觉这很让人头大。
在本篇内容中, ShowMeAI 将对最核心的数据处理和分析功能,梳理 PySpark 和 Pandas 相对应的代码片段,以便大家可以无痛地完成 Pandas 到大数据 PySpark 的转换?
大数据处理分析及机器学习建模相关知识,ShowMeAI制作了详细的教程与工具速查手册,大家可以通过如下内容展开学习或者回顾相关知识。
?图解数据分析:从入门到精通系列教程
?图解大数据技术:从入门到精通系列教程
?图解机器学习算法:从入门到精通系列教程
?数据科学工具库速查表 | Spark RDD 速查表
?数据科学工具库速查表 | Spark SQL 速查表
? 导入工具库
在使用具体功能之前,我们需要先导入所需的库:
# pandas vs pyspark,工具库导入import pandas as pdimport pyspark.sql.functions as F
PySpark 所有功能的入口点是 SparkSession 类。通过 SparkSession 实例,您可以创建spark dataframe、应用各种转换、读取和写入文件等,下面是定义 SparkSession的代码模板:
from pyspark.sql import SparkSessionspark = SparkSession\.builder\.appName('SparkByExamples.com')\.getOrCreate()
? 创建 dataframe
在 Pandas 和 PySpark 中,我们最方便的数据承载数据结构都是 dataframe,它们的定义有一些不同,我们来对比一下看看:
? Pandas
columns = ["employee","department","state","salary","age"]data = [("Alain","Sales","Paris",60000,34), ("Ahmed","Sales","Lyon",80000,45), ("Ines","Sales","Nice",55000,30), ("Fatima","Finance","Paris",90000,28), ("Marie","Finance","Nantes",100000,40)]
创建DataFrame
的 Pandas 语法如下:
df = pd.DataFrame(data=data, columns=columns)# 查看头2行df.head(2)
? PySpark
创建DataFrame
的 PySpark 语法如下:
df = spark.createDataFrame(data).toDF(*columns)# 查看头2行df.limit(2).show()
? 指定列类型? Pandas
Pandas 指定字段数据类型的方法如下:
types_dict = { "employee": pd.Series([r[0] for r in data], dtype='str'), "department": pd.Series([r[1] for r in data], dtype='str'), "state": pd.Series([r[2] for r in data], dtype='str'), "salary": pd.Series([r[3] for r in data], dtype='int'), "age": pd.Series([r[4] for r in data], dtype='int')}df = pd.DataFrame(types_dict)
Pandas 可以通过如下代码来检查数据类型:
df.dtypes
? PySpark
PySpark 指定字段数据类型的方法如下:
from pyspark.sql.types import StructType,StructField, StringType, IntegerTypeschema = StructType([ \ StructField("employee",StringType(),True), \ StructField("department",StringType(),True), \ StructField("state",StringType(),True), \ StructField("salary", IntegerType(), True), \ StructField("age", IntegerType(), True) \ ])df = spark.createDataFrame(data=data,schema=schema)
PySpark 可以通过如下代码来检查数据类型:
df.dtypes# 查看数据类型 df.printSchema()
? 读写文件
Pandas 和 PySpark 中的读写文件方式非常相似。 具体语法对比如下:
? Pandas
df = pd.read_csv(path, sep=';', header=True)df.to_csv(path, ';', index=False)
? PySpark
df = spark.read.csv(path, sep=';')df.coalesce(n).write.mode('overwrite').csv(path, sep=';')
注意 ①
PySpark 中可以指定要分区的列:
df.partitionBy("department","state").write.mode('overwrite').csv(path, sep=';')
注意 ②
可以通过上面所有代码行中的 parquet
更改 CSV 来读取和写入不同的格式,例如 parquet 格式
? 数据选择 – 列? Pandas
在 Pandas 中选择某些列是这样完成的:
columns_subset = ['employee', 'salary']df[columns_subset].head()df.loc[:, columns_subset].head()
? PySpark
在 PySpark 中,我们需要使用带有列名列表的 select
方法来进行字段选择:
columns_subset = ['employee', 'salary']df.select(columns_subset).show(5)
? 数据选择 – 行? Pandas
Pandas可以使用 iloc
对行进行筛选:
# 头2行df.iloc[:2].head()
? PySpark
在 Spark 中,可以像这样选择前 n 行:
df.take(2).head()# 或者df.limit(2).head()
注意:使用 spark 时,数据可能分布在不同的计算节点上,因此“第一行”可能会随着运行而变化。
? 条件选择? Pandas
Pandas 中根据特定条件过滤数据/选择数据的语法如下:
# First methodflt = (df['salary'] >= 90_000) & (df['state'] == 'Paris')filtered_df = df[flt]# Second Method: Using query which is generally fasterfiltered_df = df.query('(salary >= 90_000) and (state == "Paris")')# Ortarget_state = "Paris"filtered_df = df.query('(salary >= 90_000) and (state == @target_state)')
? PySpark
在 Spark 中,使用 filter
方法或执行 SQL 进行数据选择。 语法如下:
# 方法1:基于filter进行数据选择filtered_df = df.filter((F.col('salary') >= 90_000) & (F.col('state') == 'Paris'))# 或者filtered_df = df.filter(F.expr('(salary >= 90000) and (state == "Paris")'))# 方法2:基于SQL进行数据选择df.createOrReplaceTempView("people")filtered_df = spark.sql("""SELECT * FROM peopleWHERE (salary >= 90000) and (state == "Paris")""")
? 添加字段? Pandas
在 Pandas 中,有几种添加列的方法:
seniority = [3, 5, 2, 4, 10]# 方法1df['seniority'] = seniority# 方法2df.insert(2, "seniority", seniority, True)
? PySpark
在 PySpark 中有一个特定的方法withColumn
可用于添加列:
seniority = [3, 5, 2, 4, 10]df = df.withColumn('seniority', seniority)
? dataframe拼接? 2个dataframe – pandas
# pandas拼接2个dataframedf_to_add = pd.DataFrame(data=[("Robert","Advertisement","Paris",55000,27)], columns=columns)df = pd.concat([df, df_to_add], ignore_index = True)
? 2个dataframe – PySpark
# PySpark拼接2个dataframedf_to_add = spark.createDataFrame([("Robert","Advertisement","Paris",55000,27)]).toDF(*columns)df = df.union(df_to_add)
? 多个dataframe – pandas
# pandas拼接多个dataframedfs = [df, df1, df2,...,dfn]df = pd.concat(dfs, ignore_index = True)
? 多个dataframe – PySpark
PySpark 中 unionAll
方法只能用来连接两个 dataframe。我们使用 reduce 方法配合unionAll
来完成多个 dataframe 拼接:
# pyspark拼接多个dataframefrom functools import reducefrom pyspark.sql import DataFramedef unionAll(*dfs): return reduce(DataFrame.unionAll, dfs)dfs = [df, df1, df2,...,dfn]df = unionAll(*dfs)
? 简单统计
Pandas 和 PySpark 都提供了为 dataframe 中的每一列进行统计计算的方法,可以轻松对下列统计值进行统计计算:
- 列元素的计数
- 列元素的平均值
- 最大值
- 最小值
- 标准差
- 三个分位数:25%、50% 和 75%
Pandas 和 PySpark 计算这些统计值的方法很类似,如下:
? Pandas & PySpark
df.summary()#或者df.describe()
? 数据分组聚合统计
Pandas 和 PySpark 分组聚合的操作也是非常类似的:
? Pandas
df.groupby('department').agg({'employee': 'count', 'salary':'max', 'age':'mean'})
? PySpark
df.groupBy('department').agg({'employee': 'count', 'salary':'max', 'age':'mean'})
但是,最终显示的结果需要一些调整才能一致。
在 Pandas 中,要分组的列会自动成为索引,如下所示:
要将其作为列恢复,我们需要应用 reset_index
方法:
df.groupby('department').agg({'employee': 'count', 'salary':'max', 'age':'mean'}).reset_index()
在 PySpark 中,列名会在结果dataframe中被重命名,如下所示:
要恢复列名,可以像下面这样使用别名方法:
df.groupBy('department').agg(F.count('employee').alias('employee'), F.max('salary').alias('salary'), F.mean('age').alias('age'))
? 数据转换
在数据处理中,我们经常要进行数据变换,最常见的是要对「字段/列」应用特定转换,在Pandas中我们可以轻松基于apply函数完成,但在PySpark 中我们可以使用udf
(用户定义的函数)封装我们需要完成的变换的Python函数。
例如,我们对salary字段进行处理,如果工资低于 60000,我们需要增加工资 15%,如果超过 60000,我们需要增加 5%。
? Pandas
Pandas 中的语法如下:
df['new_salary'] = df['salary'].apply(lambda x: x*1.15 if x<= 60000 else x*1.05)
? Pyspark
PySpark 中的等价操作下:
from pyspark.sql.types import FloatTypedf.withColumn('new_salary', F.udf(lambda x: x*1.15 if x<= 60000 else x*1.05, FloatType())('salary'))
⚠️ 请注意, udf
方法需要明确指定数据类型(在我们的例子中为 FloatType)
? 总结
本篇内容中, ShowMeAI 给大家总结了Pandas和PySpark对应的功能操作细节,我们可以看到Pandas和PySpark的语法有很多相似之处,但是要注意一些细节差异。
另外,大家还是要基于场景进行合适的工具选择:
- 在处理大型数据集时,使用 PySpark 可以为您提供很大的优势,因为它允许并行计算。
- 如果您正在使用的数据集很小,那么使用Pandas会很快和灵活。
参考资料
- ? 图解数据分析:从入门到精通系列教程:https://www.showmeai.tech/tutorials/33
- ? 图解大数据技术:从入门到精通系列教程:https://www.showmeai.tech/tutorials/84
- ? 图解机器学习算法:从入门到精通系列教程:https://www.showmeai.tech/tutorials/34
- ? 数据科学工具库速查表 | Spark RDD 速查表:https://www.showmeai.tech/article-detail/106
- ? 数据科学工具库速查表 | Spark SQL 速查表:https://www.showmeai.tech/article-detail/107
推荐阅读
- ? 数据分析实战系列 :https://www.showmeai.tech/tutorials/40
- ? 机器学习数据分析实战系列:https://www.showmeai.tech/tutorials/41
- ? 深度学习数据分析实战系列:https://www.showmeai.tech/tutorials/42
- ? TensorFlow数据分析实战系列:https://www.showmeai.tech/tutorials/43
- ? PyTorch数据分析实战系列:https://www.showmeai.tech/tutorials/44
- ? NLP实战数据分析实战系列:https://www.showmeai.tech/tutorials/45
- ? CV实战数据分析实战系列:https://www.showmeai.tech/tutorials/46
- ? AI 面试题库系列:https://www.showmeai.tech/tutorials/48