Redian新闻
>
探究 Go 的高级特性之 【处理1分钟百万请求】

探究 Go 的高级特性之 【处理1分钟百万请求】

公众号新闻

作者:Goland猫 

https://juejin.cn/post/7245919919223636023 

对于大型的互联网应用程序,如电商平台、社交网络、金融交易平台等,每秒钟都会收到大量的请求。在这些应用程序中,需要使用高效的技术来应对高并发的请求,尤其是在短时间内处理大量的请求,如1分钟百万请求。

同时,为了降低用户的使用门槛和提升用户体验,前端需要实现参数的无感知传递。这样用户在使用时,无需担心参数传递的问题,能够轻松地享受应用程序的服务。

在处理1分钟百万请求时,需要使用高效的技术和算法,以提高请求的响应速度和处理能力。Go语言以其高效性和并发性而闻名,因此成为处理高并发请求的优秀选择。Go中有多种模式可供选择,如基于goroutine和channel的并发模型、使用池技术的协程模型等,以便根据具体应用的需要来选择适合的技术模式。

本文代码参考搬至

https://marksuper.xyz/2021/10/08/handle_million_req/http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/

W1

W1 结构体类型,它有五个成员:

  • WgSend 用于等待任务发送的 goroutine 完成。
  • Wg 用于等待任务处理的 goroutine 完成。
  • MaxNum 表示 goroutine 池的大小。
  • Ch 是一个字符串类型的通道,用于传递任务。
  • DispatchStop 是一个空结构体类型的通道,用于停止任务分发。
type W1 struct {
 WgSend       *sync.WaitGroup
 Wg           *sync.WaitGroup
 MaxNum       int
 Ch           chan string
 DispatchStop chan struct{}
}

接下来是 Dispatch 方法,它将任务发送到通道 Ch 中。它通过 for 循环来发送 10 倍于 MaxNum 的任务,每个任务都是一个 goroutine。defer 语句用于在任务完成时减少 WgSend 的计数。select 语句用于在任务分发被中止时退出任务发送。

Dispatch

func (w *W1) Dispatch(job string) {
 w.WgSend.Add(10 * w.MaxNum)
 for i := 0; i < 10*w.MaxNum; i++ {
  go func(i int) {
   defer w.WgSend.Done()

   select {
   case w.Ch <- fmt.Sprintf("%d", i):
    return
   case <-w.DispatchStop:
    fmt.Println("退出发送 job: ", fmt.Sprintf("%d", i))
    return
   }

  }(i)
 }
}

StartPool

然后是 StartPool 方法,它创建了一个 goroutine 池来处理从通道 Ch 中读取到的任务。

如果通道 Ch 还没有被创建,那么它将被创建。如果计数器 WgSend 还没有被创建,那么它也将被创建。如果计数器 Wg 还没有被创建,那么它也将被创建。

如果通道 DispatchStop 还没有被创建,那么它也将被创建。

for 循环用于创建 MaxNum 个 goroutine 来处理从通道中读取到的任务。defer 语句用于在任务完成时减少 Wg 的计数。

func (w *W1) StartPool() {
 if w.Ch == nil {
  w.Ch = make(chan string, w.MaxNum)
 }

 if w.WgSend == nil {
  w.WgSend = &sync.WaitGroup{}
 }

 if w.Wg == nil {
  w.Wg = &sync.WaitGroup{}
 }

 if w.DispatchStop == nil {
  w.DispatchStop = make(chan struct{})
 }
 w.Wg.Add(w.MaxNum)
 for i := 0; i < w.MaxNum; i++ {
  go func() {
   defer w.Wg.Done()
   for v := range w.Ch {
    fmt.Printf("完成工作: %s \n", v)
   }
  }()
 }
}

Stop

最后是 Stop 方法,它停止任务分发并等待所有任务完成。

关闭了通道 DispatchStop,等待 WgSend 中的任务发送 goroutine 完成,然后关闭通道 Ch,等待 Wg 中的任务处理 goroutine 完成。

func (w *W1) Stop() {
 close(w.DispatchStop)
 w.WgSend.Wait()
 close(w.Ch)
 w.Wg.Wait()
}

W2

SubWorker


type SubWorker struct {
 JobChan chan string
}

子协程,它有一个 JobChan,用于接收任务。

Run:SubWorker 的方法,用于启动一个子协程,从 JobChan 中读取任务并执行。

func (sw *SubWorker) Run(wg *sync.WaitGroup, poolCh chan chan string, quitCh chan struct{}) {
 if sw.JobChan == nil {
  sw.JobChan = make(chan string)
 }
 wg.Add(1)

 go func() {
  defer wg.Done()

  for {
   poolCh <- sw.JobChan

   select {
   case res := <-sw.JobChan:
    fmt.Printf("完成工作: %s \n", res)

   case <-quitCh:
    fmt.Printf("消费者结束...... \n")
    return

   }
  }
 }()
}

W2

type W2 struct {
 SubWorkers []SubWorker
 Wg         *sync.WaitGroup
 MaxNum     int
 ChPool     chan chan string
 QuitChan   chan struct{}
}

Dispatch

Dispatch:W2 的方法,用于从 ChPool 中获取 TaskChan,将任务发送给一个 SubWorker 执行。

func (w *W2) Dispatch(job string) {
 jobChan := <-w.ChPool

 select {
 case jobChan <- job:
  fmt.Printf("发送任务 : %s 完成 \n", job)
  return

 case <-w.QuitChan:
  fmt.Printf("发送者(%s)结束 \n", job)
  return

 }
}

StartPool

StartPool:W2 的方法,用于初始化协程池,启动所有子协程并把 TaskChan 存储在 ChPool 中。

func (w *W2) StartPool() {
 if w.ChPool == nil {
  w.ChPool = make(chan chan string, w.MaxNum)
 }

 if w.SubWorkers == nil {
  w.SubWorkers = make([]SubWorker, w.MaxNum)
 }

 if w.Wg == nil {
  w.Wg = &sync.WaitGroup{}
 }

 for i := 0; i < len(w.SubWorkers); i++ {
  w.SubWorkers[i].Run(w.Wg, w.ChPool, w.QuitChan)
 }
}

Stop

Stop:W2 的方法,用于停止协程的工作,并等待所有协程结束。

func (w *W2) Stop() {
 close(w.QuitChan)
 w.Wg.Wait()

 close(w.ChPool)
}

DealW2 函数则是整个协程池的入口,它通过 NewWorker 方法创建一个 W2 实例,然后调用 StartPool 启动协程池,并通过 Dispatch 发送任务,最后调用 Stop 停止协程池。

func DealW2(max int) {
 w := NewWorker(w2, max)
 w.StartPool()
 for i := 0; i < 10*max; i++ {
  go w.Dispatch(fmt.Sprintf("%d", i))
 }

 w.Stop()
}

个人见解

  • 看到这里对于w2我已经有点迷糊了,还能传递w.Wg, w.ChPool, w.QuitChan?
原来是golang里如果方法传递的不是地址,那么就会做一个拷贝,所以这里调用的wg根本就不是一个对象。

传递的地方传递地址就可以了,如果不传递地址,将会出现死锁

go doSomething(i, &wg, ch)

