线程池顾名思义,是用来管理线程的一个对象。但并不带表它很简单。

一、设计一个线程池

1. 简易线程对象

一个基础的线程池,通常就是用于存放线程和管理线程的对象,但在Go中我们通常使用的是协程而不是直接使用线程,而Go中的协程并不是一个对象,而是使用 go func() 启动动的,并非一个对象,那如果在Go中控制协程呢?
答案很简单,自己造一个啦。

type Callable interface {
    run()
    Start()
}

type Thread struct {
    ThreadId int
}

func (t *Thread) run() {
    // do something
}

func (t *Thread) Start() {
    go t.run()
}

上述代码之实现了一个简易的线程对象,还定义了一个Callable接口。
有了线程对象,线程池就好搞了。

2. 简易线程池

那现在,在线程对象的基础上,写出一个控制所有线程的线程池,代码如下:

type Pool struct {
    pool map[int]Callable
    size int
    mux  sync.Mutex
}

func (p *Pool) AddThread() int {
    p.mux.Lock()
    defer p.mux.Unlock()

    threadId := p.size
    p.pool[threadId] = NewThread(threadId)
    p.size++
    return threadId
}

func (p *Pool) DelThread(threadId int) {
    p.mux.Lock()
    defer p.mux.Unlock()
    
    delete(p.pool, threadId)
    p.size--
}

func (p *Pool) StartThread(threadId int) {
    td, exist := p.pool[threadId]
    if !exist {
        return
    }

    td.Start()
}

最然线程池完成了,但远没到我们想要的样子,那接下来,我们为线程对象加一个状态,让它可以控制启停

3. 带状态的线程

增加一个 state 字段,用于表示线程状态,在线程运行进判断该字符是否正常,如是不正常就停止线程。注意state需要线程安全,所以还需要加锁。代码如下:

type Thread struct {
    ThreadId int
    stateMux sync.RWMutex
    state    int
}

func (t *Thread) run() {
    for t.GetState() == 0 {
        // do something
    }
    log.Println("线程结束")
}

func (t *Thread) Start() {
    go t.run()
}

func (t *Thread) Stop() {
    t.SetState(1)
}

func (t *Thread) GetState() int {
    t.stateMux.RLock()
    defer t.stateMux.RUnlock()

    return t.state
}

func (t *Thread) SetState(state int) {
    t.stateMux.Lock()
    defer t.stateMux.Unlock()

    t.state = state
}

如上所示,由于state读的次数远大于写的次数,所以自然要用RWMutex了。

4. 什么是可伸缩线程池

线程池的使用比较简单,通常来说我们会有一个任务队列,任务队列是线程安全的,线程池会同时启动多个线程去消费队列内的任务。

但普通线程池有个很大的问题,那就是如果线程增加的过多,会浪费资源,如果太少又可能不会用。所以我们需要可伸缩的线程池。
可伸缩线程池是在此基础上改进而来的,线程初分来两类,主线程和辅线程。其中主线程相当于常住线程,一但启动不会自动停止。而辅线程则会在一定时间后自动停止,这样就可以在有突发流量下增加线程,流量恢复后线程自动停止已节约资源。

二、实现一个可伸缩线程池

首先,实现可伸缩线程池主要有两个要点,一个需要一个字段来表示当前线程是否是辅线程。
还需要确定好如果是辅线程,他的自动结束条件。

例如,当任务列表超过阀值时增加线程,代码如下:

func (p *Pool) AddTask(task *Task) {
    p.taskQueue.Put(task) // taskQueue为一个线程安全的任务队列
    if len(p.taskQueue) > 64 { // 以64为阀值,任务队列长度超过64则增加线程
        if p.CurThreadNum < MasterThreadNum {
            // 如果当前线程数小于主线程数,增加主线程
            tId := p.AddThread()
            p.StartThread(tId)
            return
        }

        if p.CurThreadNum < MaxThreadNum {
            // 如果当前线程数小于最大线程数,增加辅线程
            tId := p.AddTempThread()
            p.StartThread(tId)
        }
    }
}

除此外,还要修改一下线程的 run 函数,需要让线程可以自动停止

func (t *Thread) run() {
    for t.GetState() == 0 {
        // do something
        if t.temp {
            // 辅线程停止的条件
        }
    }
    log.Println("线程结束")
}

至此,一个简易的可伸缩线程池就基本完成了