Prepare for new parsers
- Rename "Parser" to "URLParser" - Make "Parse" function generic - Rename "crawlShop" function to "handleProducts" - Reduce "handleProducts" footprint a little bit Signed-off-by: Julien Riou <julien@riou.xyz>
This commit is contained in:
parent
9269d59380
commit
e67ab63ca8
3 changed files with 115 additions and 109 deletions
194
main.go
194
main.go
|
@ -99,12 +99,6 @@ func main() {
|
||||||
defer removePid(*pidFile)
|
defer removePid(*pidFile)
|
||||||
}
|
}
|
||||||
|
|
||||||
// create parser
|
|
||||||
parser, err := NewParser(config.BrowserAddress, config.IncludeRegex, config.ExcludeRegex)
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("could not create parser: %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// connect to the database
|
// connect to the database
|
||||||
db, err := gorm.Open(sqlite.Open(*databaseFileName), &gorm.Config{})
|
db, err := gorm.Open(sqlite.Open(*databaseFileName), &gorm.Config{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -145,128 +139,134 @@ func main() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// crawl shops asynchronously
|
// parse asynchronously
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
jobsCount := 0
|
jobsCount := 0
|
||||||
|
|
||||||
|
// start with URLs
|
||||||
for shopName, shopLinks := range ShopsMap {
|
for shopName, shopLinks := range ShopsMap {
|
||||||
if jobsCount < *workers {
|
|
||||||
wg.Add(1)
|
// read shop from database or create it
|
||||||
jobsCount++
|
var shop Shop
|
||||||
go crawlShop(parser, shopName, shopLinks, notifiers, db, &wg)
|
trx := db.Where(Shop{Name: shopName}).FirstOrCreate(&shop)
|
||||||
} else {
|
if trx.Error != nil {
|
||||||
log.Debugf("waiting for intermediate jobs to end")
|
log.Errorf("cannot create or select shop %s to/from database: %s", shopName, trx.Error)
|
||||||
wg.Wait()
|
continue
|
||||||
jobsCount = 0
|
}
|
||||||
|
|
||||||
|
for _, link := range shopLinks {
|
||||||
|
if jobsCount < *workers {
|
||||||
|
// create parser
|
||||||
|
parser, err := NewURLParser(link, config.BrowserAddress, config.IncludeRegex, config.ExcludeRegex)
|
||||||
|
if err != nil {
|
||||||
|
log.Warnf("could not create URL parser for %s", link)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
wg.Add(1)
|
||||||
|
jobsCount++
|
||||||
|
go handleProducts(shop, parser, notifiers, db, &wg)
|
||||||
|
} else {
|
||||||
|
log.Debugf("waiting for intermediate jobs to end")
|
||||||
|
wg.Wait()
|
||||||
|
jobsCount = 0
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Debugf("waiting for all jobs to end")
|
log.Debugf("waiting for all jobs to end")
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
// For a given shop, fetch and parse all the dependent URLs, then eventually send notifications
|
// For a given shop, fetch and parse its URL, then eventually send notifications
|
||||||
func crawlShop(parser *Parser, shopName string, shopLinks []string, notifiers []Notifier, db *gorm.DB, wg *sync.WaitGroup) {
|
func handleProducts(shop Shop, parser *URLParser, notifiers []Notifier, db *gorm.DB, wg *sync.WaitGroup) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
log.Debugf("parsing shop %s", shopName)
|
|
||||||
|
|
||||||
// read shop from database or create it
|
log.Debugf("parsing with %s", parser)
|
||||||
var shop Shop
|
products, err := parser.Parse()
|
||||||
trx := db.Where(Shop{Name: shopName}).FirstOrCreate(&shop)
|
if err != nil {
|
||||||
if trx.Error != nil {
|
log.Warnf("cannot parse: %s", err)
|
||||||
log.Errorf("cannot create or select shop %s to/from database: %s", shopName, trx.Error)
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
log.Debugf("parsed")
|
||||||
|
|
||||||
for _, link := range shopLinks {
|
// upsert products to database
|
||||||
|
for _, product := range products {
|
||||||
|
|
||||||
log.Debugf("parsing url %s", link)
|
log.Debugf("detected product %+v", product)
|
||||||
products, err := parser.Parse(link)
|
|
||||||
if err != nil {
|
if !product.IsValid() {
|
||||||
log.Warnf("cannot parse %s: %s", link, err)
|
log.Warnf("parsed malformatted product: %+v", product)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
log.Debugf("url %s parsed", link)
|
|
||||||
|
|
||||||
// upsert products to database
|
// check if product is already in the database
|
||||||
for _, product := range products {
|
// sometimes new products are detected on the website, directly available, without reference in the database
|
||||||
|
// the bot has to send a notification instead of blindly creating it in the database and check availability afterwards
|
||||||
|
var count int64
|
||||||
|
trx := db.Model(&Product{}).Where(Product{URL: product.URL}).Count(&count)
|
||||||
|
if trx.Error != nil {
|
||||||
|
log.Warnf("cannot see if product %s already exists in the database: %s", product.Name, trx.Error)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
log.Debugf("detected product %+v", product)
|
// fetch product from database or create it if it doesn't exist
|
||||||
|
var dbProduct Product
|
||||||
|
trx = db.Where(Product{URL: product.URL}).Attrs(Product{Name: product.Name, Shop: shop, Price: product.Price, PriceCurrency: product.PriceCurrency, Available: product.Available}).FirstOrCreate(&dbProduct)
|
||||||
|
if trx.Error != nil {
|
||||||
|
log.Warnf("cannot fetch product %s from database: %s", product.Name, trx.Error)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
log.Debugf("product %s found in database", dbProduct.Name)
|
||||||
|
|
||||||
if !product.IsValid() {
|
// detect availability change
|
||||||
log.Warnf("parsed malformatted product: %+v", product)
|
duration := time.Now().Sub(dbProduct.UpdatedAt).Truncate(time.Second)
|
||||||
continue
|
createThread := false
|
||||||
}
|
closeThread := false
|
||||||
|
|
||||||
// check if product is already in the database
|
// non-existing product directly available
|
||||||
// sometimes new products are detected on the website, directly available, without reference in the database
|
if count == 0 && product.Available {
|
||||||
// the bot has to send a notification instead of blindly creating it in the database and check availability afterwards
|
log.Infof("product %s on %s is now available", product.Name, shop.Name)
|
||||||
var count int64
|
createThread = true
|
||||||
trx = db.Model(&Product{}).Where(Product{URL: product.URL}).Count(&count)
|
}
|
||||||
if trx.Error != nil {
|
|
||||||
log.Warnf("cannot see if product %s already exists in the database: %s", product.Name, trx.Error)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// fetch product from database or create it if it doesn't exist
|
// existing product with availability change
|
||||||
var dbProduct Product
|
if count > 0 && (dbProduct.Available != product.Available) {
|
||||||
trx = db.Where(Product{URL: product.URL}).Attrs(Product{Name: product.Name, Shop: shop, Price: product.Price, PriceCurrency: product.PriceCurrency, Available: product.Available}).FirstOrCreate(&dbProduct)
|
if product.Available {
|
||||||
if trx.Error != nil {
|
log.Infof("product %s on %s is now available", product.Name, shop.Name)
|
||||||
log.Warnf("cannot fetch product %s from database: %s", product.Name, trx.Error)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
log.Debugf("product %s found in database", dbProduct.Name)
|
|
||||||
|
|
||||||
// detect availability change
|
|
||||||
duration := time.Now().Sub(dbProduct.UpdatedAt).Truncate(time.Second)
|
|
||||||
createThread := false
|
|
||||||
closeThread := false
|
|
||||||
|
|
||||||
// non-existing product directly available
|
|
||||||
if count == 0 && product.Available {
|
|
||||||
log.Infof("product %s on %s is now available", product.Name, shopName)
|
|
||||||
createThread = true
|
createThread = true
|
||||||
|
} else {
|
||||||
|
log.Infof("product %s on %s is not available anymore", product.Name, shop.Name)
|
||||||
|
closeThread = true
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// existing product with availability change
|
// update product in database before sending notification
|
||||||
if count > 0 && (dbProduct.Available != product.Available) {
|
// if there is a database failure, we don't want the bot to send a notification at each run
|
||||||
if product.Available {
|
if dbProduct.ToMerge(product) {
|
||||||
log.Infof("product %s on %s is now available", product.Name, shopName)
|
dbProduct.Merge(product)
|
||||||
createThread = true
|
trx = db.Save(&dbProduct)
|
||||||
} else {
|
if trx.Error != nil {
|
||||||
log.Infof("product %s on %s is not available anymore", product.Name, shopName)
|
log.Warnf("cannot save product %s to database: %s", dbProduct.Name, trx.Error)
|
||||||
closeThread = true
|
continue
|
||||||
|
}
|
||||||
|
log.Debugf("product %s updated in database", dbProduct.Name)
|
||||||
|
}
|
||||||
|
|
||||||
|
// send notifications
|
||||||
|
if createThread {
|
||||||
|
for _, notifier := range notifiers {
|
||||||
|
if err := notifier.NotifyWhenAvailable(shop.Name, dbProduct.Name, dbProduct.Price, dbProduct.PriceCurrency, dbProduct.URL); err != nil {
|
||||||
|
log.Errorf("%s", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} else if closeThread {
|
||||||
// update product in database before sending notification
|
for _, notifier := range notifiers {
|
||||||
// if there is a database failure, we don't want the bot to send a notification at each run
|
if err := notifier.NotifyWhenNotAvailable(dbProduct.URL, duration); err != nil {
|
||||||
if dbProduct.ToMerge(product) {
|
log.Errorf("%s", err)
|
||||||
dbProduct.Merge(product)
|
|
||||||
trx = db.Save(&dbProduct)
|
|
||||||
if trx.Error != nil {
|
|
||||||
log.Warnf("cannot save product %s to database: %s", dbProduct.Name, trx.Error)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
log.Debugf("product %s updated in database", dbProduct.Name)
|
|
||||||
}
|
|
||||||
|
|
||||||
// send notifications
|
|
||||||
if createThread {
|
|
||||||
for _, notifier := range notifiers {
|
|
||||||
if err := notifier.NotifyWhenAvailable(shop.Name, dbProduct.Name, dbProduct.Price, dbProduct.PriceCurrency, dbProduct.URL); err != nil {
|
|
||||||
log.Errorf("%s", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else if closeThread {
|
|
||||||
for _, notifier := range notifiers {
|
|
||||||
if err := notifier.NotifyWhenNotAvailable(dbProduct.URL, duration); err != nil {
|
|
||||||
log.Errorf("%s", err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Debugf("shop %s parsed", shopName)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func showVersion() {
|
func showVersion() {
|
||||||
|
|
|
@ -14,15 +14,20 @@ import (
|
||||||
"github.com/MontFerret/ferret/pkg/drivers/http"
|
"github.com/MontFerret/ferret/pkg/drivers/http"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Parser structure to handle websites parsing logic
|
// URLParser structure to handle websites parsing logic
|
||||||
type Parser struct {
|
type URLParser struct {
|
||||||
|
url string
|
||||||
includeRegex *regexp.Regexp
|
includeRegex *regexp.Regexp
|
||||||
excludeRegex *regexp.Regexp
|
excludeRegex *regexp.Regexp
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewParser to create a new Parser instance
|
func (p *URLParser) String() string {
|
||||||
func NewParser(browserAddress string, includeRegex string, excludeRegex string) (*Parser, error) {
|
return fmt.Sprintf("URLParser<%s>", p.url)
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewURLParser to create a new URLParser instance
|
||||||
|
func NewURLParser(url string, browserAddress string, includeRegex string, excludeRegex string) (*URLParser, error) {
|
||||||
var err error
|
var err error
|
||||||
var includeRegexCompiled, excludeRegexCompiled *regexp.Regexp
|
var includeRegexCompiled, excludeRegexCompiled *regexp.Regexp
|
||||||
|
|
||||||
|
@ -47,7 +52,8 @@ func NewParser(browserAddress string, includeRegex string, excludeRegex string)
|
||||||
ctx = drivers.WithContext(ctx, cdp.NewDriver(cdp.WithAddress(browserAddress)))
|
ctx = drivers.WithContext(ctx, cdp.NewDriver(cdp.WithAddress(browserAddress)))
|
||||||
ctx = drivers.WithContext(ctx, http.NewDriver(), drivers.AsDefault())
|
ctx = drivers.WithContext(ctx, http.NewDriver(), drivers.AsDefault())
|
||||||
|
|
||||||
return &Parser{
|
return &URLParser{
|
||||||
|
url: url,
|
||||||
includeRegex: includeRegexCompiled,
|
includeRegex: includeRegexCompiled,
|
||||||
excludeRegex: excludeRegexCompiled,
|
excludeRegex: excludeRegexCompiled,
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
|
@ -56,13 +62,13 @@ func NewParser(browserAddress string, includeRegex string, excludeRegex string)
|
||||||
|
|
||||||
// Parse a website to return list of products
|
// Parse a website to return list of products
|
||||||
// TODO: redirect output to logger
|
// TODO: redirect output to logger
|
||||||
func (p *Parser) Parse(url string) ([]*Product, error) {
|
func (p *URLParser) Parse() ([]*Product, error) {
|
||||||
shopName, err := ExtractShopName(url)
|
shopName, err := ExtractShopName(p.url)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
query, err := createQuery(shopName, url)
|
query, err := createQuery(shopName, p.url)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -90,7 +96,7 @@ func (p *Parser) Parse(url string) ([]*Product, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// filterInclusive returns a list of products matching the include regex
|
// filterInclusive returns a list of products matching the include regex
|
||||||
func (p *Parser) filterInclusive(products []*Product) []*Product {
|
func (p *URLParser) filterInclusive(products []*Product) []*Product {
|
||||||
var filtered []*Product
|
var filtered []*Product
|
||||||
if p.includeRegex != nil {
|
if p.includeRegex != nil {
|
||||||
for _, product := range products {
|
for _, product := range products {
|
||||||
|
@ -107,7 +113,7 @@ func (p *Parser) filterInclusive(products []*Product) []*Product {
|
||||||
}
|
}
|
||||||
|
|
||||||
// filterExclusive returns a list of products that don't match the exclude regex
|
// filterExclusive returns a list of products that don't match the exclude regex
|
||||||
func (p *Parser) filterExclusive(products []*Product) []*Product {
|
func (p *URLParser) filterExclusive(products []*Product) []*Product {
|
||||||
var filtered []*Product
|
var filtered []*Product
|
||||||
if p.excludeRegex != nil {
|
if p.excludeRegex != nil {
|
||||||
for _, product := range products {
|
for _, product := range products {
|
|
@ -18,7 +18,7 @@ func TestFilterInclusive(t *testing.T) {
|
||||||
|
|
||||||
for i, tc := range tests {
|
for i, tc := range tests {
|
||||||
t.Run(fmt.Sprintf("TestFilterInclusive#%d", i), func(t *testing.T) {
|
t.Run(fmt.Sprintf("TestFilterInclusive#%d", i), func(t *testing.T) {
|
||||||
p, err := NewParser("", tc.regex, "")
|
p, err := NewURLParser("", "", tc.regex, "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("failed to initialize parser: %s", err)
|
t.Errorf("failed to initialize parser: %s", err)
|
||||||
} else {
|
} else {
|
||||||
|
@ -58,7 +58,7 @@ func TestFilterExclusive(t *testing.T) {
|
||||||
|
|
||||||
for i, tc := range tests {
|
for i, tc := range tests {
|
||||||
t.Run(fmt.Sprintf("TestFilterExclusive#%d", i), func(t *testing.T) {
|
t.Run(fmt.Sprintf("TestFilterExclusive#%d", i), func(t *testing.T) {
|
||||||
p, err := NewParser("", "", tc.regex)
|
p, err := NewURLParser("", "", "", tc.regex)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("failed to initialize parser: %s", err)
|
t.Errorf("failed to initialize parser: %s", err)
|
||||||
} else {
|
} else {
|
Reference in a new issue