5

【丢弃】【spark】rdd.

 3 years ago
source link: https://www.guofei.site/2018/03/29/spark1.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

【丢弃】【spark】rdd.

2018年03月29日

Author: Guofei

文章归类: ,文章编号: 151


版权声明:本文作者是郭飞。转载随意,但需要标明原文链接,并通知本人
原文链接:https://www.guofei.site/2018/03/29/spark1.html

Edit

回收原因:RDD过时了

3. RDD

3.1. 初始化SparkContext

from pyspark import SparkContext, SparkConf
conf=SparkConf().setMaster('local').setAppName('My App')#local可以改为local[*],以增加核心数
sc=SparkContext(conf=conf)

3.2. 创建RDD

lines=sc.parallelize(['pandas','i like pandas','hello','hello world!','error line','warning line'])
lines=sc.textFile('test.txt')

RDD有两种操作:transformationactions

3.3 transformation

RDD1.map(func)#func接受每一个元素,返回每一个元素,作为新RDD的元素
RDD1.filter(func)#筛选满足条件的RDD值,func接受每一个元素,返回一个bool类型
RDD1.flatMap(func) #如果func返回的是一个迭代器,那么把迭代器里所有对象放入新RDD中并摊平

#集合操作
RDD1.distinct()
RDD1.union(RDD2) #并集
RDD1.intersection(RDD2) #交集
RDD1.subtract(RDD2) #差集

RDD1.cartesian(RDD2) #笛卡尔积

RDD1.sample(withReplacement=False,fraction=0.2) #采样
errorsRDD=lines.filter(lambda line:'error' in line)
warningRDD=lines.filter(lambda line:'warning' in line)

badRDD=errorsRDD.union(warningRDD)# 合并

3.4. actions

RDD1.reduce(lambda x,y:x+y)
RDD1.fold(0,lambda x,y:x+y) #相当于一个带初始值的reduce

RDD1.foreach(func1) #对每个元素使用func1???
RDD1.aggregate(zeroValue,seqOp,combOp)# 没搞太清楚???

RDD1.collect()
RDD1.take(10)
RDD1.top(10)
RDD1.first()

RDD1.count()
RDD1.countByValue() #返回dict,内容是每个元素的个数

3.5. pair RDD

pair RDD是一种特殊的RDD
转化:

pair_RDD1=sc.parallelize([(1,2),(5,2),(3,4),(3,3)])
pair_RDD2=sc.parallelize([(1,5),(6,2)])

pair_RDD1.reduceByKey(func) #分组应用func,例如func=lambda x,y:x+y。是transformation(对比reduce是actions)
pari_RDD1.foldByKey(0,func) #类似fold,带初始值的reduce。是transformation,fold是action


pair_RDD1.groupByKey() # 同一个key下的value变成[value1,value2,...]???没明白
combineByKey#???


pair_RDD1.flatMapValues(func) #func 每次接受1个value,返回iterable。返回的RDD保留key,摊平value
pair_RDD1.mapValues(func) #对每个value应用func,key保持不变


pair_RDD1.keys()#返回key组成的RDD
pair_RDD1.values()#返回value组成的RDD
pair_RDD1.sortByKey()#按照key排序
# 按key排序的高级玩法
pair_RDD1.sortByKey(ascending=True,keyfunc=lambda x:str(x)) #ascending默认为True,keyfunc可以把原本元素转化成新对象,然后用新对象排序

双pair RDD操作:


pair_RDD1.subtractByKey(pair_RDD2) #删除key相同的元素
pair_RDD1.join(pair_RDD2)  #内连接,返回类似[(1, (2, 5))]的RDD
pair_RDD1.leftOuterJoin(pair_RDD2) #左边的保留
pair_RDD1.rightOuterJoin(pair_RDD2) #右边的保留
pair_RDD1.cogroup(pair_RDD2) #key-value的RDD,其中value是一个iterable

pari RDD的行动:

countByKey() #返回key的计数
lookup(key) #返回key所对应的所有value

row

from pyspark.sql import Row
row=Row(k1=2,k2='abc',k3=99)

# 取数
row.k1
row['k1']

参考文献

https://blog.csdn.net/wy250229163/article/details/52354278


您的支持将鼓励我继续创作!

Recommend

  • 42
    • www.itweet.cn 6 years ago
    • Cache

    Why Spark RDD

    我提出的论文计划,一再被打乱,我也在找机会慢慢调整过来。 今天,我们聊一聊Spark,我第一次在工作中使使用spark是0.9版本,当时是试用Spark来做OLAP Cube模型,那个时候的SparkSQL称为 Shark ,历史原因,spark...

  • 24
    • 微信 mp.weixin.qq.com 5 years ago
    • Cache

    spark rdd的另类解读

  • 30
    • www.tuicool.com 5 years ago
    • Cache

    Spark -- RDD

    传统的MapReduce框架运行缓慢 有向无环图的 中间计算结果 需要写入 硬盘 来防止运行结果丢失 每次调用中间计算结果都需要重新进行一次硬盘的读取 ...

  • 10
    • www.itrensheng.com 3 years ago
    • Cache

    Spark RDD的弹性到底指什么 | IT人生

    RDD(Resiliennt Distributed Datasets)抽象弹性分布式数据集对于Spark来说的弹性计算到底提现在什么地方? 自动进行内存和磁盘数据这两种存储方式的切换 Spark 可以使用 persist 和 cache 方法将任意 RDD 缓存到内存或者磁盘文件系统中。数据...

  • 6
    • blog.knoldus.com 3 years ago
    • Cache

    Difference between RDD , DF and DS in Spark

    Difference between RDD , DF and DS in Spark Reading Time: 3 minutesIn this blog I try to cover the difference between RDD, DF and DS. much of you have a little bit confused about RDD, DF and DS. so don’t worry a...

  • 7
    • blog.knoldus.com 3 years ago
    • Cache

    Spark: RDD vs DataFrames

    Spark: RDD vs DataFrames Reading Time: 3 minutesSpark SQL is a Spark module for structured data processing. Unlike the basic Spark RDD API, the interfaces provided by Spark SQL provide Spark with more informatio...

  • 9
    • lorenzo-dee.blogspot.com 2 years ago
    • Cache

    Apache Spark RDD and Java Streams

    Apache Spark RDD and Java Streams A few months ago, I was fortunate enough to participate in a few PoCs (proof-of-concepts) that used Apache Spark. There, I got the chance to use resilient distributed datasets (RDDs for short), t...

  • 6

    WordPress Hosting...

  • 2
    • yoursite.com 2 years ago
    • Cache

    Spark笔记(1):RDD编程

    WordPress Hosting...

  • 5
    • www.analyticsvidhya.com 2 years ago
    • Cache

    An End-to-End Starter Guide on Apache Spark and RDD

    This article was published as a part of the Data Science Blogathon.

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK