# Mirror of https://github.com/PiBrewing/craftbeerpi4.git
# Synced 2025-01-04 03:41:46 +01:00
# 573 lines, 23 KiB, Python
import shutil
|
|
import sys
|
|
from enum import Enum
|
|
from io import StringIO
|
|
from itertools import chain
|
|
from pathlib import Path
|
|
from typing import Iterator, Optional, Set, TextIO, Union, cast
|
|
from warnings import warn
|
|
|
|
from isort import core
|
|
|
|
from . import files, identify, io
|
|
from .exceptions import (
|
|
ExistingSyntaxErrors,
|
|
FileSkipComment,
|
|
FileSkipSetting,
|
|
IntroducedSyntaxErrors,
|
|
)
|
|
from .format import ask_whether_to_apply_changes_to_file, create_terminal_printer, show_unified_diff
|
|
from .io import Empty
|
|
from .place import module as place_module # noqa: F401
|
|
from .place import module_with_reason as place_module_with_reason # noqa: F401
|
|
from .settings import DEFAULT_CONFIG, Config
|
|
|
|
|
|
class ImportKey(Enum):
    """Granularity levels used to key an individual import, generally for deduping.

    Members are ordered from the least to the most specific part of an
    import statement:

        from x.y import z as a
             | |        |    |
             | |        |    +--- ALIAS      (everything up to and including the alias)
             | |        +-------- ATTRIBUTE  (the module plus the imported name)
             | +----------------- MODULE     (the full dotted module path)
             +------------------- PACKAGE    (the first component of the module path)
    """

    # Least specific: only the top-level package.
    PACKAGE = 1
    # The complete dotted module path.
    MODULE = 2
    # The module together with the attribute imported from it.
    ATTRIBUTE = 3
    # Most specific: the import including its alias.
    ALIAS = 4
|
|
|
|
|
|
def sort_code_string(
    code: str,
    extension: Optional[str] = None,
    config: Config = DEFAULT_CONFIG,
    file_path: Optional[Path] = None,
    disregard_skip: bool = False,
    show_diff: Union[bool, TextIO] = False,
    **config_kwargs,
) -> str:
    """Runs `sort_stream` over an in-memory copy of `code` and returns what was written.

    - **code**: The string of code with imports that need to be sorted.
    - **extension**: The file extension that contains imports. Defaults to filename extension or py.
    - **config**: The config object to use when sorting imports.
    - **file_path**: The disk location where the code string was pulled from.
    - **disregard_skip**: set to `True` if you want to ignore a skip set in config for this file.
    - **show_diff**: Forwarded unchanged to `sort_stream`; a truthy value requests a unified
    diff and a TextIO value is used as the destination of that diff.
    - ****config_kwargs**: Any config modifications.
    """
    resolved_config = _config(path=file_path, config=config, **config_kwargs)
    sorted_buffer = StringIO()
    sort_stream(
        StringIO(code),
        sorted_buffer,
        extension=extension,
        config=resolved_config,
        file_path=file_path,
        disregard_skip=disregard_skip,
        show_diff=show_diff,
    )
    # The whole buffer is returned regardless of where the write cursor ended up.
    return sorted_buffer.getvalue()
|
|
|
|
|
|
def check_code_string(
    code: str,
    show_diff: Union[bool, TextIO] = False,
    extension: Optional[str] = None,
    config: Config = DEFAULT_CONFIG,
    file_path: Optional[Path] = None,
    disregard_skip: bool = False,
    **config_kwargs,
) -> bool:
    """Checks the imports of the provided code string by delegating to `check_stream`.

    Returns `True` if everything is correct, otherwise `False`.

    - **code**: The string of code with imports that need to be checked.
    - **show_diff**: Forwarded unchanged to `check_stream`; a truthy value requests a unified
    diff and a TextIO value is used as the destination of that diff.
    - **extension**: The file extension that contains imports. Defaults to filename extension or py.
    - **config**: The config object to use when sorting imports.
    - **file_path**: The disk location where the code string was pulled from.
    - **disregard_skip**: set to `True` if you want to ignore a skip set in config for this file.
    - ****config_kwargs**: Any config modifications.
    """
    resolved_config = _config(path=file_path, config=config, **config_kwargs)
    code_stream = StringIO(code)
    is_correct = check_stream(
        code_stream,
        show_diff=show_diff,
        extension=extension,
        config=resolved_config,
        file_path=file_path,
        disregard_skip=disregard_skip,
    )
    return is_correct
