soupjobs/src/soupjobs/scrape.py

from typing import Tuple, List, Dict, Callable, Any
from urllib.parse import urlparse, urljoin
import asyncio
import random
import re
import json

from aiohttp import ClientSession
from bs4 import BeautifulSoup
from bs4.element import Tag
import yaml

from .settings import settings, OutputFormat, TagAttrs

BS_PARSER = 'html.parser'


class Job:
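    """
    Represents a single scraping job: the entry URLs, the aiohttp client session,
    the accumulated results, and the counters used to enforce the configured limits.
    """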
    def __init__(self, session: ClientSession = None):
        self.loop = None
        self.session = session
        self.lock = None
        self.results: Dict[str, List[Any]] = {}
        self.num_targets_found: int = 0
        self.num_links_followed: int = 0
        self.page_counter: int = 0
        self.entry_urls: List[str] = []
        urls = [settings.entry_urls] if isinstance(settings.entry_urls, str) else settings.entry_urls
        for url in urls:
            if is_valid_url(url):
                self.entry_urls.append(url)
            else:
                # Anything that is not a valid URL is treated as a path to a file with one URL per line
                with open(url, 'r') as f:
                    self.entry_urls += [line.strip() for line in f if line.strip()]

    async def init_async(self) -> None:
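        """Creates the event loop, the aiohttp client session, and the lock used by this job."""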
        self.loop = asyncio.get_event_loop()
        self.session = ClientSession(loop=self.loop)
        self.lock = asyncio.Lock()

    async def start(self) -> None:
        await self.init_async()
        print_start()
        try:
            await self.run(*self.entry_urls)
        finally:
            await self.session.close()
            print_end()

    async def run(self, *urls: str, depth: int = 0) -> None:
        async with self.lock:
            # Skip URLs that have already been scraped
            urls = list(set(urls).difference(self.results.keys()))
            if settings.max_pages:
                num_requests_left = settings.max_pages - self.num_links_followed
                if num_requests_left <= 0:
                    return
                urls = urls[:num_requests_left]
            self.num_links_followed += len(urls)
        output = await asyncio.gather(*(self.get_and_scrape(url) for url in urls), loop=self.loop)
        assert isinstance(output, list)
        next_links = construct_next_urls(urls, output)
        if depth < settings.max_depth:
            await self.run(*next_links, depth=depth + 1)

    async def get_and_scrape(self, url: str) -> List[str]:
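        """Requests a single page, scrapes it, records the extracted targets, and returns the links found on it."""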
        async with self.lock:
            if settings.target_limit and self.num_targets_found >= settings.target_limit:
                return []
        async with self.session.get(url) as response:
            html = await response.text()
        targets, links = scrape_document(html)
        async with self.lock:
            self.page_counter += 1
            num_targets_left = settings.target_limit - self.num_targets_found if settings.target_limit else None
            targets = targets[:num_targets_left]
            self.results[url] = targets
            self.num_targets_found += len(targets)
            print_page_results(url, targets, self.limit_reached())
        return links

    def limit_reached(self) -> bool:
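        """Returns `True` if the configured page limit or target limit has been reached."""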
        if settings.max_pages and self.page_counter >= settings.max_pages:
            return True
        if settings.target_limit and self.num_targets_found >= settings.target_limit:
            return True
        return False


def scrape_document(html: str) -> Tuple[List[Any], List[str]]:
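    """Parses an HTML document and returns a tuple of (extracted targets, hrefs of the links to follow next)."""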
    soup = BeautifulSoup(html, BS_PARSER)
    # Targets:
    targets = []
    for tag in soup.find_all(target_filter, limit=settings.target_limit):
        target = tag.text if settings.target_extract_text else str(tag)
        if settings.target_transform:
            target = settings.target_transform(target)
        targets.append(target)
    # Next links:
    if settings.next_link_random:
        links = soup.find_all(link_filter)
        if settings.next_link_limit and settings.next_link_limit < len(links):
            # Pick a random subset of the matching links
            indices = list(range(len(links)))
            random.shuffle(indices)
            links = [links[i] for i in indices[:settings.next_link_limit]]
    else:
        links = soup.find_all(link_filter, limit=settings.next_link_limit)
    return targets, [a['href'] for a in links]


def link_filter(tag: Tag) -> bool:
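    """Filter for anchor tags whose href and other attributes match the configured next-link criteria."""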
    try:
        if not string_matches(tag['href'], settings.next_link_href, settings.regex_mode):
            return False
    except KeyError:
        return False
    return tag_filter(tag, 'a', text=settings.next_link_text, attrs=settings.next_link_attrs,
                      func=settings.next_link_match_func, regex=settings.regex_mode)


def target_filter(tag: Tag) -> bool:
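    """Filter for tags matching the configured target criteria."""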
    return tag_filter(tag, settings.target_tag, text=settings.target_text, attrs=settings.target_attrs,
                      func=settings.target_match_func, regex=settings.regex_mode)


def tag_filter(tag: Tag, name: str = None, text: str = None, attrs: TagAttrs = None, func: Callable[[Tag], bool] = None,
               regex: bool = False) -> bool:
"""
Returns `True` only if the `tag` matches all provided filter criteria.
Built to be used in the `find_all` method from BeautifulSoup4.
Args:
tag:
The BS4 Tag object to check
name (optional):
What kind of tag will be matched (e.g. 'a' would match an HTML anchor tag)
text (optional):
The text enclosed by the tag to be matched
func (optional):
Function to run on the tag for filtering (should return `True` if the tag matches)
attrs (optional):
Any additional attributes the tag should match, e.g. {'class': 'relevant'}
regex (optional):
If `True`, all checks are performed by matching the tag's attributes using the provided arguments as
a regular expression, otherwise they are checked for string equality.
"""
    if not string_matches(tag.name, name, regex):
        return False
    if not string_matches(tag.text, text, regex):
        return False
    for attr_name, attr_value in (attrs or {}).items():
        try:
            if not string_matches(tag[attr_name], attr_value, regex):
                return False
        except KeyError:
            return False
    if func:
        return func(tag)
    return True
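
# A minimal illustration of how `tag_filter` can be plugged into BeautifulSoup's
# `find_all` outside the settings-driven filters above (the tag name and id used
# here are hypothetical, not part of the package):
#
#     soup = BeautifulSoup(html, BS_PARSER)
#     headings = soup.find_all(lambda t: tag_filter(t, 'h2', attrs={'id': 'job-title'}))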
def string_matches(search_string: str, expression: str = None, regex: bool = False) -> bool:
    if expression is None:
        return True
    if not regex:
        return search_string == expression
    return re.compile(expression).search(search_string) is not None


def construct_next_urls(urls: List[str], next_links_lists: List[List[str]]) -> List[str]:
    output = set()
    for url, next_links in zip(urls, next_links_lists):
        for link in next_links:
            output.add(urljoin(url, link))
    return list(output)
def is_valid_url(string: str) -> bool:
    parsed = urlparse(string)
    if not all([parsed.scheme, parsed.netloc]):
        return False
    if parsed.scheme not in ('http', 'https'):
        return False
    return True


def print_start() -> None:
    if settings.output_format == OutputFormat.json:
        print('{' if settings.output_with_urls else '[')


def print_end() -> None:
    if settings.output_format == OutputFormat.json:
        print('}' if settings.output_with_urls else ']')
def print_page_results(url: str, targets: List[str], last: bool = False) -> None:
    if settings.output_format == OutputFormat.yaml:
        output = yaml.safe_dump({url: targets} if settings.output_with_urls else targets)
        end = ''
    elif settings.output_format == OutputFormat.json:
        # Strip the enclosing brace/bracket pair (and the adjacent newlines) so each page's
        # results become elements of the single JSON document opened by `print_start()`
        output = json.dumps({url: targets} if settings.output_with_urls else targets, indent=2)[2:-2]
        end = '\n' if last else ',\n'
    else:
        output = '\n'.join(targets)
        if settings.output_with_urls:
            output = url + ':\n' + output
        end = '\n'
    print(output, end=end)
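
# A minimal usage sketch (assuming default settings; the package's own CLI/entry
# point may differ):
#
#     import asyncio
#     from soupjobs.scrape import Job
#
#     asyncio.run(Job().start())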