You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					158 lines
				
				5.0 KiB
			
		
		
			
		
	
	
					158 lines
				
				5.0 KiB
			| 
								 
											3 years ago
										 
									 | 
							
								import contextlib
							 | 
						||
| 
								 | 
							
								import itertools
							 | 
						||
| 
								 | 
							
								import logging
							 | 
						||
| 
								 | 
							
								import sys
							 | 
						||
| 
								 | 
							
								import time
							 | 
						||
| 
								 | 
							
								from typing import IO, Iterator
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								from pip._vendor.progress import HIDE_CURSOR, SHOW_CURSOR
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								from pip._internal.utils.compat import WINDOWS
							 | 
						||
| 
								 | 
							
								from pip._internal.utils.logging import get_indentation
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								logger = logging.getLogger(__name__)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								class SpinnerInterface:
								    """Common contract for spinner implementations.

								    Both methods are placeholders that raise; concrete spinners are
								    expected to override them.
								    """

								    def spin(self) -> None:
								        """Advance the spinner by one step.

								        Raises:
								            NotImplementedError: always, in this base class.
								        """
								        raise NotImplementedError

								    def finish(self, final_status: str) -> None:
								        """Stop the spinner, reporting ``final_status``.

								        Raises:
								            NotImplementedError: always, in this base class.
								        """
								        raise NotImplementedError
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								class InteractiveSpinner(SpinnerInterface):
								    """Spinner that redraws its status in place on a text stream.

								    The message is written once at construction time; every later status
								    (a spin character or the final status) overwrites the previous one by
								    emitting backspace characters.
								    """

								    def __init__(
								        self,
								        message: str,
								        file: IO[str] = None,
								        spin_chars: str = "-\\|/",
								        # Empirically, 8 updates/second looks nice
								        min_update_interval_seconds: float = 0.125,
								    ):
								        """Write the message header and prepare the animation state.

								        Args:
								            message: Text shown before the animated status.
								            file: Stream to write to; ``sys.stdout`` is used when None.
								            spin_chars: Characters cycled through, one per redraw.
								            min_update_interval_seconds: Minimum time between redraws.
								        """
								        self._message = message
								        self._file = sys.stdout if file is None else file
								        self._rate_limiter = RateLimiter(min_update_interval_seconds)
								        self._finished = False

								        self._spin_cycle = itertools.cycle(spin_chars)

								        indent = " " * get_indentation()
								        self._file.write(indent + self._message + " ... ")
								        # Nothing has been drawn after the header yet, so there is
								        # nothing to erase on the first status write.
								        self._width = 0

								    def _write(self, status: str) -> None:
								        """Replace the currently displayed status with ``status``."""
								        assert not self._finished
								        # Wipe the previous status: step back over it, blank it out with
								        # spaces, then step back again to where it started.
								        rewind = "\b" * self._width
								        blanks = " " * self._width
								        self._file.write(rewind + blanks + rewind)
								        # The slot is clean; draw the new status and remember its width
								        # so the next call knows how much to erase.
								        self._file.write(status)
								        self._width = len(status)
								        self._file.flush()
								        self._rate_limiter.reset()

								    def spin(self) -> None:
								        """Draw the next spin character unless finished or rate-limited."""
								        if not self._finished and self._rate_limiter.ready():
								            self._write(next(self._spin_cycle))

								    def finish(self, final_status: str) -> None:
								        """Show ``final_status``, end the line, and mark the spinner done.

								        Calling this again after the spinner has finished does nothing.
								        """
								        if not self._finished:
								            self._write(final_status)
								            self._file.write("\n")
								            self._file.flush()
								            self._finished = True
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								# Used for dumb terminals, non-interactive installs (no tty), etc.
							 | 
						||
| 
								 | 
							
								# We still print updates occasionally (once every 60 seconds by default) to
							 | 
						||
| 
								 | 
							
								# act as a keep-alive for systems like Travis-CI that take lack-of-output as
							 | 
						||
| 
								 | 
							
								# an indication that a task has frozen.
							 | 
						||
