线程池顾名思义,是用来管理线程的一个对象。但并不带表它很简单。
一、设计一个线程池
1. 简易线程对象
一个基础的线程池,通常就是用于存放线程和管理线程的对象,但在Go中我们通常使用的是协程而不是直接使用线程,而Go中的协程并不是一个对象,而是使用 go func()
启动动的,并非一个对象,那如果在Go中控制协程呢?
答案很简单,自己造一个啦。
type Callable interface {
run()
Start()
}
type Thread struct {
ThreadId int
}
func (t *Thread) run() {
// do something
}
func (t *Thread) Start() {
go t.run()
}
上述代码之实现了一个简易的线程对象,还定义了一个Callable
接口。
有了线程对象,线程池就好搞了。
2. 简易线程池
那现在,在线程对象的基础上,写出一个控制所有线程的线程池,代码如下:
type Pool struct {
pool map[int]Callable
size int
mux sync.Mutex
}
func (p *Pool) AddThread() int {
p.mux.Lock()
defer p.mux.Unlock()
threadId := p.size
p.pool[threadId] = NewThread(threadId)
p.size++
return threadId
}
func (p *Pool) DelThread(threadId int) {
p.mux.Lock()
defer p.mux.Unlock()
delete(p.pool, threadId)
p.size--
}
func (p *Pool) StartThread(threadId int) {
td, exist := p.pool[threadId]
if !exist {
return
}
td.Start()
}
最然线程池完成了,但远没到我们想要的样子,那接下来,我们为线程对象加一个状态,让它可以控制启停
3. 带状态的线程
增加一个 state 字段,用于表示线程状态,在线程运行进判断该字符是否正常,如是不正常就停止线程。注意state需要线程安全,所以还需要加锁。代码如下:
type Thread struct {
ThreadId int
stateMux sync.RWMutex
state int
}
func (t *Thread) run() {
for t.GetState() == 0 {
// do something
}
log.Println("线程结束")
}
func (t *Thread) Start() {
go t.run()
}
func (t *Thread) Stop() {
t.SetState(1)
}
func (t *Thread) GetState() int {
t.stateMux.RLock()
defer t.stateMux.RUnlock()
return t.state
}
func (t *Thread) SetState(state int) {
t.stateMux.Lock()
defer t.stateMux.Unlock()
t.state = state
}
如上所示,由于state读的次数远大于写的次数,所以自然要用RWMutex
了。
4. 什么是可伸缩线程池
线程池的使用比较简单,通常来说我们会有一个任务队列,任务队列是线程安全的,线程池会同时启动多个线程去消费队列内的任务。
但普通线程池有个很大的问题,那就是如果线程增加的过多,会浪费资源,如果太少又可能不会用。所以我们需要可伸缩的线程池。
可伸缩线程池是在此基础上改进而来的,线程初分来两类,主线程和辅线程。其中主线程相当于常住线程,一但启动不会自动停止。而辅线程则会在一定时间后自动停止,这样就可以在有突发流量下增加线程,流量恢复后线程自动停止已节约资源。
二、实现一个可伸缩线程池
首先,实现可伸缩线程池主要有两个要点,一个需要一个字段来表示当前线程是否是辅线程。
还需要确定好如果是辅线程,他的自动结束条件。
例如,当任务列表超过阀值时增加线程,代码如下:
func (p *Pool) AddTask(task *Task) {
p.taskQueue.Put(task) // taskQueue为一个线程安全的任务队列
if len(p.taskQueue) > 64 { // 以64为阀值,任务队列长度超过64则增加线程
if p.CurThreadNum < MasterThreadNum {
// 如果当前线程数小于主线程数,增加主线程
tId := p.AddThread()
p.StartThread(tId)
return
}
if p.CurThreadNum < MaxThreadNum {
// 如果当前线程数小于最大线程数,增加辅线程
tId := p.AddTempThread()
p.StartThread(tId)
}
}
}
除此外,还要修改一下线程的 run 函数,需要让线程可以自动停止
func (t *Thread) run() {
for t.GetState() == 0 {
// do something
if t.temp {
// 辅线程停止的条件
}
}
log.Println("线程结束")
}
至此,一个简易的可伸缩线程池就基本完成了