import asyncio
import logging
import re
from datetime import datetime
from string import ascii_uppercase

from aiohttp import ClientSession
from bs4 import BeautifulSoup
from bs4.element import ResultSet, Tag

# Log everything at DEBUG level to stderr.
log = logging.getLogger(__name__)
log.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
log.addHandler(ch)

# One scraped row: (company name, stock symbol, country, exchange, sector).
row_type = tuple[str, str, str, str, str]

DOMAIN = 'www.marketwatch.com'
BASE_URL = f'https://{DOMAIN}/tools/markets/stocks/a-z/'
DIGIT_CATEGORY = '0-9'
OTHER_CATEGORY = 'Other'
# MarketWatch groups companies by the first character of their name:
# digits first, then A-Z, then everything else.
CATEGORIES = [DIGIT_CATEGORY] + list(ascii_uppercase) + [OTHER_CATEGORY]
# Matches a ticker symbol in parentheses, e.g. "(AAPL)" or "(BRK.A)".
STOCK_SYMBOL_PATTERN = re.compile(r'\(([\w.&]+)\)')


class UnexpectedMarkupError(Exception):
    """Raised when a page does not contain the expected table markup."""


def data_from_rows(trs: ResultSet) -> list[row_type]:
    return [get_single_row_data(row) for row in trs]


def get_single_row_data(table_row: Tag) -> row_type:
    tds = table_row.find_all('td')
    # The first cell holds the company name followed by the ticker,
    # which sits inside a nested child element, e.g. "(AAPL)".
    company_name = str(tds[0].a.contents[0]).strip()
    stock_symbol = str(tds[0].a.contents[1].contents[0]).strip()
    m = STOCK_SYMBOL_PATTERN.search(stock_symbol)
    if m is None:
        log.error(f"{stock_symbol} did not match the stock symbol pattern")
    else:
        stock_symbol = m.group(1)
    country = get_str_from_td(tds[1])
    exchange = get_str_from_td(tds[2])
    sector = get_str_from_td(tds[3])
    return company_name, stock_symbol, country, exchange, sector


def get_str_from_td(td: Tag) -> str:
    """Return the stripped text of a table cell, or '' for an empty cell."""
    try:
        content = td.contents[0]
    except IndexError:
        return ''
    return str(content).strip()


async def all_trs_from_page(url: str) -> ResultSet:
    """Fetch a page and return all rows of its stock listing table."""
    async with ClientSession() as session:
        async with session.get(url) as response:
            html = await response.text()
    soup = BeautifulSoup(html, 'html.parser')
    try:
        return soup.find('div', {'id': 'marketsindex'}).table.tbody.find_all('tr')
    except AttributeError:
        # One of the chained lookups returned None, so the page does not have
        # the structure we expect. Save it for inspection before bailing out.
        log.error("Unexpected HTML markup!")
        file_name = f'unexpected_response_at_{datetime.now().strftime("%Y-%m-%d_%H-%M")}.html'
        with open(file_name, 'w') as f:
            f.write(html)
        raise UnexpectedMarkupError


async def all_data_from_category(category: str) -> list[row_type]:
    """Scrape one category, following pagination until a page has no rows."""
    log.info(f"Getting companies starting with '{category}'")
    data: list[row_type] = []
    page = 1
    trs = await all_trs_from_page(f'{BASE_URL}{category}')
    while len(trs) > 0:
        log.info(f"Scraping page {page}")
        data.extend(data_from_rows(trs))
        page += 1
        trs = await all_trs_from_page(f'{BASE_URL}{category}/{page}')
    return data


async def get_all_data(asynchronous: bool = False) -> list[row_type]:
    if asynchronous:
        # Scrape all categories concurrently.
        results = await asyncio.gather(
            *(all_data_from_category(category) for category in CATEGORIES)
        )
    else:
        # Scrape the categories one after another.
        results = [await all_data_from_category(category) for category in CATEGORIES]
    # Flatten the per-category lists into a single list of rows.
    data: list[row_type] = []
    for result in results:
        data.extend(result)
    return data


def main() -> None:
    data = asyncio.run(get_all_data(True))
    for tup in data:
        print(tup)
    print(len(data), 'datasets')


if __name__ == '__main__':
    main()