Source code for megfile.http_path

import io
import time
from functools import partial
from io import BufferedReader
from logging import getLogger as get_logger
from typing import Iterable, Iterator, Optional, Tuple, Union

import requests

from megfile.config import DEFAULT_BLOCK_SIZE
from megfile.errors import http_should_retry, patch_method, translate_http_error
from megfile.interfaces import PathLike, StatResult, URIPath
from megfile.lib.compat import fspath
from megfile.lib.http_prefetch_reader import HttpPrefetchReader
from megfile.lib.s3_buffered_writer import DEFAULT_MAX_BUFFER_SIZE
from megfile.lib.url import get_url_scheme
from megfile.pathlike import PathLike
from megfile.smart_path import SmartPath
from megfile.utils import binary_open

# Public names exported by ``from megfile.http_path import *``.
__all__ = [
    'HttpPath',
    'HttpsPath',
    'get_http_session',
    'is_http',
    'http_open',
]

# Module-level logger, named after this module.
_logger = get_logger(__name__)
# Retry budget: handed to patch_method() in get_http_session() and to
# HttpPrefetchReader in HttpPath.open().
max_retries = 10


def get_http_session(
        timeout: Union[int, Tuple[int, int]] = (9, 60),
        status_forcelist: Iterable[int] = (500, 502, 503, 504)
) -> requests.Session:
    '''Create a requests session whose ``request`` method is wrapped for retry

    ``session.request`` is replaced by ``patch_method(...)`` around the
    original method with ``timeout`` pre-bound, using the module-level
    ``max_retries`` and ``http_should_retry`` as the retry policy.

    :param timeout: Timeout bound to every request of the session, either a
        single value or a tuple of two values as accepted by requests
    :param status_forcelist: Status codes for which
        ``response.raise_for_status()`` is called right after the response
        is received
    :returns: The patched ``requests.Session``
    '''
    session = requests.Session()
    # Materialise once: the value is membership-tested on every response, and
    # a one-shot iterator would be exhausted by the first test.
    status_forcelist = tuple(status_forcelist)

    def after_callback(response, *args, **kwargs):
        # Turn the listed status codes into exceptions.
        if response.status_code in status_forcelist:
            response.raise_for_status()
        return response

    def before_callback(method, url, **kwargs):
        _logger.debug(
            'send http request: %s %r, with parameters: %s', method, url,
            kwargs)

    def retry_callback(
            error,
            method,
            url,
            params=None,
            data=None,
            headers=None,
            cookies=None,
            files=None,
            auth=None,
            timeout=None,
            allow_redirects=True,
            proxies=None,
            hooks=None,
            stream=None,
            verify=None,
            cert=None,
            json=None,
            **kwargs,
    ):
        # Rewind request payloads so they can be sent again.
        # NOTE(review): the bare ``raise`` statements below assume
        # patch_method invokes this callback while the failed request's
        # exception is still being handled - confirm against patch_method.
        if data and hasattr(data, 'seek'):
            data.seek(0)
        elif isinstance(data, Iterator):
            _logger.warning('Can not retry http request with iterator data')
            raise

        if files:

            def seek_or_reopen(file_object):
                # Return an object positioned at the start of the content.
                if isinstance(file_object, (str, bytes)):
                    return file_object
                elif hasattr(file_object, 'seek'):
                    file_object.seek(0)
                    return file_object
                elif hasattr(file_object, 'name'):
                    # Not seekable: read the content again from its name.
                    with SmartPath(file_object.name).open('rb') as f:
                        return io.BytesIO(f.read())
                else:
                    _logger.warning(
                        'Can not retry http request, because the file object is not seekable and unsupport "name"'
                    )
                    raise

            for key, file_info in files.items():
                if hasattr(file_info, 'seek'):
                    file_info.seek(0)
                elif isinstance(file_info,
                                (tuple, list)) and len(file_info) >= 2:
                    file_info = list(file_info)
                    if isinstance(file_info[1],
                                  (tuple, list)) and len(file_info[1]) >= 2:
                        # Nested form: rewind the inner file object. Passing
                        # the list itself to seek_or_reopen() could never
                        # succeed (a list has neither ``seek`` nor ``name``).
                        file_info[1] = list(file_info[1])
                        file_info[1][1] = seek_or_reopen(file_info[1][1])
                    else:
                        file_info[1] = seek_or_reopen(file_info[1])
                    # Only values are replaced, so mutating during iteration
                    # over items() is safe.
                    files[key] = file_info

    session.request = patch_method(
        partial(session.request, timeout=timeout),
        max_retries=max_retries,
        should_retry=http_should_retry,
        before_callback=before_callback,
        after_callback=after_callback,
        retry_callback=retry_callback,
    )
    return session
