PySpark 杂记


1 构造 ARRAY 格式数据
from pyspark.sql.functions import array
test = df.select(array('col1', 'col2').alias('array_cols'))
2 进行数据类型转换
# 以浮点型 -> Double型转换为例
from pyspark.sql.types import DoubleType
test = df.select(df['float_col'].cast(DoubleType()))
3 数值型数据标准化

方案1

from pyspark.sql.functions import col, stddev_samp
df = df.withColumn("scaled",
    col("num_col") / df.agg(stddev_samp("num_col")).first()[0])

方案2

""" 标准化公式
X_std = (X - X.min(axis=0)) / (X.max(axis=0) - X.min(axis=0))
X_scaled = X_std * (max - min) + min
此处假设放缩到 [0,1] 
"""
from pyspark.sql.functions import max, min
min_raw = df.select(min('col')).first()[0] # 注:也可以采用 df.agg({'col':'min'}).first()[0] 的方式
max_raw = df.select(max('col')).first()[0]
new_df = df.withColumn('x_std', (df['col'] - fn.lit(min_raw)) / fn.lit(max_raw - min_raw))
# 根据自定义区间重新缩放
scaled_df = new_df.withColumn(output_col, (new_df['x_std'] * fn.lit(max - min)) + fn.lit(min))
4 过滤空值/非空行
# 过滤空值行
df.filter(df['col'].isNull())
# 过滤非空行
df.filter(df['col'].isNotNull())
5 去重
df = df.dropDuplicates()
6 去空
df = df.dropna()
7 列重命名
# 方案1
df.withColumnRenamed('col1', 'col2')

# 方案2
df.select(F.col('col1').alias('col2'))

# 方案3 
# toDF(*cols), *cols -> list of new column names.
df.toDF('col2', 'col3')
8 DataFrame 合并
df.union(df2)
9 将集合拆分为全部子集
from itertools import chain, combinations
def powerset(iterable, maxlen=5, controller=1):
    """
    powerset([1,2,3]) --> () (1,) (2,) (3,) (1,2) (1,3) (2,3) (1,2,3)
    """
    xs = list(iterable)
    # note we return an iterator rather than a list
    xs_len = len(xs)+1
    if xs_len > 20:
        maxlen = maxlen - controller
    if xs_len > maxlen:
        xs_len = maxlen
    return [i for i in chain.from_iterable(combinations(xs,n) for n in range(xs_len)) if i != ()]

from utils.base import powerset
from pyspark.sql.types import ArrayType, StringType
# 注册一个 UDF 用户自定义函数,注意,这里需要定义返回值,这里是 ArrayType(ArrayType(StringType()))
powerset_udf = F.udf(lambda z: powerset(z), ArrayType(ArrayType(StringType())))
# 将 ArrayType(StringType()) 的数据,进行拆分
df.withColumn('SUB_SET', powerset_udf('SET'))
10 将子集或 Array 拓展 Explode
# 此处 id_col 假设为用户 id StringType
# array_sku_col 假设为 Array<string> 类型的 SKU 数组
# 通过此条命令,可以将 Array 拓展开来,最终达成 id StringType | sku StringType 这样的格式
df.select('id_col', F.explode('array_sku_col').alias('col'))

这块还有一个延伸情况,比如我有一个 商品属性集合,形式如下:

df_row ( 'sku_code_001',  [['1kg', 10], ['2kg', 20], ['3kg', 30]] )

我现在想把这个数据,扩展成如下形式

df_row ( 'sku_code_001',  '1kg', 10)
df_row ( 'sku_code_001',  '2kg', 20 )
df_row ( 'sku_code_001',  '3kg', 30 )

那么就不能先选择列,然后再拓展,否则会变成 9 行,而是得先扩展 [['1kg', 10], ['2kg', 20], ['3kg', 30]] 然后再选择成列。

11 JOIN

PySpark 中的 JOIN 除了我们通常用到 left / right / inner / full / outer ... 以外,还存在 left_semileft_anti 这两个比较特殊的 JOIN 模式

其中 left_semi 表示只保留 left_df 中可以和 right_df 中匹配上的行,left_anti 则与之相反,相当于只保留 left_df 中和 right_df 匹配不上的行。

那么 left_semiinner 的区别在哪里呢?

区别在于: left_semi 以及 left_anti 是只保留 left_df 中的行,即相当于 right_df 只起到了过滤作用,而不产生连接。

12 计算日期距离最大日期的差值(常用于时间衰减)
# 先计算最大日期
max_dt = df.select(max('date')).first()[0]
# 然后再用 datediff 计算日期间隔
df.withColumn('datediff', F.datediff(F.lit(max_dt), df['date']))
# 注意,datediff 在这里输入字符串格式即可,会自动进行日期解析 ('2015-04-08','2015-05-10')
13 Pandas df <-> Spark df
# 1. pyspark df -> pandas df
pd_df = spark_df.toPandas()

# 2. pyspark df <- pandas df
spark_df = spark.createDataFrame(pd_df)
14 Spark SQL 执行
df2 = spark.sql("SELECT field1 AS f1, field2 as f2 from table1")
df2.collect()
15 排序
df_sort = df.sort('col1', ascending=False)
df_sort = df.sort(F.col('col1')
16 union & unionByName

简单来说,union 是基于位置的,unionByName 是基于列名的,所以使用的时候,一定要注意。

17 获取指定列

有的时候存在一大堆列,我们需要从中获取部分列,一种方法是一个一个 select 但显然比较麻烦,还有一种方式是使用 .like() | .rlike()

前者类似 sql 中的 like,后者类似 sql 的正则表达式。

18 值替换 .replace
replace(to_replace, value=<no value>, subset=None)

示例:

'''
原 Dataframe
+---+-----+
|int|float|
+---+-----+
|  1|  1.0|
|  2|  2.0|
+---+-----+
'''

df.replace(1, 100)
# 注意,这里对 1 / 1.0 都进行了替换。
'''
+---+-----+
|int|float|
+---+-----+
|100|100.0|
|  2|  2.0|
+---+-----+
'''

df.replace(1, 'zzz')
# ValueError: Mixed type replacements are not supported
# 不支持跨字段类型的替换。所以如果想要将 1 替换为 'zzz' 首先要对列格式进行转换,转换为字符串类型,方可。

df.replace(1, 100, subset='int')
'''
+---+-----+
|int|float|
+---+-----+
|100|  1.0|
|  2|  2.0|
+---+-----+
'''
19 聚合函数 .collect_list

有类似下述场景:

有如下表格,姓名为对应班级中同学的姓名。

班级, 姓名 1, zzz 1, aaa 1, bbb 2, ccc 2, ddd

我们需要基于班级进行聚合,并将同学姓名组合为一个列表,此时就需要使用聚合函数 .collect_list

df = spark.createDataFrame(
    [
        (1, 'zzz'),
        (1, 'aaa'),
        (1, 'bbb'),
        (2, 'ccc'),
        (2, 'ddd')
    ],
    ["class", "name"]
)
'''
+-----+----+
|class|name|
+-----+----+
|    1| zzz|
|    1| aaa|
|    1| bbb|
|    2| ccc|
|    2| ddd|
+-----+----+
'''

# 对 df 基于 class 进行分组,并对 name 进行合并
df.groupBy('class').agg({'name':'collect_list'})
'''
+-----+------------------+
|class|collect_list(name)|
+-----+------------------+
|    1|   [aaa, bbb, zzz]|
|    2|        [ddd, ccc]|
+-----+------------------+
'''