47

使用 Spark 轻松做数据透视(Pivot)

 5 years ago
source link: https://mp.weixin.qq.com/s?__biz=MzI3MDU3OTc1Nw%3D%3D&%3Bmid=2247483946&%3Bidx=1&%3Bsn=f179adecfa6032280d104b376a19912d&%3Bchksm=eacfa368ddb82a7e3bb132d37d433d04f029b7c01487d260319c23010091482f0fc842e
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

spark从1.6开始引入,到现在2.4版本,pivot算子有了进一步增强,这使得后续无论是交给pandas继续做处理,还是交给R继续分析,都简化了不少。大家无论在使用pandas、numpy或是R的时候,首先会做的就是处理数据,尤其是将列表,转成成合适的形状。

列表

在说透视表之前,我们先看看,什么是列表,在传统观念上,列表的每一行代表一条记录,而每一列代表一个属性。

+-------+-------+-----+

|   date|project|value|

+-------+-------+-----+

|2018-01|     p1|  100|

|2018-01|     p2|  200|

|2018-01|     p3|  300|

|2018-02|     p1| 1000|

|2018-02|     p2| 2000|

|2018-03|     px|  999|

+-------+-------+-----+

举个简单的例子,如上表,一条记录可能代表某个项目,在某个年月创造的价值。而在这个表里面,某一列,就代表一个属性,比如date代表日期,project代表项目名称。而这里每一行,代表一条独立,完整的记录,一条与另外一条记录,没有直接的关系。

这种结构,也是一般关系型数据库的数据结构。

透视表

透视表没有一个明确的定义,一般是观念上是指,为了方便进行数据分析,而对数据进行一定的重排,方便后续分析,计算等操作。透视表每一个元素及其对应的“坐标”一起形成一条完整的记录。

+-------+------+------+-----+-----+

|   date|    p1|    p2|   p3|   px|

+-------+------+------+-----+-----+

|2018-01| 100.0| 200.0|300.0|  0.0|

|2018-02|1000.0|2000.0|  0.0|  0.0|

|2018-03|   0.0|   0.0|  0.0|999.0|

+-------+------+------+-----+-----+

上面的表,是将列表进行重排后的透视表,其第一行和第一列可以理解成索引,而在表中根据索引可以确定一条唯一的值,他们一起组成一条相当于列表里的数据。

通过一般的定义,我们能看出,透视表主要用于分析,所以,一般的场景我们都会先对数据进行聚合,以后再对数据分析,这样也更有意义。就好像,将话费清单,做成透视表,尽管逻辑上没有任何问题,但是结果是可能比现在的清单列表更难查阅。

PS:一些可以借鉴的名词,目前维基百科并没有收录,也只能权且理解一下吧

Yj22Yfe.jpg!web

建模拟数据

先来模拟个数据吧,按照前面的例子,建个csv,这里多加了一列s2,是为了做多透视列的,

date,project,value,s2
2018-01,p1,100,12
2018-01,p2,200,33
2018-01,p3,300,44
2018-02,p1,1000,22
2018-02,p2,2000,41
2018-03,px,999,22

spark API

我们先来看下DEMO程序

SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount").setMaster("local");
SparkContext sc = SparkContext.getOrCreate(sparkConf);
SparkSession ss = new SparkSession(sc);
Dataset<Row> ds = ss.read()
         //csv分隔符 
        .option("sep", ",")
         //是否包含header
        .option("header", "true")
        //加载csv路径
        .csv("E:\\devlop\\workspace\\sparkdemo\\src\\main\\java\\com\\dafei1288\\spark\\data1.csv");
Dataset<Row>  r = 
        //设置分组
        ds.groupBy(col("date"))
        //设置pivot
        .pivot("project")
        //设置聚合
        .agg(sum("value"));
r.show();

在加载csv的时候,我们设置了分隔符,以及读取表头。

对加载后的dataset只需要进行3步设置

  1. groupBy 设置分组列

  2. pivot 设置pivot列

  3. agg 设置聚合方式,可以是求和、平均等聚合函数

我们得到的输出结果如下:

+-------+------+------+-----+-----+

|   date|    p1|    p2|   p3|   px|

+-------+------+------+-----+-----+

|2018-03|  null|  null| null|999.0|

|2018-02|1000.0|2000.0| null| null|

|2018-01| 100.0| 200.0|300.0| null|

+-------+------+------+-----+-----+

请注意,这里和sql有些区别,就是groupBy的时候,不需要将project列写入了,如果写入成了

groupBy(col("date"),col("project"))

那么结果就是这样了

+-------+-------+------+------+-----+-----+

|   date|project|    p1|    p2|   p3|   px|

+-------+-------+------+------+-----+-----+

|2018-01|     p3|  null|  null|300.0| null|

|2018-01|     p2|  null| 200.0| null| null|

|2018-01|     p1| 100.0|  null| null| null|

|2018-03|     px|  null|  null| null|999.0|

|2018-02|     p1|1000.0|  null| null| null|

|2018-02|     p2|  null|2000.0| null| null|

+-------+-------+------+------+-----+-----+

sparkSQL 

SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount").setMaster("local");
SparkContext sc = SparkContext.getOrCreate(sparkConf);
SparkSession ss = new SparkSession(sc);
Dataset<Row> ds = ss.read() .option("sep", ",")
        .option("header", "true").csv("E:\\devlop\\workspace\\sparkdemo\\src\\main\\java\\com\\dafei1288\\spark\\data1.csv");
ds.registerTempTable("f");
Dataset<Row>  r = ds.sqlContext().sql(
 "select * from (
    select date,project as p,sum(value) as ss from f group by date,project
   )
  pivot (  
      sum(ss) 
      for p in ( 'p1','p2','p3','px' )  
   ) 
   order by date");
r.na().fill(0).show();

可以看到,这里我们将读取的csv注册成了表f,使用spark sql语句,这里和oracle的透视语句类似

pivot语法: pivot( 聚合列  for  待转换列 in (列值) )   

其语法还是比较简单的。

为了展示数据好看一点,我特意使用语句

r.na().fill(0)

将空值`null`替换成了0。

+-------+------+------+-----+-----+

|   date|    p1|    p2|   p3|   px|

+-------+------+------+-----+-----+

|2018-01| 100.0| 200.0|300.0|  0.0|

|2018-02|1000.0|2000.0|  0.0|  0.0|

|2018-03|   0.0|   0.0|  0.0|999.0|

+-------+------+------+-----+-----+

多聚合列

上文提到了,多做了一列,就是为了这个DEMO准备的,使用如下SparkSQL语句,设置多聚合列透视表

select * from (
    select date,project as p,sum(value) as ss,sum(s2) as ss2 from f group by date,project
)
pivot (  
      sum(ss),sum(ss2)  
     for p in ( 'p1','p2','p3','px' ) 
) 
order by date

这里为例方便看,我就截图了

EjayIvE.png!web

为了防止OOM的情况,spark对pivot的数据量进行了限制,其可以通过 spark.sql.pivotMaxValues 来进行修改,默认值为10000,这里是指piovt后的列数。

好了,关于spark pivot就介绍到这了,其实这里与矩阵的行列转换类似,pivot对应的也有unpivot,下次我们再聊。

参考资料:

https://stackoverflow.com/questions/30244910/how-to-pivot-dataframe

https://databricks.com/session/pivoting-data-with-sparksql


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK