1. Reading the data

>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.master("local").appName("dataframe_split").config("spark.some.config.option", "some-value").getOrCreate()
>>> sc = spark.sparkContext
>>> df = spark.read.csv("./demo.csv", inferSchema=True, header=True)
>>> df.show()
+---+-----------+
|gid|      score|
+---+-----------+
| a1|90 80 79 80|
| a2|79 89 45 60|
| a3|57 56 89 75|
+---+-----------+
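
For reference, the demo.csv file is not shown in the original; reconstructed from the output above, it would look like this:

gid,score
a1,90 80 79 80
a2,79 89 45 60
a3,57 56 89 75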

2. Splitting a column into an array

>>> from pyspark.sql.functions import split, explode, concat, concat_ws
>>> df_split = df.withColumn("s", split(df['score'], " "))
>>> df_split.show()
+---+-----------+----------------+
|gid|      score|               s|
+---+-----------+----------------+
| a1|90 80 79 80|[90, 80, 79, 80]|
| a2|79 89 45 60|[79, 89, 45, 60]|
| a3|57 56 89 75|[57, 56, 89, 75]|
+---+-----------+----------------+
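
Note that the second argument of split() is interpreted as a Java regular expression rather than a literal string, so a pattern such as r"\s+" would also cope with runs of multiple spaces; for this data the result is identical to the table above:

>>> df.withColumn("s", split(df['score'], r"\s+")).show()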

3. Splitting the array into separate columns

zipWithIndex: assigns an index to each element of the RDD.

The ordering is based first on the partition index and then on the ordering of items within each partition, so the first item in the first partition gets index 0 and the last item in the last partition gets the largest index. This method has to trigger a Spark job when the RDD contains more than one partition.
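
A minimal standalone illustration of the indexing (hypothetical data, two partitions):

>>> sc.parallelize(["a", "b", "c", "d"], 2).zipWithIndex().collect()
[('a', 0), ('b', 1), ('c', 2), ('d', 3)]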

>>> first_row = df.first()
>>> numAttrs = len(first_row['score'].split(" "))  # number of scores per row (4 in this data)
>>> attrs = sc.parallelize(["score_" + str(i) for i in range(numAttrs)]).zipWithIndex().collect()
>>> attrs
[('score_0', 0), ('score_1', 1), ('score_2', 2), ('score_3', 3)]
>>> for name, index in attrs:
...     df_split = df_split.withColumn(name, df_split['s'].getItem(index))

>>> df_split.show()
+---+-----------+----------------+-------+-------+-------+-------+
|gid|      score|               s|score_0|score_1|score_2|score_3|
+---+-----------+----------------+-------+-------+-------+-------+
| a1|90 80 79 80|[90, 80, 79, 80]|     90|     80|     79|     80|
| a2|79 89 45 60|[79, 89, 45, 60]|     79|     89|     45|     60|
| a3|57 56 89 75|[57, 56, 89, 75]|     57|     56|     89|     75|
+---+-----------+----------------+-------+-------+-------+-------+
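
Calling withColumn in a loop works, but each call adds another projection to the query plan; an equivalent single select is often preferred. A sketch using the same numAttrs, with a hypothetical name df_split2 so the result above is not clobbered:

>>> df_split2 = df.withColumn("s", split(df['score'], " "))
>>> df_split2 = df_split2.select("*", *[df_split2['s'].getItem(i).alias("score_" + str(i)) for i in range(numAttrs)])

This yields the same gid / score / s / score_0 .. score_3 columns as the loop above.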

4. Splitting one row into multiple rows

>>> df.show()
+---+-----------+
|gid|      score|
+---+-----------+
| a1|90 80 79 80|
| a2|79 89 45 60|
| a3|57 56 89 75|
+---+-----------+
>>> df_explode = df.withColumn("e", explode(split(df['score'], " ")))
>>> df_explode.show()
+---+-----------+---+
|gid|      score|  e|
+---+-----------+---+
| a1|90 80 79 80| 90|
| a1|90 80 79 80| 80|
| a1|90 80 79 80| 79|
| a1|90 80 79 80| 80|
| a2|79 89 45 60| 79|
| a2|79 89 45 60| 89|
| a2|79 89 45 60| 45|
| a2|79 89 45 60| 60|
| a3|57 56 89 75| 57|
| a3|57 56 89 75| 56|
| a3|57 56 89 75| 89|
| a3|57 56 89 75| 75|
+---+-----------+---+
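
The exploded column e is still a string. A possible follow-up (a sketch, not part of the original walkthrough) is to cast it and aggregate per gid:

>>> from pyspark.sql.functions import avg, col
>>> df_explode.groupBy("gid").agg(avg(col("e").cast("int")).alias("avg_score")).orderBy("gid").show()
+---+---------+
|gid|avg_score|
+---+---------+
| a1|    82.25|
| a2|    68.25|
| a3|    69.25|
+---+---------+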

5. Merging columns

There are two functions for merging columns: concat(), which joins values without a separator, and concat_ws(), which joins them with a separator.

concat

>>> df_concat = df_split.withColumn("score_concat", concat(df_split['score_0'], df_split['score_1'], df_split['score_2'], df_split['score_3']))
>>> df_concat.show()
+---+-----------+----------------+-------+-------+-------+-------+------------+
|gid|      score|               s|score_0|score_1|score_2|score_3|score_concat|
+---+-----------+----------------+-------+-------+-------+-------+------------+
| a1|90 80 79 80|[90, 80, 79, 80]|     90|     80|     79|     80|    90807980|
| a2|79 89 45 60|[79, 89, 45, 60]|     79|     89|     45|     60|    79894560|
| a3|57 56 89 75|[57, 56, 89, 75]|     57|     56|     89|     75|    57568975|
+---+-----------+----------------+-------+-------+-------+-------+------------+

concat_ws

>>> df_ws = df_split.withColumn("score_concat", concat_ws('-',df_split['score_0'], df_split['score_1'], df_split['score_2'], df_split['score_3']))
>>> df_ws.show()
+---+-----------+----------------+-------+-------+-------+-------+------------+
|gid|      score|               s|score_0|score_1|score_2|score_3|score_concat|
+---+-----------+----------------+-------+-------+-------+-------+------------+
| a1|90 80 79 80|[90, 80, 79, 80]|     90|     80|     79|     80| 90-80-79-80|
| a2|79 89 45 60|[79, 89, 45, 60]|     79|     89|     45|     60| 79-89-45-60|
| a3|57 56 89 75|[57, 56, 89, 75]|     57|     56|     89|     75| 57-56-89-75|
+---+-----------+----------------+-------+-------+-------+-------+------------+
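
One difference worth keeping in mind, shown here on a small hypothetical DataFrame rather than the original data: concat() returns null as soon as any input is null, while concat_ws() simply skips null inputs:

>>> tmp = spark.createDataFrame([("x", None)], "a string, b string")
>>> tmp.select(concat(tmp['a'], tmp['b']).alias("c"), concat_ws("-", tmp['a'], tmp['b']).alias("cw")).collect()
[Row(c=None, cw='x')]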

6. Converting multiple rows into multiple columns

pivot: pivots the given column and performs the specified aggregation.

>>> df = spark.sparkContext.parallelize([[15,399,2],[15,1401,5],[15,1608,4],[15,20,4],[18,100,3],[18,1401,3], [18,399,1]]).toDF(["userID","movieID","rating"])
>>> resultDF = df.groupBy("userID").pivot("movieID").sum("rating").na.fill(-1)
>>> resultDF.show()
+------+---+---+---+----+----+
|userID| 20|100|399|1401|1608|
+------+---+---+---+----+----+
|    18| -1|  3|  1|   3|  -1|
|    15|  4| -1|  2|   5|   4|
+------+---+---+---+----+----+
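
When the distinct pivot values are known in advance, they can be passed explicitly so Spark skips the extra job that collects them. A sketch on the same data, producing the same table as above:

>>> df.groupBy("userID").pivot("movieID", [20, 100, 399, 1401, 1608]).sum("rating").na.fill(-1).show()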