golang批量执行任务的通用模板
需求
一个接口调用时,接收到一个列表,十个元素,需要并发执行十个任务,每个任务都要返回执行的结果和异常,然后对返回的结果装填到一个切片列表里,统一返回结果。
需要协程处理的结构体
type Order struct {
Name string `json:"name"`
Id int `json:"id"`
}
确定通道数量
一般按入参的需要处理的元素数量为准
taskNum := 10
初始化通道
orderCh := make(chan Order, taskNum) //接收返回的结果
errCh := make(chan error, taskNum) //接收返回的异常
发起执行,我们使用sync.WaitGroup来监听执行情况
wg := sync.WaitGroup{}
for i:=0; i < taskNum; i++ {
wg.Add(1)
go func() {
defer wg.Done()
if i == 3 {//模拟当i=3的时候,返回一个异常
err := errors.New("there is an error")
errCh <- err
return
}
//组装返回结果
res := Order{
Name: "num: " + strconv.Itoa(i),
Id: i,
}
orderCh <- res
}()
}
wg.Wait() //等待所有任务执行完毕
使用for-select接收执行结果
orderList := make([]Order, taskNum)
for i:=0; i<taskNum; i++ {
select {
case order, ok := <-orderCh: //接收orderCh
if ok {
orderList = append(orderList, order)
}
case err := <-errCh: //接收errCh
if err != nil {
return err //看需求,这里设计发现一个错误就直接停止执行,返回错误
}
default:
fmt.Println("done")
}
}
//处理完数据,关闭通道
close(orderCh)
close(errCh)
1,超时问题
任务执行过程中,需要控制每个任务的执行时间,不能超过一定范围,我们用定时器来解决这个问题
timeoutTime := time.Second * 3 //超时时间
taskTimer := time.NewTimer(timeoutTime) //初始化定时器
orderList := make([]Order, taskNum)
for i:=0; i<taskNum; i++ {
select {
....
case <-taskTimeout.C: //处理超时
err := errors.New("task timeout") //此处我们认为超时是错误的一种,赋值给了err
return
...
}
//每次执行都需要重置定时器
taskTimer.Reset(timeoutTime)
}
2, 协程panic问题
主程序是无法捕捉协程内的panic,因此如果不手动处理,就会发生协程内panic导致整个程序中止的情况,我们在defer里处理
for i:=0; i < taskNum; i++ {
wg.Add(1)
go func() {
defer func () {
wg.Done()
//协程内单独捕捉异常
if r := recover(); r != nil {
err := errors.New(fmt.Sprintf("System panic:%v", r))
errCh <- err //此处将panic信息转为err返回,也可以按需求和异常等级进行处理
return
}
}()
........
}()
}
3, 顺序问题
返回的列表元素的顺序,需要跟传参的列表顺序保持一致,这时我们需要定义个带序号的结构体
// 需要记录原始顺序的时候,定义个带编号的结构体
type OrderWithSeq struct {
Seq int
OrderItem Order
}
//重写相关排序类型
type BySeq []OrderWithSeq
func (a BySeq) Len() int {
return len(a)
}
func (a BySeq) Swap(i, j int) {
a[i], a[j] = a[j], a[i]
}
func (a BySeq) Less(i, j int) bool {
return a[i].Seq < a[j].Seq
}
// 调整返回结果
orderCh := make(chan OrderWithSeq, taskNum) //接收带序号的结构体
//在执行任务时,加入序号
for i:=0; i < taskNum; i++ {
i:= i
wg.Add(1)
go func() {
····
//组装返回结果
res := Order{
Name: "num: " + strconv.Itoa(i),
Id: i,
}
orderCh <-OrderWithSeq {
Seq: i, //带上i这个序号
OrderItem: res,
}
}()
//接收信息,也按带序号的结构体进行组装
orderSeqList := make([]OrderWithSeq, taskNum)
for i:=0; i<taskNum; i++ {
select {
case order, ok := <-orderCh: //接收orderCh
if ok {
orderList = append(orderSeqList, order)
}
.....
}
}
//按原始顺序进行排序
sort.Sort(BySeq(orderSeqList))
....重新组装数据返回
总结
标准模板如下:
type Order struct {
Name string `json:"name"`
Id int `json:"id"`
}
// 需要记录原始顺序的时候,定义个带编号的结构体
type OrderWithSeq struct {
Seq int
OrderItem Order
}
//重写相关排序类型
type BySeq []OrderWithSeq
func (a BySeq) Len() int {
return len(a)
}
func (a BySeq) Swap(i, j int) {
a[i], a[j] = a[j], a[i]
}
func (a BySeq) Less(i, j int) bool {
return a[i].Seq < a[j].Seq
}
taskNum := 10
orderCh := make(chan OrderWithSeq, taskNum) //接收带序号的结构体
errCh := make(chan error, taskNum) //接收返回的异常
wg := sync.WaitGroup{}
//在执行任务时,加入序号
for i:=0; i < taskNum; i++ {
i:= i
wg.Add(1)
go func() {
defer func () {
wg.Done()
//协程内单独捕捉异常
if r := recover(); r != nil {
err := errors.New(fmt.Sprintf("System panic:%v", r))
errCh <- err //此处将panic信息转为err返回,也可以按需求和异常等级进行处理
return
}
}()
//组装返回结果
res := Order{
Name: "num: " + strconv.Itoa(i),
Id: i,
}
orderCh <-OrderWithSeq {
Seq: i, //带上i这个序号
OrderItem: res,
}
}()
wg.Wait()
//接收信息,也按带序号的结构体进行组装
orderSeqList := make([]OrderWithSeq, taskNum)
timeoutTime := time.Second * 3
taskTimer := time.NewTimer(timeoutTime)
for i:=0; i<taskNum; i++ {
select {
case order, ok := <-orderCh: //接收orderCh
if ok {
orderList = append(orderSeqList, order)
}
case err := <-errCh: //接收errCh
if err != nil {
return err
}
case <-taskTimer.C: //处理超时
err := errors.New("task timeout")
return
default:
fmt.Println("done")
}
taskTimer.Reset(timeoutTime)
}
close(orderCh)
close(errCh)
//按原始顺序进行排序
sort.Sort(BySeq(orderSeqList))
链接:https://juejin.cn/post/7301150860824854582
(版权归原作者所有,侵删)
微信扫码关注该文公众号作者
戳这里提交新闻线索和高质量文章给我们。
来源: qq
点击查看作者最近其他文章