| 
								 | 
							
								class NonInteractiveSpinner(SpinnerInterface):
								    """Spinner that reports progress as INFO log records.

								    A "started" record is logged on construction, a "still running..."
								    record at most once per rate-limit interval while spinning, and a
								    final record carrying the finish status.
								    """

								    def __init__(self, message: str, min_update_interval_seconds: float = 60.0) -> None:
								        """Store the message, set up rate limiting, and log "started".

								        Args:
								            message: Prefix used for every log record of this spinner.
								            min_update_interval_seconds: Minimum time between the
								                periodic records emitted by ``spin``.
								        """
								        self._message = message
								        self._finished = False
								        self._rate_limiter = RateLimiter(min_update_interval_seconds)
								        self._update("started")

								    def _update(self, status: str) -> None:
								        """Log ``status`` and restart the rate-limit interval."""
								        assert not self._finished
								        self._rate_limiter.reset()
								        logger.info("%s: %s", self._message, status)

								    def spin(self) -> None:
								        """Log a keep-alive record unless finished or rate-limited."""
								        if not self._finished and self._rate_limiter.ready():
								            self._update("still running...")

								    def finish(self, final_status: str) -> None:
								        """Log the final status once; later calls do nothing."""
								        if not self._finished:
								            self._update(f"finished with status '{final_status}'")
								            self._finished = True
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								class RateLimiter:
								    """Tell callers whether a minimum interval has passed since ``reset``.

								    Elapsed time is measured with ``time.monotonic()`` so that wall-clock
								    adjustments (for example an NTP step backwards) cannot make the
								    interval appear negative and suppress updates.
								    """

								    def __init__(self, min_update_interval_seconds: float) -> None:
								        """Create a limiter that is ready until the first ``reset``.

								        Args:
								            min_update_interval_seconds: Minimum number of seconds that
								                must elapse after ``reset`` before ``ready`` is True.
								        """
								        self._min_update_interval_seconds = min_update_interval_seconds
								        # The monotonic clock has an undefined reference point, so 0 is
								        # not guaranteed to be "long ago"; -inf makes the limiter ready
								        # before any reset regardless of the clock's current value.
								        self._last_update: float = float("-inf")

								    def ready(self) -> bool:
								        """Return True when at least the minimum interval has elapsed."""
								        elapsed = time.monotonic() - self._last_update
								        return elapsed >= self._min_update_interval_seconds

								    def reset(self) -> None:
								        """Restart the interval from the current moment."""
								        self._last_update = time.monotonic()
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								@contextlib.contextmanager
								def open_spinner(message: str) -> Iterator[SpinnerInterface]:
								    """Context manager yielding a spinner appropriate for the output.

								    The spinner is finished with "done" on a clean exit, "canceled" on
								    KeyboardInterrupt, and "error" on any other Exception; in the two
								    failure cases the exception is re-raised.

								    Args:
								        message: Text describing the task the spinner accompanies.
								    """
								    # The interactive spinner writes straight to sys.stdout instead of
								    # going through logging, yet it behaves as if it had level INFO: it
								    # is only used when the effective level is INFO or lower.
								    # The non-interactive spinner goes through the logging system, so it
								    # always follows the logging configuration.
								    use_interactive = (
								        sys.stdout.isatty() and logger.getEffectiveLevel() <= logging.INFO
								    )
								    spinner: SpinnerInterface = (
								        InteractiveSpinner(message)
								        if use_interactive
								        else NonInteractiveSpinner(message)
								    )

								    try:
								        with hidden_cursor(sys.stdout):
								            yield spinner
								    except KeyboardInterrupt:
								        spinner.finish("canceled")
								        raise
								    except Exception:
								        spinner.finish("error")
								        raise
								    else:
								        spinner.finish("done")
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								@contextlib.contextmanager
								def hidden_cursor(file: IO[str]) -> Iterator[None]:
								    """Hide the terminal cursor on ``file`` for the duration of the block.

								    When hiding is skipped (see the conditions below) the context manager
								    does nothing. Otherwise HIDE_CURSOR is written on entry and
								    SHOW_CURSOR on exit, even if the body raises.

								    Args:
								        file: Stream the cursor control sequences are written to.
								    """
								    # Skip the control sequences entirely when:
								    #  * running on Windows -- its terminal does not support the
								    #    hide/show cursor ANSI codes, even via colorama;
								    #  * the output is not a tty, or the user runs with --quiet -- we do
								    #    not want to clutter such output with control characters.
								    #    See https://github.com/pypa/pip/issues/3418
								    skip_codes = (
								        WINDOWS
								        or not file.isatty()
								        or logger.getEffectiveLevel() > logging.INFO
								    )
								    if skip_codes:
								        yield
								        return

								    file.write(HIDE_CURSOR)
								    try:
								        yield
								    finally:
								        file.write(SHOW_CURSOR)
							 |