|
|
|
|
|
|
def sort_stream(
    input_stream: TextIO,
    output_stream: TextIO,
    extension: Optional[str] = None,
    config: Config = DEFAULT_CONFIG,
    file_path: Optional[Path] = None,
    disregard_skip: bool = False,
    show_diff: Union[bool, TextIO] = False,
    **config_kwargs,
) -> bool:
    """Sorts any imports within the provided code stream, outputs to the provided output stream.
    Returns `True` if anything is modified from the original input stream, otherwise `False`.

    - **input_stream**: The stream of code with imports that need to be sorted.
    - **output_stream**: The stream where sorted imports should be written to.
    - **extension**: The file extension that contains imports. Defaults to filename extension or py.
    - **config**: The config object to use when sorting imports.
    - **file_path**: The disk location where the code string was pulled from.
    - **disregard_skip**: set to `True` if you want to ignore a skip set in config for this file.
    - **show_diff**: If truthy, the sorted code is NOT written to `output_stream`; a unified diff
    is produced instead. With `True` the diff goes to `output_stream`, with a TextIO stream the
    diff goes to that stream.
    - ****config_kwargs**: Any config modifications.

    Raises `FileSkipSetting` when `file_path` is skipped by the config (unless `disregard_skip`),
    re-raises `FileSkipComment` with the content source attached, and, when `config.atomic` is
    set, raises `ExistingSyntaxErrors` / `IntroducedSyntaxErrors` if the input / output does not
    compile.
    """
    # Resolve the configuration once, before branching, so that the diff branch
    # below reads `color_output` from the effective config (keyword overrides or
    # path-discovered settings) rather than from the raw `config` argument.
    config = _config(path=file_path, config=config, **config_kwargs)

    if show_diff:
        _output_stream = StringIO()
        _input_stream = StringIO(input_stream.read())
        # The config is already resolved, so no keyword overrides are forwarded;
        # forwarding both would make `_config` reject the combination.
        changed = sort_stream(
            input_stream=_input_stream,
            output_stream=_output_stream,
            extension=extension,
            config=config,
            file_path=file_path,
            disregard_skip=disregard_skip,
        )
        _output_stream.seek(0)
        _input_stream.seek(0)
        show_unified_diff(
            file_input=_input_stream.read(),
            file_output=_output_stream.read(),
            file_path=file_path,
            output=output_stream if show_diff is True else cast(TextIO, show_diff),
            color_output=config.color_output,
        )
        return changed

    content_source = str(file_path or "Passed in content")
    if not disregard_skip:
        if file_path and config.is_skipped(file_path):
            raise FileSkipSetting(content_source)

    _internal_output = output_stream

    if config.atomic:
        try:
            file_content = input_stream.read()
            # Compile only to detect syntax errors that exist before sorting.
            compile(file_content, content_source, "exec", 0, 1)
            input_stream = StringIO(file_content)
        except SyntaxError:
            raise ExistingSyntaxErrors(content_source)

        # The result has to be read back for validation; buffer it when the
        # real destination cannot be read from.
        if not output_stream.readable():
            _internal_output = StringIO()

    try:
        changed = core.process(
            input_stream,
            _internal_output,
            extension=extension or (file_path and file_path.suffix.lstrip(".")) or "py",
            config=config,
        )
    except FileSkipComment:
        raise FileSkipComment(content_source)

    if config.atomic:
        _internal_output.seek(0)
        try:
            compile(_internal_output.read(), content_source, "exec", 0, 1)
            _internal_output.seek(0)
            # Only copy when a separate buffer was used (identity, not equality).
            if _internal_output is not output_stream:
                output_stream.write(_internal_output.read())
        except SyntaxError:  # pragma: no cover
            raise IntroducedSyntaxErrors(content_source)

    return changed
