megfile.s3 module
- class megfile.s3.S3BufferedWriter(bucket: str, key: str, *, s3_client, block_size: int = 8388608, max_block_size: int = 134217728, max_buffer_size: int = 134217728, max_workers: int | None = None, profile_name: str | None = None)[source]
Bases: Writable[bytes]
- property mode: str
- property name: str
- class megfile.s3.S3Cacher(path: str, cache_path: str | None = None, mode: str = 'r')[source]
Bases: FileCacher
- cache_path = None
- class megfile.s3.S3LimitedSeekableWriter(bucket: str, key: str, *, s3_client, block_size: int = 8388608, head_block_size: int | None = None, tail_block_size: int | None = None, max_block_size: int = 134217728, max_buffer_size: int = 134217728, max_workers: int | None = None, profile_name: str | None = None)[source]
Bases: S3BufferedWriter, Seekable
For file formats like msgpack and mp4, the header must be written before the data, which makes streaming writes to an unseekable file system like S3 painful. This writer keeps the first and last parts of the data in memory, so you can come back to the head and write the header at the last moment.
- seek(offset: int, whence: int = 0) int [source]
Change stream position.
- Seek to byte offset relative to the position indicated by whence:
0: start of stream (the default); offset should be >= 0
1: current position; offset may be negative
2: end of stream; offset is usually negative
Return the new absolute position.
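A minimal sketch of the header-last pattern this writer enables, assuming the handle is obtained via s3_buffered_open with limited_seekable=True; the bucket, key and header layout are hypothetical:

from megfile.s3 import s3_buffered_open

with s3_buffered_open("s3://my-bucket/data.bin", "wb", limited_seekable=True) as f:
    f.write(b"\x00" * 8)                       # reserve 8 bytes for the header
    payload = b"example payload"
    f.write(payload)                           # stream the body
    f.seek(0)                                  # seek back into the in-memory head part
    f.write(len(payload).to_bytes(8, "big"))   # fill in the header last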
- class megfile.s3.S3PrefetchReader(bucket: str, key: str, *, s3_client, block_size: int = 8388608, block_capacity: int = 16, block_forward: int | None = None, max_retries: int = 10, max_workers: int | None = None, profile_name: str | None = None)[source]
Bases: BasePrefetchReader
Reader for fast reads of S3 content.
The file content is divided into equal parts of block_size bytes, and an LRU cache keeps at most block_capacity blocks in memory.
open(), seek() and read() trigger a prefetch: block_forward blocks of data are cached starting from the current offset (the position after reading, if the calling function is read).
- property name: str
- class megfile.s3.S3ShareCacheReader(...)[source]
Bases: S3PrefetchReader
Reader for fast reads of S3 content.
The file content is divided into equal parts of block_size bytes, and an LRU cache keeps at most block_capacity blocks in memory.
open(), seek() and read() trigger a prefetch: block_forward blocks of data are cached starting from the current offset (the position after reading, if the calling function is read).
- megfile.s3.get_endpoint_url(profile_name: str | None = None) str [source]
Get the endpoint URL of S3
- Returns:
S3 endpoint URL
- megfile.s3.get_s3_client(config: Config | None = None, cache_key: str | None = None, profile_name: str | None = None)[source]
Get S3 client
- Returns:
S3 client
- megfile.s3.is_s3(path: str | BasePath | PathLike) bool [source]
Test if a path is an S3 path, following the aws-cli convention.
megfile also supports paths like s3[+profile_name]://bucket/key
- Parameters:
path – Path to be tested
- Returns:
True if path is s3 path, else False
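A quick illustration of is_s3; the paths and profile name are hypothetical:

from megfile.s3 import is_s3

is_s3("s3://bucket/key")          # True
is_s3("s3+work://bucket/key")     # True, profile-qualified form ("work" is a hypothetical profile name)
is_s3("/local/path/file.txt")     # False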
- megfile.s3.s3_access(path: str | BasePath | PathLike, mode: Access = Access.READ, followlinks: bool = False) bool [source]
Test if path has access permission described by mode
- Parameters:
path – Given path
mode – access mode
- Returns:
True if the bucket of the given path has the access described by mode, else False
- megfile.s3.s3_buffered_open(s3_url: str | BasePath | PathLike, mode: str, followlinks: bool = False, *, max_concurrency: int | None = None, max_buffer_size: int = 134217728, forward_ratio: float | None = None, block_size: int | None = None, limited_seekable: bool = False, buffered: bool = False, share_cache_key: str | None = None, cache_path: str | None = None, min_block_size: int | None = None, max_block_size: int = 134217728) IO [source]
Open an asynchronous prefetch reader, to support fast sequential reads
Note
Users should make sure the reader / writer is closed correctly
Supports the context manager protocol
Some parameter settings that may perform well: max_concurrency=10 or 20, max_block_size=8 or 16 MB; the default value None means the global thread pool is used
- Parameters:
max_concurrency – Max number of download threads, None by default
max_buffer_size – Max buffer size cached in memory, 128MB by default
min_block_size – Min size of a single block, same as block_size by default. Each block is downloaded by a single thread.
max_block_size – Max size of a single block, 128MB by default. Each block is downloaded by a single thread.
block_size – Size of a single block, 8MB by default. Each block is uploaded by a single thread.
limited_seekable – Whether the write handle supports limited seek (both the head and the tail of the file can seek within block_size). Note: this parameter is only valid for write handles; read handles support arbitrary seek
- Returns:
An opened S3PrefetchReader object
- Raises:
S3FileNotFoundError
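A minimal sketch of s3_buffered_open for sequential read and write; the bucket and keys are hypothetical, and the tuning values follow the hints above:

from megfile.s3 import s3_buffered_open

# Sequential read through the asynchronous prefetch reader
with s3_buffered_open("s3://my-bucket/big-file.bin", "rb", max_concurrency=10) as reader:
    header = reader.read(1024)

# Sequential write through the buffered writer
with s3_buffered_open("s3://my-bucket/output.bin", "wb") as writer:
    writer.write(b"some bytes")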
- megfile.s3.s3_cached_open(s3_url: str | BasePath | PathLike, mode: str, followlinks: bool = False, *, cache_path: str | None = None) S3CachedHandler [source]
Open a local-cache file reader / writer, for frequent random reads / writes
Note
Users should make sure the reader / writer is closed correctly
Supports the context manager protocol
cache_path specifies the path of the cache file. Performance may be better if the cache file is located on SSD or tmpfs
- Parameters:
mode – Mode to open file, could be one of “rb”, “wb” or “ab”
cache_path – cache file path
- Returns:
An opened BufferedReader / BufferedWriter object
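A minimal sketch of s3_cached_open for random access through a local cache file; the bucket, key and tmpfs cache path are hypothetical:

from megfile.s3 import s3_cached_open

with s3_cached_open("s3://my-bucket/random-access.bin", "rb", cache_path="/dev/shm/random-access.cache") as f:
    f.seek(4096)
    chunk = f.read(512)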
- megfile.s3.s3_concat(src_paths: List[str | BasePath | PathLike], dst_path: str | BasePath | PathLike, block_size: int = 8388608, max_workers: int = 32) None [source]
Concatenate s3 files to one file.
- Parameters:
src_paths – Given source paths
dst_path – Given destination path
- megfile.s3.s3_copy(src_url: str | BasePath | PathLike, dst_url: str | BasePath | PathLike, callback: Callable[[int], None] | None = None, followlinks: bool = False, overwrite: bool = True) None [source]
Copy a file on S3: copy the content of the file at src_url to dst_url. It is the caller's responsibility to ensure that s3_isfile(src_url) is True
- Parameters:
src_url – Given source path
dst_url – Target file path
callback – Called periodically during the copy; the argument is the number of bytes copied since the last call
followlinks – If False, treat a symlink as the file itself; if True, follow the link
overwrite – Whether to overwrite the file if it exists, True by default
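A minimal sketch of s3_copy with a progress callback; the paths are hypothetical:

from megfile.s3 import s3_copy

copied_bytes = 0

def on_progress(nbytes: int) -> None:
    # nbytes is the amount of data copied since the previous call
    global copied_bytes
    copied_bytes += nbytes

s3_copy("s3://my-bucket/src.bin", "s3://my-bucket/dst.bin", callback=on_progress)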
- megfile.s3.s3_download(src_url: str | BasePath | PathLike, dst_url: str | BasePath | PathLike, callback: Callable[[int], None] | None = None, followlinks: bool = False, overwrite: bool = True) None [source]
Download a file from S3 to the local filesystem.
- Parameters:
src_url – Source S3 path
dst_url – Target filesystem path
callback – Called periodically during the copy; the argument is the number of bytes copied since the last call
followlinks – If False, treat a symlink as the file itself; if True, follow the link
overwrite – Whether to overwrite the file if it exists, True by default
- megfile.s3.s3_exists(path: str | BasePath | PathLike, followlinks: bool = False) bool [source]
Test if the given path exists on S3
If the bucket of the path is not permitted to read, return False
- Parameters:
path – Given path
- Returns:
True if s3_url exists, else False
- megfile.s3.s3_getmd5(path: str | BasePath | PathLike, recalculate: bool = False, followlinks: bool = False) str [source]
Get the md5 meta info of files uploaded / copied via megfile
If the meta info is lost or does not exist, return None
- Parameters:
path – Given path
recalculate – If True, calculate the md5 in real time; otherwise return the S3 ETag
followlinks – If True, calculate the md5 of the real (target) file
- Returns:
md5 meta info
- megfile.s3.s3_getmtime(path: str | BasePath | PathLike, follow_symlinks: bool = False) float [source]
Get the last-modified time of the file on the given s3_url path (as a Unix timestamp).
If the path is an existent directory, return the latest modified time of all files in it. The mtime of an empty directory is 1970-01-01 00:00:00.
If s3_url is not an existent path, which means s3_exists(s3_url) returns False, raise S3FileNotFoundError
- Parameters:
path – Given path
- Returns:
Last-modified time
- Raises:
S3FileNotFoundError, UnsupportedError
- megfile.s3.s3_getsize(path: str | BasePath | PathLike, follow_symlinks: bool = False) int [source]
Get the file size of the given s3_url path (in bytes).
If the path is a directory, return the total size of all files in it, including files in subdirectories (if any).
The result excludes the size of the directory itself; in other words, an empty directory has a size of 0 bytes.
If s3_url is not an existent path, which means s3_exists(s3_url) returns False, raise S3FileNotFoundError
- Parameters:
path – Given path
- Returns:
File size
- Raises:
S3FileNotFoundError, UnsupportedError
- megfile.s3.s3_glob(path: str | BasePath | PathLike, recursive: bool = True, missing_ok: bool = True, followlinks: bool = False) List[str] [source]
Return a list of S3 paths matching the glob pattern, in ascending alphabetical order
Note: globbing only works within a bucket; if the bucket part contains wildcard characters, UnsupportedError is raised
- Parameters:
recursive – If False, ** will not search directory recursively
missing_ok – If False and target path doesn’t match any file, raise FileNotFoundError
- Raises:
UnsupportedError, when bucket part contains wildcard characters
- Returns:
A list of paths matching the given pattern
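A minimal sketch of s3_glob; the bucket, prefix and pattern are hypothetical:

from megfile.s3 import s3_glob

# Wildcards are only allowed after the bucket name
for path in s3_glob("s3://my-bucket/logs/**/*.json", recursive=True):
    print(path)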
- megfile.s3.s3_glob_stat(path: str | BasePath | PathLike, recursive: bool = True, missing_ok: bool = True, followlinks: bool = False) Iterator[FileEntry] [source]
Return a generator of (path, file stat) tuples, in ascending alphabetical order, in which paths match the glob pattern
Note: globbing only works within a bucket; if the bucket part contains wildcard characters, UnsupportedError is raised
- Parameters:
recursive – If False, ** will not search directory recursively
missing_ok – If False and target path doesn’t match any file, raise FileNotFoundError
- Raises:
UnsupportedError, when bucket part contains wildcard characters
- Returns:
A generator of (path, file stat) tuples, in which paths match the given pattern
- megfile.s3.s3_hasbucket(path: str | BasePath | PathLike) bool [source]
Test if the bucket of s3_url exists
- Parameters:
path – Given path
- Returns:
True if bucket of s3_url exists, else False
- megfile.s3.s3_iglob(path: str | BasePath | PathLike, recursive: bool = True, missing_ok: bool = True, followlinks: bool = False) Iterator[str] [source]
Return an iterator of S3 paths matching the glob pattern, in ascending alphabetical order
Note: globbing only works within a bucket; if the bucket part contains wildcard characters, UnsupportedError is raised
- Parameters:
recursive – If False, ** will not search directory recursively
missing_ok – If False and target path doesn’t match any file, raise FileNotFoundError
- Raises:
UnsupportedError, when bucket part contains wildcard characters
- Returns:
An iterator of paths matching the given pattern
- megfile.s3.s3_isdir(path: str | BasePath | PathLike, followlinks: bool = False) bool [source]
Test if an S3 url is a directory. Specifically, the url is treated as a directory if either of the following holds:
there exists a suffix such that os.path.join(s3_url, suffix) is a file;
the url is an empty bucket or s3://
- Parameters:
path – Given path
followlinks – The result is the same whether followlinks is True or False, because S3 symlinks do not support directories.
- Returns:
True if path is s3 directory, else False
- megfile.s3.s3_isfile(path: str | BasePath | PathLike, followlinks: bool = False) bool [source]
Test if an s3_url is a file
- Parameters:
path – Given path
- Returns:
True if path is s3 file, else False
- megfile.s3.s3_islink(path: str | BasePath | PathLike) bool [source]
Test whether a path is a symbolic link
- Parameters:
path – Given path
- Returns:
True if the path is a symbolic link, else False
- Raises:
S3NotALinkError
- megfile.s3.s3_listdir(path: str | BasePath | PathLike, followlinks: bool = False) List[str] [source]
Get all contents of given s3_url. The result is in ascending alphabetical order.
- Parameters:
path – Given path
- Returns:
All contents that have the prefix of s3_url, in ascending alphabetical order
- Raises:
S3FileNotFoundError, S3NotADirectoryError
- megfile.s3.s3_load_content(s3_url, start: int | None = None, stop: int | None = None, followlinks: bool = False) bytes [source]
Get the content of the specified file in the byte range [start, stop)
- Parameters:
s3_url – Specified path
start – start index
stop – stop index
- Returns:
bytes content in range [start, stop)
- megfile.s3.s3_load_from(path: str | BasePath | PathLike, followlinks: bool = False) BinaryIO [source]
Read all content of the specified path in binary mode and load it into memory
The caller should close the returned BinaryIO manually
- Parameters:
path – Given path
- Returns:
BinaryIO
- megfile.s3.s3_lstat(path: str | BasePath | PathLike) StatResult [source]
Like Path.stat() but, if the path points to a symbolic link, return the symbolic link’s information rather than its target’s.
- megfile.s3.s3_makedirs(path: str | BasePath | PathLike, exist_ok: bool = False)[source]
Create an S3 directory. Purely creating a directory is invalid because directories are unavailable on object storage; this function only tests that the target bucket has WRITE access.
- Parameters:
path – Given path
exist_ok – If False and target directory exists, raise S3FileExistsError
- Raises:
S3BucketNotFoundError, S3FileExistsError
- megfile.s3.s3_memory_open(s3_url: str | BasePath | PathLike, mode: str, followlinks: bool = False) S3MemoryHandler [source]
Open a memory-cached file reader / writer, for frequent random reads / writes
Note
Users should make sure the reader / writer is closed correctly
Supports the context manager protocol
- Parameters:
mode – Mode to open file, could be one of “rb”, “wb”, “ab”, “rb+”, “wb+” or “ab+”
- Returns:
An opened BufferedReader / BufferedWriter object
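A minimal sketch of s3_memory_open for in-memory random read / write; the bucket and key are hypothetical:

from megfile.s3 import s3_memory_open

with s3_memory_open("s3://my-bucket/small-file.bin", "rb+") as f:
    f.seek(0, 2)           # move to the end of the in-memory copy
    f.write(b" appended")  # the whole object is uploaded again on close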
- megfile.s3.s3_move(src_url: str | BasePath | PathLike, dst_url: str | BasePath | PathLike, overwrite: bool = True) None [source]
Move file/directory path from src_url to dst_url
- Parameters:
src_url – Given path
dst_url – Given destination path
overwrite – Whether to overwrite the file if it exists, True by default
- megfile.s3.s3_open(s3_url: str | BasePath | PathLike, mode: str, followlinks: bool = False, *, max_concurrency: int | None = None, max_buffer_size: int = 134217728, forward_ratio: float | None = None, block_size: int | None = None, limited_seekable: bool = False, buffered: bool = False, share_cache_key: str | None = None, cache_path: str | None = None, min_block_size: int | None = None, max_block_size: int = 134217728) IO
Open an asynchronous prefetch reader, to support fast sequential reads
Note
Users should make sure the reader / writer is closed correctly
Supports the context manager protocol
Some parameter settings that may perform well: max_concurrency=10 or 20, max_block_size=8 or 16 MB; the default value None means the global thread pool is used
- Parameters:
max_concurrency – Max number of download threads, None by default
max_buffer_size – Max buffer size cached in memory, 128MB by default
min_block_size – Min size of a single block, same as block_size by default. Each block is downloaded by a single thread.
max_block_size – Max size of a single block, 128MB by default. Each block is downloaded by a single thread.
block_size – Size of a single block, 8MB by default. Each block is uploaded by a single thread.
limited_seekable – Whether the write handle supports limited seek (both the head and the tail of the file can seek within block_size). Note: this parameter is only valid for write handles; read handles support arbitrary seek
- Returns:
An opened S3PrefetchReader object
- Raises:
S3FileNotFoundError
- megfile.s3.s3_path_join(path: str | BasePath | PathLike, *other_paths: str | BasePath | PathLike) str [source]
Concatenate two or more paths into a complete path
- Parameters:
path – Given path
other_paths – Paths to be concatenated
- Returns:
Concatenated complete path
Note
The difference between this function and os.path.join is that this function ignores the left-side slash (which indicates an absolute path) in other_paths and concatenates directly.
e.g. os.path.join('/path', 'to', '/file') => '/file', but s3_path_join('/path', 'to', '/file') => '/path/to/file'
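A short comparison, mirroring the example above:

import os.path
from megfile.s3 import s3_path_join

os.path.join("/path", "to", "/file")   # -> '/file'
s3_path_join("/path", "to", "/file")   # -> '/path/to/file'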
- megfile.s3.s3_pipe_open(s3_url: str | BasePath | PathLike, mode: str, followlinks: bool = False, *, join_thread: bool = True) S3PipeHandler [source]
Open an asynchronous reader / writer, to support fast sequential reads / writes
Note
Users should make sure the reader / writer is closed correctly
Supports the context manager protocol
When join_thread is False, closing the file handle does not wait for the asynchronous write to finish;
this does not affect read handles, but it can speed up write handles because the file is written asynchronously.
However, the asynchronous behavior cannot guarantee that the file is successfully written, and frequent use may exhaust threads and file handles
- Parameters:
mode – Mode to open file, either “rb” or “wb”
join_thread – Whether to wait until S3 finishes writing when the handle is closed
- Returns:
An opened BufferedReader / BufferedWriter object
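A minimal sketch of s3_pipe_open; the bucket and keys are hypothetical:

from megfile.s3 import s3_pipe_open

# join_thread=True (default): close() blocks until S3 finishes writing
with s3_pipe_open("s3://my-bucket/stream.bin", "wb") as f:
    f.write(b"streamed bytes")

# join_thread=False: close() returns without waiting for the asynchronous write
with s3_pipe_open("s3://my-bucket/stream-async.bin", "wb", join_thread=False) as f:
    f.write(b"streamed bytes")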
- megfile.s3.s3_prefetch_open(s3_url: str | BasePath | PathLike, mode: str = 'rb', followlinks: bool = False, *, max_concurrency: int | None = None, max_block_size: int = 8388608) S3PrefetchReader [source]
Open an asynchronous prefetch reader, to support fast sequential and random reads
Note
Users should make sure the reader / writer is closed correctly
Supports the context manager protocol
Some parameter settings that may perform well: max_concurrency=10 or 20, max_block_size=8 or 16 MB; the default value None means the global thread pool is used
- Parameters:
max_concurrency – Max number of download threads, None by default
max_block_size – Max data size downloaded by each thread, in bytes, 8MB by default
- Returns:
An opened S3PrefetchReader object
- Raises:
S3FileNotFoundError
- megfile.s3.s3_readlink(path) str [source]
Return a string representing the path to which the symbolic link points.
- Returns:
Return a string representing the path to which the symbolic link points.
- Raises:
S3NameTooLongError, S3BucketNotFoundError, S3IsADirectoryError, S3NotALinkError
- megfile.s3.s3_remove(path: str | BasePath | PathLike, missing_ok: bool = False) None [source]
Remove the file or directory on S3; removing s3:// or s3://bucket is not permitted
- Parameters:
path – Given path
missing_ok – If False and the target file / directory does not exist, raise S3FileNotFoundError
- Raises:
S3PermissionError, S3FileNotFoundError, UnsupportedError
- megfile.s3.s3_rename(src_url: str | BasePath | PathLike, dst_url: str | BasePath | PathLike, overwrite: bool = True) None [source]
Move an S3 file from src_url to dst_url
- Parameters:
src_url – Given source path
dst_url – Given destination path
overwrite – Whether to overwrite the file if it exists, True by default
- megfile.s3.s3_save_as(file_object: BinaryIO, path: str | BasePath | PathLike)[source]
Write the opened binary stream to the specified path; the stream will not be closed
- Parameters:
path – Given path
file_object – Stream to be read
- megfile.s3.s3_scan(path: str | BasePath | PathLike, missing_ok: bool = True, followlinks: bool = False) Iterator[str] [source]
Iteratively traverse only files in the given S3 directory, in alphabetical order. Every iteration of the generator yields a path string.
If s3_url is a file path, yield the file only
If s3_url is a non-existent path, return an empty generator
If s3_url is a bucket path, return all file paths in the bucket
If s3_url is an empty bucket, return an empty generator
If s3_url doesn't contain any bucket, i.e. s3_url == 's3://', raise UnsupportedError; traversing the complete s3 is not supported in megfile
- Parameters:
path – Given path
missing_ok – If False and there’s no file in the directory, raise FileNotFoundError
- Raises:
UnsupportedError
- Returns:
A file path generator
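A minimal sketch of s3_scan; the bucket and prefix are hypothetical:

from megfile.s3 import s3_scan

for file_path in s3_scan("s3://my-bucket/dataset/"):
    print(file_path)   # yields full file paths, files only, in alphabetical order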
- megfile.s3.s3_scan_stat(path: str | BasePath | PathLike, missing_ok: bool = True, followlinks: bool = False) Iterator[FileEntry] [source]
Iteratively traverse only files in the given directory, in alphabetical order. Every iteration of the generator yields a tuple of path string and file stat
- Parameters:
path – Given path
missing_ok – If False and there’s no file in the directory, raise FileNotFoundError
- Raises:
UnsupportedError
- Returns:
A generator of (path, file stat) tuples
- megfile.s3.s3_scandir(path: str | BasePath | PathLike, followlinks: bool = False) Iterator[FileEntry] [source]
Get all contents of given s3_url, the order of result is not guaranteed.
- Parameters:
path – Given path
- Returns:
All contents that have the prefix of s3_url
- Raises:
S3FileNotFoundError, S3NotADirectoryError
- megfile.s3.s3_share_cache_open(...)[source]
Open an asynchronous prefetch reader with a shared cache, to support fast sequential and random reads
Note
Users should make sure the reader / writer is closed correctly
Supports the context manager protocol
Some parameter settings that may perform well: max_concurrency=10 or 20, max_block_size=8 or 16 MB; the default value None means the global thread pool is used
- Parameters:
max_concurrency – Max number of download threads, None by default
max_block_size – Max data size downloaded by each thread, in bytes, 8MB by default
- Returns:
An opened S3ShareCacheReader object
- Raises:
S3FileNotFoundError
- megfile.s3.s3_stat(path: str | BasePath | PathLike, follow_symlinks=True) StatResult [source]
Get the StatResult of the s3_url file, including file size and mtime; see s3_getsize and s3_getmtime
If s3_url is not an existent path, which means s3_exists(s3_url) returns False, raise S3FileNotFoundError
If attempting to get the StatResult of the complete s3, such as s3_dir_url == 's3://', raise S3BucketNotFoundError
- Parameters:
path – Given path
- Returns:
StatResult
- Raises:
S3FileNotFoundError, S3BucketNotFoundError
- megfile.s3.s3_symlink(src_path: str | BasePath | PathLike, dst_path: str | BasePath | PathLike) None [source]
Create a symbolic link pointing to src_path named dst_path.
- Parameters:
src_path – Given path
dst_path – Destination path
- Raises:
S3NameTooLongError, S3BucketNotFoundError, S3IsADirectoryError
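A minimal sketch of the symlink helpers (see also s3_islink and s3_readlink above); the bucket and keys are hypothetical:

from megfile.s3 import s3_islink, s3_readlink, s3_symlink

s3_symlink("s3://my-bucket/data/v1.bin", "s3://my-bucket/data/latest.bin")
s3_islink("s3://my-bucket/data/latest.bin")     # True
s3_readlink("s3://my-bucket/data/latest.bin")   # 's3://my-bucket/data/v1.bin'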
- megfile.s3.s3_sync(src_url: str | BasePath | PathLike, dst_url: str | BasePath | PathLike, followlinks: bool = False, force: bool = False, overwrite: bool = True) None [source]
Copy a file / directory from src_url to dst_url
- Parameters:
src_url – Given path
dst_url – Given destination path
followlinks – If False, treat a symlink as the file itself; if True, follow the link
force – Sync files forcibly and do not skip identical files; takes priority over 'overwrite'. False by default
overwrite – Whether to overwrite the file if it exists, True by default
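A minimal sketch of s3_sync between two prefixes; the paths are hypothetical:

from megfile.s3 import s3_sync

# Copy everything under the source prefix
s3_sync("s3://my-bucket/dataset-v1/", "s3://my-bucket/dataset-v2/")

# force=True syncs forcibly instead of skipping files that look identical
s3_sync("s3://my-bucket/dataset-v1/", "s3://my-bucket/dataset-v2/", force=True)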
- megfile.s3.s3_unlink(path: str | BasePath | PathLike, missing_ok: bool = False) None [source]
Remove the file on s3
- Parameters:
path – Given path
missing_ok – If False and the target file does not exist, raise S3FileNotFoundError
- Raises:
S3PermissionError, S3FileNotFoundError, S3IsADirectoryError
- megfile.s3.s3_upload(src_url: str | BasePath | PathLike, dst_url: str | BasePath | PathLike, callback: Callable[[int], None] | None = None, followlinks: bool = False, overwrite: bool = True) None [source]
Upload a file from the local filesystem to S3.
- Parameters:
src_url – Source filesystem path
dst_url – Target S3 path
callback – Called periodically during the copy; the argument is the number of bytes copied since the last call
followlinks – If False, treat a symlink as the file itself; if True, follow the link
overwrite – Whether to overwrite the file if it exists, True by default
- megfile.s3.s3_walk(path: str | BasePath | PathLike, followlinks: bool = False) Iterator[Tuple[str, List[str], List[str]]] [source]
Iteratively traverse the given S3 directory in top-down order: traverse the parent directory first; if subdirectories exist, traverse them in ascending alphabetical order.
Every iteration on generator yields a 3-tuple: (root, dirs, files)
root: Current s3 path;
dirs: Name list of subdirectories in current directory. The list is sorted by name in ascending alphabetical order;
files: Name list of files in current directory. The list is sorted by name in ascending alphabetical order;
If s3_url is a file path, return an empty generator
If s3_url is a non-existent path, return an empty generator
If s3_url is a bucket path, the bucket is the top directory and is yielded in the first iteration of the generator
If s3_url is an empty bucket, only yield one 3-tuple (note: s3 doesn't have empty directories)
If s3_url doesn't contain any bucket, i.e. s3_url == 's3://', raise UnsupportedError; walk() on the complete s3 is not supported in megfile
- Parameters:
path – Given path
followlinks – The result is the same whether followlinks is True or False, because S3 symlinks do not support directories.
- Raises:
UnsupportedError
- Returns:
A 3-tuple generator
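A minimal sketch of s3_walk; the bucket and prefix are hypothetical:

from megfile.s3 import s3_walk

for root, dirs, files in s3_walk("s3://my-bucket/dataset/"):
    print(root)    # current s3 path
    print(dirs)    # subdirectory names, in ascending alphabetical order
    print(files)   # file names, in ascending alphabetical order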