megfile.s3 module

class megfile.s3.S3BufferedWriter(bucket: str, key: str, *, s3_client, block_size: int = 8388608, max_block_size: int = 134217728, max_buffer_size: int = 134217728, max_workers: Optional[int] = None, profile_name: Optional[str] = None)[source]

Bases: Writable

property mode: str
property name: str
tell() int[source]

Return current stream position.

write(data: bytes) int[source]

Write bytes to file.

Return the number of bytes written.

class megfile.s3.S3Cacher(path: str, cache_path: Optional[str] = None, mode: str = 'r')[source]

Bases: FileCacher

cache_path = None
class megfile.s3.S3LimitedSeekableWriter(bucket: str, key: str, *, s3_client, block_size: int = 8388608, head_block_size: Optional[int] = None, tail_block_size: Optional[int] = None, max_block_size: int = 134217728, max_buffer_size: int = 134217728, max_workers: Optional[int] = None, profile_name: Optional[str] = None)[source]

Bases: Seekable, S3BufferedWriter

For file formats like msgpack and mp4, the header must be written before the data, which makes streaming writes to an unseekable file system like s3 difficult. This writer keeps the first and last parts of the data in memory, so the stream can seek back to the head and write the header at the very end.
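
A minimal sketch of the intended usage, assuming valid s3 credentials; the bucket, key and payload below are placeholders:

  from megfile.s3 import S3LimitedSeekableWriter, get_s3_client

  body = b'frame-data' * 1000            # payload produced while streaming
  header = len(body).to_bytes(8, 'big')  # header that is only known after the body is written

  writer = S3LimitedSeekableWriter('my-bucket', 'video/data.bin', s3_client=get_s3_client())
  writer.write(b'\x00' * 8)   # reserve 8 bytes for the header; this stays in the in-memory head part
  writer.write(body)          # stream the body sequentially
  writer.seek(0)              # allowed, because the head part is still held in memory
  writer.write(header)        # overwrite the placeholder bytes with the real header
  writer.close()              # flush head, body and tail to s3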

seek(offset: int, whence: int = 0) int[source]

Change stream position.

Seek to a byte offset relative to the position indicated by whence:

  • 0 – Start of stream (the default); offset should be >= 0

  • 1 – Current position; offset may be negative

  • 2 – End of stream; offset is usually negative

Return the new absolute position.

write(data: bytes) int[source]

Write bytes to file.

Return the number of bytes written.

class megfile.s3.S3PrefetchReader(bucket: str, key: str, *, s3_client, block_size: int = 8388608, block_capacity: int = 16, block_forward: Optional[int] = None, max_retries: int = 10, max_workers: Optional[int] = None, profile_name: Optional[str] = None)[source]

Bases: BasePrefetchReader

Reader for fast reading of s3 content. The file content is divided into equal blocks of block_size bytes, and an LRU policy caches at most block_capacity blocks in memory. open(), seek() and read() trigger prefetching, which caches block_forward blocks of data starting from the current offset (the position after reading, if the calling function is read).

property name: str
class megfile.s3.S3ShareCacheReader(bucket: str, key: str, *, s3_client, block_size: int = 8388608, block_capacity: int = 16, block_forward: Optional[int] = None, max_retries: int = 10, cache_key: str = 'lru', max_workers: Optional[int] = None, profile_name: Optional[str] = None)[source]

Bases: S3PrefetchReader

Reader for fast reading of s3 content. The file content is divided into equal blocks of block_size bytes, and an LRU policy caches at most block_capacity blocks in memory. open(), seek() and read() trigger prefetching, which caches block_forward blocks of data starting from the current offset (the position after reading, if the calling function is read).

megfile.s3.get_endpoint_url(profile_name: Optional[str] = None) str[source]

Get the endpoint url of S3

Returns:

S3 endpoint url

megfile.s3.get_s3_client(config: Optional[Config] = None, cache_key: Optional[str] = None, profile_name: Optional[str] = None)[source]

Get S3 client

Returns:

S3 client

megfile.s3.get_s3_session(profile_name=None) Session[source]

Get S3 session

Returns:

S3 session

megfile.s3.is_s3(path: Union[str, BasePath, PathLike]) bool[source]
  1. Test whether a path is an s3 path, following the aws-cli convention.

  2. megfile also supports paths like s3[+profile_name]://bucket/key

Parameters:

path – Path to be tested

Returns:

True if path is s3 path, else False
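
For example (bucket, key and profile name are placeholders):

  >>> from megfile.s3 import is_s3
  >>> is_s3('s3://bucket/key')
  True
  >>> is_s3('s3+my_profile://bucket/key')
  True
  >>> is_s3('/tmp/local/file')
  False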

megfile.s3.parse_s3_url(s3_url: Union[str, BasePath, PathLike]) Tuple[str, str][source]
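
Split an s3 url into its bucket and key parts. For example (bucket and key are placeholders):

  >>> from megfile.s3 import parse_s3_url
  >>> parse_s3_url('s3://my-bucket/path/to/key')
  ('my-bucket', 'path/to/key')
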
megfile.s3.s3_access(path: Union[str, BasePath, PathLike], mode: Access = Access.READ, followlinks: bool = False) bool[source]

Test if path has access permission described by mode

Parameters:
  • path – Given path

  • mode – access mode

Returns:

True if the bucket of s3_url grants the requested read/write access, else False

megfile.s3.s3_buffered_open(s3_url: Union[str, BasePath, PathLike], mode: str, followlinks: bool = False, *, max_concurrency: Optional[int] = None, max_buffer_size: int = 134217728, forward_ratio: Optional[float] = None, block_size: int = 8388608, limited_seekable: bool = False, buffered: bool = True, share_cache_key: Optional[str] = None, cache_path: Optional[str] = None) Union[S3PrefetchReader, S3BufferedWriter, BufferedReader, BufferedWriter, S3MemoryHandler][source]

Open an asynchronous prefetch reader, to support fast sequential read

Note

User should make sure that reader / writer are closed correctly

Supports context manager

Settings that usually perform well: max_concurrency=10 or 20, block_size=8 or 16 MB; the default max_concurrency of None means the global thread pool is used

Parameters:
  • max_concurrency – Max download thread number, None by default

  • max_buffer_size – Max cached buffer size in memory, 128MB by default

  • block_size – Size of a single block, 8MB by default. Each block will be uploaded or downloaded by a single thread.

  • limited_seekable – Whether the write handle supports limited seeking (the head and tail of the file can each be sought within block_size). Note: this parameter only affects write handles; read handles support arbitrary seeking

Returns:

An opened reader / writer object, depending on the mode

Raises:

S3FileNotFoundError
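
A minimal sketch of a sequential write followed by a prefetched read; the path is a placeholder:

  from megfile.s3 import s3_buffered_open

  # Sequential streaming write; finished blocks are uploaded by background threads.
  with s3_buffered_open('s3://my-bucket/output.bin', 'wb') as writer:
      writer.write(b'some payload')

  # Prefetching sequential read; max_concurrency caps the number of download threads.
  with s3_buffered_open('s3://my-bucket/output.bin', 'rb', max_concurrency=10) as reader:
      data = reader.read()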

megfile.s3.s3_cached_open(s3_url: Union[str, BasePath, PathLike], mode: str, followlinks: bool = False, *, cache_path: Optional[str] = None) S3CachedHandler[source]

Open a local-cache file reader / writer, for frequent random read / write

Note

User should make sure that reader / writer are closed correctly

Supports context manager

cache_path specifies the path of the cache file. Performance can be better if the cache file is placed on SSD or tmpfs

Parameters:
  • mode – Mode to open file, could be one of “rb”, “wb” or “ab”

  • cache_path – cache file path

Returns:

An opened BufferedReader / BufferedWriter object
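
A minimal sketch of random reads through a local cache file; the object path and cache path are placeholders:

  from megfile.s3 import s3_cached_open

  # Cache on tmpfs for faster random access.
  with s3_cached_open('s3://my-bucket/model.ckpt', 'rb', cache_path='/dev/shm/model.ckpt') as f:
      f.seek(1024)
      chunk = f.read(4096)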

megfile.s3.s3_concat(src_paths: List[Union[str, BasePath, PathLike]], dst_path: Union[str, BasePath, PathLike], block_size: int = 8388608, max_workers: int = 32) None[source]

Concatenate s3 files to one file.

Parameters:
  • src_paths – Given source paths

  • dst_path – Given destination path

megfile.s3.s3_copy(src_url: Union[str, BasePath, PathLike], dst_url: Union[str, BasePath, PathLike], followlinks: bool = False, callback: Optional[Callable[[int], None]] = None) None[source]

Copy a file on s3: copy the content of the file at src_url to dst_url. It is the caller's responsibility to ensure that s3_isfile(src_url) == True

Parameters:
  • src_url – Given path

  • dst_url – Target file path

  • callback – Called periodically during copy, and the input parameter is the data size (in bytes) of copy since the last call
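
A minimal sketch of tracking copy progress via the callback; both urls are placeholders:

  from megfile.s3 import s3_copy

  total = 0

  def report_progress(nbytes: int) -> None:
      # nbytes is the amount copied since the previous callback call.
      global total
      total += nbytes
      print(f'{total} bytes copied')

  s3_copy('s3://my-bucket/raw/data.bin', 's3://my-bucket/backup/data.bin', callback=report_progress)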

megfile.s3.s3_download(src_url: Union[str, BasePath, PathLike], dst_url: Union[str, BasePath, PathLike], followlinks: bool = False, callback: Optional[Callable[[int], None]] = None) None[source]

Download a file from s3 to the local filesystem.

Parameters:
  • src_url – source s3 path

  • dst_url – target fs path

  • callback – Called periodically during copy, and the input parameter is the data size (in bytes) of copy since the last call

megfile.s3.s3_exists(path: Union[str, BasePath, PathLike], followlinks: bool = False) bool[source]

Test if s3_url exists

If the bucket of s3_url is not readable, return False

Parameters:

path – Given path

Returns:

True if s3_url exists, else False

megfile.s3.s3_getmd5(path: Union[str, BasePath, PathLike], recalculate: bool = False, followlinks: bool = False) str[source]

Get the md5 meta info of files that were uploaded/copied via megfile

If meta info is lost or non-existent, return None

Parameters:
  • path – Given path

  • recalculate – If True, calculate md5 in real time instead of returning the s3 ETag

  • followlinks – If True, calculate md5 of the real (target) file

Returns:

md5 meta info

megfile.s3.s3_getmtime(path: Union[str, BasePath, PathLike], follow_symlinks: bool = False) float[source]

Get the last-modified time of the file on the given s3_url path (as a Unix timestamp). If the path is an existent directory, return the latest modified time of all files in it. The mtime of an empty directory is 1970-01-01 00:00:00

If s3_url is not an existent path, which means s3_exists(s3_url) returns False, then raise S3FileNotFoundError

Parameters:

path – Given path

Returns:

Last-modified time

Raises:

S3FileNotFoundError, UnsupportedError

megfile.s3.s3_getsize(path: Union[str, BasePath, PathLike], follow_symlinks: bool = False) int[source]

Get the file size on the given s3_url path (in bytes). If the path is a directory, return the sum of the sizes of all files in it, including files in subdirectories (if any). The result excludes the size of the directory itself; in other words, an empty directory returns 0 bytes.

If s3_url is not an existent path, which means s3_exists(s3_url) returns False, then raise S3FileNotFoundError

Parameters:

path – Given path

Returns:

File size

Raises:

S3FileNotFoundError, UnsupportedError

megfile.s3.s3_glob(path: Union[str, BasePath, PathLike], recursive: bool = True, missing_ok: bool = True, followlinks: bool = False) List[str][source]

Return a list of s3 paths matching the glob pattern, in ascending alphabetical order. Note: globbing only works within a bucket; if the bucket part contains wildcard characters, UnsupportedError is raised

Parameters:
  • recursive – If False, ** will not search directory recursively

  • missing_ok – If False and target path doesn’t match any file, raise FileNotFoundError

Raises:

UnsupportedError, when bucket part contains wildcard characters

Returns:

A list of paths matching the glob pattern
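
For example, with a placeholder bucket and pattern (wildcards may appear in the key part only):

  from megfile.s3 import s3_glob

  for path in s3_glob('s3://my-bucket/logs/**/*.json'):
      print(path)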

megfile.s3.s3_glob_stat(path: Union[str, BasePath, PathLike], recursive: bool = True, missing_ok: bool = True, followlinks: bool = False) Iterator[FileEntry][source]

Return a generator of (path, file stat) tuples for paths matching the glob pattern, in ascending alphabetical order. Note: globbing only works within a bucket; if the bucket part contains wildcard characters, UnsupportedError is raised

Parameters:
  • recursive – If False, ** will not search directory recursively

  • missing_ok – If False and target path doesn’t match any file, raise FileNotFoundError

Raises:

UnsupportedError, when bucket part contains wildcard characters

Returns:

A generator of (path, file stat) tuples for paths matching the glob pattern

megfile.s3.s3_hasbucket(path: Union[str, BasePath, PathLike]) bool[source]

Test if the bucket of s3_url exists

Parameters:

path – Given path

Returns:

True if the bucket of s3_url exists, else False

megfile.s3.s3_iglob(path: Union[str, BasePath, PathLike], recursive: bool = True, missing_ok: bool = True, followlinks: bool = False) Iterator[str][source]

Return an iterator of s3 paths matching the glob pattern, in ascending alphabetical order. Note: globbing only works within a bucket; if the bucket part contains wildcard characters, UnsupportedError is raised

Parameters:
  • recursive – If False, ** will not search directory recursively

  • missing_ok – If False and target path doesn’t match any file, raise FileNotFoundError

Raises:

UnsupportedError, when bucket part contains wildcard characters

Returns:

An iterator of paths matching the glob pattern

megfile.s3.s3_isdir(path: Union[str, BasePath, PathLike], followlinks: bool = False) bool[source]

Test whether an s3 url is a directory. Specifically, the url is treated as a directory if there exists a suffix such that os.path.join(s3_url, suffix) is a file, or if the url is an empty bucket or s3://

Parameters:
  • path – Given path

  • followlinks – The result is the same whether followlinks is True or False, because s3 symlinks do not support directories.

Returns:

True if path is s3 directory, else False

megfile.s3.s3_isfile(path: Union[str, BasePath, PathLike], followlinks: bool = False) bool[source]

Test if an s3_url is file

Parameters:

path – Given path

Returns:

True if path is s3 file, else False

megfile.s3.s3_islink(path: Union[str, BasePath, PathLike]) bool[source]

Test whether a path is a symbolic link

Parameters:

path – Given path

Returns:

True if a path is link, else False

Raises:

S3NotALinkError

megfile.s3.s3_listdir(path: Union[str, BasePath, PathLike], followlinks: bool = False) List[str][source]

Get all contents of the given s3_url. The result is in ascending alphabetical order.

Parameters:

path – Given path

Returns:

All contents with the prefix of s3_url, in ascending alphabetical order

Raises:

S3FileNotFoundError, S3NotADirectoryError

megfile.s3.s3_load_content(s3_url, start: Optional[int] = None, stop: Optional[int] = None, followlinks: bool = False) bytes[source]

Get the content of the specified file in the byte range [start, stop)

Parameters:
  • s3_url – Specified path

  • start – start index

  • stop – stop index

Returns:

bytes content in range [start, stop)
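
For example, fetching only the first kilobyte of a placeholder object:

  from megfile.s3 import s3_load_content

  head = s3_load_content('s3://my-bucket/data.bin', start=0, stop=1024)
  assert len(head) <= 1024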

megfile.s3.s3_load_from(path: Union[str, BasePath, PathLike], followlinks: bool = False) BinaryIO[source]

Read all content of the specified path in binary mode into memory

User should close the BinaryIO manually

Parameters:

path – Given path

Returns:

BinaryIO

megfile.s3.s3_lstat(path: Union[str, BasePath, PathLike]) StatResult[source]

Like Path.stat() but, if the path points to a symbolic link, return the symbolic link’s information rather than its target’s.

megfile.s3.s3_makedirs(path: Union[str, BasePath, PathLike], exist_ok: bool = False)[source]

Create an s3 directory. Purely creating a directory has no effect, because directories are not real objects on object storage; this function only tests whether the target bucket has WRITE access.

Parameters:
  • path – Given path

  • exist_ok – If False and target directory exists, raise S3FileExistsError

Raises:

S3BucketNotFoundError, S3FileExistsError

megfile.s3.s3_memory_open(s3_url: Union[str, BasePath, PathLike], mode: str, followlinks: bool = False) S3MemoryHandler[source]

Open a memory-cache file reader / writer, for frequent random read / write

Note

User should make sure that reader / writer are closed correctly

Supports context manager

Parameters:

mode – Mode to open file, could be one of “rb”, “wb”, “ab”, “rb+”, “wb+” or “ab+”

Returns:

An opened BufferedReader / BufferedWriter object
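
A minimal sketch with a placeholder path; with a "+" mode the in-memory handle can be read back before it is uploaded on close:

  from megfile.s3 import s3_memory_open

  with s3_memory_open('s3://my-bucket/notes.txt', 'wb+') as f:
      f.write(b'draft')
      f.seek(0)
      print(f.read())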

megfile.s3.s3_move(src_url: Union[str, BasePath, PathLike], dst_url: Union[str, BasePath, PathLike]) None[source]

Move file/directory path from src_url to dst_url

Parameters:
  • src_url – Given path

  • dst_url – Given destination path

megfile.s3.s3_open(s3_url: Union[str, BasePath, PathLike], mode: str, followlinks: bool = False, *, max_concurrency: Optional[int] = None, max_buffer_size: int = 134217728, forward_ratio: Optional[float] = None, block_size: int = 8388608, limited_seekable: bool = False, buffered: bool = True, share_cache_key: Optional[str] = None, cache_path: Optional[str] = None) Union[S3PrefetchReader, S3BufferedWriter, BufferedReader, BufferedWriter, S3MemoryHandler]

Open an asynchronous prefetch reader, to support fast sequential read

Note

User should make sure that reader / writer are closed correctly

Supports context manager

Settings that usually perform well: max_concurrency=10 or 20, block_size=8 or 16 MB; the default max_concurrency of None means the global thread pool is used

Parameters:
  • max_concurrency – Max download thread number, None by default

  • max_buffer_size – Max cached buffer size in memory, 128MB by default

  • block_size – Size of a single block, 8MB by default. Each block will be uploaded or downloaded by a single thread.

  • limited_seekable – Whether the write handle supports limited seeking (the head and tail of the file can each be sought within block_size). Note: this parameter only affects write handles; read handles support arbitrary seeking

Returns:

An opened reader / writer object, depending on the mode

Raises:

S3FileNotFoundError

megfile.s3.s3_path_join(path: Union[str, BasePath, PathLike], *other_paths: Union[str, BasePath, PathLike]) str[source]

Concatenate two or more paths into a complete path

Parameters:
  • path – Given path

  • other_paths – Paths to be concatenated

Returns:

Concatenated complete path

Note

The difference between this function and os.path.join is that this function ignores a leading slash (which indicates an absolute path) in other_paths and concatenates directly. E.g. os.path.join('/path', 'to', '/file') => '/file', but s3_path_join('/path', 'to', '/file') => '/path/to/file'
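
For example (values taken from the note above):

  >>> import os.path
  >>> from megfile.s3 import s3_path_join
  >>> os.path.join('/path', 'to', '/file')
  '/file'
  >>> s3_path_join('/path', 'to', '/file')
  '/path/to/file'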

megfile.s3.s3_pipe_open(s3_url: Union[str, BasePath, PathLike], mode: str, followlinks: bool = False, *, join_thread: bool = True) S3PipeHandler[source]

Open an asynchronous reader / writer, to support fast sequential read / write

Note

User should make sure that reader / writer are closed correctly

Supports context manager

When join_thread is False, closing the file handle will not wait until the asynchronous write finishes. This has no effect on read handles, but it can speed up write handles because the file is written asynchronously. However, asynchronous behaviour cannot guarantee that the file is written successfully, and frequent use may exhaust threads and file handles

Parameters:
  • mode – Mode to open file, either “rb” or “wb”

  • join_thread – Whether to wait until s3 finishes writing before the handle is closed

Returns:

An opened BufferedReader / BufferedWriter object
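
A minimal sketch of a sequential streaming write; the path is a placeholder:

  from megfile.s3 import s3_pipe_open

  # join_thread=True (the default) makes closing wait until s3 finishes writing.
  with s3_pipe_open('s3://my-bucket/stream.log', 'wb', join_thread=True) as f:
      f.write(b'first line\n')
      f.write(b'second line\n')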

megfile.s3.s3_prefetch_open(s3_url: Union[str, BasePath, PathLike], mode: str = 'rb', followlinks: bool = False, *, max_concurrency: Optional[int] = None, max_block_size: int = 8388608) S3PrefetchReader[source]

Open an asynchronous prefetch reader, to support fast sequential read and random read

Note

User should make sure that reader / writer are closed correctly

Supports context manager

Settings that usually perform well: max_concurrency=10 or 20, max_block_size=8 or 16 MB; the default max_concurrency of None means the global thread pool is used

Parameters:
  • max_concurrency – Max download thread number, None by default

  • max_block_size – Max data size downloaded by each thread, in bytes, 8MB by default

Returns:

An opened S3PrefetchReader object

Raises:

S3FileNotFoundError

megfile.s3.s3_readlink(path: Union[str, BasePath, PathLike]) str[source]

Return a string representing the path to which the symbolic link points.

Returns:

Return a string representing the path to which the symbolic link points.

Raises:

S3NameTooLongError, S3BucketNotFoundError, S3IsADirectoryError, S3NotALinkError

megfile.s3.s3_remove(path: Union[str, BasePath, PathLike], missing_ok: bool = False) None[source]

Remove the file or directory on s3; removing s3:// or s3://bucket is not permitted

Parameters:
  • path – Given path

  • missing_ok – if False and target file/directory not exists, raise S3FileNotFoundError

Raises:

S3PermissionError, S3FileNotFoundError, UnsupportedError

megfile.s3.s3_rename(src_url: Union[str, BasePath, PathLike], dst_url: Union[str, BasePath, PathLike])[source]

Move s3 file path from src_url to dst_url

Parameters:
  • src_url – Given source path

  • dst_url – Given destination path

megfile.s3.s3_save_as(file_object: BinaryIO, path: Union[str, BasePath, PathLike])[source]

Write the opened binary stream to specified path, but the stream won’t be closed

Parameters:
  • path – Given path

  • file_object – Stream to be read

megfile.s3.s3_scan(path: Union[str, BasePath, PathLike], missing_ok: bool = True, followlinks: bool = False) Iterator[str][source]

Iteratively traverse only files in the given s3 directory, in alphabetical order. Every iteration of the generator yields a path string.

  • If s3_url is a file path, yield the file only

  • If s3_url is a non-existent path, return an empty generator

  • If s3_url is a bucket path, return all file paths in the bucket

  • If s3_url is an empty bucket, return an empty generator

  • If s3_url doesn't contain any bucket, i.e. s3_url == 's3://', raise UnsupportedError. walk() on the complete s3 is not supported in megfile

Parameters:
  • path – Given path

  • missing_ok – If False and there’s no file in the directory, raise FileNotFoundError

Raises:

UnsupportedError

Returns:

A file path generator

megfile.s3.s3_scan_stat(path: Union[str, BasePath, PathLike], missing_ok: bool = True, followlinks: bool = False) Iterator[FileEntry][source]

Iteratively traverse only files in the given directory, in alphabetical order. Every iteration of the generator yields a tuple of path string and file stat

Parameters:
  • path – Given path

  • missing_ok – If False and there’s no file in the directory, raise FileNotFoundError

Raises:

UnsupportedError

Returns:

A generator of (path, file stat) tuples

megfile.s3.s3_scandir(path: Union[str, BasePath, PathLike], followlinks: bool = False) Iterator[FileEntry][source]

Get all contents of the given s3_url; the order of the result is not guaranteed.

Parameters:

path – Given path

Returns:

All contents with the prefix of s3_url

Raises:

S3FileNotFoundError, S3NotADirectoryError

megfile.s3.s3_share_cache_open(s3_url: Union[str, BasePath, PathLike], mode: str = 'rb', followlinks: bool = False, *, cache_key: str = 'lru', max_concurrency: Optional[int] = None, max_block_size: int = 8388608) S3ShareCacheReader[source]

Open an asynchronous prefetch reader, to support fast sequential read and random read

Note

User should make sure that reader / writer are closed correctly

Supports context manager

Settings that usually perform well: max_concurrency=10 or 20, max_block_size=8 or 16 MB; the default max_concurrency of None means the global thread pool is used

Parameters:
  • max_concurrency – Max download thread number, None by default

  • max_block_size – Max data size downloaded by each thread, in bytes, 8MB by default

Returns:

An opened S3ShareCacheReader object

Raises:

S3FileNotFoundError

megfile.s3.s3_stat(path: Union[str, BasePath, PathLike], follow_symlinks=True) StatResult[source]

Get StatResult of s3_url file, including file size and mtime, referring to s3_getsize and s3_getmtime

If s3_url is not an existent path, which means s3_exists(s3_url) returns False, then raise S3FileNotFoundError. If attempting to get the StatResult of the complete s3, i.e. the path is 's3://', raise S3BucketNotFoundError

Parameters:

path – Given path

Returns:

StatResult

Raises:

S3FileNotFoundError, S3BucketNotFoundError

megfile.s3.s3_symlink(src_path: Union[str, BasePath, PathLike], dst_path: Union[str, BasePath, PathLike]) None[source]

Create a symbolic link pointing to src_path named dst_path.

Parameters:
  • src_path – Given path

  • dst_path – Destination path

Raises:

S3NameTooLongError, S3BucketNotFoundError, S3IsADirectoryError

megfile.s3.s3_sync(src_url: Union[str, BasePath, PathLike], dst_url: Union[str, BasePath, PathLike], followlinks: bool = False, force: bool = False) None[source]

Copy file/directory on src_url to dst_url

Parameters:
  • src_url – Given path

  • dst_url – Given destination path

  • followlinks – If False, symlinks are regarded as files; if True, they are followed

  • force – Force the sync; do not skip files that are identical

megfile.s3.s3_unlink(path: Union[str, BasePath, PathLike], missing_ok: bool = False) None[source]

Remove the file on s3

Parameters:
  • path – Given path

  • missing_ok – if False and target file not exists, raise S3FileNotFoundError

Raises:

S3PermissionError, S3FileNotFoundError, S3IsADirectoryError

megfile.s3.s3_upload(src_url: Union[str, BasePath, PathLike], dst_url: Union[str, BasePath, PathLike], callback: Optional[Callable[[int], None]] = None, **kwargs) None[source]

Upload a file from the local filesystem to s3.

Parameters:
  • src_url – source fs path

  • dst_url – target s3 path

  • callback – Called periodically during copy, and the input parameter is the data size (in bytes) of copy since the last call

megfile.s3.s3_walk(path: Union[str, BasePath, PathLike], followlinks: bool = False) Iterator[Tuple[str, List[str], List[str]]][source]

Iteratively traverse the given s3 directory in top-down order: first the parent directory, then, if subdirectories exist, the subdirectories in alphabetical order. Every iteration of the generator yields a 3-tuple: (root, dirs, files)

  • root: Current s3 path;

  • dirs: Name list of subdirectories in current directory. The list is sorted by name in ascending alphabetical order;

  • files: Name list of files in current directory. The list is sorted by name in ascending alphabetical order;

  • If s3_url is a file path, return an empty generator

  • If s3_url is a non-existent path, return an empty generator

  • If s3_url is a bucket path, the bucket will be the top directory and will be returned at the first iteration of the generator

  • If s3_url is an empty bucket, only yield one 3-tuple (note: s3 doesn't have empty directories)

  • If s3_url doesn't contain any bucket, i.e. s3_url == 's3://', raise UnsupportedError. walk() on the complete s3 is not supported in megfile

Parameters:
  • path – Given path

  • followlinks – The result is the same whether followlinks is True or False, because s3 symlinks do not support directories.

Raises:

UnsupportedError

Returns:

A 3-tuple generator
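
A minimal sketch, walking a placeholder prefix top-down in the same spirit as os.walk:

  from megfile.s3 import s3_walk

  for root, dirs, files in s3_walk('s3://my-bucket/dataset'):
      for name in files:
          print(f'{root}/{name}')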