# Source code for megfile.s3_path

import hashlib
import io
import os
import re
import time
from concurrent.futures import ThreadPoolExecutor
from functools import lru_cache, wraps
from logging import getLogger as get_logger
from typing import IO, Any, AnyStr, BinaryIO, Callable, Dict, Iterator, List, Optional, Tuple, Union

import boto3
import botocore
from botocore.awsrequest import AWSResponse

from megfile.errors import S3BucketNotFoundError, S3ConfigError, S3FileExistsError, S3FileNotFoundError, S3IsADirectoryError, S3NameTooLongError, S3NotADirectoryError, S3NotALinkError, S3PermissionError, S3UnknownError, SameFileError, UnsupportedError, _create_missing_ok_generator
from megfile.errors import _logger as error_logger
from megfile.errors import patch_method, raise_s3_error, s3_error_code_should_retry, s3_should_retry, translate_fs_error, translate_s3_error
from megfile.interfaces import Access, ContextIterator, FileCacher, FileEntry, PathLike, StatResult, URIPath
from import is_same_file
from megfile.lib.compat import fspath
from megfile.lib.fnmatch import translate
from megfile.lib.glob import has_magic, has_magic_ignore_brace, ungloblize
from megfile.lib.joinpath import uri_join
from megfile.lib.s3_buffered_writer import DEFAULT_MAX_BUFFER_SIZE, GLOBAL_MAX_WORKERS, S3BufferedWriter
from megfile.lib.s3_cached_handler import S3CachedHandler
from megfile.lib.s3_limited_seekable_writer import S3LimitedSeekableWriter
from megfile.lib.s3_memory_handler import S3MemoryHandler
from megfile.lib.s3_pipe_handler import S3PipeHandler
from megfile.lib.s3_prefetch_reader import S3PrefetchReader
from megfile.lib.s3_share_cache_reader import S3ShareCacheReader
from megfile.lib.url import get_url_scheme
from megfile.smart_path import SmartPath
from megfile.utils import cachedproperty, calculate_md5, generate_cache_path, get_binary_mode, get_content_offset, is_readable, necessary_params, process_local, thread_local

