Compare commits

...

7 Commits

6 changed files with 105 additions and 51 deletions

View File

@ -1 +1 @@
from .functions import get_company_financials from .functions import get_balance_sheet, get_income_statement, get_cash_flow_statement, get_all_financials

View File

@ -6,8 +6,8 @@ from argparse import ArgumentParser
from pathlib import Path from pathlib import Path
from typing import Dict from typing import Dict
from .functions import get_company_financials, ResultDict from .functions import get_all_financials, ResultDict
from .constants import END_DATE, MAIN_LOGGER_NAME from .constants import END_DATE, MAIN_LOGGER_NAME, DEFAULT_CONCURRENT_BATCH_SIZE
log = logging.getLogger(MAIN_LOGGER_NAME) log = logging.getLogger(MAIN_LOGGER_NAME)
@ -16,6 +16,7 @@ JSON_EXT, CSV_EXT = '.json', '.csv'
TICKER_SYMBOL = 'ticker_symbol' TICKER_SYMBOL = 'ticker_symbol'
QUARTERLY = 'quarterly' QUARTERLY = 'quarterly'
BATCH_SIZE = 'concurrent_batch_size'
TO_FILE = 'to_file' TO_FILE = 'to_file'
JSON_INDENT = 'json_indent' JSON_INDENT = 'json_indent'
VERBOSE = 'verbose' VERBOSE = 'verbose'
@ -26,6 +27,7 @@ def parse_cli() -> dict:
parser.add_argument( parser.add_argument(
TICKER_SYMBOL, TICKER_SYMBOL,
type=str, type=str,
nargs='+',
help="Stock ticker symbol of the company to be scraped the financials of" help="Stock ticker symbol of the company to be scraped the financials of"
) )
parser.add_argument( parser.add_argument(
@ -33,6 +35,14 @@ def parse_cli() -> dict:
action='store_true', action='store_true',
help="If set, the financial data for the last quarters is returned; otherwise yearly data is returned." help="If set, the financial data for the last quarters is returned; otherwise yearly data is returned."
) )
parser.add_argument(
'-b', f'--{BATCH_SIZE.replace("_", "-")}',
type=int,
default=DEFAULT_CONCURRENT_BATCH_SIZE,
help="If multiple ticker symbols are passed, the company financials can be scraped concurrently. "
"This argument determines how many datasets are scraped concurrently at any moment in time. "
"By default, they are scraped sequentially."
)
parser.add_argument( parser.add_argument(
'-f', f'--{TO_FILE.replace("_", "-")}', '-f', f'--{TO_FILE.replace("_", "-")}',
type=Path, type=Path,
@ -78,7 +88,8 @@ def write_to_csv(data: Dict[str, ResultDict], file_obj) -> None:
async def main() -> None: async def main() -> None:
args = parse_cli() args = parse_cli()
configure_logging(args[VERBOSE]) configure_logging(args[VERBOSE])
data = await get_company_financials(args[TICKER_SYMBOL], quarterly=args[QUARTERLY]) data = await get_all_financials(*args[TICKER_SYMBOL], quarterly=args[QUARTERLY],
concurrent_batch_size=args[BATCH_SIZE])
path: Path = args[TO_FILE] path: Path = args[TO_FILE]
if path is None: if path is None:
print(json.dumps(data, indent=args[JSON_INDENT])) print(json.dumps(data, indent=args[JSON_INDENT]))

View File

@ -1,3 +1,4 @@
DEV_MODE = False
MAIN_LOGGER_NAME = 'mwfin' MAIN_LOGGER_NAME = 'mwfin'
HTML_PARSER = 'html.parser' HTML_PARSER = 'html.parser'
@ -15,7 +16,7 @@ END_DATE = 'End Date'
# All items marked `False` do not need to be scraped # All items marked `False` do not need to be scraped
# because they are calculated from other items (e.g. growth or ratios). # because they are calculated from other items (e.g. growth or ratios).
FINANCIAL_STATEMENT_ITEMS = { FIN_STMT_ITEMS = {
################# #################
# Balance Sheet # # Balance Sheet #
################# #################

10
src/mwfin/exceptions.py Normal file
View File

@ -0,0 +1,10 @@
class WrongAssumptions(Exception):
    """
    Common base class of the exceptions defined in this module.

    Both other exception classes here derive from it, so a single
    `except WrongAssumptions` clause catches either of them.
    """


class UnexpectedMarkup(WrongAssumptions):
    """
    Subclass of `WrongAssumptions`.

    NOTE(review): no raise site is visible alongside this definition; judging
    by the name it is presumably meant for HTML that lacks the expected
    structure -- confirm where it is raised.
    """


class UnknownFinancialStatementItem(WrongAssumptions):
    """
    Raised by `functions.is_relevant_table_row` when the item name found in a
    table row is not a key of `FIN_STMT_ITEMS`.
    """

View File

@ -1,12 +1,16 @@
import logging import logging
import asyncio
from typing import Union, List, Dict from typing import Union, List, Dict
from datetime import datetime
from aiohttp.client import ClientSession from aiohttp.client import ClientSession
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
from bs4.element import Tag from bs4.element import Tag
from webutils import in_async_session, gather_in_batches from webutils import in_async_session, gather_in_batches
from . import constants from .constants import (DEV_MODE, HTML_PARSER, BASE_URL, END_DATE, BS, IS, CF, FIN_STMT_URL_SUFFIX, FIN_STMT_ITEMS,
DEFAULT_CONCURRENT_BATCH_SIZE)
from .exceptions import UnknownFinancialStatementItem
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -25,7 +29,7 @@ async def soup_from_url(url: str, session: ClientSession = None) -> BeautifulSou
""" """
async with session.get(url) as response: async with session.get(url) as response:
html = await response.text() html = await response.text()
return BeautifulSoup(html, constants.HTML_PARSER) return BeautifulSoup(html, HTML_PARSER)
def extract_end_dates(soup: BeautifulSoup) -> tuple[str]: def extract_end_dates(soup: BeautifulSoup) -> tuple[str]:
@ -43,18 +47,27 @@ def is_relevant_table_row(tr: Tag) -> bool:
""" """
item_name = str(tr.td.div.string).strip() item_name = str(tr.td.div.string).strip()
try: try:
return constants.FINANCIAL_STATEMENT_ITEMS[item_name] return FIN_STMT_ITEMS[item_name]
except KeyError: except KeyError:
log.warning(f"Unknown item name '{item_name}' found in financial statement.") log.warning(f"Unknown item name '{item_name}' found in financial statement.")
return False raise UnknownFinancialStatementItem
def find_relevant_table_rows(soup: BeautifulSoup) -> List[Tag]: def find_relevant_table_rows(soup: BeautifulSoup) -> List[Tag]:
""" """
Returns the table rows containing the data of interest. Returns the table rows containing the data of interest.
""" """
trs = soup.find('div', attrs={'class': 'financials'}).tbody.find_all('tr') now = datetime.utcnow()
return [tr for tr in trs if is_relevant_table_row(tr)] trs = []
for tr in soup.find('div', attrs={'class': 'financials'}).tbody.find_all('tr'):
try:
if is_relevant_table_row(tr):
trs.append(tr)
except UnknownFinancialStatementItem:
if DEV_MODE:
with open(f'mwfin_unknown_items_{now.strftime("%Y-%m-%d_%H-%M-%S")}.html', 'w') as f:
f.write(str(soup))
return trs
def extract_row_data(tr: Tag) -> tuple[str, tuple[int]]: def extract_row_data(tr: Tag) -> tuple[str, tuple[int]]:
@ -73,7 +86,7 @@ def extract_all_data(soup: BeautifulSoup) -> ResultDict:
""" """
Extracts financials from the page. Extracts financials from the page.
""" """
output = {constants.END_DATE: extract_end_dates(soup)} output = {END_DATE: extract_end_dates(soup)}
for row in find_relevant_table_rows(soup): for row in find_relevant_table_rows(soup):
row_data = extract_row_data(row) row_data = extract_row_data(row)
output[row_data[0]] = row_data[1] output[row_data[0]] = row_data[1]
@ -86,7 +99,8 @@ async def _get_single_company_fin_stmt(statement: str, ticker_symbol: str, quart
""" """
Returns data from the specified financial statement of the specified company. Returns data from the specified financial statement of the specified company.
""" """
url = f'{constants.BASE_URL}/{ticker_symbol}/financials{constants.FIN_STMT_URL_SUFFIX[statement]}' log.info(f"Scraping {statement} for {ticker_symbol}")
url = f'{BASE_URL}/{ticker_symbol}/financials{FIN_STMT_URL_SUFFIX[statement]}'
if quarterly: if quarterly:
url += '/quarter' url += '/quarter'
soup = await soup_from_url(url, session) soup = await soup_from_url(url, session)
@ -95,70 +109,69 @@ async def _get_single_company_fin_stmt(statement: str, ticker_symbol: str, quart
@in_async_session @in_async_session
async def _get_multi_companies_fin_stmt(statement: str, *ticker_symbols: str, quarterly: bool = False, async def _get_multi_companies_fin_stmt(statement: str, *ticker_symbols: str, quarterly: bool = False,
concurrent_batch_size: int = constants.DEFAULT_CONCURRENT_BATCH_SIZE, concurrent_batch_size: int = DEFAULT_CONCURRENT_BATCH_SIZE,
session: ClientSession = None) -> Union[ResultDict, Dict[str, ResultDict]]: session: ClientSession = None) -> Union[ResultDict, Dict[str, ResultDict]]:
if len(ticker_symbols) == 1: if len(ticker_symbols) == 1:
return await _get_single_company_fin_stmt(statement, ticker_symbols[0], quarterly, session) return await _get_single_company_fin_stmt(statement, ticker_symbols[0], quarterly, session)
result_list = await gather_in_batches( coroutines = (_get_single_company_fin_stmt(statement, symbol, quarterly, session) for symbol in ticker_symbols)
concurrent_batch_size, result_list = await gather_in_batches(concurrent_batch_size, *coroutines)
*(_get_single_company_fin_stmt(statement, symbol, quarterly, session) for symbol in ticker_symbols)
)
return {symbol: data for symbol, data in zip(ticker_symbols, result_list)} return {symbol: data for symbol, data in zip(ticker_symbols, result_list)}
@in_async_session @in_async_session
async def get_balance_sheet(*ticker_symbols: str, quarterly: bool = False, async def get_balance_sheet(*ticker_symbols: str, quarterly: bool = False,
concurrent_batch_size: int = constants.DEFAULT_CONCURRENT_BATCH_SIZE, concurrent_batch_size: int = DEFAULT_CONCURRENT_BATCH_SIZE,
session: ClientSession = None) -> Union[ResultDict, Dict[str, ResultDict]]: session: ClientSession = None) -> Union[ResultDict, Dict[str, ResultDict]]:
""" """
Returns data from the balance sheet of the specified company. Returns data from the balance sheet of the specified company.
""" """
return await _get_multi_companies_fin_stmt(constants.BS, *ticker_symbols, return await _get_multi_companies_fin_stmt(BS, *ticker_symbols,
quarterly=quarterly, concurrent_batch_size=concurrent_batch_size, quarterly=quarterly, concurrent_batch_size=concurrent_batch_size,
session=session) session=session)
@in_async_session @in_async_session
async def get_income_statement(*ticker_symbols: str, quarterly: bool = False, async def get_income_statement(*ticker_symbols: str, quarterly: bool = False,
concurrent_batch_size: int = constants.DEFAULT_CONCURRENT_BATCH_SIZE, concurrent_batch_size: int = DEFAULT_CONCURRENT_BATCH_SIZE,
session: ClientSession = None) -> Union[ResultDict, Dict[str, ResultDict]]: session: ClientSession = None) -> Union[ResultDict, Dict[str, ResultDict]]:
""" """
Returns data from the income statement of the specified company. Returns data from the income statement of the specified company.
""" """
return await _get_multi_companies_fin_stmt(constants.IS, *ticker_symbols, return await _get_multi_companies_fin_stmt(IS, *ticker_symbols,
quarterly=quarterly, concurrent_batch_size=concurrent_batch_size, quarterly=quarterly, concurrent_batch_size=concurrent_batch_size,
session=session) session=session)
@in_async_session @in_async_session
async def get_cash_flow_statement(*ticker_symbols: str, quarterly: bool = False, async def get_cash_flow_statement(*ticker_symbols: str, quarterly: bool = False,
concurrent_batch_size: int = constants.DEFAULT_CONCURRENT_BATCH_SIZE, concurrent_batch_size: int = DEFAULT_CONCURRENT_BATCH_SIZE,
session: ClientSession = None) -> Union[ResultDict, Dict[str, ResultDict]]: session: ClientSession = None) -> Union[ResultDict, Dict[str, ResultDict]]:
""" """
Returns data from the cash flow statement of the specified company. Returns data from the cash flow statement of the specified company.
""" """
return await _get_multi_companies_fin_stmt(constants.CF, *ticker_symbols, return await _get_multi_companies_fin_stmt(CF, *ticker_symbols,
quarterly=quarterly, concurrent_batch_size=concurrent_batch_size, quarterly=quarterly, concurrent_batch_size=concurrent_batch_size,
session=session) session=session)
@in_async_session @in_async_session
async def _get_single_company_financials(ticker_symbol: str, quarterly: bool = False, async def _get_single_company_all_financials(ticker_symbol: str, quarterly: bool = False,
session: ClientSession = None) -> Dict[str, ResultDict]: session: ClientSession = None) -> Dict[str, ResultDict]:
return { coroutines = (_get_single_company_fin_stmt(stmt, ticker_symbol, quarterly, session) for stmt in (BS, IS, CF))
constants.BS: await _get_single_company_fin_stmt(constants.BS, ticker_symbol, quarterly, session), results = await asyncio.gather(*coroutines)
constants.IS: await _get_single_company_fin_stmt(constants.IS, ticker_symbol, quarterly, session), return {stmt: data for stmt, data in zip((BS, IS, CF), results)}
constants.CF: await _get_single_company_fin_stmt(constants.CF, ticker_symbol, quarterly, session)
}
@in_async_session @in_async_session
async def get_company_financials(*ticker_symbols: str, quarterly: bool = False, async def get_all_financials(*ticker_symbols: str, quarterly: bool = False,
session: ClientSession = None) -> Union[Dict[str, ResultDict], concurrent_batch_size: int = DEFAULT_CONCURRENT_BATCH_SIZE,
Dict[str, Dict[str, ResultDict]]]: session: ClientSession = None) -> Union[Dict[str, ResultDict],
Dict[str, Dict[str, ResultDict]]]:
""" """
Returns all fundamentals (balance sheet, income statement and cash flow statement) of the specified company. Returns all fundamentals (balance sheet, income statement and cash flow statement) of the specified company.
""" """
if len(ticker_symbols) == 1: if len(ticker_symbols) == 1:
return await _get_single_company_financials(ticker_symbols[0], quarterly, session) return await _get_single_company_all_financials(ticker_symbols[0], quarterly, session)
return {symbol: await _get_single_company_financials(symbol, quarterly, session) for symbol in ticker_symbols} coroutines = (_get_single_company_all_financials(symbol, quarterly, session) for symbol in ticker_symbols)
result_list = await gather_in_batches(concurrent_batch_size, *coroutines)
return {symbol: data for symbol, data in zip(ticker_symbols, result_list)}

View File

@ -7,6 +7,7 @@ from bs4 import BeautifulSoup
from mwfin import functions from mwfin import functions
from mwfin.constants import HTML_PARSER, BASE_URL, FIN_STMT_URL_SUFFIX, IS, BS, CF, END_DATE from mwfin.constants import HTML_PARSER, BASE_URL, FIN_STMT_URL_SUFFIX, IS, BS, CF, END_DATE
from mwfin.exceptions import UnknownFinancialStatementItem
THIS_DIR = Path(__file__).parent THIS_DIR = Path(__file__).parent
@ -62,15 +63,33 @@ class FunctionsTestCase(IsolatedAsyncioTestCase):
test_soup = BeautifulSoup('<tr><td><div> Cash & Short Term Investments Growth </div></td></tr>', HTML_PARSER) test_soup = BeautifulSoup('<tr><td><div> Cash & Short Term Investments Growth </div></td></tr>', HTML_PARSER)
self.assertFalse(functions.is_relevant_table_row(test_soup.tr)) self.assertFalse(functions.is_relevant_table_row(test_soup.tr))
test_soup = BeautifulSoup('<tr><td><div> baz </div></td></tr>', HTML_PARSER) test_soup = BeautifulSoup('<tr><td><div> baz </div></td></tr>', HTML_PARSER)
self.assertFalse(functions.is_relevant_table_row(test_soup.tr)) with self.assertRaises(UnknownFinancialStatementItem):
functions.is_relevant_table_row(test_soup.tr)
@patch.object(functions, 'open')
@patch.object(functions, 'is_relevant_table_row') @patch.object(functions, 'is_relevant_table_row')
def test_find_relevant_table_rows(self, mock_is_relevant_table_row): def test_find_relevant_table_rows(self, mock_is_relevant_table_row, mock_open):
mock_is_relevant_table_row.return_value = True mock_is_relevant_table_row.return_value = True
expected_output = self.test_soup.find('div', attrs={'class': 'financials'}).tbody.find_all('tr') expected_output = self.test_soup.find('div', attrs={'class': 'financials'}).tbody.find_all('tr')
tr0, tr1 = expected_output
output = functions.find_relevant_table_rows(self.test_soup) output = functions.find_relevant_table_rows(self.test_soup)
self.assertListEqual(expected_output, output) self.assertListEqual(expected_output, output)
mock_is_relevant_table_row.assert_has_calls([call(expected_output[0]), call(expected_output[1])]) mock_is_relevant_table_row.assert_has_calls([call(tr0), call(tr1)])
mock_is_relevant_table_row.reset_mock()
mock_is_relevant_table_row.side_effect = UnknownFinancialStatementItem()
expected_output = self.test_soup.find_all('thistagdoesntexist')
output = functions.find_relevant_table_rows(self.test_soup)
self.assertListEqual(expected_output, output)
mock_is_relevant_table_row.assert_has_calls([call(tr0), call(tr1)])
mock_is_relevant_table_row.reset_mock()
mock_write = mock_open.return_value.__enter__.return_value.write
with patch.object(functions, 'DEV_MODE', new=True):
output = functions.find_relevant_table_rows(self.test_soup)
self.assertListEqual(expected_output, output)
mock_is_relevant_table_row.assert_has_calls([call(tr0), call(tr1)])
mock_write.assert_has_calls([call(str(self.test_soup)), call(str(self.test_soup))])
def test_extract_row_data(self): def test_extract_row_data(self):
test_row = self.test_soup.find('div', attrs={'class': 'financials'}).tbody.tr test_row = self.test_soup.find('div', attrs={'class': 'financials'}).tbody.tr
@ -171,11 +190,11 @@ class FunctionsTestCase(IsolatedAsyncioTestCase):
await self._helper_test_get_any_statement(CF, mock__get_multi_companies_fin_stmt) await self._helper_test_get_any_statement(CF, mock__get_multi_companies_fin_stmt)
@patch.object(functions, '_get_single_company_fin_stmt') @patch.object(functions, '_get_single_company_fin_stmt')
async def test__get_single_company_financials(self, mock__get_single_company_fin_stmt): async def test__get_single_company_all_financials(self, mock__get_single_company_fin_stmt):
symbol, quarterly, mock_session = 'foo', False, MagicMock() symbol, quarterly, mock_session = 'foo', False, MagicMock()
mock__get_single_company_fin_stmt.return_value = bar = 'bar' mock__get_single_company_fin_stmt.return_value = bar = 'bar'
expected_output = {BS: bar, IS: bar, CF: bar} expected_output = {BS: bar, IS: bar, CF: bar}
output = await functions._get_single_company_financials(symbol, quarterly, mock_session) output = await functions._get_single_company_all_financials(symbol, quarterly, mock_session)
self.assertDictEqual(expected_output, output) self.assertDictEqual(expected_output, output)
mock__get_single_company_fin_stmt.assert_has_calls([ mock__get_single_company_fin_stmt.assert_has_calls([
call(BS, symbol, quarterly, mock_session), call(BS, symbol, quarterly, mock_session),
@ -183,21 +202,21 @@ class FunctionsTestCase(IsolatedAsyncioTestCase):
call(CF, symbol, quarterly, mock_session) call(CF, symbol, quarterly, mock_session)
]) ])
@patch.object(functions, '_get_single_company_financials') @patch.object(functions, '_get_single_company_all_financials')
async def test_get_company_financials(self, mock__get_single_company_financials): async def test_get_company_financials(self, mock__get_single_company_all_financials):
mock__get_single_company_financials.return_value = expected_output = 'baz' mock__get_single_company_all_financials.return_value = expected_output = 'baz'
symbol, quarterly, mock_session = 'foo', False, MagicMock() symbol, quarterly, mock_session = 'foo', False, MagicMock()
output = await functions.get_company_financials(symbol, quarterly=quarterly, session=mock_session) output = await functions.get_all_financials(symbol, quarterly=quarterly, session=mock_session)
self.assertEqual(expected_output, output) self.assertEqual(expected_output, output)
mock__get_single_company_financials.assert_called_once_with(symbol, quarterly, mock_session) mock__get_single_company_all_financials.assert_called_once_with(symbol, quarterly, mock_session)
mock__get_single_company_financials.reset_mock() mock__get_single_company_all_financials.reset_mock()
test_sym1, test_sym2 = 'x', 'y' test_sym1, test_sym2 = 'x', 'y'
expected_output = {test_sym1: expected_output, test_sym2: expected_output} expected_output = {test_sym1: expected_output, test_sym2: expected_output}
output = await functions.get_company_financials(test_sym1, test_sym2, output = await functions.get_all_financials(test_sym1, test_sym2,
quarterly=quarterly, session=mock_session) quarterly=quarterly, session=mock_session)
self.assertDictEqual(expected_output, output) self.assertDictEqual(expected_output, output)
mock__get_single_company_financials.assert_has_calls([ mock__get_single_company_all_financials.assert_has_calls([
call(test_sym1, quarterly, mock_session), call(test_sym1, quarterly, mock_session),
call(test_sym2, quarterly, mock_session) call(test_sym2, quarterly, mock_session)
]) ])
@ -212,7 +231,7 @@ class FunctionsTestCase(IsolatedAsyncioTestCase):
IS: {END_DATE: ('End_Date_1', 'End_Date_2'), 'Cash & Short Term Investments': (11000000, -22000000)}, IS: {END_DATE: ('End_Date_1', 'End_Date_2'), 'Cash & Short Term Investments': (11000000, -22000000)},
CF: {END_DATE: ('End_Date_1', 'End_Date_2'), 'Cash & Short Term Investments': (11000000, -22000000)} CF: {END_DATE: ('End_Date_1', 'End_Date_2'), 'Cash & Short Term Investments': (11000000, -22000000)}
} }
output = await functions.get_company_financials(symbol, session=mock_session_obj) output = await functions.get_all_financials(symbol, session=mock_session_obj)
self.assertDictEqual(expected_output, output) self.assertDictEqual(expected_output, output)
mock_session_obj.get.assert_has_calls([ mock_session_obj.get.assert_has_calls([
call(f'{BASE_URL}/{symbol}/financials{FIN_STMT_URL_SUFFIX[BS]}'), call(f'{BASE_URL}/{symbol}/financials{FIN_STMT_URL_SUFFIX[BS]}'),