stocksymbolscraper/src/stocksymbolscraper/scrape.py

197 lines
6.1 KiB
Python

import logging
import re
import asyncio
from datetime import datetime
from string import ascii_uppercase
from math import inf
from aiohttp import ClientSession
from bs4 import BeautifulSoup
from bs4.element import Tag, ResultSet
log = logging.getLogger(__name__)
log.setLevel(logging.ERROR)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
log.addHandler(ch)
row_type = tuple[str, str, str, str, str]
DOMAIN = 'www.marketwatch.com'
BASE_URL = f'https://{DOMAIN}/tools/markets/stocks/a-z/'
DIGIT_CATEGORY = '0-9'
OTHER_CATEGORY = 'Other'
CATEGORIES = [DIGIT_CATEGORY] + list(ascii_uppercase) + [OTHER_CATEGORY]
STOCK_SYMBOL_PATTERN = re.compile(r'\(([\w.&]+)\)')
HTML_PARSER = 'html.parser'
class UnexpectedMarkupError(Exception):
pass
def extract_row_data(*table_rows: Tag) -> list[row_type]:
"""
Iterates over any number of table rows to extract data from them.
Args:
table_rows:
Arbitrary number of 'tr' Tag objects to be processed for data.
Returns:
A list of 5-tuples (of string elements)
"""
return [get_single_tr_data(tr) for tr in table_rows]
def get_single_tr_data(table_row: Tag) -> row_type:
"""
Returns the data from a table row.
Args:
table_row:
Specific 'tr' Tag object to be processed for data.
Returns:
A 5-tuple of string elements
"""
tds = table_row.find_all('td')
company_name = str(tds[0].a.contents[0]).strip()
stock_symbol = str(tds[0].a.contents[1].contents[0]).strip()
m = re.search(STOCK_SYMBOL_PATTERN, stock_symbol)
if m is None:
log.warning(f"{stock_symbol} did not match the stock symbol pattern; saving as is")
else:
stock_symbol = m.group(1)
country = get_str_from_td(tds[1])
exchange = get_str_from_td(tds[2])
sector = get_str_from_td(tds[3])
return company_name, stock_symbol, country, exchange, sector
def get_str_from_td(td: Tag) -> str:
"""
Returns content of a 'td' Tag object as a string. The only content has to be a NavigableString object.
Args:
td:
The table cell to be converted into a string.
Returns:
String content from a cell
"""
try:
content = td.contents[0]
except IndexError:
return ''
return str(content).strip()
async def soup_from_url(url: str, session: ClientSession = None) -> BeautifulSoup:
"""
Requests page and converts contents into a BeautifulSoup object.
Args:
url:
URL string leading to any page with matching content.
session (optional):
If passed a ClientSession instance, all HTTP requests will be made using that session;
otherwise a new one is created.
Returns:
A BeautifulSoup object for further data extraction
"""
if session is None:
session = ClientSession()
async with session.get(url) as response:
html = await response.text()
return BeautifulSoup(html, HTML_PARSER)
def trs_from_page(soup: BeautifulSoup, limit: int = None) -> ResultSet:
"""
Returns the table rows found on the specified page.
Args:
soup:
Page text to be scoured for table rows.
limit (optional):
Stop looking after finding this many results;
finds all matches by default.
Raises:
UnexpectedMarkupError:
If no table or table body are found.
Returns:
A ResultSet object containing all extracted 'tr' Tag objects
"""
try:
return soup.find('div', {'id': 'marketsindex'}).table.tbody.find_all('tr', limit=limit)
except AttributeError:
log.error("Unexpected HTML markup!")
file_name = f'unexpected_response_at_{datetime.now().strftime("%Y-%m-%d_%H-%M")}.html'
with open(file_name, 'w') as f:
f.write(soup.prettify())
raise UnexpectedMarkupError
async def get_data_from_category(category: str, session: ClientSession = None,
first_page: int = 1, last_page: int = inf) -> list[row_type]:
"""
Returns data rows from a category (i.e. companies starting with that specific letter).
Args:
category:
Must be a valid component of the URL path indicating the first character-category (e.g. 'A' or '0-9').
session (optional):
If passed a ClientSession instance, all HTTP requests will be made using that session;
otherwise a new one is created.
first_page (optional):
The number of the page to begin with when scraping the results; defaults to 1.
last_page (optional):
The number of the last page to scrape; by default all pages starting with `first_page` are scraped.
Returns:
A list of 5-tuples (of string elements) extracted from the specified pages
"""
log.info(f"Getting companies starting with '{category}'")
if session is None:
session = ClientSession()
data: list[row_type] = []
page = first_page
soup = await soup_from_url(f'{BASE_URL}{category}', session)
trs = trs_from_page(soup)
while page <= last_page and len(trs) > 0:
data.extend(extract_row_data(*trs))
log.info(f"Scraped '{category}' page {page}")
page += 1
soup = await soup_from_url(f'{BASE_URL}{category}/{page}', session)
trs = trs_from_page(soup)
return data
async def get_all_data(sequential: bool = False) -> list[row_type]:
"""
Returns a list with every available data row from all categories.
Args:
sequential (optional):
Whether or not to forgo the asynchronous gathering capabilities;
by default requests are issued concurrently.
Returns:
A list of 5-tuples (of strings)
"""
async with ClientSession() as session:
if sequential:
results = [await get_data_from_category(category, session) for category in CATEGORIES]
else:
results = await asyncio.gather(*(get_data_from_category(category, session) for category in CATEGORIES))
data = []
for result in results:
data.extend(result)
return data