[3.13] gh-135034: Normalize link targets in tarfile, add `os.path.realpath(strict='allow_missing')` (GH-135037) (GH-135064)

Addresses CVEs 2024-12718, 2025-4138, 2025-4330, and 2025-4517.
(cherry picked from commit 3612d8f517)

Co-authored-by: Łukasz Langa <lukasz@langa.pl>
Signed-off-by: Łukasz Langa <lukasz@langa.pl>
Co-authored-by: Petr Viktorin <encukou@gmail.com>
Co-authored-by: Seth Michael Larson <seth@python.org>
Co-authored-by: Adam Turner <9087854+AA-Turner@users.noreply.github.com>
Co-authored-by: Serhiy Storchaka <storchaka@gmail.com>
T. Wouters 2025-06-03 15:59:54 +02:00 committed by GitHub
parent 9f3d99967c
commit aa9eb5f757
11 changed files with 968 additions and 167 deletions

View File

@@ -408,9 +408,26 @@ the :mod:`glob` module.)
    system). On Windows, this function will also resolve MS-DOS (also called 8.3)
    style names such as ``C:\\PROGRA~1`` to ``C:\\Program Files``.

-   If a path doesn't exist or a symlink loop is encountered, and *strict* is
-   ``True``, :exc:`OSError` is raised. If *strict* is ``False`` these errors
-   are ignored, and so the result might be missing or otherwise inaccessible.
+   By default, the path is evaluated up to the first component that does not
+   exist, is a symlink loop, or whose evaluation raises :exc:`OSError`.
+   All such components are appended unchanged to the existing part of the path.
+
+   Some errors that are handled this way include "access denied", "not a
+   directory", or "bad argument to internal function". Thus, the
+   resulting path may be missing or inaccessible, may still contain
+   links or loops, and may traverse non-directories.
+
+   This behavior can be modified by keyword arguments:
+
+   If *strict* is ``True``, the first error encountered when evaluating the path is
+   re-raised.
+   In particular, :exc:`FileNotFoundError` is raised if *path* does not exist,
+   or another :exc:`OSError` if it is otherwise inaccessible.
+
+   If *strict* is :py:data:`os.path.ALLOW_MISSING`, errors other than
+   :exc:`FileNotFoundError` are re-raised (as with ``strict=True``).
+   Thus, the returned path will not contain any symbolic links, but the named
+   file and some of its parent directories may be missing.

    .. note::
       This function emulates the operating system's procedure for making a path
@@ -429,6 +446,15 @@ the :mod:`glob` module.)
    .. versionchanged:: 3.10
       The *strict* parameter was added.

+   .. versionchanged:: next
+      The :py:data:`~os.path.ALLOW_MISSING` value for the *strict* parameter
+      was added.
+
+.. data:: ALLOW_MISSING
+
+   Special value used for the *strict* argument in :func:`realpath`.
+
+   .. versionadded:: next

 .. function:: relpath(path, start=os.curdir)

View File

@@ -249,6 +249,15 @@ The :mod:`tarfile` module defines the following exceptions:
    Raised to refuse extracting a symbolic link pointing outside the destination
    directory.

+.. exception:: LinkFallbackError
+
+   Raised to refuse emulating a link (hard or symbolic) by extracting another
+   archive member, when that member would be rejected by the filter location.
+   The exception that was raised to reject the replacement member is available
+   as :attr:`!BaseException.__context__`.
+
+   .. versionadded:: next
+
 The following constants are available at the module level:
@@ -1052,6 +1061,12 @@ reused in custom filters:
   Implements the ``'data'`` filter.
   In addition to what ``tar_filter`` does:

+  - Normalize link targets (:attr:`TarInfo.linkname`) using
+    :func:`os.path.normpath`.
+    Note that this removes internal ``..`` components, which may change the
+    meaning of the link if the path in :attr:`!TarInfo.linkname` traverses
+    symbolic links.
+
   - :ref:`Refuse <tarfile-extraction-refuse>` to extract links (hard or soft)
     that link to absolute paths, or ones that link outside the destination.

@@ -1080,6 +1095,10 @@ reused in custom filters:
   Return the modified ``TarInfo`` member.

+  .. versionchanged:: next
+
+     Link targets are now normalized.
+
 .. _tarfile-extraction-refuse:
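The normalization described above is purely lexical. A small sketch of what that means for link targets, using `posixpath.normpath` so the result does not depend on the host platform (`normalize_linkname` is a hypothetical name, not a function from the commit):

```python
import posixpath


def normalize_linkname(linkname):
    """Hypothetical helper: collapse '.', '//' and internal '..' lexically."""
    return posixpath.normpath(linkname)


# Internal '..' components are removed without consulting the filesystem.
# If 'pkg/sub' were itself a symlink, 'pkg/sub/../data.txt' could name a
# different file on disk than the normalized 'pkg/data.txt'.
print(normalize_linkname("pkg/sub/../data.txt"))
# Leading '..' components cannot be collapsed and are kept.
print(normalize_linkname("../shared/lib.so"))
```

This is why the documentation warns that normalization may change the meaning of a link whose target traverses symbolic links.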
@@ -1106,6 +1125,7 @@ Here is an incomplete list of things to consider:
 * Extract to a :func:`new temporary directory <tempfile.mkdtemp>`
   to prevent e.g. exploiting pre-existing links, and to make it easier to
   clean up after a failed extraction.
+* Disallow symbolic links if you do not need the functionality.
 * When working with untrusted data, use external (e.g. OS-level) limits on
   disk, memory and CPU usage.
 * Check filenames against an allow-list of characters

View File

@@ -2829,3 +2829,36 @@ sys
 * The previously undocumented special function :func:`sys.getobjects`,
   which only exists in specialized builds of Python, may now return objects
   from other interpreters than the one it's called in.
+
+Notable changes in 3.13.4
+=========================
+
+os.path
+-------
+
+* The *strict* parameter to :func:`os.path.realpath` accepts a new value,
+  :data:`os.path.ALLOW_MISSING`.
+  If used, errors other than :exc:`FileNotFoundError` will be re-raised;
+  the resulting path can be missing but it will be free of symlinks.
+  (Contributed by Petr Viktorin for :cve:`2025-4517`.)
+
+tarfile
+-------
+
+* :func:`~tarfile.data_filter` now normalizes symbolic link targets in order to
+  avoid path traversal attacks.
+  (Contributed by Petr Viktorin in :gh:`127987` and :cve:`2025-4138`.)
+* :func:`~tarfile.TarFile.extractall` now skips fixing up directory attributes
+  when a directory was removed or replaced by another kind of file.
+  (Contributed by Petr Viktorin in :gh:`127987` and :cve:`2024-12718`.)
+* :func:`~tarfile.TarFile.extract` and :func:`~tarfile.TarFile.extractall`
+  now (re-)apply the extraction filter when substituting a link (hard or
+  symbolic) with a copy of another archive member, and when fixing up
+  directory attributes.
+  The former raises a new exception, :exc:`~tarfile.LinkFallbackError`.
+  (Contributed by Petr Viktorin for :cve:`2025-4330` and :cve:`2024-12718`.)
+* :func:`~tarfile.TarFile.extract` and :func:`~tarfile.TarFile.extractall`
+  no longer extract rejected members when
+  :func:`~tarfile.TarFile.errorlevel` is zero.
+  (Contributed by Matt Prodani and Petr Viktorin in :gh:`112887`
+  and :cve:`2025-4435`.)

View File

@@ -8,7 +8,7 @@

 __all__ = ['commonprefix', 'exists', 'getatime', 'getctime', 'getmtime',
            'getsize', 'isdevdrive', 'isdir', 'isfile', 'isjunction', 'islink',
-           'lexists', 'samefile', 'sameopenfile', 'samestat']
+           'lexists', 'samefile', 'sameopenfile', 'samestat', 'ALLOW_MISSING']


 # Does a path exist?
@@ -189,3 +189,12 @@ def _check_arg_types(funcname, *args):
                             f'os.PathLike object, not {s.__class__.__name__!r}') from None
     if hasstr and hasbytes:
         raise TypeError("Can't mix strings and bytes in path components") from None
+
+# A singleton with a true boolean value.
+@object.__new__
+class ALLOW_MISSING:
+    """Special value for use in realpath()."""
+    def __repr__(self):
+        return 'os.path.ALLOW_MISSING'
+    def __reduce__(self):
+        return self.__class__.__name__
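The `@object.__new__` decorator above replaces the class with a single instance of itself, so the module-level name refers to an object rather than a type. A minimal sketch of the same pattern with a hypothetical sentinel (`KEEP_GOING` and `mode_name` are illustrative names only):

```python
@object.__new__
class KEEP_GOING:
    """Hypothetical sentinel built the same way as ALLOW_MISSING."""

    def __repr__(self):
        return "KEEP_GOING"


def mode_name(strict):
    """Dispatch on the sentinel by identity before testing truthiness."""
    if strict is KEEP_GOING:
        return "sentinel"
    return "strict" if strict else "lenient"


print(repr(KEEP_GOING), bool(KEEP_GOING))
print(mode_name(False), mode_name(True), mode_name(KEEP_GOING))
```

Because the instance defines neither `__bool__` nor `__len__`, it is truthy. That is why `realpath` checks `strict is ALLOW_MISSING` first and only then falls through to the plain truthiness test.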

View File

@@ -29,7 +29,7 @@
            "abspath","curdir","pardir","sep","pathsep","defpath","altsep",
            "extsep","devnull","realpath","supports_unicode_filenames","relpath",
            "samefile", "sameopenfile", "samestat", "commonpath", "isjunction",
-           "isdevdrive"]
+           "isdevdrive", "ALLOW_MISSING"]

 def _get_bothseps(path):
     if isinstance(path, bytes):
@@ -601,9 +601,10 @@ def abspath(path):
     from nt import _findfirstfile, _getfinalpathname, readlink as _nt_readlink
 except ImportError:
     # realpath is a no-op on systems without _getfinalpathname support.
-    realpath = abspath
+    def realpath(path, *, strict=False):
+        return abspath(path)
 else:
-    def _readlink_deep(path):
+    def _readlink_deep(path, ignored_error=OSError):
         # These error codes indicate that we should stop reading links and
         # return the path we currently have.
         # 1: ERROR_INVALID_FUNCTION
@@ -636,7 +637,7 @@ def _readlink_deep(path):
                         path = old_path
                         break
                     path = normpath(join(dirname(old_path), path))
-            except OSError as ex:
+            except ignored_error as ex:
                 if ex.winerror in allowed_winerror:
                     break
                 raise
@@ -645,7 +646,7 @@ def _readlink_deep(path):
                 break
         return path

-    def _getfinalpathname_nonstrict(path):
+    def _getfinalpathname_nonstrict(path, ignored_error=OSError):
         # These error codes indicate that we should stop resolving the path
         # and return the value we currently have.
         # 1: ERROR_INVALID_FUNCTION
@@ -673,17 +674,18 @@ def _getfinalpathname_nonstrict(path):
             try:
                 path = _getfinalpathname(path)
                 return join(path, tail) if tail else path
-            except OSError as ex:
+            except ignored_error as ex:
                 if ex.winerror not in allowed_winerror:
                     raise
                 try:
                     # The OS could not resolve this path fully, so we attempt
                     # to follow the link ourselves. If we succeed, join the tail
                     # and return.
-                    new_path = _readlink_deep(path)
+                    new_path = _readlink_deep(path,
+                                              ignored_error=ignored_error)
                     if new_path != path:
                         return join(new_path, tail) if tail else new_path
-                except OSError:
+                except ignored_error:
                     # If we fail to readlink(), let's keep traversing
                     pass
                 # If we get these errors, try to get the real name of the file without accessing it.
@@ -691,7 +693,7 @@ def _getfinalpathname_nonstrict(path):
                     try:
                         name = _findfirstfile(path)
                         path, _ = split(path)
-                    except OSError:
+                    except ignored_error:
                         path, name = split(path)
                 else:
                     path, name = split(path)
@@ -721,6 +723,15 @@ def realpath(path, *, strict=False):
             if normcase(path) == devnull:
                 return '\\\\.\\NUL'
         had_prefix = path.startswith(prefix)
+
+        if strict is ALLOW_MISSING:
+            ignored_error = FileNotFoundError
+            strict = True
+        elif strict:
+            ignored_error = ()
+        else:
+            ignored_error = OSError
+
         if not had_prefix and not isabs(path):
             path = join(cwd, path)
         try:
@@ -728,17 +739,16 @@ def realpath(path, *, strict=False):
             initial_winerror = 0
         except ValueError as ex:
             # gh-106242: Raised for embedded null characters
-            # In strict mode, we convert into an OSError.
+            # In strict modes, we convert into an OSError.
             # Non-strict mode returns the path as-is, since we've already
             # made it absolute.
             if strict:
                 raise OSError(str(ex)) from None
             path = normpath(path)
-        except OSError as ex:
-            if strict:
-                raise
+        except ignored_error as ex:
             initial_winerror = ex.winerror
-            path = _getfinalpathname_nonstrict(path)
+            path = _getfinalpathname_nonstrict(path,
+                                               ignored_error=ignored_error)
         # The path returned by _getfinalpathname will always start with \\?\ -
         # strip off that prefix unless it was already provided on the original
         # path.

View File

@@ -36,7 +36,7 @@
            "samefile","sameopenfile","samestat",
            "curdir","pardir","sep","pathsep","defpath","altsep","extsep",
            "devnull","realpath","supports_unicode_filenames","relpath",
-           "commonpath", "isjunction","isdevdrive"]
+           "commonpath", "isjunction","isdevdrive","ALLOW_MISSING"]


 def _get_sep(path):
@@ -402,6 +402,15 @@ def realpath(filename, *, strict=False):
         curdir = '.'
         pardir = '..'
         getcwd = os.getcwd
+    if strict is ALLOW_MISSING:
+        ignored_error = FileNotFoundError
+        strict = True
+    elif strict:
+        ignored_error = ()
+    else:
+        ignored_error = OSError
+
+    maxlinks = None

     # The stack of unresolved path parts. When popped, a special value of None
     # indicates that a symlink target has been resolved, and that the original
@@ -462,25 +471,28 @@ def realpath(filename, *, strict=False):
                 path = newpath
                 continue
             target = os.readlink(newpath)
-        except OSError:
-            if strict:
-                raise
-            path = newpath
+        except ignored_error:
+            pass
+        else:
+            # Resolve the symbolic link
+            if target.startswith(sep):
+                # Symlink target is absolute; reset resolved path.
+                path = sep
+            if maxlinks is None:
+                # Mark this symlink as seen but not fully resolved.
+                seen[newpath] = None
+                # Push the symlink path onto the stack, and signal its specialness
+                # by also pushing None. When these entries are popped, we'll
+                # record the fully-resolved symlink target in the 'seen' mapping.
+                rest.append(newpath)
+                rest.append(None)
+            # Push the unresolved symlink target parts onto the stack.
+            target_parts = target.split(sep)[::-1]
+            rest.extend(target_parts)
+            part_count += len(target_parts)
             continue
-        # Resolve the symbolic link
-        seen[newpath] = None # not resolved symlink
-        if target.startswith(sep):
-            # Symlink target is absolute; reset resolved path.
-            path = sep
-        # Push the symlink path onto the stack, and signal its specialness by
-        # also pushing None. When these entries are popped, we'll record the
-        # fully-resolved symlink target in the 'seen' mapping.
-        rest.append(newpath)
-        rest.append(None)
-        # Push the unresolved symlink target parts onto the stack.
-        target_parts = target.split(sep)[::-1]
-        rest.extend(target_parts)
-        part_count += len(target_parts)
+        # An error occurred and was ignored.
+        path = newpath

     return path
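The change above replaces the `if strict: raise` branch with a variable exception class in the `except` clause. The sketch below isolates that idiom; `pick_ignored_error` and `call_ignoring` are hypothetical names, and a plain boolean stands in for the `ALLOW_MISSING` sentinel.

```python
def pick_ignored_error(strict, allow_missing=False):
    """Hypothetical helper mirroring the three-way selection in realpath()."""
    if allow_missing:
        return FileNotFoundError   # tolerate missing files only
    if strict:
        return ()                  # empty tuple: tolerate nothing
    return OSError                 # tolerate every OSError


def call_ignoring(func, ignored_error, default=None):
    """Call func(); swallow only the exceptions named by ignored_error."""
    try:
        return func()
    except ignored_error:
        # 'except ()' matches no exception, so strict mode never lands here.
        return default


def missing():
    raise FileNotFoundError("no such file")


def denied():
    raise PermissionError("access denied")


print(call_ignoring(missing, pick_ignored_error(strict=False)))
print(call_ignoring(missing, pick_ignored_error(True, allow_missing=True)))
```

With `pick_ignored_error(strict=True)`, both `missing` and `denied` propagate; with `allow_missing=True`, only `denied` does.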

View File

@@ -68,7 +68,7 @@
            "DEFAULT_FORMAT", "open","fully_trusted_filter", "data_filter",
            "tar_filter", "FilterError", "AbsoluteLinkError",
            "OutsideDestinationError", "SpecialFileError", "AbsolutePathError",
-           "LinkOutsideDestinationError"]
+           "LinkOutsideDestinationError", "LinkFallbackError"]


 #---------------------------------------------------------
@@ -755,10 +755,22 @@ def __init__(self, tarinfo, path):
         super().__init__(f'{tarinfo.name!r} would link to {path!r}, '
                          + 'which is outside the destination')

+class LinkFallbackError(FilterError):
+    def __init__(self, tarinfo, path):
+        self.tarinfo = tarinfo
+        self._path = path
+        super().__init__(f'link {tarinfo.name!r} would be extracted as a '
+                         + f'copy of {path!r}, which was rejected')
+
+# Errors caused by filters -- both "fatal" and "non-fatal" -- that
+# we consider to be issues with the argument, rather than a bug in the
+# filter function
+_FILTER_ERRORS = (FilterError, OSError, ExtractError)
+
 def _get_filtered_attrs(member, dest_path, for_data=True):
     new_attrs = {}
     name = member.name
-    dest_path = os.path.realpath(dest_path)
+    dest_path = os.path.realpath(dest_path, strict=os.path.ALLOW_MISSING)
     # Strip leading / (tar's directory separator) from filenames.
     # Include os.sep (target OS directory separator) as well.
     if name.startswith(('/', os.sep)):
@@ -768,7 +780,8 @@ def _get_filtered_attrs(member, dest_path, for_data=True):
         # For example, 'C:/foo' on Windows.
         raise AbsolutePathError(member)
     # Ensure we stay in the destination
-    target_path = os.path.realpath(os.path.join(dest_path, name))
+    target_path = os.path.realpath(os.path.join(dest_path, name),
+                                   strict=os.path.ALLOW_MISSING)
     if os.path.commonpath([target_path, dest_path]) != dest_path:
         raise OutsideDestinationError(member, target_path)
     # Limit permissions (no high bits, and go-w)
@@ -806,6 +819,9 @@ def _get_filtered_attrs(member, dest_path, for_data=True):
         if member.islnk() or member.issym():
             if os.path.isabs(member.linkname):
                 raise AbsoluteLinkError(member)
+            normalized = os.path.normpath(member.linkname)
+            if normalized != member.linkname:
+                new_attrs['linkname'] = normalized
             if member.issym():
                 target_path = os.path.join(dest_path,
                                            os.path.dirname(name),
@@ -813,7 +829,8 @@ def _get_filtered_attrs(member, dest_path, for_data=True):
             else:
                 target_path = os.path.join(dest_path,
                                            member.linkname)
-            target_path = os.path.realpath(target_path)
+            target_path = os.path.realpath(target_path,
+                                           strict=os.path.ALLOW_MISSING)
             if os.path.commonpath([target_path, dest_path]) != dest_path:
                 raise LinkOutsideDestinationError(member, target_path)
     return new_attrs
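The containment test used throughout `_get_filtered_attrs` resolves both paths and compares their common prefix with the destination. A simplified sketch of that check follows; `is_inside` is a hypothetical helper, and it uses the default non-strict `realpath` so that it also runs on interpreters without `os.path.ALLOW_MISSING`, which makes it weaker than the patched code.

```python
import os
import tempfile


def is_inside(dest, candidate):
    """Hypothetical helper: does dest/candidate resolve to a path under dest?"""
    dest = os.path.realpath(dest)
    # Assumption: default (non-strict) resolution; the commit passes
    # strict=os.path.ALLOW_MISSING here so that errors other than
    # "file not found" are raised instead of being silently ignored.
    target = os.path.realpath(os.path.join(dest, candidate))
    return os.path.commonpath([target, dest]) == dest


with tempfile.TemporaryDirectory() as tmp:
    print(is_inside(tmp, os.path.join("pkg", "data.txt")))
    print(is_inside(tmp, os.path.join("pkg", "..", "..", "evil")))
```

Note that `os.path.commonpath` raises `ValueError` when the two paths are on different Windows drives; a production check would need to treat that case as "outside".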
@ -2323,30 +2340,58 @@ def extractall(self, path=".", members=None, *, numeric_owner=False,
members = self members = self
for member in members: for member in members:
tarinfo = self._get_extract_tarinfo(member, filter_function, path) tarinfo, unfiltered = self._get_extract_tarinfo(
member, filter_function, path)
if tarinfo is None: if tarinfo is None:
continue continue
if tarinfo.isdir(): if tarinfo.isdir():
# For directories, delay setting attributes until later, # For directories, delay setting attributes until later,
# since permissions can interfere with extraction and # since permissions can interfere with extraction and
# extracting contents can reset mtime. # extracting contents can reset mtime.
directories.append(tarinfo) directories.append(unfiltered)
self._extract_one(tarinfo, path, set_attrs=not tarinfo.isdir(), self._extract_one(tarinfo, path, set_attrs=not tarinfo.isdir(),
numeric_owner=numeric_owner) numeric_owner=numeric_owner,
filter_function=filter_function)
# Reverse sort directories. # Reverse sort directories.
directories.sort(key=lambda a: a.name, reverse=True) directories.sort(key=lambda a: a.name, reverse=True)
# Set correct owner, mtime and filemode on directories. # Set correct owner, mtime and filemode on directories.
for tarinfo in directories: for unfiltered in directories:
dirpath = os.path.join(path, tarinfo.name)
try: try:
# Need to re-apply any filter, to take the *current* filesystem
# state into account.
try:
tarinfo = filter_function(unfiltered, path)
except _FILTER_ERRORS as exc:
self._log_no_directory_fixup(unfiltered, repr(exc))
continue
if tarinfo is None:
self._log_no_directory_fixup(unfiltered,
'excluded by filter')
continue
dirpath = os.path.join(path, tarinfo.name)
try:
lstat = os.lstat(dirpath)
except FileNotFoundError:
self._log_no_directory_fixup(tarinfo, 'missing')
continue
if not stat.S_ISDIR(lstat.st_mode):
# This is no longer a directory; presumably a later
# member overwrote the entry.
self._log_no_directory_fixup(tarinfo, 'not a directory')
continue
self.chown(tarinfo, dirpath, numeric_owner=numeric_owner) self.chown(tarinfo, dirpath, numeric_owner=numeric_owner)
self.utime(tarinfo, dirpath) self.utime(tarinfo, dirpath)
self.chmod(tarinfo, dirpath) self.chmod(tarinfo, dirpath)
except ExtractError as e: except ExtractError as e:
self._handle_nonfatal_error(e) self._handle_nonfatal_error(e)
def _log_no_directory_fixup(self, member, reason):
self._dbg(2, "tarfile: Not fixing up directory %r (%s)" %
(member.name, reason))
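# Illustration (not part of the commit): the pre-fixup check above, reduced to
# a standalone function. `directory_fixup_status` is a hypothetical name; it
# reports why a directory would be skipped instead of logging via _dbg().

```python
import os
import stat
import tempfile


def directory_fixup_status(dirpath):
    """Return 'ok', 'missing' or 'not a directory' for a fixup candidate."""
    try:
        # lstat() does not follow symlinks, so a directory that was replaced
        # by a link to another directory is reported as 'not a directory'.
        st = os.lstat(dirpath)
    except FileNotFoundError:
        return "missing"
    if not stat.S_ISDIR(st.st_mode):
        return "not a directory"
    return "ok"


with tempfile.TemporaryDirectory() as tmp:
    file_path = os.path.join(tmp, "plain.txt")
    with open(file_path, "w") as f:
        f.write("x")
    print(directory_fixup_status(tmp))
    print(directory_fixup_status(file_path))
    print(directory_fixup_status(os.path.join(tmp, "gone")))
```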
def extract(self, member, path="", set_attrs=True, *, numeric_owner=False, def extract(self, member, path="", set_attrs=True, *, numeric_owner=False,
filter=None): filter=None):
"""Extract a member from the archive to the current working directory, """Extract a member from the archive to the current working directory,
@ -2362,41 +2407,56 @@ def extract(self, member, path="", set_attrs=True, *, numeric_owner=False,
String names of common filters are accepted. String names of common filters are accepted.
""" """
filter_function = self._get_filter_function(filter) filter_function = self._get_filter_function(filter)
tarinfo = self._get_extract_tarinfo(member, filter_function, path) tarinfo, unfiltered = self._get_extract_tarinfo(
member, filter_function, path)
if tarinfo is not None: if tarinfo is not None:
self._extract_one(tarinfo, path, set_attrs, numeric_owner) self._extract_one(tarinfo, path, set_attrs, numeric_owner)
def _get_extract_tarinfo(self, member, filter_function, path): def _get_extract_tarinfo(self, member, filter_function, path):
"""Get filtered TarInfo (or None) from member, which might be a str""" """Get (filtered, unfiltered) TarInfos from *member*
if isinstance(member, str):
tarinfo = self.getmember(member)
else:
tarinfo = member
unfiltered = tarinfo *member* might be a string.
Return (None, None) if not found.
"""
if isinstance(member, str):
unfiltered = self.getmember(member)
else:
unfiltered = member
filtered = None
try: try:
tarinfo = filter_function(tarinfo, path) filtered = filter_function(unfiltered, path)
except (OSError, UnicodeEncodeError, FilterError) as e: except (OSError, UnicodeEncodeError, FilterError) as e:
self._handle_fatal_error(e) self._handle_fatal_error(e)
except ExtractError as e: except ExtractError as e:
self._handle_nonfatal_error(e) self._handle_nonfatal_error(e)
if tarinfo is None: if filtered is None:
self._dbg(2, "tarfile: Excluded %r" % unfiltered.name) self._dbg(2, "tarfile: Excluded %r" % unfiltered.name)
return None return None, None
# Prepare the link target for makelink().
if tarinfo.islnk():
tarinfo = copy.copy(tarinfo)
tarinfo._link_target = os.path.join(path, tarinfo.linkname)
return tarinfo
def _extract_one(self, tarinfo, path, set_attrs, numeric_owner): # Prepare the link target for makelink().
"""Extract from filtered tarinfo to disk""" if filtered.islnk():
filtered = copy.copy(filtered)
filtered._link_target = os.path.join(path, filtered.linkname)
return filtered, unfiltered
def _extract_one(self, tarinfo, path, set_attrs, numeric_owner,
filter_function=None):
"""Extract from filtered tarinfo to disk.
filter_function is only used when extracting a *different*
member (e.g. as fallback to creating a symlink)
"""
self._check("r") self._check("r")
try: try:
self._extract_member(tarinfo, os.path.join(path, tarinfo.name), self._extract_member(tarinfo, os.path.join(path, tarinfo.name),
set_attrs=set_attrs, set_attrs=set_attrs,
numeric_owner=numeric_owner) numeric_owner=numeric_owner,
filter_function=filter_function,
extraction_root=path)
except (OSError, UnicodeEncodeError) as e: except (OSError, UnicodeEncodeError) as e:
self._handle_fatal_error(e) self._handle_fatal_error(e)
except ExtractError as e: except ExtractError as e:
@ -2454,9 +2514,13 @@ def extractfile(self, member):
return None return None
def _extract_member(self, tarinfo, targetpath, set_attrs=True, def _extract_member(self, tarinfo, targetpath, set_attrs=True,
numeric_owner=False): numeric_owner=False, *, filter_function=None,
"""Extract the TarInfo object tarinfo to a physical extraction_root=None):
"""Extract the filtered TarInfo object tarinfo to a physical
file called targetpath. file called targetpath.
filter_function is only used when extracting a *different*
member (e.g. as fallback to creating a symlink)
""" """
# Fetch the TarInfo object for the given name # Fetch the TarInfo object for the given name
# and build the destination pathname, replacing # and build the destination pathname, replacing
@ -2485,7 +2549,10 @@ def _extract_member(self, tarinfo, targetpath, set_attrs=True,
elif tarinfo.ischr() or tarinfo.isblk(): elif tarinfo.ischr() or tarinfo.isblk():
self.makedev(tarinfo, targetpath) self.makedev(tarinfo, targetpath)
elif tarinfo.islnk() or tarinfo.issym(): elif tarinfo.islnk() or tarinfo.issym():
self.makelink(tarinfo, targetpath) self.makelink_with_filter(
tarinfo, targetpath,
filter_function=filter_function,
extraction_root=extraction_root)
elif tarinfo.type not in SUPPORTED_TYPES: elif tarinfo.type not in SUPPORTED_TYPES:
self.makeunknown(tarinfo, targetpath) self.makeunknown(tarinfo, targetpath)
else: else:
@ -2568,10 +2635,18 @@ def makedev(self, tarinfo, targetpath):
os.makedev(tarinfo.devmajor, tarinfo.devminor)) os.makedev(tarinfo.devmajor, tarinfo.devminor))
def makelink(self, tarinfo, targetpath): def makelink(self, tarinfo, targetpath):
return self.makelink_with_filter(tarinfo, targetpath, None, None)
def makelink_with_filter(self, tarinfo, targetpath,
filter_function, extraction_root):
"""Make a (symbolic) link called targetpath. If it cannot be created """Make a (symbolic) link called targetpath. If it cannot be created
(platform limitation), we try to make a copy of the referenced file (platform limitation), we try to make a copy of the referenced file
instead of a link. instead of a link.
filter_function is only used when extracting a *different*
member (e.g. as fallback to creating a link).
""" """
keyerror_to_extracterror = False
try: try:
# For systems that support symbolic and hard links. # For systems that support symbolic and hard links.
if tarinfo.issym(): if tarinfo.issym():
@ -2579,18 +2654,38 @@ def makelink(self, tarinfo, targetpath):
# Avoid FileExistsError on following os.symlink. # Avoid FileExistsError on following os.symlink.
os.unlink(targetpath) os.unlink(targetpath)
os.symlink(tarinfo.linkname, targetpath) os.symlink(tarinfo.linkname, targetpath)
return
else: else:
if os.path.exists(tarinfo._link_target): if os.path.exists(tarinfo._link_target):
os.link(tarinfo._link_target, targetpath) os.link(tarinfo._link_target, targetpath)
else: return
self._extract_member(self._find_link_target(tarinfo),
targetpath)
except symlink_exception: except symlink_exception:
keyerror_to_extracterror = True
try:
unfiltered = self._find_link_target(tarinfo)
except KeyError:
if keyerror_to_extracterror:
raise ExtractError(
"unable to resolve link inside archive") from None
else:
raise
if filter_function is None:
filtered = unfiltered
else:
if extraction_root is None:
raise ExtractError(
"makelink_with_filter: if filter_function is not None, "
+ "extraction_root must also not be None")
try: try:
self._extract_member(self._find_link_target(tarinfo), filtered = filter_function(unfiltered, extraction_root)
targetpath) except _FILTER_ERRORS as cause:
except KeyError: raise LinkFallbackError(tarinfo, unfiltered.name) from cause
raise ExtractError("unable to resolve link inside archive") from None if filtered is not None:
self._extract_member(filtered, targetpath,
filter_function=filter_function,
extraction_root=extraction_root)
def chown(self, tarinfo, targetpath, numeric_owner): def chown(self, tarinfo, targetpath, numeric_owner):
"""Set owner of targetpath according to tarinfo. If numeric_owner """Set owner of targetpath according to tarinfo. If numeric_owner

View File

@ -6,6 +6,7 @@
import sys import sys
import unittest import unittest
import warnings import warnings
from ntpath import ALLOW_MISSING
from test.support import cpython_only, os_helper from test.support import cpython_only, os_helper
from test.support import TestFailed, is_emscripten from test.support import TestFailed, is_emscripten
from test.support.os_helper import FakePath from test.support.os_helper import FakePath
@ -77,6 +78,27 @@ def tester(fn, wantResult):
%(str(fn), str(wantResult), repr(gotResult))) %(str(fn), str(wantResult), repr(gotResult)))
def _parameterize(*parameters):
"""Simplistic decorator to parametrize a test
Runs the decorated test multiple times in subTest, with a value from
'parameters' passed as an extra positional argument.
Calls doCleanups() after each run.
Not for general use. Intended to avoid indenting for easier backports.
See https://discuss.python.org/t/91827 for discussing generalizations.
"""
def _parametrize_decorator(func):
def _parameterized(self, *args, **kwargs):
for parameter in parameters:
with self.subTest(parameter):
func(self, *args, parameter, **kwargs)
self.doCleanups()
return _parameterized
return _parametrize_decorator
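# Illustration (not part of the commit): how a decorator like the one above
# behaves. `parameterize`, `ModeTest` and `run_demo` are hypothetical names
# used only for this sketch.

```python
import unittest


def parameterize(*parameters):
    """Run the decorated test once per parameter, each inside a subTest."""
    def decorator(func):
        def wrapper(self, *args, **kwargs):
            for parameter in parameters:
                with self.subTest(parameter):
                    func(self, *args, parameter, **kwargs)
                # Run registered cleanups between iterations.
                self.doCleanups()
        return wrapper
    return decorator


class ModeTest(unittest.TestCase):
    seen = []

    @parameterize(False, True)
    def test_mode(self, strict):
        # The parameter arrives as an extra positional argument.
        self.seen.append(strict)
        self.assertIn(strict, (False, True))


def run_demo():
    """Run ModeTest and return (TestResult, parameters seen)."""
    ModeTest.seen = []
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(ModeTest)
    result = unittest.TestResult()
    suite.run(result)
    return result, list(ModeTest.seen)
```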
class NtpathTestCase(unittest.TestCase): class NtpathTestCase(unittest.TestCase):
def assertPathEqual(self, path1, path2): def assertPathEqual(self, path1, path2):
if path1 == path2 or _norm(path1) == _norm(path2): if path1 == path2 or _norm(path1) == _norm(path2):
@@ -475,6 +497,27 @@ def test_realpath_curdir(self):
         tester("ntpath.realpath('.\\.')", expected)
         tester("ntpath.realpath('\\'.join(['.'] * 100))", expected)

+    def test_realpath_curdir_strict(self):
+        expected = ntpath.normpath(os.getcwd())
+        tester("ntpath.realpath('.', strict=True)", expected)
+        tester("ntpath.realpath('./.', strict=True)", expected)
+        tester("ntpath.realpath('/'.join(['.'] * 100), strict=True)", expected)
+        tester("ntpath.realpath('.\\.', strict=True)", expected)
+        tester("ntpath.realpath('\\'.join(['.'] * 100), strict=True)", expected)
+
+    def test_realpath_curdir_missing_ok(self):
+        expected = ntpath.normpath(os.getcwd())
+        tester("ntpath.realpath('.', strict=ALLOW_MISSING)",
+               expected)
+        tester("ntpath.realpath('./.', strict=ALLOW_MISSING)",
+               expected)
+        tester("ntpath.realpath('/'.join(['.'] * 100), strict=ALLOW_MISSING)",
+               expected)
+        tester("ntpath.realpath('.\\.', strict=ALLOW_MISSING)",
+               expected)
+        tester("ntpath.realpath('\\'.join(['.'] * 100), strict=ALLOW_MISSING)",
+               expected)
+
     def test_realpath_pardir(self):
         expected = ntpath.normpath(os.getcwd())
         tester("ntpath.realpath('..')", ntpath.dirname(expected))
@@ -487,24 +530,59 @@ def test_realpath_pardir(self):
         tester("ntpath.realpath('\\'.join(['..'] * 50))",
                ntpath.splitdrive(expected)[0] + '\\')

+    def test_realpath_pardir_strict(self):
+        expected = ntpath.normpath(os.getcwd())
+        tester("ntpath.realpath('..', strict=True)", ntpath.dirname(expected))
+        tester("ntpath.realpath('../..', strict=True)",
+               ntpath.dirname(ntpath.dirname(expected)))
+        tester("ntpath.realpath('/'.join(['..'] * 50), strict=True)",
+               ntpath.splitdrive(expected)[0] + '\\')
+        tester("ntpath.realpath('..\\..', strict=True)",
+               ntpath.dirname(ntpath.dirname(expected)))
+        tester("ntpath.realpath('\\'.join(['..'] * 50), strict=True)",
+               ntpath.splitdrive(expected)[0] + '\\')
+
+    def test_realpath_pardir_missing_ok(self):
+        expected = ntpath.normpath(os.getcwd())
+        tester("ntpath.realpath('..', strict=ALLOW_MISSING)",
+               ntpath.dirname(expected))
+        tester("ntpath.realpath('../..', strict=ALLOW_MISSING)",
+               ntpath.dirname(ntpath.dirname(expected)))
+        tester("ntpath.realpath('/'.join(['..'] * 50), strict=ALLOW_MISSING)",
+               ntpath.splitdrive(expected)[0] + '\\')
+        tester("ntpath.realpath('..\\..', strict=ALLOW_MISSING)",
+               ntpath.dirname(ntpath.dirname(expected)))
+        tester("ntpath.realpath('\\'.join(['..'] * 50), strict=ALLOW_MISSING)",
+               ntpath.splitdrive(expected)[0] + '\\')
+
     @os_helper.skip_unless_symlink
     @unittest.skipUnless(HAVE_GETFINALPATHNAME, 'need _getfinalpathname')
-    def test_realpath_basic(self):
+    @_parameterize({}, {'strict': True}, {'strict': ALLOW_MISSING})
+    def test_realpath_basic(self, kwargs):
         ABSTFN = ntpath.abspath(os_helper.TESTFN)
         open(ABSTFN, "wb").close()
         self.addCleanup(os_helper.unlink, ABSTFN)
         self.addCleanup(os_helper.unlink, ABSTFN + "1")
         os.symlink(ABSTFN, ABSTFN + "1")
-        self.assertPathEqual(ntpath.realpath(ABSTFN + "1"), ABSTFN)
-        self.assertPathEqual(ntpath.realpath(os.fsencode(ABSTFN + "1")),
+        self.assertPathEqual(ntpath.realpath(ABSTFN + "1", **kwargs), ABSTFN)
+        self.assertPathEqual(ntpath.realpath(os.fsencode(ABSTFN + "1"), **kwargs),
                              os.fsencode(ABSTFN))
         # gh-88013: call ntpath.realpath with binary drive name may raise a
         # TypeError. The drive should not exist to reproduce the bug.
         drives = {f"{c}:\\" for c in string.ascii_uppercase} - set(os.listdrives())
         d = drives.pop().encode()
-        self.assertEqual(ntpath.realpath(d), d)
+        self.assertEqual(ntpath.realpath(d, strict=False), d)
+        # gh-106242: Embedded nulls and non-strict fallback to abspath
+        if kwargs:
+            with self.assertRaises(OSError):
+                ntpath.realpath(os_helper.TESTFN + "\0spam",
+                                **kwargs)
+        else:
+            self.assertEqual(ABSTFN + "\0spam",
+                             ntpath.realpath(os_helper.TESTFN + "\0spam", **kwargs))

     @os_helper.skip_unless_symlink
     @unittest.skipUnless(HAVE_GETFINALPATHNAME, 'need _getfinalpathname')
@@ -527,51 +605,66 @@ def test_realpath_invalid_paths(self):
         self.assertEqual(realpath(path, strict=False), path)
         # gh-106242: Embedded nulls should raise OSError (not ValueError)
         self.assertRaises(OSError, realpath, path, strict=True)
+        self.assertRaises(OSError, realpath, path, strict=ALLOW_MISSING)
         path = ABSTFNb + b'\x00'
         self.assertEqual(realpath(path, strict=False), path)
         self.assertRaises(OSError, realpath, path, strict=True)
+        self.assertRaises(OSError, realpath, path, strict=ALLOW_MISSING)
         path = ABSTFN + '\\nonexistent\\x\x00'
         self.assertEqual(realpath(path, strict=False), path)
         self.assertRaises(OSError, realpath, path, strict=True)
+        self.assertRaises(OSError, realpath, path, strict=ALLOW_MISSING)
         path = ABSTFNb + b'\\nonexistent\\x\x00'
         self.assertEqual(realpath(path, strict=False), path)
         self.assertRaises(OSError, realpath, path, strict=True)
+        self.assertRaises(OSError, realpath, path, strict=ALLOW_MISSING)
         path = ABSTFN + '\x00\\..'
         self.assertEqual(realpath(path, strict=False), os.getcwd())
         self.assertEqual(realpath(path, strict=True), os.getcwd())
+        self.assertEqual(realpath(path, strict=ALLOW_MISSING), os.getcwd())
         path = ABSTFNb + b'\x00\\..'
         self.assertEqual(realpath(path, strict=False), os.getcwdb())
         self.assertEqual(realpath(path, strict=True), os.getcwdb())
+        self.assertEqual(realpath(path, strict=ALLOW_MISSING), os.getcwdb())
         path = ABSTFN + '\\nonexistent\\x\x00\\..'
         self.assertEqual(realpath(path, strict=False), ABSTFN + '\\nonexistent')
         self.assertRaises(OSError, realpath, path, strict=True)
+        self.assertEqual(realpath(path, strict=ALLOW_MISSING), ABSTFN + '\\nonexistent')
         path = ABSTFNb + b'\\nonexistent\\x\x00\\..'
         self.assertEqual(realpath(path, strict=False), ABSTFNb + b'\\nonexistent')
         self.assertRaises(OSError, realpath, path, strict=True)
+        self.assertEqual(realpath(path, strict=ALLOW_MISSING), ABSTFNb + b'\\nonexistent')

+    @unittest.skipUnless(HAVE_GETFINALPATHNAME, 'need _getfinalpathname')
+    @_parameterize({}, {'strict': True}, {'strict': ALLOW_MISSING})
+    def test_realpath_invalid_unicode_paths(self, kwargs):
+        realpath = ntpath.realpath
+        ABSTFN = ntpath.abspath(os_helper.TESTFN)
+        ABSTFNb = os.fsencode(ABSTFN)
         path = ABSTFNb + b'\xff'
-        self.assertRaises(UnicodeDecodeError, realpath, path, strict=False)
-        self.assertRaises(UnicodeDecodeError, realpath, path, strict=True)
+        self.assertRaises(UnicodeDecodeError, realpath, path, **kwargs)
+        self.assertRaises(UnicodeDecodeError, realpath, path, **kwargs)
         path = ABSTFNb + b'\\nonexistent\\\xff'
-        self.assertRaises(UnicodeDecodeError, realpath, path, strict=False)
-        self.assertRaises(UnicodeDecodeError, realpath, path, strict=True)
+        self.assertRaises(UnicodeDecodeError, realpath, path, **kwargs)
+        self.assertRaises(UnicodeDecodeError, realpath, path, **kwargs)
         path = ABSTFNb + b'\xff\\..'
-        self.assertRaises(UnicodeDecodeError, realpath, path, strict=False)
-        self.assertRaises(UnicodeDecodeError, realpath, path, strict=True)
+        self.assertRaises(UnicodeDecodeError, realpath, path, **kwargs)
+        self.assertRaises(UnicodeDecodeError, realpath, path, **kwargs)
         path = ABSTFNb + b'\\nonexistent\\\xff\\..'
-        self.assertRaises(UnicodeDecodeError, realpath, path, strict=False)
-        self.assertRaises(UnicodeDecodeError, realpath, path, strict=True)
+        self.assertRaises(UnicodeDecodeError, realpath, path, **kwargs)
+        self.assertRaises(UnicodeDecodeError, realpath, path, **kwargs)

     @os_helper.skip_unless_symlink
     @unittest.skipUnless(HAVE_GETFINALPATHNAME, 'need _getfinalpathname')
-    def test_realpath_relative(self):
+    @_parameterize({}, {'strict': True}, {'strict': ALLOW_MISSING})
+    def test_realpath_relative(self, kwargs):
         ABSTFN = ntpath.abspath(os_helper.TESTFN)
         open(ABSTFN, "wb").close()
         self.addCleanup(os_helper.unlink, ABSTFN)
         self.addCleanup(os_helper.unlink, ABSTFN + "1")
         os.symlink(ABSTFN, ntpath.relpath(ABSTFN + "1"))
-        self.assertPathEqual(ntpath.realpath(ABSTFN + "1"), ABSTFN)
+        self.assertPathEqual(ntpath.realpath(ABSTFN + "1", **kwargs), ABSTFN)

     @os_helper.skip_unless_symlink
     @unittest.skipUnless(HAVE_GETFINALPATHNAME, 'need _getfinalpathname')
@@ -723,7 +816,62 @@ def test_realpath_symlink_loops_strict(self):
     @os_helper.skip_unless_symlink
     @unittest.skipUnless(HAVE_GETFINALPATHNAME, 'need _getfinalpathname')
-    def test_realpath_symlink_prefix(self):
+    def test_realpath_symlink_loops_raise(self):
+        # Symlink loops raise OSError in ALLOW_MISSING mode
+        ABSTFN = ntpath.abspath(os_helper.TESTFN)
+        self.addCleanup(os_helper.unlink, ABSTFN)
+        self.addCleanup(os_helper.unlink, ABSTFN + "1")
+        self.addCleanup(os_helper.unlink, ABSTFN + "2")
+        self.addCleanup(os_helper.unlink, ABSTFN + "y")
+        self.addCleanup(os_helper.unlink, ABSTFN + "c")
+        self.addCleanup(os_helper.unlink, ABSTFN + "a")
+        self.addCleanup(os_helper.unlink, ABSTFN + "x")
+        os.symlink(ABSTFN, ABSTFN)
+        self.assertRaises(OSError, ntpath.realpath, ABSTFN, strict=ALLOW_MISSING)
+        os.symlink(ABSTFN + "1", ABSTFN + "2")
+        os.symlink(ABSTFN + "2", ABSTFN + "1")
+        self.assertRaises(OSError, ntpath.realpath, ABSTFN + "1",
+                          strict=ALLOW_MISSING)
+        self.assertRaises(OSError, ntpath.realpath, ABSTFN + "2",
+                          strict=ALLOW_MISSING)
+        self.assertRaises(OSError, ntpath.realpath, ABSTFN + "1\\x",
+                          strict=ALLOW_MISSING)
+        # Windows eliminates '..' components before resolving links;
+        # realpath is not expected to raise if this removes the loop.
+        self.assertPathEqual(ntpath.realpath(ABSTFN + "1\\.."),
+                             ntpath.dirname(ABSTFN))
+        self.assertPathEqual(ntpath.realpath(ABSTFN + "1\\..\\x"),
+                             ntpath.dirname(ABSTFN) + "\\x")
+        os.symlink(ABSTFN + "x", ABSTFN + "y")
+        self.assertPathEqual(ntpath.realpath(ABSTFN + "1\\..\\"
+                                             + ntpath.basename(ABSTFN) + "y"),
+                             ABSTFN + "x")
+        self.assertRaises(
+            OSError, ntpath.realpath,
+            ABSTFN + "1\\..\\" + ntpath.basename(ABSTFN) + "1",
+            strict=ALLOW_MISSING)
+        os.symlink(ntpath.basename(ABSTFN) + "a\\b", ABSTFN + "a")
+        self.assertRaises(OSError, ntpath.realpath, ABSTFN + "a",
+                          strict=ALLOW_MISSING)
+        os.symlink("..\\" + ntpath.basename(ntpath.dirname(ABSTFN))
+                   + "\\" + ntpath.basename(ABSTFN) + "c", ABSTFN + "c")
+        self.assertRaises(OSError, ntpath.realpath, ABSTFN + "c",
+                          strict=ALLOW_MISSING)
+        # Test using relative path as well.
+        self.assertRaises(OSError, ntpath.realpath, ntpath.basename(ABSTFN),
+                          strict=ALLOW_MISSING)
+
+    @os_helper.skip_unless_symlink
+    @unittest.skipUnless(HAVE_GETFINALPATHNAME, 'need _getfinalpathname')
+    @_parameterize({}, {'strict': True}, {'strict': ALLOW_MISSING})
+    def test_realpath_symlink_prefix(self, kwargs):
         ABSTFN = ntpath.abspath(os_helper.TESTFN)
         self.addCleanup(os_helper.unlink, ABSTFN + "3")
         self.addCleanup(os_helper.unlink, "\\\\?\\" + ABSTFN + "3.")
@@ -738,9 +886,9 @@ def test_realpath_symlink_prefix(self):
             f.write(b'1')
         os.symlink("\\\\?\\" + ABSTFN + "3.", ABSTFN + "3.link")
-        self.assertPathEqual(ntpath.realpath(ABSTFN + "3link"),
+        self.assertPathEqual(ntpath.realpath(ABSTFN + "3link", **kwargs),
                              ABSTFN + "3")
-        self.assertPathEqual(ntpath.realpath(ABSTFN + "3.link"),
+        self.assertPathEqual(ntpath.realpath(ABSTFN + "3.link", **kwargs),
                              "\\\\?\\" + ABSTFN + "3.")
         # Resolved paths should be usable to open target files
@@ -750,14 +898,17 @@ def test_realpath_symlink_prefix(self):
             self.assertEqual(f.read(), b'1')
         # When the prefix is included, it is not stripped
-        self.assertPathEqual(ntpath.realpath("\\\\?\\" + ABSTFN + "3link"),
+        self.assertPathEqual(ntpath.realpath("\\\\?\\" + ABSTFN + "3link", **kwargs),
                              "\\\\?\\" + ABSTFN + "3")
-        self.assertPathEqual(ntpath.realpath("\\\\?\\" + ABSTFN + "3.link"),
+        self.assertPathEqual(ntpath.realpath("\\\\?\\" + ABSTFN + "3.link", **kwargs),
                              "\\\\?\\" + ABSTFN + "3.")

     @unittest.skipUnless(HAVE_GETFINALPATHNAME, 'need _getfinalpathname')
     def test_realpath_nul(self):
         tester("ntpath.realpath('NUL')", r'\\.\NUL')
+        tester("ntpath.realpath('NUL', strict=False)", r'\\.\NUL')
+        tester("ntpath.realpath('NUL', strict=True)", r'\\.\NUL')
+        tester("ntpath.realpath('NUL', strict=ALLOW_MISSING)", r'\\.\NUL')

     @unittest.skipUnless(HAVE_GETFINALPATHNAME, 'need _getfinalpathname')
     @unittest.skipUnless(HAVE_GETSHORTPATHNAME, 'need _getshortpathname')
@@ -781,12 +932,20 @@ def test_realpath_cwd(self):
         self.assertPathEqual(test_file_long, ntpath.realpath(test_file_short))
-        with os_helper.change_cwd(test_dir_long):
-            self.assertPathEqual(test_file_long, ntpath.realpath("file.txt"))
-        with os_helper.change_cwd(test_dir_long.lower()):
-            self.assertPathEqual(test_file_long, ntpath.realpath("file.txt"))
-        with os_helper.change_cwd(test_dir_short):
-            self.assertPathEqual(test_file_long, ntpath.realpath("file.txt"))
+        for kwargs in {}, {'strict': True}, {'strict': ALLOW_MISSING}:
+            with self.subTest(**kwargs):
+                with os_helper.change_cwd(test_dir_long):
+                    self.assertPathEqual(
+                        test_file_long,
+                        ntpath.realpath("file.txt", **kwargs))
+                with os_helper.change_cwd(test_dir_long.lower()):
+                    self.assertPathEqual(
+                        test_file_long,
+                        ntpath.realpath("file.txt", **kwargs))
+                with os_helper.change_cwd(test_dir_short):
+                    self.assertPathEqual(
+                        test_file_long,
+                        ntpath.realpath("file.txt", **kwargs))

     @unittest.skipUnless(HAVE_GETFINALPATHNAME, 'need _getfinalpathname')
     def test_realpath_permission(self):
@@ -807,12 +966,15 @@ def test_realpath_permission(self):
         # Automatic generation of short names may be disabled on
         # NTFS volumes for the sake of performance.
         # They're not supported at all on ReFS and exFAT.
-        subprocess.run(
+        p = subprocess.run(
             # Try to set the short name manually.
             ['fsutil.exe', 'file', 'setShortName', test_file, 'LONGFI~1.TXT'],
             creationflags=subprocess.DETACHED_PROCESS
         )
+        if p.returncode:
+            raise unittest.SkipTest('failed to set short name')
         try:
             self.assertPathEqual(test_file, ntpath.realpath(test_file_short))
         except AssertionError:

View File

@@ -4,7 +4,8 @@
 import random
 import sys
 import unittest
-from posixpath import realpath, abspath, dirname, basename
+from functools import partial
+from posixpath import realpath, abspath, dirname, basename, ALLOW_MISSING
 from test import support
 from test import test_genericpath
 from test.support import import_helper
@@ -33,6 +34,27 @@ def skip_if_ABSTFN_contains_backslash(test):
     msg = "ABSTFN is not a posix path - tests fail"
     return [test, unittest.skip(msg)(test)][found_backslash]

+
+def _parameterize(*parameters):
+    """Simplistic decorator to parametrize a test
+
+    Runs the decorated test multiple times in subTest, with a value from
+    'parameters' passed as an extra positional argument.
+    Does *not* call doCleanups() after each run.
+
+    Not for general use. Intended to avoid indenting for easier backports.
+
+    See https://discuss.python.org/t/91827 for discussing generalizations.
+    """
+    def _parametrize_decorator(func):
+        def _parameterized(self, *args, **kwargs):
+            for parameter in parameters:
+                with self.subTest(parameter):
+                    func(self, *args, parameter, **kwargs)
+        return _parameterized
+    return _parametrize_decorator
+
 class PosixPathTest(unittest.TestCase):

     def setUp(self):
@@ -442,32 +464,35 @@ def test_normpath(self):
                 self.assertEqual(result, expected)

     @skip_if_ABSTFN_contains_backslash
-    def test_realpath_curdir(self):
-        self.assertEqual(realpath('.'), os.getcwd())
-        self.assertEqual(realpath('./.'), os.getcwd())
-        self.assertEqual(realpath('/'.join(['.'] * 100)), os.getcwd())
+    @_parameterize({}, {'strict': True}, {'strict': ALLOW_MISSING})
+    def test_realpath_curdir(self, kwargs):
+        self.assertEqual(realpath('.', **kwargs), os.getcwd())
+        self.assertEqual(realpath('./.', **kwargs), os.getcwd())
+        self.assertEqual(realpath('/'.join(['.'] * 100), **kwargs), os.getcwd())

-        self.assertEqual(realpath(b'.'), os.getcwdb())
-        self.assertEqual(realpath(b'./.'), os.getcwdb())
-        self.assertEqual(realpath(b'/'.join([b'.'] * 100)), os.getcwdb())
+        self.assertEqual(realpath(b'.', **kwargs), os.getcwdb())
+        self.assertEqual(realpath(b'./.', **kwargs), os.getcwdb())
+        self.assertEqual(realpath(b'/'.join([b'.'] * 100), **kwargs), os.getcwdb())

     @skip_if_ABSTFN_contains_backslash
-    def test_realpath_pardir(self):
-        self.assertEqual(realpath('..'), dirname(os.getcwd()))
-        self.assertEqual(realpath('../..'), dirname(dirname(os.getcwd())))
-        self.assertEqual(realpath('/'.join(['..'] * 100)), '/')
+    @_parameterize({}, {'strict': True}, {'strict': ALLOW_MISSING})
+    def test_realpath_pardir(self, kwargs):
+        self.assertEqual(realpath('..', **kwargs), dirname(os.getcwd()))
+        self.assertEqual(realpath('../..', **kwargs), dirname(dirname(os.getcwd())))
+        self.assertEqual(realpath('/'.join(['..'] * 100), **kwargs), '/')

-        self.assertEqual(realpath(b'..'), dirname(os.getcwdb()))
-        self.assertEqual(realpath(b'../..'), dirname(dirname(os.getcwdb())))
-        self.assertEqual(realpath(b'/'.join([b'..'] * 100)), b'/')
+        self.assertEqual(realpath(b'..', **kwargs), dirname(os.getcwdb()))
+        self.assertEqual(realpath(b'../..', **kwargs), dirname(dirname(os.getcwdb())))
+        self.assertEqual(realpath(b'/'.join([b'..'] * 100), **kwargs), b'/')

     @os_helper.skip_unless_symlink
     @skip_if_ABSTFN_contains_backslash
-    def test_realpath_basic(self):
+    @_parameterize({}, {'strict': ALLOW_MISSING})
+    def test_realpath_basic(self, kwargs):
         # Basic operation.
         try:
             os.symlink(ABSTFN+"1", ABSTFN)
-            self.assertEqual(realpath(ABSTFN), ABSTFN+"1")
+            self.assertEqual(realpath(ABSTFN, **kwargs), ABSTFN+"1")
         finally:
             os_helper.unlink(ABSTFN)
@@ -487,90 +512,115 @@ def test_realpath_invalid_paths(self):
         path = '/\x00'
         self.assertRaises(ValueError, realpath, path, strict=False)
         self.assertRaises(ValueError, realpath, path, strict=True)
+        self.assertRaises(ValueError, realpath, path, strict=ALLOW_MISSING)
         path = b'/\x00'
         self.assertRaises(ValueError, realpath, path, strict=False)
         self.assertRaises(ValueError, realpath, path, strict=True)
+        self.assertRaises(ValueError, realpath, path, strict=ALLOW_MISSING)
         path = '/nonexistent/x\x00'
         self.assertRaises(ValueError, realpath, path, strict=False)
         self.assertRaises(FileNotFoundError, realpath, path, strict=True)
+        self.assertRaises(ValueError, realpath, path, strict=ALLOW_MISSING)
         path = b'/nonexistent/x\x00'
         self.assertRaises(ValueError, realpath, path, strict=False)
         self.assertRaises(FileNotFoundError, realpath, path, strict=True)
+        self.assertRaises(ValueError, realpath, path, strict=ALLOW_MISSING)
         path = '/\x00/..'
         self.assertRaises(ValueError, realpath, path, strict=False)
         self.assertRaises(ValueError, realpath, path, strict=True)
+        self.assertRaises(ValueError, realpath, path, strict=ALLOW_MISSING)
         path = b'/\x00/..'
         self.assertRaises(ValueError, realpath, path, strict=False)
         self.assertRaises(ValueError, realpath, path, strict=True)
+        self.assertRaises(ValueError, realpath, path, strict=ALLOW_MISSING)
         path = '/nonexistent/x\x00/..'
         self.assertRaises(ValueError, realpath, path, strict=False)
         self.assertRaises(FileNotFoundError, realpath, path, strict=True)
+        self.assertRaises(ValueError, realpath, path, strict=ALLOW_MISSING)
         path = b'/nonexistent/x\x00/..'
         self.assertRaises(ValueError, realpath, path, strict=False)
         self.assertRaises(FileNotFoundError, realpath, path, strict=True)
+        self.assertRaises(ValueError, realpath, path, strict=ALLOW_MISSING)
         path = '/\udfff'
         if sys.platform == 'win32':
             self.assertEqual(realpath(path, strict=False), path)
             self.assertRaises(FileNotFoundError, realpath, path, strict=True)
+            self.assertEqual(realpath(path, strict=ALLOW_MISSING), path)
         else:
             self.assertRaises(UnicodeEncodeError, realpath, path, strict=False)
             self.assertRaises(UnicodeEncodeError, realpath, path, strict=True)
+            self.assertRaises(UnicodeEncodeError, realpath, path, strict=ALLOW_MISSING)
         path = '/nonexistent/\udfff'
         if sys.platform == 'win32':
             self.assertEqual(realpath(path, strict=False), path)
+            self.assertEqual(realpath(path, strict=ALLOW_MISSING), path)
         else:
             self.assertRaises(UnicodeEncodeError, realpath, path, strict=False)
+            self.assertRaises(UnicodeEncodeError, realpath, path, strict=ALLOW_MISSING)
         self.assertRaises(FileNotFoundError, realpath, path, strict=True)
         path = '/\udfff/..'
         if sys.platform == 'win32':
             self.assertEqual(realpath(path, strict=False), '/')
             self.assertRaises(FileNotFoundError, realpath, path, strict=True)
+            self.assertEqual(realpath(path, strict=ALLOW_MISSING), '/')
         else:
             self.assertRaises(UnicodeEncodeError, realpath, path, strict=False)
             self.assertRaises(UnicodeEncodeError, realpath, path, strict=True)
+            self.assertRaises(UnicodeEncodeError, realpath, path, strict=ALLOW_MISSING)
         path = '/nonexistent/\udfff/..'
         if sys.platform == 'win32':
             self.assertEqual(realpath(path, strict=False), '/nonexistent')
+            self.assertEqual(realpath(path, strict=ALLOW_MISSING), '/nonexistent')
         else:
             self.assertRaises(UnicodeEncodeError, realpath, path, strict=False)
+            self.assertRaises(UnicodeEncodeError, realpath, path, strict=ALLOW_MISSING)
         self.assertRaises(FileNotFoundError, realpath, path, strict=True)
         path = b'/\xff'
         if sys.platform == 'win32':
             self.assertRaises(UnicodeDecodeError, realpath, path, strict=False)
             self.assertRaises(UnicodeDecodeError, realpath, path, strict=True)
+            self.assertRaises(UnicodeDecodeError, realpath, path, strict=ALLOW_MISSING)
         else:
             self.assertEqual(realpath(path, strict=False), path)
             if support.is_wasi:
                 self.assertRaises(OSError, realpath, path, strict=True)
+                self.assertRaises(OSError, realpath, path, strict=ALLOW_MISSING)
             else:
                 self.assertRaises(FileNotFoundError, realpath, path, strict=True)
+                self.assertEqual(realpath(path, strict=ALLOW_MISSING), path)
         path = b'/nonexistent/\xff'
         if sys.platform == 'win32':
             self.assertRaises(UnicodeDecodeError, realpath, path, strict=False)
+            self.assertRaises(UnicodeDecodeError, realpath, path, strict=ALLOW_MISSING)
         else:
             self.assertEqual(realpath(path, strict=False), path)
         if support.is_wasi:
             self.assertRaises(OSError, realpath, path, strict=True)
+            self.assertRaises(OSError, realpath, path, strict=ALLOW_MISSING)
         else:
             self.assertRaises(FileNotFoundError, realpath, path, strict=True)

     @os_helper.skip_unless_symlink
     @skip_if_ABSTFN_contains_backslash
-    def test_realpath_relative(self):
+    @_parameterize({}, {'strict': ALLOW_MISSING})
+    def test_realpath_relative(self, kwargs):
         try:
             os.symlink(posixpath.relpath(ABSTFN+"1"), ABSTFN)
-            self.assertEqual(realpath(ABSTFN), ABSTFN+"1")
+            self.assertEqual(realpath(ABSTFN, **kwargs), ABSTFN+"1")
         finally:
             os_helper.unlink(ABSTFN)

     @os_helper.skip_unless_symlink
     @skip_if_ABSTFN_contains_backslash
-    def test_realpath_missing_pardir(self):
+    @_parameterize({}, {'strict': ALLOW_MISSING})
+    def test_realpath_missing_pardir(self, kwargs):
         try:
             os.symlink(TESTFN + "1", TESTFN)
-            self.assertEqual(realpath("nonexistent/../" + TESTFN), ABSTFN + "1")
+            self.assertEqual(
+                realpath("nonexistent/../" + TESTFN, **kwargs), ABSTFN + "1")
         finally:
             os_helper.unlink(TESTFN)
@@ -617,37 +667,38 @@ def test_realpath_symlink_loops(self):
     @os_helper.skip_unless_symlink
     @skip_if_ABSTFN_contains_backslash
-    def test_realpath_symlink_loops_strict(self):
+    @_parameterize({'strict': True}, {'strict': ALLOW_MISSING})
+    def test_realpath_symlink_loops_strict(self, kwargs):
         # Bug #43757, raise OSError if we get into an infinite symlink loop in
-        # strict mode.
+        # the strict modes.
         try:
             os.symlink(ABSTFN, ABSTFN)
-            self.assertRaises(OSError, realpath, ABSTFN, strict=True)
+            self.assertRaises(OSError, realpath, ABSTFN, **kwargs)
             os.symlink(ABSTFN+"1", ABSTFN+"2")
             os.symlink(ABSTFN+"2", ABSTFN+"1")
-            self.assertRaises(OSError, realpath, ABSTFN+"1", strict=True)
-            self.assertRaises(OSError, realpath, ABSTFN+"2", strict=True)
-            self.assertRaises(OSError, realpath, ABSTFN+"1/x", strict=True)
-            self.assertRaises(OSError, realpath, ABSTFN+"1/..", strict=True)
-            self.assertRaises(OSError, realpath, ABSTFN+"1/../x", strict=True)
+            self.assertRaises(OSError, realpath, ABSTFN+"1", **kwargs)
+            self.assertRaises(OSError, realpath, ABSTFN+"2", **kwargs)
+            self.assertRaises(OSError, realpath, ABSTFN+"1/x", **kwargs)
+            self.assertRaises(OSError, realpath, ABSTFN+"1/..", **kwargs)
+            self.assertRaises(OSError, realpath, ABSTFN+"1/../x", **kwargs)
             os.symlink(ABSTFN+"x", ABSTFN+"y")
             self.assertRaises(OSError, realpath,
-                              ABSTFN+"1/../" + basename(ABSTFN) + "y", strict=True)
+                              ABSTFN+"1/../" + basename(ABSTFN) + "y", **kwargs)
             self.assertRaises(OSError, realpath,
-                              ABSTFN+"1/../" + basename(ABSTFN) + "1", strict=True)
+                              ABSTFN+"1/../" + basename(ABSTFN) + "1", **kwargs)
             os.symlink(basename(ABSTFN) + "a/b", ABSTFN+"a")
-            self.assertRaises(OSError, realpath, ABSTFN+"a", strict=True)
+            self.assertRaises(OSError, realpath, ABSTFN+"a", **kwargs)
             os.symlink("../" + basename(dirname(ABSTFN)) + "/" +
                        basename(ABSTFN) + "c", ABSTFN+"c")
-            self.assertRaises(OSError, realpath, ABSTFN+"c", strict=True)
+            self.assertRaises(OSError, realpath, ABSTFN+"c", **kwargs)
             # Test using relative path as well.
             with os_helper.change_cwd(dirname(ABSTFN)):
-                self.assertRaises(OSError, realpath, basename(ABSTFN), strict=True)
+                self.assertRaises(OSError, realpath, basename(ABSTFN), **kwargs)
         finally:
             os_helper.unlink(ABSTFN)
             os_helper.unlink(ABSTFN+"1")
@ -658,13 +709,14 @@ def test_realpath_symlink_loops_strict(self):
@os_helper.skip_unless_symlink @os_helper.skip_unless_symlink
@skip_if_ABSTFN_contains_backslash @skip_if_ABSTFN_contains_backslash
def test_realpath_repeated_indirect_symlinks(self): @_parameterize({}, {'strict': True}, {'strict': ALLOW_MISSING})
def test_realpath_repeated_indirect_symlinks(self, kwargs):
# Issue #6975. # Issue #6975.
try: try:
os.mkdir(ABSTFN) os.mkdir(ABSTFN)
os.symlink('../' + basename(ABSTFN), ABSTFN + '/self') os.symlink('../' + basename(ABSTFN), ABSTFN + '/self')
os.symlink('self/self/self', ABSTFN + '/link') os.symlink('self/self/self', ABSTFN + '/link')
self.assertEqual(realpath(ABSTFN + '/link'), ABSTFN) self.assertEqual(realpath(ABSTFN + '/link', **kwargs), ABSTFN)
finally: finally:
os_helper.unlink(ABSTFN + '/self') os_helper.unlink(ABSTFN + '/self')
os_helper.unlink(ABSTFN + '/link') os_helper.unlink(ABSTFN + '/link')
@ -672,14 +724,15 @@ def test_realpath_repeated_indirect_symlinks(self):
@os_helper.skip_unless_symlink @os_helper.skip_unless_symlink
@skip_if_ABSTFN_contains_backslash @skip_if_ABSTFN_contains_backslash
def test_realpath_deep_recursion(self): @_parameterize({}, {'strict': True}, {'strict': ALLOW_MISSING})
def test_realpath_deep_recursion(self, kwargs):
depth = 10 depth = 10
try: try:
os.mkdir(ABSTFN) os.mkdir(ABSTFN)
for i in range(depth): for i in range(depth):
os.symlink('/'.join(['%d' % i] * 10), ABSTFN + '/%d' % (i + 1)) os.symlink('/'.join(['%d' % i] * 10), ABSTFN + '/%d' % (i + 1))
os.symlink('.', ABSTFN + '/0') os.symlink('.', ABSTFN + '/0')
self.assertEqual(realpath(ABSTFN + '/%d' % depth), ABSTFN) self.assertEqual(realpath(ABSTFN + '/%d' % depth, **kwargs), ABSTFN)
# Test using relative path as well. # Test using relative path as well.
with os_helper.change_cwd(ABSTFN): with os_helper.change_cwd(ABSTFN):
@ -691,7 +744,8 @@ def test_realpath_deep_recursion(self):
@os_helper.skip_unless_symlink @os_helper.skip_unless_symlink
@skip_if_ABSTFN_contains_backslash @skip_if_ABSTFN_contains_backslash
def test_realpath_resolve_parents(self): @_parameterize({}, {'strict': ALLOW_MISSING})
def test_realpath_resolve_parents(self, kwargs):
# We also need to resolve any symlinks in the parents of a relative # We also need to resolve any symlinks in the parents of a relative
# path passed to realpath. E.g.: current working directory is # path passed to realpath. E.g.: current working directory is
# /usr/doc with 'doc' being a symlink to /usr/share/doc. We call # /usr/doc with 'doc' being a symlink to /usr/share/doc. We call
@ -702,7 +756,8 @@ def test_realpath_resolve_parents(self):
os.symlink(ABSTFN + "/y", ABSTFN + "/k") os.symlink(ABSTFN + "/y", ABSTFN + "/k")
with os_helper.change_cwd(ABSTFN + "/k"): with os_helper.change_cwd(ABSTFN + "/k"):
self.assertEqual(realpath("a"), ABSTFN + "/y/a") self.assertEqual(realpath("a", **kwargs),
ABSTFN + "/y/a")
finally: finally:
os_helper.unlink(ABSTFN + "/k") os_helper.unlink(ABSTFN + "/k")
os_helper.rmdir(ABSTFN + "/y") os_helper.rmdir(ABSTFN + "/y")
@ -710,7 +765,8 @@ def test_realpath_resolve_parents(self):
@os_helper.skip_unless_symlink @os_helper.skip_unless_symlink
@skip_if_ABSTFN_contains_backslash @skip_if_ABSTFN_contains_backslash
def test_realpath_resolve_before_normalizing(self): @_parameterize({}, {'strict': True}, {'strict': ALLOW_MISSING})
def test_realpath_resolve_before_normalizing(self, kwargs):
# Bug #990669: Symbolic links should be resolved before we # Bug #990669: Symbolic links should be resolved before we
# normalize the path. E.g.: if we have directories 'a', 'k' and 'y' # normalize the path. E.g.: if we have directories 'a', 'k' and 'y'
# in the following hierarchy: # in the following hierarchy:
@ -725,10 +781,10 @@ def test_realpath_resolve_before_normalizing(self):
os.symlink(ABSTFN + "/k/y", ABSTFN + "/link-y") os.symlink(ABSTFN + "/k/y", ABSTFN + "/link-y")
# Absolute path. # Absolute path.
self.assertEqual(realpath(ABSTFN + "/link-y/.."), ABSTFN + "/k") self.assertEqual(realpath(ABSTFN + "/link-y/..", **kwargs), ABSTFN + "/k")
# Relative path. # Relative path.
with os_helper.change_cwd(dirname(ABSTFN)): with os_helper.change_cwd(dirname(ABSTFN)):
self.assertEqual(realpath(basename(ABSTFN) + "/link-y/.."), self.assertEqual(realpath(basename(ABSTFN) + "/link-y/..", **kwargs),
ABSTFN + "/k") ABSTFN + "/k")
finally: finally:
os_helper.unlink(ABSTFN + "/link-y") os_helper.unlink(ABSTFN + "/link-y")
@ -738,7 +794,8 @@ def test_realpath_resolve_before_normalizing(self):
@os_helper.skip_unless_symlink @os_helper.skip_unless_symlink
@skip_if_ABSTFN_contains_backslash @skip_if_ABSTFN_contains_backslash
def test_realpath_resolve_first(self): @_parameterize({}, {'strict': True}, {'strict': ALLOW_MISSING})
def test_realpath_resolve_first(self, kwargs):
# Bug #1213894: The first component of the path, if not absolute, # Bug #1213894: The first component of the path, if not absolute,
# must be resolved too. # must be resolved too.
@ -748,8 +805,8 @@ def test_realpath_resolve_first(self):
os.symlink(ABSTFN, ABSTFN + "link") os.symlink(ABSTFN, ABSTFN + "link")
with os_helper.change_cwd(dirname(ABSTFN)): with os_helper.change_cwd(dirname(ABSTFN)):
base = basename(ABSTFN) base = basename(ABSTFN)
self.assertEqual(realpath(base + "link"), ABSTFN) self.assertEqual(realpath(base + "link", **kwargs), ABSTFN)
self.assertEqual(realpath(base + "link/k"), ABSTFN + "/k") self.assertEqual(realpath(base + "link/k", **kwargs), ABSTFN + "/k")
finally: finally:
os_helper.unlink(ABSTFN + "link") os_helper.unlink(ABSTFN + "link")
os_helper.rmdir(ABSTFN + "/k") os_helper.rmdir(ABSTFN + "/k")
@ -767,12 +824,67 @@ def test_realpath_unreadable_symlink(self):
self.assertEqual(realpath(ABSTFN + '/foo'), ABSTFN + '/foo') self.assertEqual(realpath(ABSTFN + '/foo'), ABSTFN + '/foo')
self.assertEqual(realpath(ABSTFN + '/../foo'), dirname(ABSTFN) + '/foo') self.assertEqual(realpath(ABSTFN + '/../foo'), dirname(ABSTFN) + '/foo')
self.assertEqual(realpath(ABSTFN + '/foo/..'), ABSTFN) self.assertEqual(realpath(ABSTFN + '/foo/..'), ABSTFN)
with self.assertRaises(PermissionError):
realpath(ABSTFN, strict=True)
finally: finally:
os.chmod(ABSTFN, 0o755, follow_symlinks=False) os.chmod(ABSTFN, 0o755, follow_symlinks=False)
os_helper.unlink(ABSTFN) os_helper.unlink(ABSTFN)
@os_helper.skip_unless_symlink
@skip_if_ABSTFN_contains_backslash
@unittest.skipIf(os.chmod not in os.supports_follow_symlinks, "Can't set symlink permissions")
@unittest.skipIf(sys.platform != "darwin", "only macOS requires read permission to readlink()")
@_parameterize({'strict': True}, {'strict': ALLOW_MISSING})
def test_realpath_unreadable_symlink_strict(self, kwargs):
try:
os.symlink(ABSTFN+"1", ABSTFN)
os.chmod(ABSTFN, 0o000, follow_symlinks=False)
with self.assertRaises(PermissionError):
realpath(ABSTFN, **kwargs)
with self.assertRaises(PermissionError):
                realpath(ABSTFN + '/foo', **kwargs)
with self.assertRaises(PermissionError):
realpath(ABSTFN + '/../foo', **kwargs)
with self.assertRaises(PermissionError):
realpath(ABSTFN + '/foo/..', **kwargs)
finally:
os.chmod(ABSTFN, 0o755, follow_symlinks=False)
os.unlink(ABSTFN)
@skip_if_ABSTFN_contains_backslash
@os_helper.skip_unless_symlink
def test_realpath_unreadable_directory(self):
try:
os.mkdir(ABSTFN)
os.mkdir(ABSTFN + '/k')
os.chmod(ABSTFN, 0o000)
self.assertEqual(realpath(ABSTFN, strict=False), ABSTFN)
self.assertEqual(realpath(ABSTFN, strict=True), ABSTFN)
self.assertEqual(realpath(ABSTFN, strict=ALLOW_MISSING), ABSTFN)
try:
os.stat(ABSTFN)
except PermissionError:
pass
else:
self.skipTest('Cannot block permissions')
self.assertEqual(realpath(ABSTFN + '/k', strict=False),
ABSTFN + '/k')
self.assertRaises(PermissionError, realpath, ABSTFN + '/k',
strict=True)
self.assertRaises(PermissionError, realpath, ABSTFN + '/k',
strict=ALLOW_MISSING)
self.assertEqual(realpath(ABSTFN + '/missing', strict=False),
ABSTFN + '/missing')
self.assertRaises(PermissionError, realpath, ABSTFN + '/missing',
strict=True)
self.assertRaises(PermissionError, realpath, ABSTFN + '/missing',
strict=ALLOW_MISSING)
finally:
os.chmod(ABSTFN, 0o755)
os_helper.rmdir(ABSTFN + '/k')
os_helper.rmdir(ABSTFN)
@skip_if_ABSTFN_contains_backslash @skip_if_ABSTFN_contains_backslash
def test_realpath_nonterminal_file(self): def test_realpath_nonterminal_file(self):
try: try:
@ -780,14 +892,27 @@ def test_realpath_nonterminal_file(self):
f.write('test_posixpath wuz ere') f.write('test_posixpath wuz ere')
self.assertEqual(realpath(ABSTFN, strict=False), ABSTFN) self.assertEqual(realpath(ABSTFN, strict=False), ABSTFN)
self.assertEqual(realpath(ABSTFN, strict=True), ABSTFN) self.assertEqual(realpath(ABSTFN, strict=True), ABSTFN)
self.assertEqual(realpath(ABSTFN, strict=ALLOW_MISSING), ABSTFN)
self.assertEqual(realpath(ABSTFN + "/", strict=False), ABSTFN) self.assertEqual(realpath(ABSTFN + "/", strict=False), ABSTFN)
self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/", strict=True) self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/", strict=True)
self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/",
strict=ALLOW_MISSING)
self.assertEqual(realpath(ABSTFN + "/.", strict=False), ABSTFN) self.assertEqual(realpath(ABSTFN + "/.", strict=False), ABSTFN)
self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/.", strict=True) self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/.", strict=True)
self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/.",
strict=ALLOW_MISSING)
self.assertEqual(realpath(ABSTFN + "/..", strict=False), dirname(ABSTFN)) self.assertEqual(realpath(ABSTFN + "/..", strict=False), dirname(ABSTFN))
self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/..", strict=True) self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/..", strict=True)
self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/..",
strict=ALLOW_MISSING)
self.assertEqual(realpath(ABSTFN + "/subdir", strict=False), ABSTFN + "/subdir") self.assertEqual(realpath(ABSTFN + "/subdir", strict=False), ABSTFN + "/subdir")
self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/subdir", strict=True) self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/subdir", strict=True)
self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/subdir",
strict=ALLOW_MISSING)
finally: finally:
os_helper.unlink(ABSTFN) os_helper.unlink(ABSTFN)
@ -800,14 +925,27 @@ def test_realpath_nonterminal_symlink_to_file(self):
os.symlink(ABSTFN + "1", ABSTFN) os.symlink(ABSTFN + "1", ABSTFN)
self.assertEqual(realpath(ABSTFN, strict=False), ABSTFN + "1") self.assertEqual(realpath(ABSTFN, strict=False), ABSTFN + "1")
self.assertEqual(realpath(ABSTFN, strict=True), ABSTFN + "1") self.assertEqual(realpath(ABSTFN, strict=True), ABSTFN + "1")
self.assertEqual(realpath(ABSTFN, strict=ALLOW_MISSING), ABSTFN + "1")
self.assertEqual(realpath(ABSTFN + "/", strict=False), ABSTFN + "1") self.assertEqual(realpath(ABSTFN + "/", strict=False), ABSTFN + "1")
self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/", strict=True) self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/", strict=True)
self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/",
strict=ALLOW_MISSING)
self.assertEqual(realpath(ABSTFN + "/.", strict=False), ABSTFN + "1") self.assertEqual(realpath(ABSTFN + "/.", strict=False), ABSTFN + "1")
self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/.", strict=True) self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/.", strict=True)
self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/.",
strict=ALLOW_MISSING)
self.assertEqual(realpath(ABSTFN + "/..", strict=False), dirname(ABSTFN)) self.assertEqual(realpath(ABSTFN + "/..", strict=False), dirname(ABSTFN))
self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/..", strict=True) self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/..", strict=True)
self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/..",
strict=ALLOW_MISSING)
self.assertEqual(realpath(ABSTFN + "/subdir", strict=False), ABSTFN + "1/subdir") self.assertEqual(realpath(ABSTFN + "/subdir", strict=False), ABSTFN + "1/subdir")
self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/subdir", strict=True) self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/subdir", strict=True)
self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/subdir",
strict=ALLOW_MISSING)
finally: finally:
os_helper.unlink(ABSTFN) os_helper.unlink(ABSTFN)
os_helper.unlink(ABSTFN + "1") os_helper.unlink(ABSTFN + "1")
@ -822,14 +960,27 @@ def test_realpath_nonterminal_symlink_to_symlinks_to_file(self):
os.symlink(ABSTFN + "1", ABSTFN) os.symlink(ABSTFN + "1", ABSTFN)
self.assertEqual(realpath(ABSTFN, strict=False), ABSTFN + "2") self.assertEqual(realpath(ABSTFN, strict=False), ABSTFN + "2")
self.assertEqual(realpath(ABSTFN, strict=True), ABSTFN + "2") self.assertEqual(realpath(ABSTFN, strict=True), ABSTFN + "2")
            self.assertEqual(realpath(ABSTFN, strict=ALLOW_MISSING), ABSTFN + "2")
self.assertEqual(realpath(ABSTFN + "/", strict=False), ABSTFN + "2") self.assertEqual(realpath(ABSTFN + "/", strict=False), ABSTFN + "2")
self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/", strict=True) self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/", strict=True)
self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/",
strict=ALLOW_MISSING)
self.assertEqual(realpath(ABSTFN + "/.", strict=False), ABSTFN + "2") self.assertEqual(realpath(ABSTFN + "/.", strict=False), ABSTFN + "2")
self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/.", strict=True) self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/.", strict=True)
self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/.",
strict=ALLOW_MISSING)
self.assertEqual(realpath(ABSTFN + "/..", strict=False), dirname(ABSTFN)) self.assertEqual(realpath(ABSTFN + "/..", strict=False), dirname(ABSTFN))
self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/..", strict=True) self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/..", strict=True)
self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/..",
strict=ALLOW_MISSING)
self.assertEqual(realpath(ABSTFN + "/subdir", strict=False), ABSTFN + "2/subdir") self.assertEqual(realpath(ABSTFN + "/subdir", strict=False), ABSTFN + "2/subdir")
self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/subdir", strict=True) self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/subdir", strict=True)
self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/subdir",
strict=ALLOW_MISSING)
finally: finally:
os_helper.unlink(ABSTFN) os_helper.unlink(ABSTFN)
os_helper.unlink(ABSTFN + "1") os_helper.unlink(ABSTFN + "1")
@ -1017,9 +1168,12 @@ def test_path_normpath(self):
def test_path_abspath(self): def test_path_abspath(self):
self.assertPathEqual(self.path.abspath) self.assertPathEqual(self.path.abspath)
def test_path_realpath(self): @_parameterize({}, {'strict': True}, {'strict': ALLOW_MISSING})
def test_path_realpath(self, kwargs):
self.assertPathEqual(self.path.realpath) self.assertPathEqual(self.path.realpath)
self.assertPathEqual(partial(self.path.realpath, **kwargs))
def test_path_relpath(self): def test_path_relpath(self):
self.assertPathEqual(self.path.relpath) self.assertPathEqual(self.path.relpath)

View File

@ -2682,6 +2682,31 @@ def test_useful_error_message_when_modules_missing(self):
str(excinfo.exception), str(excinfo.exception),
) )
@unittest.skipUnless(os_helper.can_symlink(), 'requires symlink support')
@unittest.skipUnless(hasattr(os, 'chmod'), "missing os.chmod")
@unittest.mock.patch('os.chmod')
def test_deferred_directory_attributes_update(self, mock_chmod):
# Regression test for gh-127987: setting attributes on arbitrary files
tempdir = os.path.join(TEMPDIR, 'test127987')
def mock_chmod_side_effect(path, mode, **kwargs):
target_path = os.path.realpath(path)
if os.path.commonpath([target_path, tempdir]) != tempdir:
raise Exception("should not try to chmod anything outside the destination", target_path)
mock_chmod.side_effect = mock_chmod_side_effect
outside_tree_dir = os.path.join(TEMPDIR, 'outside_tree_dir')
with ArchiveMaker() as arc:
arc.add('x', symlink_to='.')
arc.add('x', type=tarfile.DIRTYPE, mode='?rwsrwsrwt')
arc.add('x', symlink_to=outside_tree_dir)
os.makedirs(outside_tree_dir)
try:
arc.open().extractall(path=tempdir, filter='tar')
finally:
os_helper.rmtree(outside_tree_dir)
os_helper.rmtree(tempdir)
class CommandLineTest(unittest.TestCase): class CommandLineTest(unittest.TestCase):
@ -3242,6 +3267,10 @@ def check_files_present(self, directory):
got_paths = set( got_paths = set(
p.relative_to(directory) p.relative_to(directory)
for p in pathlib.Path(directory).glob('**/*')) for p in pathlib.Path(directory).glob('**/*'))
if self.extraction_filter == 'data':
# The 'data' filter is expected to reject special files
for path in 'ustar/fifotype', 'ustar/blktype', 'ustar/chrtype':
got_paths.discard(pathlib.Path(path))
self.assertEqual(self.control_paths, got_paths) self.assertEqual(self.control_paths, got_paths)
@contextmanager @contextmanager
@ -3471,12 +3500,28 @@ def __exit__(self, *exc):
self.bio = None self.bio = None
def add(self, name, *, type=None, symlink_to=None, hardlink_to=None, def add(self, name, *, type=None, symlink_to=None, hardlink_to=None,
mode=None, size=None, **kwargs): mode=None, size=None, content=None, **kwargs):
"""Add a member to the test archive. Call within `with`.""" """Add a member to the test archive. Call within `with`.
Provides many shortcuts:
- default `type` is based on symlink_to, hardlink_to, and trailing `/`
in name (which is stripped)
- size & content defaults are based on each other
- content can be str or bytes
- mode should be textual ('-rwxrwxrwx')
(add more! this is unstable internal test-only API)
"""
name = str(name) name = str(name)
tarinfo = tarfile.TarInfo(name).replace(**kwargs) tarinfo = tarfile.TarInfo(name).replace(**kwargs)
if content is not None:
if isinstance(content, str):
content = content.encode()
size = len(content)
if size is not None: if size is not None:
tarinfo.size = size tarinfo.size = size
if content is None:
content = bytes(tarinfo.size)
if mode: if mode:
tarinfo.mode = _filemode_to_int(mode) tarinfo.mode = _filemode_to_int(mode)
if symlink_to is not None: if symlink_to is not None:
@ -3490,7 +3535,7 @@ def add(self, name, *, type=None, symlink_to=None, hardlink_to=None,
if type is not None: if type is not None:
tarinfo.type = type tarinfo.type = type
if tarinfo.isreg(): if tarinfo.isreg():
fileobj = io.BytesIO(bytes(tarinfo.size)) fileobj = io.BytesIO(content)
else: else:
fileobj = None fileobj = None
self.tar_w.addfile(tarinfo, fileobj) self.tar_w.addfile(tarinfo, fileobj)
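
Note on the `content` shortcut added above: the size is derived from the content, and `str` content is encoded before being written. The sketch below shows the same idea with only public `tarfile` calls. `make_archive` is a hypothetical helper and not part of the test suite's `ArchiveMaker`.

```python
import io
import tarfile


def make_archive(members):
    """Build an in-memory tar from {name: str-or-bytes content}."""
    bio = io.BytesIO()
    with tarfile.open(fileobj=bio, mode="w") as tar:
        for name, content in members.items():
            if isinstance(content, str):
                content = content.encode()
            info = tarfile.TarInfo(name)
            info.size = len(content)  # size is derived from the content
            tar.addfile(info, io.BytesIO(content))
    bio.seek(0)
    return bio


archive = make_archive({"flaglink": "overwrite", "empty": b""})
with tarfile.open(fileobj=archive) as tar:
    payload = tar.extractfile("flaglink").read()
    names = tar.getnames()
```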
@ -3524,7 +3569,7 @@ class TestExtractionFilters(unittest.TestCase):
destdir = outerdir / 'dest' destdir = outerdir / 'dest'
@contextmanager @contextmanager
def check_context(self, tar, filter): def check_context(self, tar, filter, *, check_flag=True):
"""Extracts `tar` to `self.destdir` and allows checking the result """Extracts `tar` to `self.destdir` and allows checking the result
If an error occurs, it must be checked using `expect_exception` If an error occurs, it must be checked using `expect_exception`
@ -3533,27 +3578,40 @@ def check_context(self, tar, filter):
except the destination directory itself and parent directories of except the destination directory itself and parent directories of
other files. other files.
When checking directories, do so before their contents. When checking directories, do so before their contents.
A file called 'flag' is made in outerdir (i.e. outside destdir)
before extraction; it should not be altered nor should its contents
be read/copied.
""" """
with os_helper.temp_dir(self.outerdir): with os_helper.temp_dir(self.outerdir):
flag_path = self.outerdir / 'flag'
flag_path.write_text('capture me')
try: try:
tar.extractall(self.destdir, filter=filter) tar.extractall(self.destdir, filter=filter)
except Exception as exc: except Exception as exc:
self.raised_exception = exc self.raised_exception = exc
self.reraise_exception = True
self.expected_paths = set() self.expected_paths = set()
else: else:
self.raised_exception = None self.raised_exception = None
self.reraise_exception = False
self.expected_paths = set(self.outerdir.glob('**/*')) self.expected_paths = set(self.outerdir.glob('**/*'))
self.expected_paths.discard(self.destdir) self.expected_paths.discard(self.destdir)
self.expected_paths.discard(flag_path)
try: try:
yield yield self
finally: finally:
tar.close() tar.close()
if self.raised_exception: if self.reraise_exception:
raise self.raised_exception raise self.raised_exception
self.assertEqual(self.expected_paths, set()) self.assertEqual(self.expected_paths, set())
if check_flag:
self.assertEqual(flag_path.read_text(), 'capture me')
else:
assert filter == 'fully_trusted'
def expect_file(self, name, type=None, symlink_to=None, mode=None, def expect_file(self, name, type=None, symlink_to=None, mode=None,
size=None): size=None, content=None):
"""Check a single file. See check_context.""" """Check a single file. See check_context."""
if self.raised_exception: if self.raised_exception:
raise self.raised_exception raise self.raised_exception
@ -3572,26 +3630,45 @@ def expect_file(self, name, type=None, symlink_to=None, mode=None,
# The symlink might be the same (textually) as what we expect, # The symlink might be the same (textually) as what we expect,
# but some systems change the link to an equivalent path, so # but some systems change the link to an equivalent path, so
# we fall back to samefile(). # we fall back to samefile().
if expected != got: try:
self.assertTrue(got.samefile(expected)) if expected != got:
self.assertTrue(got.samefile(expected))
except Exception as e:
# attach a note, so it's shown even if `samefile` fails
e.add_note(f'{expected=}, {got=}')
raise
elif type == tarfile.REGTYPE or type is None: elif type == tarfile.REGTYPE or type is None:
self.assertTrue(path.is_file()) self.assertTrue(path.is_file())
elif type == tarfile.DIRTYPE: elif type == tarfile.DIRTYPE:
self.assertTrue(path.is_dir()) self.assertTrue(path.is_dir())
elif type == tarfile.FIFOTYPE: elif type == tarfile.FIFOTYPE:
self.assertTrue(path.is_fifo()) self.assertTrue(path.is_fifo())
elif type == tarfile.SYMTYPE:
self.assertTrue(path.is_symlink())
else: else:
raise NotImplementedError(type) raise NotImplementedError(type)
if size is not None: if size is not None:
self.assertEqual(path.stat().st_size, size) self.assertEqual(path.stat().st_size, size)
if content is not None:
self.assertEqual(path.read_text(), content)
for parent in path.parents: for parent in path.parents:
self.expected_paths.discard(parent) self.expected_paths.discard(parent)
def expect_any_tree(self, name):
"""Check a directory; forget about its contents."""
tree_path = (self.destdir / name).resolve()
self.expect_file(tree_path, type=tarfile.DIRTYPE)
self.expected_paths = {
p for p in self.expected_paths
if tree_path not in p.parents
}
def expect_exception(self, exc_type, message_re='.'): def expect_exception(self, exc_type, message_re='.'):
with self.assertRaisesRegex(exc_type, message_re): with self.assertRaisesRegex(exc_type, message_re):
if self.raised_exception is not None: if self.raised_exception is not None:
raise self.raised_exception raise self.raised_exception
self.raised_exception = None self.reraise_exception = False
return self.raised_exception
def test_benign_file(self): def test_benign_file(self):
with ArchiveMaker() as arc: with ArchiveMaker() as arc:
@ -3676,6 +3753,80 @@ def test_parent_symlink(self):
with self.check_context(arc.open(), 'data'): with self.check_context(arc.open(), 'data'):
self.expect_file('parent/evil') self.expect_file('parent/evil')
@symlink_test
@os_helper.skip_unless_symlink
def test_realpath_limit_attack(self):
# (CVE-2025-4517)
with ArchiveMaker() as arc:
# populate the symlinks and dirs that expand in os.path.realpath()
# The component length is chosen so that in common cases, the unexpanded
# path fits in PATH_MAX, but it overflows when the final symlink
# is expanded
steps = "abcdefghijklmnop"
if sys.platform == 'win32':
component = 'd' * 25
elif 'PC_PATH_MAX' in os.pathconf_names:
max_path_len = os.pathconf(self.outerdir.parent, "PC_PATH_MAX")
path_sep_len = 1
dest_len = len(str(self.destdir)) + path_sep_len
component_len = (max_path_len - dest_len) // (len(steps) + path_sep_len)
component = 'd' * component_len
else:
                raise NotImplementedError(f"Need to guess component length for {sys.platform}")
path = ""
step_path = ""
for i in steps:
arc.add(os.path.join(path, component), type=tarfile.DIRTYPE,
mode='drwxrwxrwx')
arc.add(os.path.join(path, i), symlink_to=component)
path = os.path.join(path, component)
step_path = os.path.join(step_path, i)
# create the final symlink that exceeds PATH_MAX and simply points
# to the top dir.
# this link will never be expanded by
# os.path.realpath(strict=False), nor anything after it.
linkpath = os.path.join(*steps, "l"*254)
parent_segments = [".."] * len(steps)
arc.add(linkpath, symlink_to=os.path.join(*parent_segments))
# make a symlink outside to keep the tar command happy
arc.add("escape", symlink_to=os.path.join(linkpath, ".."))
# use the symlinks above, that are not checked, to create a hardlink
# to a file outside of the destination path
arc.add("flaglink", hardlink_to=os.path.join("escape", "flag"))
# now that we have the hardlink we can overwrite the file
arc.add("flaglink", content='overwrite')
# we can also create new files as well!
arc.add("escape/newfile", content='new')
with (self.subTest('fully_trusted'),
self.check_context(arc.open(), filter='fully_trusted',
check_flag=False)):
if sys.platform == 'win32':
self.expect_exception((FileNotFoundError, FileExistsError))
elif self.raised_exception:
# Cannot symlink/hardlink: tarfile falls back to getmember()
self.expect_exception(KeyError)
                # Otherwise, this branch should never be reached.
else:
self.expect_any_tree(component)
self.expect_file('flaglink', content='overwrite')
self.expect_file('../newfile', content='new')
self.expect_file('escape', type=tarfile.SYMTYPE)
self.expect_file('a', symlink_to=component)
for filter in 'tar', 'data':
with self.subTest(filter), self.check_context(arc.open(), filter=filter):
exc = self.expect_exception((OSError, KeyError))
if isinstance(exc, OSError):
if sys.platform == 'win32':
# 3: ERROR_PATH_NOT_FOUND
# 5: ERROR_ACCESS_DENIED
# 206: ERROR_FILENAME_EXCED_RANGE
self.assertIn(exc.winerror, (3, 5, 206))
else:
self.assertEqual(exc.errno, errno.ENAMETOOLONG)
@symlink_test @symlink_test
def test_parent_symlink2(self): def test_parent_symlink2(self):
# Test interplaying symlinks # Test interplaying symlinks
@ -3898,8 +4049,8 @@ def test_chains(self):
arc.add('symlink2', symlink_to=os.path.join( arc.add('symlink2', symlink_to=os.path.join(
'linkdir', 'hardlink2')) 'linkdir', 'hardlink2'))
arc.add('targetdir/target', size=3) arc.add('targetdir/target', size=3)
arc.add('linkdir/hardlink', hardlink_to='targetdir/target') arc.add('linkdir/hardlink', hardlink_to=os.path.join('targetdir', 'target'))
arc.add('linkdir/hardlink2', hardlink_to='linkdir/symlink') arc.add('linkdir/hardlink2', hardlink_to=os.path.join('linkdir', 'symlink'))
for filter in 'tar', 'data', 'fully_trusted': for filter in 'tar', 'data', 'fully_trusted':
with self.check_context(arc.open(), filter): with self.check_context(arc.open(), filter):
@ -3915,6 +4066,129 @@ def test_chains(self):
self.expect_file('linkdir/symlink', size=3) self.expect_file('linkdir/symlink', size=3)
self.expect_file('symlink2', size=3) self.expect_file('symlink2', size=3)
@symlink_test
def test_sneaky_hardlink_fallback(self):
# (CVE-2025-4330)
# Test that when hardlink extraction falls back to extracting members
# from the archive, the extracted member is (re-)filtered.
with ArchiveMaker() as arc:
# Create a directory structure so the c/escape symlink stays
# inside the path
arc.add("a/t/dummy")
# Create b/ directory
arc.add("b/")
# Point "c" to the bottom of the tree in "a"
arc.add("c", symlink_to=os.path.join("a", "t"))
            # link to nonexistent location under "a"
arc.add("c/escape", symlink_to=os.path.join("..", "..",
"link_here"))
# Move "c" to point to "b" ("c/escape" no longer exists)
arc.add("c", symlink_to="b")
# Attempt to create a hard link to "c/escape". Since it doesn't
            # exist it will attempt to extract "c/escape" but at "boom".
arc.add("boom", hardlink_to=os.path.join("c", "escape"))
with self.check_context(arc.open(), 'data'):
if not os_helper.can_symlink():
# When 'c/escape' is extracted, 'c' is a regular
# directory, and 'c/escape' *would* point outside
# the destination if symlinks were allowed.
self.expect_exception(
tarfile.LinkOutsideDestinationError)
elif sys.platform == "win32":
# On Windows, 'c/escape' points outside the destination
self.expect_exception(tarfile.LinkOutsideDestinationError)
else:
e = self.expect_exception(
tarfile.LinkFallbackError,
"link 'boom' would be extracted as a copy of "
+ "'c/escape', which was rejected")
self.assertIsInstance(e.__cause__,
tarfile.LinkOutsideDestinationError)
for filter in 'tar', 'fully_trusted':
with self.subTest(filter), self.check_context(arc.open(), filter):
if not os_helper.can_symlink():
self.expect_file("a/t/dummy")
self.expect_file("b/")
self.expect_file("c/")
else:
self.expect_file("a/t/dummy")
self.expect_file("b/")
self.expect_file("a/t/escape", symlink_to='../../link_here')
self.expect_file("boom", symlink_to='../../link_here')
self.expect_file("c", symlink_to='b')
@symlink_test
def test_exfiltration_via_symlink(self):
# (CVE-2025-4138)
# Test changing symlinks that result in a symlink pointing outside
# the extraction directory, unless prevented by 'data' filter's
# normalization.
with ArchiveMaker() as arc:
arc.add("escape", symlink_to=os.path.join('link', 'link', '..', '..', 'link-here'))
arc.add("link", symlink_to='./')
for filter in 'tar', 'data', 'fully_trusted':
with self.check_context(arc.open(), filter):
if os_helper.can_symlink():
self.expect_file("link", symlink_to='./')
if filter == 'data':
self.expect_file("escape", symlink_to='link-here')
else:
self.expect_file("escape",
symlink_to='link/link/../../link-here')
else:
# Nothing is extracted.
pass
@symlink_test
def test_chmod_outside_dir(self):
# (CVE-2024-12718)
# Test that members used for delayed updates of directory metadata
# are (re-)filtered.
with ArchiveMaker() as arc:
# "pwn" is a veeeery innocent symlink:
arc.add("a/pwn", symlink_to='.')
# But now "pwn" is also a directory, so it's scheduled to have its
# metadata updated later:
arc.add("a/pwn/", mode='drwxrwxrwx')
# Oops, "pwn" is not so innocent any more:
arc.add("a/pwn", symlink_to='x/../')
# Newly created symlink points to the dest dir,
# so it's OK for the "data" filter.
            arc.add('a/x', symlink_to='../')
# But now "pwn" points outside the dest dir
for filter in 'tar', 'data', 'fully_trusted':
with self.check_context(arc.open(), filter) as cc:
if not os_helper.can_symlink():
self.expect_file("a/pwn/")
elif filter == 'data':
self.expect_file("a/x", symlink_to='../')
self.expect_file("a/pwn", symlink_to='.')
else:
self.expect_file("a/x", symlink_to='../')
self.expect_file("a/pwn", symlink_to='x/../')
if sys.platform != "win32":
st_mode = cc.outerdir.stat().st_mode
self.assertNotEqual(st_mode & 0o777, 0o777)

    def test_link_fallback_normalizes(self):
        # Make sure hardlink fallbacks work for non-normalized paths for all
        # filters
        with ArchiveMaker() as arc:
            arc.add("dir/")
            arc.add("dir/../afile")
            arc.add("link1", hardlink_to='dir/../afile')
            arc.add("link2", hardlink_to='dir/../dir/../afile')

        for filter in 'tar', 'data', 'fully_trusted':
            with self.check_context(arc.open(), filter) as cc:
                self.expect_file("dir/")
                self.expect_file("afile")
                self.expect_file("link1")
                self.expect_file("link2")

    def test_modes(self):
        # Test how file modes are extracted
        # (Note that the modes are ignored on platforms without working chmod)
@ -4039,7 +4313,7 @@ def test_tar_filter(self):
        # The 'tar' filter returns TarInfo objects with the same name/type.
        # (It can also fail for particularly "evil" input, but we don't have
        # that in the test archive.)
-        with tarfile.TarFile.open(tarname) as tar:
+        with tarfile.TarFile.open(tarname, encoding="iso8859-1") as tar:
            for tarinfo in tar.getmembers():
                try:
                    filtered = tarfile.tar_filter(tarinfo, '')
@ -4051,7 +4325,7 @@ def test_tar_filter(self):
    def test_data_filter(self):
        # The 'data' filter either raises, or returns TarInfo with the same
        # name/type.
-        with tarfile.TarFile.open(tarname) as tar:
+        with tarfile.TarFile.open(tarname, encoding="iso8859-1") as tar:
            for tarinfo in tar.getmembers():
                try:
                    filtered = tarfile.data_filter(tarinfo, '')
@ -4218,13 +4492,13 @@ def valueerror_filter(tarinfo, path):
        # If errorlevel is 0, errors affected by errorlevel are ignored

        with self.check_context(arc.open(errorlevel=0), extracterror_filter):
-            self.expect_file('file')
+            pass

        with self.check_context(arc.open(errorlevel=0), filtererror_filter):
-            self.expect_file('file')
+            pass

        with self.check_context(arc.open(errorlevel=0), oserror_filter):
-            self.expect_file('file')
+            pass

        with self.check_context(arc.open(errorlevel=0), tarerror_filter):
            self.expect_exception(tarfile.TarError)
@ -4235,7 +4509,7 @@ def valueerror_filter(tarinfo, path):
        # If 1, all fatal errors are raised

        with self.check_context(arc.open(errorlevel=1), extracterror_filter):
-            self.expect_file('file')
+            pass

        with self.check_context(arc.open(errorlevel=1), filtererror_filter):
            self.expect_exception(tarfile.FilterError)


@ -0,0 +1,6 @@
Fixes multiple issues that allowed ``tarfile`` extraction filters
(``filter="data"`` and ``filter="tar"``) to be bypassed using crafted
symlinks and hard links.
Addresses :cve:`2024-12718`, :cve:`2025-4138`, :cve:`2025-4330`, and :cve:`2025-4517`.