Compare commits

...

9 Commits

9 changed files with 221 additions and 19 deletions

10
.coveragerc Normal file
View File

@ -0,0 +1,10 @@
[run]
source = .
branch = true
omit =
.venv/*
[report]
fail_under = 100
show_missing = True
skip_covered = True

8
.gitignore vendored
View File

@ -1,2 +1,10 @@
# Virtual environment
/.venv/
# PyCharm:
/.idea/
# Distribution / packaging:
*.egg-info/
# Python cache:
__pycache__/
# Tests:
.coverage

3
coverage.sh Executable file
View File

@ -0,0 +1,3 @@
#!/usr/bin/env sh
coverage erase && coverage run -m unittest discover && coverage report

View File

@ -19,7 +19,7 @@ keywords = webscraping, html, markup, dom, scraper, attributes, tags, stocks, fi
package_dir =
= src
packages = find:
python_requires = >=3.7
python_requires = >=3.8
install_requires =
beautifulsoup4
aiohttp

View File

@ -0,0 +1 @@
from .scrape import *

View File

@ -5,10 +5,10 @@ import csv
from argparse import ArgumentParser
from pathlib import Path
from .scrape import get_all_data, log
from . import get_all_data, log
def main() -> None:
async def main() -> None:
parser = ArgumentParser(description="Scrape all stock symbols")
parser.add_argument(
'-v', '--verbose',
@ -29,7 +29,7 @@ def main() -> None:
if args.verbose:
log.setLevel(logging.DEBUG)
data = asyncio.run(get_all_data(args.sequential))
data = await get_all_data(args.sequential)
if args.to_file is None:
csv.writer(sys.stdout).writerows(data)
@ -39,4 +39,4 @@ def main() -> None:
if __name__ == '__main__':
main()
asyncio.run(main())

View File

@ -25,6 +25,7 @@ DIGIT_CATEGORY = '0-9'
OTHER_CATEGORY = 'Other'
CATEGORIES = [DIGIT_CATEGORY] + list(ascii_uppercase) + [OTHER_CATEGORY]
STOCK_SYMBOL_PATTERN = re.compile(r'\(([\w.&]+)\)')
HTML_PARSER = 'html.parser'
class UnexpectedMarkupError(Exception):
@ -88,16 +89,34 @@ def get_str_from_td(td: Tag) -> str:
return str(content).strip()
async def trs_from_page(url: str, session: ClientSession = None, limit: int = None) -> ResultSet:
async def soup_from_url(url: str, session: ClientSession = None) -> BeautifulSoup:
"""
Requests page and converts contents into a BeautifulSoup object.
Args:
url:
URL string leading to any page with matching content.
session (optional):
If passed a ClientSession instance, all HTTP requests will be made using that session;
otherwise a new one is created.
Returns:
A BeautifulSoup object for further data extraction
"""
if session is None:
session = ClientSession()
async with session.get(url) as response:
html = await response.text()
return BeautifulSoup(html, HTML_PARSER)
def trs_from_page(soup: BeautifulSoup, limit: int = None) -> ResultSet:
"""
Returns the table rows found on the specified page.
Args:
url:
URL string leading to a page with matching content.
session (optional):
If passed a ClientSession instance, all HTTP requests will be made using that session;
otherwise a new one is created.
soup:
Page text to be scoured for table rows.
limit (optional):
Stop looking after finding this many results;
finds all matches by default.
@ -109,18 +128,13 @@ async def trs_from_page(url: str, session: ClientSession = None, limit: int = No
Returns:
A ResultSet object containing all extracted 'tr' Tag objects
"""
if session is None:
session = ClientSession()
async with session.get(url) as response:
html = await response.text()
soup = BeautifulSoup(html, 'html.parser')
try:
return soup.find('div', {'id': 'marketsindex'}).table.tbody.find_all('tr', limit=limit)
except AttributeError:
log.error("Unexpected HTML markup!")
file_name = f'unexpected_response_at_{datetime.now().strftime("%Y-%m-%d_%H-%M")}.html'
with open(file_name, 'w') as f:
f.write(html)
f.write(soup.prettify())
raise UnexpectedMarkupError
@ -148,12 +162,14 @@ async def get_data_from_category(category: str, session: ClientSession = None,
session = ClientSession()
data: list[row_type] = []
page = first_page
trs = await trs_from_page(f'{BASE_URL}{category}', session)
soup = await soup_from_url(f'{BASE_URL}{category}', session)
trs = trs_from_page(soup)
while page <= last_page and len(trs) > 0:
data.extend(extract_row_data(*trs))
log.info(f"Scraped '{category}' page {page}")
page += 1
trs = await trs_from_page(f'{BASE_URL}{category}/{page}', session)
soup = await soup_from_url(f'{BASE_URL}{category}/{page}', session)
trs = trs_from_page(soup)
return data

0
tests/__init__.py Normal file
View File

164
tests/test_scrape.py Normal file
View File

@ -0,0 +1,164 @@
import logging
from unittest import IsolatedAsyncioTestCase
from unittest.mock import patch, MagicMock, AsyncMock, call
from bs4 import BeautifulSoup
from stocksymbolscraper import scrape
class ScrapeTestCase(IsolatedAsyncioTestCase):
@patch.object(scrape, 'get_single_tr_data')
def test_extract_row_data(self, mock_get_single_tr_data: MagicMock):
foo = 'foo'
mock_get_single_tr_data.return_value = foo
input1, input2, input3 = MagicMock(), MagicMock(), MagicMock()
# Although the function expects BS4 Tag objects as arguments, we substitute with Mocks here
# because those arguments are immediately passed into another function, which we mock out anyway.
output = scrape.extract_row_data(input1, input2, input3)
expected_output = [foo, foo, foo]
self.assertListEqual(expected_output, output)
mock_get_single_tr_data.assert_has_calls([call(input1), call(input2), call(input3)])
@patch.object(scrape, 'get_str_from_td')
def test_get_single_tr_data(self, mock_get_str_from_td: MagicMock):
a, b, x = 'a', 'b', 'x'
mock_get_str_from_td.return_value = x
test_html = f'<tr> <td><a>{a}<small>({b})</small></a></td>' + \
f'<td>foo</td> <td>bar</td> <td>baz</td> </tr>'
test_tr = BeautifulSoup(test_html, scrape.HTML_PARSER).tr
test_tds = test_tr.find_all('td')
output = scrape.get_single_tr_data(test_tr)
expected_output = (a, b, x, x, x)
self.assertTupleEqual(expected_output, output)
mock_get_str_from_td.assert_has_calls([call(test_tds[1]), call(test_tds[2]), call(test_tds[3])])
test_html = f'<tr> <td><a>{a}<small>***{b}***</small></a></td>' + \
f'<td>foo</td> <td>bar</td> <td>baz</td> </tr>'
test_tr = BeautifulSoup(test_html, scrape.HTML_PARSER).tr
test_tds = test_tr.find_all('td')
output = scrape.get_single_tr_data(test_tr)
expected_output = (a, f'***{b}***', x, x, x)
self.assertTupleEqual(expected_output, output)
mock_get_str_from_td.assert_has_calls([call(test_tds[1]), call(test_tds[2]), call(test_tds[3])])
def test_get_str_from_td(self):
expected_output = 'foo bar'
test_td = BeautifulSoup(f'<td> {expected_output} </td>', scrape.HTML_PARSER).td
output = scrape.get_str_from_td(test_td)
self.assertEqual(expected_output, output)
expected_output = ''
test_td = BeautifulSoup('<td></td>', scrape.HTML_PARSER).td
output = scrape.get_str_from_td(test_td)
self.assertEqual(expected_output, output)
@patch.object(scrape, 'ClientSession')
async def test_soup_from_url(self, mock_session_cls):
test_html = '<b>foo</b>'
mock_response = MagicMock()
mock_response.text = AsyncMock(return_value=test_html)
mock_get_return = MagicMock()
mock_get_return.__aenter__ = AsyncMock(return_value=mock_response)
mock_session_obj = MagicMock()
mock_session_obj.get = MagicMock(return_value=mock_get_return)
mock_session_cls.return_value = mock_session_obj
output = await scrape.soup_from_url('foo')
expected_output = BeautifulSoup(test_html, scrape.HTML_PARSER)
self.assertEqual(expected_output, output)
output = await scrape.soup_from_url('foo', mock_session_obj)
self.assertEqual(expected_output, output)
def test_trs_from_page(self):
tr1_text, tr2_text = '<tr>foo</tr>', '<tr>bar</tr>'
test_html = f'<div id="marketsindex"><table><tbody>{tr1_text}{tr2_text}</tbody></table></div>'
test_soup = BeautifulSoup(test_html, scrape.HTML_PARSER)
output = scrape.trs_from_page(test_soup)
expected_output = test_soup.find_all('tr')
self.assertSequenceEqual(expected_output, output)
logging.disable(logging.CRITICAL)
test_html = f'<div id="marketsindex"><table>garbage</table></div>'
test_soup = BeautifulSoup(test_html, scrape.HTML_PARSER)
with patch.object(scrape, 'open') as mock_open:
self.assertRaises(scrape.UnexpectedMarkupError, scrape.trs_from_page, test_soup)
mock_open.assert_called_once()
mock_open.return_value.__enter__.return_value.write.assert_called_once_with(test_soup.prettify())
logging.disable(logging.NOTSET)
@patch.object(scrape, 'soup_from_url')
@patch.object(scrape, 'trs_from_page')
@patch.object(scrape, 'extract_row_data')
@patch.object(scrape, 'ClientSession')
async def test_get_data_from_category(self, mock_session_cls, mock_extract_row_data,
mock_trs_from_page, mock_soup_from_url):
# We do not pass a session object into the tested function,
# so we mock the ClientSession class for the first two tests.
mock_session = MagicMock()
mock_session_cls.return_value = mock_session
mock_soup = MagicMock()
mock_soup_from_url.return_value = mock_soup
category = 'ßßß'
url = f'{scrape.BASE_URL}{category}'
# First test with no TRs returned, thus not entering the loop.
mock_trs = []
mock_trs_from_page.return_value = mock_trs
mock_extract_row_data.return_value = expected_output = []
output = await scrape.get_data_from_category(category)
self.assertListEqual(expected_output, output)
mock_soup_from_url.assert_called_once_with(url, mock_session)
mock_trs_from_page.assert_called_once_with(mock_soup)
mock_extract_row_data.assert_not_called()
mock_soup_from_url.reset_mock()
mock_trs_from_page.reset_mock()
# Second test with (fake) TRs returned, thus entering the loop.
# We pass a last_page argument to stop after the first iteration.
mock_trs = ['foo', 'bar']
mock_trs_from_page.return_value = mock_trs
mock_extract_row_data.return_value = expected_output = ['a', 'b']
# Factored out checks because the only difference in the next two tests is the presence or absence of a
# real session instance.
def check_assertions(test_output, session_obj):
self.assertListEqual(expected_output, test_output)
mock_soup_from_url.assert_has_calls([call(url, session_obj), call(f'{url}/{2}', session_obj)])
mock_trs_from_page.assert_has_calls([call(mock_soup), call(mock_soup)])
mock_extract_row_data.assert_called_once_with(*mock_trs)
output = await scrape.get_data_from_category(category, last_page=1)
check_assertions(output, mock_session)
mock_soup_from_url.reset_mock()
mock_trs_from_page.reset_mock()
mock_extract_row_data.reset_mock()
# Third test with (fake) TRs returned and explicitly passing a real session object.
async with scrape.ClientSession() as session:
output = await scrape.get_data_from_category(category, session, last_page=1)
check_assertions(output, session)
@patch.object(scrape, 'get_data_from_category')
@patch.object(scrape, 'ClientSession')
async def test_get_all_data(self, mock_session_cls, mock_get_data_from_category):
mock_session = MagicMock()
mock_session_cls.return_value.__aenter__.return_value = mock_session
mock_result = ['foo']
expected_output = len(scrape.CATEGORIES) * mock_result
mock_get_data_from_category.return_value = mock_result
output = await scrape.get_all_data(sequential=True)
self.assertListEqual(expected_output, output)
mock_get_data_from_category.assert_has_calls([
call(category, mock_session) for category in scrape.CATEGORIES
])
output = await scrape.get_all_data(sequential=False)
self.assertListEqual(expected_output, output)
mock_get_data_from_category.assert_has_calls([
call(category, mock_session) for category in scrape.CATEGORIES
])