在java语言里面,当我们有子任务需要执行的时候,我们几乎都是把这些子任务放在线程池里面的,而不会每次都new Thread(xxx).start()。那么在go语言里面,我们有没有线程池呢?
答案是有的,只是在go语言里面他的名称称为工作池,但是实际和线程池是一样的。
在java语言里面,线程池是现成的框架,我们可以直接拿来开箱即用,但是在go语言里面确实比较少的,我们需要手动的编写一个工作池,整体的思路如下:
1)首先我们要定义一个Job
这里每一个任务都是一个job,示例代码如下:
/** 定义一个job的内容 */ type Job struct { id int name string }
2)然后我们定义一个工作池
这里我们主要是通过channel来定义工作池,相当于做一个队列这种,把任务放到这个channel队列里面去,示例代码如下:
/* * 定义一个工作池 */ type Worker struct { id int //工作池id workerPool chan chan Job //工作者池(通道的通道),每个元素都是一个job通道, 公共的job jobChannel chan Job //工作通道,每个元素是一个job,worker私有的job exit chan bool //结束信号 }
3)定义工作池的开始任务执行
这里的话,工作池需要启动起来,需要把任务给添加进来并且执行,这里的示例代码如下:
// Start 工作池开始 func (s *Scheduler) Start() { Workers := make([]*Worker, s.workerMaxNum) for i := 0; i < s.workerMaxNum; i++ { worker := NewWorker(s.workerPool, i) worker.Start() Workers[i] = &worker } s.workers = Workers go s.schedule() }
4)定义工作池的结束任务执行
既然有开始任务,那么就有结束任务,示例代码如下:
// Stop 工作池的关闭 func (s *Scheduler) Stop() { Workers := s.workers for _, w := range Workers { w.Stop() } time.Sleep(time.Second) close(s.workerPool) }
5)创建工作池的调度管理者
工作池在正常运行的时候是需要调度管理者进行调度的,也就是把对应的任务安排进工作池里面,所以这里我们需要定义一个调度管理者
type Scheduler struct { workerPool chan chan Job //工作池 workerMaxNum int //最大工作者数量 workers []*Worker //worker队列 }
6)管理者开始监听任务
有了管理者,那么管理者就有生命周期,总共分为: 启动监听->调度->结束。所以首先我们来编写启动监听的类,示例代码如下:
/* * 管理者开始监听任务 */ func (w Worker) Start() { go func() { for { select { case job := <-w.jobChannel: // 收到任务 fmt.Println("get a job from private w.JobChannel") fmt.Println(job) case <-w.exit: // 收到结束信号 fmt.Println("worker exit", w) return } } }() }
7)管理者调度任务
管理者启动监听之后,当有任务来,就会进行任务的调度,示例代码如下:
/* * 管理者调度任务 */ func (s *Scheduler) schedule() { for { select { case job := <-JobQueue: fmt.Println("get a job from JobQueue") go func(job Job) { //从WorkerPool获取jobChannel,忙时阻塞 jobChannel := <-s.workerPool fmt.Println("get a private jobChannel from public s.WorkerPool", reflect.TypeOf(jobChannel)) jobChannel <- job fmt.Println("worker's private jobChannel add one job") }(job) } } }
8)管理者结束任务
最后我们就是管理者结束任务,示例代码如下:
/* * 管理者结束任务 */ func (w Worker) Stop() { go func() { w.exit <- true }() }
以上我们就完成了一个简单的工作池的编写。
在使用工作池的时候,我们肯定是需要创建一个工作池的,示例代码如下:
/* * 创建一个工作池 */ func NewWorker(WorkerPool chan chan Job, id int) Worker { fmt.Printf("new a worker(%d)\n", id) return Worker{ id: id, workerPool: WorkerPool, jobChannel: make(chan Job), exit: make(chan bool), } }
同时还需要创建一个调度管理者
// NewScheduler 创建一组管理者 func NewScheduler(workerMaxNum int) *Scheduler { workerPool := make(chan chan Job, workerMaxNum) // 工作池 return &Scheduler{workerPool: workerPool, workerMaxNum: workerMaxNum} }
最后我们就可以使用了,在main函数里面的使用方法是:
MaxWorker := 5 // 最大worker数量 JobQueue := make(chan Job, 5) // 工作通道,模拟需处理的工作 scheduler := NewScheduler(MaxWorker) //创建调度这 scheduler.Start() //启动调度者 ${doaction} //添加任务 scheduler.Stop() //停止执行
最后我们来运行看看效果:
以上就是关于go语言工作池的介绍,后期我们会专门写一篇实战的文章。 最后按照惯例,附上本案例的源码,登录后即可下载。
还没有评论,来说两句吧...