Compare commits
No commits in common. "master" and "29c82a4ff70e5985f1322e20a087a84484508fd6" have entirely different histories.
master
...
29c82a4ff7
10
.coveragerc
10
.coveragerc
@ -1,10 +0,0 @@
|
||||
[run]
|
||||
source = .
|
||||
branch = true
|
||||
omit =
|
||||
.venv/*
|
||||
|
||||
[report]
|
||||
fail_under = 100
|
||||
show_missing = True
|
||||
skip_covered = True
|
8
.gitignore
vendored
8
.gitignore
vendored
@ -1,10 +1,2 @@
|
||||
# Virtual environment
|
||||
/.venv/
|
||||
# PyCharm:
|
||||
/.idea/
|
||||
# Distribution / packaging:
|
||||
*.egg-info/
|
||||
# Python cache:
|
||||
__pycache__/
|
||||
# Tests:
|
||||
.coverage
|
@ -1,3 +0,0 @@
|
||||
#!/usr/bin/env sh
|
||||
|
||||
coverage erase && coverage run -m unittest discover && coverage report
|
@ -19,7 +19,7 @@ keywords = webscraping, html, markup, dom, scraper, attributes, tags, stocks, fi
|
||||
package_dir =
|
||||
= src
|
||||
packages = find:
|
||||
python_requires = >=3.8
|
||||
python_requires = >=3.7
|
||||
install_requires =
|
||||
beautifulsoup4
|
||||
aiohttp
|
||||
|
@ -1 +0,0 @@
|
||||
from .scrape import *
|
@ -5,10 +5,10 @@ import csv
|
||||
from argparse import ArgumentParser
|
||||
from pathlib import Path
|
||||
|
||||
from . import get_all_data, log
|
||||
from .scrape import get_all_data, log
|
||||
|
||||
|
||||
async def main() -> None:
|
||||
def main() -> None:
|
||||
parser = ArgumentParser(description="Scrape all stock symbols")
|
||||
parser.add_argument(
|
||||
'-v', '--verbose',
|
||||
@ -29,7 +29,7 @@ async def main() -> None:
|
||||
if args.verbose:
|
||||
log.setLevel(logging.DEBUG)
|
||||
|
||||
data = await get_all_data(args.sequential)
|
||||
data = asyncio.run(get_all_data(args.sequential))
|
||||
|
||||
if args.to_file is None:
|
||||
csv.writer(sys.stdout).writerows(data)
|
||||
@ -39,4 +39,4 @@ async def main() -> None:
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
asyncio.run(main())
|
||||
main()
|
||||
|
@ -25,7 +25,6 @@ DIGIT_CATEGORY = '0-9'
|
||||
OTHER_CATEGORY = 'Other'
|
||||
CATEGORIES = [DIGIT_CATEGORY] + list(ascii_uppercase) + [OTHER_CATEGORY]
|
||||
STOCK_SYMBOL_PATTERN = re.compile(r'\(([\w.&]+)\)')
|
||||
HTML_PARSER = 'html.parser'
|
||||
|
||||
|
||||
class UnexpectedMarkupError(Exception):
|
||||
@ -89,34 +88,16 @@ def get_str_from_td(td: Tag) -> str:
|
||||
return str(content).strip()
|
||||
|
||||
|
||||
async def soup_from_url(url: str, session: ClientSession = None) -> BeautifulSoup:
|
||||
"""
|
||||
Requests page and converts contents into a BeautifulSoup object.
|
||||
|
||||
Args:
|
||||
url:
|
||||
URL string leading to any page with matching content.
|
||||
session (optional):
|
||||
If passed a ClientSession instance, all HTTP requests will be made using that session;
|
||||
otherwise a new one is created.
|
||||
|
||||
Returns:
|
||||
A BeautifulSoup object for further data extraction
|
||||
"""
|
||||
if session is None:
|
||||
session = ClientSession()
|
||||
async with session.get(url) as response:
|
||||
html = await response.text()
|
||||
return BeautifulSoup(html, HTML_PARSER)
|
||||
|
||||
|
||||
def trs_from_page(soup: BeautifulSoup, limit: int = None) -> ResultSet:
|
||||
async def trs_from_page(url: str, session: ClientSession = None, limit: int = None) -> ResultSet:
|
||||
"""
|
||||
Returns the table rows found on the specified page.
|
||||
|
||||
Args:
|
||||
soup:
|
||||
Page text to be scoured for table rows.
|
||||
url:
|
||||
URL string leading to a page with matching content.
|
||||
session (optional):
|
||||
If passed a ClientSession instance, all HTTP requests will be made using that session;
|
||||
otherwise a new one is created.
|
||||
limit (optional):
|
||||
Stop looking after finding this many results;
|
||||
finds all matches by default.
|
||||
@ -128,13 +109,18 @@ def trs_from_page(soup: BeautifulSoup, limit: int = None) -> ResultSet:
|
||||
Returns:
|
||||
A ResultSet object containing all extracted 'tr' Tag objects
|
||||
"""
|
||||
if session is None:
|
||||
session = ClientSession()
|
||||
async with session.get(url) as response:
|
||||
html = await response.text()
|
||||
soup = BeautifulSoup(html, 'html.parser')
|
||||
try:
|
||||
return soup.find('div', {'id': 'marketsindex'}).table.tbody.find_all('tr', limit=limit)
|
||||
except AttributeError:
|
||||
log.error("Unexpected HTML markup!")
|
||||
file_name = f'unexpected_response_at_{datetime.now().strftime("%Y-%m-%d_%H-%M")}.html'
|
||||
with open(file_name, 'w') as f:
|
||||
f.write(soup.prettify())
|
||||
f.write(html)
|
||||
raise UnexpectedMarkupError
|
||||
|
||||
|
||||
@ -162,14 +148,12 @@ async def get_data_from_category(category: str, session: ClientSession = None,
|
||||
session = ClientSession()
|
||||
data: list[row_type] = []
|
||||
page = first_page
|
||||
soup = await soup_from_url(f'{BASE_URL}{category}', session)
|
||||
trs = trs_from_page(soup)
|
||||
trs = await trs_from_page(f'{BASE_URL}{category}', session)
|
||||
while page <= last_page and len(trs) > 0:
|
||||
data.extend(extract_row_data(*trs))
|
||||
log.info(f"Scraped '{category}' page {page}")
|
||||
page += 1
|
||||
soup = await soup_from_url(f'{BASE_URL}{category}/{page}', session)
|
||||
trs = trs_from_page(soup)
|
||||
trs = await trs_from_page(f'{BASE_URL}{category}/{page}', session)
|
||||
return data
|
||||
|
||||
|
||||
|
@ -1,164 +0,0 @@
|
||||
import logging
|
||||
from unittest import IsolatedAsyncioTestCase
|
||||
from unittest.mock import patch, MagicMock, AsyncMock, call
|
||||
|
||||
from bs4 import BeautifulSoup
|
||||
|
||||
from stocksymbolscraper import scrape
|
||||
|
||||
|
||||
class ScrapeTestCase(IsolatedAsyncioTestCase):
|
||||
|
||||
@patch.object(scrape, 'get_single_tr_data')
|
||||
def test_extract_row_data(self, mock_get_single_tr_data: MagicMock):
|
||||
foo = 'foo'
|
||||
mock_get_single_tr_data.return_value = foo
|
||||
input1, input2, input3 = MagicMock(), MagicMock(), MagicMock()
|
||||
# Although the function expects BS4 Tag objects as arguments, we substitute with Mocks here
|
||||
# because those arguments are immediately passed into another function, which we mock out anyway.
|
||||
output = scrape.extract_row_data(input1, input2, input3)
|
||||
expected_output = [foo, foo, foo]
|
||||
self.assertListEqual(expected_output, output)
|
||||
mock_get_single_tr_data.assert_has_calls([call(input1), call(input2), call(input3)])
|
||||
|
||||
@patch.object(scrape, 'get_str_from_td')
|
||||
def test_get_single_tr_data(self, mock_get_str_from_td: MagicMock):
|
||||
a, b, x = 'a', 'b', 'x'
|
||||
mock_get_str_from_td.return_value = x
|
||||
test_html = f'<tr> <td><a>{a}<small>({b})</small></a></td>' + \
|
||||
f'<td>foo</td> <td>bar</td> <td>baz</td> </tr>'
|
||||
test_tr = BeautifulSoup(test_html, scrape.HTML_PARSER).tr
|
||||
test_tds = test_tr.find_all('td')
|
||||
output = scrape.get_single_tr_data(test_tr)
|
||||
expected_output = (a, b, x, x, x)
|
||||
self.assertTupleEqual(expected_output, output)
|
||||
mock_get_str_from_td.assert_has_calls([call(test_tds[1]), call(test_tds[2]), call(test_tds[3])])
|
||||
|
||||
test_html = f'<tr> <td><a>{a}<small>***{b}***</small></a></td>' + \
|
||||
f'<td>foo</td> <td>bar</td> <td>baz</td> </tr>'
|
||||
test_tr = BeautifulSoup(test_html, scrape.HTML_PARSER).tr
|
||||
test_tds = test_tr.find_all('td')
|
||||
output = scrape.get_single_tr_data(test_tr)
|
||||
expected_output = (a, f'***{b}***', x, x, x)
|
||||
self.assertTupleEqual(expected_output, output)
|
||||
mock_get_str_from_td.assert_has_calls([call(test_tds[1]), call(test_tds[2]), call(test_tds[3])])
|
||||
|
||||
def test_get_str_from_td(self):
|
||||
expected_output = 'foo bar'
|
||||
test_td = BeautifulSoup(f'<td> {expected_output} </td>', scrape.HTML_PARSER).td
|
||||
output = scrape.get_str_from_td(test_td)
|
||||
self.assertEqual(expected_output, output)
|
||||
|
||||
expected_output = ''
|
||||
test_td = BeautifulSoup('<td></td>', scrape.HTML_PARSER).td
|
||||
output = scrape.get_str_from_td(test_td)
|
||||
self.assertEqual(expected_output, output)
|
||||
|
||||
@patch.object(scrape, 'ClientSession')
|
||||
async def test_soup_from_url(self, mock_session_cls):
|
||||
test_html = '<b>foo</b>'
|
||||
mock_response = MagicMock()
|
||||
mock_response.text = AsyncMock(return_value=test_html)
|
||||
mock_get_return = MagicMock()
|
||||
mock_get_return.__aenter__ = AsyncMock(return_value=mock_response)
|
||||
mock_session_obj = MagicMock()
|
||||
mock_session_obj.get = MagicMock(return_value=mock_get_return)
|
||||
mock_session_cls.return_value = mock_session_obj
|
||||
output = await scrape.soup_from_url('foo')
|
||||
expected_output = BeautifulSoup(test_html, scrape.HTML_PARSER)
|
||||
self.assertEqual(expected_output, output)
|
||||
|
||||
output = await scrape.soup_from_url('foo', mock_session_obj)
|
||||
self.assertEqual(expected_output, output)
|
||||
|
||||
def test_trs_from_page(self):
|
||||
tr1_text, tr2_text = '<tr>foo</tr>', '<tr>bar</tr>'
|
||||
test_html = f'<div id="marketsindex"><table><tbody>{tr1_text}{tr2_text}</tbody></table></div>'
|
||||
test_soup = BeautifulSoup(test_html, scrape.HTML_PARSER)
|
||||
output = scrape.trs_from_page(test_soup)
|
||||
expected_output = test_soup.find_all('tr')
|
||||
self.assertSequenceEqual(expected_output, output)
|
||||
|
||||
logging.disable(logging.CRITICAL)
|
||||
test_html = f'<div id="marketsindex"><table>garbage</table></div>'
|
||||
test_soup = BeautifulSoup(test_html, scrape.HTML_PARSER)
|
||||
with patch.object(scrape, 'open') as mock_open:
|
||||
self.assertRaises(scrape.UnexpectedMarkupError, scrape.trs_from_page, test_soup)
|
||||
mock_open.assert_called_once()
|
||||
mock_open.return_value.__enter__.return_value.write.assert_called_once_with(test_soup.prettify())
|
||||
logging.disable(logging.NOTSET)
|
||||
|
||||
@patch.object(scrape, 'soup_from_url')
|
||||
@patch.object(scrape, 'trs_from_page')
|
||||
@patch.object(scrape, 'extract_row_data')
|
||||
@patch.object(scrape, 'ClientSession')
|
||||
async def test_get_data_from_category(self, mock_session_cls, mock_extract_row_data,
|
||||
mock_trs_from_page, mock_soup_from_url):
|
||||
# We do not pass a session object into the tested function,
|
||||
# so we mock the ClientSession class for the first two tests.
|
||||
mock_session = MagicMock()
|
||||
mock_session_cls.return_value = mock_session
|
||||
mock_soup = MagicMock()
|
||||
mock_soup_from_url.return_value = mock_soup
|
||||
category = 'ßßß'
|
||||
url = f'{scrape.BASE_URL}{category}'
|
||||
|
||||
# First test with no TRs returned, thus not entering the loop.
|
||||
mock_trs = []
|
||||
mock_trs_from_page.return_value = mock_trs
|
||||
mock_extract_row_data.return_value = expected_output = []
|
||||
output = await scrape.get_data_from_category(category)
|
||||
self.assertListEqual(expected_output, output)
|
||||
mock_soup_from_url.assert_called_once_with(url, mock_session)
|
||||
mock_trs_from_page.assert_called_once_with(mock_soup)
|
||||
mock_extract_row_data.assert_not_called()
|
||||
|
||||
mock_soup_from_url.reset_mock()
|
||||
mock_trs_from_page.reset_mock()
|
||||
|
||||
# Second test with (fake) TRs returned, thus entering the loop.
|
||||
# We pass a last_page argument to stop after the first iteration.
|
||||
mock_trs = ['foo', 'bar']
|
||||
mock_trs_from_page.return_value = mock_trs
|
||||
mock_extract_row_data.return_value = expected_output = ['a', 'b']
|
||||
|
||||
# Factored out checks because the only difference in the next two tests is the presence or absence of a
|
||||
# real session instance.
|
||||
def check_assertions(test_output, session_obj):
|
||||
self.assertListEqual(expected_output, test_output)
|
||||
mock_soup_from_url.assert_has_calls([call(url, session_obj), call(f'{url}/{2}', session_obj)])
|
||||
mock_trs_from_page.assert_has_calls([call(mock_soup), call(mock_soup)])
|
||||
mock_extract_row_data.assert_called_once_with(*mock_trs)
|
||||
|
||||
output = await scrape.get_data_from_category(category, last_page=1)
|
||||
check_assertions(output, mock_session)
|
||||
|
||||
mock_soup_from_url.reset_mock()
|
||||
mock_trs_from_page.reset_mock()
|
||||
mock_extract_row_data.reset_mock()
|
||||
|
||||
# Third test with (fake) TRs returned and explicitly passing a real session object.
|
||||
async with scrape.ClientSession() as session:
|
||||
output = await scrape.get_data_from_category(category, session, last_page=1)
|
||||
check_assertions(output, session)
|
||||
|
||||
@patch.object(scrape, 'get_data_from_category')
|
||||
@patch.object(scrape, 'ClientSession')
|
||||
async def test_get_all_data(self, mock_session_cls, mock_get_data_from_category):
|
||||
mock_session = MagicMock()
|
||||
mock_session_cls.return_value.__aenter__.return_value = mock_session
|
||||
mock_result = ['foo']
|
||||
expected_output = len(scrape.CATEGORIES) * mock_result
|
||||
mock_get_data_from_category.return_value = mock_result
|
||||
|
||||
output = await scrape.get_all_data(sequential=True)
|
||||
self.assertListEqual(expected_output, output)
|
||||
mock_get_data_from_category.assert_has_calls([
|
||||
call(category, mock_session) for category in scrape.CATEGORIES
|
||||
])
|
||||
|
||||
output = await scrape.get_all_data(sequential=False)
|
||||
self.assertListEqual(expected_output, output)
|
||||
mock_get_data_from_category.assert_has_calls([
|
||||
call(category, mock_session) for category in scrape.CATEGORIES
|
||||
])
|
Loading…
Reference in New Issue
Block a user