175 lines
5.3 KiB
Python
175 lines
5.3 KiB
Python
# -*- coding: utf-8 -*-
|
|
from __future__ import annotations
|
|
|
|
import abc
|
|
import logging
|
|
|
|
from pyquery import PyQuery
|
|
import requests
|
|
|
|
from hotpocket_backend._meta import version as backend_version
|
|
from hotpocket_backend.apps.bot.conf import bot_settings
|
|
from hotpocket_backend.apps.bot.dto.strategy import FetchResult
|
|
from hotpocket_common.url import URL
|
|
from hotpocket_soa.dto import BotResultOut
|
|
|
|
# Module-level logger; `Strategy.get_logger()` derives a per-class child from it.
LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
class Strategy(abc.ABC):
    """Fetch a URL and extract a title and description from its HTML.

    `run()` is the entry point: it checks the URL's hostname against
    `bot_settings.BANNED_HOSTNAMES`, fetches the page, validates that the
    response is HTML and returns a `BotResultOut` with the extracted title
    and description.
    """

    class StrategyError(Exception):
        """Base class for every error raised by a strategy."""

    class FetchError(StrategyError):
        """Raised by `fetch()` when the request or response handling fails."""

    # NOTE: The name mirrors the builtin `RuntimeError`, but it only lives in
    # the class namespace (`Strategy.RuntimeError`), so the builtin is not
    # affected. Kept as-is because callers may catch it by this name.
    class RuntimeError(StrategyError):
        """Raised by `run()` when the fetched result can't be processed."""

    USER_AGENT = (
        'Mozilla/5.0 '
        '('
        'compatible; '
        f'BTHLabsHotPocketBot/{backend_version}; '
        '+https://hotpocket.app/bot.txt'
        ')'
    )

    # `(connect, read)` timeout in seconds passed to `requests`. Without a
    # timeout an unresponsive host would block the bot indefinitely.
    # Subclasses may override it.
    FETCH_TIMEOUT = (10.0, 30.0)

    # Selectors are tried in order; the first one that matches wins.
    TITLE_TAG_SELECTORS = [
        'head > meta[name=title]',
        'head > meta[property="og:title"]',
        'head > title',
    ]

    # Selectors are tried in order; the first one that matches wins.
    DESCRIPTION_TAG_SELECTORS = [
        'head > meta[property="og:description"]',
        'head > meta[name=description]',
    ]

    def __init__(self, url: str):
        """Store the URL, its parsed form and a per-class logger."""
        super().__init__()

        self.url = url
        self.parsed_url = URL(self.url)

        self.logger = self.get_logger()

    def get_logger(self) -> logging.Logger:
        """Return a child of the module logger named after the class."""
        return LOGGER.getChild(self.__class__.__name__)

    def is_netloc_banned(self) -> bool:
        """Return `True` if the URL's hostname ends with a banned hostname.

        A URL without a hostname is never considered banned.
        """
        hostname = self.parsed_url.hostname
        if hostname is None:
            return False

        # NOTE(review): This is a plain suffix match, so a banned
        # `example.com` also matches `notexample.com`. Confirm that this is
        # intended before tightening it to a dot boundary.
        return any(
            hostname.endswith(banned_netloc)
            for banned_netloc in bot_settings.BANNED_HOSTNAMES
        )

    def fetch(self, url: str) -> FetchResult:
        """Issue a `GET` request for `url` and wrap the response.

        Raises:
            Strategy.FetchError: On any failure, including connection
                errors, timeouts, HTTP error statuses (through
                `raise_for_status()`) and result validation errors. The
                original exception is chained as the cause.
        """
        try:
            response = requests.request(
                'GET',
                url,
                headers={
                    'User-Agent': self.USER_AGENT,
                },
                timeout=self.FETCH_TIMEOUT,
            )
            response.raise_for_status()

            return FetchResult.model_validate({
                'status_code': response.status_code,
                'content': response.content,
                'content_type': response.headers.get('Content-Type', None),
                'encoding': response.encoding or response.apparent_encoding,
            })
        except Exception as exception:
            self.logger.error(
                'Fetch error: %s', exception, exc_info=True,
            )
            raise self.FetchError() from exception

    @staticmethod
    def _strip_or_none(value: str | None) -> str | None:
        """Strip `value`; map `None` and blank strings to `None`."""
        if value is None:
            return None

        return value.strip() or None

    def extract_title_and_description_from_html(self, content: str) -> tuple[str | None, str | None]:
        """Extract the title and the description from an HTML document.

        The title comes from the first matching `TITLE_TAG_SELECTORS` entry,
        the description from the first matching `DESCRIPTION_TAG_SELECTORS`
        entry. If no description tag provides a value, the text of the first
        `<p>` element is used instead.

        Returns:
            A `(title, description)` tuple. Each item is stripped and is
            `None` when missing or blank.
        """
        dom = PyQuery(content)

        title: str | None = None
        description: str | None = None

        for selector in self.TITLE_TAG_SELECTORS:
            title_tags = dom.find(selector)
            if len(title_tags) > 0:
                title_tag = PyQuery(title_tags[0])
                # `<meta>` tags carry the value in `content`, `<title>`
                # carries it as text.
                if title_tag.is_('meta'):
                    title = title_tag.attr('content')
                else:
                    title = title_tag.text()

                break

        for selector in self.DESCRIPTION_TAG_SELECTORS:
            description_tags = dom.find(selector)
            if len(description_tags) > 0:
                description = PyQuery(description_tags[0]).attr('content')

                break

        if description is None:
            # Fall back to the first paragraph, if the document has one.
            paragraphs = dom.find('p')
            if len(paragraphs) > 0:
                description = PyQuery(paragraphs[0]).text()

        return (
            self._strip_or_none(title),
            self._strip_or_none(description),
        )

    @staticmethod
    def _find_fetch_result_problem(fetch_result: FetchResult) -> str | None:
        """Return why `fetch_result` can't be processed, or `None` if it can."""
        if fetch_result.content is None:
            return 'Received empty content'

        if fetch_result.content_type is None:
            return 'Unable to determine the content type'

        if not fetch_result.content_type.startswith('text/html'):
            return f'Unsupported content type: `{fetch_result.content_type}`'

        return None

    def run(self) -> BotResultOut:
        """Run the strategy for `self.url`.

        Returns:
            A `BotResultOut`. For a banned hostname only `is_netloc_banned`
            is set and nothing is fetched.

        Raises:
            Strategy.FetchError: If fetching the URL fails.
            Strategy.RuntimeError: If the response has no content, has no
                content type or isn't `text/html`, or if decoding or parsing
                it fails.
        """
        result = BotResultOut.model_validate({
            'title': None,
            'description': None,
            'is_netloc_banned': False,
        })

        result.is_netloc_banned = self.is_netloc_banned()
        if result.is_netloc_banned:
            self.logger.debug('Skipping banned netloc: url=`%s`', self.url)
            return result

        fetch_result = self.fetch(self.url)

        # Explicit checks instead of `assert`: assertions are removed when
        # Python runs with `-O`, which would silently disable the validation.
        problem = self._find_fetch_result_problem(fetch_result)
        if problem is not None:
            self.logger.error('Unprocessable fetch result: %s', problem)
            raise self.RuntimeError(problem)

        try:
            decoded_content = fetch_result.content.decode(fetch_result.encoding)

            title, description = self.extract_title_and_description_from_html(
                decoded_content,
            )
            result.title = title
            result.description = description
        except Exception as exception:
            self.logger.error(
                'Processing error: %s', exception, exc_info=exception,
            )
            raise self.RuntimeError() from exception

        return result
|