func doSomething(index int, wg *sync.WaitGroup, ch chan int) {
  • w1也有一个比较大的问题。在处理请求时,每个 Goroutine 都会占用一定的系统资源,如果请求量过大,会造成 Goroutine 数量的剧增消耗过多系统资源,程序可能会崩溃

探究原文

在这段代码中,poolCh代表工作者池,sw.JobChan代表工作者的工作通道。当一个工作者完成了工作后,它会将工作结果发送到sw.JobChan,此时可以通过case res := <-sw.JobChan:来接收该工作的结果。

在这个代码块中,还需要处理一个退出信号quitCh。因此,第二个case <-quitCh:用于检测是否接收到了退出信号。如果接收到了退出信号,程序将打印出消息并结束。

需要注意的是,这两个case语句是互斥的,只有当工作者完成工作或收到退出信号时,才会进入其中一个语句。因此,这个循环可以保证在工作者完成工作或收到退出信号时退出。

需要读取两次sw.JobChan的原因是:第一次读取用于将工作者的工作通道放回工作者池中,这样其他工作者就可以使用该通道。第二次读取用于接收工作者的工作结果或退出信号。因此,这两次读取是为了确保能够在正确的时刻将工作者的工作通道放回工作者池中并正确地处理工作结果或退出信号。

根据w2的特点 我自己写了一个w2

import (
   "fmt"
   "sync"
)

type SubWorkerNew struct {
   JobChan chan string
}

type W2New struct {
   SubWorkers []SubWorkerNew
   Wg         *sync.WaitGroup
   MaxNum     int
   ChPool     chan chan string
   QuitChan   chan struct{}
}

func NewW2(maxNum int) *W2New {
   subWorkers := make([]SubWorkerNew, maxNum)
   for i := 0; i < maxNum; i++ {
      subWorkers[i] = SubWorkerNew{JobChan: make(chan string)}
   }

   pool := make(chan chan string, maxNum)
   for i := 0; i < maxNum; i++ {
      pool <- subWorkers[i].JobChan
   }

   return &W2New{
      SubWorkers: subWorkers,
      Wg:         &sync.WaitGroup{},
      MaxNum:     maxNum,
      ChPool:     pool,
      QuitChan:   make(chan struct{}),
   }
}

func (w *W2New) Dispatch(job string) {
   select {
   case jobChannel := <-w.ChPool:
      jobChannel <- job
   default:
      fmt.Println("All workers busy")
   }
}

func (w *W2New) StartPool() {
   for i := 0; i < w.MaxNum; i++ {
      go func(subWorker *SubWorkerNew) {
         w.Wg.Add(1)
         defer w.Wg.Done()

         for {
            select {
            case job := <-subWorker.JobChan:
               fmt.Println("processing ", job)
            case <-w.QuitChan:
               return
            }
         }
      }(&w.SubWorkers[i])
   }
}

func (w *W2New) Stop() {
   close(w.QuitChan)
   w.Wg.Wait()
   close(w.ChPool)

   for _, subWorker := range w.SubWorkers {
      close(subWorker.JobChan)
   }
}

func main() {
   w := NewW2(5)
   w.StartPool()

   for i := 0; i < 20; i++ {
      w.Dispatch(fmt.Sprintf("job %d", i))
   }

   w.Stop()
}

但是有几个点需要注意

1.没有考虑JobChan通道的缓冲区大小,如果有大量任务被并发分配,容易导致内存占用过高;

2.每个线程都会执行无限循环,此时线程退出的条件是接收到QuitChan通道的信号,可能导致线程的阻塞等问题;

3.Dispatch函数的默认情况下只会输出"All workers busy",而不是阻塞,这意味着当所有线程都处于忙碌状态时,任务会丢失

4.线程池启动后无法动态扩展或缩小

优化

这个优化版本改了很多次。有一些需要注意的点是,不然会一直死锁


1.使用sync.WaitGroup来确保线程池中所有线程都能够启动并运行;

2.在Stop函数中,先向SubWorker的JobChan中发送一个关闭信号,再等待所有SubWorker线程退出;

3.在Dispatch函数中,将默认情况下的输出改为阻塞等待可用通道;

w2new

package handle_million_requests

import (
 "fmt"
 "sync"
 "time"
)

type SubWorkerNew struct {
 Id      int
 JobChan chan string
}

type W2New struct {
 SubWorkers []SubWorkerNew
 MaxNum     int
 ChPool     chan chan string
 QuitChan   chan struct{}
 Wg         *sync.WaitGroup
}

func NewW2(maxNum int) *W2New {
 chPool := make(chan chan string, maxNum)
 subWorkers := make([]SubWorkerNew, maxNum)
 for i := 0; i < maxNum; i++ {
  subWorkers[i] = SubWorkerNew{Id: i, JobChan: make(chan string)}
  chPool <- subWorkers[i].JobChan
 }
 wg := new(sync.WaitGroup)
 wg.Add(maxNum)

 return &W2New{
  MaxNum:     maxNum,
  SubWorkers: subWorkers,
  ChPool:     chPool,
  QuitChan:   make(chan struct{}),
  Wg:         wg,
 }
}

func (w *W2New) StartPool() {
 for i := 0; i < w.MaxNum; i++ {
  go func(wg *sync.WaitGroup, subWorker *SubWorkerNew) {
   defer wg.Done()
   for {
    select {
    case job := <-subWorker.JobChan:
     fmt.Printf("SubWorker %d processing job %s\n", subWorker.Id, job)
     time.Sleep(time.Second) // 模拟任务处理过程
    case <-w.QuitChan:
     return
    }
   }
  }(w.Wg, &w.SubWorkers[i])
 }
}

func (w *W2New) Stop() {
 close(w.QuitChan)
 for i := 0; i < w.MaxNum; i++ {
  close(w.SubWorkers[i].JobChan)
 }
 w.Wg.Wait()
}

func (w *W2New) Dispatch(job string) {
 select {
 case jobChan := <-w.ChPool:
  jobChan <- job
 default:
  fmt.Println("All workers busy")
 }
}
func (w *W2New) AddWorker() {
 newWorker := SubWorkerNew{Id: w.MaxNum, JobChan: make(chan string)}
 w.SubWorkers = append(w.SubWorkers, newWorker)
 w.ChPool <- newWorker.JobChan
 w.MaxNum++
 w.Wg.Add(1)
 go func(subWorker *SubWorkerNew) {
  defer w.Wg.Done()

  for {
   select {
   case job := <-subWorker.JobChan:
    fmt.Printf("SubWorker %d processing job %s\n", subWorker.Id, job)
    time.Sleep(time.Second) // 模拟任务处理过程
   case <-w.QuitChan:
    return
   }
  }
 }(&newWorker)
}

func (w *W2New) RemoveWorker() {
 if w.MaxNum > 1 {
  worker := w.SubWorkers[w.MaxNum-1]
  close(worker.JobChan)
  w.MaxNum--
  w.SubWorkers = w.SubWorkers[:w.MaxNum]
 }
}

AddWorkerRemoveWorker,用于动态扩展/缩小线程池。

  • AddWorker函数中,我们首先将MaxNum增加了1,然后创建一个新的SubWorkerNew结构体,将其添加到SubWorkers中,并将其JobChan通道添加到ChPool通道中。最后,我们创建一个新的协程来处理新添加的SubWorkerNew并让它进入无限循环,等待接收任务。
  • RemoveWorker函数中,我们首先将MaxNum减少1,然后获取最后一个SubWorkerNew结构体,将它的JobChan通道发送到ChPool通道中,并从其通道中读取任何待处理的任务,最后创建一个新的协程来处理SubWorkerNew,继续处理任务。

测试用例

func TestW2New(t *testing.T) {  
    pool := NewW2(3)  
    pool.StartPool()  

    pool.Dispatch("task 1")  
    pool.Dispatch("task 2")  
    pool.Dispatch("task 3")  

    pool.AddWorker()  
    pool.AddWorker()  
    pool.RemoveWorker()  

    pool.Stop()  
}

当Dispatch函数向ChPool通道获取可用通道时,会从通道中取出一个SubWorker的JobChan通道,并将任务发送到该通道中。而对于SubWorker来说,并没有进行任务的使用次数限制,所以它可以处理多个任务。

在这个例子中,当任务数量比SubWorker数量多时,一个SubWorker的JobChan通道会接收到多个任务,它们会在SubWorker的循环中按顺序依次处理,直到JobChan中没有未处理的任务为止。因此,如果任务数量特别大,可能会导致某些SubWorker的JobChan通道暂时处于未处理任务状态,而其他的SubWorker在执行任务。

在测试结果中,最后三行中出现了多个"SubWorker 0 processing job",说明SubWorker 0的JobChan通道接收了多个任务,并且在其循环中处理这些任务。下面的代码片段显示了这个过程:

// SubWorker 0 的循环部分

for {

    select {

    case job := <-subWorker.JobChan:

        fmt.Printf("SubWorker %d processing job %s\n", subWorker.Id, job)

    case <-w.QuitChan:

        return

    }

}
推荐阅读(点击标题可打开)

1、Linux 网络技术栈,看这篇就够了

2、[Golang] 泛型的使用

3、吵翻了!2023 年最大技术分歧:选 Rust 还是 Go ?

微信扫码关注该文公众号作者

戳这里提交新闻线索和高质量文章给我们。
相关阅读
刷屏!路演时补妆、挤痘?张小龙专门坐电梯去怒骂女基金经理10分钟20万张图片训练出医用AI大模型,斯坦福团队整理16年来社交网络数据并建库,使用图像或文本即可检索类似病例无中介费|9.1入住|近红线地铁站步行5分钟/MIT步行15分钟/哈佛步行13分钟高级公寓两室一厅两卫4000纽约10月去哪逛 | 吐血整理16大美食娱乐活动,畅玩金秋!知名美女基金经理10年亏了234亿!却收了33亿的管理费!最新开业的最大的大灰狼室内水上乐园出发啦!9/5超级特价来袭!纽约豪掷百万请难民住酒店,还被嫌“饭不好吃”?真成冤大头了!1.1入住|近红线地铁站步行5分钟/MIT步行15分钟/哈佛步行13分钟高级公寓2B2B4000花100万请的顾问,结果被坑惨了...找男生室友|无中介费|10.1入住|近红线地铁站步行5分钟/MIT步行15分钟/哈佛步行13分钟高级公寓两室一厅两卫3700花200万请顾问的老板们,现在怎么样了?​找室友合租|无中介费|9.1入住|近红线地铁站步行5分钟/MIT步行15分钟/哈佛步行13分钟高级公寓两室一厅两卫3800美麗阿拉斯加(二)安克雷奇 (Anchorage)疯狂星期④,抢980元超值美食卡,尊享超级特价!无法无天!尔湾珠宝店光天化日之下遭爆枪,百万珠宝1分钟之内被抢光1分钟卖了3000双的“仙女单鞋”!腿长+4cm,高级又日常、超美国家地理100+部英文原声纪录片,孩子的见识比知识更重要500万请顾问,美国有钱人是疯了么?9.1入住|接本科|全套家具|室内洗烘拎包入住|近Chinatown地铁站步行1分钟高级公寓1B1B 3K5, 2B2B 5K10万上补习、100万请中介、1000万爬藤...这届父母为孩子能拼到什么程度?老海归回国的原因半中介费|八九月入住|近红线地铁站步行5分钟/MIT步行15分钟/哈佛步行13分钟高级公寓两室一厅两卫4250+无中介费|随时入住|近红线地铁站步行5分钟/MIT步行15分钟/哈佛步行13分钟高级公寓两室一厅1.5卫32502023回国 一真一假的两家店最新开业的最大的大灰狼室内水上乐园出发啦!超级特价来袭!无中介费|随时入住|近红线地铁站步行5分钟/MIT步行15分钟/哈佛步行13分钟高级公寓两室一厅两卫4250北上广深老母亲自述:花费百万请留学顾问,仿佛供了个祖宗...被勒索200万后,河南百万粉丝网红被杀:人性之恶,超乎想象《祖国圆舞曲》&《你在终点等我》推理1760亿参数的BLOOMZ,性能时延仅3.7秒 | 最“in”大模型1.1入住|接本科生 |近伯克利步行1分钟/NEU步行11分钟/BU步行15分钟1B1B 3050,包供暖和热水1.1入住|近伯克利步行1分钟/NEU步行11分钟/BU步行15分钟一室一厅3600,包供暖和热水期待吗?最新开业的大灰狼室内水上乐园出发啦!超级特价来袭!中年爱情2 倒霉的男人降价|无中介费|9.1入住|近红线地铁站步行5分钟/MIT步行15分钟/哈佛步行13分钟高级公寓两室一厅1.5卫3900
logo
联系我们隐私协议©2024 redian.news
Redian新闻
Redian.news刊载任何文章,不代表同意其说法或描述,仅为提供更多信息,也不构成任何建议。文章信息的合法性及真实性由其作者负责,与Redian.news及其运营公司无关。欢迎投稿,如发现稿件侵权,或作者不愿在本网发表文章,请版权拥有者通知本网处理。