1

状态机与函数式编程(二)

 8 months ago
source link: https://blog.henix.info/blog/functional-state-machine-2/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

状态机与函数式编程(二)

最后更新日期:2023-12-26

  读了我的上一篇文章的读者可能仍然会觉得状态机,或者说 Puhser 这个概念太抽象,不知道该怎么用。因此我在这篇文章中讲一个实际的例子。

  考虑一个常见的数据汇总问题:有一个文件,每行是一个数据,每行数据中包含一个日期和一些统计数字(比如销售额、用户活跃数、用户付费等),需要按月度汇总这些统计数字,并且输出到另一个文件。

  我们还有一些限制条件和设计要点:

  1. 文件已经按照日期从小到大排序
  2. 输出时也应按日期从小到大排序
  3. 每个时间分组中的数据行的数量并不确定,例如一个月可能有 31 天,也可能有 28 天
  4. 这个文件很大,为了控制内存的使用,不能把所有行读进一个大数组里处理
  5. 扩展性:如果以后要求不仅仅是汇总每月,还要按每周、每季度、每年汇总,能否很方便地支持?能否支持任意自定义的日期汇总方式?
  6. 扩展性2:汇总方法可能不仅仅是求和,还可能是求平均、求中位数、求标准差之类的,能否方便地支持自定义?
  7. 扩展性3:可能在其他部分的代码中也有这种“按时间统计汇总”的需求,能否抽象出一个公共函数 / 类?

  这个问题我们一般称为“resample”,在处理时间序列型数据时非常常见。著名的 Python 数据处理库 pandas 还有专门的教程页面

  回到函数式编程,哪种序列变换函数对这个问题最合适?可以一步步地分解思考这个问题:

  1. 我们需要一个将原始日期变换成汇总后日期的函数,例如将“2023-10-06”变换成“2023-10”,即提取出“月”的部分
  2. 上一步得到的值可以看作一个 key ,我们需要将序列中相邻的且 key 相同的元素放进一个组中
  3. 对每个组进行汇总,然后输出

  可见这里的重点是,如何将序列中相邻的且 key 相同的元素分组,有什么现成的函数吗?

  我首先想到的是,这个需求有点像 GroupBy ,但 GroupBy 跟这个需求有一些不同:

  1. GroupBy 的输出结果是一个 Map[Key, Item[]] ,而我们需要输出另一个序列
  2. GroupBy 是全局的,它会把所有 key 相同的汇总,而这个问题中,我们需要汇总的是 key 相同且相邻的元素

  然后我找到一个看上去很接近的 lo.PartitionBy ,但仔细观察后发现,这个函数会调整元素的顺序,依然不是我们需要的。

  所以只能自己实现了,我称之为“SeqGroupBy”:

type Pusher[T any] interface {
  Push(T)
  Flush()
}

type PusherSeqGroupBy[K comparable, A any] struct {
  next   Pusher[[]A]
  getKey func(A) K
  curKey K
  items  []A
}

func NewPusherSeqGroupBy[K comparable, A any](getKey func(A) K, next Pusher[[]A]) *PusherSeqGroupBy[K, A] {
  return &PusherSeqGroupBy[K, A]{next: next, getKey: getKey}
}

func (t *PusherSeqGroupBy[K, A]) Push(a A) {
  newKey := t.getKey(a)
  if len(t.items) == 0 {
    t.items = append(t.items, a)
    t.curKey = newKey
  } else {
    if newKey == t.curKey {
      t.items = append(t.items, a)
    } else {
      t.next.Push(t.items)
      t.curKey = newKey
      t.items = []A{a}
    }
  }
  return
}

func (t *PusherSeqGroupBy[K, A]) Flush() {
  if len(t.items) > 0 {
    t.next.Push(t.items)
    t.next.Flush()
  }
  t.items = nil
  return
}

  构造一个 PusherSeqGroupBy 需要传入两个参数:

  1. getKey: 从一行数据中提取出 key ,这个函数的实现是,先从一行数据中提取出日期,再把日期变成汇总后的日期,比如按月的话就是将“2023-10-06”变成“2023-10”
  2. next: 数据需要传给的下一个处理环节,类型是 Pusher[[]Row] 。接受一个已经分好组的数据组,汇总然后输出

  我们可以画出程序的数据流图(dataflow diagram)如下:

  按行读文件 → SeqGroupBy 分组 → 每个组放进汇总器 → 每条汇总结果输出

  每个右箭头“→”都对应了一个 Pusher.Push() 的调用。最终可以实现内存中最多持有一个汇总组的数据,做到了随用随销毁。

  程序的剩余部分已经很显然了,我想可以留给读者自己完成。从这个例子也可以看出为什么 Pusher 的接口定义中需要有 Flush 。

  这个例子我们还可以看出基于状态机的,或 push-style 函数式编程的一个特点:如果说基于迭代器的函数式编程是以数据的源(source)为基础,经过层层变换,最终输出到数据的汇(sink);那么基于状态机的函数式编程则是反过来将数据的汇(sink)层层包装,最后接入数据源。当然,实际上我们也可以同时使用这两种风格,既用迭代器把源变形,又用状态机把汇变形,然后在中间的某个地方拼接在一起,这样,我们可以自由地选择最合适的工具。

评论模式 评论帮助
请按照如下格式发邮件:
收件人[email protected]
标题[复制]
正文评论 / 回复内容,只支持纯文本
说明评论用户名为你在邮箱中设置的用户名

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK