From 1f5cef17d29931bd7673a61e385d53b2395837e7 Mon Sep 17 00:00:00 2001 From: Julien Riou Date: Thu, 15 Apr 2021 15:13:00 +0200 Subject: [PATCH] Handle Twitter duplicates (#20) This commit adds a hash attribute to help identify duplicate messages. Tweets have a TweetID attribute for the initial thread identifier and a LastTweetID attribute to keep track of the last reply to eventually continue the thread if a duplicate is detected. Signed-off-by: Julien Riou --- README.md | 1 + config.go | 1 + main.go | 3 +- notifier_twitter.go | 157 ++++++++++++++++++++++++++++++++++---------- utils.go | 10 +++ utils_test.go | 28 ++++++++ 6 files changed, 166 insertions(+), 34 deletions(-) diff --git a/README.md b/README.md index d98ea97..d8884ed 100644 --- a/README.md +++ b/README.md @@ -164,6 +164,7 @@ Options: * `access_token_secret`: authentication token secret generated for your Twitter account * `hashtags`: list of key/value used to append hashtags to each tweet. Key is the pattern to match in the product name, value is the string to append to the tweet. For example, `{"twitter": {"hashtags": [{"rtx 3090": "#nvidia #rtx3090"}]}}` will detect `rtx 3090` to append `#nvidia #rtx3090` at the end of the tweet. * `enable_replies`: reply to original message when product is not available anymore + * `retention`: number of days to keep tweet references in the database (not deleted by default) * `telegram` (optional): * `channel_name`: send message to a channel (ex: `@channel`) * `chat_id`: send message to a chat (ex: `1234`) diff --git a/config.go b/config.go index eb8d330..3ae1b4f 100644 --- a/config.go +++ b/config.go @@ -33,6 +33,7 @@ type TwitterConfig struct { AccessTokenSecret string `json:"access_token_secret"` Hashtags []map[string]string `json:"hashtags"` EnableReplies bool `json:"enable_replies"` + Retention int `json:"retention"` } // TelegramConfig to store Telegram API key diff --git a/main.go b/main.go index cf74b0e..2370582 100644 --- a/main.go +++ b/main.go @@ -132,8 +132,9 @@ func main() { log.Debugf("found old product: %s", p.Name) if trx = db.Unscoped().Delete(&p); trx.Error != nil { log.Warnf("cannot remove stale product %s (%s): %s", p.Name, p.URL, trx.Error) + } else { + log.Logf("stale product %s (%s) removed from database", p.Name, p.URL) } - log.Printf("stale product %s (%s) removed from database", p.Name, p.URL) } } diff --git a/notifier_twitter.go b/notifier_twitter.go index 4456db9..c37bb04 100644 --- a/notifier_twitter.go +++ b/notifier_twitter.go @@ -1,6 +1,7 @@ package main import ( + "crypto/md5" "fmt" "regexp" "strings" @@ -19,9 +20,11 @@ const tweetMaxSize = 280 // Tweet to store relationship between a Product and a Twitter notification type Tweet struct { gorm.Model - TweetID int64 - ProductURL string - Product Product `gorm:"not null;references:URL"` + TweetID int64 `gorm:"not null;unique"` + Hash string `gorm:"unique"` + LastTweetID int64 `gorm:"index"` + ProductURL string `gorm:"index"` + Product Product `gorm:"not null;references:URL"` } // TwitterNotifier to manage notifications to Twitter @@ -31,6 +34,7 @@ type TwitterNotifier struct { user *twitter.User hashtagsMap []map[string]string enableReplies bool + retentionDays int } // NewTwitterNotifier creates a TwitterNotifier @@ -58,7 +62,46 @@ func NewTwitterNotifier(c *TwitterConfig, db *gorm.DB) (*TwitterNotifier, error) } log.Debugf("connected to twitter as @%s", user.ScreenName) - return &TwitterNotifier{client: client, user: user, hashtagsMap: c.Hashtags, db: db, enableReplies: c.EnableReplies}, nil + notifier := &TwitterNotifier{ + client: client, + user: user, + hashtagsMap: c.Hashtags, + db: db, + enableReplies: c.EnableReplies, + retentionDays: c.Retention, + } + + // delete old tweets + if err = notifier.ensureRetention(); err != nil { + return nil, err + } + + return notifier, nil + +} + +// ensureRetention deletes tweets according to the defined retention +func (c *TwitterNotifier) ensureRetention() error { + if c.retentionDays == 0 { + log.Debugf("tweet retention not found, skipping database cleanup") + return nil + } + + var oldTweets []Tweet + retentionDate := time.Now().Local().Add(-time.Hour * 24 * time.Duration(c.retentionDays)) + trx := c.db.Where("updated_at < ?", retentionDate).Find(&oldTweets) + if trx.Error != nil { + return fmt.Errorf("cannot find twitter old statuses: %s", trx.Error) + } + for _, t := range oldTweets { + log.Debugf("twitter old status found with id %d", t.TweetID) + if trx = c.db.Unscoped().Delete(&t); trx.Error != nil { + log.Warnf("cannot remove old tweet %d: %s", t.TweetID, trx.Error) + } else { + log.Infof("twitter old status %d removed from database", t.TweetID) + } + } + return nil } // create a brand new tweet @@ -98,26 +141,69 @@ func (c *TwitterNotifier) buildHashtags(productName string) string { // NotifyWhenAvailable create a Twitter status for announcing that a product is available // implements the Notifier interface func (c *TwitterNotifier) NotifyWhenAvailable(shopName string, productName string, productPrice float64, productCurrency string, productURL string) error { - // TODO: check if message exists in the database to avoid flood + // format message hashtags := c.buildHashtags(productName) message := formatAvailableTweet(shopName, productName, productPrice, productCurrency, productURL, hashtags) - // create thread - tweetID, err := c.createTweet(message) - if err != nil { - return fmt.Errorf("failed to create new twitter thread: %s", err) - } - log.Infof("tweet %d sent", tweetID) - // save thread to database - t := Tweet{TweetID: tweetID, ProductURL: productURL} - trx := c.db.Create(&t) - if trx.Error != nil { - return fmt.Errorf("failed to save tweet %d to database: %s", t.TweetID, trx.Error) + // compute message checksum to avoid duplicates + var tweet Tweet + hash := fmt.Sprintf("%x", md5.Sum([]byte(message))) + trx := c.db.Where(Tweet{Hash: hash}).First(&tweet) + if trx.Error != nil && trx.Error != gorm.ErrRecordNotFound { + return fmt.Errorf("could not search for tweet with hash %s for product '%s': %s", hash, productURL, trx.Error) } - log.Debugf("tweet %d saved to database", t.TweetID) + + if trx.Error == gorm.ErrRecordNotFound { + + // tweet has not been sent in the past + // create thread + tweetID, err := c.createTweet(message) + if err != nil { + return fmt.Errorf("could not create new twitter thread for product '%s': %s", productURL, err) + } + log.Infof("tweet %d sent for product '%s'", tweetID, productURL) + + // save thread to database + tweet = Tweet{TweetID: tweetID, ProductURL: productURL, Hash: hash} + trx = c.db.Create(&tweet) + if trx.Error != nil { + return fmt.Errorf("could not save tweet %d to database for product '%s': %s", tweet.TweetID, productURL, trx.Error) + } + log.Debugf("tweet %d saved to database", tweet.TweetID) + + } else { + + if !c.enableReplies { + log.Debugf("twitter replies are disabled, skipping available notification for product '%s'", productURL) + return nil + } + + // select tweet to reply + lastTweetID := CoalesceInt64(tweet.LastTweetID, tweet.TweetID) + if lastTweetID == 0 { + return fmt.Errorf("could not find original tweet ID to create reply for product '%s'", productURL) + } + + // tweet already has been sent in the past and replies are enabled + // continuing thread + tweetID, err := c.replyToTweet(lastTweetID, "Good news, it's available again!") + if err != nil { + return fmt.Errorf("could not reply to tweet %d for product '%s': %s", lastTweetID, productURL, err) + } + log.Infof("reply to tweet %d sent with id %d for product '%s'", lastTweetID, tweetID, productURL) + + // save thread to database + tweet.LastTweetID = tweetID + if trx = c.db.Save(&tweet); trx.Error != nil { + return fmt.Errorf("could not save tweet %d to database for product '%s': %s", tweet.TweetID, productURL, trx.Error) + } + log.Debugf("tweet %d saved in database", tweet.TweetID) + } + return nil } +// formatAvailableTweet creates a message based on product characteristics func formatAvailableTweet(shopName string, productName string, productPrice float64, productCurrency string, productURL string, hashtags string) string { // format message formattedPrice := formatPrice(productPrice, productCurrency) @@ -140,32 +226,37 @@ func (c *TwitterNotifier) NotifyWhenNotAvailable(productURL string, duration tim // find Tweet in the database var tweet Tweet trx := c.db.Where(Tweet{ProductURL: productURL}).First(&tweet) + if trx.Error != nil { - return fmt.Errorf("failed to find tweet in database for product with url %s: %s", productURL, trx.Error) - } - if tweet.TweetID == 0 { - log.Warnf("tweet for product with url %s not found, skipping close notification", productURL) - return nil + return fmt.Errorf("could not find tweet for product '%s' in the database: %s", productURL, trx.Error) } if c.enableReplies { // format message message := fmt.Sprintf("And it's gone (%s)", duration) - // close thread on twitter - _, err := c.replyToTweet(tweet.TweetID, message) - if err != nil { - return fmt.Errorf("failed to create reply tweet: %s", err) + // select tweet to reply + lastTweetID := CoalesceInt64(tweet.LastTweetID, tweet.TweetID) + if lastTweetID == 0 { + return fmt.Errorf("could not find original tweet ID to create reply for product '%s'", productURL) } - log.Infof("reply to tweet %d sent", tweet.TweetID) - } - // remove tweet from database - trx = c.db.Unscoped().Delete(&tweet) - if trx.Error != nil { - return fmt.Errorf("failed to remove tweet %d from database: %s", tweet.TweetID, trx.Error) + // close thread on twitter + tweetID, err := c.replyToTweet(lastTweetID, message) + if err != nil { + return fmt.Errorf("could not close thread on twitter for product '%s': %s", productURL, err) + } + log.Infof("reply to tweet %d sent with id %d for product '%s'", lastTweetID, tweetID, productURL) + + // save tweet id on database + tweet.LastTweetID = tweetID + if trx = c.db.Save(&tweet); trx.Error != nil { + return fmt.Errorf("could not save tweet %d to database for product '%s': %s", tweet.TweetID, productURL, trx.Error) + } + log.Debugf("tweet %d saved in database", tweet.TweetID) + } else { + log.Debugf("twitter replies are disabled, skipping not available notification for '%s'", productURL) } - log.Debugf("tweet removed from database") return nil } diff --git a/utils.go b/utils.go index cdd5b93..0e22eca 100644 --- a/utils.go +++ b/utils.go @@ -26,3 +26,13 @@ func ContainsString(arr []string, str string) bool { } return false } + +// CoalesceInt64 returns the first non zero value from variadic int64 arguments +func CoalesceInt64(values ...int64) int64 { + for _, value := range values { + if value != 0 { + return value + } + } + return 0 +} diff --git a/utils_test.go b/utils_test.go index d0e3642..b03381e 100644 --- a/utils_test.go +++ b/utils_test.go @@ -31,3 +31,31 @@ func TestExtractShopName(t *testing.T) { }) } } + +func TestCoalesceInt64(t *testing.T) { + tests := []struct { + arguments []int64 + expected int64 + }{ + // single value + {[]int64{0}, 0}, + {[]int64{99}, 99}, + + // multiple values + {[]int64{0, 0}, 0}, + {[]int64{0, 99}, 99}, + {[]int64{99, 0}, 99}, + {[]int64{99, 99}, 99}, + } + + for i, tc := range tests { + t.Run(fmt.Sprintf("TestExtractShopName#%d", i), func(t *testing.T) { + result := CoalesceInt64(tc.arguments...) + if result != tc.expected { + t.Errorf("for %+v: got %d, want %d", tc.arguments, result, tc.expected) + } else { + t.Logf("for %+v: got %d, want %d", tc.arguments, result, tc.expected) + } + }) + } +}