import logging
import re
import asyncio
from datetime import datetime
from string import ascii_uppercase
from math import inf

from aiohttp import ClientSession
from bs4 import BeautifulSoup
from bs4.element import Tag, ResultSet

log = logging.getLogger(__name__)
log.setLevel(logging.ERROR)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
log.addHandler(ch)

row_type = tuple[str, str, str, str, str]

DOMAIN = 'www.marketwatch.com'
BASE_URL = f'https://{DOMAIN}/tools/markets/stocks/a-z/'
DIGIT_CATEGORY = '0-9'
OTHER_CATEGORY = 'Other'
CATEGORIES = [DIGIT_CATEGORY] + list(ascii_uppercase) + [OTHER_CATEGORY]
STOCK_SYMBOL_PATTERN = re.compile(r'\(([\w.&]+)\)')
HTML_PARSER = 'html.parser'


class UnexpectedMarkupError(Exception):
    pass


def extract_row_data(*table_rows: Tag) -> list[row_type]:
    """
    Iterates over any number of table rows to extract data from them.

    Args:
        table_rows:
            Arbitrary number of 'tr' Tag objects to be processed for data.

    Returns:
        A list of 5-tuples (of string elements)
    """
    return [get_single_tr_data(tr) for tr in table_rows]


def get_single_tr_data(table_row: Tag) -> row_type:
    """
    Returns the data from a table row.

    Args:
        table_row:
            Specific 'tr' Tag object to be processed for data.

    Returns:
        A 5-tuple of string elements
    """
    tds = table_row.find_all('td')
    company_name = str(tds[0].a.contents[0]).strip()
    stock_symbol = str(tds[0].a.contents[1].contents[0]).strip()
    m = re.search(STOCK_SYMBOL_PATTERN, stock_symbol)
    if m is None:
        log.warning(f"{stock_symbol} did not match the stock symbol pattern; saving as is")
    else:
        stock_symbol = m.group(1)
    country = get_str_from_td(tds[1])
    exchange = get_str_from_td(tds[2])
    sector = get_str_from_td(tds[3])
    return company_name, stock_symbol, country, exchange, sector


def get_str_from_td(td: Tag) -> str:
    """
    Returns content of a 'td' Tag object as a string.

    The only content has to be a NavigableString object.

    Args:
        td:
            The table cell to be converted into a string.

    Returns:
        String content from a cell
    """
    try:
        content = td.contents[0]
    except IndexError:
        return ''
    return str(content).strip()


async def soup_from_url(url: str, session: ClientSession = None) -> BeautifulSoup:
    """
    Requests page and converts contents into a BeautifulSoup object.

    Args:
        url:
            URL string leading to any page with matching content.
        session (optional):
            If passed a ClientSession instance, all HTTP requests will be made
            using that session; otherwise a new one is created.

    Returns:
        A BeautifulSoup object for further data extraction
    """
    if session is None:
        session = ClientSession()
    async with session.get(url) as response:
        html = await response.text()
    return BeautifulSoup(html, HTML_PARSER)


def trs_from_page(soup: BeautifulSoup, limit: int = None) -> ResultSet:
    """
    Returns the table rows found on the specified page.

    Args:
        soup:
            Page text to be scoured for table rows.
        limit (optional):
            Stop looking after finding this many results;
            finds all matches by default.

    Raises:
        UnexpectedMarkupError: If no table or table body are found.

    Returns:
        A ResultSet object containing all extracted 'tr' Tag objects
    """
    try:
        return soup.find('div', {'id': 'marketsindex'}).table.tbody.find_all('tr', limit=limit)
    except AttributeError:
        log.error("Unexpected HTML markup!")
        file_name = f'unexpected_response_at_{datetime.now().strftime("%Y-%m-%d_%H-%M")}.html'
        with open(file_name, 'w') as f:
            f.write(soup.prettify())
        raise UnexpectedMarkupError


async def get_data_from_category(category: str, session: ClientSession = None,
                                 first_page: int = 1, last_page: int = inf) -> list[row_type]:
    """
    Returns data rows from a category (i.e. companies starting with that specific letter).

    Args:
        category:
            Must be a valid component of the URL path indicating the first
            character-category (e.g. 'A' or '0-9').
        session (optional):
            If passed a ClientSession instance, all HTTP requests will be made
            using that session; otherwise a new one is created.
        first_page (optional):
            The number of the page to begin with when scraping the results;
            defaults to 1.
        last_page (optional):
            The number of the last page to scrape; by default all pages
            starting with `first_page` are scraped.

    Returns:
        A list of 5-tuples (of string elements) extracted from the specified pages
    """
    log.info(f"Getting companies starting with '{category}'")
    if session is None:
        session = ClientSession()
    data: list[row_type] = []
    page = first_page
    soup = await soup_from_url(f'{BASE_URL}{category}', session)
    trs = trs_from_page(soup)
    while page <= last_page and len(trs) > 0:
        data.extend(extract_row_data(*trs))
        log.info(f"Scraped '{category}' page {page}")
        page += 1
        soup = await soup_from_url(f'{BASE_URL}{category}/{page}', session)
        trs = trs_from_page(soup)
    return data


async def get_all_data(sequential: bool = False) -> list[row_type]:
    """
    Returns a list with every available data row from all categories.

    Args:
        sequential (optional):
            Whether or not to forgo the asynchronous gathering capabilities;
            by default requests are issued concurrently.

    Returns:
        A list of 5-tuples (of strings)
    """
    async with ClientSession() as session:
        if sequential:
            results = [await get_data_from_category(category, session) for category in CATEGORIES]
        else:
            results = await asyncio.gather(*(get_data_from_category(category, session)
                                             for category in CATEGORIES))
    data = []
    for result in results:
        data.extend(result)
    return data
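

# --- Usage sketch (not part of the original module) ---
# A minimal example of how the coroutines above could be driven from the
# command line: run the full concurrent scrape via `get_all_data()` and dump
# the rows to a CSV file. The output file name 'stock_data.csv' and the column
# headers are assumptions made purely for illustration.
if __name__ == '__main__':
    import csv

    rows = asyncio.run(get_all_data())
    print(f"Scraped {len(rows)} rows")
    with open('stock_data.csv', 'w', newline='') as csv_file:
        writer = csv.writer(csv_file)
        writer.writerow(('company_name', 'stock_symbol', 'country', 'exchange', 'sector'))
        writer.writerows(rows)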