基于MapReduce思想,用Channel实现Stream流处理框架

警告
本文最后更新于 2023-05-05,文中内容可能已过时。

我们在公司搬砖,经常会写一些“脚本”、“定时任务”用来大规模的处理数据。这种代码通常会利用并发来加速数据处理,同时我们还知道传统的互斥锁低效且不优雅,所以此时用 Go 语言的 Channel 就是最好的方案。

本文的目标就是基于MapReduce思想,用Channel实现一个Stream流处理框架,让小白也能快速写出 Go 的并发程序,快速处理大量数据。

该工具实现在我的开源项目 go-kit,各位可以用 Go Module 的方式使用它。

下面我们详细解释下这款 Stream 流处理脚手架,首先我们要看看什么是 MapReduce:

MapReduce 编程思想

MapReduce 是一种编程模型,它最早出现在 2004 年 Google 公司的论文“MapReduce:Simplified Data Processing on Large Clusters”(面向大型集群的简化数据处理),是 Google 公司开源的一项重要技术。

MapReduce 主要用于大数据量的计算,能够让没多少并行计算经验的开发人员也可以开发并行应用程序。

MapReduce 采用“分而治之”的思想,简而言之,就是“分散任务,汇总结果”。将大规模数据集分发给多个子节点共同完成,然后整合各个子节点的中间结果,得出最终的计算结果。

MapReduce 给予我们哪些启发?

MapReduce 主要用于大数据、分布式计算领域,原本与我们的单机程序并不相关。

但是,我们可以学习 MapReduce “分散任务,汇总结果”的思想,优化我们的单机并发程序。

多个语言的内置函数库中都有 MapReduce 思想的影子,比如Python reduce()函数,再比如Java stream库

有了这些的知识储备后,我们就可以着手实现我们自己的 Stream 流处理脚手架了:

详解 Stream 库

函数用法讲解

Stream 库提供 4 个函数,分别是:

  1. From 函数:Stream 流数据来源,比如 SQL 查询出来的数据。
  2. Map 函数:实现数据流处理逻辑,支持并发。
  3. Filter 函数:实现数据流过滤逻辑,支持并发。
  4. Done 函数:数据流汇总,做最后的处理。

如果用一个图描述,Stream 流处理的过程长这样:

1

简单的并发

如果想并发处理数据,只需要在函数后面增加一个参数WithWorkerNum

1
From(xxx).Map(xxx, WithWorkerNum(10))

2

随意构建你的业务程序

Stream 流处理脚手架支持你任意搭配Map函数Filter函数以实现你自己的业务代码:

3

实现你自己的业务逻辑

From函数Filter函数Map函数Done函数的参数类型为函数:

1
2
3
4
5
6
7
func From(fn srcIteratorFunc) Stream 

func (s Stream) Map(fn MapFunc, opts ...OptionFunc) Stream

func (s Stream) Filter(fn FilterFunc, opts ...OptionFunc) Stream

func (s Stream) Done(fn done) (result interface{}, err error)

这些函数就是我们实现业务逻辑的地方,实现了相应的函数后我们将其作为参数传给 Stream 流处理脚手架,比如下面这个 Demo:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package stream

import (
	"fmt"
	"log"
	"testing"
)

type Member struct {
	UserId  int64
	GroupId int64
	Wallet  int64
}

// 这是使用 Stream 包的一个 Demo
// 假设 Group 是豆瓣小组,Member 是小组成员,我们将对 GroupId < 3 的星球的所有成员赠送 1 块钱。
func TestName(t *testing.T) {
	res, err := From(GetMembers).Filter(MemberWalletFilter).Map(WalletUpdate, WithWorkerNum(10)).Done(Report)
	if err != nil {
		log.Printf("%+v", err)
	}
	log.Println(res)
}

func GetMembers(dst chan<- interface{}) {
	// TODO 实现业务逻辑
}

func MemberWalletFilter(val interface{}) bool {
	// TODO 实现业务逻辑
}

func WalletUpdate(val interface{}) interface{} {
	// TODO 实现业务逻辑
}

func Report(src <-chan interface{}) (result interface{}, err error) {
    // TODO 实现业务逻辑
}

转载声明:本文允许转载,原文地址:基于MapReduce思想,用Channel实现Stream流处理框架

Buy me a coffee~
室长 支付宝支付宝
室长 微信微信
0%