Archived
1
0
Fork 0

feat: Add offline/online workers notifications (#1)

Signed-off-by: Julien Riou <julien@riou.xyz>
This commit is contained in:
Julien Riou 2021-10-10 20:56:11 +02:00
parent 47eed6fbee
commit c4f3854861
No known key found for this signature in database
GPG key ID: FF42D23B580C89F7
8 changed files with 175 additions and 6 deletions

View file

@ -73,6 +73,7 @@ Reference:
* `address`: address of the miner or the farmer registered on the API * `address`: address of the miner or the farmer registered on the API
* `enable-balance` (optional): enable balance notifications (disabled by default) * `enable-balance` (optional): enable balance notifications (disabled by default)
* `enable-payments` (optional): enable payments notifications (disabled by default) * `enable-payments` (optional): enable payments notifications (disabled by default)
* `enable-offline-workers` (optional): enable offline/online notifications for associated workers (disabled by default)
* `telegram`: Telegram configuration * `telegram`: Telegram configuration
* `token`: token of the Telegram bot * `token`: token of the Telegram bot
* `chat-id` (optional if `channel-name` is present): chat identifier to send Telegram notifications * `chat-id` (optional if `channel-name` is present): chat identifier to send Telegram notifications

View file

@ -55,10 +55,40 @@ func (f *FlexpoolClient) request(url string) (result map[string]interface{}, err
json.Unmarshal(jsonBody, &result) json.Unmarshal(jsonBody, &result)
if result["error"] == nil { if result["error"] != nil {
return result["result"].(map[string]interface{}), nil return nil, fmt.Errorf("Flexpool API error: %s", result["error"].(string))
} }
return nil, fmt.Errorf("Flexpool API error: %s", result["error"].(string)) return result["result"].(map[string]interface{}), nil
}
// requestBytes to create an HTTPS request, call the Flexpool API, detect errors and return the result in bytes
func (f *FlexpoolClient) requestBytes(url string) ([]byte, error) {
log.Debugf("Requesting %s", url)
request, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, err
}
request.Header.Set("User-Agent", UserAgent)
resp, err := f.client.Do(request)
if err != nil {
return nil, err
}
defer resp.Body.Close()
jsonBody, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
var result map[string]interface{}
json.Unmarshal(jsonBody, &result)
if result["error"] != nil {
return nil, fmt.Errorf("Flexpool API error: %s", result["error"].(string))
}
return jsonBody, nil
} }
// MinerBalance returns the current unpaid balance // MinerBalance returns the current unpaid balance
@ -102,6 +132,38 @@ func (f *FlexpoolClient) MinerPayments(coin string, address string, limit int) (
} }
} }
// WorkersResponse represents the JSON structure of the Flexpool API response for workers
type WorkersResponse struct {
Error string `json:"error"`
Result []struct {
Name string `json:"name"`
IsOnline bool `json:"isOnline"`
LastSteen int64 `json:"lastSeen"`
} `json:"result"`
}
// MinerWorkers returns a list of workers given a miner address
func (f *FlexpoolClient) MinerWorkers(coin string, address string) (workers []*Worker, err error) {
body, err := f.requestBytes(fmt.Sprintf("%s/miner/workers?coin=%s&address=%s", FlexpoolAPIURL, coin, address))
if err != nil {
return nil, err
}
var response WorkersResponse
json.Unmarshal(body, &response)
for _, result := range response.Result {
worker := NewWorker(
address,
result.Name,
result.IsOnline,
time.Unix(result.LastSteen, 0),
)
workers = append(workers, worker)
}
return workers, nil
}
// PoolBlocks returns an ordered list of blocks // PoolBlocks returns an ordered list of blocks
func (f *FlexpoolClient) PoolBlocks(coin string, limit int) (blocks []*Block, err error) { func (f *FlexpoolClient) PoolBlocks(coin string, limit int) (blocks []*Block, err error) {
page := 0 page := 0

View file

@ -24,9 +24,10 @@ type PoolConfig struct {
// MinerConfig to store Miner configuration // MinerConfig to store Miner configuration
type MinerConfig struct { type MinerConfig struct {
Address string `yaml:"address"` Address string `yaml:"address"`
EnableBalance bool `yaml:"enable-balance"` EnableBalance bool `yaml:"enable-balance"`
EnablePayments bool `yaml:"enable-payments"` EnablePayments bool `yaml:"enable-payments"`
EnableOfflineWorkers bool `yaml:"enable-offline-workers"`
} }
// TelegramConfig to store Telegram configuration // TelegramConfig to store Telegram configuration

18
db.go
View file

@ -1,6 +1,9 @@
package main package main
import ( import (
"time"
log "github.com/sirupsen/logrus"
"gorm.io/driver/sqlite" "gorm.io/driver/sqlite"
"gorm.io/gorm" "gorm.io/gorm"
) )
@ -15,8 +18,23 @@ func CreateDatabaseObjects(db *gorm.DB) error {
if err := db.AutoMigrate(&Miner{}); err != nil { if err := db.AutoMigrate(&Miner{}); err != nil {
return err return err
} }
if err := db.AutoMigrate(&Worker{}); err != nil {
return err
}
if err := db.AutoMigrate(&Pool{}); err != nil { if err := db.AutoMigrate(&Pool{}); err != nil {
return err return err
} }
return nil return nil
} }
// EnsureDatabaseRetention removes stale objects from database
func EnsureDatabaseRetention(db *gorm.DB) error {
log.Debugf("Deleting inactive workers")
lastWeek := time.Now().AddDate(0, 0, -7)
var worker *Worker
trx := db.Unscoped().Where("last_seen < ?", lastWeek).Delete(&worker)
if trx.Error != nil {
return trx.Error
}
return nil
}

View file

@ -6,9 +6,11 @@ miners:
- address: 0x0000000000000000000000000000000000000000 - address: 0x0000000000000000000000000000000000000000
enable-balance: true enable-balance: true
enable-payments: true enable-payments: true
enable-offline-workers: true
- address: xch00000000000000000000000000000000000000000000000000000000000 - address: xch00000000000000000000000000000000000000000000000000000000000
enable-balance: true enable-balance: true
enable-payments: true enable-payments: true
enable-offline-workers: true
pools: pools:
- coin: eth - coin: eth
enable-blocks: true enable-blocks: true

47
main.go
View file

@ -76,6 +76,10 @@ func main() {
log.Fatalf("Could not create objects: %v", err) log.Fatalf("Could not create objects: %v", err)
} }
if err := EnsureDatabaseRetention(db); err != nil {
log.Fatalf("Could not cleanup objects from database: %v", err)
}
// API client // API client
client := NewFlexpoolClient() client := NewFlexpoolClient()
@ -181,6 +185,49 @@ func main() {
} }
} }
} }
// Offline workers management
if configuredMiner.EnableOfflineWorkers {
log.Debugf("Fetching workers for %s", miner)
workers, err := client.MinerWorkers(miner.Coin, miner.Address)
if err != nil {
log.Warnf("Could not fetch workers: %v", err)
continue
}
for _, worker := range workers {
log.Debugf("Fetched %s", worker)
var dbWorker Worker
trx := db.Where(Worker{MinerAddress: miner.Address, Name: worker.Name}).Attrs(Worker{MinerAddress: miner.Address, Name: worker.Name}).FirstOrCreate(&dbWorker)
if trx.Error != nil {
log.Warnf("Cannot fetch worker %s from database: %v", worker, trx.Error)
continue
}
if dbWorker.IsOnline != worker.IsOnline {
// Skip first notification
notify := true
if dbWorker.LastSeen.IsZero() {
notify = false
}
dbWorker.IsOnline = worker.IsOnline
dbWorker.LastSeen = worker.LastSeen
if trx = db.Save(&dbWorker); trx.Error != nil {
log.Warnf("Cannot update worker: %v", trx.Error)
continue
}
if notify {
err = notifier.NotifyOfflineWorker(*worker)
if err != nil {
log.Warnf("Cannot send notification: %v", err)
continue
}
log.Infof("Offline worker notification sent for %s", worker)
}
}
}
}
} }
// Handle pools // Handle pools

