stocksymbolscraper/src/stock-symbol-scraper/main.py


import logging
import re
import csv
import sys
import asyncio
from argparse import ArgumentParser
from pathlib import Path
from datetime import datetime
from string import ascii_uppercase
from math import inf
from aiohttp import ClientSession
from bs4 import BeautifulSoup
from bs4.element import Tag, ResultSet
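# Module-level logger: the stream handler lets everything down to DEBUG through,
# but the logger itself starts at ERROR and is only lowered to DEBUG via the --verbose flag.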
log = logging.getLogger(__name__)
log.setLevel(logging.ERROR)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
log.addHandler(ch)
row_type = tuple[str, str, str, str, str]
DOMAIN = 'www.marketwatch.com'
BASE_URL = f'https://{DOMAIN}/tools/markets/stocks/a-z/'
DIGIT_CATEGORY = '0-9'
OTHER_CATEGORY = 'Other'
CATEGORIES = [DIGIT_CATEGORY] + list(ascii_uppercase) + [OTHER_CATEGORY]
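# Captures the ticker between parentheses, e.g. "(BRK.A)" -> "BRK.A";
# dots and ampersands are allowed because some symbols contain them.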
STOCK_SYMBOL_PATTERN = re.compile(r'\(([\w.&]+)\)')
class UnexpectedMarkupError(Exception):
pass
def extract_row_data(*table_rows: Tag) -> list[row_type]:
"""
Iterates over any number of table rows to extract data from them.
Args:
table_rows:
Arbitrary number of 'tr' Tag objects to be processed for data.
Returns:
A list of 5-tuples (of string elements)
"""
return [get_single_tr_data(tr) for tr in table_rows]
def get_single_tr_data(table_row: Tag) -> row_type:
"""
Returns the data from a table row.
Args:
table_row:
Specific 'tr' Tag object to be processed for data.
Returns:
A 5-tuple of string elements
"""
tds = table_row.find_all('td')
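    # The first cell is expected to contain a link whose first child is the company name
    # and whose second child wraps the "(SYMBOL)" text, hence the nested indexing below.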
company_name = str(tds[0].a.contents[0]).strip()
stock_symbol = str(tds[0].a.contents[1].contents[0]).strip()
m = re.search(STOCK_SYMBOL_PATTERN, stock_symbol)
if m is None:
log.warning(f"{stock_symbol} did not match the stock symbol pattern; saving as is")
else:
stock_symbol = m.group(1)
country = get_str_from_td(tds[1])
exchange = get_str_from_td(tds[2])
sector = get_str_from_td(tds[3])
return company_name, stock_symbol, country, exchange, sector
def get_str_from_td(td: Tag) -> str:
"""
    Returns the content of a 'td' Tag object as a string. The cell's only content is expected to be a single NavigableString.
Args:
td:
The table cell to be converted into a string.
Returns:
String content from a cell
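    Example:
        >>> cell = BeautifulSoup('<td> Germany </td>', 'html.parser').td
        >>> get_str_from_td(cell)
        'Germany'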
"""
try:
content = td.contents[0]
except IndexError:
return ''
return str(content).strip()
async def trs_from_page(url: str, session: ClientSession = None, limit: int = None) -> ResultSet:
"""
Returns the table rows found on the specified page.
Args:
url:
URL string leading to a page with matching content.
session (optional):
If passed a ClientSession instance, all HTTP requests will be made using that session;
otherwise a new one is created.
limit (optional):
Stop looking after finding this many results;
finds all matches by default.
Raises:
UnexpectedMarkupError:
            If no table or table body is found.
Returns:
A ResultSet object containing all extracted 'tr' Tag objects
"""
    if session is None:
        # No session was supplied: open a temporary one and make sure it is closed again.
        async with ClientSession() as own_session:
            return await trs_from_page(url, own_session, limit)
async with session.get(url) as response:
html = await response.text()
soup = BeautifulSoup(html, 'html.parser')
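    # The rows live inside <div id="marketsindex"> -> <table> -> <tbody>; if any of these
    # is missing, the chained attribute access below raises AttributeError.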
try:
return soup.find('div', {'id': 'marketsindex'}).table.tbody.find_all('tr', limit=limit)
except AttributeError:
log.error("Unexpected HTML markup!")
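        # Dump the received HTML to the working directory so the unexpected markup can be inspected later.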
file_name = f'unexpected_response_at_{datetime.now().strftime("%Y-%m-%d_%H-%M")}.html'
        with open(file_name, 'w', encoding='utf-8') as f:
            f.write(html)
        raise UnexpectedMarkupError(f"No results table found at {url}")
async def get_data_from_category(category: str, session: ClientSession = None,
                                 first_page: int = 1, last_page: float = inf) -> list[row_type]:
"""
    Returns data rows from a category (i.e. companies whose names start with that character category).
Args:
category:
Must be a valid component of the URL path indicating the first character-category (e.g. 'A' or '0-9').
session (optional):
If passed a ClientSession instance, all HTTP requests will be made using that session;
otherwise a new one is created.
first_page (optional):
The number of the page to begin with when scraping the results; defaults to 1.
last_page (optional):
The number of the last page to scrape; by default all pages starting with `first_page` are scraped.
Returns:
A list of 5-tuples (of string elements) extracted from the specified pages
"""
log.info(f"Getting companies starting with '{category}'")
    if session is None:
        async with ClientSession() as own_session:
            return await get_data_from_category(category, own_session, first_page, last_page)
data: list[row_type] = []
page = first_page
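    # Page 1 is served at BASE_URL + category; subsequent pages at BASE_URL + category + '/' + page.
    # Scraping stops once a page yields no table rows or `last_page` has been scraped.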
    url = f'{BASE_URL}{category}' if page == 1 else f'{BASE_URL}{category}/{page}'
    trs = await trs_from_page(url, session)
while page <= last_page and len(trs) > 0:
data.extend(extract_row_data(*trs))
log.info(f"Scraped '{category}' page {page}")
page += 1
trs = await trs_from_page(f'{BASE_URL}{category}/{page}', session)
return data
async def get_all_data(sequential: bool = False) -> list[row_type]:
"""
Returns a list with every available data row from all categories.
Args:
sequential (optional):
            If True, the categories are scraped one after another;
by default requests are issued concurrently.
Returns:
A list of 5-tuples (of strings)
"""
async with ClientSession() as session:
if sequential:
results = [await get_data_from_category(category, session) for category in CATEGORIES]
else:
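            # asyncio.gather schedules all category coroutines at once, so their HTTP requests overlap.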
results = await asyncio.gather(*(get_data_from_category(category, session) for category in CATEGORIES))
data = []
for result in results:
data.extend(result)
return data
def main() -> None:
parser = ArgumentParser(description="Scrape all stock symbols")
parser.add_argument(
'-v', '--verbose',
action='store_true',
help="If set, prints all sorts of stuff."
)
parser.add_argument(
'-S', '--sequential',
action='store_true',
help="If set, all requests are performed sequentially; otherwise async capabilities are used for concurrency."
)
parser.add_argument(
'-f', '--to-file',
type=Path,
help="Writes results to the specified destination file. If omitted results are printed to stdout."
)
args = parser.parse_args()
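    # The module-level logger defaults to ERROR; --verbose lowers it to DEBUG so progress messages appear.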
if args.verbose:
log.setLevel(logging.DEBUG)
data = asyncio.run(get_all_data(args.sequential))
if args.to_file is None:
csv.writer(sys.stdout).writerows(data)
else:
        with open(args.to_file, 'w', newline='') as f:
csv.writer(f).writerows(data)
if __name__ == '__main__':
main()