def is_http(path: PathLike) -> bool:
    '''Tell whether a path is an http(s) url, i.e. http(s)://domain/path

    :param path: Path to be tested
    :returns: True if path is http url, else False
    '''
    candidate = fspath(path)
    # Cheap rejections first: non-string values and foreign prefixes.
    if not isinstance(candidate, str):
        return False
    if not candidate.startswith(('http://', 'https://')):
        return False
    return get_url_scheme(candidate) in ('http', 'https')
def http_open(
        path: PathLike,
        mode: str = 'rb',
        *,
        encoding: Optional[str] = None,
        errors: Optional[str] = None,
        max_concurrency: Optional[int] = None,
        max_buffer_size: int = DEFAULT_MAX_BUFFER_SIZE,
        forward_ratio: Optional[float] = None,
        block_size: int = DEFAULT_BLOCK_SIZE,
        **kwargs) -> Union[BufferedReader, HttpPrefetchReader]:
    '''Open the given http(s) url for reading

    Functional shortcut for ``HttpPath(path).open(...)``; every named
    argument is forwarded unchanged, extra ``**kwargs`` are accepted but
    not forwarded.

    :param path: Given path
    :param mode: Mode passed on to ``HttpPath.open``
    :param encoding: Name of the encoding used to decode or encode the file,
        only meaningful in text mode
    :param errors: How encoding and decoding errors are to be handled, only
        meaningful in text mode
    :param max_concurrency: Max download thread number, None by default
    :param max_buffer_size: Max cached buffer size in memory,
        DEFAULT_MAX_BUFFER_SIZE by default
    :param forward_ratio: Passed on to ``HttpPath.open``, None by default
    :param block_size: Size of single block, DEFAULT_BLOCK_SIZE by default
    :return: The reader returned by ``HttpPath.open``
    '''
    open_options = {
        'encoding': encoding,
        'errors': errors,
        'max_concurrency': max_concurrency,
        'max_buffer_size': max_buffer_size,
        'forward_ratio': forward_ratio,
        'block_size': block_size,
    }
    return HttpPath(path).open(mode, **open_options)
