# Source code for qetch.downloaders.http

# Copyright (c) 2018 Stephen Bunn (stephen@bunn.io)
# MIT License <https://opensource.org/licenses/MIT>

from concurrent.futures import (ThreadPoolExecutor,)

from ..content import (Content,)
from ._common import (BaseDownloader, DownloadState,)

import requests
import blinker


class HTTPDownloader(BaseDownloader):
    """The downloader for HTTP served content.

    Downloads a url in parallel byte-range chunks (when the server
    advertises ``Accept-Ranges: bytes``) into a preallocated local file.
    """

    @property
    def session(self) -> requests.Session:
        """requests.Session: The requests session to use for downloading.

        Lazily created on first access so the downloader can be
        constructed without any network state.
        """
        if not hasattr(self, '_session'):
            self._session = requests.Session()
        return self._session

    @session.setter
    def session(self, session: requests.Session):
        # Silently ignores non-Session values (lenient setter contract).
        if isinstance(session, requests.Session):
            self._session = session

    @classmethod
    def can_handle(cls, content: Content) -> bool:
        """Determines if a given content can be handled by this downloader.

        Args:
            content (Content): The content to check.

        Returns:
            bool: True if every fragment url answers a HEAD request with
                status 200, otherwise False.
        """
        return all(
            requests.head(fragment).status_code == 200
            for fragment in content.fragments
        )

    def handle_chunk(
        self, download_id: str, url: str, to_path: str,
        start: int, end: int, chunk_size: int = 1024
    ):
        """Handles downloading a specific range of bytes for a url.

        Args:
            download_id (str): The unique id of the download request.
            url (str): The url to download.
            to_path (str): The local path to save the download.
            start (int): The starting byte position to download.
            end (int): The ending byte position to download.
            chunk_size (int, optional): The size of the chunks to stream in.
        """
        # FIX: open with 'r+b', not 'wb'. The file is preallocated by
        # handle_download and several chunk threads write into it
        # concurrently; 'wb' truncates the file on every open, destroying
        # the preallocation and the bytes written by sibling chunks.
        with open(to_path, 'r+b') as file_:
            file_.seek(start)
            with self.session.get(
                url,
                headers={'range': f'bytes={start}-{end}'},
                stream=True
            ) as request_stream:
                for segment in request_stream.iter_content(
                    chunk_size=chunk_size
                ):
                    # bail out as soon as a stop has been requested
                    if self.download_state[download_id] == \
                            DownloadState.STOPPED:
                        return
                    file_.write(segment)
                    if download_id not in self.progress_store:
                        self.progress_store[download_id] = 0
                    # NOTE(review): '+=' on a shared dict entry is not
                    # atomic across threads; the reported progress may
                    # momentarily under-count -- acceptable for reporting.
                    self.progress_store[download_id] += len(segment)

    def handle_download(
        self, download_id: str, url: str, to_path: str,
        max_connections: int = 8
    ):
        """Handles downloading a specific url.

        Note:
            ``max_connections`` defaults to 8 because many content hosting
            sites will typically flag/ban IPs that use over 10 connections.

        Args:
            download_id (str): The unique id of the download request.
            url (str): The url to download.
            to_path (str): The local path to save the download.
            max_connections (int, optional): The number of allowed
                connections for parallel downloading of the url.

        Returns:
            str: The local path the download was saved to.
        """
        self.download_state[download_id] = DownloadState.PREPARING
        headers = self.session.head(url).headers
        content_length = int(headers['Content-Length'])

        # preallocate the file at its full size so chunk threads can
        # seek to and write their own byte ranges independently
        with open(to_path, 'wb') as file_:
            file_.seek(content_length - 1)
            file_.write(b'\x00')

        # FIX: default to '' -- headers.get() returns None when the
        # server omits Accept-Ranges, and None.lower() would raise
        # AttributeError. No byte-range support means one connection.
        if headers.get('Accept-Ranges', '').lower() != 'bytes':
            max_connections = 1

        chunk_futures = []
        # start thread pool for the url's chunks with max connections
        with ThreadPoolExecutor(max_workers=max_connections) as executor:
            for (start, end) in \
                    self._calc_ranges(content_length, max_connections):
                if self.download_state[download_id] != DownloadState.RUNNING:
                    self.download_state[download_id] = DownloadState.RUNNING
                chunk_futures.append(executor.submit(
                    self.handle_chunk,
                    download_id, url, to_path, start, end
                ))
            # wait for every chunk; .result() re-raises any exception
            # raised inside a chunk thread
            for future in chunk_futures:
                future.result()
        return to_path