拆分的数据

有时在进行数据时我们需要把一列数据分割成多列数据,把一个字段值,分割成多个值。本节介绍如何通过spark sql提供的函数来进行数据的分割。

1. 数据拆分概述

数据拆分操作

在进行数据处理时,通常我们需要对数据进行拆分。比如:把一列拆分成多行,多列,把一行拆分成多行,多列等。

在spark-sql中提供了多个函数用来进行数据拆分。

数据拆分的函数

  • split
  • explode
  • postexplode
  • substring

2. 数据的拆分

2.1 通过explode系列函数进行拆分
  • 把一个数组值的列拆分成多行**: explode

通过explode函数可以把一个list类型的值,拆分成多行。

>>> import pyspark.sql.functions as F
>>> list_data = [(1, "abc", ["p", "q", "r"]), (2, "def", ["x", "y", "z"])]
>>> schema = ["id", "col1", "col2"]
>>> 
>>> df = spark.createDataFrame(list_data, schema)
>>> df.show()
+---+----+---------+
| id|col1|     col2|
+---+----+---------+
|  1| abc|[p, q, r]|
|  2| def|[x, y, z]|
+---+----+---------+

>>> df.withColumn("col2", F.explode("col2")).show()
+---+----+----+
| id|col1|col2|
+---+----+----+
|  1| abc|   p|
|  1| abc|   q|
|  1| abc|   r|
|  2| def|   x|
|  2| def|   y|
|  2| def|   z|
+---+----+----+
  • 把一个map值的列拆分成多个列和多行: explode

通过explode函数也可以把一个map值拆分成key,value两个值。若该map中有多个key-value键值对,会对这些key-value值进行全部拆分。

>>> from pyspark.sql import Row
>>> 
>>> rows = [
...     Row(a=1, mapfield={"a": "b"}),
...     Row(a=2, mapfield={"a1": "b1"}),
...     Row(a=3, mapfield={"a2": "b2"}),
...     Row(a=4, mapfield={"a3": "b3", "a4": "b4", "a5": {"key5": "value5"}})
... ]
>>> 
>>> eDF = spark.createDataFrame(rows)
>>> eDF.show(truncate=False)
+---+-----------------------------------------+
|a  |mapfield                                 |
+---+-----------------------------------------+
|1  |[a -> b]                                 |
|2  |[a1 -> b1]                               |
|3  |[a2 -> b2]                               |
|4  |[a3 -> b3, a4 -> b4, a5 -> {key5=value5}]|
+---+-----------------------------------------+

>>> eDF.select("a", F.explode(eDF.mapfield).alias("key", "value")).show()
+---+---+-------------+
|  a|key|        value|
+---+---+-------------+
|  1|  a|            b|
|  2| a1|           b1|
|  3| a2|           b2|
|  4| a3|           b3|
|  4| a4|           b4|
|  4| a5|{key5=value5}|
+---+---+-------------+

可以看到:explode函数先把map值的key-value进行拆分成两列:key,value,若map中有多个key-value值,则会创建新的行,其他列的值保存不变。但注意:该函数只会拆分一层的key-value值,不会对嵌套的map值进行拆分。

  • 对list和map值进行拆分,并且添加一个index号(从0开始)posexplode
pyspark.sql.functions.posexplode(col)

通过该函数可以把list或map值的列拆分成两列,多行。并且会根据值在list或map中的位置添加一列索引值。该索引值从0开始。

>>> eDF = spark.createDataFrame([Row(a=1, intlist=[1,2,3], mapfield={"a": "b", "c": "d"})])
>>> eDF.show()
+---+---------+----------------+
|  a|  intlist|        mapfield|
+---+---------+----------------+
|  1|[1, 2, 3]|[a -> b, c -> d]|
+---+---------+----------------+

# 拆分map值的列,并添加一列pos
>> eDF.select("a","intlist",F.posexplode(eDF.mapfield)).show()
+---+---------+---+---+-----+
|  a|  intlist|pos|key|value|
+---+---------+---+---+-----+
|  1|[1, 2, 3]|  0|  a|    b|
|  1|[1, 2, 3]|  1|  c|    d|
+---+---------+---+---+-----+

# 拆分数组值的列,并添加一列索引值pos
>>> eDF.select("a","mapfield",F.posexplode(eDF.intlist)).show()
+---+----------------+---+---+
|  a|        mapfield|pos|col|
+---+----------------+---+---+
|  1|[a -> b, c -> d]|  0|  1|
|  1|[a -> b, c -> d]|  1|  2|
|  1|[a -> b, c -> d]|  2|  3|
+---+----------------+---+---+
  • **拆分时对空值进行填充:**posexplode_outer(spark-2.3开始)

从spark-2.3开始,提供了一个函数posexplode_outer,该函数的行为和explode一样,不同的是,若是数组或map值中存在None,则会以null替换。

df = spark.createDataFrame(
...     [(1, ["foo", "bar"], {"x": 1.0}), (2, [], {}), (3, None, None)],
...     ("id", "an_array", "a_map")
... )
>>> df.select("id", "an_array", posexplode_outer("a_map")).show()
+---+----------+----+----+-----+
| id|  an_array| pos| key|value|
+---+----------+----+----+-----+
|  1|[foo, bar]|   0|   x|  1.0|
|  2|        []|null|null| null|
|  3|      null|null|null| null|
+---+----------+----+----+-----+
>>> df.select("id", "a_map", posexplode_outer("an_array")).show()
+---+----------+----+----+
| id|     a_map| pos| col|
+---+----------+----+----+
|  1|[x -> 1.0]|   0| foo|
|  1|[x -> 1.0]|   1| bar|
|  2|        []|null|null|
|  3|      null|null|null|
+---+----------+----+----+
2.2 split拆分数据

split函数可以把一个字符串值拆分成一个数组值,而且split函数还支持按正则表达式来进行拆分。

pyspark.sql.functions.split(str, pattern)
>>> df = spark.createDataFrame([('ab12cd',"aaaa")], ['c1','c2'])
>>> df.show()
+------+----+
|    c1|  c2|
+------+----+
|ab12cd|aaaa|
+------+----+

# 以下可以看到,拆分后的值都变成了数组值
>>> df.select('c2', F.split(df.c1, '[0-9]+').alias('r')).show()
+----+--------+
|  c2|       r|
+----+--------+
|aaaa|[ab, cd]|
+----+--------+

>>> df.select('c1', F.split(df.c2, '[0-9]+').alias('r')).show()
+------+------+
|    c1|     r|
+------+------+
|ab12cd|[aaaa]|
+------+------+

3. 总结

本节介绍了如何通过spark-sql对数据进行拆分操作。

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