【译】图解工作池 Visually Understanding Worker Pool

本文翻译了Medium上的一篇博文《Visually Understanding Worker Pool》。帮助快速了解Job/Worker Model(任务与工作者模型)。原文地址:https://medium.com/coinmonks/visually-understanding-worker-pool-48a83b7fc1f5

正文开始

Example diagram of a worker pool process in Go using goroutines

我们需要关注的问题是开创一种更直观的方法来理解那些模式和其他一些解决方案的架构,如果它们不能被正确理解的话,可能会在您的内部团队中造成黑盒情况。

通常来讲,我们都在通过”粘贴和复制“来编程,甚至没有意识到底层是如何工作的。今天,我们要来回顾一下著名的(但被低估的)工作池(又名线程池)。 在 Go 中,通常使用带缓冲区的通道Channel作为主要队列和 goroutine 之间的通信通道来解决这个问题。在继续阅读这篇文章之前,强烈建议先了解通道Channel的相关概念,不仅仅是为了更清晰的理解。

理解图例

我们在这篇文章的最开始,展示了 Go 中工作池实现的顶层设计。 它总结了需要在一般工作池实现中发生的主要操作:

  • “分配”待处理的资源
  • “工作”或者处理这些资源
  • “收集”结果以进一步的后续处理

这些操作可以通过不同的方式来完成,甚至是一个单纯的只有迭代的代码段,但今天我们将在 Go 中使用并发和并行设计,以最大限度地利用可用的硬件资源,尽快获得结果,并延续微服务架构的设计趋势。

让我们以更详细的“放大”方式查看每个组件:

Main goroutine starts

主goroutine就是正常的程序执行,在这种情况下,我们的程序几乎没有做具体的事情,因为我们要专注于工作池,唯一要执行的特定任务是生成和控制与工作池的正确执行相关的其他goroutine

1
2
3
4
func main()  {
pool := workerpool.NewPool(3) // 3个goroutines(工作者)
pool.Start(resources, ResourceProcessor, ResultProcessor)
}

Done Channel

通道“Done”(done chan bool)用作主程序控制,指示所有任务已执行且所有 goroutine 已完成。 与“主循环”设计不同的是,主goroutine等待通过该通道发送的信号来停止和结束程序。 为什么我们使用一个channel来控制程序执行并等待所有goroutines,这很容易回应:因为简单

1
2
3
4
5
6
7
8
9
10
11
12
13
// Start starts the worker pool process 开始工作池的工作
func (m *Pool) Start(resources []interface{}, procFunc ProcessorFunc, resFunc ResultProcessorFunc) {
log.DEBUG.Print("worker pool starting")
startTime := time.Now()
go m.allocate(resources)
m.done = make(chan bool)
go m.collect(resFunc)
go m.workerPool(procFunc)
<-m.done
endTime := time.Now()
diff := endTime.Sub(startTime)
log.DEBUG.Printf("total time taken: [%f] seconds", diff.Seconds())
}

在上面的代码例子中,我们的程序等待继续执行,直到通道变量“done”(<-m.done)收到一个值。 这使得代码更小且易于阅读。

分配资源

“Allocate” goroutine

“分配”goroutine是从主goroutine产生的子协程,其目的是分配将要由工作池使用的资源。 它有一个生命周期有限的主循环:直到所有资源都分配完毕。 为了更好地理解这一点,让我们看一下下图:

“Allocate” goroutine tasks

“只要有资源要分配,goroutine就会继续运行”就是对上图的一个很好的表述方式。 分配执行接收一个未知大小的资源数组并迭代该数组,将其转换为一个结构化的任务“Job”,稍后可以由另一个goroutine处理。 被转换为“作业”的资源被发送到一个大小/维度有限的缓冲区通道“Jobs”。 你可能已经注意到,在示例中,资源数组大于可以在通道中缓冲/发送的元素。这是故意要这么设计的,因为它控制可以同时执行的并行作业的数量。如果“作业”通道已满,则资源迭代将停止,直到它可以再次在作业通道中分配作业。

我们将资源转换为Job Channel遵循分离关注点 Separation of Concerns 的设计原则。 发送到Jobs通道的资源将被另一个 goroutine 拾取以供以后处理,将分配的关注点限制在仅接收、转换和分配。 以下代码代表了所讨论的图例:

1
2
3
4
5
6
7
8
9
10
// allocate allocates jobs based on an array of resources to be processed by the worker pool
func (m *Pool) allocate(jobs []interface{}) {
defer close(m.jobs)
log.DEBUG.Printf("Allocating [%d] resources", len(jobs))
for i, v := range jobs {
job := Job{id: i, resource: v}
m.jobs <- job
}
log.DEBUG.Print("Done Allocating.")
}

处理Jobs通道(待处理的任务)

Worker Pool

“Worker Pool” goroutine 的存是由“Jobs” Channel 定义的:只要有工作需要处理,就继续这样做。 同样的,关注点分离,这个 goroutine 目的是实际处理作业,并且作为大多数事情,它并发和并行工作。 这是我们讨论中最关键的部分,因为它涵盖了工作池的创建和工作分配

Worker Pool 利用 Sync.WaitGroup 来控制生成新的 goroutines

在这种情况下,工作池 goroutine 产生新的“工作”(或“工作者”)goroutines,产生的数量由配置来定义或注入的方式添加,现在让我们将“3”定义为池中的工作者数量。任何工作池(或线程池)的一个关键因素,就是要控制池的大小,如果你不这样做,很可能导致硬件资源被耗尽,甚至你的程序被死锁。

