165 lines
7.8 KiB
Python
165 lines
7.8 KiB
Python
import logging
|
|
from unittest import IsolatedAsyncioTestCase
|
|
from unittest.mock import patch, MagicMock, AsyncMock, call
|
|
|
|
from bs4 import BeautifulSoup
|
|
|
|
from stocksymbolscraper import scrape
|
|
|
|
|
|
class ScrapeTestCase(IsolatedAsyncioTestCase):
    """Unit tests for the functions of the `scrape` module.

    Functions that the tested function delegates to, as well as the `ClientSession` class, are replaced with mocks
    where needed, so that each function is tested in isolation.
    """

    @patch.object(scrape, 'get_single_tr_data')
    def test_extract_row_data(self, mock_get_single_tr_data: MagicMock):
        """Check that `extract_row_data` calls `get_single_tr_data` per argument and returns the results as a list."""
        foo = 'foo'
        mock_get_single_tr_data.return_value = foo
        input1, input2, input3 = MagicMock(), MagicMock(), MagicMock()
        # Although the function expects BS4 Tag objects as arguments, we substitute with Mocks here
        # because those arguments are immediately passed into another function, which we mock out anyway.
        output = scrape.extract_row_data(input1, input2, input3)
        expected_output = [foo, foo, foo]
        self.assertListEqual(expected_output, output)
        mock_get_single_tr_data.assert_has_calls([call(input1), call(input2), call(input3)])

    @patch.object(scrape, 'get_str_from_td')
    def test_get_single_tr_data(self, mock_get_str_from_td: MagicMock):
        """Check the tuple `get_single_tr_data` builds from a row, with and without parentheses in `<small>`."""
        a, b, x = 'a', 'b', 'x'
        mock_get_str_from_td.return_value = x

        # First scenario: the text inside <small> is wrapped in parentheses, which are expected to be stripped.
        test_html = (
            f'<tr> <td><a>{a}<small>({b})</small></a></td>'
            '<td>foo</td> <td>bar</td> <td>baz</td> </tr>'
        )
        test_tr = BeautifulSoup(test_html, scrape.HTML_PARSER).tr
        test_tds = test_tr.find_all('td')
        output = scrape.get_single_tr_data(test_tr)
        expected_output = (a, b, x, x, x)
        self.assertTupleEqual(expected_output, output)
        mock_get_str_from_td.assert_has_calls([call(test_tds[1]), call(test_tds[2]), call(test_tds[3])])

        # Forget the calls recorded so far; the cells of the next row have the same content as the previous ones,
        # so without the reset the call assertion below could be satisfied by the calls from the first scenario.
        # (`reset_mock` keeps the configured return value.)
        mock_get_str_from_td.reset_mock()

        # Second scenario: the text inside <small> is not wrapped in parentheses and is expected to be kept as is.
        test_html = (
            f'<tr> <td><a>{a}<small>***{b}***</small></a></td>'
            '<td>foo</td> <td>bar</td> <td>baz</td> </tr>'
        )
        test_tr = BeautifulSoup(test_html, scrape.HTML_PARSER).tr
        test_tds = test_tr.find_all('td')
        output = scrape.get_single_tr_data(test_tr)
        expected_output = (a, f'***{b}***', x, x, x)
        self.assertTupleEqual(expected_output, output)
        mock_get_str_from_td.assert_has_calls([call(test_tds[1]), call(test_tds[2]), call(test_tds[3])])

    def test_get_str_from_td(self):
        """Check that `get_str_from_td` returns the stripped cell text, and an empty string for an empty cell."""
        expected_output = 'foo bar'
        test_td = BeautifulSoup(f'<td> {expected_output} </td>', scrape.HTML_PARSER).td
        output = scrape.get_str_from_td(test_td)
        self.assertEqual(expected_output, output)

        expected_output = ''
        test_td = BeautifulSoup('<td></td>', scrape.HTML_PARSER).td
        output = scrape.get_str_from_td(test_td)
        self.assertEqual(expected_output, output)

    @patch.object(scrape, 'ClientSession')
    async def test_soup_from_url(self, mock_session_cls):
        """Check that `soup_from_url` parses the response text, with and without a session being passed."""
        test_html = '<b>foo</b>'
        # Build the chain `session.get(...)` -> async context manager -> response with an awaitable `text()`.
        mock_response = MagicMock()
        mock_response.text = AsyncMock(return_value=test_html)
        mock_get_return = MagicMock()
        mock_get_return.__aenter__ = AsyncMock(return_value=mock_response)
        mock_session_obj = MagicMock()
        mock_session_obj.get = MagicMock(return_value=mock_get_return)
        mock_session_cls.return_value = mock_session_obj
        expected_output = BeautifulSoup(test_html, scrape.HTML_PARSER)

        # Without a session argument; the (patched) `ClientSession` class is set up to provide the mock session.
        output = await scrape.soup_from_url('foo')
        self.assertEqual(expected_output, output)

        # With the mock session passed explicitly.
        output = await scrape.soup_from_url('foo', mock_session_obj)
        self.assertEqual(expected_output, output)

    def test_trs_from_page(self):
        """Check that `trs_from_page` returns the table rows and handles unexpected markup.

        For unexpected markup the test expects an `UnexpectedMarkupError` and the prettified page being written
        to a file opened via `open`.
        """
        tr1_text, tr2_text = '<tr>foo</tr>', '<tr>bar</tr>'
        test_html = f'<div id="marketsindex"><table><tbody>{tr1_text}{tr2_text}</tbody></table></div>'
        test_soup = BeautifulSoup(test_html, scrape.HTML_PARSER)
        output = scrape.trs_from_page(test_soup)
        expected_output = test_soup.find_all('tr')
        self.assertSequenceEqual(expected_output, output)

        # Silence log output during the error scenario. The reset is registered as a cleanup,
        # so that logging is re-enabled for subsequent tests even if an assertion below fails.
        logging.disable(logging.CRITICAL)
        self.addCleanup(logging.disable, logging.NOTSET)
        test_html = '<div id="marketsindex"><table>garbage</table></div>'
        test_soup = BeautifulSoup(test_html, scrape.HTML_PARSER)
        with patch.object(scrape, 'open') as mock_open:
            self.assertRaises(scrape.UnexpectedMarkupError, scrape.trs_from_page, test_soup)
            mock_open.assert_called_once()
            mock_open.return_value.__enter__.return_value.write.assert_called_once_with(test_soup.prettify())

    @patch.object(scrape, 'soup_from_url')
    @patch.object(scrape, 'trs_from_page')
    @patch.object(scrape, 'extract_row_data')
    @patch.object(scrape, 'ClientSession')
    async def test_get_data_from_category(self, mock_session_cls, mock_extract_row_data,
                                          mock_trs_from_page, mock_soup_from_url):
        """Check `get_data_from_category` with no rows, with rows, and with an explicitly passed session."""
        # We do not pass a session object into the tested function,
        # so we mock the ClientSession class for the first two tests.
        mock_session = MagicMock()
        mock_session_cls.return_value = mock_session
        mock_soup = MagicMock()
        mock_soup_from_url.return_value = mock_soup
        category = 'ßßß'
        url = f'{scrape.BASE_URL}{category}'

        # First test with no TRs returned, thus not entering the loop.
        mock_trs = []
        mock_trs_from_page.return_value = mock_trs
        mock_extract_row_data.return_value = expected_output = []
        output = await scrape.get_data_from_category(category)
        self.assertListEqual(expected_output, output)
        mock_soup_from_url.assert_called_once_with(url, mock_session)
        mock_trs_from_page.assert_called_once_with(mock_soup)
        mock_extract_row_data.assert_not_called()

        mock_soup_from_url.reset_mock()
        mock_trs_from_page.reset_mock()

        # Second test with (fake) TRs returned, thus entering the loop.
        # We pass a last_page argument to stop after the first iteration.
        mock_trs = ['foo', 'bar']
        mock_trs_from_page.return_value = mock_trs
        mock_extract_row_data.return_value = expected_output = ['a', 'b']

        # Factored out checks because the only difference in the next two tests is
        # whether or not a session object is passed explicitly.
        def check_assertions(test_output, session_obj):
            self.assertListEqual(expected_output, test_output)
            mock_soup_from_url.assert_has_calls([call(url, session_obj), call(f'{url}/2', session_obj)])
            mock_trs_from_page.assert_has_calls([call(mock_soup), call(mock_soup)])
            mock_extract_row_data.assert_called_once_with(*mock_trs)

        output = await scrape.get_data_from_category(category, last_page=1)
        check_assertions(output, mock_session)

        mock_soup_from_url.reset_mock()
        mock_trs_from_page.reset_mock()
        mock_extract_row_data.reset_mock()

        # Third test with (fake) TRs returned and explicitly passing a session object.
        # Since `ClientSession` is patched in this test, `session` is not a real session, but the object
        # that the mocked async context manager yields.
        async with scrape.ClientSession() as session:
            output = await scrape.get_data_from_category(category, session, last_page=1)
        check_assertions(output, session)

    @patch.object(scrape, 'get_data_from_category')
    @patch.object(scrape, 'ClientSession')
    async def test_get_all_data(self, mock_session_cls, mock_get_data_from_category):
        """Check that `get_all_data` combines the per-category results, sequentially and non-sequentially."""
        mock_session = MagicMock()
        mock_session_cls.return_value.__aenter__.return_value = mock_session
        mock_result = ['foo']
        expected_output = len(scrape.CATEGORIES) * mock_result
        mock_get_data_from_category.return_value = mock_result
        expected_calls = [call(category, mock_session) for category in scrape.CATEGORIES]

        output = await scrape.get_all_data(sequential=True)
        self.assertListEqual(expected_output, output)
        mock_get_data_from_category.assert_has_calls(expected_calls)

        # Forget the calls of the sequential run; otherwise the call assertion below would be satisfied by them
        # regardless of what the second run does. (`reset_mock` keeps the configured return value.)
        mock_get_data_from_category.reset_mock()

        output = await scrape.get_all_data(sequential=False)
        self.assertListEqual(expected_output, output)
        mock_get_data_from_category.assert_has_calls(expected_calls)