Compare commits
No commits in common. "master" and "29c82a4ff70e5985f1322e20a087a84484508fd6" have entirely different histories.
master
...
29c82a4ff7
10
.coveragerc
10
.coveragerc
@ -1,10 +0,0 @@
|
|||||||
[run]
|
|
||||||
source = .
|
|
||||||
branch = true
|
|
||||||
omit =
|
|
||||||
.venv/*
|
|
||||||
|
|
||||||
[report]
|
|
||||||
fail_under = 100
|
|
||||||
show_missing = True
|
|
||||||
skip_covered = True
|
|
8
.gitignore
vendored
8
.gitignore
vendored
@ -1,10 +1,2 @@
|
|||||||
# Virtual environment
|
|
||||||
/.venv/
|
/.venv/
|
||||||
# PyCharm:
|
|
||||||
/.idea/
|
/.idea/
|
||||||
# Distribution / packaging:
|
|
||||||
*.egg-info/
|
|
||||||
# Python cache:
|
|
||||||
__pycache__/
|
|
||||||
# Tests:
|
|
||||||
.coverage
|
|
@ -1,3 +0,0 @@
|
|||||||
#!/usr/bin/env sh
|
|
||||||
|
|
||||||
coverage erase && coverage run -m unittest discover && coverage report
|
|
@ -19,7 +19,7 @@ keywords = webscraping, html, markup, dom, scraper, attributes, tags, stocks, fi
|
|||||||
package_dir =
|
package_dir =
|
||||||
= src
|
= src
|
||||||
packages = find:
|
packages = find:
|
||||||
python_requires = >=3.8
|
python_requires = >=3.7
|
||||||
install_requires =
|
install_requires =
|
||||||
beautifulsoup4
|
beautifulsoup4
|
||||||
aiohttp
|
aiohttp
|
||||||
|
@ -1 +0,0 @@
|
|||||||
from .scrape import *
|
|
@ -5,10 +5,10 @@ import csv
|
|||||||
from argparse import ArgumentParser
|
from argparse import ArgumentParser
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
from . import get_all_data, log
|
from .scrape import get_all_data, log
|
||||||
|
|
||||||
|
|
||||||
async def main() -> None:
|
def main() -> None:
|
||||||
parser = ArgumentParser(description="Scrape all stock symbols")
|
parser = ArgumentParser(description="Scrape all stock symbols")
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
'-v', '--verbose',
|
'-v', '--verbose',
|
||||||
@ -29,7 +29,7 @@ async def main() -> None:
|
|||||||
if args.verbose:
|
if args.verbose:
|
||||||
log.setLevel(logging.DEBUG)
|
log.setLevel(logging.DEBUG)
|
||||||
|
|
||||||
data = await get_all_data(args.sequential)
|
data = asyncio.run(get_all_data(args.sequential))
|
||||||
|
|
||||||
if args.to_file is None:
|
if args.to_file is None:
|
||||||
csv.writer(sys.stdout).writerows(data)
|
csv.writer(sys.stdout).writerows(data)
|
||||||
@ -39,4 +39,4 @@ async def main() -> None:
|
|||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
asyncio.run(main())
|
main()
|
||||||
|
@ -25,7 +25,6 @@ DIGIT_CATEGORY = '0-9'
|
|||||||
OTHER_CATEGORY = 'Other'
|
OTHER_CATEGORY = 'Other'
|
||||||
CATEGORIES = [DIGIT_CATEGORY] + list(ascii_uppercase) + [OTHER_CATEGORY]
|
CATEGORIES = [DIGIT_CATEGORY] + list(ascii_uppercase) + [OTHER_CATEGORY]
|
||||||
STOCK_SYMBOL_PATTERN = re.compile(r'\(([\w.&]+)\)')
|
STOCK_SYMBOL_PATTERN = re.compile(r'\(([\w.&]+)\)')
|
||||||
HTML_PARSER = 'html.parser'
|
|
||||||
|
|
||||||
|
|
||||||
class UnexpectedMarkupError(Exception):
|
class UnexpectedMarkupError(Exception):
|
||||||
@ -89,34 +88,16 @@ def get_str_from_td(td: Tag) -> str:
|
|||||||
return str(content).strip()
|
return str(content).strip()
|
||||||
|
|
||||||
|
|
||||||
async def soup_from_url(url: str, session: ClientSession = None) -> BeautifulSoup:
|
async def trs_from_page(url: str, session: ClientSession = None, limit: int = None) -> ResultSet:
|
||||||
"""
|
|
||||||
Requests page and converts contents into a BeautifulSoup object.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
url:
|
|
||||||
URL string leading to any page with matching content.
|
|
||||||
session (optional):
|
|
||||||
If passed a ClientSession instance, all HTTP requests will be made using that session;
|
|
||||||
otherwise a new one is created.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
A BeautifulSoup object for further data extraction
|
|
||||||
"""
|
|
||||||
if session is None:
|
|
||||||
session = ClientSession()
|
|
||||||
async with session.get(url) as response:
|
|
||||||
html = await response.text()
|
|
||||||
return BeautifulSoup(html, HTML_PARSER)
|
|
||||||
|
|
||||||
|
|
||||||
def trs_from_page(soup: BeautifulSoup, limit: int = None) -> ResultSet:
|
|
||||||
"""
|
"""
|
||||||
Returns the table rows found on the specified page.
|
Returns the table rows found on the specified page.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
soup:
|
url:
|
||||||
Page text to be scoured for table rows.
|
URL string leading to a page with matching content.
|
||||||
|
session (optional):
|
||||||
|
If passed a ClientSession instance, all HTTP requests will be made using that session;
|
||||||
|
otherwise a new one is created.
|
||||||
limit (optional):
|
limit (optional):
|
||||||
Stop looking after finding this many results;
|
Stop looking after finding this many results;
|
||||||
finds all matches by default.
|
finds all matches by default.
|
||||||
@ -128,13 +109,18 @@ def trs_from_page(soup: BeautifulSoup, limit: int = None) -> ResultSet:
|
|||||||
Returns:
|
Returns:
|
||||||
A ResultSet object containing all extracted 'tr' Tag objects
|
A ResultSet object containing all extracted 'tr' Tag objects
|
||||||
"""
|
"""
|
||||||
|
if session is None:
|
||||||
|
session = ClientSession()
|
||||||
|
async with session.get(url) as response:
|
||||||
|
html = await response.text()
|
||||||
|
soup = BeautifulSoup(html, 'html.parser')
|
||||||
try:
|
try:
|
||||||
return soup.find('div', {'id': 'marketsindex'}).table.tbody.find_all('tr', limit=limit)
|
return soup.find('div', {'id': 'marketsindex'}).table.tbody.find_all('tr', limit=limit)
|
||||||
except AttributeError:
|
except AttributeError:
|
||||||
log.error("Unexpected HTML markup!")
|
log.error("Unexpected HTML markup!")
|
||||||
file_name = f'unexpected_response_at_{datetime.now().strftime("%Y-%m-%d_%H-%M")}.html'
|
file_name = f'unexpected_response_at_{datetime.now().strftime("%Y-%m-%d_%H-%M")}.html'
|
||||||
with open(file_name, 'w') as f:
|
with open(file_name, 'w') as f:
|
||||||
f.write(soup.prettify())
|
f.write(html)
|
||||||
raise UnexpectedMarkupError
|
raise UnexpectedMarkupError
|
||||||
|
|
||||||
|
|
||||||
@ -162,14 +148,12 @@ async def get_data_from_category(category: str, session: ClientSession = None,
|
|||||||
session = ClientSession()
|
session = ClientSession()
|
||||||
data: list[row_type] = []
|
data: list[row_type] = []
|
||||||
page = first_page
|
page = first_page
|
||||||
soup = await soup_from_url(f'{BASE_URL}{category}', session)
|
trs = await trs_from_page(f'{BASE_URL}{category}', session)
|
||||||
trs = trs_from_page(soup)
|
|
||||||
while page <= last_page and len(trs) > 0:
|
while page <= last_page and len(trs) > 0:
|
||||||
data.extend(extract_row_data(*trs))
|
data.extend(extract_row_data(*trs))
|
||||||
log.info(f"Scraped '{category}' page {page}")
|
log.info(f"Scraped '{category}' page {page}")
|
||||||
page += 1
|
page += 1
|
||||||
soup = await soup_from_url(f'{BASE_URL}{category}/{page}', session)
|
trs = await trs_from_page(f'{BASE_URL}{category}/{page}', session)
|
||||||
trs = trs_from_page(soup)
|
|
||||||
return data
|
return data
|
||||||
|
|
||||||
|
|
||||||
|
@ -1,164 +0,0 @@
|
|||||||
import logging
|
|
||||||
from unittest import IsolatedAsyncioTestCase
|
|
||||||
from unittest.mock import patch, MagicMock, AsyncMock, call
|
|
||||||
|
|
||||||
from bs4 import BeautifulSoup
|
|
||||||
|
|
||||||
from stocksymbolscraper import scrape
|
|
||||||
|
|
||||||
|
|
||||||
class ScrapeTestCase(IsolatedAsyncioTestCase):
|
|
||||||
|
|
||||||
@patch.object(scrape, 'get_single_tr_data')
|
|
||||||
def test_extract_row_data(self, mock_get_single_tr_data: MagicMock):
|
|
||||||
foo = 'foo'
|
|
||||||
mock_get_single_tr_data.return_value = foo
|
|
||||||
input1, input2, input3 = MagicMock(), MagicMock(), MagicMock()
|
|
||||||
# Although the function expects BS4 Tag objects as arguments, we substitute with Mocks here
|
|
||||||
# because those arguments are immediately passed into another function, which we mock out anyway.
|
|
||||||
output = scrape.extract_row_data(input1, input2, input3)
|
|
||||||
expected_output = [foo, foo, foo]
|
|
||||||
self.assertListEqual(expected_output, output)
|
|
||||||
mock_get_single_tr_data.assert_has_calls([call(input1), call(input2), call(input3)])
|
|
||||||
|
|
||||||
@patch.object(scrape, 'get_str_from_td')
|
|
||||||
def test_get_single_tr_data(self, mock_get_str_from_td: MagicMock):
|
|
||||||
a, b, x = 'a', 'b', 'x'
|
|
||||||
mock_get_str_from_td.return_value = x
|
|
||||||
test_html = f'<tr> <td><a>{a}<small>({b})</small></a></td>' + \
|
|
||||||
f'<td>foo</td> <td>bar</td> <td>baz</td> </tr>'
|
|
||||||
test_tr = BeautifulSoup(test_html, scrape.HTML_PARSER).tr
|
|
||||||
test_tds = test_tr.find_all('td')
|
|
||||||
output = scrape.get_single_tr_data(test_tr)
|
|
||||||
expected_output = (a, b, x, x, x)
|
|
||||||
self.assertTupleEqual(expected_output, output)
|
|
||||||
mock_get_str_from_td.assert_has_calls([call(test_tds[1]), call(test_tds[2]), call(test_tds[3])])
|
|
||||||
|
|
||||||
test_html = f'<tr> <td><a>{a}<small>***{b}***</small></a></td>' + \
|
|
||||||
f'<td>foo</td> <td>bar</td> <td>baz</td> </tr>'
|
|
||||||
test_tr = BeautifulSoup(test_html, scrape.HTML_PARSER).tr
|
|
||||||
test_tds = test_tr.find_all('td')
|
|
||||||
output = scrape.get_single_tr_data(test_tr)
|
|
||||||
expected_output = (a, f'***{b}***', x, x, x)
|
|
||||||
self.assertTupleEqual(expected_output, output)
|
|
||||||
mock_get_str_from_td.assert_has_calls([call(test_tds[1]), call(test_tds[2]), call(test_tds[3])])
|
|
||||||
|
|
||||||
def test_get_str_from_td(self):
|
|
||||||
expected_output = 'foo bar'
|
|
||||||
test_td = BeautifulSoup(f'<td> {expected_output} </td>', scrape.HTML_PARSER).td
|
|
||||||
output = scrape.get_str_from_td(test_td)
|
|
||||||
self.assertEqual(expected_output, output)
|
|
||||||
|
|
||||||
expected_output = ''
|
|
||||||
test_td = BeautifulSoup('<td></td>', scrape.HTML_PARSER).td
|
|
||||||
output = scrape.get_str_from_td(test_td)
|
|
||||||
self.assertEqual(expected_output, output)
|
|
||||||
|
|
||||||
@patch.object(scrape, 'ClientSession')
|
|
||||||
async def test_soup_from_url(self, mock_session_cls):
|
|
||||||
test_html = '<b>foo</b>'
|
|
||||||
mock_response = MagicMock()
|
|
||||||
mock_response.text = AsyncMock(return_value=test_html)
|
|
||||||
mock_get_return = MagicMock()
|
|
||||||
mock_get_return.__aenter__ = AsyncMock(return_value=mock_response)
|
|
||||||
mock_session_obj = MagicMock()
|
|
||||||
mock_session_obj.get = MagicMock(return_value=mock_get_return)
|
|
||||||
mock_session_cls.return_value = mock_session_obj
|
|
||||||
output = await scrape.soup_from_url('foo')
|
|
||||||
expected_output = BeautifulSoup(test_html, scrape.HTML_PARSER)
|
|
||||||
self.assertEqual(expected_output, output)
|
|
||||||
|
|
||||||
output = await scrape.soup_from_url('foo', mock_session_obj)
|
|
||||||
self.assertEqual(expected_output, output)
|
|
||||||
|
|
||||||
def test_trs_from_page(self):
|
|
||||||
tr1_text, tr2_text = '<tr>foo</tr>', '<tr>bar</tr>'
|
|
||||||
test_html = f'<div id="marketsindex"><table><tbody>{tr1_text}{tr2_text}</tbody></table></div>'
|
|
||||||
test_soup = BeautifulSoup(test_html, scrape.HTML_PARSER)
|
|
||||||
output = scrape.trs_from_page(test_soup)
|
|
||||||
expected_output = test_soup.find_all('tr')
|
|
||||||
self.assertSequenceEqual(expected_output, output)
|
|
||||||
|
|
||||||
logging.disable(logging.CRITICAL)
|
|
||||||
test_html = f'<div id="marketsindex"><table>garbage</table></div>'
|
|
||||||
test_soup = BeautifulSoup(test_html, scrape.HTML_PARSER)
|
|
||||||
with patch.object(scrape, 'open') as mock_open:
|
|
||||||
self.assertRaises(scrape.UnexpectedMarkupError, scrape.trs_from_page, test_soup)
|
|
||||||
mock_open.assert_called_once()
|
|
||||||
mock_open.return_value.__enter__.return_value.write.assert_called_once_with(test_soup.prettify())
|
|
||||||
logging.disable(logging.NOTSET)
|
|
||||||
|
|
||||||
@patch.object(scrape, 'soup_from_url')
|
|
||||||
@patch.object(scrape, 'trs_from_page')
|
|
||||||
@patch.object(scrape, 'extract_row_data')
|
|
||||||
@patch.object(scrape, 'ClientSession')
|
|
||||||
async def test_get_data_from_category(self, mock_session_cls, mock_extract_row_data,
|
|
||||||
mock_trs_from_page, mock_soup_from_url):
|
|
||||||
# We do not pass a session object into the tested function,
|
|
||||||
# so we mock the ClientSession class for the first two tests.
|
|
||||||
mock_session = MagicMock()
|
|
||||||
mock_session_cls.return_value = mock_session
|
|
||||||
mock_soup = MagicMock()
|
|
||||||
mock_soup_from_url.return_value = mock_soup
|
|
||||||
category = 'ßßß'
|
|
||||||
url = f'{scrape.BASE_URL}{category}'
|
|
||||||
|
|
||||||
# First test with no TRs returned, thus not entering the loop.
|
|
||||||
mock_trs = []
|
|
||||||
mock_trs_from_page.return_value = mock_trs
|
|
||||||
mock_extract_row_data.return_value = expected_output = []
|
|
||||||
output = await scrape.get_data_from_category(category)
|
|
||||||
self.assertListEqual(expected_output, output)
|
|
||||||
mock_soup_from_url.assert_called_once_with(url, mock_session)
|
|
||||||
mock_trs_from_page.assert_called_once_with(mock_soup)
|
|
||||||
mock_extract_row_data.assert_not_called()
|
|
||||||
|
|
||||||
mock_soup_from_url.reset_mock()
|
|
||||||
mock_trs_from_page.reset_mock()
|
|
||||||
|
|
||||||
# Second test with (fake) TRs returned, thus entering the loop.
|
|
||||||
# We pass a last_page argument to stop after the first iteration.
|
|
||||||
mock_trs = ['foo', 'bar']
|
|
||||||
mock_trs_from_page.return_value = mock_trs
|
|
||||||
mock_extract_row_data.return_value = expected_output = ['a', 'b']
|
|
||||||
|
|
||||||
# Factored out checks because the only difference in the next two tests is the presence or absence of a
|
|
||||||
# real session instance.
|
|
||||||
def check_assertions(test_output, session_obj):
|
|
||||||
self.assertListEqual(expected_output, test_output)
|
|
||||||
mock_soup_from_url.assert_has_calls([call(url, session_obj), call(f'{url}/{2}', session_obj)])
|
|
||||||
mock_trs_from_page.assert_has_calls([call(mock_soup), call(mock_soup)])
|
|
||||||
mock_extract_row_data.assert_called_once_with(*mock_trs)
|
|
||||||
|
|
||||||
output = await scrape.get_data_from_category(category, last_page=1)
|
|
||||||
check_assertions(output, mock_session)
|
|
||||||
|
|
||||||
mock_soup_from_url.reset_mock()
|
|
||||||
mock_trs_from_page.reset_mock()
|
|
||||||
mock_extract_row_data.reset_mock()
|
|
||||||
|
|
||||||
# Third test with (fake) TRs returned and explicitly passing a real session object.
|
|
||||||
async with scrape.ClientSession() as session:
|
|
||||||
output = await scrape.get_data_from_category(category, session, last_page=1)
|
|
||||||
check_assertions(output, session)
|
|
||||||
|
|
||||||
@patch.object(scrape, 'get_data_from_category')
|
|
||||||
@patch.object(scrape, 'ClientSession')
|
|
||||||
async def test_get_all_data(self, mock_session_cls, mock_get_data_from_category):
|
|
||||||
mock_session = MagicMock()
|
|
||||||
mock_session_cls.return_value.__aenter__.return_value = mock_session
|
|
||||||
mock_result = ['foo']
|
|
||||||
expected_output = len(scrape.CATEGORIES) * mock_result
|
|
||||||
mock_get_data_from_category.return_value = mock_result
|
|
||||||
|
|
||||||
output = await scrape.get_all_data(sequential=True)
|
|
||||||
self.assertListEqual(expected_output, output)
|
|
||||||
mock_get_data_from_category.assert_has_calls([
|
|
||||||
call(category, mock_session) for category in scrape.CATEGORIES
|
|
||||||
])
|
|
||||||
|
|
||||||
output = await scrape.get_all_data(sequential=False)
|
|
||||||
self.assertListEqual(expected_output, output)
|
|
||||||
mock_get_data_from_category.assert_has_calls([
|
|
||||||
call(category, mock_session) for category in scrape.CATEGORIES
|
|
||||||
])
|
|
Loading…
x
Reference in New Issue
Block a user