我们将通过使用 Sync.WaitGroup 来简化我们的解决方案。WaitGroup 基本上用作我们可以随时询问是否所有 goroutine 都已完成的一个结构。 与我们的“Done”通道类似,WaitGroup 包公开了一个 Wait 方法,该方法将暂时休眠,直到收到一个值才被唤醒。 代码类似于:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// workerPool creates or spawns new "work" goRoutines to process the "Jobs" channel
func (m *Pool) workerPool(processor ProcessorFunc) {
defer close(m.results)
log.DEBUG.Printf("Worker Pool spawning new goroutine, total: [%d]", m.numRoutines)
var wg sync.WaitGroup
for i := 0; i < m.numRoutines; i++ {
wg.Add(1)
go m.work(&wg, processor)
log.DEBUG.Printf("Spawned work goroutine [%d]", i)
}
log.DEBUG.Print("Worker Pool done spawning work goroutines")
wg.Wait()
log.DEBUG.Print("all work goroutines done processing")

}

正如您可能已经注意到的,同步器只是一个原子计数器,我们在每个子 goroutine 中递增 (Add) 然后递减 (Done)。 工作池会处于休眠,直到满足 wg.Wait() 条件。

现在让我们看一下每个“工作”(或工作者)的实际执行情况,以更好地了解所有的同步和作业的实际处理过程。

“Work” goroutine process

在 WokerPool goroutine 中,我们看到了如何通过调用 go m.workfor 循环内生成新的“工作”goroutine,这将创建新的 goroutine(s)。

在这个新的 goroutine 中,会有作业的实际处理和计算的结果。为此,每个新的“工作”goroutine 将在通道“Jobs”上执行for range jobs的范围循环。 这样做的效果是,由于 Go 中使用带锁机制的 Channels 的实现,每个工作协程都将抓取一个唯一的 Job,并确保 goroutine 只能拾取通道中的一项,这也定义了“work” goroutine:只要Jobs通道有工作要做,就继续迭代。 这种技术使得代码能更好被阅读和理解,它隐藏了进程之间加锁和解锁共享数据的一些机制。

一旦我们从“Jobs”通道中选择了一个任务,我们就会处理它并创建一个“Result”对象,然后将其发送到“Results”通道。 这是我们采用的关注点分离设计的延续,“工作”goroutine 的生命周期由处理“Job”所需的时间和发送结果到“Results”通道的时间来决定,它将持续迭代如果“Jobs”通道还没有关闭。

1
2
3
4
5
6
7
8
9
10
11
12
13
// work performs the actual work by calling the processor and passing in the Job as reference obtained
// from iterating over the "Jobs" channel
func (m *Pool) work(wg *sync.WaitGroup, processor ProcessorFunc) {
defer wg.Done()
log.DEBUG.Print("goRoutine work starting")
for job := range m.jobs {
log.DEBUG.Printf("working on Job ID [%d]", job.id)
output := Result{job, processor(job.resource)}
m.results <- output
log.DEBUG.Printf("done with Job ID [%d]", job.id)
}
log.DEBUG.Print("goRoutine work done.")
}

上面的代码代表了这一章,正如你所看到的,变量 wg (sync.WaitGroup) 作为方法的引用被传递,所以当我们处理完所有的“作业”后,我们可以调用 wg.Done() 让父进程 goroutine ( workerPool) 知道这个特定的 goroutine 已经完成。

奖励内容:到目前为止,我们发布的代码中没有出现 Job 的实际处理,这是因为我们专门使用了依赖注入,通过使用函数签名 ProcessorFunc 作为参数传递给工作池,然后传递给工作 goroutine ,这隔离了工作池的实际实现,处理工作在另一层完成,使其易于重用。

收集结果

“Collect” goroutine

那么,一旦结果被发送到“结果”通道,我们应该怎么做? 答案很简单:收集、处理并委托结果。

“Collect” goroutine process

我们的缓冲通道“Results”保留了所有工作者返回的所有结果,因此我们有一种不同的方法来处理这个“队列”(在这种情况下,我们只是迭代它们)。 关于此实现的重要部分是理解处理 Job 与处理其结果具有不同的关注点。 一个人可以根据结果采取不同的行动。

1
2
3
4
5
6
7
8
9
10
11
12
// Collect post processes the channel "Results" and calls the ResultProcessorFunc passed in as reference
// for further processing.
func (m *Pool) collect(proc ResultProcessorFunc) {
log.DEBUG.Print("goRoutine collect starting")
for result := range m.results {
outcome := proc(result)
log.DEBUG.Printf("Job with id: [%d] completed, outcome: %s", result.Job.id, outcome)
}
log.DEBUG.Print("goRoutine collect done, setting channel done as completed")
m.done <- true
m.completed = true
}

从前面的代码和关联的图例中,理解几个因素很重要:

  • 再次使用依赖注入将结果注入“后续处理器”,这会在池上下文之外创建结果的委托,注入的函数可以在其中执行不同的操作。此类型的签名类似于:

    type ResultProcessorFunc func(result Result) error

  • 我们只是在通道上迭代,这定义了collect协程的生命周期。 我们可以交换这种方法并创建另一个工作池来加速结果的后续处理。但根据设计,这应该不是必需的。因为在“工作” goroutines 中处理“作业”理论上应该比后续处理话费更多的时间。如果后续处理无法满足你的设计,则意味着你的结果中可能有许多业务逻辑规则,这些规则可能值得研究并移至“作业”中处理。

  • 最后的 m.done <- true 表示“Done”通道,让主 goroutine 知道工作池已完成。

总结

至此,我们已经在 Go 中绘制、理解并实现了一个完整的并发、并行和抽象的工作池。 自信息时代开始以来,为我们的问题解决方案创建视觉表示和/或图表的做法受到了极大的赞赏。 它可以帮助我们抽象、找到模式和机会来改进我们的解决方案,并帮忙我们写出更好的文档。

全文结束