|
|
|
|
|
|
def check_stream(
    input_stream: TextIO,
    show_diff: Union[bool, TextIO] = False,
    extension: Optional[str] = None,
    config: Config = DEFAULT_CONFIG,
    file_path: Optional[Path] = None,
    disregard_skip: bool = False,
    **config_kwargs,
) -> bool:
    """Checks any imports within the provided code stream, returning `False` if any unsorted or
    incorrectly formatted imports are found or `True` if no problems are identified.

    - **input_stream**: The stream of code with imports that need to be checked.
    - **show_diff**: If truthy, a unified diff of the required changes is produced when the check
    fails; a TextIO stream is used as the diff destination, while `True` passes `output=None`
    to `show_unified_diff`.
    - **extension**: The file extension that contains imports. Defaults to filename extension or py.
    - **config**: The config object to use when sorting imports.
    - **file_path**: The disk location where the code string was pulled from.
    - **disregard_skip**: set to `True` if you want to ignore a skip set in config for this file.
    - ****config_kwargs**: Any config modifications.
    """
    config = _config(path=file_path, config=config, **config_kwargs)

    if show_diff:
        # Keep a rewindable copy: the content is needed again to build the diff.
        input_stream = StringIO(input_stream.read())

    # First pass discards the output; only the "changed" verdict matters.
    needs_sorting: bool = sort_stream(
        input_stream=input_stream,
        output_stream=Empty,
        extension=extension,
        config=config,
        file_path=file_path,
        disregard_skip=disregard_skip,
    )
    printer = create_terminal_printer(color=config.color_output)

    if needs_sorting:
        printer.error(f"{file_path or ''} Imports are incorrectly sorted and/or formatted.")
        if show_diff:
            input_stream.seek(0)
            original_text = input_stream.read()
            sorted_buffer = StringIO()
            # Second pass captures the sorted result to diff against the original.
            sort_stream(
                input_stream=StringIO(original_text),
                output_stream=sorted_buffer,
                extension=extension,
                config=config,
                file_path=file_path,
                disregard_skip=disregard_skip,
            )
            show_unified_diff(
                file_input=original_text,
                file_output=sorted_buffer.getvalue(),
                file_path=file_path,
                output=None if show_diff is True else cast(TextIO, show_diff),
                color_output=config.color_output,
            )
        return False

    if config.verbose and not config.only_modified:
        printer.success(f"{file_path or ''} Everything Looks Good!")
    return True
|
|
|
|
|
|
def check_file(
    filename: Union[str, Path],
    show_diff: Union[bool, TextIO] = False,
    config: Config = DEFAULT_CONFIG,
    file_path: Optional[Path] = None,
    disregard_skip: bool = True,
    extension: Optional[str] = None,
    **config_kwargs,
) -> bool:
    """Opens the provided file and checks its imports via `check_stream`, returning `False` if
    any unsorted or incorrectly formatted imports are found or `True` if no problems are
    identified.

    - **filename**: The name or Path of the file to check.
    - **show_diff**: Forwarded unchanged to `check_stream`; a truthy value requests a unified
    diff and a TextIO value is used as the destination of that diff.
    - **config**: The config object to use when sorting imports.
    - **file_path**: The disk location to report for the code; defaults to the path of the
    opened file.
    - **disregard_skip**: set to `True` if you want to ignore a skip set in config for this file.
    - **extension**: The file extension that contains imports. Defaults to filename extension or py.
    - ****config_kwargs**: Any config modifications.
    """
    with io.File.read(filename) as source_file:
        reported_path = file_path or source_file.path
        is_correct = check_stream(
            source_file.stream,
            show_diff=show_diff,
            extension=extension,
            config=config,
            file_path=reported_path,
            disregard_skip=disregard_skip,
            **config_kwargs,
        )
        return is_correct
|
|
|
|
|
|
def sort_file(
    filename: Union[str, Path],
    extension: Optional[str] = None,
    config: Config = DEFAULT_CONFIG,
    file_path: Optional[Path] = None,
    disregard_skip: bool = True,
    ask_to_apply: bool = False,
    show_diff: Union[bool, TextIO] = False,
    write_to_stdout: bool = False,
    output: Optional[TextIO] = None,
    **config_kwargs,
) -> bool:
    """Sorts and formats any groups of imports within the provided file or Path.
    Returns `True` if the file has been changed, otherwise `False`.

    - **filename**: The name or Path of the file to format.
    - **extension**: The file extension that contains imports. Defaults to filename extension or py.
    - **config**: The config object to use when sorting imports.
    - **file_path**: The disk location to report for the code; defaults to the path of the
    opened file.
    - **disregard_skip**: set to `True` if you want to ignore a skip set in config for this file.
    - **ask_to_apply**: If `True`, prompt before applying any changes.
    - **show_diff**: If truthy, a unified diff of the changes is shown; a TextIO stream is used
    as the diff destination. When sorting in place (no `output`, no `write_to_stdout`) a truthy
    value also means the file is left untouched and `False` is returned.
    - **write_to_stdout**: If `True`, write to stdout instead of the input file.
    - **output**: If a TextIO is provided, results will be written there rather than replacing
    the original file content.
    - ****config_kwargs**: Any config modifications.

    `ExistingSyntaxErrors` and `IntroducedSyntaxErrors` are reported through `warn` instead of
    propagating.
    """
    with io.File.read(filename) as source_file:
        actual_file_path = file_path or source_file.path
        config = _config(path=actual_file_path, config=config, **config_kwargs)
        changed: bool = False
        try:
            if write_to_stdout:
                changed = sort_stream(
                    input_stream=source_file.stream,
                    output_stream=sys.stdout,
                    config=config,
                    file_path=actual_file_path,
                    disregard_skip=disregard_skip,
                    extension=extension,
                )
            elif output is None:
                # In-place mode: write to a sibling "<name>.isorted" file first and
                # only swap it over the original once sorting succeeded.
                staging_path = source_file.path.with_suffix(source_file.path.suffix + ".isorted")
                try:
                    with staging_path.open(
                        "w", encoding=source_file.encoding, newline=""
                    ) as staged_out:
                        shutil.copymode(filename, staging_path)
                        changed = sort_stream(
                            input_stream=source_file.stream,
                            output_stream=staged_out,
                            config=config,
                            file_path=actual_file_path,
                            disregard_skip=disregard_skip,
                            extension=extension,
                        )
                    if changed:
                        if show_diff or ask_to_apply:
                            source_file.stream.seek(0)
                            with staging_path.open(
                                encoding=source_file.encoding, newline=""
                            ) as staged_in:
                                diff_target = (
                                    None if show_diff is True else cast(TextIO, show_diff)
                                )
                                show_unified_diff(
                                    file_input=source_file.stream.read(),
                                    file_output=staged_in.read(),
                                    file_path=actual_file_path,
                                    output=diff_target,
                                    color_output=config.color_output,
                                )
                                # Diff-only runs and declined prompts leave the file as is.
                                declined = ask_to_apply and not ask_whether_to_apply_changes_to_file(
                                    str(source_file.path)
                                )
                                if show_diff or declined:
                                    return False
                        source_file.stream.close()
                        staging_path.replace(source_file.path)
                        if not config.quiet:
                            print(f"Fixing {source_file.path}")
                finally:
                    try:  # Python 3.8+: use `missing_ok=True` instead of try except.
                        staging_path.unlink()
                    except FileNotFoundError:
                        pass  # pragma: no cover
            else:
                changed = sort_stream(
                    input_stream=source_file.stream,
                    output_stream=output,
                    config=config,
                    file_path=actual_file_path,
                    disregard_skip=disregard_skip,
                    extension=extension,
                )
                if changed:
                    if show_diff:
                        source_file.stream.seek(0)
                        output.seek(0)
                        show_unified_diff(
                            file_input=source_file.stream.read(),
                            file_output=output.read(),
                            file_path=actual_file_path,
                            output=None if show_diff is True else cast(TextIO, show_diff),
                            color_output=config.color_output,
                        )
                    source_file.stream.close()

        except ExistingSyntaxErrors:
            warn(f"{actual_file_path} unable to sort due to existing syntax errors")
        except IntroducedSyntaxErrors:  # pragma: no cover
            warn(f"{actual_file_path} unable to sort as isort introduces new syntax errors")

        return changed
|
|
|
|
|
|
def find_imports_in_code(
    code: str,
    config: Config = DEFAULT_CONFIG,
    file_path: Optional[Path] = None,
    unique: Union[bool, ImportKey] = False,
    top_only: bool = False,
    **config_kwargs,
) -> Iterator[identify.Import]:
    """Yields every import found in the provided code string.

    The string is wrapped in an in-memory stream and handed to `find_imports_in_stream`.

    - **code**: The string of code to look for imports in.
    - **config**: The config object to use when sorting imports.
    - **file_path**: The disk location where the code string was pulled from.
    - **unique**: If True, only the first instance of an import is returned.
    - **top_only**: If True, only return imports that occur before the first function or class.
    - ****config_kwargs**: Any config modifications.
    """
    code_stream = StringIO(code)
    found_imports = find_imports_in_stream(
        input_stream=code_stream,
        config=config,
        file_path=file_path,
        unique=unique,
        top_only=top_only,
        **config_kwargs,
    )
    yield from found_imports
|
|
|
|
|
|
def find_imports_in_stream(
    input_stream: TextIO,
    config: Config = DEFAULT_CONFIG,
    file_path: Optional[Path] = None,
    unique: Union[bool, ImportKey] = False,
    top_only: bool = False,
    _seen: Optional[Set[str]] = None,
    **config_kwargs,
) -> Iterator[identify.Import]:
    """Finds and returns all imports within the provided code stream.

    - **input_stream**: The stream of code to look for imports in.
    - **config**: The config object to use when sorting imports.
    - **file_path**: The disk location where the code string was pulled from.
    - **unique**: If falsy, every import is yielded. If `True` or `ImportKey.ALIAS`, imports are
    de-duplicated by their full statement; the other `ImportKey` members de-duplicate by
    module + attribute, by module, or by the first component of the module path.
    - **top_only**: If True, only return imports that occur before the first function or class.
    - **_seen**: An optional set of imports already seen. Generally meant only for internal use.
    The set is updated in place, which lets a caller share it across several streams.
    - ****config_kwargs**: Any config modifications.
    """
    config = _config(config=config, **config_kwargs)
    identified_imports = identify.imports(
        input_stream, config=config, file_path=file_path, top_only=top_only
    )
    if not unique:
        yield from identified_imports
        # Stop here explicitly: the de-duplication loop below must never run in
        # this mode (no key branch matches a falsy `unique`).
        return

    seen: Set[str] = set() if _seen is None else _seen
    for identified_import in identified_imports:
        if unique in (True, ImportKey.ALIAS):
            key = identified_import.statement()
        elif unique == ImportKey.ATTRIBUTE:
            key = f"{identified_import.module}.{identified_import.attribute}"
        elif unique == ImportKey.MODULE:
            key = identified_import.module
        elif unique == ImportKey.PACKAGE:
            key = identified_import.module.split(".")[0]

        # Empty keys are never recorded or yielded.
        if key and key not in seen:
            seen.add(key)
            yield identified_import
