From c4f385486153d6acf96c72a056deb40afb4706ef Mon Sep 17 00:00:00 2001 From: Julien Riou Date: Sun, 10 Oct 2021 20:56:11 +0200 Subject: [PATCH] feat: Add offline/online workers notifications (#1) Signed-off-by: Julien Riou --- README.md | 1 + client.go | 68 ++++++++++++++++++++++++++++++++++++-- config.go | 7 ++-- db.go | 18 ++++++++++ flexassistant.yaml.example | 2 ++ main.go | 47 ++++++++++++++++++++++++++ miner.go | 25 ++++++++++++++ notification.go | 13 ++++++++ 8 files changed, 175 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 327421f..fffd450 100644 --- a/README.md +++ b/README.md @@ -73,6 +73,7 @@ Reference: * `address`: address of the miner or the farmer registered on the API * `enable-balance` (optional): enable balance notifications (disabled by default) * `enable-payments` (optional): enable payments notifications (disabled by default) + * `enable-offline-workers` (optional): enable offline/online notifications for associated workers (disabled by default) * `telegram`: Telegram configuration * `token`: token of the Telegram bot * `chat-id` (optional if `channel-name` is present): chat identifier to send Telegram notifications diff --git a/client.go b/client.go index 674b8f4..155515f 100644 --- a/client.go +++ b/client.go @@ -55,10 +55,40 @@ func (f *FlexpoolClient) request(url string) (result map[string]interface{}, err json.Unmarshal(jsonBody, &result) - if result["error"] == nil { - return result["result"].(map[string]interface{}), nil + if result["error"] != nil { + return nil, fmt.Errorf("Flexpool API error: %s", result["error"].(string)) } - return nil, fmt.Errorf("Flexpool API error: %s", result["error"].(string)) + return result["result"].(map[string]interface{}), nil +} + +// requestBytes to create an HTTPS request, call the Flexpool API, detect errors and return the result in bytes +func (f *FlexpoolClient) requestBytes(url string) ([]byte, error) { + log.Debugf("Requesting %s", url) + + request, err := http.NewRequest("GET", url, nil) + if err != nil { + return nil, err + } + request.Header.Set("User-Agent", UserAgent) + + resp, err := f.client.Do(request) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + jsonBody, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + var result map[string]interface{} + json.Unmarshal(jsonBody, &result) + + if result["error"] != nil { + return nil, fmt.Errorf("Flexpool API error: %s", result["error"].(string)) + } + return jsonBody, nil } // MinerBalance returns the current unpaid balance @@ -102,6 +132,38 @@ func (f *FlexpoolClient) MinerPayments(coin string, address string, limit int) ( } } +// WorkersResponse represents the JSON structure of the Flexpool API response for workers +type WorkersResponse struct { + Error string `json:"error"` + Result []struct { + Name string `json:"name"` + IsOnline bool `json:"isOnline"` + LastSteen int64 `json:"lastSeen"` + } `json:"result"` +} + +// MinerWorkers returns a list of workers given a miner address +func (f *FlexpoolClient) MinerWorkers(coin string, address string) (workers []*Worker, err error) { + body, err := f.requestBytes(fmt.Sprintf("%s/miner/workers?coin=%s&address=%s", FlexpoolAPIURL, coin, address)) + if err != nil { + return nil, err + } + + var response WorkersResponse + json.Unmarshal(body, &response) + + for _, result := range response.Result { + worker := NewWorker( + address, + result.Name, + result.IsOnline, + time.Unix(result.LastSteen, 0), + ) + workers = append(workers, worker) + } + return workers, nil +} + // PoolBlocks returns an ordered list of blocks func (f *FlexpoolClient) PoolBlocks(coin string, limit int) (blocks []*Block, err error) { page := 0 diff --git a/config.go b/config.go index 553ed33..0fdf836 100644 --- a/config.go +++ b/config.go @@ -24,9 +24,10 @@ type PoolConfig struct { // MinerConfig to store Miner configuration type MinerConfig struct { - Address string `yaml:"address"` - EnableBalance bool `yaml:"enable-balance"` - EnablePayments bool `yaml:"enable-payments"` + Address string `yaml:"address"` + EnableBalance bool `yaml:"enable-balance"` + EnablePayments bool `yaml:"enable-payments"` + EnableOfflineWorkers bool `yaml:"enable-offline-workers"` } // TelegramConfig to store Telegram configuration diff --git a/db.go b/db.go index ada9ce8..15d5948 100644 --- a/db.go +++ b/db.go @@ -1,6 +1,9 @@ package main import ( + "time" + + log "github.com/sirupsen/logrus" "gorm.io/driver/sqlite" "gorm.io/gorm" ) @@ -15,8 +18,23 @@ func CreateDatabaseObjects(db *gorm.DB) error { if err := db.AutoMigrate(&Miner{}); err != nil { return err } + if err := db.AutoMigrate(&Worker{}); err != nil { + return err + } if err := db.AutoMigrate(&Pool{}); err != nil { return err } return nil } + +// EnsureDatabaseRetention removes stale objects from database +func EnsureDatabaseRetention(db *gorm.DB) error { + log.Debugf("Deleting inactive workers") + lastWeek := time.Now().AddDate(0, 0, -7) + var worker *Worker + trx := db.Unscoped().Where("last_seen < ?", lastWeek).Delete(&worker) + if trx.Error != nil { + return trx.Error + } + return nil +} diff --git a/flexassistant.yaml.example b/flexassistant.yaml.example index 7320ac4..066f713 100644 --- a/flexassistant.yaml.example +++ b/flexassistant.yaml.example @@ -6,9 +6,11 @@ miners: - address: 0x0000000000000000000000000000000000000000 enable-balance: true enable-payments: true + enable-offline-workers: true - address: xch00000000000000000000000000000000000000000000000000000000000 enable-balance: true enable-payments: true + enable-offline-workers: true pools: - coin: eth enable-blocks: true diff --git a/main.go b/main.go index d25d02d..78b0173 100644 --- a/main.go +++ b/main.go @@ -76,6 +76,10 @@ func main() { log.Fatalf("Could not create objects: %v", err) } + if err := EnsureDatabaseRetention(db); err != nil { + log.Fatalf("Could not cleanup objects from database: %v", err) + } + // API client client := NewFlexpoolClient() @@ -181,6 +185,49 @@ func main() { } } } + + // Offline workers management + if configuredMiner.EnableOfflineWorkers { + log.Debugf("Fetching workers for %s", miner) + + workers, err := client.MinerWorkers(miner.Coin, miner.Address) + if err != nil { + log.Warnf("Could not fetch workers: %v", err) + continue + } + for _, worker := range workers { + log.Debugf("Fetched %s", worker) + + var dbWorker Worker + trx := db.Where(Worker{MinerAddress: miner.Address, Name: worker.Name}).Attrs(Worker{MinerAddress: miner.Address, Name: worker.Name}).FirstOrCreate(&dbWorker) + if trx.Error != nil { + log.Warnf("Cannot fetch worker %s from database: %v", worker, trx.Error) + continue + } + + if dbWorker.IsOnline != worker.IsOnline { + // Skip first notification + notify := true + if dbWorker.LastSeen.IsZero() { + notify = false + } + dbWorker.IsOnline = worker.IsOnline + dbWorker.LastSeen = worker.LastSeen + if trx = db.Save(&dbWorker); trx.Error != nil { + log.Warnf("Cannot update worker: %v", trx.Error) + continue + } + if notify { + err = notifier.NotifyOfflineWorker(*worker) + if err != nil { + log.Warnf("Cannot send notification: %v", err) + continue + } + log.Infof("Offline worker notification sent for %s", worker) + } + } + } + } } // Handle pools diff --git a/miner.go b/miner.go index 980d841..a91d275 100644 --- a/miner.go +++ b/miner.go @@ -3,6 +3,7 @@ package main import ( "fmt" "strings" + "time" "gorm.io/gorm" ) @@ -72,3 +73,27 @@ func NewPayment(hash string, value float64, timestamp float64) *Payment { func (p *Payment) String() string { return fmt.Sprintf("Payment<%s>", p.Hash) } + +// Worker to store workers attributes +type Worker struct { + gorm.Model + MinerAddress string `gorm:"not null"` + Name string `gorm:"not null"` + IsOnline bool `gorm:"not null"` + LastSeen time.Time `gorm:"not null"` +} + +// NewWorker creates a Worker +func NewWorker(minerAddress string, name string, isOnline bool, lastSeen time.Time) *Worker { + return &Worker{ + MinerAddress: minerAddress, + Name: name, + IsOnline: isOnline, + LastSeen: lastSeen, + } +} + +// String represents Worker to a printable format +func (w *Worker) String() string { + return fmt.Sprintf("Worker<%s>", w.Name) +} diff --git a/notification.go b/notification.go index a4e3713..f2efd8f 100644 --- a/notification.go +++ b/notification.go @@ -15,6 +15,7 @@ type Notifier interface { NotifyBalance(miner Miner, difference float64) error NotifyPayment(miner Miner, payment Payment) error NotifyBlock(pool Pool, block Block) error + NotifyOfflineWorker(worker Worker) error } // TelegramNotifier to send notifications using Telegram @@ -121,3 +122,15 @@ func (t *TelegramNotifier) NotifyBlock(pool Pool, block Block) error { message := fmt.Sprintf("🎉 *%s* [#%.0f](%s) _%s_", verb, block.Number, url, ac.FormatMoney(convertedValue)) return t.sendMessage(message) } + +// NotifyOfflineWorker sends a message when a worker is online or offline +func (t *TelegramNotifier) NotifyOfflineWorker(worker Worker) error { + stateIcon := "🟢" + stateMessage := "online" + if !worker.IsOnline { + stateIcon = "🔴" + stateMessage = "offline" + } + message := fmt.Sprintf("%s *Worker* _%s_ is %s", stateIcon, worker.Name, stateMessage) + return t.sendMessage(message) +}