1

Mayend的个人空间

 3 years ago
source link: https://my.oschina.net/u/4930910/blog/4952664
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

DStream的转化操作分为无状态有状态 两种

  • 在无状态转化操作中,每个批次的处理不依赖于之前批次的数据。
  • 有状态转化操作需要使用之前批次的数据或者中间结果来计算当前批次的数据,有状态转化操作包括基于滑动窗口的转化操作和追踪状态变化的转换操作。

无状态转化


无状态转化操作的实质就说把简单的RDD转化操作应用到每个批次上,也就是转化DStream的每一个RDD

Transform算子

Transform 允许 DStream 上执行任意的 RDD-to-RDD 函数。即使这些函数并没有在 DStream 的 API 中暴露出来,通过该函数可以方便的扩展 Spark API。该函数每一批次调度一次。其实也 就是对 DStream 中的 RDD 应用转换。

def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("transform").setMaster("local[*]")
    val sc: StreamingContext = new StreamingContext(conf, Seconds(3))
    val lines = sc.socketTextStream("localhost", 9999)

    // transform方法可以将底层RDD获取到后进行操作
    // 1. DStream功能不完善
    // 2. 需要代码周期性的执行

    // Code : Driver端
    val newDS: DStream[String] = lines.transform(
      rdd => {
        // Code : Driver端,(周期性执行)
        rdd.map(
          str => {
            // Code : Executor端
            str
          }
        )
      }
    )
    // Code : Driver端
    val newDS1: DStream[String] = lines.map(
      data => {
        // Code : Executor端
        data
      }
    )
    sc.start()
    sc.awaitTermination()
  }

join算子

两个流之间的 join 需要两个流的批次大小一致,这样才能做到同时触发计算。计算过程就是对当前批次的两个流中各自的 RDD 进行 join,与两个 RDD 的 join 效果相同。

def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val data9999 = ssc.socketTextStream("localhost", 9999)
    val data8888 = ssc.socketTextStream("localhost", 8888)

    val map9999: DStream[(String, Int)] = data9999.map((_,9))
    val map8888: DStream[(String, Int)] = data8888.map((_,8))

    // 所谓的DStream的Join操作,其实就是两个RDD的join
    val joinDS: DStream[(String, (Int, Int))] = map9999.join(map8888)

    joinDS.print()

    ssc.start()
    ssc.awaitTermination()
}

有状态转化


有状态转化操作是跨时间区间跟踪数据的操作,也就是说,一些先前批次的数据也被用来在新的批次中用于计算结果。有状态转换的主要的两种类型:

  • 滑动窗口:以一个时间阶段为滑动窗口进行操作
  • updateStateByKey():通过key值来跟踪数据的状态变化

有状态转化操作需要在StreamingContext中打开检查点机制来提高容错

updateStateByKey

  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("updateStateByKey")
    val sc: StreamingContext = new StreamingContext(conf, Seconds(4))
    sc.checkpoint("cp")
    val ds: ReceiverInputDStream[String] = sc.socketTextStream("localhost", 9999)

    val value: DStream[(String, Int)] = ds.map(((_: String), 1))


    // updateStateByKey:根据key对数据的状态进行更新
    // 传递的参数中含有两个值
    // 第一个值表示相同的key的value数据的集合
    // 第二个值表示缓存区key对应的计算值
    val state: DStream[(String, Int)] = value.updateStateByKey((seq: Seq[Int], option: Option[Int]) => {
      val newCount: Int = option.getOrElse(0) + seq.sum
      Option(newCount)
    })

    state.print()

    sc.start()
    sc.awaitTermination()

  }

所有基于窗口的函数都需要两个参数,分别对应窗口时长滑动步长,并且两者都必须是SparkStreaming的批次间隔的整数倍。
窗口时长控制的是每次用来计算的批次的个数
滑动步长用于控制对新的DStream进行计算的间隔

window操作

基于window进行窗口内元素计数操作

def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    val lines = ssc.socketTextStream("localhost", 9999)
    val wordToOne = lines.map((_,1))

    val windowDS: DStream[(String, Int)] = wordToOne.window(Seconds(6), Seconds(6))

    val wordToCount = windowDS.reduceByKey(_+_)

    wordToCount.print()

    ssc.start()
    ssc.awaitTermination()
  }

reduce操作

有逆操作规约是一种更高效的规约操作,通过只考虑新进入窗口的元素和离开窗口的元素,让spark增量计算归约的结果,其在代码上的体现就是reduceFuncinvReduceFunc

普通归约操作

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    ssc.checkpoint("cp")

    val lines = ssc.socketTextStream("localhost", 9999)

    lines.reduceByWindow(
      (x: String, y: String) => {
        x + "-" + y
      },
      Seconds(9), Seconds(3)
    ).print()

    ssc.start()
    ssc.awaitTermination()
  }

有逆归约操作

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    ssc.checkpoint("cp")

    val lines = ssc.socketTextStream("localhost", 9999)
    val wordToOne = lines.map((_,1))

    /**
     * 基于窗口进行有逆归约:通过控制窗口流出和进入的元素来提高性能
     */
    val windowDS: DStream[(String, Int)] =
    wordToOne.reduceByKeyAndWindow(
      (x:Int, y:Int) => { x + y},
      (x:Int, y:Int) => {x - y},
      Seconds(9), Seconds(3))

    windowDS.print()

    ssc.start()
    ssc.awaitTermination()
  }

count操作

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    ssc.checkpoint("cp")

    val lines = ssc.socketTextStream("localhost", 9999)

    /**
     * 统计窗口中输入数据的个数
     * 比如 3s内输入了10条数据,则打印10
     */
    val countByWindow: DStream[Long] = lines.countByWindow(
      Seconds(9), Seconds(3)
    )
    countByWindow.print()

    /**
     * 统计窗口中每个值的个数
     * 比如 3s内输入了1个3 2个4 3个5,则打印(3,1)(2,4)(3,5)
     */
    val countByValueAndWindow: DStream[(String, Long)] = lines.countByValueAndWindow(
      Seconds(9), Seconds(3)
    )
    countByValueAndWindow.print()

    ssc.start()
    ssc.awaitTermination()
  }

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK