You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							289 lines
						
					
					
						
							9.6 KiB
						
					
					
				
			
		
		
	
	
							289 lines
						
					
					
						
							9.6 KiB
						
					
					
				import functools
 | 
						|
import logging
 | 
						|
import os
 | 
						|
import posixpath
 | 
						|
import re
 | 
						|
import urllib.parse
 | 
						|
from typing import TYPE_CHECKING, Dict, List, NamedTuple, Optional, Tuple, Union
 | 
						|
 | 
						|
from pip._internal.utils.filetypes import WHEEL_EXTENSION
 | 
						|
from pip._internal.utils.hashes import Hashes
 | 
						|
from pip._internal.utils.misc import (
 | 
						|
    redact_auth_from_url,
 | 
						|
    split_auth_from_netloc,
 | 
						|
    splitext,
 | 
						|
)
 | 
						|
from pip._internal.utils.models import KeyBasedCompareMixin
 | 
						|
from pip._internal.utils.urls import path_to_url, url_to_path
 | 
						|
 | 
						|
if TYPE_CHECKING:
 | 
						|
    from pip._internal.index.collector import HTMLPage
 | 
						|
 | 
						|
logger = logging.getLogger(__name__)
 | 
						|
 | 
						|
 | 
						|
_SUPPORTED_HASHES = ("sha1", "sha224", "sha384", "sha256", "sha512", "md5")
 | 
						|
 | 
						|
 | 
						|
class Link(KeyBasedCompareMixin):
 | 
						|
    """Represents a parsed link from a Package Index's simple URL"""
 | 
						|
 | 
						|
    __slots__ = [
 | 
						|
        "_parsed_url",
 | 
						|
        "_url",
 | 
						|
        "comes_from",
 | 
						|
        "requires_python",
 | 
						|
        "yanked_reason",
 | 
						|
        "cache_link_parsing",
 | 
						|
    ]
 | 
						|
 | 
						|
    def __init__(
 | 
						|
        self,
 | 
						|
        url: str,
 | 
						|
        comes_from: Optional[Union[str, "HTMLPage"]] = None,
 | 
						|
        requires_python: Optional[str] = None,
 | 
						|
        yanked_reason: Optional[str] = None,
 | 
						|
        cache_link_parsing: bool = True,
 | 
						|
    ) -> None:
 | 
						|
        """
 | 
						|
        :param url: url of the resource pointed to (href of the link)
 | 
						|
        :param comes_from: instance of HTMLPage where the link was found,
 | 
						|
            or string.
 | 
						|
        :param requires_python: String containing the `Requires-Python`
 | 
						|
            metadata field, specified in PEP 345. This may be specified by
 | 
						|
            a data-requires-python attribute in the HTML link tag, as
 | 
						|
            described in PEP 503.
 | 
						|
        :param yanked_reason: the reason the file has been yanked, if the
 | 
						|
            file has been yanked, or None if the file hasn't been yanked.
 | 
						|
            This is the value of the "data-yanked" attribute, if present, in
 | 
						|
            a simple repository HTML link. If the file has been yanked but
 | 
						|
            no reason was provided, this should be the empty string. See
 | 
						|
            PEP 592 for more information and the specification.
 | 
						|
        :param cache_link_parsing: A flag that is used elsewhere to determine
 | 
						|
                                   whether resources retrieved from this link
 | 
						|
                                   should be cached. PyPI index urls should
 | 
						|
                                   generally have this set to False, for
 | 
						|
                                   example.
 | 
						|
        """
 | 
						|
 | 
						|
        # url can be a UNC windows share
 | 
						|
        if url.startswith("\\\\"):
 | 
						|
            url = path_to_url(url)
 | 
						|
 | 
						|
        self._parsed_url = urllib.parse.urlsplit(url)
 | 
						|
        # Store the url as a private attribute to prevent accidentally
 | 
						|
        # trying to set a new value.
 | 
						|
        self._url = url
 | 
						|
 | 
						|
        self.comes_from = comes_from
 | 
						|
        self.requires_python = requires_python if requires_python else None
 | 
						|
        self.yanked_reason = yanked_reason
 | 
						|
 | 
						|
        super().__init__(key=url, defining_class=Link)
 | 
						|
 | 
						|
        self.cache_link_parsing = cache_link_parsing
 | 
						|
 | 
						|
    def __str__(self) -> str:
 | 
						|
        if self.requires_python:
 | 
						|
            rp = f" (requires-python:{self.requires_python})"
 | 
						|
        else:
 | 
						|
            rp = ""
 | 
						|
        if self.comes_from:
 | 
						|
            return "{} (from {}){}".format(
 | 
						|
                redact_auth_from_url(self._url), self.comes_from, rp
 | 
						|
            )
 | 
						|
        else:
 | 
						|
            return redact_auth_from_url(str(self._url))
 | 
						|
 | 
						|
    def __repr__(self) -> str:
 | 
						|
        return f"<Link {self}>"
 | 
						|
 | 
						|
    @property
 | 
						|
    def url(self) -> str:
 | 
						|
        return self._url
 | 
						|
 | 
						|
    @property
 | 
						|
    def filename(self) -> str:
 | 
						|
        path = self.path.rstrip("/")
 | 
						|
        name = posixpath.basename(path)
 | 
						|
        if not name:
 | 
						|
            # Make sure we don't leak auth information if the netloc
 | 
						|
            # includes a username and password.
 | 
						|
            netloc, user_pass = split_auth_from_netloc(self.netloc)
 | 
						|
            return netloc
 | 
						|
 | 
						|
        name = urllib.parse.unquote(name)
 | 
						|
        assert name, f"URL {self._url!r} produced no filename"
 | 
						|
        return name
 | 
						|
 | 
						|
    @property
 | 
						|
    def file_path(self) -> str:
 | 
						|
        return url_to_path(self.url)
 | 
						|
 | 
						|
    @property
 | 
						|
    def scheme(self) -> str:
 | 
						|
        return self._parsed_url.scheme
 | 
						|
 | 
						|
    @property
 | 
						|
    def netloc(self) -> str:
 | 
						|
        """
 | 
						|
        This can contain auth information.
 | 
						|
        """
 | 
						|
        return self._parsed_url.netloc
 | 
						|
 | 
						|
    @property
 | 
						|
    def path(self) -> str:
 | 
						|
        return urllib.parse.unquote(self._parsed_url.path)
 | 
						|
 | 
						|
    def splitext(self) -> Tuple[str, str]:
 | 
						|
        return splitext(posixpath.basename(self.path.rstrip("/")))
 | 
						|
 | 
						|
    @property
 | 
						|
    def ext(self) -> str:
 | 
						|
        return self.splitext()[1]
 | 
						|
 | 
						|
    @property
 | 
						|
    def url_without_fragment(self) -> str:
 | 
						|
        scheme, netloc, path, query, fragment = self._parsed_url
 | 
						|
        return urllib.parse.urlunsplit((scheme, netloc, path, query, ""))
 | 
						|
 | 
						|
    _egg_fragment_re = re.compile(r"[#&]egg=([^&]*)")
 | 
						|
 | 
						|
    @property
 | 
						|
    def egg_fragment(self) -> Optional[str]:
 | 
						|
        match = self._egg_fragment_re.search(self._url)
 | 
						|
        if not match:
 | 
						|
            return None
 | 
						|
        return match.group(1)
 | 
						|
 | 
						|
    _subdirectory_fragment_re = re.compile(r"[#&]subdirectory=([^&]*)")
 | 
						|
 | 
						|
    @property
 | 
						|
    def subdirectory_fragment(self) -> Optional[str]:
 | 
						|
        match = self._subdirectory_fragment_re.search(self._url)
 | 
						|
        if not match:
 | 
						|
            return None
 | 
						|
        return match.group(1)
 | 
						|
 | 
						|
    _hash_re = re.compile(
 | 
						|
        r"({choices})=([a-f0-9]+)".format(choices="|".join(_SUPPORTED_HASHES))
 | 
						|
    )
 | 
						|
 | 
						|
    @property
 | 
						|
    def hash(self) -> Optional[str]:
 | 
						|
        match = self._hash_re.search(self._url)
 | 
						|
        if match:
 | 
						|
            return match.group(2)
 | 
						|
        return None
 | 
						|
 | 
						|
    @property
 | 
						|
    def hash_name(self) -> Optional[str]:
 | 
						|
        match = self._hash_re.search(self._url)
 | 
						|
        if match:
 | 
						|
            return match.group(1)
 | 
						|
        return None
 | 
						|
 | 
						|
    @property
 | 
						|
    def show_url(self) -> str:
 | 
						|
        return posixpath.basename(self._url.split("#", 1)[0].split("?", 1)[0])
 | 
						|
 | 
						|
    @property
 | 
						|
    def is_file(self) -> bool:
 | 
						|
        return self.scheme == "file"
 | 
						|
 | 
						|
    def is_existing_dir(self) -> bool:
 | 
						|
        return self.is_file and os.path.isdir(self.file_path)
 | 
						|
 | 
						|
    @property
 | 
						|
    def is_wheel(self) -> bool:
 | 
						|
        return self.ext == WHEEL_EXTENSION
 | 
						|
 | 
						|
    @property
 | 
						|
    def is_vcs(self) -> bool:
 | 
						|
        from pip._internal.vcs import vcs
 | 
						|
 | 
						|
        return self.scheme in vcs.all_schemes
 | 
						|
 | 
						|
    @property
 | 
						|
    def is_yanked(self) -> bool:
 | 
						|
        return self.yanked_reason is not None
 | 
						|
 | 
						|
    @property
 | 
						|
    def has_hash(self) -> bool:
 | 
						|
        return self.hash_name is not None
 | 
						|
 | 
						|
    def is_hash_allowed(self, hashes: Optional[Hashes]) -> bool:
 | 
						|
        """
 | 
						|
        Return True if the link has a hash and it is allowed.
 | 
						|
        """
 | 
						|
        if hashes is None or not self.has_hash:
 | 
						|
            return False
 | 
						|
        # Assert non-None so mypy knows self.hash_name and self.hash are str.
 | 
						|
        assert self.hash_name is not None
 | 
						|
        assert self.hash is not None
 | 
						|
 | 
						|
        return hashes.is_hash_allowed(self.hash_name, hex_digest=self.hash)
 | 
						|
 | 
						|
 | 
						|
