From 0b0d2727e86024de48f14606597d92ac2b59fc9e Mon Sep 17 00:00:00 2001
From: Julien Riou
Date: Sun, 27 Dec 2020 18:27:07 +0100
Subject: [PATCH] Initial release

---
 .gitignore              |   6 +
 .pre-commit-config.yaml |  29 +++
 .pydocstyle             |   2 +
 README.md               |  89 +++++++++
 config.json.example     |  18 ++
 config.py               |  24 +++
 crawlers.py             |  94 ++++++++++
 db.py                   | 119 ++++++++++++
 main.py                 |  71 +++++++
 notifiers.py            |  57 ++++++
 parsers.py              | 403 ++++++++++++++++++++++++++++++++++++++++
 twitter_auth.py         |  46 +++++
 utils.py                |  53 ++++++
 13 files changed, 1011 insertions(+)
 create mode 100644 .gitignore
 create mode 100644 .pre-commit-config.yaml
 create mode 100644 .pydocstyle
 create mode 100644 README.md
 create mode 100644 config.json.example
 create mode 100644 config.py
 create mode 100644 crawlers.py
 create mode 100644 db.py
 create mode 100644 main.py
 create mode 100644 notifiers.py
 create mode 100644 parsers.py
 create mode 100644 twitter_auth.py
 create mode 100644 utils.py

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..4cb714b
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,6 @@
+__pycache__
+config.json
+*.html
+TODO.txt
+restock.db
+geckodriver.log
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
new file mode 100644
index 0000000..8e71337
--- /dev/null
+++ b/.pre-commit-config.yaml
@@ -0,0 +1,29 @@
+---
+repos:
+  - repo: https://github.com/pre-commit/pre-commit-hooks
+    rev: master
+    hooks:
+      - id: check-executables-have-shebangs
+      - id: check-merge-conflict
+      - id: double-quote-string-fixer
+      - id: end-of-file-fixer
+      - id: fix-encoding-pragma
+        args: ['--remove']
+      - id: requirements-txt-fixer
+      - id: trailing-whitespace
+      - id: check-json
+  - repo: https://gitlab.com/pycqa/flake8
+    rev: master
+    hooks:
+      - id: flake8
+        args: ['--max-line-length=120']
+  - repo: https://github.com/FalconSocial/pre-commit-python-sorter
+    rev: master
+    hooks:
+      - id: python-import-sorter
+        args: ['--silent-overwrite']
+  - repo: https://github.com/chewse/pre-commit-mirrors-pydocstyle
+    rev: master
+    hooks:
+      - id: pydocstyle
+        args: ['--config=.pydocstyle', '--match="(?!test_).*\.py"']
diff --git a/.pydocstyle b/.pydocstyle
new file mode 100644
index 0000000..aef2483
--- /dev/null
+++ b/.pydocstyle
@@ -0,0 +1,2 @@
+[pydocstyle]
+ignore = D100,D104,D400,D203,D204,D101,D213,D202
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..950f47d
--- /dev/null
+++ b/README.md
@@ -0,0 +1,89 @@
+The year 2020 has been quite hard for hardware supply. Graphics cards are out of stock everywhere. Nobody can grab
+the new generation (AMD RX 6000 series, NVIDIA GeForce RTX 3000 series), and even older generations are hard to
+find. **GraphicRestock** is a bot that crawls retailer websites and notifies you when a product is available.
+
+# Setup
+
+Based on Debian 10:
+
+```
+apt install python3-selenium python3-sqlalchemy python3-tweepy firefox-esr
+curl -L -s https://github.com/mozilla/geckodriver/releases/download/v0.28.0/geckodriver-v0.28.0-linux64.tar.gz | tar xvpzf - -C /usr/local/bin/
+chown root:root /usr/local/bin/geckodriver
+chmod +x /usr/local/bin/geckodriver
+```
+
+# Configure
+
+A configuration file example can be found [here](config.json.example).
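+
+For instance, a minimal configuration watching a single retailer page could look like this (placeholder Twitter
+credentials, one URL taken from the example file):
+
+```
+{
+  "twitter": {
+    "consumer_key": "***",
+    "consumer_secret": "***",
+    "access_token": "***",
+    "access_token_secret": "***"
+  },
+  "urls": [
+    "https://www.topachat.com/pages/produits_cat_est_micro_puis_rubrique_est_wgfx_pcie_puis_f_est_58-11447,11445,11446,11559,11558.html"
+  ]
+}
+```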
+
+Options:
+* **twitter.consumer_key**: key of your Twitter application
+* **twitter.consumer_secret**: secret of your Twitter application
+* **twitter.access_token**: authentication token generated by [twitter_auth.py](twitter_auth.py)
+* **twitter.access_token_secret**: authentication token secret generated by [twitter_auth.py](twitter_auth.py)
+* **urls**: list of retailer web pages (they must match the format expected by the crawlers)
+* **executable_path** (optional): path to the Selenium driver (geckodriver for Firefox)
+
+
+# Twitter authentication
+
+Create a configuration file with the **twitter.consumer_key** and **twitter.consumer_secret** parameters.
+
+Then authenticate:
+
+```
+python3 twitter_auth.py
+```
+
+You will have to open the URL and authenticate:
+
+```
+Please go to https://api.twitter.com/oauth/authorize?oauth_token=****
+```
+
+Click on **Authorize app**. A verifier code will be shown. Go back to your console and enter the code:
+
+```
+Verifier:*******
+```
+
+Tokens will be created:
+
+```
+access_token = *****
+access_token_secret = ****
+```
+
+Finally, write them to the configuration file as the **twitter.access_token** and **twitter.access_token_secret**
+parameters.
+
+
+# Usage
+
+```
+python3 main.py --help
+```
+
+# How to contribute
+
+First things first, check open issues to ensure the feature or bug you are facing has not already been reported.
+
+Pull requests are highly appreciated.
+
+Please lint your code:
+
+```
+docker run -it -v $(pwd):/mnt/ --rm debian:10 bash
+apt-get update && apt-get upgrade -y && apt-get install -y python3-pip git
+pip3 install pre-commit
+cd /mnt
+pre-commit run --all-files
+```
+
+Happy coding!
+
+
+# Disclaimer
+
+Crawl websites with caution: please check with retailers that the bot respects the terms of use of their websites.
+The authors of the bot are not responsible for how it is used.
diff --git a/config.json.example b/config.json.example
new file mode 100644
index 0000000..2085547
--- /dev/null
+++ b/config.json.example
@@ -0,0 +1,18 @@
+{
+  "twitter": {
+    "consumer_key": "***",
+    "consumer_secret": "***",
+    "access_token": "***",
+    "access_token_secret": "***"
+  },
+  "urls": [
+    "https://www.topachat.com/pages/produits_cat_est_micro_puis_rubrique_est_wgfx_pcie_puis_f_est_58-11447,11445,11446,11559,11558.html",
+    "https://www.ldlc.com/informatique/pieces-informatique/carte-graphique-interne/c4684/+fv121-19183,19184,19185,19339,19340.html",
+    "https://www.materiel.net/carte-graphique/l426/+fv121-19183,19184,19185,19339,19340/",
+    "https://www.alternate.be/Hardware/Grafische-kaarten/NVIDIA/RTX-3060-Ti",
+    "https://www.alternate.be/Hardware/Grafische-kaarten/NVIDIA/RTX-3070",
+    "https://www.alternate.be/Hardware/Grafische-kaarten/NVIDIA/RTX-3080",
+    "https://www.alternate.be/Hardware/Grafische-kaarten/NVIDIA/RTX-3090"
+  ],
+  "executable_path": "/usr/bin/geckodriver"
+}
diff --git a/config.py b/config.py
new file mode 100644
index 0000000..d354f41
--- /dev/null
+++ b/config.py
@@ -0,0 +1,24 @@
+import json
+
+from utils import parse_base_url
+
+
+def read_config(filename):
+    with open(filename, 'r') as fd:
+        return json.load(fd)
+
+
+def extract_shops(urls):
+    """
+    Parse shop names and return the list of addresses for each shop.
+
+    Example: ["toto.com/first", "toto.com/second", "tata.com/first"]
+          -> {"toto.com": ["toto.com/first", "toto.com/second"], "tata.com": ["tata.com/first"]}
+    """
+    result = {}
+    for url in urls:
+        base_url = parse_base_url(url, include_scheme=False)
+        if base_url not in result:
+            result[base_url] = [url]
+        else:
+            result[base_url].append(url)
+    return result
diff --git a/crawlers.py b/crawlers.py
new file mode 100644
index 0000000..a04021c
--- /dev/null
+++ b/crawlers.py
@@ -0,0 +1,94 @@
+import logging
+
+from parsers import (AlternateParser, LDLCParser, MaterielNetParser,
+                     TopAchatParser)
+from selenium import webdriver
+from selenium.common.exceptions import TimeoutException
+from selenium.webdriver.common.by import By
+from selenium.webdriver.firefox.options import Options
+from selenium.webdriver.support import expected_conditions
+from selenium.webdriver.support.ui import WebDriverWait
+
+logger = logging.getLogger(__name__)
+
+
+class ProductCrawler(object):
+
+    TIMEOUT = 3
+
+    def __init__(self, shop):
+        options = Options()
+        options.headless = True
+        self._driver = webdriver.Firefox(executable_path='/usr/local/bin/geckodriver', options=options)
+        self._shop = shop
+        self.products = []
+
+    def __del__(self):
+        self._driver.quit()
+
+    def fetch(self, url, wait_for=None):
+        self._driver.get(url)
+        if wait_for:
+            try:
+                condition = expected_conditions.presence_of_element_located((By.CLASS_NAME, wait_for))
+                WebDriverWait(self._driver, self.TIMEOUT).until(condition)
+            except TimeoutException:
+                logger.warning(f'timeout waiting for element "{wait_for}" at {url}')
+        logger.info(f'url {url} fetched')
+        webpage = self._driver.execute_script("return document.getElementsByTagName('html')[0].innerHTML")
+        return webpage
+
+    def add_shop(self, products):
+        for product in products:
+            product.shop = self._shop
+        return products
+
+
+class TopAchatCrawler(ProductCrawler):
+    def __init__(self, shop, urls):
+        super().__init__(shop)
+        parser = TopAchatParser()
+        for url in urls:
+            webpage = self.fetch(url=url)
+            parser.feed(webpage)
+        self.products += self.add_shop(parser.products)
+
+
+class LDLCCrawler(ProductCrawler):
+    def __init__(self, shop, urls):
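+        # Follow the listing's pagination: keep fetching the parser's
+        # "next page" link until it stops changing, feeding every page
+        # to the parser.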
+        super().__init__(shop)
+        parser = LDLCParser()
+        for url in urls:
+            next_page = url
+            previous_page = None
+            while next_page != previous_page:
+                webpage = self.fetch(url=next_page)
+                parser.feed(webpage)
+                previous_page = next_page
+                next_page = parser.next_page
+        self.products += self.add_shop(parser.products)
+
+
+class MaterielNetCrawler(ProductCrawler):
+    def __init__(self, shop, urls):
+        super().__init__(shop)
+        parser = MaterielNetParser()
+        for url in urls:
+            next_page = url
+            previous_page = None
+            while next_page != previous_page:
+                webpage = self.fetch(url=next_page, wait_for='o-product__price')
+                parser.feed(webpage)
+                previous_page = next_page
+                next_page = parser.next_page
+        self.products += self.add_shop(parser.products)
+
+
+class AlternateCrawler(ProductCrawler):
+    def __init__(self, shop, urls):
+        super().__init__(shop)
+        parser = AlternateParser()
+        for url in urls:
+            webpage = self.fetch(url=url)
+            parser.feed(webpage)
+        self.products += self.add_shop(parser.products)
diff --git a/db.py b/db.py
new file mode 100644
index 0000000..2c51293
--- /dev/null
+++ b/db.py
@@ -0,0 +1,119 @@
+import logging
+from datetime import datetime
+
+from sqlalchemy import (Boolean, Column, DateTime, Float, ForeignKey, Integer,
+                        String, create_engine, exc)
+from sqlalchemy.ext.declarative import declarative_base
+from sqlalchemy.orm import relationship, sessionmaker
+
+logger = logging.getLogger(__name__)
+
+
+Base = declarative_base()
+engine = create_engine('sqlite:///restock.db')
+Session = sessionmaker(bind=engine, autoflush=False)
+
+
+class Shop(Base):
+    __tablename__ = 'shop'
+    id = Column(Integer, primary_key=True)
+    name = Column(String, unique=True, nullable=False)
+
+    def __repr__(self):
+        return f'Shop<{self.name}>'
+
+    def __ne__(self, shop):
+        return self.name != shop.name
+
+
+class Product(Base):
+    __tablename__ = 'product'
+    id = Column(Integer, primary_key=True)
+    name = Column(String, nullable=False)
+    url = Column(String, nullable=False, unique=True)
+    price = Column(Float, nullable=False)
+    price_currency = Column(String, nullable=False)
+    available = Column(Boolean, nullable=False)
+    updated_at = Column(DateTime)
+    tweet_id = Column(Integer, unique=True)
+    shop_id = Column(Integer, ForeignKey('shop.id'), nullable=False)
+    shop = relationship('Shop', foreign_keys=[shop_id])
+
+    def __repr__(self):
+        return f'Product<{self.name}@{self.shop.name}>'
+
+    def __ne__(self, product):
+        return self.name != product.name or self.price != product.price or self.available != product.available \
+            or self.url != product.url or self.shop != product.shop
+
+    def ok(self):
+        return self.name and self.url and self.price and self.price_currency and self.available is not None
+
+
+def create_tables():
+    Base.metadata.create_all(engine)
+    logger.debug('tables created')
+
+
+def list_shops():
+    session = Session()
+    shops = session.query(Shop).all()
+    session.close()
+    return shops
+
+
+def upsert_shops(names):
+    session = Session()
+    try:
+        for name in names:
+            shop = Shop(name=name)
+            query = session.query(Shop).filter(Shop.name == shop.name)
+            shop_database = query.first()
+            if not shop_database:
+                logger.info(f'{shop} added')
+                session.add(shop)
+        session.commit()
+        logger.debug('transaction committed')
+    except exc.SQLAlchemyError:
+        logger.exception('cannot commit transaction')
+    finally:
+        session.close()
+
+
+def upsert_products(products, notifier=None):
+    session = Session()
+    try:
+        for product in products:
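+            # Upsert: look up the product by name and shop, insert it when it
+            # is new, otherwise update it and notify on availability changes.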
+            query = session.query(Product).filter(Product.name == product.name,
+                                                  Product.shop == product.shop)
+            product_database = query.first()
+            now = datetime.utcnow()
+            tweet_id = None
+            if not product_database:
+                # product is new and available so we need to create an initial thread
+                if notifier and product.available:
+                    product.tweet_id = notifier.create_thread(product).id
+                product.updated_at = now
+                session.add(product)
+                logger.info(f'{product} added')
+            elif product != product_database:
+                # notifications
+                if notifier and product.available != product_database.available:
+                    if product.available and not product_database.tweet_id:
+                        # product is now available so we need to create an initial tweet (or thread)
+                        tweet = notifier.create_thread(product)
+                        if tweet:
+                            tweet_id = tweet.id
+                    elif not product.available and product_database.available and product_database.tweet_id:
+                        # product is out of stock so we need to reply to the previous tweet to close the thread
+                        notifier.close_thread(tweet_id=product_database.tweet_id,
+                                              duration=now-product_database.updated_at)
+                query.update({Product.price: product.price, Product.price_currency: product.price_currency,
+                              Product.available: product.available, Product.url: product.url,
+                              Product.tweet_id: tweet_id, Product.updated_at: now})
+                logger.info(f'{product} updated')
+        session.commit()
+        logger.debug('transaction committed')
+    except exc.SQLAlchemyError:
+        logger.exception('cannot commit transaction')
+    finally:
+        session.close()
diff --git a/main.py b/main.py
new file mode 100644
index 0000000..9b8d621
--- /dev/null
+++ b/main.py
@@ -0,0 +1,71 @@
+#!/usr/bin/env python3
+import argparse
+import logging
+
+from config import extract_shops, read_config
+from crawlers import (AlternateCrawler, LDLCCrawler, MaterielNetCrawler,
+                      TopAchatCrawler)
+from db import create_tables, list_shops, upsert_products, upsert_shops
+from notifiers import TwitterNotifier
+
+logger = logging.getLogger(__name__)
+
+
+def parse_arguments():
+    parser = argparse.ArgumentParser()
+    parser.add_argument('-v', '--verbose', dest='loglevel', action='store_const', const=logging.INFO,
+                        help='print more output')
+    parser.add_argument('-d', '--debug', dest='loglevel', action='store_const', const=logging.DEBUG,
+                        default=logging.WARNING, help='print even more output')
+    parser.add_argument('-o', '--logfile', help='logging file location')
+    parser.add_argument('-c', '--config', default='config.json', help='configuration file location')
+    parser.add_argument('-N', '--disable-notifications', dest='disable_notifications', action='store_true',
+                        help='do not send notifications')
+    args = parser.parse_args()
+    return args
+
+
+def setup_logging(args):
+    log_format = '%(asctime)s %(levelname)s: %(message)s' if args.logfile else '%(levelname)s: %(message)s'
+    logging.basicConfig(format=log_format, level=args.loglevel, filename=args.logfile)
+
+
+def main():
+    args = parse_arguments()
+    setup_logging(args)
+    config = read_config(args.config)
+    create_tables()
+
+    shops = extract_shops(config['urls'])
+    upsert_shops(shops.keys())
+
+    if args.disable_notifications:
+        notifier = None
+    else:
+        notifier = TwitterNotifier(consumer_key=config['twitter']['consumer_key'],
+                                   consumer_secret=config['twitter']['consumer_secret'],
+                                   access_token=config['twitter']['access_token'],
+                                   access_token_secret=config['twitter']['access_token_secret'])
+
+    for shop in list_shops():
+        logger.debug(f'processing {shop}')
+        urls = shops.get(shop.name)
+        if not urls:
+            logger.warning(f'cannot find urls for shop {shop} in the configuration file')
+            continue
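+        # Dispatch to the crawler class matching the shop (one crawler per
+        # supported retailer).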
+        if shop.name == 'topachat.com':
+            crawler = TopAchatCrawler(shop=shop, urls=urls)
+        elif shop.name == 'ldlc.com':
+            crawler = LDLCCrawler(shop=shop, urls=urls)
+        elif shop.name == 'materiel.net':
+            crawler = MaterielNetCrawler(shop=shop, urls=urls)
+        elif shop.name == 'alternate.be':
+            crawler = AlternateCrawler(shop=shop, urls=urls)
+        else:
+            logger.warning(f'shop {shop} not supported')
+            continue
+        upsert_products(products=crawler.products, notifier=notifier)
+
+
+if __name__ == '__main__':
+    main()
diff --git a/notifiers.py b/notifiers.py
new file mode 100644
index 0000000..99c6369
--- /dev/null
+++ b/notifiers.py
@@ -0,0 +1,57 @@
+import logging
+
+import tweepy
+from utils import format_timedelta
+
+logger = logging.getLogger(__name__)
+
+
+class TwitterNotifier(object):
+
+    _hashtags_map = {
+        'rtx 3060 ti': ['#nvidia', '#rtx3060ti'],
+        'rtx 3070': ['#nvidia', '#rtx3070'],
+        'rtx 3080': ['#nvidia', '#rtx3080'],
+        'rtx 3090': ['#nvidia', '#rtx3090'],
+        'rx 6800 xt': ['#amd', '#rx6800xt'],
+        'rx 6800': ['#amd', '#rx6800'],
+    }
+
+    _currency_map = {
+        'EUR': '€'
+    }
+
+    def __init__(self, consumer_key, consumer_secret, access_token, access_token_secret):
+        auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
+        auth.set_access_token(access_token, access_token_secret)
+        self._api = tweepy.API(auth)
+
+    def create_thread(self, product):
+        currency_sign = self._currency_map[product.price_currency]
+        shop_name = product.shop.name
+        price = f'{product.price}{currency_sign}'
+        message = f'{shop_name}: {product.name} for {price} is available at {product.url}'
+        hashtags = self._parse_hashtags(product)
+        if hashtags:
+            message += f' {hashtags}'
+        return self._create_tweet(message=message)
+
+    def close_thread(self, tweet_id, duration):
+        thread = self._api.get_status(id=tweet_id)
+        duration = format_timedelta(duration, '{hours_total}h{minutes2}m')
+        message = f'''@{thread.user.screen_name} And it's over ({duration})'''
+        return self._create_tweet(message=message, tweet_id=tweet_id)
+
+    def _create_tweet(self, message, tweet_id=None):
+        try:
+            tweet = self._api.update_status(status=message, in_reply_to_status_id=tweet_id)
+            logger.info(f'tweet {tweet.id} sent with message "{message}"')
+            return tweet
+        except tweepy.error.TweepError as err:
+            logger.warning(f'cannot send tweet with message "{message}"')
+            logger.warning(str(err))
+
+    def _parse_hashtags(self, product):
+        for patterns in self._hashtags_map:
+            if all(elem in product.name.lower().split(' ') for elem in patterns.split(' ')):
+                return ' '.join(self._hashtags_map[patterns])
diff --git a/parsers.py b/parsers.py
new file mode 100644
index 0000000..e6fa5b4
--- /dev/null
+++ b/parsers.py
@@ -0,0 +1,403 @@
+import logging
+from html.parser import HTMLParser
+
+from db import Product
+from utils import parse_base_url
+
+logger = logging.getLogger(__name__)
+
+
+# Parsers definitely need to be replaced by BeautifulSoup because this code is not maintainable
+
+
+class ProductParser(HTMLParser):
+    def __init__(self):
+        super().__init__()
+        self.products = []
+        self.next_page = None
+
+
+class TopAchatParser(ProductParser):
+    def __init__(self, url=None):
+        super().__init__()
+        self._parsing_article = False
+        self._parsing_availability = False
+        self._parsing_price = False
+        self._parsing_price_currency = False
+        self._parsing_name = False
+        self._parsing_url = False
+        self._product = Product()
+        if url:
+            self._base_url = parse_base_url(url)
+        else:
+            self._base_url = 'https://www.topachat.com'
+
+    @staticmethod
+    def parse_name(data):
+        return data.split(' + ')[0].strip()
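+
+    # The parser works as a small state machine: handle_starttag() raises
+    # flags when entering elements of interest, handle_data() captures text
+    # while a flag is set, and handle_endtag() clears flags and stores the
+    # completed product.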
+
+    def handle_starttag(self, tag, attrs):
+        if tag == 'article':
+            for name, value in attrs:
+                if 'grille-produit' in value.split(' '):
+                    self._parsing_article = True
+        elif self._parsing_article:
+            if tag == 'link':
+                for name, value in attrs:
+                    if name == 'itemprop' and value == 'availability':
+                        self._parsing_availability = True
+                    elif self._parsing_availability and name == 'href':
+                        self._product.available = value != 'http://schema.org/OutOfStock'
+            elif tag == 'div':
+                for name, value in attrs:
+                    if name == 'itemprop' and value == 'price':
+                        self._parsing_price = True
+                    elif self._parsing_price and name == 'content':
+                        self._product.price = float(value)
+                    elif name == 'class' and value == 'libelle':
+                        self._parsing_url = True
+                        self._parsing_name = True
+            elif tag == 'meta':
+                for name, value in attrs:
+                    if name == 'itemprop' and value == 'priceCurrency':
+                        self._parsing_price_currency = True
+                    elif self._parsing_price_currency and name == 'content':
+                        self._product.price_currency = value
+            elif tag == 'a':
+                for name, value in attrs:
+                    if self._parsing_url and name == 'href':
+                        self._product.url = f'{self._base_url}{value}'
+
+    def handle_data(self, data):
+        if self._parsing_name and self.get_starttag_text().startswith('<a') and not self._product.name:
+            self._product.name = self.parse_name(data)
+            self._parsing_name = False
+
+    def handle_endtag(self, tag):
+        if self._parsing_article and tag == 'article':
+            self._parsing_article = False
+            self.products.append(self._product)
+            self._product = Product()
+        elif self._parsing_availability and tag == 'link':
+            self._parsing_availability = False
+        elif self._parsing_price and tag == 'div':
+            self._parsing_price = False
+        elif self._parsing_price_currency and tag == 'meta':
+            self._parsing_price_currency = False
+
+
+class LDLCParser(ProductParser):
+    def __init__(self, url=None):
+        super().__init__()
+        self._product = Product()
+        self.__parsing_pdt_item = False
+        self.__parsing_pdt_id = False
+        self._parsing_title = False
+        self.__parsing_pagination = False
+        self.__parsing_next_page_section = False
+        self._parsing_stock = False
+        self._parsing_price = False
+        if url:
+            self._base_url = parse_base_url(url)
+        else:
+            self._base_url = 'https://www.ldlc.com'
+
+    @property
+    def _parsing_item(self):
+        return self.__parsing_pdt_item and self.__parsing_pdt_id
+
+    @property
+    def _parsing_next_page(self):
+        return self.__parsing_pagination and self.__parsing_next_page_section
+
+    @staticmethod
+    def parse_price(string):
+        currency = None
+        if '€' in string:
+            currency = 'EUR'
+        price = int(''.join([i for i in string if i.isdigit()]))
+        return price, currency
+
+    def handle_starttag(self, tag, attrs):
+        if not self._parsing_item and tag == 'li' and not self.__parsing_pagination:
+            for name, value in attrs:
+                if name == 'class' and value == 'pdt-item':
+                    self.__parsing_pdt_item = True
+                elif name == 'id' and value.startswith('pdt-'):
+                    self.__parsing_pdt_id = True
+        elif not self.__parsing_pagination and tag == 'ul':
+            for name, value in attrs:
+                if name == 'class' and value == 'pagination':
+                    self.__parsing_pagination = True
+        elif self.__parsing_pagination and tag == 'li':
+            for name, value in attrs:
+                if name == 'class' and value == 'next':
+                    self.__parsing_next_page_section = True
+        elif self._parsing_next_page and tag == 'a':
+            for name, value in attrs:
+                if name == 'href':
+                    self.next_page = f'{self._base_url}{value}'
+        elif self._parsing_item:
+            if tag == 'h3':
+                self._parsing_title = True
+            elif self._parsing_title and tag == 'a':
+                for name, value in attrs:
+                    if name == 'href':
+                        self._product.url = f'{self._base_url}{value}'
+            elif tag == 'div':
+                for name, value in attrs:
+                    if not self._parsing_stock and name == 'class' and 'modal-stock-web' in value.split(' '):
+                        self._parsing_stock = True
+                    elif not self._parsing_price and name == 'class' and value == 'price':
+                        self._parsing_price = True
+
+    def handle_data(self, data):
+        last_tag = self.get_starttag_text()
+        if self._parsing_title and not self._product.name and last_tag.startswith('<a'):
+            self._product.name = data.strip()
+        elif self._parsing_stock and last_tag.startswith('<span'):
+            self._product.available = data.strip() != 'Rupture'
+        elif self._parsing_price:
+            if last_tag.startswith('<div'):
+                price, currency = self.parse_price(data)
+                self._product.price = price
+                self._product.price_currency = currency
+            elif last_tag.startswith('<sup'):
+                self._product.price += int(data) / 100
+
+    def handle_endtag(self, tag):
+        if self._parsing_item and tag == 'li':
+            self.__parsing_pdt_item = False
+            self.__parsing_pdt_id = False
+            self.products.append(self._product)
+            self._product = Product()
+        elif self._parsing_title and tag == 'h3':
+            self._parsing_title = False
+        elif self._parsing_stock and tag == 'span':
+            self._parsing_stock = False
+        elif self._parsing_price and tag == 'div':
+            self._parsing_price = False
+        elif self.__parsing_pagination and tag == 'ul':
+            self.__parsing_pagination = False
+        elif self.__parsing_next_page_section and tag == 'a':
+            self.__parsing_next_page_section = False
+
+
+class MaterielNetParser(ProductParser):
+    def __init__(self, url=None):
+        super().__init__()
+        self._product = Product()
+        self._parsing_product = False
+        self._parsing_product_meta = False
+        self._parsing_title = False
+        self.__parsing_product_availability = False
+        self.__stock_web_id = None
+        self._parsing_availability = False
+        self.__parsing_price_category = False
+        self.__parsing_price_objects = False
+        self._parsing_price = False
+        self._parsing_pagination = False
+        self.__active_page_found = False
+        self.__parsing_next_page = False
+        self._pagination_parsed = False
+        if url:
+            self._base_url = parse_base_url(url)
+        else:
+            self._base_url = 'https://www.materiel.net'
+
+    @property
+    def _parsing_web_availability(self):
+        return self.__parsing_product_availability and self.__stock_web_id
+
+    def _close_availability_parsing(self):
+        self._parsing_availability = False
+        self.__stock_web_id = None
+        self.__parsing_product_availability = False
+
+    def _close_product_meta_parsing(self):
+        self._parsing_product_meta = False
+
+    def _close_title_parsing(self):
+        self._parsing_title = False
+
+    def _close_price_parsing(self):
+        self.__parsing_price_category = False
+        self.__parsing_price_objects = False
+        self._parsing_price = False
+
+    def _close_product_parsing(self):
+        self._parsing_product = False
+        self.products.append(self._product)
+        self._product = Product()
+
+    def _close_pagination_parsing(self):
+        self._parsing_pagination = False
+        self._pagination_parsed = True
+
+    @staticmethod
+    def parse_price(string):
+        currency = None
+        if '€' in string:
+            currency = 'EUR'
+        price = int(''.join([i for i in string if i.isdigit()]))
+        return price, currency
+
+    def handle_starttag(self, tag, attrs):
+        if not self._parsing_product and tag == 'li':
+            for name, value in attrs:
+                if name == 'class' and 'ajax-product-item' in value.split(' '):
+                    self._parsing_product = True
+
+        if not self._parsing_product_meta and tag == 'div':
+            for name, value in attrs:
+                if name == 'class' and value == 'c-product__meta':
+                    self._parsing_product_meta = True
+        elif self._parsing_product_meta:
+            if tag == 'a':
+                for name, value in attrs:
+                    if name == 'href':
+                        self._product.url = f'{self._base_url}{value}'
+            elif tag == 'h2':
+                for name, value in attrs:
+                    if name == 'class' and value == 'c-product__title':
+                        self._parsing_title = True
+        if tag == 'div':
+            for name, value in attrs:
+                if not self.__parsing_product_availability and name == 'class' and value == 'c-product__availability':
+                    self.__parsing_product_availability = True
+                elif self.__parsing_product_availability and name == 'data-stock-web':
+                    self.__stock_web_id = value
+        elif tag == 'span' and self._parsing_web_availability:
+            for name, value in attrs:
+                availability_class_name = f'o-availability__value--stock_{self.__stock_web_id}'
+                if name == 'class' and availability_class_name in value.split(' '):
+                    self._parsing_availability = True
+        if not self.__parsing_price_objects and tag == 'div':
+            for name, value in attrs:
+                if not self.__parsing_price_category and name == 'class' and value == 'c-product__prices':
+                    self.__parsing_price_category = True
+                elif self.__parsing_price_category and name == 'class' and 'o-product__prices' in value.split(' '):
+                    self.__parsing_price_objects = True
+        elif self.__parsing_price_objects and tag == 'span':
+            for name, value in attrs:
+                if name == 'class' and value == 'o-product__price':
+                    self._parsing_price = True
+        if not self._pagination_parsed:
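+            # Pagination: locate the currently active page in the pagination
+            # list, then use the following page item's link as the next page.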
+            if not self._parsing_pagination and tag == 'ul':
+                for name, value in attrs:
+                    if name == 'class' and value == 'pagination':
+                        self._parsing_pagination = True
+            elif self._parsing_pagination and tag == 'li':
+                for name, value in attrs:
+                    values = value.split(' ')
+                    if not self.__active_page_found and name == 'class' and 'page-item' in values \
+                            and 'active' in values:
+                        self.__active_page_found = True
+                    elif self.__active_page_found and name == 'class' and 'page-item' in values:
+                        self.__parsing_next_page = True
+            elif self.__parsing_next_page and tag == 'a':
+                for name, value in attrs:
+                    if name == 'href':
+                        self.next_page = f'{self._base_url}{value}'
+                        self.__parsing_next_page = False
+                        self._pagination_parsed = True
+
+    def handle_endtag(self, tag):
+        if self._parsing_product_meta and tag == 'div':
+            self._close_product_meta_parsing()
+        elif self._parsing_product and tag == 'li':
+            self._close_product_parsing()
+        elif self._parsing_pagination and tag == 'ul':
+            self._close_pagination_parsing()
+
+    def handle_data(self, data):
+        last_tag = self.get_starttag_text()
+        if self._parsing_title and last_tag.startswith('<h2'):
+            self._product.name = data.strip()
+            self._close_title_parsing()
+        elif self._parsing_availability and last_tag.startswith('<span'):
+            self._product.available = data.strip() != 'Rupture'
+            self._close_availability_parsing()
+        elif self._parsing_price:
+            if last_tag.startswith('<span'):
+                price, currency = self.parse_price(data)
+                self._product.price = price
+                self._product.price_currency = currency
+            elif last_tag.startswith('<sup'):
+                self._product.price += int(data) / 100
+                self._close_price_parsing()
+
+
+class AlternateParser(ProductParser):
+    def __init__(self, url=None):
+        super().__init__()
+        self._product = Product()
+        if url:
+            self._base_url = parse_base_url(url)
+        else:
+            self._base_url = 'https://www.alternate.be'
+        self._parsing_row = False
+        self._parsing_name = False
+        self._parsing_price = False
+
+    def handle_starttag(self, tag, attrs):
+        if not self._parsing_row and tag == 'div':
+            for name, value in attrs:
+                if name == 'class' and value == 'listRow':
+                    self._parsing_row = True
+        elif self._parsing_row:
+            if tag == 'a':
+                for name, value in attrs:
+                    if name == 'href' and not self._product.url:
+                        self._product.url = self.parse_url(value)
+            elif tag == 'span':
+                if not self._parsing_name:
+                    for name, value in attrs:
+                        if name == 'class':
+                            if value == 'name':
+                                self._parsing_name = True
+                elif self._parsing_name:
+                    for name, value in attrs:
+                        if name == 'class' and value == 'additional':
+                            self._parsing_name = False
+                if not self._parsing_price:
+                    for name, value in attrs:
+                        if name == 'class' and 'price' in value.split(' '):
+                            self._parsing_price = True
+            elif tag == 'strong':
+                for name, value in attrs:
+                    if name == 'class' and 'stockStatus' in value.split(' '):
+                        values = value.split(' ')
+                        available = 'available_unsure' not in values and 'preorder' not in values
+                        self._product.available = available
+
+    def handle_data(self, data):
+        if self._parsing_name:
+            data = data.replace('grafische kaart', '').strip()
+            if data:
+                if not self._product.name:
+                    self._product.name = data
+                else:
+                    self._product.name += f' {data}'
+        elif self._parsing_price:
+            price, currency = self.parse_price(data)
+            if price and currency:
+                self._product.price = price
+                self._product.price_currency = currency
+                self._parsing_price = False
+
+    def handle_endtag(self, tag):
+        if tag == 'span' and self._parsing_price:
+            self._parsing_price = False
+        elif tag == 'div' and self._parsing_row and self._product.ok():
+            self._parsing_row = False
+            self.products.append(self._product)
+            self._product = Product()
+
+    @staticmethod
+    def parse_price(string):
+        currency = None
+        if '€' in string:
+            currency = 'EUR'
+        price = int(''.join([i for i in string if i.isdigit()]))
+        return price, currency
+
+    def parse_url(self, string):
+        string = string.split('?')[0]  # remove query string
+        return f'{self._base_url}{string}'
diff --git a/twitter_auth.py b/twitter_auth.py
new file mode 100644
index 0000000..9c44384
--- /dev/null
+++ b/twitter_auth.py
@@ -0,0 +1,46 @@
+#!/usr/bin/env python3
+
+import json
+from urllib.parse import urlparse
+
+import tweepy
+
+
+def main():
+    with open('config.json', 'r') as fd:
+        config = json.load(fd)
+
+    if 'access_token' in config['twitter'] and 'access_token_secret' in config['twitter']:
+        access_token = config['twitter']['access_token']
+        access_token_secret = config['twitter']['access_token_secret']
+    else:
+        consumer_key = config['twitter']['consumer_key']
+        consumer_secret = config['twitter']['consumer_secret']
+
+        auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
+
+        try:
+            redirect_url = auth.get_authorization_url()
+            print(f'Please go to {redirect_url}')
+        except tweepy.TweepError:
+            print('Failed to get request token')
+            return
+
+        token = urlparse(redirect_url).query.split('=')[1]
+
+        verifier = input('Verifier:')
+        auth.request_token = {'oauth_token': token, 'oauth_token_secret': verifier}
+
+        try:
+            auth.get_access_token(verifier)
+        except tweepy.TweepError:
+            print('Failed to get access token')
+            return
+
+        access_token = auth.access_token
+        access_token_secret = auth.access_token_secret
+
+    print(f'access_token = {access_token}')
+    print(f'access_token_secret = {access_token_secret}')
+
+
+if __name__ == '__main__':
+    main()
diff --git a/utils.py b/utils.py
new file mode 100644
index 0000000..7e753eb
--- /dev/null
+++ b/utils.py
@@ -0,0 +1,53 @@
+from math import floor
+from urllib.parse import urlparse
+
+
+def format_timedelta(value, time_format='{days} days, {hours2}:{minutes2}:{seconds2}'):
+    """
+    Format a timedelta using named placeholders.
+
+    Taken from https://github.com/frnhr/django_timedeltatemplatefilter
+    """
+    if hasattr(value, 'seconds'):
+        seconds = value.seconds + value.days * 24 * 3600
+    else:
+        seconds = int(value)
+
+    seconds_total = seconds
+
+    minutes = int(floor(seconds / 60))
+    minutes_total = minutes
+    seconds -= minutes * 60
+
+    hours = int(floor(minutes / 60))
+    hours_total = hours
+    minutes -= hours * 60
+
+    days = int(floor(hours / 24))
+    days_total = days
+    hours -= days * 24
+
+    years = int(floor(days / 365))
+    years_total = years
+    days -= years * 365
+
+    return time_format.format(**{
+        'seconds': seconds,
+        'seconds2': str(seconds).zfill(2),
+        'minutes': minutes,
+        'minutes2': str(minutes).zfill(2),
+        'hours': hours,
+        'hours2': str(hours).zfill(2),
+        'days': days,
+        'years': years,
+        'seconds_total': seconds_total,
+        'minutes_total': minutes_total,
+        'hours_total': hours_total,
+        'days_total': days_total,
+        'years_total': years_total,
+    })
+
+
+def parse_base_url(url, include_scheme=True):
+    result = urlparse(url)
+    base_url = f'{result.scheme}://{result.netloc}' if include_scheme else result.netloc.replace('www.', '')
+    return base_url
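+
+
+# Example:
+#   parse_base_url('https://www.ldlc.com/informatique/', include_scheme=False)
+#   -> 'ldlc.com', which extract_shops() uses as the shop name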