Redian新闻
>
golang批量执行任务的通用模板

golang批量执行任务的通用模板

公众号新闻

需求

一个接口调用时,接收到一个列表,十个元素,需要并发执行十个任务,每个任务都要返回执行的结果和异常,然后对返回的结果装填到一个切片列表里,统一返回结果。

需要协程处理的结构体

type Order struct {    Name string `json:"name"`    Id int `json:"id"`  }

确定通道数量

一般按入参的需要处理的元素数量为准

taskNum := 10 

初始化通道

orderCh := make(chan Order, taskNum) //接收返回的结果errCh := make(chan error, taskNum) //接收返回的异常

发起执行,我们使用sync.WaitGroup来监听执行情况

wg := sync.WaitGroup{}for i:=0; i < taskNum; i++ {   wg.Add(1)   go func() {     defer wg.Done()     if i == 3 {//模拟当i=3的时候,返回一个异常         err := errors.New("there is an error")         errCh <- err          return     }     //组装返回结果     res := Order{           Name: "num: " + strconv.Itoa(i),           Id: i,           }     orderCh <- res      }()}wg.Wait() //等待所有任务执行完毕

使用for-select接收执行结果

orderList := make([]Order, taskNum)for i:=0; i<taskNum; i++ {    select {        case order, ok := <-orderCh: //接收orderCh        if ok {            orderList = append(orderList, order)        }        case err := <-errCh: //接收errCh        if err != nil {            return err //看需求,这里设计发现一个错误就直接停止执行,返回错误        }        default:        fmt.Println("done")    }}//处理完数据,关闭通道close(orderCh)close(errCh)

1,超时问题

任务执行过程中,需要控制每个任务的执行时间,不能超过一定范围,我们用定时器来解决这个问题

timeoutTime := time.Second * 3  //超时时间taskTimer := time.NewTimer(timeoutTime) //初始化定时器orderList := make([]Order, taskNum)for i:=0; i<taskNum; i++ {    select {        ....        case <-taskTimeout.C: //处理超时            err := errors.New("task timeout") //此处我们认为超时是错误的一种,赋值给了err            return        ...    }    //每次执行都需要重置定时器    taskTimer.Reset(timeoutTime)}


2, 协程panic问题

主程序是无法捕捉协程内的panic,因此如果不手动处理,就会发生协程内panic导致整个程序中止的情况,我们在defer里处理

for i:=0; i < taskNum; i++ {   wg.Add(1)   go func() {     defer func () {      wg.Done()      //协程内单独捕捉异常        if r := recover(); r != nil {          err := errors.New(fmt.Sprintf("System panic:%v", r))          errCh <- err //此处将panic信息转为err返回,也可以按需求和异常等级进行处理        return      }     }()   ........  }()}

3, 顺序问题

返回的列表元素的顺序,需要跟传参的列表顺序保持一致,这时我们需要定义个带序号的结构体

// 需要记录原始顺序的时候,定义个带编号的结构体  type OrderWithSeq struct {      Seq int      OrderItem Order  }  //重写相关排序类型type BySeq []OrderWithSeq    func (a BySeq) Len() int {      return len(a)  }  func (a BySeq) Swap(i, j int) {      a[i], a[j] = a[j], a[i]  }  func (a BySeq) Less(i, j int) bool {      return a[i].Seq < a[j].Seq  }// 调整返回结果orderCh := make(chan OrderWithSeq, taskNum) //接收带序号的结构体//在执行任务时,加入序号for i:=0; i < taskNum; i++ {   i:= i   wg.Add(1)   go func() {     ····     //组装返回结果     res := Order{           Name: "num: " + strconv.Itoa(i),           Id: i,           }     orderCh <-OrderWithSeq {         Seq: i, //带上i这个序号         OrderItem: res,     }  }() //接收信息,也按带序号的结构体进行组装 orderSeqList := make([]OrderWithSeq, taskNum) for i:=0; i<taskNum; i++ {    select {        case order, ok := <-orderCh: //接收orderCh        if ok {            orderList = append(orderSeqList, order)        }       .....     }   } //按原始顺序进行排序sort.Sort(BySeq(orderSeqList))....重新组装数据返回

总结

标准模板如下:

type Order struct {    Name string `json:"name"`    Id int `json:"id"`  }
// 需要记录原始顺序的时候,定义个带编号的结构体 type OrderWithSeq struct { Seq int OrderItem Order } //重写相关排序类型type BySeq []OrderWithSeq func (a BySeq) Len() int { return len(a) } func (a BySeq) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func (a BySeq) Less(i, j int) bool { return a[i].Seq < a[j].Seq }
taskNum := 10 orderCh := make(chan OrderWithSeq, taskNum) //接收带序号的结构体errCh := make(chan error, taskNum) //接收返回的异常wg := sync.WaitGroup{}//在执行任务时,加入序号for i:=0; i < taskNum; i++ { i:= i wg.Add(1) go func() { defer func () { wg.Done() //协程内单独捕捉异常 if r := recover(); r != nil { err := errors.New(fmt.Sprintf("System panic:%v", r)) errCh <- err //此处将panic信息转为err返回,也可以按需求和异常等级进行处理 return } }() //组装返回结果 res := Order{ Name: "num: " + strconv.Itoa(i), Id: i, } orderCh <-OrderWithSeq { Seq: i, //带上i这个序号 OrderItem: res, } }() wg.Wait() //接收信息,也按带序号的结构体进行组装 orderSeqList := make([]OrderWithSeq, taskNum) timeoutTime := time.Second * 3 taskTimer := time.NewTimer(timeoutTime) for i:=0; i<taskNum; i++ { select { case order, ok := <-orderCh: //接收orderCh if ok { orderList = append(orderSeqList, order) } case err := <-errCh: //接收errCh if err != nil { return err } case <-taskTimer.C: //处理超时 err := errors.New("task timeout") return default: fmt.Println("done") } taskTimer.Reset(timeoutTime) }close(orderCh)close(errCh) //按原始顺序进行排序sort.Sort(BySeq(orderSeqList))


链接:https://juejin.cn/post/7301150860824854582

(版权归原作者所有,侵删)



微信扫码关注该文公众号作者

戳这里提交新闻线索和高质量文章给我们。
相关阅读
​CIKM 2023 | 为序列推荐引入自适应以及个性化全局协同信息的通用框架Science | 重大进展!利用经过基因改造的益生菌引导CAR-T细胞有望成为一种安全有效实体瘤的通用平台通用、a16z下场,“特斯拉”正在批量下水美国总统拜登父子姓名考「灌篮高手」模拟人形机器人,一比一照搬人类篮球招式,看一遍就能学会,无需特定任务的奖励阳光保险张晗:在未来的保险行业,百亿级专用模型将成为主流趋势ICCV 2023 | CLIP驱动的器官分割和肿瘤检测通用模型松下发布半坚固笔记本电脑 Toughbook 55 Mk3,采用模块化设计国庆黄金周的"黄金",英文是 gold 还是 golden?宇宙人(1352期)今日谷神星一号(遥十一)运载火箭飞行试验任务的情况说明;香港首颗高分遥感AI卫星下线将11月底发射;押解回国董宇辉最新演讲:我们想不出一套通用的家庭教育模板,但爱会引导你做一切正确的事情 | 精选2024年度载人航天飞行任务标识,正式发布!Heilongjiang Gymnasium Collapse Kills 3 Middle School Students2024年度载人航天飞行任务标识正式发布“多巴胺”的“胺”应读为àn,“2023年十大语文差错”发布golang string和[]byte的对比宇宙人(1389期)2024年度载人航天飞行任务标识正式发布;FAA将对SpaceX二次星舰试飞自爆事件展开调查;奥特曼加入微软彡字源考清华叉院提出「GenH2R」框架,用百万场景打造基于视觉的通用人机交接策略视觉模型+大语言模型:首个支持10K+帧长视频理解任务的新型框架【点击领取】中秋节PPT模板、中国风PPT模板,免费送~信息量很大!一文回顾神舟十七号载人飞行任务新闻发布会(附神十七航天员简历)艳遇好用的不通用,通用的不好用,金融落地大模型需要“专业型”选手“多巴胺”的“胺”不读ān?华美银行任命新的首席财务官和首席风险官雅诗兰黛集团上季度净销售额同比跌10%,中国内地香氛业务的增长部分弥补了护肤业务的下跌冰岛雷克雅未克(Reykjavík),地标景点YOLO再升级!华为诺亚提出Gold-YOLO,聚集-分发机制打造新SOTA通用汽车表示对其电动汽车业务的缓慢步伐感到不满首个精通3D任务的具身通才智能体:感知、推理、规划、行动统统拿下【注意】关于中国加入《取消外国公文书认证要求的公约》后使馆停办领事认证业务的通知NeurIPS 2023 | 超越YOLO系列!华为提出Gold-YOLO:实时目标检测新SOTA社会分工---邪恶与错误的放大器董宇辉最新演讲:我们想不出一套通用的家庭教育模板,但爱会引导你做一切正确的事情
logo
联系我们隐私协议©2024 redian.news
Redian新闻
Redian.news刊载任何文章,不代表同意其说法或描述,仅为提供更多信息,也不构成任何建议。文章信息的合法性及真实性由其作者负责,与Redian.news及其运营公司无关。欢迎投稿,如发现稿件侵权,或作者不愿在本网发表文章,请版权拥有者通知本网处理。