Handle Twitter duplicates (#20)
This commit adds a hash attribute to help identify duplicate messages. Tweets have a TweetID attribute for the initial thread identifier and a LastTweetID attribute to keep track of the last reply to eventually continue the thread if a duplicate is detected. Signed-off-by: Julien Riou <julien@riou.xyz>
This commit is contained in:
		
					parent
					
						
							
								9629483953
							
						
					
				
			
			
				commit
				
					
						1f5cef17d2
					
				
			
		
					 6 changed files with 166 additions and 34 deletions
				
			
		|  | @ -164,6 +164,7 @@ Options: | ||||||
|     * `access_token_secret`: authentication token secret generated for your Twitter account |     * `access_token_secret`: authentication token secret generated for your Twitter account | ||||||
|     * `hashtags`: list of key/value used to append hashtags to each tweet. Key is the pattern to match in the product name, value is the string to append to the tweet. For example, `{"twitter": {"hashtags": [{"rtx 3090": "#nvidia #rtx3090"}]}}` will detect `rtx 3090` to append `#nvidia #rtx3090` at the end of the tweet. |     * `hashtags`: list of key/value used to append hashtags to each tweet. Key is the pattern to match in the product name, value is the string to append to the tweet. For example, `{"twitter": {"hashtags": [{"rtx 3090": "#nvidia #rtx3090"}]}}` will detect `rtx 3090` to append `#nvidia #rtx3090` at the end of the tweet. | ||||||
|     * `enable_replies`: reply to original message when product is not available anymore |     * `enable_replies`: reply to original message when product is not available anymore | ||||||
|  |     * `retention`: number of days to keep tweet references in the database (not deleted by default) | ||||||
| * `telegram` (optional): | * `telegram` (optional): | ||||||
|     * `channel_name`: send message to a channel (ex: `@channel`) |     * `channel_name`: send message to a channel (ex: `@channel`) | ||||||
|     * `chat_id`: send message to a chat (ex: `1234`) |     * `chat_id`: send message to a chat (ex: `1234`) | ||||||
|  |  | ||||||
|  | @ -33,6 +33,7 @@ type TwitterConfig struct { | ||||||
| 	AccessTokenSecret string              `json:"access_token_secret"` | 	AccessTokenSecret string              `json:"access_token_secret"` | ||||||
| 	Hashtags          []map[string]string `json:"hashtags"` | 	Hashtags          []map[string]string `json:"hashtags"` | ||||||
| 	EnableReplies     bool                `json:"enable_replies"` | 	EnableReplies     bool                `json:"enable_replies"` | ||||||
|  | 	Retention         int                 `json:"retention"` | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // TelegramConfig to store Telegram API key | // TelegramConfig to store Telegram API key | ||||||
|  |  | ||||||
							
								
								
									
										3
									
								
								main.go
									
										
									
									
									
								
							
							
						
						
									
										3
									
								
								main.go
									
										
									
									
									
								
							|  | @ -132,8 +132,9 @@ func main() { | ||||||
| 			log.Debugf("found old product: %s", p.Name) | 			log.Debugf("found old product: %s", p.Name) | ||||||
| 			if trx = db.Unscoped().Delete(&p); trx.Error != nil { | 			if trx = db.Unscoped().Delete(&p); trx.Error != nil { | ||||||
| 				log.Warnf("cannot remove stale product %s (%s): %s", p.Name, p.URL, trx.Error) | 				log.Warnf("cannot remove stale product %s (%s): %s", p.Name, p.URL, trx.Error) | ||||||
|  | 			} else { | ||||||
|  | 				log.Logf("stale product %s (%s) removed from database", p.Name, p.URL) | ||||||
| 			} | 			} | ||||||
| 			log.Printf("stale product %s (%s) removed from database", p.Name, p.URL) |  | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -1,6 +1,7 @@ | ||||||
| package main | package main | ||||||
| 
 | 
 | ||||||
| import ( | import ( | ||||||
|  | 	"crypto/md5" | ||||||
| 	"fmt" | 	"fmt" | ||||||
| 	"regexp" | 	"regexp" | ||||||
| 	"strings" | 	"strings" | ||||||
|  | @ -19,9 +20,11 @@ const tweetMaxSize = 280 | ||||||
| // Tweet to store relationship between a Product and a Twitter notification | // Tweet to store relationship between a Product and a Twitter notification | ||||||
| type Tweet struct { | type Tweet struct { | ||||||
| 	gorm.Model | 	gorm.Model | ||||||
| 	TweetID    int64 | 	TweetID     int64   `gorm:"not null;unique"` | ||||||
| 	ProductURL string | 	Hash        string  `gorm:"unique"` | ||||||
| 	Product    Product `gorm:"not null;references:URL"` | 	LastTweetID int64   `gorm:"index"` | ||||||
|  | 	ProductURL  string  `gorm:"index"` | ||||||
|  | 	Product     Product `gorm:"not null;references:URL"` | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // TwitterNotifier to manage notifications to Twitter | // TwitterNotifier to manage notifications to Twitter | ||||||
|  | @ -31,6 +34,7 @@ type TwitterNotifier struct { | ||||||
| 	user          *twitter.User | 	user          *twitter.User | ||||||
| 	hashtagsMap   []map[string]string | 	hashtagsMap   []map[string]string | ||||||
| 	enableReplies bool | 	enableReplies bool | ||||||
|  | 	retentionDays int | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // NewTwitterNotifier creates a TwitterNotifier | // NewTwitterNotifier creates a TwitterNotifier | ||||||
|  | @ -58,7 +62,46 @@ func NewTwitterNotifier(c *TwitterConfig, db *gorm.DB) (*TwitterNotifier, error) | ||||||
| 	} | 	} | ||||||
| 	log.Debugf("connected to twitter as @%s", user.ScreenName) | 	log.Debugf("connected to twitter as @%s", user.ScreenName) | ||||||
| 
 | 
 | ||||||
| 	return &TwitterNotifier{client: client, user: user, hashtagsMap: c.Hashtags, db: db, enableReplies: c.EnableReplies}, nil | 	notifier := &TwitterNotifier{ | ||||||
|  | 		client:        client, | ||||||
|  | 		user:          user, | ||||||
|  | 		hashtagsMap:   c.Hashtags, | ||||||
|  | 		db:            db, | ||||||
|  | 		enableReplies: c.EnableReplies, | ||||||
|  | 		retentionDays: c.Retention, | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	// delete old tweets | ||||||
|  | 	if err = notifier.ensureRetention(); err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return notifier, nil | ||||||
|  | 
 | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // ensureRetention deletes tweets according to the defined retention | ||||||
|  | func (c *TwitterNotifier) ensureRetention() error { | ||||||
|  | 	if c.retentionDays == 0 { | ||||||
|  | 		log.Debugf("tweet retention not found, skipping database cleanup") | ||||||
|  | 		return nil | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	var oldTweets []Tweet | ||||||
|  | 	retentionDate := time.Now().Local().Add(-time.Hour * 24 * time.Duration(c.retentionDays)) | ||||||
|  | 	trx := c.db.Where("updated_at < ?", retentionDate).Find(&oldTweets) | ||||||
|  | 	if trx.Error != nil { | ||||||
|  | 		return fmt.Errorf("cannot find twitter old statuses: %s", trx.Error) | ||||||
|  | 	} | ||||||
|  | 	for _, t := range oldTweets { | ||||||
|  | 		log.Debugf("twitter old status found with id %d", t.TweetID) | ||||||
|  | 		if trx = c.db.Unscoped().Delete(&t); trx.Error != nil { | ||||||
|  | 			log.Warnf("cannot remove old tweet %d: %s", t.TweetID, trx.Error) | ||||||
|  | 		} else { | ||||||
|  | 			log.Infof("twitter old status %d removed from database", t.TweetID) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	return nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // create a brand new tweet | // create a brand new tweet | ||||||
|  | @ -98,26 +141,69 @@ func (c *TwitterNotifier) buildHashtags(productName string) string { | ||||||
| // NotifyWhenAvailable create a Twitter status for announcing that a product is available | // NotifyWhenAvailable create a Twitter status for announcing that a product is available | ||||||
| // implements the Notifier interface | // implements the Notifier interface | ||||||
| func (c *TwitterNotifier) NotifyWhenAvailable(shopName string, productName string, productPrice float64, productCurrency string, productURL string) error { | func (c *TwitterNotifier) NotifyWhenAvailable(shopName string, productName string, productPrice float64, productCurrency string, productURL string) error { | ||||||
| 	// TODO: check if message exists in the database to avoid flood | 	// format message | ||||||
| 	hashtags := c.buildHashtags(productName) | 	hashtags := c.buildHashtags(productName) | ||||||
| 	message := formatAvailableTweet(shopName, productName, productPrice, productCurrency, productURL, hashtags) | 	message := formatAvailableTweet(shopName, productName, productPrice, productCurrency, productURL, hashtags) | ||||||
| 	// create thread |  | ||||||
| 	tweetID, err := c.createTweet(message) |  | ||||||
| 	if err != nil { |  | ||||||
| 		return fmt.Errorf("failed to create new twitter thread: %s", err) |  | ||||||
| 	} |  | ||||||
| 	log.Infof("tweet %d sent", tweetID) |  | ||||||
| 
 | 
 | ||||||
| 	// save thread to database | 	// compute message checksum to avoid duplicates | ||||||
| 	t := Tweet{TweetID: tweetID, ProductURL: productURL} | 	var tweet Tweet | ||||||
| 	trx := c.db.Create(&t) | 	hash := fmt.Sprintf("%x", md5.Sum([]byte(message))) | ||||||
| 	if trx.Error != nil { | 	trx := c.db.Where(Tweet{Hash: hash}).First(&tweet) | ||||||
| 		return fmt.Errorf("failed to save tweet %d to database: %s", t.TweetID, trx.Error) | 	if trx.Error != nil && trx.Error != gorm.ErrRecordNotFound { | ||||||
|  | 		return fmt.Errorf("could not search for tweet with hash %s for product '%s': %s", hash, productURL, trx.Error) | ||||||
| 	} | 	} | ||||||
| 	log.Debugf("tweet %d saved to database", t.TweetID) | 
 | ||||||
|  | 	if trx.Error == gorm.ErrRecordNotFound { | ||||||
|  | 
 | ||||||
|  | 		// tweet has not been sent in the past | ||||||
|  | 		// create thread | ||||||
|  | 		tweetID, err := c.createTweet(message) | ||||||
|  | 		if err != nil { | ||||||
|  | 			return fmt.Errorf("could not create new twitter thread for product '%s': %s", productURL, err) | ||||||
|  | 		} | ||||||
|  | 		log.Infof("tweet %d sent for product '%s'", tweetID, productURL) | ||||||
|  | 
 | ||||||
|  | 		// save thread to database | ||||||
|  | 		tweet = Tweet{TweetID: tweetID, ProductURL: productURL, Hash: hash} | ||||||
|  | 		trx = c.db.Create(&tweet) | ||||||
|  | 		if trx.Error != nil { | ||||||
|  | 			return fmt.Errorf("could not save tweet %d to database for product '%s': %s", tweet.TweetID, productURL, trx.Error) | ||||||
|  | 		} | ||||||
|  | 		log.Debugf("tweet %d saved to database", tweet.TweetID) | ||||||
|  | 
 | ||||||
|  | 	} else { | ||||||
|  | 
 | ||||||
|  | 		if !c.enableReplies { | ||||||
|  | 			log.Debugf("twitter replies are disabled, skipping available notification for product '%s'", productURL) | ||||||
|  | 			return nil | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		// select tweet to reply | ||||||
|  | 		lastTweetID := CoalesceInt64(tweet.LastTweetID, tweet.TweetID) | ||||||
|  | 		if lastTweetID == 0 { | ||||||
|  | 			return fmt.Errorf("could not find original tweet ID to create reply for product '%s'", productURL) | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		// tweet already has been sent in the past and replies are enabled | ||||||
|  | 		// continuing thread | ||||||
|  | 		tweetID, err := c.replyToTweet(lastTweetID, "Good news, it's available again!") | ||||||
|  | 		if err != nil { | ||||||
|  | 			return fmt.Errorf("could not reply to tweet %d for product '%s': %s", lastTweetID, productURL, err) | ||||||
|  | 		} | ||||||
|  | 		log.Infof("reply to tweet %d sent with id %d for product '%s'", lastTweetID, tweetID, productURL) | ||||||
|  | 
 | ||||||
|  | 		// save thread to database | ||||||
|  | 		tweet.LastTweetID = tweetID | ||||||
|  | 		if trx = c.db.Save(&tweet); trx.Error != nil { | ||||||
|  | 			return fmt.Errorf("could not save tweet %d to database for product '%s': %s", tweet.TweetID, productURL, trx.Error) | ||||||
|  | 		} | ||||||
|  | 		log.Debugf("tweet %d saved in database", tweet.TweetID) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | // formatAvailableTweet creates a message based on product characteristics | ||||||
| func formatAvailableTweet(shopName string, productName string, productPrice float64, productCurrency string, productURL string, hashtags string) string { | func formatAvailableTweet(shopName string, productName string, productPrice float64, productCurrency string, productURL string, hashtags string) string { | ||||||
| 	// format message | 	// format message | ||||||
| 	formattedPrice := formatPrice(productPrice, productCurrency) | 	formattedPrice := formatPrice(productPrice, productCurrency) | ||||||
|  | @ -140,32 +226,37 @@ func (c *TwitterNotifier) NotifyWhenNotAvailable(productURL string, duration tim | ||||||
| 	// find Tweet in the database | 	// find Tweet in the database | ||||||
| 	var tweet Tweet | 	var tweet Tweet | ||||||
| 	trx := c.db.Where(Tweet{ProductURL: productURL}).First(&tweet) | 	trx := c.db.Where(Tweet{ProductURL: productURL}).First(&tweet) | ||||||
|  | 
 | ||||||
| 	if trx.Error != nil { | 	if trx.Error != nil { | ||||||
| 		return fmt.Errorf("failed to find tweet in database for product with url %s: %s", productURL, trx.Error) | 		return fmt.Errorf("could not find tweet for product '%s' in the database: %s", productURL, trx.Error) | ||||||
| 	} |  | ||||||
| 	if tweet.TweetID == 0 { |  | ||||||
| 		log.Warnf("tweet for product with url %s not found, skipping close notification", productURL) |  | ||||||
| 		return nil |  | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	if c.enableReplies { | 	if c.enableReplies { | ||||||
| 		// format message | 		// format message | ||||||
| 		message := fmt.Sprintf("And it's gone (%s)", duration) | 		message := fmt.Sprintf("And it's gone (%s)", duration) | ||||||
| 
 | 
 | ||||||
| 		// close thread on twitter | 		// select tweet to reply | ||||||
| 		_, err := c.replyToTweet(tweet.TweetID, message) | 		lastTweetID := CoalesceInt64(tweet.LastTweetID, tweet.TweetID) | ||||||
| 		if err != nil { | 		if lastTweetID == 0 { | ||||||
| 			return fmt.Errorf("failed to create reply tweet: %s", err) | 			return fmt.Errorf("could not find original tweet ID to create reply for product '%s'", productURL) | ||||||
| 		} | 		} | ||||||
| 		log.Infof("reply to tweet %d sent", tweet.TweetID) |  | ||||||
| 	} |  | ||||||
| 
 | 
 | ||||||
| 	// remove tweet from database | 		// close thread on twitter | ||||||
| 	trx = c.db.Unscoped().Delete(&tweet) | 		tweetID, err := c.replyToTweet(lastTweetID, message) | ||||||
| 	if trx.Error != nil { | 		if err != nil { | ||||||
| 		return fmt.Errorf("failed to remove tweet %d from database: %s", tweet.TweetID, trx.Error) | 			return fmt.Errorf("could not close thread on twitter for product '%s': %s", productURL, err) | ||||||
|  | 		} | ||||||
|  | 		log.Infof("reply to tweet %d sent with id %d for product '%s'", lastTweetID, tweetID, productURL) | ||||||
|  | 
 | ||||||
|  | 		// save tweet id on database | ||||||
|  | 		tweet.LastTweetID = tweetID | ||||||
|  | 		if trx = c.db.Save(&tweet); trx.Error != nil { | ||||||
|  | 			return fmt.Errorf("could not save tweet %d to database for product '%s': %s", tweet.TweetID, productURL, trx.Error) | ||||||
|  | 		} | ||||||
|  | 		log.Debugf("tweet %d saved in database", tweet.TweetID) | ||||||
|  | 	} else { | ||||||
|  | 		log.Debugf("twitter replies are disabled, skipping not available notification for '%s'", productURL) | ||||||
| 	} | 	} | ||||||
| 	log.Debugf("tweet removed from database") |  | ||||||
| 
 | 
 | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
							
								
								
									
										10
									
								
								utils.go
									
										
									
									
									
								
							
							
						
						
									
										10
									
								
								utils.go
									
										
									
									
									
								
							|  | @ -26,3 +26,13 @@ func ContainsString(arr []string, str string) bool { | ||||||
| 	} | 	} | ||||||
| 	return false | 	return false | ||||||
| } | } | ||||||
|  | 
 | ||||||
|  | // CoalesceInt64 returns the first non zero value from variadic int64 arguments | ||||||
|  | func CoalesceInt64(values ...int64) int64 { | ||||||
|  | 	for _, value := range values { | ||||||
|  | 		if value != 0 { | ||||||
|  | 			return value | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	return 0 | ||||||
|  | } | ||||||
|  |  | ||||||
|  | @ -31,3 +31,31 @@ func TestExtractShopName(t *testing.T) { | ||||||
| 		}) | 		}) | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  | 
 | ||||||
|  | func TestCoalesceInt64(t *testing.T) { | ||||||
|  | 	tests := []struct { | ||||||
|  | 		arguments []int64 | ||||||
|  | 		expected  int64 | ||||||
|  | 	}{ | ||||||
|  | 		// single value | ||||||
|  | 		{[]int64{0}, 0}, | ||||||
|  | 		{[]int64{99}, 99}, | ||||||
|  | 
 | ||||||
|  | 		// multiple values | ||||||
|  | 		{[]int64{0, 0}, 0}, | ||||||
|  | 		{[]int64{0, 99}, 99}, | ||||||
|  | 		{[]int64{99, 0}, 99}, | ||||||
|  | 		{[]int64{99, 99}, 99}, | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	for i, tc := range tests { | ||||||
|  | 		t.Run(fmt.Sprintf("TestExtractShopName#%d", i), func(t *testing.T) { | ||||||
|  | 			result := CoalesceInt64(tc.arguments...) | ||||||
|  | 			if result != tc.expected { | ||||||
|  | 				t.Errorf("for %+v: got %d, want %d", tc.arguments, result, tc.expected) | ||||||
|  | 			} else { | ||||||
|  | 				t.Logf("for %+v: got %d, want %d", tc.arguments, result, tc.expected) | ||||||
|  | 			} | ||||||
|  | 		}) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
		Reference in a new issue