View file

@ -3,6 +3,7 @@ package main
import ( import (
"fmt" "fmt"
"strings" "strings"
"time"
"gorm.io/gorm" "gorm.io/gorm"
) )
@ -72,3 +73,27 @@ func NewPayment(hash string, value float64, timestamp float64) *Payment {
func (p *Payment) String() string { func (p *Payment) String() string {
return fmt.Sprintf("Payment<%s>", p.Hash) return fmt.Sprintf("Payment<%s>", p.Hash)
} }
// Worker to store workers attributes
type Worker struct {
gorm.Model
MinerAddress string `gorm:"not null"`
Name string `gorm:"not null"`
IsOnline bool `gorm:"not null"`
LastSeen time.Time `gorm:"not null"`
}
// NewWorker creates a Worker
func NewWorker(minerAddress string, name string, isOnline bool, lastSeen time.Time) *Worker {
return &Worker{
MinerAddress: minerAddress,
Name: name,
IsOnline: isOnline,
LastSeen: lastSeen,
}
}
// String represents Worker to a printable format
func (w *Worker) String() string {
return fmt.Sprintf("Worker<%s>", w.Name)
}

View file

@ -15,6 +15,7 @@ type Notifier interface {
NotifyBalance(miner Miner, difference float64) error NotifyBalance(miner Miner, difference float64) error
NotifyPayment(miner Miner, payment Payment) error NotifyPayment(miner Miner, payment Payment) error
NotifyBlock(pool Pool, block Block) error NotifyBlock(pool Pool, block Block) error
NotifyOfflineWorker(worker Worker) error
} }
// TelegramNotifier to send notifications using Telegram // TelegramNotifier to send notifications using Telegram
@ -121,3 +122,15 @@ func (t *TelegramNotifier) NotifyBlock(pool Pool, block Block) error {
message := fmt.Sprintf("🎉 *%s* [#%.0f](%s) _%s_", verb, block.Number, url, ac.FormatMoney(convertedValue)) message := fmt.Sprintf("🎉 *%s* [#%.0f](%s) _%s_", verb, block.Number, url, ac.FormatMoney(convertedValue))
return t.sendMessage(message) return t.sendMessage(message)
} }
// NotifyOfflineWorker sends a message when a worker is online or offline
func (t *TelegramNotifier) NotifyOfflineWorker(worker Worker) error {
stateIcon := "🟢"
stateMessage := "online"
if !worker.IsOnline {
stateIcon = "🔴"
stateMessage = "offline"
}
message := fmt.Sprintf("%s *Worker* _%s_ is %s", stateIcon, worker.Name, stateMessage)
return t.sendMessage(message)
}