diff --git a/ctfcli/core/challenge.py b/ctfcli/core/challenge.py index dc4c58f..195f43e 100644 --- a/ctfcli/core/challenge.py +++ b/ctfcli/core/challenge.py @@ -56,6 +56,7 @@ class Challenge(dict): "host", "connection_info", "healthcheck", + "solution", "attempts", "logic", "flags", @@ -442,32 +443,49 @@ def _create_hints(self): r = self.api.post("/api/v1/hints", json=hint_payload) r.raise_for_status() + def _parse_solution_definition(self) -> tuple[str, str] | None: + solution = self.get("solution", None) + if not solution: + return None - def _resolve_writeup(self) -> Optional[Path]: - writeup_path_candidates = [ - self.challenge_directory / "writeup" / "WRITEUP.md", - self.challenge_directory / "writeup" / "writeup.md", - self.challenge_directory / "WRITEUP.md", - self.challenge_directory / "writeup.md", - self.challenge_directory / "writeup" / "README.md", - self.challenge_directory / "writeup" / "readme.md", - ] + if type(solution) == str: + return solution, "hidden" - writeup_path = None - for candidate in writeup_path_candidates: - if candidate.exists(): - writeup_path = candidate - break + if type(solution) != dict: + click.secho( + "The solution field must be a string path or an object with path and visibility/state", + fg="red", + ) + return None + + solution_path = solution.get("path") + if type(solution_path) != str or not solution_path: + click.secho("The solution object must define a non-empty string path field", fg="red") + return None + + solution_state = solution.get("visibility", solution.get("state", "hidden")) + if type(solution_state) != str or solution_state not in ["hidden", "visible", "solved"]: + click.secho("The solution visibility/state must be one of: hidden, visible, solved", fg="red") + return None + + return solution_path, solution_state - if writeup_path is None: + def _resolve_solution_path(self) -> tuple[Path, str] | None: + parsed_solution = self._parse_solution_definition() + if not parsed_solution: + return None + + solution_path_string, solution_state = parsed_solution + solution_path = self.challenge_directory / solution_path_string + if not solution_path.is_file(): click.secho( - f"Could not find a writeup file for challenge {self}!", + f"Solution file '{solution_path_string}' specified, but not found at {solution_path}", fg="red", ) - return + return None + + return solution_path, solution_state - return writeup_path - def _delete_existing_solution(self): remote_solutions = self.api.get("/api/v1/solutions").json()["data"] for solution in remote_solutions: @@ -475,44 +493,50 @@ def _delete_existing_solution(self): r = self.api.delete(f"/api/v1/solutions/{solution['id']}") r.raise_for_status() + def _get_existing_solution_id(self) -> int | None: + r = self.api.get("/api/v1/solutions") + r.raise_for_status() + remote_solutions = r.json().get("data") or [] + for solution in remote_solutions: + if solution["challenge_id"] == self.challenge_id: + return solution["id"] + return None + def _create_solution(self): - writeup_path = self._resolve_writeup() + resolved_solution = self._resolve_solution_path() + if not resolved_solution: + return + solution_path, solution_state = resolved_solution - if not writeup_path: - click.secho( - f"Failed to create solution for {self}!", - fg="red", + solution_id = self._get_existing_solution_id() + if solution_id is None: + solution_payload_create = {"challenge_id": self.challenge_id, "state": solution_state, "content": ""} + + r = self.api.post("/api/v1/solutions", json=solution_payload_create) + r.raise_for_status() + solution_id = r.json()["data"]["id"] + else: + # Keep solution state in sync and clear stale content before rebuilding references. + r = self.api.patch( + f"/api/v1/solutions/{solution_id}", + json={"state": solution_state, "content": ""}, ) - return - - solution_payload_create = { - "challenge_id": self.challenge_id, - "state": "hidden", - "content": "" - } + r.raise_for_status() - r = self.api.post("/api/v1/solutions", json=solution_payload_create) - r.raise_for_status() - solution_id = r.json()["data"]["id"] - - with writeup_path.open("r") as writeup_file: - content = writeup_file.read() - - # Find all images in the content (both markdown and HTML formats) + with solution_path.open("r") as solution_file: + content = solution_file.read() + + # Find all images in the content (markdown format; ignore html format) # Markdown format: ![alt text](image_url) # Returns tuples: (full_match, alt_text, image_path) - markdown_images = re.findall(r'(!\[([^\]]*)\]\(([^\)]+)\))', content) - # HTML format: - # Returns tuples: (full_match, image_path) - html_images = re.findall(r'(]+src=["\']([^"\']+)["\'][^>]*>)', content) - + markdown_images = re.findall(r"(!\[([^\]]*)\]\(([^\)]+)\))", content) + # Find all snippet includes (MkDocs style: --8<-- "filename") # Returns tuples: (full_match, filename) snippet_includes = re.findall(r'(--8<--\s+["\']([^"\']+)["\'])', content) - for mdx, alt, path in markdown_images: - new_file = ("file", open(writeup_path.parent / path, mode="rb")) + new_file = ("file", open(solution_path.parent / path, mode="rb")) file_payload = { "type": "solution", "solution_id": solution_id, @@ -527,7 +551,7 @@ def _create_solution(self): # Process snippet includes (--8<-- "filename") for full_match, filename in snippet_includes: - snippet_file_path = writeup_path.parent / filename + snippet_file_path = solution_path.parent / filename if snippet_file_path.exists(): with snippet_file_path.open("r") as snippet_file: snippet_content = snippet_file.read() @@ -536,15 +560,8 @@ def _create_solution(self): else: log.warning(f"Snippet file not found: {filename}") - # # Log found images for debugging - # if markdown_images: - # print(f"Found {len(markdown_images)} markdown images in writeup") - # if html_images: - # log.debug(f"Found {len(html_images)} HTML images in writeup") - solution_payload_patch = { - "content": content - } + solution_payload_patch = {"content": content} r = self.api.patch(f"/api/v1/solutions/{solution_id}", json=solution_payload_patch) r.raise_for_status() @@ -990,6 +1007,10 @@ def create(self, ignore: tuple[str] = ()) -> None: if "next" not in ignore: self._set_next(_next) + # Add solution + if "solution" not in ignore: + self._create_solution() + # Bring back the challenge if it's supposed to be visible # Either explicitly, or by assuming the default value (possibly because the state is ignored) if challenge.get("state", "visible") == "visible" or "state" in ignore: @@ -1060,6 +1081,35 @@ def lint(self, skip_hadolint=False, flag_format="flag{") -> bool: f"Challenge file '{challenge_file}' specified, but not found at {challenge_file_path}" ) + # Check that the optional solution file exists + solution = self.get("solution", None) + if solution: + solution_file = None + solution_state = "hidden" + + if type(solution) == str: + solution_file = solution + elif type(solution) == dict: + solution_file = solution.get("path") + solution_state = solution.get("visibility", solution.get("state", "hidden")) + + if type(solution_state) != str or solution_state not in ["hidden", "visible", "solved"]: + issues["fields"].append("The solution visibility/state must be one of: hidden, visible, solved") + + else: + issues["fields"].append( + "The solution field must be a string path or an object with path and visibility/state" + ) + + if type(solution_file) != str or not solution_file: + issues["fields"].append("The solution object must define a non-empty string path field") + else: + solution_file_path = self.challenge_directory / solution_file + if solution_file_path.is_file() is False: + issues["files"].append( + f"Solution file '{solution_file}' specified, but not found at {solution_file_path}" + ) + # Check that files don't have a flag in them for challenge_file in files: challenge_file_path = self.challenge_directory / challenge_file diff --git a/ctfcli/spec/challenge-example.yml b/ctfcli/spec/challenge-example.yml index ecee287..a8d1aef 100644 --- a/ctfcli/spec/challenge-example.yml +++ b/ctfcli/spec/challenge-example.yml @@ -48,6 +48,15 @@ connection_info: nc hostname 12345 # ./writeup/exploit.sh --connection-info "nc hostname 12345" healthcheck: writeup/exploit.sh +# solution is used to provide a path to the challenge solution document. +# The file path is relative to this challenge.yml file. +# If provided as a string path, ctfcli uploads it as a hidden CTFd solution during sync. +# You can also use an object: +# solution: +# path: writeup/WRITEUP.md +# visibility: solved # hidden | visible | solved +solution: writeup/WRITEUP.md + # Can be removed if unused attempts: 5 diff --git a/tests/core/test_challenge.py b/tests/core/test_challenge.py index fe1e2a4..53c5797 100644 --- a/tests/core/test_challenge.py +++ b/tests/core/test_challenge.py @@ -134,6 +134,217 @@ def test_load_installed_challenges(self, mock_api: MagicMock): mock_get.assert_called_once_with("/api/v1/challenges?view=admin") +class TestChallengeSolutions(unittest.TestCase): + minimal_challenge = BASE_DIR / "fixtures" / "challenges" / "test-challenge-minimal" / "challenge.yml" + solution_challenge = BASE_DIR / "fixtures" / "challenges" / "test-challenge-solution" / "challenge.yml" + + def test_resolves_solution_from_specified_path(self): + challenge = Challenge(self.minimal_challenge, {"solution": "challenge.yml"}) + solution_path, solution_state = challenge._resolve_solution_path() + self.assertEqual(solution_path, challenge.challenge_directory / "challenge.yml") + self.assertEqual(solution_state, "hidden") + + def test_resolves_solution_object_from_specified_path(self): + challenge = Challenge( + self.minimal_challenge, + { + "solution": { + "path": "challenge.yml", + "visibility": "solved", + } + }, + ) + solution_path, solution_state = challenge._resolve_solution_path() + self.assertEqual(solution_path, challenge.challenge_directory / "challenge.yml") + self.assertEqual(solution_state, "solved") + + def test_resolves_solution_object_state_alias_from_specified_path(self): + challenge = Challenge( + self.minimal_challenge, + { + "solution": { + "path": "challenge.yml", + "state": "visible", + } + }, + ) + solution_path, solution_state = challenge._resolve_solution_path() + self.assertEqual(solution_path, challenge.challenge_directory / "challenge.yml") + self.assertEqual(solution_state, "visible") + + def test_does_not_resolve_solution_if_not_specified(self): + challenge = Challenge(self.minimal_challenge) + self.assertIsNone(challenge._resolve_solution_path()) + + @mock.patch("ctfcli.core.challenge.click.secho") + def test_does_not_resolve_solution_if_missing(self, mock_secho: MagicMock): + challenge = Challenge(self.minimal_challenge, {"solution": "writeup/WRITEUP.md"}) + self.assertIsNone(challenge._resolve_solution_path()) + mock_secho.assert_called_once_with( + f"Solution file 'writeup/WRITEUP.md' specified, but not found at " + f"{challenge.challenge_directory / 'writeup/WRITEUP.md'}", + fg="red", + ) + + @mock.patch("ctfcli.core.challenge.API") + def test_creates_solution_from_specified_path(self, mock_api_constructor: MagicMock): + challenge = Challenge(self.minimal_challenge, {"solution": "challenge.yml"}) + challenge.challenge_id = 1 + + def mock_get(*args, **kwargs): + path = args[0] + if path == "/api/v1/solutions": + mock_response = MagicMock() + mock_response.json.return_value = {"success": True, "data": []} + return mock_response + return MagicMock() + + def mock_post(*args, **kwargs): + path = args[0] + + if path == "/api/v1/solutions": + mock_response = MagicMock() + mock_response.json.return_value = {"success": True, "data": {"id": 5}} + return mock_response + + return MagicMock() + + mock_api: MagicMock = mock_api_constructor.return_value + mock_api.get.side_effect = mock_get + mock_api.post.side_effect = mock_post + + challenge._create_solution() + + mock_api.post.assert_has_calls( + [call("/api/v1/solutions", json={"challenge_id": 1, "state": "hidden", "content": ""})] + ) + mock_api.patch.assert_has_calls([call("/api/v1/solutions/5", json={"content": ANY})]) + + @mock.patch("ctfcli.core.challenge.API") + def test_creates_solution_from_object_with_visibility(self, mock_api_constructor: MagicMock): + challenge = Challenge(self.minimal_challenge, {"solution": {"path": "challenge.yml", "visibility": "visible"}}) + challenge.challenge_id = 1 + + def mock_get(*args, **kwargs): + path = args[0] + if path == "/api/v1/solutions": + mock_response = MagicMock() + mock_response.json.return_value = {"success": True, "data": []} + return mock_response + return MagicMock() + + def mock_post(*args, **kwargs): + path = args[0] + + if path == "/api/v1/solutions": + mock_response = MagicMock() + mock_response.json.return_value = {"success": True, "data": {"id": 5}} + return mock_response + + return MagicMock() + + mock_api: MagicMock = mock_api_constructor.return_value + mock_api.get.side_effect = mock_get + mock_api.post.side_effect = mock_post + + challenge._create_solution() + + mock_api.post.assert_has_calls( + [call("/api/v1/solutions", json={"challenge_id": 1, "state": "visible", "content": ""})] + ) + + @mock.patch("ctfcli.core.challenge.API") + def test_updates_existing_solution_instead_of_creating_duplicate(self, mock_api_constructor: MagicMock): + challenge = Challenge(self.minimal_challenge, {"solution": {"path": "challenge.yml", "visibility": "solved"}}) + challenge.challenge_id = 1 + + def mock_get(*args, **kwargs): + path = args[0] + if path == "/api/v1/solutions": + mock_response = MagicMock() + mock_response.json.return_value = { + "success": True, + "data": [{"id": 9, "challenge_id": 1, "state": "hidden", "content": "old"}], + } + return mock_response + return MagicMock() + + mock_api: MagicMock = mock_api_constructor.return_value + mock_api.get.side_effect = mock_get + + challenge._create_solution() + + mock_api.post.assert_not_called() + mock_api.patch.assert_has_calls( + [ + call("/api/v1/solutions/9", json={"state": "solved", "content": ""}), + call("/api/v1/solutions/9", json={"content": ANY}), + ], + any_order=True, + ) + + @mock.patch("ctfcli.core.challenge.API") + def test_does_not_create_solution_if_not_specified(self, mock_api_constructor: MagicMock): + challenge = Challenge(self.minimal_challenge) + challenge.challenge_id = 1 + + mock_api: MagicMock = mock_api_constructor.return_value + challenge._create_solution() + + mock_api.post.assert_not_called() + mock_api.patch.assert_not_called() + + @mock.patch("ctfcli.core.challenge.API") + def test_solution_uploads_markdown_images_and_inlines_snippets(self, mock_api_constructor: MagicMock): + challenge = Challenge(self.solution_challenge) + challenge.challenge_id = 1 + + def mock_get(*args, **kwargs): + path = args[0] + if path == "/api/v1/solutions": + mock_response = MagicMock() + mock_response.json.return_value = {"success": True, "data": []} + return mock_response + return MagicMock() + + def mock_post(*args, **kwargs): + path = args[0] + + if path == "/api/v1/solutions": + mock_response = MagicMock() + mock_response.json.return_value = {"success": True, "data": {"id": 5}} + return mock_response + + if path == "/api/v1/files": + mock_response = MagicMock() + mock_response.json.return_value = { + "success": True, + "data": [{"location": "solution-uploaded/test.png"}], + } + return mock_response + + return MagicMock() + + mock_api: MagicMock = mock_api_constructor.return_value + mock_api.get.side_effect = mock_get + mock_api.post.side_effect = mock_post + + challenge._create_solution() + + mock_api.post.assert_has_calls( + [ + call("/api/v1/solutions", json={"challenge_id": 1, "state": "hidden", "content": ""}), + call("/api/v1/files", files=ANY, data={"type": "solution", "solution_id": 5}), + ] + ) + mock_api.patch.assert_called_once() + patched_content = mock_api.patch.call_args.kwargs["json"]["content"] + + self.assertIn("![diagram](/files/solution-uploaded/test.png)", patched_content) + self.assertIn('print("snippet from python")', patched_content) + self.assertIn('', patched_content) + + class TestSyncChallenge(unittest.TestCase): installed_challenges = [ { @@ -1055,6 +1266,7 @@ def test_does_not_update_ignored_attributes(self): "files", "hints", "requirements", + "solution", # fmt: on ] @@ -1133,6 +1345,9 @@ def test_does_not_update_ignored_attributes(self): if p in ["flags", "topics", "tags", "files", "hints", "requirements"]: challenge[p] = ["new-value"] + if p == "solution": + challenge[p] = "challenge.yml" + challenge.sync(ignore=[p]) mock_api: MagicMock = mock_api_constructor.return_value @@ -1267,6 +1482,45 @@ def mock_post(*args, **kwargs): mock_api.delete.assert_not_called() + @mock.patch("ctfcli.core.challenge.Challenge.load_installed_challenges", return_value=installed_challenges) + @mock.patch("ctfcli.core.challenge.API") + def test_creates_solution_on_create(self, mock_api_constructor: MagicMock, *args, **kwargs): + challenge = Challenge(self.minimal_challenge, {"solution": "challenge.yml"}) + + def mock_post(*args, **kwargs): + path = args[0] + + if path == "/api/v1/challenges": + mock_response = MagicMock() + mock_response.json.return_value = {"success": True, "data": {"id": 3}} + return mock_response + + if path == "/api/v1/solutions": + mock_response = MagicMock() + mock_response.json.return_value = {"success": True, "data": {"id": 5}} + return mock_response + + return MagicMock() + + mock_api: MagicMock = mock_api_constructor.return_value + mock_api.post.side_effect = mock_post + + challenge.create() + + mock_api.post.assert_has_calls( + [ + call("/api/v1/challenges", json=ANY), + call("/api/v1/solutions", json={"challenge_id": 3, "state": "hidden", "content": ""}), + ], + any_order=True, + ) + mock_api.patch.assert_has_calls( + [ + call("/api/v1/solutions/5", json={"content": ANY}), + ], + any_order=True, + ) + @mock.patch("ctfcli.core.challenge.Challenge.load_installed_challenges", return_value=installed_challenges) @mock.patch("ctfcli.core.challenge.API") def test_exits_if_files_do_not_exist(self, mock_api_constructor: MagicMock, *args, **kwargs): @@ -1286,7 +1540,7 @@ def test_does_not_set_ignored_attributes(self): # fmt:off properties = [ "value", "category", "description", "attribution", "attempts", "connection_info", "state", # simple types - "extra", "flags", "topics", "tags", "files", "hints", "requirements" # complex types + "extra", "flags", "topics", "tags", "files", "hints", "requirements", "solution" # complex types ] # fmt:on @@ -1347,6 +1601,9 @@ def test_does_not_set_ignored_attributes(self): if p in ["flags", "topics", "tags", "files", "hints", "requirements"]: challenge[p] = ["new-value"] + if p == "solution": + challenge[p] = "challenge.yml" + def mock_post(*args, **kwargs): path = args[0] @@ -1501,6 +1758,37 @@ def test_looks_for_flags_in_dist_files(self, *args, **kwargs): self.assertDictEqual(expected_lint_issues, e.exception.issues) + def test_validates_solution_file_exists(self): + challenge = Challenge(self.minimal_challenge, {"solution": "writeup/WRITEUP.md"}) + + with self.assertRaises(LintException) as e: + challenge.lint(skip_hadolint=True) + + expected_solution_path = (challenge.challenge_directory / "writeup" / "WRITEUP.md").absolute() + expected_lint_issues = { + "fields": [], + "dockerfile": [], + "hadolint": [], + "files": [f"Solution file 'writeup/WRITEUP.md' specified, but not found at {expected_solution_path}"], + } + + self.assertDictEqual(expected_lint_issues, e.exception.issues) + + def test_validates_solution_visibility(self): + challenge = Challenge(self.minimal_challenge, {"solution": {"path": "challenge.yml", "visibility": "public"}}) + + with self.assertRaises(LintException) as e: + challenge.lint(skip_hadolint=True) + + expected_lint_issues = { + "fields": ["The solution visibility/state must be one of: hidden, visible, solved"], + "dockerfile": [], + "hadolint": [], + "files": [], + } + + self.assertDictEqual(expected_lint_issues, e.exception.issues) + class TestVerifyMirrorChallenge(unittest.TestCase): installed_challenges = [ diff --git a/tests/fixtures/challenges/test-challenge-solution/challenge.yml b/tests/fixtures/challenges/test-challenge-solution/challenge.yml new file mode 100644 index 0000000..2f0ca83 --- /dev/null +++ b/tests/fixtures/challenges/test-challenge-solution/challenge.yml @@ -0,0 +1,9 @@ +name: Test Challenge Solution +category: Test +description: Test challenge with solution fixture +attribution: Test Attribution +value: 100 +author: Test +type: standard +state: hidden +solution: writeup/WRITEUP.md diff --git a/tests/fixtures/challenges/test-challenge-solution/writeup/WRITEUP.md b/tests/fixtures/challenges/test-challenge-solution/writeup/WRITEUP.md new file mode 100644 index 0000000..740009d --- /dev/null +++ b/tests/fixtures/challenges/test-challenge-solution/writeup/WRITEUP.md @@ -0,0 +1,10 @@ +# Solution + +This markdown image should be uploaded and rewritten: +![diagram](images/test.png) + +This HTML image is currently not rewritten by ctfcli core: + + +Inline snippet: +--8<-- "src/example.py" diff --git a/tests/fixtures/challenges/test-challenge-solution/writeup/images/test-html.png b/tests/fixtures/challenges/test-challenge-solution/writeup/images/test-html.png new file mode 100644 index 0000000..0040c1e --- /dev/null +++ b/tests/fixtures/challenges/test-challenge-solution/writeup/images/test-html.png @@ -0,0 +1 @@ +fake-html-png-bytes diff --git a/tests/fixtures/challenges/test-challenge-solution/writeup/images/test.png b/tests/fixtures/challenges/test-challenge-solution/writeup/images/test.png new file mode 100644 index 0000000..87c3e19 --- /dev/null +++ b/tests/fixtures/challenges/test-challenge-solution/writeup/images/test.png @@ -0,0 +1 @@ +fake-png-bytes diff --git a/tests/fixtures/challenges/test-challenge-solution/writeup/src/example.py b/tests/fixtures/challenges/test-challenge-solution/writeup/src/example.py new file mode 100644 index 0000000..b274895 --- /dev/null +++ b/tests/fixtures/challenges/test-challenge-solution/writeup/src/example.py @@ -0,0 +1 @@ +print("snippet from python")