Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 84 additions & 14 deletions helpers/git.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,27 +425,97 @@ def is_official_agent_zero_repo() -> bool:
return False


# ---------------------------------------------------------------------------
# URL helpers for authenticated cloning
# ---------------------------------------------------------------------------

_SAFE_URL_RE = re.compile(
r'^(https?://[A-Za-z0-9._~:/?#\[\]@!$&\'()*+,;=%\-]+' # http(s)
r'|ssh://[A-Za-z0-9._\-]+@[A-Za-z0-9._\-]+(:\d+)?/[A-Za-z0-9/_\.\-]+' # ssh://user@host/
r'|[A-Za-z0-9._\-]+@[A-Za-z0-9._\-]+:[A-Za-z0-9/_\.\-]+)$' # user@host:path
)


def _validate_git_url(url: str) -> None:
"""Raise ValueError for URLs that are not safe git remote URLs."""
if not url or not isinstance(url, str):
raise ValueError("git_url must be a non-empty string")
url = url.strip()
if not _SAFE_URL_RE.match(url):
raise ValueError(
f"Invalid or unsafe git_url: only https://, http://, ssh://user@host/, "
f"or user@host: URLs are accepted"
)


def _is_http_url(url: str) -> bool:
return url.startswith('https://') or url.startswith('http://')


def _inject_token_into_url(clean_url: str, token: str) -> str:
"""Return a clone URL with embedded credentials.

- Bitbucket: https://x-token-auth:<token>@bitbucket.org/...
- All others: https://oauth2:<token>@<host>/...
"""
parsed = urlparse(clean_url)
host = (parsed.hostname or '').lower()
if 'bitbucket.org' in host:
userinfo = f'x-token-auth:{token}'
else:
userinfo = f'oauth2:{token}'
# Reconstruct netloc with credentials
netloc = f'{userinfo}@{parsed.hostname}'
if parsed.port:
netloc += f':{parsed.port}'
return urlunparse((parsed.scheme, netloc, parsed.path, '', '', ''))


def _sanitize_token(text: str, token: str) -> str:
"""Replace any occurrence of *token* in *text* with ***."""
if not token:
return text
return text.replace(token, '***')


def clone_repo(url: str, dest: str, token: str | None = None):
"""Clone a git repository. Uses http.extraHeader for token auth (never stored in URL/config)."""
cmd = ['git']

if token:
# GitHub Git HTTP requires Basic Auth, not Bearer
auth_string = f"x-access-token:{token}"
auth_base64 = base64.b64encode(auth_string.encode()).decode()
cmd.extend(['-c', f'http.extraHeader=Authorization: Basic {auth_base64}'])

cmd.extend(['clone', '--progress', '--', url, dest])
"""Clone a git repository.

When *token* is provided the URL is temporarily rewritten to embed
credentials (oauth2 for GitHub/GitLab, x-token-auth for Bitbucket).
The token is NEVER written to .git/config – after the clone the remote
origin is reset to the clean URL. Any error message that would expose
the token is sanitised before being re-raised.
"""
_validate_git_url(url)
clean_url = strip_auth_from_url(url)

env = os.environ.copy()
env['GIT_TERMINAL_PROMPT'] = '0'


if token and _is_http_url(url):
auth_url = _inject_token_into_url(clean_url, token)
cmd = ['git', 'clone', '--progress', '--', auth_url, dest]
else:
auth_url = None
cmd = ['git', 'clone', '--progress', '--', url, dest]

result = subprocess.run(cmd, capture_output=True, text=True, env=env)

if result.returncode != 0:
error_msg = result.stderr.strip() or result.stdout.strip() or 'Unknown error'
if token:
error_msg = _sanitize_token(error_msg, token)
raise Exception(f"Git clone failed: {error_msg}")


# Reset remote origin to the clean URL so the token is never on disk
if auth_url is not None:
try:
cloned = Repo(dest)
cloned.remotes.origin.set_url(clean_url)
except Exception:
pass # non-fatal – repo was cloned successfully

return Repo(dest)


Expand Down
70 changes: 70 additions & 0 deletions tests/test_git_url_validation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
"""Regression tests for helpers.git._validate_git_url URL allowlist.

Codex review on PR #1601 (commit ce38afa) flagged that an early version of
_SAFE_URL_RE only accepted the literal `git@` SSH username, which would
reject valid Git remotes that use a different SSH user (e.g. self-hosted
or enterprise repositories using `alice@host:org/repo.git`).

Commit 49a0ff9 loosened the regex to allow any safe username class.
These tests guard against a regression of that fix.
"""

import sys
import types
from pathlib import Path

import pytest

PROJECT_ROOT = Path(__file__).resolve().parents[1]
if str(PROJECT_ROOT) not in sys.path:
sys.path.insert(0, str(PROJECT_ROOT))

# Stub giturlparse so importing helpers.git does not require the optional dep.
sys.modules.setdefault(
"giturlparse",
types.SimpleNamespace(
parse=lambda *args, **kwargs: types.SimpleNamespace(
owner="",
repo="",
name="",
valid=False,
)
),
)

from helpers import git as git_helpers # noqa: E402


@pytest.mark.parametrize(
"url",
[
# Non-git SSH usernames - the case Codex flagged.
"alice@host:org/repo.git",
"ssh://alice@host/org/repo.git",
"ssh://alice@host:2222/org/repo.git",
# Original git@ form must still work.
"git@github.com:agent0ai/agent-zero.git",
"ssh://git@github.com/agent0ai/agent-zero.git",
# https form for completeness.
"https://github.com/agent0ai/agent-zero.git",
],
)
def test_validate_git_url_accepts_safe_urls(url):
"""_validate_git_url must accept SSH URLs with arbitrary safe usernames."""
git_helpers._validate_git_url(url)


@pytest.mark.parametrize(
"url",
[
"",
"file:///etc/passwd",
"https://example.com/repo.git; rm -rf /",
"ssh://user@host/path`whoami`",
],
)
def test_validate_git_url_rejects_unsafe_urls(url):
"""_validate_git_url must reject empty, non-http(s)/ssh, or shell-meta URLs."""
with pytest.raises(ValueError):
git_helpers._validate_git_url(url)