
Initial release

Julien Riou 2020-12-27 18:27:07 +01:00
parent 89c7ec935b
commit 0b0d2727e8
No known key found for this signature in database
GPG key ID: FF42D23B580C89F7
13 changed files with 1011 additions and 0 deletions

6
.gitignore vendored Normal file

@@ -0,0 +1,6 @@
__pycache__
config.json
*.html
TODO.txt
restock.db
geckodriver.log

29
.pre-commit-config.yaml Normal file

@@ -0,0 +1,29 @@
---
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: master
hooks:
- id: check-executables-have-shebangs
- id: check-merge-conflict
- id: double-quote-string-fixer
- id: end-of-file-fixer
- id: fix-encoding-pragma
args: ['--remove']
- id: requirements-txt-fixer
- id: trailing-whitespace
- id: check-json
- repo: https://gitlab.com/pycqa/flake8
rev: master
hooks:
- id: flake8
args: ['--max-line-length=120']
- repo: https://github.com/FalconSocial/pre-commit-python-sorter
rev: master
hooks:
- id: python-import-sorter
args: ['--silent-overwrite']
- repo: https://github.com/chewse/pre-commit-mirrors-pydocstyle
rev: master
hooks:
- id: pydocstyle
args: ['--config=.pydocstyle', '--match="(?!test_).*\.py"']

2
.pydocstyle Normal file

@@ -0,0 +1,2 @@
[pydocstyle]
ignore = D100,D104,D400,D203,D204,D101,D213,D202

89
README.md Normal file

@@ -0,0 +1,89 @@
The year 2020 has been quite hard for hardware supply. Graphics cards are out of stock everywhere. Nobody can grab the
new generation (AMD RX 6000 series, NVIDIA GeForce RTX 3000 series), and even older generations are hard to find.
**GraphicRestock** is a bot that crawls retailer websites and sends a notification when a product becomes available.
# Setup
Based on Debian 10:
```
apt install python3-selenium python3-sqlalchemy python3-tweepy firefox-esr
curl -L -s https://github.com/mozilla/geckodriver/releases/download/v0.28.0/geckodriver-v0.28.0-linux64.tar.gz | tar xvpzf - -C /usr/local/bin/
chown root:root /usr/local/bin/geckodriver
chmod +x /usr/local/bin/geckodriver
```
# Configure
A configuration file example can be found [here](config.json.example).
Options (a minimal example follows this list):
* **twitter.consumer_key**: key of your Twitter application
* **twitter.consumer_secret**: secret of your Twitter application
* **twitter.access_token**: authentication token generated by [twitter_auth.py](twitter_auth.py)
* **twitter.access_token_secret**: authentication token secret generated by [twitter_auth.py](twitter_auth.py)
* **urls**: list of retailer web pages to watch (they must be pages that the crawlers know how to parse)
* **executable_path** (optional): path to the Selenium driver executable (geckodriver for Firefox)
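A minimal configuration, reusing one of the URLs from [config.json.example](config.json.example), could look like this:
```
{
    "twitter": {
        "consumer_key": "***",
        "consumer_secret": "***",
        "access_token": "***",
        "access_token_secret": "***"
    },
    "urls": [
        "https://www.ldlc.com/informatique/pieces-informatique/carte-graphique-interne/c4684/+fv121-19183,19184,19185,19339,19340.html"
    ]
}
```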
# Twitter authentication
Create a configuration file with the **twitter.consumer_key** and **twitter.consumer_secret** parameters set.
Then authenticate:
```
python3 twitter_auth.py
```
You will have to open the URL and authenticate:
```
Please go to https://api.twitter.com/oauth/authorize?oauth_token=****
```
Click on **Authorize app**. A verifier code will be shown. Go back to your console and enter the code.
```
Verifier:*******
```
Tokens will be created:
```
access_token = *****
access_token_secret = ****
```
Finally, write them to the configuration file as the **twitter.access_token** and **twitter.access_token_secret** parameters.
# Usage
```
python3 main.py --help
```
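For example, to crawl using a specific configuration file with verbose logging and notifications disabled (options defined in [main.py](main.py)):
```
python3 main.py --config config.json --verbose --disable-notifications
```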
# How to contribute
First things first, check the existing issues to make sure the feature you want or the bug you are facing has not already been reported.
Pull requests are highly appreciated.
Please lint your code:
```
docker run -it -v $(pwd):/mnt/ --rm debian:10 bash
apt-get update && apt-get upgrade -y && apt-get install -y python3-pip git
pip3 install pre-commit
cd /mnt
pre-commit run --all-files
```
Happy coding!
# Disclaimer
Crawling a website should be done with caution. Please check with retailers whether the bot respects the terms of use of
their websites. The authors of the bot are not responsible for how it is used.

18
config.json.example Normal file

@@ -0,0 +1,18 @@
{
"twitter": {
"consumer_key": "***",
"consumer_secret": "***",
"access_token": "***",
"access_token_secret": "***"
},
"urls": [
"https://www.topachat.com/pages/produits_cat_est_micro_puis_rubrique_est_wgfx_pcie_puis_f_est_58-11447,11445,11446,11559,11558.html",
"https://www.ldlc.com/informatique/pieces-informatique/carte-graphique-interne/c4684/+fv121-19183,19184,19185,19339,19340.html",
"https://www.materiel.net/carte-graphique/l426/+fv121-19183,19184,19185,19339,19340/",
"https://www.alternate.be/Hardware/Grafische-kaarten/NVIDIA/RTX-3060-Ti",
"https://www.alternate.be/Hardware/Grafische-kaarten/NVIDIA/RTX-3070",
"https://www.alternate.be/Hardware/Grafische-kaarten/NVIDIA/RTX-3080",
"https://www.alternate.be/Hardware/Grafische-kaarten/NVIDIA/RTX-3090"
],
"executable_path": "/usr/bin/geckodriver"
}

24
config.py Normal file

@@ -0,0 +1,24 @@
import json
from utils import parse_base_url
def read_config(filename):
with open(filename, 'r') as fd:
return json.load(fd)
def extract_shops(urls):
"""
Parse the shop name from each URL and group the addresses by shop.
Example: ["toto.com/first", "toto.com/second", "tata.com/first"]
-> {"toto.com": ["toto.com/first", "toto.com/second"], "tata.com": ["tata.com/first"]}
"""
result = {}
for url in urls:
base_url = parse_base_url(url, include_scheme=False)
if base_url not in result:
result[base_url] = [url]
else:
result[base_url].append(url)
return result

94
crawlers.py Normal file

@@ -0,0 +1,94 @@
import logging
from parsers import (AlternateParser, LDLCParser, MaterielNetParser,
TopAchatParser)
from selenium import webdriver
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.common.by import By
from selenium.webdriver.firefox.options import Options
from selenium.webdriver.support import expected_conditions
from selenium.webdriver.support.ui import WebDriverWait
logger = logging.getLogger(__name__)
class ProductCrawler(object):
TIMEOUT = 3
def __init__(self, shop):
options = Options()
options.headless = True
self._driver = webdriver.Firefox(executable_path='/usr/local/bin/geckodriver', options=options)
self._shop = shop
self.products = []
def __del__(self):
self._driver.quit()
def fetch(self, url, wait_for=None):
self._driver.get(url)
if wait_for:
try:
condition = expected_conditions.presence_of_element_located((By.CLASS_NAME, wait_for))
WebDriverWait(self._driver, self.TIMEOUT).until(condition)
except TimeoutException:
logger.warning(f'timeout waiting for element "{wait_for}" at {url}')
logger.info(f'url {url} fetched')
webpage = self._driver.execute_script("return document.getElementsByTagName('html')[0].innerHTML")
return webpage
def add_shop(self, products):
for product in products:
product.shop = self._shop
return products
class TopAchatCrawler(ProductCrawler):
def __init__(self, shop, urls):
super().__init__(shop)
parser = TopAchatParser()
for url in urls:
webpage = self.fetch(url=url)
parser.feed(webpage)
self.products += self.add_shop(parser.products)
class LDLCCrawler(ProductCrawler):
def __init__(self, shop, urls):
super().__init__(shop)
parser = LDLCParser()
for url in urls:
next_page = url
previous_page = None
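# follow the pagination: the parser exposes the next page URL, and the loop stops once it no longer changes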
while next_page != previous_page:
webpage = self.fetch(url=next_page)
parser.feed(webpage)
previous_page = next_page
next_page = parser.next_page
self.products += self.add_shop(parser.products)
class MaterielNetCrawler(ProductCrawler):
def __init__(self, shop, urls):
super().__init__(shop)
parser = MaterielNetParser()
for url in urls:
next_page = url
previous_page = None
while next_page != previous_page:
webpage = self.fetch(url=next_page, wait_for='o-product__price')
parser.feed(webpage)
previous_page = next_page
next_page = parser.next_page
self.products += self.add_shop(parser.products)
class AlternateCrawler(ProductCrawler):
def __init__(self, shop, urls):
super().__init__(shop)
parser = AlternateParser()
for url in urls:
webpage = self.fetch(url=url)
parser.feed(webpage)
self.products += self.add_shop(parser.products)

119
db.py Normal file

@@ -0,0 +1,119 @@
import logging
from datetime import datetime
from sqlalchemy import (Boolean, Column, DateTime, Float, ForeignKey, Integer,
String, create_engine, exc)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship, sessionmaker
logger = logging.getLogger(__name__)
Base = declarative_base()
engine = create_engine('sqlite:///restock.db')
Session = sessionmaker(bind=engine, autoflush=False)
class Shop(Base):
__tablename__ = 'shop'
id = Column(Integer, primary_key=True)
name = Column(String, unique=True, nullable=False)
def __repr__(self):
return f'Shop<{self.name}>'
def __ne__(self, shop):
return self.name != shop.name
class Product(Base):
__tablename__ = 'product'
id = Column(Integer, primary_key=True)
name = Column(String, nullable=False)
url = Column(String, nullable=False, unique=True)
price = Column(Float, nullable=False)
price_currency = Column(String, nullable=False)
available = Column(Boolean, nullable=False)
updated_at = Column(DateTime)
tweet_id = Column(Integer, unique=True)
shop_id = Column(Integer, ForeignKey('shop.id'), nullable=False)
shop = relationship('Shop', foreign_keys=[shop_id])
def __repr__(self):
return f'Product<{self.name}@{self.shop.name}>'
def __ne__(self, product):
return self.name != product.name or self.price != product.price or self.available != product.available \
or self.url != product.url or self.shop != product.shop
def ok(self):
return self.name and self.url and self.price and self.price_currency and self.available is not None
def create_tables():
Base.metadata.create_all(engine)
logger.debug('tables created')
def list_shops():
session = Session()
shops = session.query(Shop).all()
session.close()
return shops
def upsert_shops(names):
session = Session()
try:
for name in names:
shop = Shop(name=name)
query = session.query(Shop).filter(Shop.name == shop.name)
shop_database = query.first()
if not shop_database:
logger.info(f'{shop} added')
session.add(shop)
session.commit()
logger.debug('transaction committed')
except exc.SQLAlchemyError:
logger.exception('cannot commit transaction')
finally:
session.close()
def upsert_products(products, notifier=None):
session = Session()
try:
for product in products:
query = session.query(Product).filter(Product.name == product.name, Product.shop == product.shop)
product_database = query.first()
now = datetime.utcnow()
tweet_id = None
if not product_database:
# product is new and available so we need to create an initial thread
if notifier and product.available:
product.tweet_id = notifier.create_thread(product).id
product.updated_at = now
session.add(product)
logger.info(f'{product} added')
elif product != product_database:
# notifications
if notifier and product.available != product_database.available:
if product.available and not product_database.tweet_id:
# product is now available so we need to create an initial tweet (or thread)
tweet = notifier.create_thread(product)
if tweet:
tweet_id = tweet.id
elif not product.available and product_database.available and product_database.tweet_id:
# product is out of stock so we need to reply to previous tweet to close the thread
notifier.close_thread(tweet_id=product_database.tweet_id,
duration=now-product_database.updated_at)
query.update({Product.price: product.price, Product.price_currency: product.price_currency,
Product.available: product.available, Product.url: product.url,
Product.tweet_id: tweet_id, Product.updated_at: now})
logger.info(f'{product} updated')
session.commit()
logger.debug('transaction committed')
except exc.SQLAlchemyError:
logger.exception('cannot commit transaction')
finally:
session.close()

71
main.py Normal file

@@ -0,0 +1,71 @@
#!/usr/bin/env python3
import argparse
import logging
from config import extract_shops, read_config
from crawlers import (AlternateCrawler, LDLCCrawler, MaterielNetCrawler,
TopAchatCrawler)
from db import create_tables, list_shops, upsert_products, upsert_shops
from notifiers import TwitterNotifier
logger = logging.getLogger(__name__)
def parse_arguments():
parser = argparse.ArgumentParser()
parser.add_argument('-v', '--verbose', dest='loglevel', action='store_const', const=logging.INFO,
help='print more output')
parser.add_argument('-d', '--debug', dest='loglevel', action='store_const', const=logging.DEBUG,
default=logging.WARNING, help='print even more output')
parser.add_argument('-o', '--logfile', help='logging file location')
parser.add_argument('-c', '--config', default='config.json', help='configuration file location')
parser.add_argument('-N', '--disable-notifications', dest='disable_notifications', action='store_true',
help='Do not send notifications')
args = parser.parse_args()
return args
def setup_logging(args):
log_format = '%(asctime)s %(levelname)s: %(message)s' if args.logfile else '%(levelname)s: %(message)s'
logging.basicConfig(format=log_format, level=args.loglevel, filename=args.logfile)
def main():
args = parse_arguments()
setup_logging(args)
config = read_config(args.config)
create_tables()
shops = extract_shops(config['urls'])
upsert_shops(shops.keys())
if args.disable_notifications:
notifier = None
else:
notifier = TwitterNotifier(consumer_key=config['twitter']['consumer_key'],
consumer_secret=config['twitter']['consumer_secret'],
access_token=config['twitter']['access_token'],
access_token_secret=config['twitter']['access_token_secret'])
for shop in list_shops():
logger.debug(f'processing {shop}')
urls = shops.get(shop.name)
if not urls:
logger.warning(f'cannot find urls for shop {shop} in the configuration file')
continue
if shop.name == 'topachat.com':
crawler = TopAchatCrawler(shop=shop, urls=urls)
elif shop.name == 'ldlc.com':
crawler = LDLCCrawler(shop=shop, urls=urls)
elif shop.name == 'materiel.net':
crawler = MaterielNetCrawler(shop=shop, urls=urls)
elif shop.name == 'alternate.be':
crawler = AlternateCrawler(shop=shop, urls=urls)
else:
logger.warning(f'shop {shop} not supported')
continue
upsert_products(products=crawler.products, notifier=notifier)
if __name__ == '__main__':
main()

57
notifiers.py Normal file

@@ -0,0 +1,57 @@
import logging
import tweepy
from utils import format_timedelta
logger = logging.getLogger(__name__)
class TwitterNotifier(object):
_hashtags_map = {
'rtx 3060 ti': ['#nvidia', '#rtx3060ti'],
'rtx 3070': ['#nvidia', '#rtx3070'],
'rtx 3080': ['#nvidia', '#rtx3080'],
'rtx 3090': ['#nvidia', '#rtx3090'],
'rx 6800 xt': ['#amd', '#rx6800xt'],
'rx 6800': ['#amd', '#rx6800'],
}
_currency_map = {
'EUR': '€'
}
def __init__(self, consumer_key, consumer_secret, access_token, access_token_secret):
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
self._api = tweepy.API(auth)
def create_thread(self, product):
currency_sign = self._currency_map[product.price_currency]
shop_name = product.shop.name
price = f'{product.price}{currency_sign}'
message = f'{shop_name}: {product.name} for {price} is available at {product.url}'
hashtags = self._parse_hashtags(product)
if hashtags:
message += f' {hashtags}'
return self._create_tweet(message=message)
def close_thread(self, tweet_id, duration):
thread = self._api.get_status(id=tweet_id)
duration = format_timedelta(duration, '{hours_total}h{minutes2}m')
message = f'''@{thread.user.screen_name} And it's over ({duration})'''
return self._create_tweet(message=message, tweet_id=tweet_id)
def _create_tweet(self, message, tweet_id=None):
try:
tweet = self._api.update_status(status=message, in_reply_to_status_id=tweet_id)
logger.info(f'tweet {tweet.id} sent with message "{message}"')
return tweet
except tweepy.error.TweepError as err:
logger.warning(f'cannot send tweet with message "{message}"')
logger.warning(str(err))
def _parse_hashtags(self, product):
for patterns in self._hashtags_map:
if all(elem in product.name.lower().split(' ') for elem in patterns.split(' ')):
return ' '.join(self._hashtags_map[patterns])

403
parsers.py Normal file

@@ -0,0 +1,403 @@
import logging
from html.parser import HTMLParser
from db import Product
from utils import parse_base_url
logger = logging.getLogger(__name__)
# These parsers definitely need to be replaced by BeautifulSoup because the code is not maintainable
class ProductParser(HTMLParser):
def __init__(self):
super().__init__()
self.products = []
self.next_page = None
class TopAchatParser(ProductParser):
def __init__(self, url=None):
super().__init__()
self._parsing_article = False
self._parsing_availability = False
self._parsing_price = False
self._parsing_price_currency = False
self._parsing_name = False
self._parsing_url = False
self._product = Product()
if url:
self._base_url = parse_base_url(url)
else:
self._base_url = 'https://www.topachat.com'
@staticmethod
def parse_name(data):
return data.split(' + ')[0].strip()
def handle_starttag(self, tag, attrs):
if tag == 'article':
for name, value in attrs:
if 'grille-produit' in value.split(' '):
self._parsing_article = True
elif self._parsing_article:
if tag == 'link':
for name, value in attrs:
if name == 'itemprop' and value == 'availability':
self._parsing_availability = True
elif self._parsing_availability and name == 'href':
self._product.available = value != 'http://schema.org/OutOfStock'
elif tag == 'div':
for name, value in attrs:
if name == 'itemprop' and value == 'price':
self._parsing_price = True
elif self._parsing_price and name == 'content':
self._product.price = float(value)
elif name == 'class' and value == 'libelle':
self._parsing_url = True
self._parsing_name = True
elif tag == 'meta':
for name, value in attrs:
if name == 'itemprop' and value == 'priceCurrency':
self._parsing_price_currency = True
elif self._parsing_price_currency and name == 'content':
self._product.price_currency = value
elif tag == 'a':
for name, value in attrs:
if self._parsing_url and name == 'href':
self._product.url = f'{self._base_url}{value}'
def handle_data(self, data):
if self._parsing_name and self.get_starttag_text().startswith('<h3>') and not self._product.name:
self._product.name = self.parse_name(data)
self._parsing_name = False
def handle_endtag(self, tag):
if self._parsing_article and tag == 'article':
self._parsing_article = False
self.products.append(self._product)
self._product = Product()
elif self._parsing_availability and tag == 'link':
self._parsing_availability = False
elif self._parsing_price and tag == 'div':
self._parsing_price = False
elif self._parsing_price_currency and tag == 'meta':
self._parsing_price_currency = False
class LDLCParser(ProductParser):
def __init__(self, url=None):
super().__init__()
self._product = Product()
self.__parsing_pdt_item = False
self.__parsing_pdt_id = False
self._parsing_title = False
self.__parsing_pagination = False
self.__parsing_next_page_section = False
self._parsing_stock = False
self._parsing_price = False
if url:
self._base_url = parse_base_url(url)
else:
self._base_url = 'https://www.ldlc.com'
@property
def _parsing_item(self):
return self.__parsing_pdt_item and self.__parsing_pdt_id
@property
def _parsing_next_page(self):
return self.__parsing_pagination and self.__parsing_next_page_section
@staticmethod
def parse_price(string):
currency = None
if '€' in string:
currency = 'EUR'
price = int(''.join([i for i in string if i.isdigit()]))
return price, currency
def handle_starttag(self, tag, attrs):
if not self._parsing_item and tag == 'li' and not self.__parsing_pagination:
for name, value in attrs:
if name == 'class' and value == 'pdt-item':
self.__parsing_pdt_item = True
elif name == 'id' and value.startswith('pdt-'):
self.__parsing_pdt_id = True
elif not self.__parsing_pagination and tag == 'ul':
for name, value in attrs:
if name == 'class' and value == 'pagination':
self.__parsing_pagination = True
elif self.__parsing_pagination and tag == 'li':
for name, value in attrs:
if name == 'class' and value == 'next':
self.__parsing_next_page_section = True
elif self._parsing_next_page and tag == 'a':
for name, value in attrs:
if name == 'href':
self.next_page = f'{self._base_url}{value}'
elif self._parsing_item:
if tag == 'h3':
self._parsing_title = True
elif self._parsing_title and tag == 'a':
for name, value in attrs:
if name == 'href':
self._product.url = f'{self._base_url}{value}'
elif tag == 'div':
for name, value in attrs:
if not self._parsing_stock and name == 'class' and 'modal-stock-web' in value.split(' '):
self._parsing_stock = True
elif not self._parsing_price and name == 'class' and value == 'price':
self._parsing_price = True
def handle_data(self, data):
last_tag = self.get_starttag_text()
if self._parsing_title and not self._product.name and last_tag.startswith('<a'):
self._product.name = data.strip()
elif self._parsing_stock and self._product.available is None and last_tag.startswith('<span>'):
self._product.available = data.strip() != 'Rupture'
elif self._parsing_price:
if last_tag.startswith('<div'):
self._product.price, self._product.price_currency = self.parse_price(data)
elif last_tag.startswith('<sup>'):
self._product.price += int(data) / 100
def handle_endtag(self, tag):
if self._parsing_item and tag == 'li':
self.__parsing_pdt_item = False
self.__parsing_pdt_id = False
self.products.append(self._product)
self._product = Product()
elif self._parsing_title and tag == 'h3':
self._parsing_title = False
elif self._parsing_stock and tag == 'span':
self._parsing_stock = False
elif self._parsing_price and tag == 'div':
self._parsing_price = False
elif self.__parsing_pagination and tag == 'ul':
self.__parsing_pagination = False
elif self.__parsing_next_page_section and tag == 'a':
self.__parsing_next_page_section = False
class MaterielNetParser(ProductParser):
def __init__(self, url=None):
super().__init__()
self._product = Product()
self._parsing_product = False
self._parsing_product_meta = False
self._parsing_title = False
self.__parsing_product_availability = False
self.__stock_web_id = None
self._parsing_availability = False
self.__parsing_price_category = False
self.__parsing_price_objects = False
self._parsing_price = False
self._parsing_pagination = False
self.__active_page_found = False
self.__parsing_next_page = False
self._pagination_parsed = False
if url:
self._base_url = parse_base_url(url)
else:
self._base_url = 'https://www.materiel.net'
@property
def _parsing_web_availability(self):
return self.__parsing_product_availability and self.__stock_web_id
def _close_availability_parsing(self):
self._parsing_availability = False
self.__stock_web_id = None
self.__parsing_product_availability = False
def _close_product_meta_parsing(self):
self._parsing_product_meta = False
def _close_title_parsing(self):
self._parsing_title = False
def _close_price_parsing(self):
self.__parsing_price_category = False
self.__parsing_price_objects = False
self._parsing_price = False
def _close_product_parsing(self):
self._parsing_product = False
self.products.append(self._product)
self._product = Product()
def _close_pagination_parsing(self):
self._parsing_pagination = False
self._pagination_parsed = True
@staticmethod
def parse_price(string):
currency = None
if '€' in string:
currency = 'EUR'
price = int(''.join([i for i in string if i.isdigit()]))
return price, currency
def handle_starttag(self, tag, attrs):
if not self._parsing_product and tag == 'li':
for name, value in attrs:
if name == 'class' and 'ajax-product-item' in value.split(' '):
self._parsing_product = True
if not self._parsing_product_meta and tag == 'div':
for name, value in attrs:
if name == 'class' and value == 'c-product__meta':
self._parsing_product_meta = True
elif self._parsing_product_meta:
if tag == 'a':
for name, value in attrs:
if name == 'href':
self._product.url = f'{self._base_url}{value}'
elif tag == 'h2':
for name, value in attrs:
if name == 'class' and value == 'c-product__title':
self._parsing_title = True
if tag == 'div':
for name, value in attrs:
if not self.__parsing_product_availability and name == 'class' and value == 'c-product__availability':
self.__parsing_product_availability = True
elif self.__parsing_product_availability and name == 'data-stock-web':
self.__stock_web_id = value
elif tag == 'span' and self._parsing_web_availability:
for name, value in attrs:
availability_class_name = f'o-availability__value--stock_{self.__stock_web_id}'
if name == 'class' and availability_class_name in value.split(' '):
self._parsing_availability = True
if not self.__parsing_price_objects and tag == 'div':
for name, value in attrs:
if not self.__parsing_price_category and name == 'class' and value == 'c-product__prices':
self.__parsing_price_category = True
elif self.__parsing_price_category and name == 'class' and 'o-product__prices' in value.split(' '):
self.__parsing_price_objects = True
elif self.__parsing_price_objects and tag == 'span':
for name, value in attrs:
if name == 'class' and value == 'o-product__price':
self._parsing_price = True
if not self._pagination_parsed:
if not self._parsing_pagination and tag == 'ul':
for name, value in attrs:
if name == 'class' and value == 'pagination':
self._parsing_pagination = True
elif self._parsing_pagination and tag == 'li':
for name, value in attrs:
values = value.split(' ')
if not self.__active_page_found and name == 'class' and 'page-item' in values \
and 'active' in values:
self.__active_page_found = True
elif self.__active_page_found and name == 'class' and 'page-item' in values:
self.__parsing_next_page = True
elif self.__parsing_next_page and tag == 'a':
for name, value in attrs:
if name == 'href':
self.next_page = f'{self._base_url}{value}'
self.__parsing_next_page = False
self._pagination_parsed = True
def handle_endtag(self, tag):
if self._parsing_product_meta and tag == 'div':
self._close_product_meta_parsing()
elif self._parsing_product and tag == 'li':
self._close_product_parsing()
elif self._parsing_pagination and tag == 'ul':
self._close_pagination_parsing()
def handle_data(self, data):
last_tag = self.get_starttag_text()
if self._parsing_title and last_tag.startswith('<h2'):
self._product.name = data
self._close_title_parsing()
elif self._parsing_availability and last_tag.startswith('<span'):
self._product.available = data != 'Rupture'
self._close_availability_parsing()
elif self._parsing_price:
if last_tag.startswith('<span'):
self._product.price, self._product.price_currency = self.parse_price(data)
elif last_tag.startswith('<sup>'):
self._product.price += int(data) / 100
self._close_price_parsing()
class AlternateParser(ProductParser):
def __init__(self, url=None):
super().__init__()
self._product = Product()
if url:
self._base_url = parse_base_url(url)
else:
self._base_url = 'https://www.alternate.be'
self._parsing_row = False
self._parsing_name = False
self._parsing_price = False
def handle_starttag(self, tag, attrs):
if not self._parsing_row and tag == 'div':
for name, value in attrs:
if name == 'class' and value == 'listRow':
self._parsing_row = True
elif self._parsing_row:
if tag == 'a':
for name, value in attrs:
if name == 'href' and not self._product.url:
self._product.url = self.parse_url(value)
elif tag == 'span':
if not self._parsing_name:
for name, value in attrs:
if name == 'class':
if value == 'name':
self._parsing_name = True
elif self._parsing_name:
for name, value in attrs:
if name == 'class' and value == 'additional':
self._parsing_name = False
if not self._parsing_price:
for name, value in attrs:
if name == 'class' and 'price' in value.split(' '):
self._parsing_price = True
elif tag == 'strong':
for name, value in attrs:
if name == 'class' and 'stockStatus' in value.split(' '):
values = value.split(' ')
available = 'available_unsure' not in values and 'preorder' not in values
self._product.available = available
def handle_data(self, data):
if self._parsing_name:
data = data.replace('grafische kaart', '').strip()
if data:
if not self._product.name:
self._product.name = data
else:
self._product.name += f' {data}'
elif self._parsing_price:
price, currency = self.parse_price(data)
if price and currency:
self._product.price = price
self._product.price_currency = currency
self._parsing_price = False
def handle_endtag(self, tag):
if tag == 'span' and self._parsing_price:
self._parsing_price = False
elif tag == 'div' and self._parsing_row and self._product.ok():
self._parsing_row = False
self.products.append(self._product)
self._product = Product()
@staticmethod
def parse_price(string):
currency = None
if '€' in string:
currency = 'EUR'
price = int(''.join([i for i in string if i.isdigit()]))
return price, currency
def parse_url(self, string):
string = string.split('?')[0] # remove query string
return f'{self._base_url}{string}'

46
twitter_auth.py Normal file

@@ -0,0 +1,46 @@
#!/usr/bin/env python3
import json
from urllib.parse import urlparse
import tweepy
def main():
with open('config.json', 'r') as fd:
config = json.load(fd)
if 'access_token' in config['twitter'] and 'access_token_secret' in config['twitter']:
access_token = config['twitter']['access_token']
access_token_secret = config['twitter']['access_token_secret']
else:
consumer_key = config['twitter']['consumer_key']
consumer_secret = config['twitter']['consumer_secret']
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
try:
redirect_url = auth.get_authorization_url()
print(f'Please go to {redirect_url}')
except tweepy.TweepError:
print('Failed to get request token')
token = urlparse(redirect_url).query.split('=')[1]
verifier = input('Verifier:')
auth.request_token = {'oauth_token': token, 'oauth_token_secret': verifier}
try:
auth.get_access_token(verifier)
except tweepy.TweepError:
print('Failed to get access token')
access_token = auth.access_token
access_token_secret = auth.access_token_secret
print(f'access_token = {access_token}')
print(f'access_token_secret = {access_token_secret}')
if __name__ == '__main__':
main()

53
utils.py Normal file

@@ -0,0 +1,53 @@
from math import floor
from urllib.parse import urlparse
def format_timedelta(value, time_format='{days} days, {hours2}:{minutes2}:{seconds2}'):
"""
Taken from https://github.com/frnhr/django_timedeltatemplatefilter
"""
if hasattr(value, 'seconds'):
seconds = value.seconds + value.days * 24 * 3600
else:
seconds = int(value)
seconds_total = seconds
minutes = int(floor(seconds / 60))
minutes_total = minutes
seconds -= minutes * 60
hours = int(floor(minutes / 60))
hours_total = hours
minutes -= hours * 60
days = int(floor(hours / 24))
days_total = days
hours -= days * 24
years = int(floor(days / 365))
years_total = years
days -= years * 365
return time_format.format(**{
'seconds': seconds,
'seconds2': str(seconds).zfill(2),
'minutes': minutes,
'minutes2': str(minutes).zfill(2),
'hours': hours,
'hours2': str(hours).zfill(2),
'days': days,
'years': years,
'seconds_total': seconds_total,
'minutes_total': minutes_total,
'hours_total': hours_total,
'days_total': days_total,
'years_total': years_total,
})
def parse_base_url(url, include_scheme=True):
result = urlparse(url)
base_url = f'{result.scheme}://{result.netloc}' if include_scheme else result.netloc.replace('www.', '')
return base_url