@SmartPath.register
class HttpPath(URIPath):
    '''Path object for http(s) urls, offering read-only access

    ``protocol`` is "http" on the class and is switched to "https" on the
    instance when the given path starts with ``https://``.
    '''

    protocol = "http"

    def __init__(self, path: PathLike, *other_paths: PathLike):
        if str(path).startswith('https://'):
            self.protocol = 'https'
        super().__init__(path, *other_paths)

    @binary_open
    def open(
            self,
            mode: str = 'rb',
            *,
            max_concurrency: Optional[int] = None,
            max_buffer_size: int = DEFAULT_MAX_BUFFER_SIZE,
            forward_ratio: Optional[float] = None,
            block_size: int = DEFAULT_BLOCK_SIZE,
            **kwargs) -> Union[BufferedReader, HttpPrefetchReader]:
        '''Open the http(s) url for reading binary data

        A streamed GET request is sent first. When the server answers with
        ``Accept-Ranges: bytes`` and a ``Content-Length`` of at least two
        blocks, that response is closed and an ``HttpPrefetchReader`` is
        returned; otherwise the raw response stream is wrapped in a
        ``BufferedReader``.

        .. note ::

            ``encoding`` and ``errors`` are not parameters of this method;
            presumably text mode is handled by the ``binary_open``
            decorator - confirm against megfile.utils.

        :param mode: Only supports 'rb' mode now
        :param max_concurrency: Max download thread number, None by default
        :param max_buffer_size: Max cached buffer size in memory,
            DEFAULT_MAX_BUFFER_SIZE by default
        :param forward_ratio: Share of the cached blocks used as
            ``block_forward`` of the prefetch reader (at least 1 block);
            None leaves ``block_forward`` as None
        :param block_size: Size of single block, DEFAULT_BLOCK_SIZE by default
        :return: A reader over the http(s) data
        :raises: ValueError for an unsupported mode, or the error produced by
            ``translate_http_error`` when the request fails
        '''
        if mode not in ('rb',):
            raise ValueError('unacceptable mode: %r' % mode)

        response = None
        try:
            response = get_http_session(status_forcelist=()).get(
                self.path_with_protocol, stream=True)
            response.raise_for_status()
        except Exception as error:
            # A requests Response is falsy for 4xx/5xx statuses, so compare
            # with None; otherwise a failed response is never closed.
            if response is not None:
                response.close()
            raise translate_http_error(error, self.path_with_protocol)

        # Content-Length may be absent (e.g. chunked responses); without it
        # the prefetch reader cannot be used, so fall back to the stream.
        content_length = response.headers.get('Content-Length')
        content_size = int(content_length) if content_length else None

        supports_ranges = response.headers.get('Accept-Ranges') == 'bytes'
        if (content_size is not None and supports_ranges and
                content_size >= block_size * 2):
            # The prefetch reader issues its own requests.
            response.close()

            block_capacity = max_buffer_size // block_size
            if forward_ratio is None:
                block_forward = None
            else:
                block_forward = max(int(block_capacity * forward_ratio), 1)

            return HttpPrefetchReader(
                self.path_with_protocol,
                content_size=content_size,
                max_retries=max_retries,
                max_workers=max_concurrency,
                block_capacity=block_capacity,
                block_forward=block_forward,
                block_size=block_size,
            )

        response.raw.auto_close = False
        response.raw.name = self.path_with_protocol
        return BufferedReader(response.raw)

    def stat(self, follow_symlinks=True) -> StatResult:
        '''
        Get StatResult of http_url response, including size and mtime

        ``size`` comes from the ``Content-Length`` header and ``mtime`` from
        the ``Last-Modified`` header; either is None when the header is
        missing. The response headers are returned as ``extra``.

        :param follow_symlinks: Ignore this parameter, just for compatibility
        :returns: StatResult
        :raises: The error produced by ``translate_http_error`` when the
            request fails
        '''
        # Function-scope import keeps this block self-contained.
        import calendar

        try:
            with get_http_session(status_forcelist=()).get(
                    self.path_with_protocol, stream=True) as response:
                response.raise_for_status()
                headers = response.headers
        except Exception as error:
            raise translate_http_error(error, self.path_with_protocol)

        size = headers.get('Content-Length')
        if size:
            size = int(size)

        last_modified = headers.get('Last-Modified')
        if last_modified:
            # HTTP dates are expressed in GMT, so convert with timegm();
            # mktime() would treat the parsed fields as local time and shift
            # the result by the local UTC offset.
            last_modified = float(
                calendar.timegm(
                    time.strptime(
                        last_modified, "%a, %d %b %Y %H:%M:%S %Z")))

        return StatResult(  # pyre-ignore[20]
            size=size,
            mtime=last_modified,
            isdir=False,
            islnk=False,
            extra=headers)

    def getsize(self, follow_symlinks: bool = False) -> int:
        '''
        Get file size on the given http_url path.

        If the response has no Content-Length header, None is returned

        :param follow_symlinks: Ignore this parameter, just for compatibility
        :returns: File size (in bytes)
        :raises: Same errors as ``stat``
        '''
        return self.stat().size

    def getmtime(self, follow_symlinks: bool = False) -> float:
        '''
        Get Last-Modified time of the given http_url path.

        If the response has no Last-Modified header, None is returned

        :param follow_symlinks: Ignore this parameter, just for compatibility
        :returns: Last-Modified time (in Unix timestamp format)
        :raises: Same errors as ``stat``
        '''
        return self.stat().mtime

    def exists(self, followlinks: bool = False) -> bool:
        """Test if http path exists

        Only a 404 status or a connection error counts as "does not exist";
        any other response yields True.

        :param followlinks: ignore this parameter, just for compatibility
        :type followlinks: bool, optional
        :return: return True if exists
        :rtype: bool
        """
        try:
            with get_http_session(status_forcelist=()).get(
                    self.path_with_protocol, stream=True) as response:
                if response.status_code == 404:
                    return False
                return True
        except requests.exceptions.ConnectionError:
            return False
@SmartPath.register
class HttpsPath(HttpPath):
    '''HttpPath subclass, registered with SmartPath, whose protocol is https'''

    protocol = "https"