__all__ = [
# Module-level logger for this file.
_logger = get_logger(__name__)
# NOTE(review): presumably the metadata key under which a content MD5 is
# stored on S3 objects -- its usage is not visible in this section; confirm.
content_md5_header = 'megfile-content-md5'
# Fallback returned by get_endpoint_url() when neither the environment nor
# the aws config supplies an endpoint.
# NOTE(review): empty here -- confirm the intended default endpoint.
endpoint_url = ''
max_pool_connections = GLOBAL_MAX_WORKERS  # for compatibility
# Retry budget handed to patch_method() and to the S3 reader classes.
max_retries = 10
# Page size (MaxKeys) used for list_objects_v2 requests.
max_keys = 1000

def _patch_make_request(client: botocore.client.BaseClient):
    '''Wrap ``client._make_request`` with megfile's retry / logging hooks

    NOTE(review): several lines of this function were missing from the
    source this file was recovered from; the statements marked below were
    reconstructed from the surrounding code and should be confirmed.

    :param client: botocore client whose ``_make_request`` gets patched
    :returns: The same client object, patched in place
    '''

    def after_callback(result: Tuple[AWSResponse, dict], *args, **kwargs):
        # Only inspect results shaped like (AWSResponse, parsed dict).
        if not isinstance(result, tuple) or len(result) != 2 \
            or not isinstance(result[0], AWSResponse) or not isinstance(result[1], dict):
            return result
        http, parsed_response = result
        if http.status_code >= 500:
            # Turn 5xx responses into exceptions so the wrapper installed
            # below gets a chance to retry them.
            error_code = parsed_response.get("Error", {}).get("Code")
            operation_model = kwargs.get('operation_model') or (
                args[0] if args else None)
            # NOTE(review): ``operation_model.name`` is reconstructed.
            operation_name = operation_model.name if operation_model else 'ProxyMethod'
            error_class = client.exceptions.from_code(error_code)
            raise error_class(parsed_response, operation_name)
        return result

    def retry_callback(error, operation_model, request_dict, request_context):
        if is_readable(request_dict['body']):
            # NOTE(review): reconstructed -- rewind the body so the retried
            # request sends it again from the beginning.
            request_dict['body'].seek(0)

    def before_callback(operation_model, request_dict, request_context):
        # NOTE(review): logger call and arguments are reconstructed.
        _logger.debug(
            'send s3 request: %r, with parameters: %s', operation_model.name,
            request_dict)

    # NOTE(review): keyword arguments are reconstructed; max_retries and
    # should_retry mirror the other patch_method() calls in this file.
    client._make_request = patch_method(
        client._make_request,
        max_retries=max_retries,
        should_retry=s3_should_retry,
        after_callback=after_callback,
        before_callback=before_callback,
        retry_callback=retry_callback)
    return client

def parse_s3_url(s3_url: PathLike) -> Tuple[str, str]:
    '''Split an s3 url into its bucket and key parts

    :param s3_url: Path like ``s3://bucket/key`` (see ``is_s3``)
    :returns: Tuple ``(bucket, key)``; key is ``''`` when the url has no
        ``/`` after the bucket name
    :raises: ValueError, when ``s3_url`` is not an s3 url
    '''
    s3_url = fspath(s3_url)
    if not is_s3(s3_url):
        raise ValueError('Not a s3 url: %r' % s3_url)
    rightpart = s3_url.split('://', maxsplit=1)[1]
    # Non-greedy match: the bucket is everything before the first "/".
    bucketmatch = re.match('(.*?)/', rightpart)
    if bucketmatch is None:
        bucket = rightpart
        path = ''
    else:
        bucket = bucketmatch.group(1)
        # Skip the bucket name and the "/" that follows it.
        path = rightpart[len(bucket) + 1:]
    return bucket, path
def get_scoped_config(profile_name: Optional[str] = None) -> Dict:
    '''Return the scoped botocore config of the session for ``profile_name``

    :param profile_name: Profile name, None for the default session
    :returns: Result of the underlying session's ``get_scoped_config()``
    '''
    return get_s3_session(
        profile_name=profile_name)._session.get_scoped_config()


@lru_cache()
def warning_endpoint_url(key: str, endpoint_url: str):
    '''Log which source supplied the endpoint url

    Wrapped in ``lru_cache`` so each distinct (key, endpoint_url) pair is
    logged only once.

    :param key: Where the endpoint came from (env key or config file)
    :param endpoint_url: The endpoint url in use
    '''
    _logger.info("using %s: %s" % (key, endpoint_url))
def get_endpoint_url(profile_name: Optional[str] = None) -> str:
    '''Get the endpoint url of S3

    Lookup order: the ``[<PROFILE>__]OSS_ENDPOINT`` environment variable,
    then ``s3.endpoint_url`` of the scoped aws config, then the module-level
    ``endpoint_url`` default.

    :param profile_name: Profile name, None for the default profile
    :returns: S3 endpoint url
    '''
    if profile_name:
        environ_key = f'{profile_name}__OSS_ENDPOINT'.upper()
    else:
        environ_key = 'OSS_ENDPOINT'

    from_environ = os.environ.get(environ_key)
    if from_environ:
        warning_endpoint_url(environ_key, from_environ)
        return from_environ

    try:
        scoped_config = get_scoped_config(profile_name=profile_name)
        from_config = scoped_config.get('s3', {}).get('endpoint_url')
        if from_config:
            warning_endpoint_url('~/.aws/config', from_config)
            return from_config
    except botocore.exceptions.ProfileNotFound:
        # Unknown profile: fall through to the module default.
        pass
    return endpoint_url
def get_s3_session(profile_name=None) -> boto3.Session:
    '''Get S3 session

    The session is obtained through ``thread_local`` under a key that
    includes the profile name.

    :param profile_name: Profile name passed to ``boto3.Session``
    :returns: S3 session
    '''
    storage_key = f's3_session:{profile_name}'
    return thread_local(storage_key, boto3.Session, profile_name=profile_name)
def get_access_token(profile_name=None):
    '''Resolve the access key / secret key pair for a profile

    Environment variables ``[<PROFILE>__]AWS_ACCESS_KEY_ID`` and
    ``[<PROFILE>__]AWS_SECRET_ACCESS_KEY`` win; whichever of the two is
    missing is filled in from the session credentials, if any.

    :param profile_name: Profile name, None for the default profile
    :returns: Tuple ``(access_key, secret_key)``; items may be None
    '''
    if profile_name:
        access_key_env_name = f"{profile_name}__AWS_ACCESS_KEY_ID".upper()
        secret_key_env_name = f"{profile_name}__AWS_SECRET_ACCESS_KEY".upper()
    else:
        access_key_env_name = "AWS_ACCESS_KEY_ID"
        secret_key_env_name = "AWS_SECRET_ACCESS_KEY"

    access_key = os.getenv(access_key_env_name)
    secret_key = os.getenv(secret_key_env_name)
    if access_key and secret_key:
        return access_key, secret_key

    try:
        session = get_s3_session(profile_name=profile_name)
        credentials = session.get_credentials()
    except botocore.exceptions.ProfileNotFound:
        credentials = None

    if credentials:
        # Only fill in the halves the environment did not provide.
        access_key = access_key or credentials.access_key
        secret_key = secret_key or credentials.secret_key
    return access_key, secret_key
def get_s3_client(
        config: Optional[botocore.config.Config] = None,
        cache_key: Optional[str] = None,
        profile_name: Optional[str] = None):
    '''Get S3 client

    :param config: Extra botocore config, merged over the defaults
        (``connect_timeout=5``, ``max_pool_connections=GLOBAL_MAX_WORKERS``)
    :param cache_key: When given, the client is obtained through
        ``thread_local`` (or ``process_local`` when ``S3_CLIENT_CACHE_MODE``
        is ``'process_local'``) under ``"<cache_key>:<profile_name>"``
    :param profile_name: Profile name, None for the default profile
    :returns: S3 client
    '''
    if cache_key is not None:
        if S3_CLIENT_CACHE_MODE == 'process_local':
            local_storage = process_local
        else:
            local_storage = thread_local
        return local_storage(
            f"{cache_key}:{profile_name}",
            get_s3_client,
            config=config,
            profile_name=profile_name)

    default_config = botocore.config.Config(
        connect_timeout=5, max_pool_connections=GLOBAL_MAX_WORKERS)
    config = default_config.merge(config) if config else default_config

    if profile_name:
        addressing_style_env_key = \
            f'{profile_name}__AWS_S3_ADDRESSING_STYLE'.upper()
    else:
        addressing_style_env_key = 'AWS_S3_ADDRESSING_STYLE'
    addressing_style = os.environ.get(addressing_style_env_key)
    if addressing_style:
        style_config = botocore.config.Config(
            s3={'addressing_style': addressing_style})
        config = config.merge(style_config)

    access_key, secret_key = get_access_token(profile_name)
    try:
        session = get_s3_session(profile_name=profile_name)
    except botocore.exceptions.ProfileNotFound:
        # Unknown profile: fall back to the default session.
        session = get_s3_session()

    client = session.client(
        's3',
        endpoint_url=get_endpoint_url(profile_name=profile_name),
        config=config,
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
    )
    return _patch_make_request(client)
def get_s3_client_with_cache(
        config: Optional[botocore.config.Config] = None,
        profile_name: Optional[str] = None):
    '''Get an S3 client through the ``'s3_filelike_client'`` cache key

    :param config: Extra botocore config forwarded to ``get_s3_client``
    :param profile_name: Profile name, None for the default profile
    :returns: S3 client
    '''
    client_options = dict(
        config=config,
        cache_key='s3_filelike_client',
        profile_name=profile_name)
    return get_s3_client(**client_options)
def s3_path_join(path: PathLike, *other_paths: PathLike) -> str:
    '''
    Concat 2 or more path to a complete path

    :param path: Given path
    :param other_paths: Paths to be concatenated
    :returns: Concatenated complete path

    .. note ::

        The difference between this function and ``os.path.join`` is that
        this function ignores left side slash (which indicates absolute
        path) in ``other_paths`` and will directly concat.
        e.g. os.path.join('/path', 'to', '/file') => '/file',
        but s3_path_join('/path', 'to', '/file') => '/path/to/file'
    '''
    head = fspath(path)
    tails = [fspath(other_path) for other_path in other_paths]
    return uri_join(head, *tails)
def _list_all_buckets(profile_name: Optional[str] = None) -> List[str]: client = get_s3_client_with_cache(profile_name=profile_name) response = client.list_buckets() return [content['Name'] for content in response['Buckets']] def _parse_s3_url_ignore_brace(s3_url: str) -> Tuple[str, str]: s3_url = fspath(s3_url) s3_scheme, rightpart = s3_url[:5], s3_url[5:] if s3_scheme != 's3://': raise ValueError('Not a s3 url: %r' % s3_url) left_brace = False for current_index, current_character in enumerate(rightpart): if current_character == "/" and left_brace is False: return rightpart[:current_index], rightpart[current_index + 1:] elif current_character == "{": left_brace = True elif current_character == "}": left_brace = False return rightpart, "" def _group_s3path_by_bucket( s3_pathname: str, profile_name: Optional[str] = None) -> List[str]: bucket, key = _parse_s3_url_ignore_brace(s3_pathname) if not bucket: if not key: raise UnsupportedError('Glob whole s3', s3_pathname) raise S3BucketNotFoundError('Empty bucket name: %r' % s3_pathname) grouped_path = [] def generate_s3_path(bucket: str, key: str) -> str: if key: return "s3://%s/%s" % (bucket, key) return "s3://%s%s" % (bucket, "/" if s3_pathname.endswith("/") else "") all_bucket = lru_cache(maxsize=1)(_list_all_buckets) for bucketname in ungloblize(bucket): if has_magic(bucketname): split_bucketname = bucketname.split("/", 1) path_part = None if len(split_bucketname) == 2: bucketname, path_part = split_bucketname pattern = re.compile(translate(re.sub(r'\*{2,}', '*', bucketname))) for bucket in all_bucket(profile_name): if pattern.fullmatch(bucket) is not None: if path_part is not None: bucket = "%s/%s" % (bucket, path_part) grouped_path.append(generate_s3_path(bucket, key)) else: grouped_path.append(generate_s3_path(bucketname, key)) return grouped_path def _s3_split_magic_ignore_brace(s3_pathname: str) -> Tuple[str, str]: if not s3_pathname: raise ValueError("s3_pathname: %s", s3_pathname) has_protocol = False if 
s3_pathname.startswith("s3://"): has_protocol = True s3_pathname = s3_pathname[5:] has_delimiter = False if s3_pathname.endswith("/"): has_delimiter = True s3_pathname = s3_pathname[:-1] normal_parts = [] magic_parts = [] left_brace = False left_index = 0 for current_index, current_character in enumerate(s3_pathname): if current_character == "/" and left_brace is False: if has_magic_ignore_brace(s3_pathname[left_index:current_index]): magic_parts.append(s3_pathname[left_index:current_index]) if s3_pathname[current_index + 1:]: magic_parts.append(s3_pathname[current_index + 1:]) left_index = len(s3_pathname) break normal_parts.append(s3_pathname[left_index:current_index]) left_index = current_index + 1 elif current_character == "{": left_brace = True elif current_character == "}": left_brace = False if s3_pathname[left_index:]: if has_magic_ignore_brace(s3_pathname[left_index:]): magic_parts.append(s3_pathname[left_index:]) else: normal_parts.append(s3_pathname[left_index:]) if has_protocol and normal_parts: normal_parts.insert(0, "s3:/") elif has_protocol: magic_parts.insert(0, "s3:/") if has_delimiter and magic_parts: magic_parts.append("") elif has_delimiter: normal_parts.append("") return "/".join(normal_parts), "/".join(magic_parts) def _group_s3path_by_prefix(s3_pathname: str) -> List[str]: _, key = parse_s3_url(s3_pathname) if not key: return ungloblize(s3_pathname) top_dir, magic_part = _s3_split_magic_ignore_brace(s3_pathname) if not top_dir: return [magic_part] grouped_path = [] for pathname in ungloblize(top_dir): if magic_part: pathname = "/".join([pathname, magic_part]) grouped_path.append(pathname) return grouped_path def _become_prefix(prefix: str) -> str: if prefix != '' and not prefix.endswith('/'): prefix += '/' return prefix def _s3_split_magic(s3_pathname: str) -> Tuple[str, str]: if not has_magic(s3_pathname): return s3_pathname, '' delimiter = '/' normal_parts = [] magic_parts = [] all_parts = s3_pathname.split(delimiter) for i, part in 
enumerate(all_parts): if not has_magic(part): normal_parts.append(part) else: magic_parts = all_parts[i:] break return delimiter.join(normal_parts), delimiter.join(magic_parts) def _list_objects_recursive( s3_client, bucket: str, prefix: str, delimiter: str = ''): resp = s3_client.list_objects_v2( Bucket=bucket, Prefix=prefix, Delimiter=delimiter, MaxKeys=max_keys) while True: yield resp if not resp['IsTruncated']: break resp = s3_client.list_objects_v2( Bucket=bucket, Prefix=prefix, Delimiter=delimiter, ContinuationToken=resp['NextContinuationToken'], MaxKeys=max_keys) def _make_stat(content: Dict[str, Any]): return StatResult( islnk=content.get('islnk', False), size=content['Size'], mtime=content['LastModified'].timestamp(), extra=content, ) def _s3_glob_stat_single_path( s3_pathname: PathLike, recursive: bool = True, missing_ok: bool = True, followlinks: bool = False, profile_name: Optional[str] = None) -> Iterator[FileEntry]: if not recursive: # If not recursive, replace ** with * s3_pathname = re.sub(r'\*{2,}', '*', s3_pathname) top_dir, wildcard_part = _s3_split_magic(s3_pathname) search_dir = wildcard_part.endswith('/') def should_recursive(wildcard_part: str) -> bool: if '**' in wildcard_part: return True for expanded_path in ungloblize(wildcard_part): parts_length = len(expanded_path.split('/')) if parts_length + search_dir >= 2: return True return False def create_generator(_s3_pathname) -> Iterator[FileEntry]: if not S3Path(top_dir).exists(): return if not has_magic(_s3_pathname): _s3_pathname_obj = S3Path(_s3_pathname) if _s3_pathname_obj.is_file(): stat = S3Path(_s3_pathname).stat(follow_symlinks=followlinks) yield FileEntry(, _s3_pathname_obj.path, stat) if _s3_pathname_obj.is_dir(): yield FileEntry(, _s3_pathname_obj.path, StatResult(isdir=True)) return delimiter = '' if not should_recursive(wildcard_part): delimiter = '/' dirnames = set() pattern = re.compile(translate(_s3_pathname)) bucket, key = parse_s3_url(top_dir) prefix = _become_prefix(key) 
client = get_s3_client_with_cache(profile_name=profile_name) with raise_s3_error(_s3_pathname): for resp in _list_objects_recursive(client, bucket, prefix, delimiter): for content in resp.get('Contents', []): path = s3_path_join('s3://', bucket, content['Key']) if not search_dir and pattern.match(path): yield FileEntry( S3Path(path).name, path, _make_stat(content)) dirname = os.path.dirname(path) while dirname not in dirnames and dirname != top_dir: dirnames.add(dirname) path = dirname + '/' if search_dir else dirname if pattern.match(path): yield FileEntry( S3Path(path).name, path, StatResult(isdir=True)) dirname = os.path.dirname(dirname) for common_prefix in resp.get('CommonPrefixes', []): path = s3_path_join( 's3://', bucket, common_prefix['Prefix']) dirname = os.path.dirname(path) if dirname not in dirnames and dirname != top_dir: dirnames.add(dirname) path = dirname + '/' if search_dir else dirname if pattern.match(path): yield FileEntry( S3Path(path).name, path, StatResult(isdir=True)) return create_generator(s3_pathname) def _s3_scan_pairs(src_url: PathLike, dst_url: PathLike) -> Iterator[Tuple[PathLike, PathLike]]: for src_file_path in S3Path(src_url).scan(): content_path = src_file_path[len(src_url):] if len(content_path) > 0: dst_file_path = s3_path_join(dst_url, content_path) else: dst_file_path = dst_url yield src_file_path, dst_file_path
def is_s3(path: PathLike) -> bool:
    '''
    1. Following the aws-cli convention, test if a path is s3 path.
    2. megfile also support the path like `s3[+profile_name]://bucket/key`

    :param path: Path to be tested
    :returns: True if path is s3 path, else False
    '''
    matched = re.match(r'^s3(\+\w+)?:\/\/', fspath(path))
    return matched is not None
def _s3_binary_mode(s3_open_func):
    '''Decorator adding text-mode and ``x``-mode support to a binary opener

    The wrapped opener is always called with the binary form of ``mode``.
    For text modes the result is wrapped in ``io.TextIOWrapper`` and its
    ``mode`` attribute is set to the requested mode.

    :raises: S3BucketNotFoundError, S3IsADirectoryError, S3FileExistsError
    '''

    @wraps(s3_open_func)
    def wrapper(
            s3_url,
            mode: str = 'rb',
            encoding: Optional[str] = None,
            errors: Optional[str] = None,
            **kwargs):
        bucket, key = parse_s3_url(s3_url)
        if not bucket:
            raise S3BucketNotFoundError('Empty bucket name: %r' % s3_url)
        if not key or key.endswith('/'):
            raise S3IsADirectoryError('Is a directory: %r' % s3_url)

        if 'x' in mode:
            # Exclusive creation: refuse an existing file, then write.
            if S3Path(s3_url).is_file():
                raise S3FileExistsError('File exists: %r' % s3_url)
            mode = mode.replace('x', 'w')

        binary_file = s3_open_func(s3_url, get_binary_mode(mode), **kwargs)
        if 'b' in mode:
            return binary_file

        text_file = io.TextIOWrapper(
            binary_file, encoding=encoding,
            errors=errors)  # pytype: disable=wrong-arg-types
        text_file.mode = mode
        return text_file

    return wrapper
@_s3_binary_mode
def s3_prefetch_open(
        s3_url: PathLike,
        mode: str = 'rb',
        followlinks: bool = False,
        *,
        max_concurrency: Optional[int] = None,
        max_block_size: int = DEFAULT_BLOCK_SIZE) -> S3PrefetchReader:
    '''Open a asynchronous prefetch reader, to support fast sequential read and random read

    .. note ::

        User should make sure that reader / writer are closed correctly

        Supports context manager

        Some parameter setting may perform well: max_concurrency=10 or 20,
        max_block_size=8 or 16 MB, default value None means using global
        thread pool

    :param mode: Only "rb" is accepted
    :param followlinks: If True, resolve the path with ``readlink()`` first
        (paths that are not links are used as is)
    :param max_concurrency: Max download thread number, None by default
    :param max_block_size: Max data size downloaded by each thread, in bytes, 8MB by default
    :returns: An opened S3PrefetchReader object
    :raises: S3FileNotFoundError
    '''
    if mode != 'rb':
        raise ValueError('unacceptable mode: %r' % mode)

    s3_path = s3_url if isinstance(s3_url, S3Path) else S3Path(s3_url)
    if followlinks:
        try:
            s3_path = s3_path.readlink()
        except S3NotALinkError:
            pass

    bucket, key = parse_s3_url(s3_path.path_with_protocol)
    pool_config = botocore.config.Config(
        max_pool_connections=max_pool_connections)
    client = get_s3_client_with_cache(
        config=pool_config, profile_name=s3_path._profile_name)
    return S3PrefetchReader(
        bucket,
        key,
        s3_client=client,
        max_retries=max_retries,
        max_workers=max_concurrency,
        block_size=max_block_size,
        profile_name=s3_path._profile_name)
@_s3_binary_mode
def s3_share_cache_open(
        s3_url: PathLike,
        mode: str = 'rb',
        followlinks: bool = False,
        *,
        cache_key: str = 'lru',
        max_concurrency: Optional[int] = None,
        max_block_size: int = DEFAULT_BLOCK_SIZE) -> S3ShareCacheReader:
    '''Open a asynchronous prefetch reader, to support fast sequential read and random read

    .. note ::

        User should make sure that reader / writer are closed correctly

        Supports context manager

        Some parameter setting may perform well: max_concurrency=10 or 20,
        max_block_size=8 or 16 MB, default value None means using global
        thread pool

    :param mode: Only "rb" is accepted
    :param followlinks: If True, resolve the path with ``readlink()`` first
        (paths that are not links are used as is)
    :param cache_key: Cache key handed to S3ShareCacheReader, 'lru' by default
    :param max_concurrency: Max download thread number, None by default
    :param max_block_size: Max data size downloaded by each thread, in bytes, 8MB by default
    :returns: An opened S3ShareCacheReader object
    :raises: S3FileNotFoundError
    '''
    if mode != 'rb':
        raise ValueError('unacceptable mode: %r' % mode)

    s3_path = s3_url if isinstance(s3_url, S3Path) else S3Path(s3_url)
    if followlinks:
        try:
            s3_path = s3_path.readlink()
        except S3NotALinkError:
            pass

    bucket, key = parse_s3_url(s3_path.path_with_protocol)
    pool_config = botocore.config.Config(
        max_pool_connections=max_pool_connections)
    client = get_s3_client_with_cache(
        config=pool_config, profile_name=s3_path._profile_name)
    return S3ShareCacheReader(
        bucket,
        key,
        cache_key=cache_key,
        s3_client=client,
        max_retries=max_retries,
        max_workers=max_concurrency,
        block_size=max_block_size,
        profile_name=s3_path._profile_name)
@_s3_binary_mode
def s3_pipe_open(
        s3_url: PathLike,
        mode: str,
        followlinks: bool = False,
        *,
        join_thread: bool = True) -> S3PipeHandler:
    '''Open a asynchronous read-write reader / writer, to support fast sequential read / write

    .. note ::

        User should make sure that reader / writer are closed correctly

        Supports context manager

        When join_thread is False, while the file handle are closing, this
        function will not wait until the asynchronous writing finishes;

        False doesn't affect read-handle, but this can speed up write-handle
        because file will be written asynchronously.

        But asynchronous behaviour cannot guarantee the file are
        successfully written, and frequent execution may cause thread and
        file handle exhaustion

    :param mode: Mode to open file, either "rb" or "wb"
    :param followlinks: If True, resolve the path with ``readlink()`` first
        (paths that are not links are used as is)
    :param join_thread: If wait after function execution until s3 finishes writing
    :returns: An opened BufferedReader / BufferedWriter object
    :raises: S3FileNotFoundError, when reading a path that is not a file
    '''
    if mode not in ('rb', 'wb'):
        raise ValueError('unacceptable mode: %r' % mode)

    # The existence check runs on the path as given, before link resolution.
    if mode[0] == 'r' and not S3Path(s3_url).is_file():
        raise S3FileNotFoundError('No such file: %r' % s3_url)

    s3_path = s3_url if isinstance(s3_url, S3Path) else S3Path(s3_url)
    if followlinks:
        try:
            s3_path = s3_path.readlink()
        except S3NotALinkError:
            pass

    bucket, key = parse_s3_url(s3_path.path_with_protocol)
    pool_config = botocore.config.Config(
        max_pool_connections=max_pool_connections)
    client = get_s3_client_with_cache(
        config=pool_config, profile_name=s3_path._profile_name)
    return S3PipeHandler(
        bucket,
        key,
        mode,
        s3_client=client,
        join_thread=join_thread,
        profile_name=s3_path._profile_name)
@_s3_binary_mode
def s3_cached_open(
        s3_url: PathLike,
        mode: str,
        followlinks: bool = False,
        *,
        cache_path: Optional[str] = None) -> S3CachedHandler:
    '''Open a local-cache file reader / writer, for frequent random read / write

    .. note ::

        User should make sure that reader / writer are closed correctly

        Supports context manager

        cache_path can specify the path of cache file. Performance could be
        better if cache file path is on ssd or tmpfs

    :param mode: Mode to open file, could be one of "rb", "wb", "ab", "rb+", "wb+" or "ab+"
    :param followlinks: If True, resolve the path with ``readlink()`` first
        (paths that are not links are used as is)
    :param cache_path: cache file path
    :returns: An opened BufferedReader / BufferedWriter object
    '''
    if mode not in ('rb', 'wb', 'ab', 'rb+', 'wb+', 'ab+'):
        raise ValueError('unacceptable mode: %r' % mode)

    s3_path = s3_url if isinstance(s3_url, S3Path) else S3Path(s3_url)
    if followlinks:
        try:
            s3_path = s3_path.readlink()
        except S3NotALinkError:
            pass

    bucket, key = parse_s3_url(s3_path.path_with_protocol)
    pool_config = botocore.config.Config(
        max_pool_connections=max_pool_connections)
    client = get_s3_client_with_cache(
        config=pool_config, profile_name=s3_path._profile_name)
    return S3CachedHandler(
        bucket,
        key,
        mode,
        s3_client=client,
        cache_path=cache_path,
        profile_name=s3_path._profile_name)
@_s3_binary_mode
def s3_buffered_open(
        s3_url: PathLike,
        mode: str,
        followlinks: bool = False,
        *,
        max_concurrency: Optional[int] = None,
        max_buffer_size: int = DEFAULT_MAX_BUFFER_SIZE,
        forward_ratio: Optional[float] = None,
        block_size: int = DEFAULT_BLOCK_SIZE,
        limited_seekable: bool = False,
        buffered: bool = True,
        share_cache_key: Optional[str] = None,
        cache_path: Optional[str] = None
) -> Union[S3PrefetchReader, S3BufferedWriter, io.BufferedReader,
           io.BufferedWriter, S3MemoryHandler]:
    '''Open an asynchronous prefetch reader, to support fast sequential read

    .. note ::

        User should make sure that reader / writer are closed correctly

        Supports context manager

        Some parameter setting may perform well: max_concurrency=10 or 20,
        max_block_size=8 or 16 MB, default value None means using global
        thread pool

    :param mode: One of "rb", "wb", "ab", "rb+", "wb+" or "ab+". Modes with
        "a" or "+" return an S3MemoryHandler, or an S3CachedHandler when
        ``cache_path`` is given
    :param followlinks: If True, resolve the path with ``readlink()`` first
        (paths that are not links are used as is)
    :param max_concurrency: Max download thread number, None by default
    :param max_buffer_size: Max cached buffer size in memory, 128MB by default
    :param forward_ratio: Fraction of the block capacity used as
        ``block_forward`` for readers (at least 1 block); None leaves it unset
    :param block_size: Size of single block, 8MB by default. Each block will be uploaded or downloaded by single thread.
    :param limited_seekable: If write-handle supports limited seek (both file head part and tail part can seek block_size). Notes: This parameter are valid only for write-handle. Read-handle support arbitrary seek
    :param buffered: If True, wrap the "rb" / "wb" handle in
        ``io.BufferedReader`` / ``io.BufferedWriter``
    :param share_cache_key: If given, read through S3ShareCacheReader with this cache key
    :param cache_path: cache file path for "a" / "+" modes
    :returns: An opened S3PrefetchReader object
    :raises: S3FileNotFoundError
    '''
    if mode not in ('rb', 'wb', 'ab', 'rb+', 'wb+', 'ab+'):
        raise ValueError('unacceptable mode: %r' % mode)

    s3_path = s3_url if isinstance(s3_url, S3Path) else S3Path(s3_url)
    if followlinks:
        try:
            s3_path = s3_path.readlink()
        except S3NotALinkError:
            pass

    bucket, key = parse_s3_url(s3_path.path_with_protocol)
    pool_config = botocore.config.Config(
        max_pool_connections=max_pool_connections)
    profile_name = s3_path._profile_name
    client = get_s3_client_with_cache(
        config=pool_config, profile_name=profile_name)

    # Append / update modes need random access: memory or local cache file.
    if 'a' in mode or '+' in mode:
        if cache_path is None:
            return S3MemoryHandler(
                bucket, key, mode, s3_client=client,
                profile_name=profile_name)
        return S3CachedHandler(
            bucket,
            key,
            mode,
            s3_client=client,
            cache_path=cache_path,
            profile_name=profile_name)

    if mode == 'rb':
        # A rough conversion algorithm to align 2 types of Reader / Writer parameters
        # TODO: Optimize the conversion algorithm
        block_capacity = max_buffer_size // block_size
        block_forward = None
        if forward_ratio is not None:
            block_forward = max(int(block_capacity * forward_ratio), 1)

        reader_options = dict(
            s3_client=client,
            max_retries=max_retries,
            max_workers=max_concurrency,
            block_size=block_size,
            block_forward=block_forward,
            profile_name=profile_name)
        if share_cache_key is not None:
            reader = S3ShareCacheReader(
                bucket, key, cache_key=share_cache_key, **reader_options)
        else:
            reader = S3PrefetchReader(
                bucket, key, block_capacity=block_capacity, **reader_options)
        if buffered:
            reader = io.BufferedReader(
                reader)  # pytype: disable=wrong-arg-types
        return reader

    # Remaining mode is 'wb'.
    writer_class = (
        S3LimitedSeekableWriter if limited_seekable else S3BufferedWriter)
    writer = writer_class(
        bucket,
        key,
        s3_client=client,
        max_workers=max_concurrency,
        max_buffer_size=max_buffer_size,
        block_size=block_size,
        profile_name=profile_name)
    if buffered:
        writer = io.BufferedWriter(writer)  # pytype: disable=wrong-arg-types
    return writer
@_s3_binary_mode
def s3_memory_open(
        s3_url: PathLike, mode: str,
        followlinks: bool = False) -> S3MemoryHandler:
    '''Open a memory-cache file reader / writer, for frequent random read / write

    .. note ::

        User should make sure that reader / writer are closed correctly

        Supports context manager

    :param mode: Mode to open file, could be one of "rb", "wb", "ab", "rb+", "wb+" or "ab+"
    :param followlinks: If True, resolve the path with ``readlink()`` first
        (paths that are not links are used as is)
    :returns: An opened BufferedReader / BufferedWriter object
    '''
    if mode not in ('rb', 'wb', 'ab', 'rb+', 'wb+', 'ab+'):
        raise ValueError('unacceptable mode: %r' % mode)

    s3_path = s3_url if isinstance(s3_url, S3Path) else S3Path(s3_url)
    if followlinks:
        try:
            s3_path = s3_path.readlink()
        except S3NotALinkError:
            pass

    bucket, key = parse_s3_url(s3_path.path_with_protocol)
    pool_config = botocore.config.Config(
        max_pool_connections=max_pool_connections)
    client = get_s3_client_with_cache(
        config=pool_config, profile_name=s3_path._profile_name)
    return S3MemoryHandler(
        bucket, key, mode, s3_client=client,
        profile_name=s3_path._profile_name)
# Default S3 opener: ``s3_open`` is an alias of ``s3_buffered_open``.
s3_open = s3_buffered_open
def s3_download(
        src_url: PathLike,
        dst_url: PathLike,
        followlinks: bool = False,
        callback: Optional[Callable[[int], None]] = None) -> None:
    '''
    Downloads a file from s3 to local filesystem.

    Missing parent directories of the target are created, and after the
    download the local file's atime / mtime are set to the source mtime.

    :param src_url: source s3 path
    :param dst_url: target fs path
    :param followlinks: If True, resolve the source with ``readlink()``
        first (paths that are not links are used as is)
    :param callback: Called periodically during copy, and the input parameter is the data size (in bytes) of copy since the last call
    :raises: S3BucketNotFoundError, S3FileNotFoundError,
        S3IsADirectoryError, OSError (when dst_url is not an fs path)
    '''
    from megfile.fs import is_fs
    from megfile.fs_path import FSPath

    src_path = src_url if isinstance(src_url, S3Path) else S3Path(src_url)
    if followlinks:
        try:
            src_path = src_path.readlink()
        except S3NotALinkError:
            pass

    src_full_path = src_path.path_with_protocol
    src_bucket, src_key = parse_s3_url(src_full_path)
    if not src_bucket:
        raise S3BucketNotFoundError('Empty bucket name: %r' % src_full_path)
    if not src_path.exists():
        raise S3FileNotFoundError('File not found: %r' % src_full_path)
    if not src_path.is_file():
        raise S3IsADirectoryError('Is a directory: %r' % src_full_path)

    dst_url = fspath(dst_url)
    if not is_fs(dst_url):
        raise OSError(f'dst_url is not fs path: {dst_url}')
    if not dst_url or dst_url.endswith('/'):
        raise S3IsADirectoryError('Is a directory: %r' % dst_url)

    local_path = FSPath(dst_url).path_without_protocol
    dst_directory = os.path.dirname(local_path)
    if dst_directory != '':
        os.makedirs(dst_directory, exist_ok=True)

    client = get_s3_client_with_cache(profile_name=src_path._profile_name)
    download_file = patch_method(
        client.download_file,
        max_retries=max_retries,
        should_retry=s3_should_retry,
    )
    try:
        download_file(src_bucket, src_key, local_path, Callback=callback)
    except Exception as error:
        # Map the failure to the matching fs / s3 error type first.
        error = translate_fs_error(error, dst_url)
        error = translate_s3_error(error, src_path.path_with_protocol)
        raise error

    src_mtime = src_path.stat().st_mtime
    os.utime(local_path, (src_mtime, src_mtime))
def s3_upload(
        src_url: PathLike,
        dst_url: PathLike,
        callback: Optional[Callable[[int], None]] = None,
        **kwargs) -> None:
    '''
    Uploads a file from local filesystem to s3.

    :param src_url: source fs path
    :param dst_url: target s3 path
    :param callback: Called periodically during copy, and the input parameter is the data size (in bytes) of copy since the last call
    :param kwargs: Accepted for compatibility; not used by this function
    :raises: OSError (when src_url is not an fs path),
        S3BucketNotFoundError, S3IsADirectoryError
    '''
    from megfile.fs import is_fs
    from megfile.fs_path import FSPath

    if not is_fs(src_url):
        raise OSError(f'src_url is not fs path: {src_url}')
    local_path = FSPath(src_url).path_without_protocol

    dst_bucket, dst_key = parse_s3_url(dst_url)
    if not dst_bucket:
        raise S3BucketNotFoundError('Empty bucket name: %r' % dst_url)
    if not dst_key or dst_key.endswith('/'):
        raise S3IsADirectoryError('Is a directory: %r' % dst_url)

    profile_name = S3Path(dst_url)._profile_name
    client = get_s3_client_with_cache(profile_name=profile_name)
    upload_fileobj = patch_method(
        client.upload_fileobj,
        max_retries=max_retries,
        should_retry=s3_should_retry,
    )

    with open(local_path, 'rb') as src, raise_s3_error(dst_url):
        upload_fileobj(src, Bucket=dst_bucket, Key=dst_key, Callback=callback)
def s3_load_content(
        s3_url,
        start: Optional[int] = None,
        stop: Optional[int] = None,
        followlinks: bool = False) -> bytes:
    '''
    Get specified file from [start, stop) in bytes

    :param s3_url: Specified path
    :param start: start index
    :param stop: stop index
    :param followlinks: If True, resolve the path with ``readlink()`` first
        (paths that are not links are used as is)
    :returns: bytes content in range [start, stop)
    :raises: S3BucketNotFoundError, S3IsADirectoryError
    '''

    def _get_object(client, bucket, key, range_str):
        return client.get_object(
            Bucket=bucket, Key=key, Range=range_str)['Body'].read()

    s3_url = S3Path(s3_url)
    if followlinks:
        try:
            s3_url = s3_url.readlink()
        except S3NotALinkError:
            pass

    bucket, key = parse_s3_url(s3_url.path_with_protocol)
    if not bucket:
        raise S3BucketNotFoundError('Empty bucket name: %r' % s3_url)
    if not key or key.endswith('/'):
        raise S3IsADirectoryError('Is a directory: %r' % s3_url)

    start, stop = get_content_offset(
        start, stop, s3_url.getsize(follow_symlinks=False))
    # Any empty range yields no bytes. Previously only start == stop == 0
    # was handled, so e.g. [5, 5) built the invalid header "bytes=5-4".
    # NOTE(review): assumes get_content_offset returns absolute offsets
    # into the object -- confirm against its implementation.
    if start >= stop:
        return b''

    # HTTP byte ranges are inclusive on both ends, hence ``stop - 1``.
    range_str = 'bytes=%d-%d' % (start, stop - 1)

    client = get_s3_client_with_cache(profile_name=s3_url._profile_name)
    with raise_s3_error(s3_url.path):
        return patch_method(
            _get_object,
            max_retries=max_retries,
            should_retry=s3_should_retry,
        )(client, bucket, key, range_str)
def s3_rename(src_url: PathLike, dst_url: PathLike):
    '''
    Move s3 file path from src_url to dst_url

    A file is copied, anything else is synced; the source is removed
    afterwards in both cases.

    :param src_url: Given source path
    :param dst_url: Given destination path
    '''
    source = S3Path(src_url)
    transfer = source.copy if source.is_file() else source.sync
    transfer(dst_url)
    source.remove()
class S3Cacher(FileCacher):
    '''Local-file cache of an s3 object

    In mode ``'r'`` / ``'a'`` the object is downloaded to ``cache_path`` on
    construction; in mode ``'w'`` / ``'a'`` the cache file is uploaded back
    when the cacher is closed. The cache file is removed on close.
    '''
    # Path of the local cache file; None when no cache file is in use.
    cache_path = None

    def __init__(
            self, path: str, cache_path: Optional[str] = None,
            mode: str = 'r'):
        '''
        :param path: s3 path being cached
        :param cache_path: Local cache file; generated for modes 'r' / 'a'
            when omitted
        :param mode: One of 'r', 'w' or 'a'
        :raises: ValueError, when mode is not acceptable
        '''
        if mode not in ('r', 'w', 'a'):
            raise ValueError('unacceptable mode: %r' % mode)
        if mode in ('r', 'a'):
            if cache_path is None:
                cache_path = generate_cache_path(path)
            s3_download(path, cache_path)
        self.name = path
        self.mode = mode
        self.cache_path = cache_path

    def _close(self):
        '''Upload the cache file (modes 'w' / 'a') and delete it.'''
        if self.cache_path is not None and \
                os.path.exists(self.cache_path):
            if self.mode in ('w', 'a'):
                s3_upload(self.cache_path, self.name)
            os.unlink(self.cache_path)
def s3_glob(
        path: PathLike,
        recursive: bool = True,
        missing_ok: bool = True,
        followlinks: bool = False,
) -> List[str]:
    '''Return s3 path list in ascending alphabetical order, in which path matches glob pattern

    Notes: Only glob in bucket. If trying to match bucket with wildcard characters, raise UnsupportedError

    :param path: Glob pattern
    :param recursive: If False, `**` will not search directory recursively
    :param missing_ok: If False and target path doesn't match any file, raise FileNotFoundError
    :param followlinks: Forwarded to ``s3_iglob``
    :raises: UnsupportedError, when bucket part contains wildcard characters
    :returns: A list contains paths match `s3_pathname`
    '''
    matched_paths = s3_iglob(
        path=path,
        recursive=recursive,
        missing_ok=missing_ok,
        followlinks=followlinks)
    return list(matched_paths)
def s3_glob_stat(
        path: PathLike,
        recursive: bool = True,
        missing_ok: bool = True,
        followlinks: bool = False) -> Iterator[FileEntry]:
    '''Return a generator contains tuples of path and file stat, in ascending alphabetical order, in which path matches glob pattern

    Notes: Only glob in bucket. If trying to match bucket with wildcard characters, raise UnsupportedError

    :param path: Glob pattern
    :param recursive: If False, `**` will not search directory recursively
    :param missing_ok: If False and target path doesn't match any file, raise FileNotFoundError
    :param followlinks: Forwarded to ``S3Path.glob_stat``
    :raises: UnsupportedError, when bucket part contains wildcard characters
    :returns: A generator contains tuples of path and file stat, in which paths match `s3_pathname`
    '''
    s3_path = S3Path(path)
    return s3_path.glob_stat(
        pattern="",
        recursive=recursive,
        missing_ok=missing_ok,
        followlinks=followlinks)
def s3_iglob(
        path: PathLike,
        recursive: bool = True,
        missing_ok: bool = True,
        followlinks: bool = False,
) -> Iterator[str]:
    '''Return s3 path iterator in ascending alphabetical order, in which path matches glob pattern

    Notes: Only glob in bucket. If trying to match bucket with wildcard characters, raise UnsupportedError

    :param path: Glob pattern
    :param recursive: If False, `**` will not search directory recursively
    :param missing_ok: If False and target path doesn't match any file, raise FileNotFoundError
    :param followlinks: Forwarded to ``S3Path.iglob``
    :raises: UnsupportedError, when bucket part contains wildcard characters
    :returns: An iterator contains paths match `s3_pathname`
    '''
    matches = S3Path(path).iglob(
        pattern="",
        recursive=recursive,
        missing_ok=missing_ok,
        followlinks=followlinks)
    for matched in matches:
        yield matched.path_with_protocol
def s3_makedirs(path: PathLike, exist_ok: bool = False):
    '''
    Create an s3 directory.
    Purely creating directory is invalid because it's unavailable on OSS.
    This function is to test the target bucket have WRITE access.

    Delegates to ``S3Path(path).mkdir(parents=True, ...)``.

    :param path: Given path
    :param exist_ok: If False and target directory exists, raise S3FileExistsError
    :raises: S3BucketNotFoundError, S3FileExistsError
    '''
    s3_path = S3Path(path)
    return s3_path.mkdir(parents=True, exist_ok=exist_ok)
def _group_src_paths_by_block(
        src_paths: List[PathLike], block_size: int = DEFAULT_BLOCK_SIZE
) -> List[List[Tuple[PathLike, Optional[str]]]]:
    '''Partition source files into groups, one group per upload part.

    Every group is a list of ``(path, byte_range)`` tuples, where
    ``byte_range`` is either None (whole object) or a ``'bytes=a-b'`` string.
    Files are visited in the given order; zero-size files are skipped.

    - Files smaller than ``block_size`` are accumulated until the pending
      group reaches ``block_size``, then the group is emitted.
    - A file of at least ``block_size`` bytes flushes the pending small
      files: into the previous group when one exists, otherwise into a
      first group that the big file tops up (splitting the big file by
      byte range when the combined size exceeds ``2 * block_size``).

    NOTE(review): presumably this exists so that every part except the last
    reaches the minimum multipart part size -- confirm against the caller.

    :param src_paths: Source paths, in concatenation order
    :param block_size: Size threshold in bytes used for grouping
    :returns: List of groups as described above
    '''
    groups = []
    pending, pending_size = [], 0
    for src_path in src_paths:
        file_size = S3Path(src_path).stat().size
        if file_size == 0:
            continue

        if file_size < block_size:
            # Small file: keep collecting until the pending group is full
            pending.append((src_path, None))
            pending_size += file_size
            if pending_size >= block_size:
                groups.append(pending)
                pending, pending_size = [], 0
            continue

        # Large file: pending small files have to be flushed somewhere
        if groups:
            groups[-1].extend(pending)
            groups.append([(src_path, None)])
        elif pending_size + file_size > 2 * block_size:
            # Borrow just enough leading bytes of the large file to fill
            # the pending group, the remainder becomes a group of its own
            borrowed_size = block_size - pending_size
            pending.append((src_path, f'bytes=0-{borrowed_size-1}'))
            groups.append(pending)
            groups.append(
                [(src_path, f'bytes={borrowed_size}-{file_size-1}')])
        else:
            pending.append((src_path, None))
            groups.append(pending)
        pending, pending_size = [], 0

    if pending:
        groups.append(pending)
    return groups
def s3_concat(
        src_paths: List[PathLike],
        dst_path: PathLike,
        block_size: int = DEFAULT_BLOCK_SIZE,
        max_workers: int = GLOBAL_MAX_WORKERS) -> None:
    '''Concatenate s3 files to one file.

    The sources are split into groups (one per multipart part) and the parts
    are uploaded concurrently through a ``MultiPartWriter``. A group with a
    single entry is uploaded with ``upload_part_copy``, a group with several
    entries with ``upload_part_by_paths``.

    :param src_paths: Given source paths
    :param dst_path: Given destination path
    :param block_size: Grouping threshold passed to
        ``_group_src_paths_by_block``; 0 disables grouping, so that every
        source becomes a part of its own
    :param max_workers: Size of the thread pool used to upload parts
    :raises: Any error raised while uploading a part, passed through
        ``raise_s3_error``
    '''
    client = S3Path(dst_path)._client
    with raise_s3_error(dst_path):
        if block_size == 0:
            groups = [[(src_path, None)] for src_path in src_paths]
        else:
            groups = _group_src_paths_by_block(
                src_paths, block_size=block_size)

        with MultiPartWriter(client, dst_path) as writer, ThreadPoolExecutor(
                max_workers=max_workers) as executor:
            futures = []
            for index, group in enumerate(groups, start=1):
                if len(group) == 1:
                    src_path, bytes_range = group[0]
                    futures.append(
                        executor.submit(
                            writer.upload_part_copy, index, src_path,
                            bytes_range))
                else:
                    futures.append(
                        executor.submit(
                            writer.upload_part_by_paths, index, group))
            # An exception raised in a worker is stored on its future and is
            # lost unless the result is requested. Surface it here, so that
            # a failed part is never silently left out of the upload.
            for future in futures:
                future.result()
def s3_lstat(path: PathLike) -> StatResult:
    '''Functional shortcut for ``S3Path(path).lstat()``.

    Like Path.stat() but, if the path points to a symbolic link, return the
    symbolic link's information rather than its target's.

    :param path: Given path
    :returns: StatResult
    '''
    s3_path = S3Path(path)
    return s3_path.lstat()
@SmartPath.register
class S3Path(URIPath):
    '''URIPath implementation for the ``s3`` protocol.

    Besides plain ``s3://bucket/key`` paths, a path of the form
    ``s3+<profile>://bucket/key`` is accepted: the part after ``s3+`` is kept
    as profile name and handed to ``get_s3_client_with_cache`` when the
    client is created.
    '''

    protocol = "s3"

    def __init__(self, path: "PathLike", *other_paths: "PathLike"):
        super().__init__(path, *other_paths)
        protocol = get_url_scheme(self.path)
        # protocol as written by the user, "s3" or "s3+<profile>"
        self._protocol_with_profile = self.protocol
        self._profile_name = None
        if protocol.startswith('s3+'):
            self._protocol_with_profile = protocol
            self._profile_name = protocol[3:]
            # +3 skips the "://" following the protocol
            self._s3_path = f"s3://{self.path[len(protocol)+3:]}"
        elif not protocol:
            self._s3_path = f"s3://{self.path.lstrip('/')}"
        else:
            self._s3_path = self.path

    @cachedproperty
    def path_with_protocol(self) -> str:
        '''Return path with protocol, like file:///root, s3://bucket/key'''
        path = self.path
        protocol_prefix = self._protocol_with_profile + "://"
        if path.startswith(protocol_prefix):
            return path
        return protocol_prefix + path.lstrip('/')

    @cachedproperty
    def path_without_protocol(self) -> str:
        '''Return path without protocol, example: if path is s3://bucket/key, return bucket/key'''
        path = self.path
        protocol_prefix = self._protocol_with_profile + "://"
        if path.startswith(protocol_prefix):
            path = path[len(protocol_prefix):]
        return path

    @cachedproperty
    def parts(self) -> Tuple[str, ...]:
        '''A tuple giving access to the path's various components'''
        parts = [f"{self._protocol_with_profile}://"]
        path = self.path_without_protocol
        path = path.lstrip('/')
        if path != '':
            parts.extend(path.split('/'))
        return tuple(parts)

    @cachedproperty
    def _client(self):
        '''S3 client obtained from ``get_s3_client_with_cache`` for the profile of this path'''
        return get_s3_client_with_cache(profile_name=self._profile_name)

    def _s3_get_metadata(self) -> dict:
        '''
        Get object metadata

        :returns: Object metadata with lower-cased keys; an empty dict if the
            path has no bucket, has no key, ends with ``/``, or the request
            fails with an error other than the re-raised ones
        :raises: S3UnknownError, S3ConfigError, S3PermissionError
        '''
        bucket, key = parse_s3_url(self.path_with_protocol)
        if not bucket:
            return {}
        if not key or key.endswith('/'):
            return {}
        try:
            with raise_s3_error(self.path_with_protocol):
                resp = self._client.head_object(Bucket=bucket, Key=key)
            return {
                name.lower(): value
                for name, value in resp['Metadata'].items()
            }
        except Exception as error:
            if isinstance(error,
                          (S3UnknownError, S3ConfigError, S3PermissionError)):
                raise error
            return {}

    def access(
            self, mode: Access = Access.READ,
            followlinks: bool = False) -> bool:
        '''
        Test if path has access permission described by mode

        READ access is reported when the path exists. WRITE access is probed
        by creating a multipart upload and aborting it right away.

        :param mode: access mode, Access.READ or Access.WRITE
        :param followlinks: If True and the path is a symlink, the link
            target is used for the WRITE probe
        :returns: bool, if the bucket of s3_url has read/write access.
        :raises: TypeError, when mode is not a supported Access member
        '''
        s3_url = self.path_with_protocol
        if followlinks:
            try:
                s3_url = self.readlink().path_with_protocol
            except S3NotALinkError:
                pass
        bucket, key = parse_s3_url(s3_url)  # only check bucket accessibility
        if not bucket:
            raise Exception("No available bucket")
        if not isinstance(mode, Access):
            raise TypeError(
                'Unsupported mode: {} -- Mode should use one of the enums belonging to: {}'
                .format(mode, ', '.join([str(a) for a in Access])))
        if mode not in (Access.READ, Access.WRITE):
            raise TypeError('Unsupported mode: {}'.format(mode))
        try:
            if not self.exists():
                return False
        except Exception as error:
            error = translate_s3_error(error, s3_url)
            if isinstance(error, S3PermissionError):
                return False
            raise error
        if mode == Access.READ:
            return True
        try:
            if not key:
                key = 'test'
            elif key.endswith('/'):
                key = key[:-1]
            upload_id = self._client.create_multipart_upload(
                Bucket=bucket,
                Key=key,
            )['UploadId']
            self._client.abort_multipart_upload(
                Bucket=bucket,
                Key=key,
                UploadId=upload_id,
            )
            return True
        except Exception as error:
            error = translate_s3_error(error, s3_url)
            if isinstance(error, S3PermissionError):
                return False
            raise error

    def exists(self, followlinks: bool = False) -> bool:
        '''
        Test if s3_url exists

        If the bucket of s3_url are not permitted to read, return False

        :param followlinks: Passed to ``is_file``
        :returns: True if s3_url exists, else False
        '''
        bucket, key = parse_s3_url(self.path_with_protocol)
        if not bucket:  # s3:// => True, s3:///key => False
            return not key

        return self.is_file(followlinks) or self.is_dir()

    def getmtime(self, follow_symlinks: bool = False) -> float:
        '''
        Get last-modified time of the file on the given s3_url path (in Unix timestamp format).
        If the path is an existent directory, return the latest modified time of all file in it. The mtime of empty directory is 1970-01-01 00:00:00

        If s3_url is not an existent path, which means s3_exist(s3_url) returns False, then raise S3FileNotFoundError

        :returns: Last-modified time
        :raises: S3FileNotFoundError, UnsupportedError
        '''
        return self.stat(follow_symlinks=follow_symlinks).mtime

    def getsize(self, follow_symlinks: bool = False) -> int:
        '''
        Get file size on the given s3_url path (in bytes).
        If the path in a directory, return the sum of all file size in it, including file in subdirectories (if exist).
        The result excludes the size of directory itself. In other words, return 0 Byte on an empty directory path.

        If s3_url is not an existent path, which means s3_exist(s3_url) returns False, then raise S3FileNotFoundError

        :returns: File size
        :raises: S3FileNotFoundError, UnsupportedError
        '''
        return self.stat(follow_symlinks=follow_symlinks).size

    def glob(
            self,
            pattern,
            recursive: bool = True,
            missing_ok: bool = True,
            followlinks: bool = False,
    ) -> List['S3Path']:
        '''Return s3 path list in ascending alphabetical order, in which path matches glob pattern
        Notes: Only glob in bucket. If trying to match bucket with wildcard characters, raise UnsupportedError

        :param pattern: Glob the given relative pattern in the directory represented by this path
        :param recursive: If False, `**` will not search directory recursively
        :param missing_ok: If False and target path doesn't match any file, raise FileNotFoundError
        :raises: UnsupportedError, when bucket part contains wildcard characters
        :returns: A list contains paths match `s3_pathname`
        '''
        return list(
            self.iglob(
                pattern=pattern,
                recursive=recursive,
                missing_ok=missing_ok,
                followlinks=followlinks))

    def glob_stat(
            self,
            pattern,
            recursive: bool = True,
            missing_ok: bool = True,
            followlinks: bool = False) -> Iterator[FileEntry]:
        '''Return a generator contains tuples of path and file stat, in ascending alphabetical order, in which path matches glob pattern
        Notes: Only glob in bucket. If trying to match bucket with wildcard characters, raise UnsupportedError

        :param pattern: Glob the given relative pattern in the directory represented by this path
        :param recursive: If False, `**` will not search directory recursively
        :param missing_ok: If False and target path doesn't match any file, raise FileNotFoundError
        :raises: UnsupportedError, when bucket part contains wildcard characters
        :returns: A generator contains tuples of path and file stat, in which paths match `s3_pathname`
        '''
        glob_path = self._s3_path
        if pattern:
            glob_path = self.joinpath(pattern)._s3_path  # pytype: disable=attribute-error
        s3_pathname = fspath(glob_path)

        def create_generator():
            for group_s3_pathname_1 in _group_s3path_by_bucket(
                    s3_pathname, self._profile_name):
                for group_s3_pathname_2 in _group_s3path_by_prefix(
                        group_s3_pathname_1):
                    for file_entry in _s3_glob_stat_single_path(
                            group_s3_pathname_2,
                            recursive,
                            missing_ok,
                            followlinks=followlinks,
                            profile_name=self._profile_name):
                        if self._profile_name:
                            # globbing runs on the plain "s3://" form, so put
                            # the profile protocol back ([5:] drops "s3://")
                            file_entry = file_entry._replace(
                                path=
                                f"{self._protocol_with_profile}://{file_entry.path[5:]}"
                            )
                        yield file_entry

        return _create_missing_ok_generator(
            create_generator(), missing_ok,
            S3FileNotFoundError('No match any file: %r' % s3_pathname))

    def iglob(
            self,
            pattern,
            recursive: bool = True,
            missing_ok: bool = True,
            followlinks: bool = False,
    ) -> Iterator['S3Path']:
        '''Return s3 path iterator in ascending alphabetical order, in which path matches glob pattern
        Notes: Only glob in bucket. If trying to match bucket with wildcard characters, raise UnsupportedError

        :param pattern: Glob the given relative pattern in the directory represented by this path
        :param recursive: If False, `**` will not search directory recursively
        :param missing_ok: If False and target path doesn't match any file, raise FileNotFoundError
        :raises: UnsupportedError, when bucket part contains wildcard characters
        :returns: An iterator contains paths match `s3_pathname`
        '''
        for file_entry in self.glob_stat(pattern=pattern, recursive=recursive,
                                         missing_ok=missing_ok,
                                         followlinks=followlinks):
            yield self.from_path(file_entry.path)

    def is_dir(self, followlinks: bool = False) -> bool:
        '''
        Test if an s3 url is directory
        Specific procedures are as follows:
        If there exists a suffix, of which ``os.path.join(s3_url, suffix)`` is a file
        If the url is empty bucket or s3://

        :param followlinks: whether followlinks is True or False, result is the same. Because s3 symlink not support dir.
        :returns: True if path is s3 directory, else False
        '''
        bucket, key = parse_s3_url(self.path_with_protocol)
        if not bucket:  # s3:// => True, s3:///key => False
            return not key
        prefix = _become_prefix(key)
        try:
            resp = self._client.list_objects_v2(
                Bucket=bucket, Prefix=prefix, Delimiter='/', MaxKeys=1)
        except Exception as error:
            error = translate_s3_error(error, self.path_with_protocol)
            if isinstance(error,
                          (S3UnknownError, S3ConfigError, S3PermissionError)):
                raise error
            return False

        if not key:  # bucket is accessible
            return True

        if 'KeyCount' in resp:
            return resp['KeyCount'] > 0

        return len(resp.get('Contents', [])) > 0 or \
            len(resp.get('CommonPrefixes', [])) > 0

    def is_file(self, followlinks: bool = False) -> bool:
        '''
        Test if an s3_url is file

        :param followlinks: If True and the path is a symlink, test the link
            target instead
        :returns: True if path is s3 file, else False
        '''
        s3_url = self.path_with_protocol
        if followlinks:
            try:
                s3_url = self.readlink().path_with_protocol
            except S3NotALinkError:
                pass
        bucket, key = parse_s3_url(s3_url)
        if not bucket or not key or key.endswith('/'):
            # s3://, s3:///key, s3://bucket, s3://bucket/prefix/
            return False
        try:
            self._client.head_object(Bucket=bucket, Key=key)
        except Exception as error:
            error = translate_s3_error(error, s3_url)
            if isinstance(error,
                          (S3UnknownError, S3ConfigError, S3PermissionError)):
                raise error
            return False
        return True

    def listdir(self, followlinks: bool = False) -> List[str]:
        '''
        Get all contents of given s3_url. The result is in ascending alphabetical order.

        :returns: Names of all entries found by ``scandir``, in ascending alphabetical order
        :raises: S3FileNotFoundError, S3NotADirectoryError
        '''
        entries = list(self.scandir(followlinks=followlinks))
        return sorted([entry.name for entry in entries])

    def iterdir(self, followlinks: bool = False) -> Iterator['S3Path']:
        '''
        Get all contents of given s3_url. The result is in ascending alphabetical order.

        :returns: Path objects of all entries returned by ``listdir``, in the same order
        :raises: S3FileNotFoundError, S3NotADirectoryError
        '''
        for path in self.listdir(followlinks=followlinks):
            yield self.joinpath(path)  # type: ignore

    def load(self, followlinks: bool = False) -> BinaryIO:
        '''Read all content in binary on specified path and write into memory

        User should close the BinaryIO manually

        :param followlinks: If True and the path is a symlink, load the link
            target instead
        :returns: BinaryIO, positioned at the beginning of the content
        :raises: S3BucketNotFoundError, S3IsADirectoryError
        '''
        s3_url = self.path_with_protocol
        if followlinks:
            try:
                s3_url = self.readlink().path_with_protocol
            except S3NotALinkError:
                pass
        bucket, key = parse_s3_url(s3_url)
        if not bucket:
            raise S3BucketNotFoundError('Empty bucket name: %r' % s3_url)
        if not key or key.endswith('/'):
            raise S3IsADirectoryError('Is a directory: %r' % s3_url)

        buffer = io.BytesIO()
        with raise_s3_error(s3_url):
            self._client.download_fileobj(bucket, key, buffer)
        # The download leaves the position at the end of the buffer, rewind
        # so that the caller reads the content rather than nothing
        buffer.seek(0)
        return buffer

    def hasbucket(self) -> bool:
        '''
        Test if the bucket of s3_url exists

        :returns: True if bucket of s3_url exists, else False
        '''
        bucket, _ = parse_s3_url(self.path_with_protocol)
        if not bucket:
            return False

        try:
            self._client.head_bucket(Bucket=bucket)
        except Exception as error:
            error = translate_s3_error(error, self.path_with_protocol)
            if isinstance(error, S3PermissionError):
                # Aliyun OSS doesn't give bucket api permission when you only have read and write permission
                try:
                    self._client.list_objects_v2(Bucket=bucket, MaxKeys=1)
                    return True
                except Exception as error2:
                    error2 = translate_s3_error(
                        error2, self.path_with_protocol)
                    if isinstance(
                            error2,
                        (S3UnknownError, S3ConfigError, S3PermissionError)):
                        raise error2
                    return False
            elif isinstance(error, (S3UnknownError, S3ConfigError)):
                raise error
            elif isinstance(error, S3FileNotFoundError):
                return False

        return True

    def mkdir(self, mode=0o777, parents: bool = False, exist_ok: bool = False):
        '''
        Create an s3 directory.
        Purely creating directory is invalid because it's unavailable on OSS.
        This function is to test the target bucket have WRITE access.

        :param mode: mode is ignored, only be compatible with pathlib.Path
        :param parents: parents is ignored, only be compatible with pathlib.Path
        :param exist_ok: If False and target directory exists, raise S3FileExistsError
        :raises: S3BucketNotFoundError, S3FileExistsError
        '''
        bucket, _ = parse_s3_url(self.path_with_protocol)
        if not bucket:
            raise S3BucketNotFoundError(
                'Empty bucket name: %r' % self.path_with_protocol)
        if not self.hasbucket():
            raise S3BucketNotFoundError(
                'No such bucket: %r' % self.path_with_protocol)
        if exist_ok:
            if self.is_file():
                raise S3FileExistsError(
                    'File exists: %r' % self.path_with_protocol)
            return
        if self.exists():
            raise S3FileExistsError('File exists: %r' % self.path_with_protocol)

    def move(self, dst_url: PathLike) -> None:
        '''
        Move file/directory path from src_url to dst_url

        :param dst_url: Given destination path
        '''
        for src_file_path, dst_file_path in _s3_scan_pairs(
                self.path_with_protocol, dst_url):
            S3Path(src_file_path).rename(dst_file_path)

    def remove(self, missing_ok: bool = False) -> None:
        '''
        Remove the file or directory on s3, `s3://` and `s3://bucket` are not permitted to remove

        :param missing_ok: if False and target file/directory not exists, raise S3FileNotFoundError
        :raises: S3PermissionError, S3FileNotFoundError, UnsupportedError;
            S3UnknownError when some of the objects could not be removed
        '''
        bucket, key = parse_s3_url(self.path_with_protocol)
        if not bucket:
            if not key:
                raise UnsupportedError(
                    'Remove whole s3', self.path_with_protocol)
            raise S3BucketNotFoundError(
                'Empty bucket name: %r' % self.path_with_protocol)
        if not key:
            raise UnsupportedError('Remove bucket', self.path_with_protocol)
        if not self.exists():
            if missing_ok:
                return
            raise S3FileNotFoundError(
                'No such file or directory: %r' % self.path_with_protocol)

        client = self._client
        with raise_s3_error(self.path_with_protocol):
            if self.is_file():
                client.delete_object(Bucket=bucket, Key=key)
                return
            prefix = _become_prefix(key)
            total_count, error_count = 0, 0
            for resp in _list_objects_recursive(client, bucket, prefix):
                if 'Contents' in resp:
                    keys = [
                        {
                            'Key': content['Key']
                        } for content in resp['Contents']
                    ]
                    total_count += len(keys)
                    errors = []
                    # retryable failures reported by the latest attempt
                    pending_errors = []
                    retries = 2
                    retry_interval = min(0.1 * 2**retries, 30)
                    for i in range(retries):
                        # doc: see the boto3 reference of S3.Client.delete_objects
                        if not keys:
                            break
                        response = client.delete_objects(
                            Bucket=bucket, Delete={'Objects': keys})
                        keys = []
                        pending_errors = []
                        for error_info in response.get('Errors', []):
                            if s3_error_code_should_retry(
                                    error_info.get('Code')):
                                error_logger.warning(
                                    "retry %s times, removing file: %s, with error %s: %s"
                                    % (
                                        i + 1, error_info['Key'],
                                        error_info['Code'],
                                        error_info['Message']))
                                keys.append({'Key': error_info['Key']})
                                pending_errors.append(error_info)
                            else:
                                errors.append(error_info)
                        # Wait only when another attempt will really follow
                        if keys and i + 1 < retries:
                            time.sleep(retry_interval)
                    # Keys still failing after the last attempt were not
                    # removed, they have to be reported as failures as well
                    errors.extend(pending_errors)
                    for error_info in errors:
                        error_logger.error(
                            "failed remove file: %s, with error %s: %s" % (
                                error_info['Key'], error_info['Code'],
                                error_info['Message']))
                    error_count += len(errors)
            if error_count > 0:
                error_msg = "failed remove path: %s, total file count: %s, failed count: %s" % (
                    self.path_with_protocol, total_count, error_count)
                raise S3UnknownError(
                    Exception(error_msg), self.path_with_protocol)

    def rename(self, dst_path: PathLike) -> 'S3Path':
        '''
        Move s3 file path from src_url to dst_url

        :param dst_path: Given destination path
        :returns: Path object of the destination
        '''
        s3_rename(self.path_with_protocol, dst_path)
        return self.from_path(dst_path)

    def scan(self, missing_ok: bool = True,
             followlinks: bool = False) -> Iterator[str]:
        '''
        Iteratively traverse only files in given s3 directory, in alphabetical order.
        Every iteration on generator yields a path string.

        If s3_url is a file path, yields the file only
        If s3_url is a non-existent path, return an empty generator
        If s3_url is a bucket path, return all file paths in the bucket
        If s3_url is an empty bucket, return an empty generator
        If s3_url doesn't contain any bucket, which is s3_url == 's3://', raise UnsupportedError. walk() on complete s3 is not supported in megfile

        :param missing_ok: If False and there's no file in the directory, raise FileNotFoundError
        :raises: UnsupportedError
        :returns: A file path generator
        '''
        scan_stat_iter = self.scan_stat(
            missing_ok=missing_ok, followlinks=followlinks)

        def create_generator() -> Iterator[str]:
            for file_entry in scan_stat_iter:
                yield file_entry.path

        return create_generator()

    def scan_stat(self, missing_ok: bool = True,
                  followlinks: bool = False) -> Iterator[FileEntry]:
        '''
        Iteratively traverse only files in given directory, in alphabetical order.
        Every iteration on generator yields a tuple of path string and file stat

        :param missing_ok: If False and there's no file in the directory, raise FileNotFoundError
        :param followlinks: If True, an entry that is a symlink is replaced
            by an entry describing the link target
        :raises: UnsupportedError
        :returns: A file path generator
        '''
        bucket, key = parse_s3_url(self.path_with_protocol)
        if not bucket:
            raise UnsupportedError('Scan whole s3', self.path_with_protocol)

        def create_generator() -> Iterator[FileEntry]:
            if not self.is_dir():
                if self.is_file():
                    # On s3, file and directory may be of same name and level, so need to test the path is file or directory
                    yield FileEntry(
                        self.name, fspath(self.path_with_protocol),
                        self.stat(follow_symlinks=followlinks))
                return

            if not key.endswith('/') and self.is_file():
                yield FileEntry(
                    self.name, fspath(self.path_with_protocol),
                    self.stat(follow_symlinks=followlinks))

            prefix = _become_prefix(key)
            client = self._client
            with raise_s3_error(self.path_with_protocol):
                for resp in _list_objects_recursive(client, bucket, prefix):
                    for content in resp.get('Contents', []):
                        full_path = s3_path_join(
                            f'{self._protocol_with_profile}://', bucket,
                            content['Key'])

                        if followlinks:
                            try:
                                origin_path = self.from_path(
                                    full_path).readlink()
                                yield FileEntry(
                                    origin_path.name,
                                    origin_path.path_with_protocol,
                                    origin_path.lstat())
                                continue
                            except S3NotALinkError:
                                pass

                        yield FileEntry(
                            S3Path(full_path).name, full_path,
                            _make_stat(content))

        return _create_missing_ok_generator(
            create_generator(), missing_ok,
            S3FileNotFoundError(
                'No match any file in: %r' % self.path_with_protocol))

    def scandir(self, followlinks: bool = False) -> Iterator[FileEntry]:
        '''
        Get all contents of given s3_url, the order of result is not guaranteed.

        When the path has neither bucket nor key, the buckets are listed.

        :param followlinks: If True, an entry that is a symlink is replaced
            by an entry describing the link target
        :returns: All contents have prefix of s3_url
        :raises: S3FileNotFoundError, S3NotADirectoryError
        '''
        bucket, key = parse_s3_url(self.path_with_protocol)
        if not bucket and key:
            raise S3BucketNotFoundError(
                'Empty bucket name: %r' % self.path_with_protocol)

        if self.is_file():
            raise S3NotADirectoryError(
                'Not a directory: %r' % self.path_with_protocol)
        elif not self.is_dir():
            raise S3FileNotFoundError(
                'No such directory: %r' % self.path_with_protocol)
        prefix = _become_prefix(key)
        client = self._client

        # In order to do check on creation,
        # we need to wrap the iterator in another function
        def create_generator() -> Iterator[FileEntry]:
            with raise_s3_error(self.path_with_protocol):

                def generate_s3_path(
                        protocol: str, bucket: str, key: str) -> str:
                    return "%s://%s/%s" % (protocol, bucket, key)

                if not bucket and not key:  # list buckets
                    response = client.list_buckets()
                    for content in response['Buckets']:
                        # Keep the profile protocol, like the object entries
                        # below do, so that the returned path is resolved
                        # with the same client
                        yield FileEntry(
                            content['Name'],
                            f"{self._protocol_with_profile}://{content['Name']}",
                            StatResult(
                                ctime=content['CreationDate'].timestamp(),
                                isdir=True,
                                extra=content,
                            ))
                    return

                for resp in _list_objects_recursive(client, bucket, prefix,
                                                    '/'):
                    for common_prefix in resp.get('CommonPrefixes', []):
                        yield FileEntry(
                            # strip the listing prefix and the trailing "/"
                            common_prefix['Prefix'][len(prefix):-1],
                            generate_s3_path(
                                self._protocol_with_profile, bucket,
                                common_prefix['Prefix']),
                            StatResult(isdir=True, extra=common_prefix))
                    for content in resp.get('Contents', []):
                        src_url = generate_s3_path(
                            self._protocol_with_profile, bucket,
                            content['Key'])

                        if followlinks:
                            try:
                                origin_path = self.from_path(
                                    src_url).readlink()
                                yield FileEntry(
                                    origin_path.name,
                                    origin_path.path_with_protocol,
                                    origin_path.lstat())
                                continue
                            except S3NotALinkError:
                                pass

                        yield FileEntry(
                            content['Key'][len(prefix):], src_url,
                            _make_stat(content))

        return ContextIterator(create_generator())

    def _getdirstat(self) -> StatResult:
        '''
        Return StatResult of given s3_url directory, including:

        1. Directory size: the sum of all file size in it, including file in subdirectories (if exist).
        The result excludes the size of directory itself. In other words, return 0 Byte on an empty directory path
        2. Last-modified time of directory: return the latest modified time of all file in it. The mtime of empty directory is 1970-01-01 00:00:00

        :returns: StatResult carrying size, mtime and isdir=True
        :raises: S3FileNotFoundError, when the path is not a directory
        '''
        if not self.is_dir():
            raise S3FileNotFoundError(
                'No such file or directory: %r' % self.path_with_protocol)

        bucket, key = parse_s3_url(self.path_with_protocol)
        prefix = _become_prefix(key)
        client = self._client
        size = 0
        mtime = 0.0
        with raise_s3_error(self.path_with_protocol):
            for resp in _list_objects_recursive(client, bucket, prefix):
                for content in resp.get('Contents', []):
                    size += content['Size']
                    last_modified = content['LastModified'].timestamp()
                    if mtime < last_modified:
                        mtime = last_modified

        return StatResult(size=size, mtime=mtime, isdir=True)

    def stat(self, follow_symlinks=True) -> StatResult:
        '''
        Get StatResult of s3_url file, including file size and mtime, referring to s3_getsize and s3_getmtime

        If s3_url is not an existent path, which means s3_exist(s3_url) returns False, then raise S3FileNotFoundError
        If attempt to get StatResult of complete s3, such as s3_dir_url == 's3://', raise S3BucketNotFoundError

        :param follow_symlinks: If True and the object metadata contains
            ``symlink_to``, size and mtime are taken from the object the
            metadata points to
        :returns: StatResult
        :raises: S3FileNotFoundError, S3BucketNotFoundError
        '''
        islnk = False
        bucket, key = parse_s3_url(self.path_with_protocol)
        if not bucket:
            raise S3BucketNotFoundError(
                'Empty bucket name: %r' % self.path_with_protocol)

        if not self.is_file():
            return self._getdirstat()

        client = self._client
        with raise_s3_error(self.path_with_protocol):
            content = client.head_object(Bucket=bucket, Key=key)
            if 'Metadata' in content:
                metadata = {
                    name.lower(): value
                    for name, value in content['Metadata'].items()
                }
                if metadata and 'symlink_to' in metadata:
                    islnk = True
                    if islnk and follow_symlinks:
                        s3_url = metadata['symlink_to']
                        bucket, key = parse_s3_url(s3_url)
                        content = client.head_object(Bucket=bucket, Key=key)
            stat_record = StatResult(
                islnk=islnk,
                size=content['ContentLength'],
                mtime=content['LastModified'].timestamp(),
                extra=content)
        return stat_record

    def walk(self, followlinks: bool = False
            ) -> Iterator[Tuple[str, List[str], List[str]]]:
        '''
        Iteratively traverse the given s3 directory, in top-bottom order. In other words, firstly traverse parent directory, if subdirectories exist, traverse the subdirectories in alphabetical order.
        Every iteration on generator yields a 3-tuple: (root, dirs, files)

        - root: Current s3 path;
        - dirs: Name list of subdirectories in current directory. The list is sorted by name in ascending alphabetical order;
        - files: Name list of files in current directory. The list is sorted by name in ascending alphabetical order;

        If s3_url is a file path, return an empty generator
        If s3_url is a non-existent path, return an empty generator
        If s3_url is a bucket path, bucket will be the top directory, and will be returned at first iteration of generator
        If s3_url is an empty bucket, only yield one 3-tuple (notes: s3 doesn't have empty directory)
        If s3_url doesn't contain any bucket, which is s3_url == 's3://', raise UnsupportedError. walk() on complete s3 is not supported in megfile

        :param followlinks: whether followlinks is True or False, result is the same. Because s3 symlink not support dir.
        :raises: UnsupportedError
        :returns: A 3-tuple generator
        '''
        bucket, key = parse_s3_url(self.path_with_protocol)
        if not bucket:
            raise UnsupportedError('Walk whole s3', self.path_with_protocol)

        if not self.is_dir():
            return

        stack = [key]
        client = self._client
        while len(stack) > 0:
            current = _become_prefix(stack.pop())
            dirs, files = [], []
            for resp in _list_objects_recursive(client, bucket, current, '/'):
                for common_prefix in resp.get('CommonPrefixes', []):
                    dirs.append(common_prefix['Prefix'][:-1])
                for content in resp.get('Contents', []):
                    files.append(content['Key'])

            dirs = sorted(dirs)
            # pushed in reverse, so that they are popped in ascending order
            stack.extend(reversed(dirs))

            root = s3_path_join(
                f'{self._protocol_with_profile}://', bucket, current)[:-1]
            dirs = [path[len(current):] for path in dirs]
            files = sorted(path[len(current):] for path in files)
            yield root, dirs, files

    def md5(self, recalculate: bool = False, followlinks: bool = False) -> str:
        '''
        Get md5 meta info in files that uploaded/copied via megfile

        For a directory, the result is the md5 of the concatenated md5 values
        of its entries, in ``listdir`` order. For a file, the result is the
        ETag recorded in the stat (without its first and last character),
        or an empty string when the stat carries no ETag.

        :param recalculate: calculate md5 in real-time or return s3 etag
        :param followlinks: If is True, calculate md5 for real file
        :returns: md5 meta info
        :raises: S3BucketNotFoundError
        '''
        bucket, _ = parse_s3_url(self.path_with_protocol)
        if not bucket:
            raise S3BucketNotFoundError(
                'Empty bucket name: %r' % self.path_with_protocol)
        stat = self.stat(follow_symlinks=followlinks)
        if stat.isdir:
            hash_md5 = hashlib.md5()  # nosec
            for file_name in self.listdir():
                chunk = S3Path(
                    s3_path_join(self.path_with_protocol, file_name)).md5(
                        recalculate=recalculate,
                        followlinks=followlinks).encode()
                hash_md5.update(chunk)
            return hash_md5.hexdigest()
        if recalculate:
            path_instance = self
            if followlinks:
                try:
                    path_instance = self.readlink()
                except S3NotALinkError:
                    pass
            with path_instance.open('rb') as f:
                return calculate_md5(f)
        return stat.extra.get('ETag', '')[1:-1]

    def copy(
            self,
            dst_url: PathLike,
            followlinks: bool = False,
            callback: Optional[Callable[[int], None]] = None) -> None:
        ''' File copy on S3
        Copy content of file on `src_path` to `dst_path`.
        It's caller's responsibility to ensure the s3_isfile(src_url) == True

        :param dst_url: Target file path
        :param followlinks: If True and the source is a symlink, the link
            target is copied instead
        :param callback: Called periodically during copy, and the input parameter is the data size (in bytes) of copy since the last call
        :raises: SameFileError, S3BucketNotFoundError, S3IsADirectoryError
        '''
        src_url = self.path_with_protocol
        src_bucket, src_key = parse_s3_url(src_url)
        dst_bucket, dst_key = parse_s3_url(dst_url)
        if dst_bucket == src_bucket and src_key.rstrip('/') == dst_key.rstrip(
                '/'):
            raise SameFileError(
                f"'{src_url}' and '{dst_url}' are the same file")

        if not src_bucket:
            raise S3BucketNotFoundError('Empty bucket name: %r' % src_url)
        if self.is_dir():
            raise S3IsADirectoryError('Is a directory: %r' % src_url)

        if not dst_bucket:
            raise S3BucketNotFoundError('Empty bucket name: %r' % dst_url)
        if not dst_key or dst_key.endswith('/'):
            raise S3IsADirectoryError('Is a directory: %r' % dst_url)

        if followlinks:
            try:
                s3_url = self.readlink().path
                src_bucket, src_key = parse_s3_url(s3_url)
            except S3NotALinkError:
                pass

        with raise_s3_error(f"'{src_url}' or '{dst_url}'"):
            self._client.copy(
                {
                    'Bucket': src_bucket,
                    'Key': src_key,
                },
                Bucket=dst_bucket,
                Key=dst_key,
                Callback=callback)

    def sync(
            self,
            dst_url: PathLike,
            followlinks: bool = False,
            force: bool = False) -> None:
        '''
        Copy file/directory on src_url to dst_url

        :param dst_url: Given destination path
        :param followlinks: False if regard symlink as file, else True
        :param force: Sync files forcibly, do not skip files which are the same
        '''
        for src_file_path, dst_file_path in _s3_scan_pairs(
                self.path_with_protocol, dst_url):
            src_file_path = self.from_path(src_file_path)
            dst_file_path = self.from_path(dst_file_path)

            if not force and dst_file_path.exists() and is_same_file(
                    src_file_path.stat(), dst_file_path.stat(), 'copy'):
                continue

            src_file_path.copy(dst_file_path, followlinks=followlinks)

    def save(self, file_object: BinaryIO):
        '''Write the opened binary stream to specified path, but the stream won't be closed

        :param file_object: Stream to be read
        :raises: S3BucketNotFoundError, S3IsADirectoryError
        '''
        bucket, key = parse_s3_url(self.path_with_protocol)
        if not bucket:
            raise S3BucketNotFoundError(
                'Empty bucket name: %r' % self.path_with_protocol)
        if not key or key.endswith('/'):
            raise S3IsADirectoryError(
                'Is a directory: %r' % self.path_with_protocol)

        with raise_s3_error(self.path_with_protocol):
            self._client.upload_fileobj(file_object, Bucket=bucket, Key=key)

    def open(
            self,
            mode: str = 'r',
            *,
            encoding: Optional[str] = None,
            errors: Optional[str] = None,
            s3_open_func: Callable[[str, str], BinaryIO] = s3_open,
            **kwargs) -> IO[AnyStr]:  # pytype: disable=signature-mismatch
        '''Open the path by calling ``s3_open_func``

        :param mode: Mode passed to ``s3_open_func``
        :param encoding: Encoding passed to ``s3_open_func``
        :param errors: Error handling scheme passed to ``s3_open_func``
        :param s3_open_func: Function doing the actual open
        :param kwargs: Extra arguments, filtered by ``necessary_params``
            before being passed to ``s3_open_func``
        :returns: Whatever ``s3_open_func`` returns
        '''
        return s3_open_func(  # pytype: disable=wrong-keyword-args
            self,
            mode,
            encoding=encoding,
            errors=errors,
            **necessary_params(s3_open_func, **kwargs))

    def absolute(self) -> 'S3Path':
        '''
        Make the path absolute, without normalization or resolving symlinks. Returns a new path object
        '''
        return self

    def cwd(self) -> 'S3Path':
        '''Return current working directory

        returns: Current working directory
        '''
        return self.from_path(self.path_with_protocol)
class MultiPartWriter:
    '''
    Small helper driving one S3 multipart upload

    The upload is created on construction. Parts are added with
    ``upload_part``, ``upload_part_by_paths`` or ``upload_part_copy`` and the
    object is assembled by ``close``. Used as a context manager, the upload
    is completed on a clean exit and aborted when the ``with`` body raised.
    '''

    def __init__(self, client, path: PathLike) -> None:
        '''
        Start a multipart upload for the object at ``path``

        :param client: S3 client used for every request of this upload
        :param path: S3 url of the object being written
        '''
        self._client = client
        # One {'PartNumber': ..., 'ETag': ...} record per uploaded part.
        self._multipart_upload_info = []
        self._bucket, self._key = parse_s3_url(path)
        self._upload_id = self._client.create_multipart_upload(
            Bucket=self._bucket, Key=self._key)['UploadId']

    def upload_part(self, part_num: int, file_obj: io.BytesIO) -> None:
        '''
        Upload the content of ``file_obj`` as part ``part_num``

        :param part_num: Part number within the multipart upload
        :param file_obj: Stream sent as the request body
        '''
        response = self._client.upload_part(
            Body=file_obj,
            UploadId=self._upload_id,
            PartNumber=part_num,
            Bucket=self._bucket,
            Key=self._key,
        )
        self._multipart_upload_info.append(
            {
                'PartNumber': part_num,
                'ETag': response['ETag'],
            })

    def upload_part_by_paths(
            self, part_num: int, paths: List[Tuple[PathLike, str]]) -> None:
        '''
        Download several objects, join them in order and upload the result
        as part ``part_num``

        :param part_num: Part number within the multipart upload
        :param paths: ``(S3 url, range)`` pairs; ``range`` is passed as the
            ``Range`` argument of ``get_object`` and a falsy value downloads
            the whole object
        '''

        def get_object(
                client, bucket, key, range_str: Optional[str] = None) -> bytes:
            request = {'Bucket': bucket, 'Key': key}
            if range_str:
                request['Range'] = range_str
            return client.get_object(**request)['Body'].read()

        # Downloads are retried according to the module-wide retry settings.
        get_object = patch_method(
            get_object,
            max_retries=max_retries,
            should_retry=s3_should_retry,
        )

        file_obj = io.BytesIO()
        for path, bytes_range in paths:
            bucket, key = parse_s3_url(path)
            file_obj.write(get_object(self._client, bucket, key, bytes_range))

        # Rewind: the buffer position sits at the end after the writes, and
        # the upload must send the data from its first byte.
        file_obj.seek(0, os.SEEK_SET)
        self.upload_part(part_num, file_obj)

    def upload_part_copy(
            self,
            part_num: int,
            path: PathLike,
            copy_source_range: Optional[str] = None) -> None:
        '''
        Add part ``part_num`` by copying from an existing S3 object

        :param part_num: Part number within the multipart upload
        :param path: S3 url of the object copied from
        :param copy_source_range: Passed as ``CopySourceRange`` when truthy;
            otherwise the parameter is omitted from the request
        '''
        bucket, key = parse_s3_url(path)
        params = {
            'UploadId': self._upload_id,
            'PartNumber': part_num,
            'CopySource': {
                'Bucket': bucket,
                'Key': key,
            },
            'Bucket': self._bucket,
            'Key': self._key,
        }
        if copy_source_range:
            params['CopySourceRange'] = copy_source_range
        response = self._client.upload_part_copy(**params)
        self._multipart_upload_info.append(
            {
                'PartNumber': part_num,
                'ETag': response['CopyPartResult']['ETag'],
            })

    def close(self):
        '''Complete the multipart upload from the parts recorded so far'''
        # Parts may have been recorded out of order; the completion request
        # lists them by ascending part number.
        self._multipart_upload_info.sort(key=lambda part: part['PartNumber'])
        self._client.complete_multipart_upload(
            UploadId=self._upload_id,
            Bucket=self._bucket,
            Key=self._key,
            MultipartUpload={'Parts': self._multipart_upload_info})

    def abort(self) -> None:
        '''Abort the multipart upload instead of completing it'''
        self._client.abort_multipart_upload(
            UploadId=self._upload_id, Bucket=self._bucket, Key=self._key)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is None:
            self.close()
            return
        # The body failed, so the recorded parts cannot be trusted to be
        # complete: abort rather than assemble an object from them.
        try:
            self.abort()
        except Exception:
            # Best-effort cleanup; the exception raised inside the ``with``
            # body stays the one that propagates to the caller.
            _logger.warning(
                'Failed to abort multipart upload %r of bucket %r, key %r',
                self._upload_id,
                self._bucket,
                self._key,
                exc_info=True)