From 89946ffd6267fedb7d7a9398b384d8a2166782cf Mon Sep 17 00:00:00 2001 From: iyear Date: Sat, 19 Jun 2021 13:02:49 +0800 Subject: [PATCH] Use goroutine to speed up the sign speed --- bots/sender.go | 86 ++++++++++++++++++++++++++++ bots/task.go | 148 +++++++++++++++++++++++++++++++++++++++++++++++++ task/sign.go | 59 ++++++++++++++++++++ 3 files changed, 293 insertions(+) create mode 100644 bots/sender.go create mode 100644 bots/task.go create mode 100644 task/sign.go diff --git a/bots/sender.go b/bots/sender.go new file mode 100644 index 0000000..8dcc8ed --- /dev/null +++ b/bots/sender.go @@ -0,0 +1,86 @@ +package bots + +import ( + "fmt" + "go.uber.org/zap" + "gopkg.in/tucnak/telebot.v2" + "strconv" + "sync" +) + +type Sender struct { + done chan struct{} + in chan *Msg + wg sync.WaitGroup +} +type Msg struct { + To telebot.Recipient + What interface{} + Options []interface{} +} + +//牺牲错误处理和解耦,加快发送速度 + +func NewSender() *Sender { + return &Sender{} +} +func (s *Sender) Init(goroutine int) { + + s.done = make(chan struct{}) + s.in = make(chan *Msg) + + for i := 0; i < goroutine; i++ { + go func() { + s.sender() + }() + } +} + +//Stop until all messages were sent +func (s *Sender) Stop() { + s.wg.Wait() + close(s.done) + fmt.Println("task finished") +} +func (s *Sender) SendMessageByID(ID int64, what interface{}, options ...interface{}) { + s.wg.Add(1) + go func() { + chat, err := bot.ChatByID(strconv.FormatInt(ID, 10)) + if err != nil { + zap.S().Errorw("failed to get chat", + "error", err, + "id", ID, + ) + s.wg.Done() + return + } + s.SendMessage(chat, what, options...) + }() +} +func (s *Sender) SendMessage(to telebot.Recipient, what interface{}, options ...interface{}) { + s.in <- &Msg{ + To: to, + What: what, + Options: options, + } +} +func (s *Sender) sender() { + for { + select { + case msg, f := <-s.in: + if !f { + continue + } + fmt.Println("send to " + msg.To.Recipient()) + if _, err := bot.Send(msg.To, msg.What, msg.Options...); err != nil { + zap.S().Errorw("failed to send msg", + "error", err, + "id", msg.To.Recipient(), + ) + } + s.wg.Done() + case <-s.done: + return + } + } +} diff --git a/bots/task.go b/bots/task.go new file mode 100644 index 0000000..fb655bc --- /dev/null +++ b/bots/task.go @@ -0,0 +1,148 @@ +package bots + +import ( + "fmt" + "github.com/iyear/E5SubBot/config" + "github.com/iyear/E5SubBot/model" + "github.com/iyear/E5SubBot/task" + "github.com/robfig/cron/v3" + "go.uber.org/zap" + tb "gopkg.in/tucnak/telebot.v2" + "strconv" + "time" +) + +var errorTimes map[int]int +var signErr map[int64]int +var unbindUsers []int64 +var msgSender *Sender + +func InitTask() { + errorTimes = make(map[int]int) + msgSender = NewSender() + + c := cron.New() + c.AddFunc(config.Cron, SignTask) + c.Start() +} +func SignTask() { + msgSender.Init(config.MaxGoroutines) + + signErr = make(map[int64]int) + unbindUsers = nil + + var clients []*model.Client + if result := model.DB.Find(&clients); result.Error != nil { + zap.S().Errorw("failed to get all clients", + "error", result.Error) + return + } + + fmt.Printf("clients: %d goroutines:%d\n", + len(clients), + config.MaxGoroutines, + ) + + start := time.Now() + + errClients := task.Sign(clients) + + for _, errClient := range errClients { + if errClient.Err != nil { + opErrorSign(errClient) + continue + } + //请求一次成功清零errorTimes,避免接口的偶然错误积累导致账号被清退 + errorTimes[errClient.ID] = 0 + model.DB.Save(&errClient.Client) + } + + //fmt.Println(signErr) + //fmt.Println(errorTimes) + timeSpending := time.Since(start).Seconds() + summarySignTaskForUsers(errClients) + summarySignTaskForAdmins(errClients, timeSpending) + + msgSender.Stop() +} + +func summarySignTaskForAdmins(errClients []*model.ErrClient, timeSpending float64) { + var Count = len(errClients) + var ErrCount int + var ErrUserStr string + var UnbindUserStr string + for err, count := range signErr { + ErrCount += count + ErrUserStr += fmt.Sprintf("[%d](tg://user?id=%d)\n", err, err) + } + for _, unbindUser := range unbindUsers { + UnbindUserStr += fmt.Sprintf("[%d](tg://user?id=%d)\n", unbindUser, unbindUser) + } + for _, admin := range config.Admins { + a := admin + msgSender.SendMessageByID(a, fmt.Sprintf("任务反馈(管理员)\n完成时间: %s\n用时: %.2fs\n结果: %d/%d\n错误账户: \n%s\n清退账户: \n%s", + time.Now().Format("2006-01-02 15:04:05"), + timeSpending, + Count-ErrCount, Count, + ErrUserStr, UnbindUserStr, + ), + tb.ModeMarkdown, + ) + } +} +func summarySignTaskForUsers(errClients []*model.ErrClient) { + + var isSent map[int64]bool + isSent = make(map[int64]bool) + + for _, errClient := range errClients { + errClient := errClient + //pending SignErrNum + if errorTimes[errClient.ID] > config.MaxErrTimes { + if result := model.DB.Delete(&errClient.Client); result.Error != nil { + zap.S().Errorw("failed to delete data", + "error", result.Error, + "id", errClient.ID, + ) + continue + } + + unbindUsers = append(unbindUsers, errClient.TgId) + + msgSender.SendMessageByID(errClient.TgId, fmt.Sprintf("您的账户因达到错误上限而被自动解绑\n后会有期!\n\n别名: %s\nclient_id: %s\nclient_secret: %s", + errClient.Alias, + errClient.ClientId, + errClient.ClientSecret, + )) + continue + + } + if isSent[errClient.TgId] { + continue + } + signOK := GetBindNum(errClient.TgId) - signErr[errClient.TgId] + + msgSender.SendMessageByID(errClient.TgId, + fmt.Sprintf("任务反馈\n时间: %s\n结果:%d/%d", + time.Now().Format("2006-01-02 15:04:05"), + signOK, + signErr[errClient.TgId]+signOK, + ), + ) + isSent[errClient.TgId] = true + time.Sleep(time.Millisecond * 100) + } +} +func opErrorSign(errClient *model.ErrClient) { + errorTimes[errClient.ID]++ + signErr[errClient.TgId]++ + + UnBindBtn := tb.InlineButton{Unique: "un" + errClient.MsId, Text: "点击解绑", Data: strconv.Itoa(errClient.ID)} + bot.Handle(&UnBindBtn, bUnBindInlineBtn) + + msgSender.SendMessageByID(errClient.TgId, + fmt.Sprintf("您的帐户 %s 在执行时出现了错误\n您可以选择解绑该用户\n错误: %s", + errClient.Alias, errClient.Err), + &tb.ReplyMarkup{InlineKeyboard: [][]tb.InlineButton{{UnBindBtn}}}, + ) +} diff --git a/task/sign.go b/task/sign.go new file mode 100644 index 0000000..535db56 --- /dev/null +++ b/task/sign.go @@ -0,0 +1,59 @@ +package task + +import ( + "fmt" + "github.com/iyear/E5SubBot/config" + "github.com/iyear/E5SubBot/model" + "go.uber.org/zap" +) + +func Sign(clients []*model.Client) []*model.ErrClient { + var errClients []*model.ErrClient + + done := make(chan struct{}) + in := make(chan *model.ErrClient, 5) + out := make(chan *model.ErrClient, 5) + + go func() { + for _, client := range clients { + in <- &model.ErrClient{ + Client: client, + Err: nil, + } + } + close(in) + }() + for i := 0; i < config.MaxGoroutines; i++ { + go func() { + for { + select { + case errCli, f := <-in: + if !f { + continue + } + + err := errCli.GetOutlookMails() + errCli.Err = err + out <- errCli + case <-done: + return + } + } + }() + } + for i := 0; i < len(clients); i++ { + errClient := <-out + if errClient.Err == nil { + fmt.Printf("%s OK\n", errClient.MsId) + } else { + zap.S().Errorw("failed to sign", + "error", errClient.Err, + "id", errClient.ID, + ) + //fmt.Printf("%s %s\n",errClient.MsId,errClient.Err) + } + errClients = append(errClients, errClient) + } + close(done) + return errClients +}