Source code for qetch.extractors._common

# Copyright (c) 2018 Stephen Bunn (stephen@bunn.io)
# MIT License <https://opensource.org/licenses/MIT>

import abc
import re
from typing import (Any, List, Tuple, Match, Optional, Generator,)

import furl
import requests

from .. import (exceptions,)
from ..auth import (AuthRegistry, AuthTypes,)


class BaseExtractor(abc.ABC):
    """ The base extractor.

    `All extractors should extend this.`

    Subclasses provide the abstract attributes below and, for every key in
    ``handles``, a method named ``handle_{handle_name}`` which is called by
    :func:`~qetch.extractors._common.BaseExtractor.extract`.
    """

    def __repr__(self):
        """ Returns a string representation of the extractor.

        Returns:
            str: The string representation of the instance.
        """

        return (f'<{self.__class__.__name__} "{self.name}">')

    @property
    @abc.abstractmethod
    def name(self):
        """ The name of the extractor.

        Used in the instance representation and as the lookup key into the
        authentication registry.
        """

        raise NotImplementedError()

    @property
    @abc.abstractmethod
    def description(self):
        """ The description of the extractor.
        """

        raise NotImplementedError()

    @property
    @abc.abstractmethod
    def domains(self):
        """ The domains of the extractor.

        Note:
            Not read anywhere in this class; presumably the domains the
            extractor supports (confirm against the concrete extractors).
        """

        raise NotImplementedError()

    @property
    @abc.abstractmethod
    def handles(self):
        """ The ``{handle_name: regex}`` dictionary of the extractor.

        Note:
            :func:`~qetch.extractors._common.BaseExtractor.get_handle` reads
            this from the class (not an instance), so concrete extractors
            need to expose it as a class-level dictionary.
        """

        raise NotImplementedError()

    @property
    @abc.abstractmethod
    def authentication(self):
        """ The authentication type of the extractor.

        Compared against ``AuthTypes.NONE`` to decide whether authentication
        is performed before extracting.
        """

        raise NotImplementedError()

    @property
    def session(self):
        """ The default session for the extractor.

        Returns:
            requests.Session: The default session for the extractor.
        """

        # created lazily on first access and then reused for the instance
        if not hasattr(self, '_session'):
            self._session = requests.Session()
        return self._session

    @classmethod
    def get_handle(cls, url: str) -> Optional[Tuple[str, Match]]:
        """ Gets the handle match for a given url.

        Args:
            url (str): The url to get the handle match for.

        Returns:
            tuple[str, Match]: A tuple of handle and the match for the url,
                or None if no handle pattern matches the url.
        """

        for (handle_name, handle_pattern,) in cls.handles.items():
            match = re.match(handle_pattern, url)
            if match:
                return (handle_name, match,)
        return None

    @classmethod
    def can_handle(cls, url: str) -> bool:
        """ Determines if an extractor can handle a url.

        Args:
            url (str): The url to check

        Returns:
            bool: True if the extractor can handle, otherwise False
        """

        return cls.get_handle(url) is not None

    def authenticate(self, auth: Tuple[str, str]) -> None:
        """ Handles authenticating the extractor if necessary.

        The base implementation does nothing; extractors which require
        authentication should override this.

        Args:
            auth (tuple[str, str]): The authentication tuple if available.
        """

    def merge(self, ordered_filepaths: List[str]) -> Optional[str]:
        """ Handles merging downloaded fragments into a resulting file.

        The base implementation performs no merging and simply returns the
        first filepath.

        Args:
            ordered_filepaths (list[str]): The list of ordered filepaths to
                downloaded fragments.

        Returns:
            str: The resulting merged file's filepath, or None if no
                filepaths are given.
        """

        return (ordered_filepaths[0] if ordered_filepaths else None)

    def extract(
        self, url: str, auth: Optional[Tuple[str, str]]=None
    ) -> Generator[List[Any], None, None]:
        """ Extracts lists of content from a url.

        Note:
            When an extractor can handle a url with a given
            ``{handle_name: regex}`` dictionary, the
            :func:`~qetch.extractors._common.BaseExtractor.extract` method
            assumes that a method ``handle_{handle_name}`` exists to handle
            that specific url. If an appropriately named method does not
            exist, a ``NotImplementedError`` is raised.

        Args:
            url (str): The url to extract content from.
            auth (tuple[str, str], optional): The auth tuple if available.

        Raises:
            ValueError: If the url is not matched by any of the extractor's
                handles.
            NotImplementedError: If a given ``handle_{handle_name}`` method
                does not exist.
            exceptions.AuthenticationError: If authentication is required
                but neither a valid auth tuple nor a registry entry for the
                extractor's name is available.

        Yields:
            list[Content]: A list of similar content of different qualities

        Examples:
            Basic usage where ``GFYCAT_ID`` is the id determined from
            ``GFYCAT_URL``.

            >>> from qetch.extractors import (GfycatExtractor,)
            >>> for content_list in GfycatExtractor().extract(GFYCAT_URL):
            ...     for content in content_list:
            ...         print(content)
            <Content (1.0) "gfycat-GFYCAT_ID-mp4Url">
            <Content (0.5) "gfycat-GFYCAT_ID-webmUrl">
            <Content (0.0) "gfycat-GFYCAT_ID-webpUrl">
            <Content (0.0) "gfycat-GFYCAT_ID-mobileUrl">
            <Content (0.0) "gfycat-GFYCAT_ID-mobilePosterUrl">
            <Content (0.0) "gfycat-GFYCAT_ID-posterUrl">
            <Content (0.0) "gfycat-GFYCAT_ID-thumb360Url">
            <Content (0.0) "gfycat-GFYCAT_ID-thumb360PosterUrl">
            <Content (0.0) "gfycat-GFYCAT_ID-thumb100PosterUrl">
            <Content (0.0) "gfycat-GFYCAT_ID-max5mbGif">
            <Content (0.0) "gfycat-GFYCAT_ID-max2mbGif">
            <Content (0.0) "gfycat-GFYCAT_ID-mjpgUrl">
            <Content (0.0) "gfycat-GFYCAT_ID-miniUrl">
            <Content (0.0) "gfycat-GFYCAT_ID-miniPosterUrl">
            <Content (0.25) "gfycat-GFYCAT_ID-gifUrl">
        """

        # get_handle returns None for an unmatched url; fail with a clear
        # message instead of an opaque tuple-unpacking TypeError
        handle = self.get_handle(url)
        if handle is None:
            raise ValueError(f"url {url!r} is not handled by {self!r}")
        (handle_name, handle_match,) = handle

        handle_method = f'handle_{handle_name}'
        if not hasattr(self, handle_method):
            raise NotImplementedError((
                f"no handled method named {handle_method!r} is implemented "
                f"for {self!r}"
            ))

        if self.authentication != AuthTypes.NONE:
            # fall back to the registry when the caller did not supply a
            # usable 2-tuple of credentials
            if not isinstance(auth, tuple) or not len(auth) == 2:
                registry = AuthRegistry()
                if self.name not in registry:
                    raise exceptions.AuthenticationError((
                        f"no valid authentication found for "
                        f"{self!r}, received {auth!r} and no "
                        f"registry entry for key {self.name!r}"
                    ))
                auth = registry[self.name]
            self.authenticate(auth)

        yield from getattr(self, handle_method)(url, handle_match)