from typing import Tuple, List, Dict, Callable, Any
from urllib.parse import urlparse, urljoin
import asyncio
import random
import re
import json

from aiohttp import ClientSession
from bs4 import BeautifulSoup
from bs4.element import Tag
import yaml

from .settings import settings, OutputFormat, TagAttrs

BS_PARSER = 'html.parser'


class Job:
    def __init__(self, session: ClientSession = None):
        self.loop = None
        self.session = session
        self.lock = None
        self.results: Dict[str, List[Any]] = {}
        self.num_targets_found: int = 0
        self.num_links_followed: int = 0
        self.page_counter: int = 0
        self.entry_urls: List[str] = []
        urls = [settings.entry_urls] if isinstance(settings.entry_urls, str) else settings.entry_urls
        for url in urls:
            if is_valid_url(url):
                self.entry_urls.append(url)
            else:
                # Not a URL; treat it as the path to a file containing one URL per line.
                with open(url, 'r') as f:
                    self.entry_urls += [line.strip() for line in f if line.strip()]

    async def init_async(self) -> None:
        self.loop = asyncio.get_event_loop()
        self.session = ClientSession(loop=self.loop)
        self.lock = asyncio.Lock()

    async def start(self) -> None:
        await self.init_async()
        print_start()
        try:
            await self.run(*self.entry_urls)
        finally:
            await self.session.close()
            print_end()

    async def run(self, *urls: str, depth: int = 0) -> None:
        async with self.lock:
            # Skip URLs that have already been scraped.
            urls = list(set(urls).difference(self.results.keys()))
            if settings.max_pages:
                num_requests_left = settings.max_pages - self.num_links_followed
                if num_requests_left <= 0:
                    return
                urls = urls[:num_requests_left]
            self.num_links_followed += len(urls)
        output = await asyncio.gather(*(self.get_and_scrape(url) for url in urls))
        assert isinstance(output, list)
        next_links = construct_next_urls(urls, output)
        if depth < settings.max_depth:
            await self.run(*next_links, depth=depth + 1)

    async def get_and_scrape(self, url: str) -> List[str]:
        async with self.lock:
            if settings.target_limit and self.num_targets_found >= settings.target_limit:
                return []
        async with self.session.get(url) as response:
            html = await response.text()
        targets, links = scrape_document(html)
        async with self.lock:
            self.page_counter += 1
            num_targets_left = settings.target_limit - self.num_targets_found if settings.target_limit else None
            targets = targets[:num_targets_left]
            self.results[url] = targets
            self.num_targets_found += len(targets)
            print_page_results(url, targets, self.limit_reached())
        return links

    def limit_reached(self) -> bool:
        if settings.max_pages and self.page_counter >= settings.max_pages:
            return True
        if settings.target_limit and self.num_targets_found >= settings.target_limit:
            return True
        return False


def scrape_document(html: str) -> Tuple[List[Any], List[str]]:
    soup = BeautifulSoup(html, BS_PARSER)
    # Targets:
    targets = []
    for tag in soup.find_all(target_filter, limit=settings.target_limit):
        if settings.target_extract_text:
            targets.append(settings.target_transform(tag.text) if settings.target_transform else tag.text)
        else:
            targets.append(settings.target_transform(tag) if settings.target_transform else str(tag))
    # Next links:
    if settings.next_link_random:
        links = soup.find_all(link_filter)
        if settings.next_link_limit and settings.next_link_limit < len(links):
            # Pick a random subset instead of always taking the first matches.
            links = random.sample(links, settings.next_link_limit)
    else:
        links = soup.find_all(link_filter, limit=settings.next_link_limit)
    return targets, [a['href'] for a in links]


def link_filter(tag: Tag) -> bool:
    try:
        if not string_matches(tag['href'], settings.next_link_href, settings.regex_mode):
            return False
    except KeyError:
        return False
    return tag_filter(tag, 'a', text=settings.next_link_text, attrs=settings.next_link_attrs,
                      func=settings.next_link_match_func, regex=settings.regex_mode)
def target_filter(tag: Tag) -> bool:
    return tag_filter(tag, settings.target_tag, text=settings.target_text, attrs=settings.target_attrs,
                      func=settings.target_match_func, regex=settings.regex_mode)


def tag_filter(tag: Tag, name: str = None, text: str = None, attrs: TagAttrs = None,
               func: Callable[[Tag], bool] = None, regex: bool = False) -> bool:
    """
    Returns `True` only if the `tag` matches all provided filter criteria.

    Built to be used as the filter function in BeautifulSoup4's `find_all` method.

    Args:
        tag: The BS4 Tag object to check
        name (optional): What kind of tag is matched (e.g. 'a' would match an HTML anchor tag)
        text (optional): The text enclosed by the tag to be matched
        attrs (optional): Any additional attributes the tag should match, e.g. {'class': 'relevant'}
        func (optional): Function to run on the tag for filtering (should return `True` if the tag matches)
        regex (optional): If `True`, all checks treat the provided arguments as regular expressions to be
            matched against the tag's attributes; otherwise the attributes are checked for string equality.
    """
    if not string_matches(tag.name, name, regex):
        return False
    if not string_matches(tag.text, text, regex):
        return False
    for attr_name, attr_values in (attrs or {}).items():
        try:
            values = tag[attr_name]
        except KeyError:
            return False
        if not isinstance(values, list):
            values = [values]
        if not isinstance(attr_values, list):
            attr_values = [attr_values]
        for attr_value in attr_values:
            if not any(string_matches(value, attr_value, regex) for value in values):
                return False
    if func:
        return func(tag)
    return True


def string_matches(search_string: str, expression: str = None, regex: bool = False) -> bool:
    if expression is None:
        return True
    if not regex:
        return search_string == expression
    return re.compile(expression).search(search_string) is not None


def construct_next_urls(urls: List[str], next_links_lists: List[List[str]]) -> List[str]:
    output = set()
    for url, next_links in zip(urls, next_links_lists):
        for link in next_links:
            # Resolve relative links against the page they were found on.
            output.add(urljoin(url, link))
    return list(output)


def is_valid_url(string: str) -> bool:
    parsed = urlparse(string)
    if not all([parsed.scheme, parsed.netloc]):
        return False
    if parsed.scheme not in ('http', 'https'):
        return False
    return True


def print_start() -> None:
    if settings.output_format == OutputFormat.json:
        print('{' if settings.output_with_urls else '[')


def print_end() -> None:
    if settings.output_format == OutputFormat.json:
        print('}' if settings.output_with_urls else ']')


def print_page_results(url: str, targets: List[str], last: bool = False) -> None:
    if settings.output_format == OutputFormat.yaml:
        output = yaml.safe_dump({url: targets} if settings.output_with_urls else targets)
        end = ''
    elif settings.output_format == OutputFormat.json:
        output = json.dumps({url: targets} if settings.output_with_urls else targets, indent=2)[2:-2]
        end = '\n' if last else ',\n'
    else:
        output = '\n'.join(targets)
        if settings.output_with_urls:
            output = url + ':\n' + output
        end = '\n'
    print(output, end=end)
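
# Example usage (a minimal sketch, not part of the module's public API): assuming `settings`
# has already been populated elsewhere (e.g. from a CLI or YAML config handled in `.settings`),
# a scrape job could be started like this:
#
#     job = Job()
#     asyncio.run(job.start())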