1、读入数据
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.master("local").appName("dataframe_split").config("spark.some.config.option", "some-value").getOrCreate()
>>> sc = spark.sparkContext
>>> df = spark.read.csv("./demo.csv", inferSchema=True, header=True)
>>> df.show()
+---+-----------+
|gid| score|
+---+-----------+
| a1|90 80 79 80|
| a2|79 89 45 60|
| a3|57 56 89 75|
+---+-----------+
2、列数据的分割
>>> from pyspark.sql.functions import split, explode, concat, concat_ws
>>> df_split = df.withColumn("s", split(df['score'], " "))
>>> df_split.show()
+---+-----------+----------------+
|gid| score| s|
+---+-----------+----------------+
| a1|90 80 79 80|[90, 80, 79, 80]|
| a2|79 89 45 60|[79, 89, 45, 60]|
| a3|57 56 89 75|[57, 56, 89, 75]|
+---+-----------+----------------+
3、列数据的拆分
zipWithIndex:给每个元素生成一个索引
排序首先基于分区索引,然后是每个分区内的项目顺序.因此,第一个分区中的第一个item索引为0,最后一个分区中的最后一个item的索引最大.当 RDD包含多个分区时此方法需要触发spark作业.
>>> attrs = sc.parallelize(["score_" + str(i) for i in range(numAttrs)]).zipWithIndex().collect()
>>> attrs
[('score_0', 0), ('score_1', 1), ('score_2', 2), ('score_3', 3)]
>>> for name, index in attrs:
... df_split = df_split.withColumn(name, df_split['s'].getItem(index))
>>> df_split.show()
+---+-----------+----------------+-------+-------+-------+-------+
|gid| score| s|score_0|score_1|score_2|score_3|
+---+-----------+----------------+-------+-------+-------+-------+
| a1|90 80 79 80|[90, 80, 79, 80]| 90| 80| 79| 80|
| a2|79 89 45 60|[79, 89, 45, 60]| 79| 89| 45| 60|
| a3|57 56 89 75|[57, 56, 89, 75]| 57| 56| 89| 75|
+---+-----------+----------------+-------+-------+-------+-------+
4、将一行分成多行
>>> df.show()
+---+-----------+
|gid| score|
+---+-----------+
| a1|90 80 79 80|
| a2|79 89 45 60|
| a3|57 56 89 75|
+---+-----------+
>>> df_explode = df.withColumn("e", explode(split(df['score'], " ")))
>>> df_explode.show()
+---+-----------+---+
|gid| score| e|
+---+-----------+---+
| a1|90 80 79 80| 90|
| a1|90 80 79 80| 80|
| a1|90 80 79 80| 79|
| a1|90 80 79 80| 80|
| a2|79 89 45 60| 79|
| a2|79 89 45 60| 89|
| a2|79 89 45 60| 45|
| a2|79 89 45 60| 60|
| a3|57 56 89 75| 57|
| a3|57 56 89 75| 56|
| a3|57 56 89 75| 89|
| a3|57 56 89 75| 75|
+---+-----------+---+
5、列数据的合并
列的合并有两个函数:一个不添加分隔符concat(),一个添加分隔符concat_ws()
concat
>>> df_concat = df_split.withColumn("score_concat", concat(df_split['score_0'], df_split['score_1'], df_split['score_2'], df_split['score_3']))
>>> df_concat.show()
+---+-----------+----------------+-------+-------+-------+-------+------------+
|gid| score| s|score_0|score_1|score_2|score_3|score_concat|
+---+-----------+----------------+-------+-------+-------+-------+------------+
| a1|90 80 79 80|[90, 80, 79, 80]| 90| 80| 79| 80| 90807980|
| a2|79 89 45 60|[79, 89, 45, 60]| 79| 89| 45| 60| 79894560|
| a3|57 56 89 75|[57, 56, 89, 75]| 57| 56| 89| 75| 57568975|
+---+-----------+----------------+-------+-------+-------+-------+------------+
caoncat_ws
>>> df_ws = df_split.withColumn("score_concat", concat_ws('-',df_split['score_0'], df_split['score_1'], df_split['score_2'], df_split['score_3']))
>>> df_ws.show()
+---+-----------+----------------+-------+-------+-------+-------+------------+
|gid| score| s|score_0|score_1|score_2|score_3|score_concat|
+---+-----------+----------------+-------+-------+-------+-------+------------+
| a1|90 80 79 80|[90, 80, 79, 80]| 90| 80| 79| 80| 90-80-79-80|
| a2|79 89 45 60|[79, 89, 45, 60]| 79| 89| 45| 60| 79-89-45-60|
| a3|57 56 89 75|[57, 56, 89, 75]| 57| 56| 89| 75| 57-56-89-75|
+---+-----------+----------------+-------+-------+-------+-------+------------+
6、多行转多列
pivot: 旋转当前列并执行指定的聚合操作
>>> df = spark.sparkContext.parallelize([[15,399,2],[15,1401,5],[15,1608,4],[15,20,4],[18,100,3],[18,1401,3], [18,399,1]]).toDF(["userID","movieID","rating"])
>>> resultDF = df.groupBy("userID").pivot("movieID").sum("rating").na.fill(-1)
>>> resultDF.show()
+------+---+---+---+----+----+
|userID| 20|100|399|1401|1608|
+------+---+---+---+----+----+
| 18| -1| 3| 1| 3| -1|
| 15| 4| -1| 2| 5| 4|
+------+---+---+---+----+----+