class _CleanResult(NamedTuple):
 | 
						|
    """Convert link for equivalency check.
 | 
						|
 | 
						|
    This is used in the resolver to check whether two URL-specified requirements
 | 
						|
    likely point to the same distribution and can be considered equivalent. This
 | 
						|
    equivalency logic avoids comparing URLs literally, which can be too strict
 | 
						|
    (e.g. "a=1&b=2" vs "b=2&a=1") and produce conflicts unexpecting to users.
 | 
						|
 | 
						|
    Currently this does three things:
 | 
						|
 | 
						|
    1. Drop the basic auth part. This is technically wrong since a server can
 | 
						|
       serve different content based on auth, but if it does that, it is even
 | 
						|
       impossible to guarantee two URLs without auth are equivalent, since
 | 
						|
       the user can input different auth information when prompted. So the
 | 
						|
       practical solution is to assume the auth doesn't affect the response.
 | 
						|
    2. Parse the query to avoid the ordering issue. Note that ordering under the
 | 
						|
       same key in the query are NOT cleaned; i.e. "a=1&a=2" and "a=2&a=1" are
 | 
						|
       still considered different.
 | 
						|
    3. Explicitly drop most of the fragment part, except ``subdirectory=`` and
 | 
						|
       hash values, since it should have no impact the downloaded content. Note
 | 
						|
       that this drops the "egg=" part historically used to denote the requested
 | 
						|
       project (and extras), which is wrong in the strictest sense, but too many
 | 
						|
       people are supplying it inconsistently to cause superfluous resolution
 | 
						|
       conflicts, so we choose to also ignore them.
 | 
						|
    """
 | 
						|
 | 
						|
    parsed: urllib.parse.SplitResult
 | 
						|
    query: Dict[str, List[str]]
 | 
						|
    subdirectory: str
 | 
						|
    hashes: Dict[str, str]
 | 
						|
 | 
						|
 | 
						|
def _clean_link(link: Link) -> _CleanResult:
 | 
						|
    parsed = link._parsed_url
 | 
						|
    netloc = parsed.netloc.rsplit("@", 1)[-1]
 | 
						|
    # According to RFC 8089, an empty host in file: means localhost.
 | 
						|
    if parsed.scheme == "file" and not netloc:
 | 
						|
        netloc = "localhost"
 | 
						|
    fragment = urllib.parse.parse_qs(parsed.fragment)
 | 
						|
    if "egg" in fragment:
 | 
						|
        logger.debug("Ignoring egg= fragment in %s", link)
 | 
						|
    try:
 | 
						|
        # If there are multiple subdirectory values, use the first one.
 | 
						|
        # This matches the behavior of Link.subdirectory_fragment.
 | 
						|
        subdirectory = fragment["subdirectory"][0]
 | 
						|
    except (IndexError, KeyError):
 | 
						|
        subdirectory = ""
 | 
						|
    # If there are multiple hash values under the same algorithm, use the
 | 
						|
    # first one. This matches the behavior of Link.hash_value.
 | 
						|
    hashes = {k: fragment[k][0] for k in _SUPPORTED_HASHES if k in fragment}
 | 
						|
    return _CleanResult(
 | 
						|
        parsed=parsed._replace(netloc=netloc, query="", fragment=""),
 | 
						|
        query=urllib.parse.parse_qs(parsed.query),
 | 
						|
        subdirectory=subdirectory,
 | 
						|
        hashes=hashes,
 | 
						|
    )
 | 
						|
 | 
						|
 | 
						|
@functools.lru_cache(maxsize=None)
 | 
						|
def links_equivalent(link1: Link, link2: Link) -> bool:
 | 
						|
    return _clean_link(link1) == _clean_link(link2)
 |