Getting Started with PySpark DataFrames

A DataFrame is an immutable, distributed dataset organized into named columns, much like a table in a relational database. A Spark DataFrame is similar to a pandas DataFrame in Python: by imposing structure on a distributed dataset, it lets Spark users query structured data with Spark SQL or with Spark's DataFrame expression API.

1. Creating a DataFrame

A DataFrame can be created by reading files such as Parquet, CSV, or JSON directly, or it can be built from an RDD.

df = spark.read.parquet(parquet_file)
df = spark.read.csv(csv_file)
df = spark.read.json(json_file)

df = spark.createDataFrame(rdd, schema)  # rdd: an RDD of tuples or Rows; schema: column names or a StructType
df = rdd.toDF(cols)                      # cols: a list of column names
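For example, a minimal sketch (assuming a local SparkSession and made-up data) that builds a DataFrame from an RDD of tuples:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("demo").getOrCreate()

# hypothetical data: an RDD of (id, name) tuples
rdd = spark.sparkContext.parallelize([(1, 'Joe'), (2, 'Henry')])
# pass the column names as the schema; the types are inferred from the data
df = spark.createDataFrame(rdd, ['id', 'name'])
df.show()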

2. A First Look at the Data

printSchema shows the data type of each column, while describe reports summary statistics for each column.

# Inspect the DataFrame schema
df = spark.createDataFrame([('1', 'Joe', '70000', '1'), ('2', 'Henry', '80000', None)],
                           ['Id', 'Name', 'Sallary', 'DepartmentId'])
df.printSchema()
# Output
root
 |-- Id: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sallary: string (nullable = true)
 |-- DepartmentId: string (nullable = true)

# Basic summary statistics for the data
df.describe().show()
# Output
+-------+------------------+-----+-----------------+------------+
|summary|                Id| Name|          Sallary|DepartmentId|
+-------+------------------+-----+-----------------+------------+
|  count|                 2|    2|                2|           1|
|   mean|               1.5| null|          75000.0|         1.0|
| stddev|0.7071067811865476| null|7071.067811865475|         NaN|
|    min|                 1|Henry|            70000|           1|
|    max|                 2|  Joe|            80000|           1|
+-------+------------------+-----+-----------------+------------+

from pyspark.sql.functions import count
# Count the non-null records in each column
df.agg(*[count(c).alias(c) for c in df.columns]).show()
# Output
+---+----+-------+------------+
| Id|Name|Sallary|DepartmentId|
+---+----+-------+------------+
|  2|   2|      2|           1|
+---+----+-------+------------+

3. Working with DataFrames

3.1 Selecting a Subset of a DataFrame

Often we do not need to analyze the whole DataFrame, only part of it, so we need to select a subset of its rows or columns. PySpark DataFrames offer many ways to do this:

  • df.select(): select columns by name;
  • df.selectExpr(): select columns while applying SQL expressions to them, returning the transformed values;
  • df.where() and df.filter(): identical in usage, both keep only the rows that satisfy a given condition;
  • df.distinct(): drop duplicate rows, returning a DataFrame without duplicates;
  • df.sample(withReplacement, fraction, seed=None): random sampling (the fraction is approximate, not exact);
  • df.sampleBy(col, fractions, seed=None): sample within the categories of a column, i.e. stratified sampling;
  • df.withColumn(colName, col): transform a column, e.g. cast its type or derive a new column from it;
  • df.withColumnRenamed(existing, new): rename a column.
from pyspark.sql.functions import *
df = spark.createDataFrame([('a',[1,2,3]),('b',[2,3,4])], ['key','value'])
df.show()
# explode expands the array column into one row per element
df.select(df.key, explode(df.value)).show()
+---+---------+
|key|    value|
+---+---------+
|  a|[1, 2, 3]|
|  b|[2, 3, 4]|
+---+---------+

+---+---+
|key|col|
+---+---+
|  a|  1|
|  a|  2|
|  a|  3|
|  b|  2|
|  b|  3|
|  b|  4|
+---+---+

df = spark.createDataFrame([('a',1),('a',2),('a',3),('a',1),('b',1),('b',2)],['key', 'val'])
df.show()
+---+---+
|key|val|
+---+---+
|  a|  1|
|  a|  2|
|  a|  3|
|  a|  1|
|  b|  1|
|  b|  2|
+---+---+

df.select('key').show()
df.selectExpr('length(key)').show()
+---+
|key|
+---+
|  a|
|  a|
|  a|
|  a|
|  b|
|  b|
+---+

+-----------+
|length(key)|
+-----------+
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
+-----------+
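selectExpr accepts arbitrary SQL expressions, so it can also derive new columns; a minimal sketch on the same df (the expression is made up for illustration):

df.selectExpr('key', 'val * 2 as val2').show()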

df.filter(df.key=='a').show()
df.where(df.key=='a').show()
+---+---+
|key|val|
+---+---+
|  a|  1|
|  a|  2|
|  a|  3|
|  a|  1|
+---+---+

+---+---+
|key|val|
+---+---+
|  a|  1|
|  a|  2|
|  a|  3|
|  a|  1|
+---+---+

df.distinct().show()
+---+---+
|key|val|
+---+---+
|  a|  1|
|  b|  1|
|  a|  2|
|  a|  3|
|  b|  2|
+---+---+

df.sample(withReplacement=False, fraction=0.5, seed=666).show()
+---+---+
|key|val|
+---+---+
|  a|  1|
|  a|  2|
|  a|  1|
|  b|  2|
+---+---+

df.sampleBy('key', fractions={'a':0.2,'b':0.6}, seed=123).show()
+---+---+
|key|val|
+---+---+
|  a|  3|
|  b|  2|
+---+---+

# note: a udf with no explicit returnType returns strings (StringType is the default)
add1 = udf(lambda x: x + 1)
df.withColumn('val1', add1('val')).show()
# cast an existing column to a different type
df.withColumn('val', df.val.cast('float')).show()
+---+---+----+
|key|val|val1|
+---+---+----+
|  a|  1|   2|
|  a|  2|   3|
|  a|  3|   4|
|  a|  1|   2|
|  b|  1|   2|
|  b|  2|   3|
+---+---+----+

+---+---+
|key|val|
+---+---+
|  a|1.0|
|  a|2.0|
|  a|3.0|
|  a|1.0|
|  b|1.0|
|  b|2.0|
+---+---+

df.withColumnRenamed('key', 'kk').show()
+---+---+
| kk|val|
+---+---+
|  a|  1|
|  a|  2|
|  a|  3|
|  a|  1|
|  b|  1|
|  b|  2|
+---+---+
3.2 Handling NA (Null) Values

PySpark provides df.na.drop to discard rows that contain nulls and df.na.fill to replace nulls with a given value.

df.show()
+---+-----+-------+------------+
| Id| Name|Sallary|DepartmentId|
+---+-----+-------+------------+
|  1|  Joe|  70000|           1|
|  2|Henry|  80000|        null|
+---+-----+-------+------------+

# df.fillna('666') has the same effect
df.na.fill('666').show()
+---+-----+-------+------------+
| Id| Name|Sallary|DepartmentId|
+---+-----+-------+------------+
|  1|  Joe|  70000|           1|
|  2|Henry|  80000|         666|
+---+-----+-------+------------+

# df.dropna() gives the same result
df.na.drop().show()
+---+----+-------+------------+
| Id|Name|Sallary|DepartmentId|
+---+----+-------+------------+
|  1| Joe|  70000|           1|
+---+----+-------+------------+
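Both methods take finer-grained arguments as well; a minimal sketch on the same df (the replacement value is made up for illustration):

# fill only specific columns, using a dict of column -> replacement value
df.na.fill({'DepartmentId': '0'}).show()
# drop a row only when the listed columns are null
df.na.drop(subset=['DepartmentId']).show()
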
3.3 Joining DataFrames

Like most relational tables, Spark DataFrames support joins.

df1 = spark.createDataFrame([('a',1),('b',2),('c',3)],['x1','x2'])
df2 = spark.createDataFrame([('a','T'),('b','F'),('d','T')],['x1','x3'])
df1.show();df2.show()
+---+---+   
| x1| x2|
+---+---+
|  a|  1|
|  b|  2|
|  c|  3|
+---+---+

+---+---+
| x1| x3|
+---+---+
|  a|  T|
|  b|  F|
|  d|  T|
+---+---+

The supported join types are: 'inner', 'outer', 'full', 'fullouter', 'full_outer', 'leftouter', 'left', 'left_outer', 'rightouter', 'right', 'right_outer', 'leftsemi', 'left_semi', 'leftanti', 'left_anti', 'cross':

print('left:')
df1.join(df2, on='x1', how='left').show()
df1.join(df2, df1.x1==df2.x1, how='left').show()
print('right:')
df1.join(df2, on='x1', how='right').show()
print('outer:')
df1.join(df2, on='x1', how='outer').show()
print('inner:')
df1.join(df2, on='x1', how='inner').show()
print('leftsemi:')
df1.join(df2, on='x1', how='leftsemi').show()
print('leftanti:')
df1.join(df2, on='x1', how='leftanti').show()

left:
+---+---+----+
| x1| x2|  x3|
+---+---+----+
|  c|  3|null|
|  b|  2|   F|
|  a|  1|   T|
+---+---+----+

+---+---+----+----+
| x1| x2|  x1|  x3|
+---+---+----+----+
|  c|  3|null|null|
|  b|  2|   b|   F|
|  a|  1|   a|   T|
+---+---+----+----+

right:
+---+----+---+
| x1|  x2| x3|
+---+----+---+
|  d|null|  T|
|  b|   2|  F|
|  a|   1|  T|
+---+----+---+

outer:
+---+----+----+
| x1|  x2|  x3|
+---+----+----+
|  d|null|   T|
|  c|   3|null|
|  b|   2|   F|
|  a|   1|   T|
+---+----+----+

inner:
+---+---+---+
| x1| x2| x3|
+---+---+---+
|  b|  2|  F|
|  a|  1|  T|
+---+---+---+

leftsemi:
+---+---+
| x1| x2|
+---+---+
|  b|  2|
|  a|  1|
+---+---+

leftanti:
+---+---+
| x1| x2|
+---+---+
|  c|  3|
+---+---+

Note that the rows of a joined DataFrame come back in no particular order.
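If a stable order matters, sort explicitly after the join; a minimal sketch reusing df1 and df2 from above:

df1.join(df2, on='x1', how='left').orderBy('x1').show()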

3.4 Set Operations

DataFrames also support the usual set operations: union, intersect, and subtract. Note that union behaves like SQL UNION ALL (it does not remove duplicates) and matches columns by position rather than by name. Usage:

df1 = spark.createDataFrame([('a',1),('b',2),('c',3)],['x1','x2'])
df2 = spark.createDataFrame([('a','T'),('b','F'),('d','T')],['x1','x2'])
df1.show();df2.show()
+---+---+
| x1| x2|
+---+---+
|  a|  1|
|  b|  2|
|  c|  3|
+---+---+

+---+---+
| x1| x2|
+---+---+
|  a|  T|
|  b|  F|
|  d|  T|
+---+---+

The set operations in action:

print('union:')
df1.union(df2).orderBy('x1', ascending=True).show()
print('intersect:')
df1.intersect(df2).orderBy('x1', ascending=True).show()
print('subtract:')
df1.subtract(df2).orderBy('x1', ascending=True).show()

union:
+---+---+
| x1| x2|
+---+---+
|  a|  T|
|  a|  1|
|  b|  2|
|  b|  F|
|  c|  3|
|  d|  T|
+---+---+

intersect:
+---+---+
| x1| x2|
+---+---+
+---+---+

subtract:
+---+---+
| x1| x2|
+---+---+
|  a|  1|
|  b|  2|
|  c|  3|
+---+---+
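Because union matches columns by position, reordering columns mixes up the data; unionByName (available since Spark 2.3) matches columns by name instead. A minimal sketch with a hypothetical df3:

df3 = spark.createDataFrame([(9, 'z')], ['x2', 'x1'])  # same columns as df1, opposite order
df1.union(df3).show()        # matched by position: 9 ends up under x1, 'z' under x2
df1.unionByName(df3).show()  # matched by name
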
3.5 Some More Advanced DataFrame Operations

Splitting a single column

df = spark.createDataFrame([('a',[1,2,3]), ('b', [4,5,6])], ['key', 'values'])
df.show()
df.printSchema()

+---+---------+
|key|   values|
+---+---------+
|  a|[1, 2, 3]|
|  b|[4, 5, 6]|
+---+---------+

# array indexing is 0-based, so values[1] is the second element
df.selectExpr('key', 'values[1]').show()
+---+---------+
|key|values[1]|
+---+---------+
|  a|        2|
|  b|        5|
+---+---------+

Turning a single column into multiple rows

df = spark.createDataFrame([('a','1,2,3'),('b','4,5,6')],['key', 'values'])
df.show()
+---+------+
|key|values|
+---+------+
|  a| 1,2,3|
|  b| 4,5,6|
+---+------+

import pyspark.sql.functions as F
df.select("key", F.split("values", ",").alias("values"),
          F.posexplode(F.split("values", ",")).alias("pos", "val")).drop("val").select("key", F.expr("values[pos]").alias("val")).show()

+---+---+
|key|val|
+---+---+
|  a|  1|
|  a|  2|
|  a|  3|
|  b|  4|
|  b|  5|
|  b|  6|
+---+---+
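For a single delimited column like this, a simpler equivalent (a sketch using explode plus split on the same df) gives the same result:

df.select('key', F.explode(F.split('values', ',')).alias('val')).show()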

Turning multiple columns into multiple rows (melt/unpivot)

from pyspark.sql.functions import *

def to_long(df, by):
    cols, dtypes = zip(*((c,t) for (c,t) in df.dtypes if c not in by))
    # Spark SQL supports only homogeneous columns
    assert len(set(dtypes))==1, 'All columns have to be of the same type'
    # Create and explode an array of (column_name, column_value) structs
    kvs = explode(array([struct(lit(c).alias('key1'), col(c).alias('val')) for c in cols])).alias('kvs')
    return df.select(by, kvs).select(by, 'kvs.key1', 'kvs.val')


df = spark.createDataFrame([('a',1,2,3),('b',4,5,6)],['key', 'c1', 'c2', 'c3'])
df.show()
+---+---+---+---+
|key| c1| c2| c3|
+---+---+---+---+
|  a|  1|  2|  3|
|  b|  4|  5|  6|
+---+---+---+---+

dd = to_long(df, 'key')
dd.show()
+---+----+---+
|key|key1|val|
+---+----+---+
|  a|  c1|  1|
|  a|  c2|  2|
|  a|  c3|  3|
|  b|  c1|  4|
|  b|  c2|  5|
|  b|  c3|  6|
+---+----+---+

Grouped statistics

dd.show()
+---+----+---+
|key|key1|val|
+---+----+---+
|  a|  c1|  1|
|  a|  c2|  2|
|  a|  c3|  3|
|  b|  c1|  4|
|  b|  c2|  5|
|  b|  c3|  6|
+---+----+---+

dd.groupby('key').count().show()
+---+-----+
|key|count|
+---+-----+
|  b|    3|
|  a|    3|
+---+-----+
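groupby can also be combined with agg for arbitrary grouped aggregations; a minimal sketch on the same dd (the aliases are made up for illustration):

import pyspark.sql.functions as F
dd.groupby('key').agg(F.sum('val').alias('total'), F.avg('val').alias('mean_val')).show()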

Pivot tables

dd.show()
+---+----+---+
|key|key1|val|
+---+----+---+
|  a|  c1|  1|
|  a|  c2|  2|
|  a|  c3|  3|
|  b|  c1|  4|
|  b|  c2|  5|
|  b|  c3|  6|
+---+----+---+

dd.groupby('key').pivot('val').count().show()
+---+----+----+----+----+----+----+
|key|   1|   2|   3|   4|   5|   6|
+---+----+----+----+----+----+----+
|  b|null|null|null|   1|   1|   1|
|  a|   1|   1|   1|null|null|null|
+---+----+----+----+----+----+----+
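pivot can also be paired with an explicit aggregation; for example, this sketch rebuilds the original wide table by pivoting on key1 and taking the first val in each cell:

import pyspark.sql.functions as F
dd.groupby('key').pivot('key1').agg(F.first('val')).show()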

Aggregation functions

dd.show()
+---+----+---+
|key|key1|val|
+---+----+---+
|  a|  c1|  1|
|  a|  c2|  2|
|  a|  c3|  3|
|  b|  c1|  4|
|  b|  c2|  5|
|  b|  c3|  6|
+---+----+---+

import pyspark.sql.functions as F
dd.agg(F.sum(dd.val), F.max(dd.val), F.min(dd.val)).show()
+--------+--------+--------+
|sum(val)|max(val)|min(val)|
+--------+--------+--------+
|      21|       6|       1|
+--------+--------+--------+

Reference:
PySpark official documentation

This article originally appeared on Jianshu; if you reproduce it, please credit the source: https://www.jianshu.com/p/eea013037851