|
|
|
|
|
|
def find_imports_in_file(
    filename: Union[str, Path],
    config: Config = DEFAULT_CONFIG,
    file_path: Optional[Path] = None,
    unique: Union[bool, ImportKey] = False,
    top_only: bool = False,
    **config_kwargs,
) -> Iterator[identify.Import]:
    """Yields every import found in the provided source file.

    The file is opened for reading and its stream is handed to `find_imports_in_stream`.

    - **filename**: The name or Path of the file to look for imports in.
    - **config**: The config object to use when sorting imports.
    - **file_path**: The disk location to report for the code; defaults to the path of the
    opened file.
    - **unique**: If True, only the first instance of an import is returned.
    - **top_only**: If True, only return imports that occur before the first function or class.
    - ****config_kwargs**: Any config modifications; forwarded as-is to
    `find_imports_in_stream`.
    """
    with io.File.read(filename) as source_file:
        reported_path = file_path or source_file.path
        found_imports = find_imports_in_stream(
            input_stream=source_file.stream,
            config=config,
            file_path=reported_path,
            unique=unique,
            top_only=top_only,
            **config_kwargs,
        )
        yield from found_imports
|
|
|
|
|
|
def find_imports_in_paths(
    paths: Iterator[Union[str, Path]],
    config: Config = DEFAULT_CONFIG,
    file_path: Optional[Path] = None,
    unique: Union[bool, ImportKey] = False,
    top_only: bool = False,
    **config_kwargs,
) -> Iterator[identify.Import]:
    """Finds and returns all imports within the provided source paths.

    - **paths**: A collection of paths to look for imports within; each is converted to `str`
    and expanded into file names by `files.find`.
    - **config**: The config object to use when sorting imports.
    - **file_path**: Accepted for signature parity with the other finders; not used here.
    - **unique**: If truthy, a single "seen" set is shared across all files so an import is only
    returned the first time it is encountered.
    - **top_only**: If True, only return imports that occur before the first function or class.
    - ****config_kwargs**: Any config modifications.
    """
    config = _config(config=config, **config_kwargs)
    seen: Optional[Set[str]] = set() if unique else None
    # `chain.from_iterable` pulls file names one at a time, so imports start
    # flowing before discovery has finished (star-unpacking would consume the
    # whole discovery up front).
    yield from chain.from_iterable(
        find_imports_in_file(
            file_name, unique=unique, config=config, top_only=top_only, _seen=seen
        )
        for file_name in files.find(map(str, paths), config, [], [])
    )
|
|
|
|
|
|
def _config(
    path: Optional[Path] = None, config: Config = DEFAULT_CONFIG, **config_kwargs
) -> Config:
    """Resolves the effective `Config` from a config object and/or keyword overrides.

    - **path**: When truthy and the default config is in use, it becomes `settings_path`
    unless the caller already supplied `settings_path` or `settings_file`.
    - **config**: A ready-made config object; returned untouched when there are no overrides.
    - ****config_kwargs**: Options used to build a new `Config`.

    Raises `ValueError` when keyword overrides are combined with a non-default `config`.
    """
    uses_default = config is DEFAULT_CONFIG
    settings_given = not config_kwargs.keys().isdisjoint(("settings_path", "settings_file"))
    if path and uses_default and not settings_given:
        config_kwargs["settings_path"] = path

    # Nothing to override: hand back whatever config was passed in.
    if not config_kwargs:
        return config

    if not uses_default:
        raise ValueError(
            "You can either specify custom configuration options using kwargs or "
            "passing in a Config object. Not Both!"
        )

    return Config(**config_kwargs)
|