|
| 1 | +"""Git source control management implementation.""" |
| 2 | + |
| 3 | +import os |
| 4 | +import re |
| 5 | +import shlex |
| 6 | +import subprocess |
| 7 | +from pathlib import Path |
| 8 | +from tempfile import NamedTemporaryFile |
| 9 | +from typing import Any, ClassVar, MutableMapping, Optional |
| 10 | + |
| 11 | +from bumpversion.exceptions import DirtyWorkingDirectoryError |
| 12 | +from bumpversion.scm.models import LatestTagInfo, SCMConfig |
| 13 | +from bumpversion.ui import get_indented_logger |
| 14 | +from bumpversion.utils import format_and_raise_error, is_subpath, run_command |
| 15 | + |
| 16 | +logger = get_indented_logger(__name__) |
| 17 | + |
| 18 | + |
| 19 | +class Git: |
| 20 | + """Git implementation.""" |
| 21 | + |
| 22 | + _TEST_AVAILABLE_COMMAND: ClassVar[list[str]] = ["git", "rev-parse", "--git-dir"] |
| 23 | + _COMMIT_COMMAND: ClassVar[list[str]] = ["git", "commit", "-F"] |
| 24 | + _ALL_TAGS_COMMAND: ClassVar[list[str]] = ["git", "tag", "--list"] |
| 25 | + |
| 26 | + def __init__(self, config: SCMConfig): |
| 27 | + self.config = config |
| 28 | + self._latest_tag_info: Optional[LatestTagInfo] = None |
| 29 | + |
| 30 | + def is_available(self) -> bool: |
| 31 | + """Is the VCS implementation usable?""" |
| 32 | + try: |
| 33 | + result = run_command(self._TEST_AVAILABLE_COMMAND) |
| 34 | + return result.returncode == 0 |
| 35 | + except (FileNotFoundError, PermissionError, NotADirectoryError, subprocess.CalledProcessError): |
| 36 | + return False |
| 37 | + |
| 38 | + def latest_tag_info(self) -> LatestTagInfo: |
| 39 | + """Return information about the latest tag.""" |
| 40 | + if self._latest_tag_info is not None: |
| 41 | + return self._latest_tag_info |
| 42 | + |
| 43 | + if not self.is_available(): |
| 44 | + return LatestTagInfo() |
| 45 | + |
| 46 | + info: dict[str, Any] = {} |
| 47 | + update_index() |
| 48 | + |
| 49 | + info |= commit_info(self.config) |
| 50 | + info |= revision_info() |
| 51 | + |
| 52 | + self._latest_tag_info = LatestTagInfo(**info) |
| 53 | + return self._latest_tag_info |
| 54 | + |
| 55 | + def add_path(self, path: str | Path) -> None: |
| 56 | + """Add a path to the VCS.""" |
| 57 | + repository_root = self.latest_tag_info().repository_root |
| 58 | + if not (repository_root and is_subpath(repository_root, path)): |
| 59 | + return |
| 60 | + |
| 61 | + cwd = Path.cwd() |
| 62 | + temp_path = os.path.relpath(path, cwd) |
| 63 | + try: |
| 64 | + run_command(["git", "add", "--update", str(temp_path)]) |
| 65 | + except subprocess.CalledProcessError as e: |
| 66 | + format_and_raise_error(e) |
| 67 | + |
| 68 | + def get_all_tags(self) -> list[str]: |
| 69 | + """Return all tags in git.""" |
| 70 | + try: |
| 71 | + result = run_command(self._ALL_TAGS_COMMAND) |
| 72 | + return result.stdout.splitlines() |
| 73 | + except (FileNotFoundError, PermissionError, NotADirectoryError, subprocess.CalledProcessError): |
| 74 | + return [] |
| 75 | + |
| 76 | + def commit_and_tag(self, files: list[Path | str], context: MutableMapping, dry_run: bool = False) -> None: |
| 77 | + """Commit and tag files to the repository using the configuration.""" |
| 78 | + if dry_run: |
| 79 | + return |
| 80 | + |
| 81 | + if self.config.commit: |
| 82 | + for path in files: |
| 83 | + self.add_path(path) |
| 84 | + |
| 85 | + self.commit(context) |
| 86 | + |
| 87 | + if self.config.tag: |
| 88 | + tag_name = self.config.tag_name.format(**context) |
| 89 | + tag_message = self.config.tag_message.format(**context) |
| 90 | + tag(tag_name, sign=self.config.sign_tags, message=tag_message) |
| 91 | + |
| 92 | + for m_tag_name in self.config.moveable_tags: |
| 93 | + moveable_tag(m_tag_name) |
| 94 | + |
| 95 | + def assert_nondirty(self) -> None: |
| 96 | + """ |
| 97 | + Asserts that the repository is not dirty. |
| 98 | +
|
| 99 | + Raises: |
| 100 | + DirtyWorkingDirectoryError: If the repository is not clean. |
| 101 | + """ |
| 102 | + assert_nondirty() |
| 103 | + |
| 104 | + def commit(self, context: MutableMapping) -> None: |
| 105 | + """Commit the changes.""" |
| 106 | + extra_args = shlex.split(self.config.commit_args) if self.config.commit_args else [] |
| 107 | + |
| 108 | + current_version = context.get("current_version", "") |
| 109 | + new_version = context.get("new_version", "") |
| 110 | + commit_message = self.config.message.format(**context) |
| 111 | + |
| 112 | + if not current_version: |
| 113 | + logger.warning("No current version given, using an empty string.") |
| 114 | + if not new_version: |
| 115 | + logger.warning("No new version given, using an empty string.") |
| 116 | + |
| 117 | + with NamedTemporaryFile("wb", delete=False) as f: |
| 118 | + f.write(commit_message.encode("utf-8")) |
| 119 | + |
| 120 | + env = os.environ.copy() |
| 121 | + env["BUMPVERSION_CURRENT_VERSION"] = current_version |
| 122 | + env["BUMPVERSION_NEW_VERSION"] = new_version |
| 123 | + |
| 124 | + try: |
| 125 | + cmd = [*self._COMMIT_COMMAND, f.name, *extra_args] |
| 126 | + run_command(cmd, env=env) |
| 127 | + except (subprocess.CalledProcessError, TypeError) as exc: # pragma: no-coverage |
| 128 | + format_and_raise_error(exc) |
| 129 | + finally: |
| 130 | + os.unlink(f.name) |
| 131 | + |
| 132 | + |
| 133 | +def update_index() -> None: |
| 134 | + """Update the git index.""" |
| 135 | + try: |
| 136 | + run_command(["git", "update-index", "--refresh", "-q"]) |
| 137 | + except subprocess.CalledProcessError as e: |
| 138 | + logger.debug("Error when running git update-index: %s", e.stderr) |
| 139 | + |
| 140 | + |
| 141 | +def commit_info(config: SCMConfig) -> dict: |
| 142 | + """ |
| 143 | + Get the commit info for the repo. |
| 144 | +
|
| 145 | + Args: |
| 146 | + config: The source control configuration. |
| 147 | +
|
| 148 | + Returns: |
| 149 | + A dictionary containing information about the latest commit. |
| 150 | + """ |
| 151 | + tag_pattern = config.tag_name.replace("{new_version}", "*") |
| 152 | + info = dict.fromkeys(["dirty", "commit_sha", "distance_to_latest_tag", "current_version", "current_tag"]) |
| 153 | + info["distance_to_latest_tag"] = 0 |
| 154 | + try: |
| 155 | + git_cmd = ["git", "describe", "--dirty", "--tags", "--long", "--abbrev=40", f"--match={tag_pattern}"] |
| 156 | + result = run_command(git_cmd) |
| 157 | + except subprocess.CalledProcessError as e: |
| 158 | + if e.stderr and "fatal: no names found, cannot describe anything." in e.stderr: |
| 159 | + logger.debug("No tags found, returning default values.") |
| 160 | + else: |
| 161 | + logger.debug("Error when running git describe: %s", e.stderr) |
| 162 | + return info |
| 163 | + |
| 164 | + describe_out = result.stdout.strip().split("-") |
| 165 | + if describe_out[-1].strip() == "dirty": |
| 166 | + info["dirty"] = True |
| 167 | + describe_out.pop() |
| 168 | + else: |
| 169 | + info["dirty"] = False |
| 170 | + |
| 171 | + info["commit_sha"] = describe_out.pop().lstrip("g") |
| 172 | + info["distance_to_latest_tag"] = int(describe_out.pop()) |
| 173 | + info["current_tag"] = "-".join(describe_out) |
| 174 | + version = config.get_version_from_tag("-".join(describe_out)) |
| 175 | + info["current_version"] = version or "-".join(describe_out).lstrip("v") |
| 176 | + |
| 177 | + return info |
| 178 | + |
| 179 | + |
| 180 | +def revision_info() -> dict: |
| 181 | + """ |
| 182 | + Returns a dictionary containing revision information. |
| 183 | +
|
| 184 | + If an error occurs while running the git command, the dictionary values will be set to None. |
| 185 | +
|
| 186 | + Returns: |
| 187 | + A dictionary with the following keys: |
| 188 | + - branch_name: The name of the current branch. |
| 189 | + - short_branch_name: A 20 lowercase characters of the branch name with special characters removed. |
| 190 | + - repository_root: The root directory of the Git repository. |
| 191 | + """ |
| 192 | + info = dict.fromkeys(["branch_name", "short_branch_name", "repository_root"]) |
| 193 | + repo_root_command = ["git", "rev-parse", "--show-toplevel"] |
| 194 | + current_branch_command = ["git", "branch", "--show-current"] |
| 195 | + |
| 196 | + try: |
| 197 | + repository_root_result = run_command(repo_root_command) |
| 198 | + except subprocess.CalledProcessError as e: |
| 199 | + logger.debug("Error when determining the repository root: %s", e.stderr) |
| 200 | + return info |
| 201 | + |
| 202 | + try: |
| 203 | + branch_name_result = run_command(current_branch_command) |
| 204 | + except subprocess.CalledProcessError as e: |
| 205 | + logger.debug("Error when determining the current branch: %s", e.stderr) |
| 206 | + return info |
| 207 | + |
| 208 | + repository_root = Path(repository_root_result.stdout.strip()) |
| 209 | + branch_name = branch_name_result.stdout.strip() |
| 210 | + short_branch_name = re.sub(r"([^a-zA-Z0-9]*)", "", branch_name).lower()[:20] |
| 211 | + info["branch_name"] = branch_name |
| 212 | + info["short_branch_name"] = short_branch_name |
| 213 | + info["repository_root"] = repository_root |
| 214 | + |
| 215 | + return info |
| 216 | + |
| 217 | + |
| 218 | +def tag(name: str, sign: bool = False, message: Optional[str] = None) -> None: |
| 219 | + """ |
| 220 | + Create a tag of the new_version in git. |
| 221 | +
|
| 222 | + If only name is given, bumpversion uses a lightweight tag. |
| 223 | + Otherwise, it uses an annotated tag. |
| 224 | +
|
| 225 | + Args: |
| 226 | + name: The name of the tag |
| 227 | + sign: True to sign the tag |
| 228 | + message: An optional message to annotate the tag. |
| 229 | + """ |
| 230 | + command = ["git", "tag", name] |
| 231 | + if sign: |
| 232 | + command += ["--sign"] |
| 233 | + if message: |
| 234 | + command += ["--message", message] |
| 235 | + try: |
| 236 | + run_command(command) |
| 237 | + except subprocess.CalledProcessError as e: |
| 238 | + format_and_raise_error(e) |
| 239 | + |
| 240 | + |
| 241 | +def moveable_tag(name: str) -> None: |
| 242 | + """ |
| 243 | + Create a new lightweight tag that should overwrite any previous tags with the same name. |
| 244 | +
|
| 245 | + Args: |
| 246 | + name: The name of the moveable tag. |
| 247 | + """ |
| 248 | + try: |
| 249 | + run_command(["git", "tag", "-f", name]) |
| 250 | + push_remote("origin", name, force=True) |
| 251 | + except subprocess.CalledProcessError as e: |
| 252 | + format_and_raise_error(e) |
| 253 | + |
| 254 | + |
| 255 | +def assert_nondirty() -> None: |
| 256 | + """Assert that the working directory is not dirty.""" |
| 257 | + lines = [ |
| 258 | + line.strip() |
| 259 | + for line in run_command(["git", "status", "--porcelain"]).stdout.splitlines() |
| 260 | + if not line.strip().startswith("??") |
| 261 | + ] |
| 262 | + if joined_lines := "\n".join(lines): |
| 263 | + raise DirtyWorkingDirectoryError(f"Git working directory is not clean:\n\n{joined_lines}") |
| 264 | + |
| 265 | + |
| 266 | +def push_remote(remote_name: str, ref_name: str, force: bool = False) -> None: |
| 267 | + """Push the `ref_name` to the `remote_name` repository, optionally forcing the push.""" |
| 268 | + try: |
| 269 | + result = run_command(["git", "remote"]) |
| 270 | + if remote_name not in result.stdout: |
| 271 | + logger.warning("Remote '%s' not found, skipping push.", remote_name) |
| 272 | + return |
| 273 | + run_command(["git", "push", remote_name, ref_name, "--force" if force else ""]) |
| 274 | + except subprocess.CalledProcessError as e: |
| 275 | + format_and_raise_error(e) |
0 commit comments