Source code for qetch.extractors.imgur

# Copyright (c) 2018 Stephen Bunn (
# MIT License <>

import datetime
from typing import (Any, Tuple, List, Dict, Match, Generator,)

from .. import (exceptions,)
from ..auth import (AuthTypes,)
from ..content import (Content,)
from ._common import (BaseExtractor,)

import furl
import ujson

[docs]class ImgurExtractor(BaseExtractor): """ The extractor for links to media from ````. """ name = 'imgur' description = ('Dedicated image host originally built for Reddit.') authentication = AuthTypes.OAUTH domains = ['', ''] handles = { 'basic': ( r'^https?://(?:www\.)?imgur\.com/(?P<id>[a-zA-Z0-9]+)/?$' ), 'album': ( r'^https?://(?:www\.)?imgur\.com/' r'(?:a|gallery)/(?P<id>[a-zA-Z0-9]+)/?$' ), 'raw': ( r'^https?://(?:www\.)?(?:[a-z]\.)imgur\.com/' r'(?P<id>[a-zA-Z0-9]+)\..*$' ) } _api_base = '' _content_urls = ( 'mp4', 'gifv', 'link', ) _quality_map = { 'mp4': 1.0, 'gifv': 0.5, 'link': 0.0, } def _get_data( self, id: str, is_album: bool=False, is_raw: bool=False ) -> Dict[str, Any]: """ Gets API data for a specific imgur id. Args: id (str): The id of the imgur content to retrieve. is_album (bool, optional): If True, indicates that id is that of an album. is_raw (bool, optional): If True, indicates that id is that of some raw imgur link. Raises: exceptions.ExtractionError: When API call results in non 200 status Returns: dict[str,....]: API data dictionary response """ query_url = furl.furl(self._api_base).add( path=( f'{"/gallery" if is_raw else ""}' f'{"album" if is_album else "image"}/{id}' ) ) response = self.session.get(query_url.url) if response.status_code not in (200,): raise exceptions.ExtractionError(( f"error retrieving source for {query_url.url!r} " f"recieved status {response.status_code}" )) return ujson.loads(response.text).get('data')
[docs] def handle_basic( self, source: str, match: Match ) -> Generator[List[Content], None, None]: """ Handles ``basic`` links to imgur media. Args: source (str): The source url match (Match): The source match regex Yields: list[Content]: A list of various levels of quality content for \ the same source url """ data = self._get_data(match.groupdict()['id']) content_list = [] for url_type in self._content_urls: if url_type in data: content_list.append(Content( uid=f'{}-{data["id"]}-{url_type}', source=source, fragments=[data[url_type]], extractor=self, extension=data[url_type].split('.')[-1], title=data.get('title'), description=data.get('description'), quality=self._quality_map.get(url_type, 0.0), uploaded_by=data.get('account_id'), uploaded_date=datetime.datetime.fromtimestamp( int(data.get('datetime')) ), metadata=data ))
yield content_list
[docs] def handle_album( self, source: str, match: Match ) -> Generator[List[Content], None, None]: """ Handles ``album`` links to imgur media. Args: source (str): The source url match (Match): The source match regex Yields: list[Content]: A list of various levels of quality content for \ the same source url """ try: data = self._get_data(match.groupdict()['id'], is_album=True) for image in data.get('images', []): content_list = [] for url_type in self._content_urls: if url_type in image: content_list.append(Content( uid=f'{}-{image["id"]}-{url_type}', source=source, fragments=[image[url_type]], extractor=self, extension=image[url_type].split('.')[-1], title=data.get('title'), description=data.get('description'), quality=self._quality_map.get(url_type, 0.0), uploaded_by=data.get('account_id'), uploaded_date=datetime.datetime.fromtimestamp( int(data.get('datetime')) ), metadata=image )) yield content_list except exceptions.ExtractionError as exc: for content_list in self.handle_basic(source, match):
yield content_list
[docs] def handle_raw( self, source: str, match: Match ) -> Generator[List[Content], None, None]: """ Handles ``raw`` links to imgur media. Args: source (str): The source url match (Match): The source match regex Yields: list[Content]: A list of various levels of quality content for \ the same source url """ data = self._get_data(match.groupdict()['id'], is_raw=True) content_list = [] for url_type in self._content_urls: if url_type in data: content_list.append(Content( uid=f'{}-{data["id"]}-{url_type}', source=source, fragments=[data[url_type]], extractor=self, extension=data[url_type].split('.')[-1], title=data.get('title'), description=data.get('description'), quality=self._quality_map.get(url_type, 0.0), uploaded_by=data.get('account_id'), uploaded_date=datetime.datetime.fromtimestamp( int(data.get('timestamp')) ), meatdata=data ))
yield content_list
[docs] def authenticate(self, auth: Tuple[str, str]): """ Handles authenticating the extractor if necessary. Args: auth (tuple[str, str]): The authentication tuple is available. """ self.session.headers.update({ 'authorization': f'Client-ID {auth[0]}'