Compare commits

...

7 Commits

6 changed files with 105 additions and 51 deletions

View File

@@ -1 +1 @@
from .functions import get_company_financials
from .functions import get_balance_sheet, get_income_statement, get_cash_flow_statement, get_all_financials
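With these re-exports, the individual statement getters and the renamed get_all_financials are available from the package root. A minimal usage sketch, assuming the in_async_session decorator opens an aiohttp session when none is passed and using a placeholder ticker symbol:

    import asyncio
    from mwfin import get_balance_sheet

    # Quarterly balance sheet data for a single (placeholder) company.
    data = asyncio.run(get_balance_sheet('AAPL', quarterly=True))
    print(data['End Date'])  # the 'End Date' key is set by extract_all_data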

View File

@@ -6,8 +6,8 @@ from argparse import ArgumentParser
from pathlib import Path
from typing import Dict
from .functions import get_company_financials, ResultDict
from .constants import END_DATE, MAIN_LOGGER_NAME
from .functions import get_all_financials, ResultDict
from .constants import END_DATE, MAIN_LOGGER_NAME, DEFAULT_CONCURRENT_BATCH_SIZE
log = logging.getLogger(MAIN_LOGGER_NAME)
@@ -16,6 +16,7 @@ JSON_EXT, CSV_EXT = '.json', '.csv'
TICKER_SYMBOL = 'ticker_symbol'
QUARTERLY = 'quarterly'
BATCH_SIZE = 'concurrent_batch_size'
TO_FILE = 'to_file'
JSON_INDENT = 'json_indent'
VERBOSE = 'verbose'
@@ -26,6 +27,7 @@ def parse_cli() -> dict:
parser.add_argument(
TICKER_SYMBOL,
type=str,
nargs='+',
help="Stock ticker symbol of the company to be scraped the financials of"
)
parser.add_argument(
@@ -33,6 +35,14 @@ def parse_cli() -> dict:
action='store_true',
help="If set, the financial data for the last quarters is returned; otherwise yearly data is returned."
)
parser.add_argument(
'-b', f'--{BATCH_SIZE.replace("_", "-")}',
type=int,
default=DEFAULT_CONCURRENT_BATCH_SIZE,
help="If multiple ticker symbols are passed, the company financials can be scraped concurrently. "
"This argument determines how many datasets are scraped concurrently at any moment in time. "
"By default, they are scraped sequentially."
)
parser.add_argument(
'-f', f'--{TO_FILE.replace("_", "-")}',
type=Path,
@@ -78,7 +88,8 @@ def write_to_csv(data: Dict[str, ResultDict], file_obj) -> None:
async def main() -> None:
args = parse_cli()
configure_logging(args[VERBOSE])
data = await get_company_financials(args[TICKER_SYMBOL], quarterly=args[QUARTERLY])
data = await get_all_financials(*args[TICKER_SYMBOL], quarterly=args[QUARTERLY],
concurrent_batch_size=args[BATCH_SIZE])
path: Path = args[TO_FILE]
if path is None:
print(json.dumps(data, indent=args[JSON_INDENT]))
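Assuming the package is run as a module (python -m mwfin; the entry point file name is not shown here) and that the quarterly option is spelled --quarterly, the extended CLI could be invoked roughly as follows, with the long option names following from the BATCH_SIZE and TO_FILE constants above and the ticker symbols and output path purely illustrative:

    python -m mwfin AAPL MSFT GOOG --quarterly --concurrent-batch-size 3 --to-file financials.json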

View File

@@ -1,3 +1,4 @@
DEV_MODE = False
MAIN_LOGGER_NAME = 'mwfin'
HTML_PARSER = 'html.parser'
@@ -15,7 +16,7 @@ END_DATE = 'End Date'
# All items marked `False` do not need to be scraped
# because they are calculated from other items (e.g. growth or ratios).
FINANCIAL_STATEMENT_ITEMS = {
FIN_STMT_ITEMS = {
#################
# Balance Sheet #
#################
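The renamed FIN_STMT_ITEMS mapping keeps its shape: each key is an item label as it appears in the statement table, and the boolean marks whether the row is scraped directly (True) or skipped because it can be derived from other items (False). An illustrative excerpt, using the two item names that also appear in the test suite (the real dictionary is much longer):

    FIN_STMT_ITEMS = {
        'Cash & Short Term Investments': True,          # scraped directly
        'Cash & Short Term Investments Growth': False,  # derived elsewhere, not scraped
        # ...
    }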

src/mwfin/exceptions.py (new file)
View File

@@ -0,0 +1,10 @@
class WrongAssumptions(Exception):
    pass


class UnexpectedMarkup(WrongAssumptions):
    pass


class UnknownFinancialStatementItem(WrongAssumptions):
    pass
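A short sketch of how this hierarchy can be used by callers: catching the WrongAssumptions base class covers UnknownFinancialStatementItem (and any future subclasses) in one handler. The HTML snippet below is illustrative and mirrors the markup used in the tests:

    from bs4 import BeautifulSoup
    from mwfin.constants import HTML_PARSER
    from mwfin.exceptions import WrongAssumptions
    from mwfin.functions import is_relevant_table_row

    soup = BeautifulSoup('<tr><td><div> some unknown item </div></td></tr>', HTML_PARSER)
    try:
        relevant = is_relevant_table_row(soup.tr)
    except WrongAssumptions:  # also catches UnknownFinancialStatementItem
        relevant = False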

View File

@@ -1,12 +1,16 @@
import logging
import asyncio
from typing import Union, List, Dict
from datetime import datetime
from aiohttp.client import ClientSession
from bs4 import BeautifulSoup
from bs4.element import Tag
from webutils import in_async_session, gather_in_batches
from . import constants
from .constants import (DEV_MODE, HTML_PARSER, BASE_URL, END_DATE, BS, IS, CF, FIN_STMT_URL_SUFFIX, FIN_STMT_ITEMS,
DEFAULT_CONCURRENT_BATCH_SIZE)
from .exceptions import UnknownFinancialStatementItem
log = logging.getLogger(__name__)
@@ -25,7 +29,7 @@ async def soup_from_url(url: str, session: ClientSession = None) -> BeautifulSou
"""
async with session.get(url) as response:
html = await response.text()
return BeautifulSoup(html, constants.HTML_PARSER)
return BeautifulSoup(html, HTML_PARSER)
def extract_end_dates(soup: BeautifulSoup) -> tuple[str]:
@@ -43,18 +47,27 @@ def is_relevant_table_row(tr: Tag) -> bool:
"""
item_name = str(tr.td.div.string).strip()
try:
return constants.FINANCIAL_STATEMENT_ITEMS[item_name]
return FIN_STMT_ITEMS[item_name]
except KeyError:
log.warning(f"Unknown item name '{item_name}' found in financial statement.")
return False
raise UnknownFinancialStatementItem
def find_relevant_table_rows(soup: BeautifulSoup) -> List[Tag]:
"""
Returns the table rows containing the data of interest.
"""
trs = soup.find('div', attrs={'class': 'financials'}).tbody.find_all('tr')
return [tr for tr in trs if is_relevant_table_row(tr)]
now = datetime.utcnow()
trs = []
for tr in soup.find('div', attrs={'class': 'financials'}).tbody.find_all('tr'):
try:
if is_relevant_table_row(tr):
trs.append(tr)
except UnknownFinancialStatementItem:
if DEV_MODE:
with open(f'mwfin_unknown_items_{now.strftime("%Y-%m-%d_%H-%M-%S")}.html', 'w') as f:
f.write(str(soup))
return trs
def extract_row_data(tr: Tag) -> tuple[str, tuple[int]]:
@@ -73,7 +86,7 @@ def extract_all_data(soup: BeautifulSoup) -> ResultDict:
"""
Extracts financials from the page.
"""
output = {constants.END_DATE: extract_end_dates(soup)}
output = {END_DATE: extract_end_dates(soup)}
for row in find_relevant_table_rows(soup):
row_data = extract_row_data(row)
output[row_data[0]] = row_data[1]
@@ -86,7 +99,8 @@ async def _get_single_company_fin_stmt(statement: str, ticker_symbol: str, quart
"""
Returns data from the specified financial statement of the specified company.
"""
url = f'{constants.BASE_URL}/{ticker_symbol}/financials{constants.FIN_STMT_URL_SUFFIX[statement]}'
log.info(f"Scraping {statement} for {ticker_symbol}")
url = f'{BASE_URL}/{ticker_symbol}/financials{FIN_STMT_URL_SUFFIX[statement]}'
if quarterly:
url += '/quarter'
soup = await soup_from_url(url, session)
@ -95,70 +109,69 @@ async def _get_single_company_fin_stmt(statement: str, ticker_symbol: str, quart
@in_async_session
async def _get_multi_companies_fin_stmt(statement: str, *ticker_symbols: str, quarterly: bool = False,
concurrent_batch_size: int = constants.DEFAULT_CONCURRENT_BATCH_SIZE,
concurrent_batch_size: int = DEFAULT_CONCURRENT_BATCH_SIZE,
session: ClientSession = None) -> Union[ResultDict, Dict[str, ResultDict]]:
if len(ticker_symbols) == 1:
return await _get_single_company_fin_stmt(statement, ticker_symbols[0], quarterly, session)
result_list = await gather_in_batches(
concurrent_batch_size,
*(_get_single_company_fin_stmt(statement, symbol, quarterly, session) for symbol in ticker_symbols)
)
coroutines = (_get_single_company_fin_stmt(statement, symbol, quarterly, session) for symbol in ticker_symbols)
result_list = await gather_in_batches(concurrent_batch_size, *coroutines)
return {symbol: data for symbol, data in zip(ticker_symbols, result_list)}
@in_async_session
async def get_balance_sheet(*ticker_symbols: str, quarterly: bool = False,
concurrent_batch_size: int = constants.DEFAULT_CONCURRENT_BATCH_SIZE,
concurrent_batch_size: int = DEFAULT_CONCURRENT_BATCH_SIZE,
session: ClientSession = None) -> Union[ResultDict, Dict[str, ResultDict]]:
"""
Returns data from the balance sheet of the specified company.
"""
return await _get_multi_companies_fin_stmt(constants.BS, *ticker_symbols,
return await _get_multi_companies_fin_stmt(BS, *ticker_symbols,
quarterly=quarterly, concurrent_batch_size=concurrent_batch_size,
session=session)
@in_async_session
async def get_income_statement(*ticker_symbols: str, quarterly: bool = False,
concurrent_batch_size: int = constants.DEFAULT_CONCURRENT_BATCH_SIZE,
concurrent_batch_size: int = DEFAULT_CONCURRENT_BATCH_SIZE,
session: ClientSession = None) -> Union[ResultDict, Dict[str, ResultDict]]:
"""
Returns data from the income statement of the specified company.
"""
return await _get_multi_companies_fin_stmt(constants.IS, *ticker_symbols,
return await _get_multi_companies_fin_stmt(IS, *ticker_symbols,
quarterly=quarterly, concurrent_batch_size=concurrent_batch_size,
session=session)
@in_async_session
async def get_cash_flow_statement(*ticker_symbols: str, quarterly: bool = False,
concurrent_batch_size: int = constants.DEFAULT_CONCURRENT_BATCH_SIZE,
concurrent_batch_size: int = DEFAULT_CONCURRENT_BATCH_SIZE,
session: ClientSession = None) -> Union[ResultDict, Dict[str, ResultDict]]:
"""
Returns data from the cash flow statement of the specified company.
"""
return await _get_multi_companies_fin_stmt(constants.CF, *ticker_symbols,
return await _get_multi_companies_fin_stmt(CF, *ticker_symbols,
quarterly=quarterly, concurrent_batch_size=concurrent_batch_size,
session=session)
@in_async_session
async def _get_single_company_financials(ticker_symbol: str, quarterly: bool = False,
session: ClientSession = None) -> Dict[str, ResultDict]:
return {
constants.BS: await _get_single_company_fin_stmt(constants.BS, ticker_symbol, quarterly, session),
constants.IS: await _get_single_company_fin_stmt(constants.IS, ticker_symbol, quarterly, session),
constants.CF: await _get_single_company_fin_stmt(constants.CF, ticker_symbol, quarterly, session)
}
async def _get_single_company_all_financials(ticker_symbol: str, quarterly: bool = False,
session: ClientSession = None) -> Dict[str, ResultDict]:
coroutines = (_get_single_company_fin_stmt(stmt, ticker_symbol, quarterly, session) for stmt in (BS, IS, CF))
results = await asyncio.gather(*coroutines)
return {stmt: data for stmt, data in zip((BS, IS, CF), results)}
@in_async_session
async def get_company_financials(*ticker_symbols: str, quarterly: bool = False,
session: ClientSession = None) -> Union[Dict[str, ResultDict],
Dict[str, Dict[str, ResultDict]]]:
async def get_all_financials(*ticker_symbols: str, quarterly: bool = False,
concurrent_batch_size: int = DEFAULT_CONCURRENT_BATCH_SIZE,
session: ClientSession = None) -> Union[Dict[str, ResultDict],
Dict[str, Dict[str, ResultDict]]]:
"""
Returns all fundamentals (balance sheet, income statement and cash flow statement) of the specified company or companies.
"""
if len(ticker_symbols) == 1:
return await _get_single_company_financials(ticker_symbols[0], quarterly, session)
return {symbol: await _get_single_company_financials(symbol, quarterly, session) for symbol in ticker_symbols}
return await _get_single_company_all_financials(ticker_symbols[0], quarterly, session)
coroutines = (_get_single_company_all_financials(symbol, quarterly, session) for symbol in ticker_symbols)
result_list = await gather_in_batches(concurrent_batch_size, *coroutines)
return {symbol: data for symbol, data in zip(ticker_symbols, result_list)}
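Because every public coroutine still accepts an optional session, one aiohttp connection pool can be shared across an entire batch. A minimal sketch with placeholder ticker symbols (the three statements of each company are gathered concurrently, while at most two companies are scraped at a time here):

    import asyncio
    from aiohttp import ClientSession
    from mwfin import get_all_financials

    async def scrape_batch() -> dict:
        # Reuse a single session for all requests across the batch.
        async with ClientSession() as session:
            return await get_all_financials('AAPL', 'MSFT', 'GOOG',
                                            quarterly=False, concurrent_batch_size=2,
                                            session=session)

    data = asyncio.run(scrape_batch())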

View File

@@ -7,6 +7,7 @@ from bs4 import BeautifulSoup
from mwfin import functions
from mwfin.constants import HTML_PARSER, BASE_URL, FIN_STMT_URL_SUFFIX, IS, BS, CF, END_DATE
from mwfin.exceptions import UnknownFinancialStatementItem
THIS_DIR = Path(__file__).parent
@@ -62,15 +63,33 @@ class FunctionsTestCase(IsolatedAsyncioTestCase):
test_soup = BeautifulSoup('<tr><td><div> Cash & Short Term Investments Growth </div></td></tr>', HTML_PARSER)
self.assertFalse(functions.is_relevant_table_row(test_soup.tr))
test_soup = BeautifulSoup('<tr><td><div> baz </div></td></tr>', HTML_PARSER)
self.assertFalse(functions.is_relevant_table_row(test_soup.tr))
with self.assertRaises(UnknownFinancialStatementItem):
functions.is_relevant_table_row(test_soup.tr)
@patch.object(functions, 'open')
@patch.object(functions, 'is_relevant_table_row')
def test_find_relevant_table_rows(self, mock_is_relevant_table_row):
def test_find_relevant_table_rows(self, mock_is_relevant_table_row, mock_open):
mock_is_relevant_table_row.return_value = True
expected_output = self.test_soup.find('div', attrs={'class': 'financials'}).tbody.find_all('tr')
tr0, tr1 = expected_output
output = functions.find_relevant_table_rows(self.test_soup)
self.assertListEqual(expected_output, output)
mock_is_relevant_table_row.assert_has_calls([call(expected_output[0]), call(expected_output[1])])
mock_is_relevant_table_row.assert_has_calls([call(tr0), call(tr1)])
mock_is_relevant_table_row.reset_mock()
mock_is_relevant_table_row.side_effect = UnknownFinancialStatementItem()
expected_output = self.test_soup.find_all('thistagdoesntexist')
output = functions.find_relevant_table_rows(self.test_soup)
self.assertListEqual(expected_output, output)
mock_is_relevant_table_row.assert_has_calls([call(tr0), call(tr1)])
mock_is_relevant_table_row.reset_mock()
mock_write = mock_open.return_value.__enter__.return_value.write
with patch.object(functions, 'DEV_MODE', new=True):
output = functions.find_relevant_table_rows(self.test_soup)
self.assertListEqual(expected_output, output)
mock_is_relevant_table_row.assert_has_calls([call(tr0), call(tr1)])
mock_write.assert_has_calls([call(str(self.test_soup)), call(str(self.test_soup))])
def test_extract_row_data(self):
test_row = self.test_soup.find('div', attrs={'class': 'financials'}).tbody.tr
@@ -171,11 +190,11 @@ class FunctionsTestCase(IsolatedAsyncioTestCase):
await self._helper_test_get_any_statement(CF, mock__get_multi_companies_fin_stmt)
@patch.object(functions, '_get_single_company_fin_stmt')
async def test__get_single_company_financials(self, mock__get_single_company_fin_stmt):
async def test__get_single_company_all_financials(self, mock__get_single_company_fin_stmt):
symbol, quarterly, mock_session = 'foo', False, MagicMock()
mock__get_single_company_fin_stmt.return_value = bar = 'bar'
expected_output = {BS: bar, IS: bar, CF: bar}
output = await functions._get_single_company_financials(symbol, quarterly, mock_session)
output = await functions._get_single_company_all_financials(symbol, quarterly, mock_session)
self.assertDictEqual(expected_output, output)
mock__get_single_company_fin_stmt.assert_has_calls([
call(BS, symbol, quarterly, mock_session),
@@ -183,21 +202,21 @@ class FunctionsTestCase(IsolatedAsyncioTestCase):
call(CF, symbol, quarterly, mock_session)
])
@patch.object(functions, '_get_single_company_financials')
async def test_get_company_financials(self, mock__get_single_company_financials):
mock__get_single_company_financials.return_value = expected_output = 'baz'
@patch.object(functions, '_get_single_company_all_financials')
async def test_get_company_financials(self, mock__get_single_company_all_financials):
mock__get_single_company_all_financials.return_value = expected_output = 'baz'
symbol, quarterly, mock_session = 'foo', False, MagicMock()
output = await functions.get_company_financials(symbol, quarterly=quarterly, session=mock_session)
output = await functions.get_all_financials(symbol, quarterly=quarterly, session=mock_session)
self.assertEqual(expected_output, output)
mock__get_single_company_financials.assert_called_once_with(symbol, quarterly, mock_session)
mock__get_single_company_financials.reset_mock()
mock__get_single_company_all_financials.assert_called_once_with(symbol, quarterly, mock_session)
mock__get_single_company_all_financials.reset_mock()
test_sym1, test_sym2 = 'x', 'y'
expected_output = {test_sym1: expected_output, test_sym2: expected_output}
output = await functions.get_company_financials(test_sym1, test_sym2,
quarterly=quarterly, session=mock_session)
output = await functions.get_all_financials(test_sym1, test_sym2,
quarterly=quarterly, session=mock_session)
self.assertDictEqual(expected_output, output)
mock__get_single_company_financials.assert_has_calls([
mock__get_single_company_all_financials.assert_has_calls([
call(test_sym1, quarterly, mock_session),
call(test_sym2, quarterly, mock_session)
])
@@ -212,7 +231,7 @@ class FunctionsTestCase(IsolatedAsyncioTestCase):
IS: {END_DATE: ('End_Date_1', 'End_Date_2'), 'Cash & Short Term Investments': (11000000, -22000000)},
CF: {END_DATE: ('End_Date_1', 'End_Date_2'), 'Cash & Short Term Investments': (11000000, -22000000)}
}
output = await functions.get_company_financials(symbol, session=mock_session_obj)
output = await functions.get_all_financials(symbol, session=mock_session_obj)
self.assertDictEqual(expected_output, output)
mock_session_obj.get.assert_has_calls([
call(f'{BASE_URL}/{symbol}/financials{FIN_STMT_URL_SUFFIX[BS]}'),