diff --git a/scripts/deploy.py b/scripts/deploy.py index 662139e7f3b..dffd1854605 100644 --- a/scripts/deploy.py +++ b/scripts/deploy.py @@ -6,16 +6,12 @@ from typing import Iterable -import requests - from deploy_util import ( get_git_deploy_reason, - deploy_file_to_wiki, - read_cookie_jar, read_file_from_path, write_to_github_summary_file, ) -from login_and_get_token import get_token +from mediawiki_session import MediaWikiSession HEADER_PATTERN = re.compile( r"\A---\n" r"-- @Liquipedia\n" r"-- page=(?P[^\n]*)\n" @@ -26,9 +22,7 @@ def deploy_all_files_for_wiki( wiki: str, file_paths: Iterable[pathlib.Path], deploy_reason: str ) -> bool: all_modules_deployed = True - token = get_token(wiki) - with requests.Session() as session: - session.cookies = read_cookie_jar(wiki) + with MediaWikiSession(wiki) as session: for file_path in file_paths: print(f"::group::Checking {str(file_path)}") file_content = read_file_from_path(file_path) @@ -40,8 +34,8 @@ def deploy_all_files_for_wiki( page = header_match.groupdict()["pageName"] + ( os.getenv("LUA_DEV_ENV_NAME") or "" ) - module_deployed, _ = deploy_file_to_wiki( - session, file_path, file_content, wiki, page, token, deploy_reason + module_deployed, _ = session.deploy_file( + file_path, file_content, page, deploy_reason ) all_modules_deployed &= module_deployed print("::endgroup::") diff --git a/scripts/deploy_res.py b/scripts/deploy_res.py index 2b30f76a840..56cb17e3fd3 100644 --- a/scripts/deploy_res.py +++ b/scripts/deploy_res.py @@ -5,66 +5,55 @@ from typing import Iterable -import requests - from deploy_util import ( - HEADER, get_git_deploy_reason, - get_wiki_api_url, - deploy_file_to_wiki, - read_cookie_jar, read_file_from_path, ) -from login_and_get_token import get_token +from mediawiki_session import MediaWikiSession def deploy_resources( - res_type: str, file_paths: Iterable[pathlib.Path], deploy_reason: str + session: MediaWikiSession, + res_type: str, + file_paths: Iterable[pathlib.Path], + deploy_reason: str, ) -> tuple[bool, bool]: all_deployed = True changes_made = False - token = get_token("commons") - with requests.Session() as session: - session.cookies = read_cookie_jar("commons") - for file_path in file_paths: - print(f"::group::Checking {str(file_path)}") - file_content = read_file_from_path(file_path) - page = ( - f"MediaWiki:Common.{'js' if res_type == '.js' else 'css'}/" - + file_path.name - ) - print(f"...page = {page}") - deploy_result = deploy_file_to_wiki( - session, file_path, file_content, "commons", page, token, deploy_reason - ) - all_deployed = all_deployed and deploy_result[0] - changes_made = changes_made or deploy_result[1] - print("::endgroup::") + for file_path in file_paths: + print(f"::group::Checking {str(file_path)}") + file_content = read_file_from_path(file_path) + page = ( + f"MediaWiki:Common.{'js' if res_type == '.js' else 'css'}/" + file_path.name + ) + print(f"...page = {page}") + deploy_result = session.deploy_file( + file_path, file_content, page, deploy_reason + ) + all_deployed = all_deployed and deploy_result[0] + changes_made = changes_made or deploy_result[1] + print("::endgroup::") return (all_deployed, changes_made) -def update_cache(): - with requests.Session() as session: - session.cookies = read_cookie_jar("commons") - cache_result = session.post( - get_wiki_api_url("commons"), - headers=HEADER, - params={"format": "json", "action": "updatelpmwmessageapi"}, - data={ - "messagename": "Resourceloaderarticles-cacheversion", - "value": subprocess.check_output(["git", "log", "-1", "--pretty=%h"]) - .decode() - .strip(), - }, - ).json() - if ( - cache_result["updatelpmwmessageapi"].get("message") - == "Successfully changed the message value" - ): - print("Resource cache version updated succesfully!") - else: - print("::error::Resource cache version unable to be updated!") - exit(1) +def update_cache(session: MediaWikiSession): + cache_result = session.make_action( + "updatelpmwmessageapi", + data={ + "messagename": "Resourceloaderarticles-cacheversion", + "value": subprocess.check_output(["git", "log", "-1", "--pretty=%h"]) + .decode() + .strip(), + }, + ) + if ( + cache_result["updatelpmwmessageapi"].get("message") + == "Successfully changed the message value" + ): + print("Resource cache version updated succesfully!") + else: + print("::error::Resource cache version unable to be updated!") + exit(1) def main(): @@ -82,22 +71,23 @@ def main(): resource_files = [pathlib.Path(arg) for arg in sys.argv[1:]] git_deploy_reason = get_git_deploy_reason() - for res_type, files in itertools.groupby( - sorted(resource_files), lambda path: path.suffix - ): - group_all_deployed, group_changes_made = deploy_resources( - res_type, list(files), git_deploy_reason - ) - all_deployed = all_deployed and group_all_deployed - changes_made = changes_made or group_changes_made + with MediaWikiSession("commons") as commons_session: + for res_type, files in itertools.groupby( + sorted(resource_files), lambda path: path.suffix + ): + group_all_deployed, group_changes_made = deploy_resources( + commons_session, res_type, list(files), git_deploy_reason + ) + all_deployed = all_deployed and group_all_deployed + changes_made = changes_made or group_changes_made - if not all_deployed: - print( - "::error::Some files were not deployed; resource cache version not updated!" - ) - exit(1) - elif changes_made: - update_cache() + if not all_deployed: + print( + "::error::Some files were not deployed; resource cache version not updated!" + ) + exit(1) + elif changes_made: + update_cache(commons_session) if __name__ == "__main__": diff --git a/scripts/deploy_util.py b/scripts/deploy_util.py index 32552af2238..b2e617ec46a 100644 --- a/scripts/deploy_util.py +++ b/scripts/deploy_util.py @@ -1,5 +1,4 @@ import functools -import http.cookiejar import os import pathlib import subprocess @@ -8,22 +7,17 @@ import requests __all__ = [ - "DEPLOY_TRIGGER", "HEADER", "SLEEP_DURATION", - "deploy_file_to_wiki", "get_git_deploy_reason", - "get_wiki_api_url", "get_wikis", - "read_cookie_jar", "read_file_from_path", "write_to_github_summary_file", ] -DEPLOY_TRIGGER = os.getenv("DEPLOY_TRIGGER") GITHUB_STEP_SUMMARY_FILE = os.getenv("GITHUB_STEP_SUMMARY") USER_AGENT = f"GitHub Autodeploy Bot/2.0.0 ({os.getenv('WIKI_UA_EMAIL')})" -WIKI_BASE_URL = os.getenv("WIKI_BASE_URL") + HEADER = { "User-Agent": USER_AGENT, "accept": "application/json", @@ -32,19 +26,15 @@ SLEEP_DURATION = 4 -def get_wikis() -> set[str]: +@functools.cache +def get_wikis() -> frozenset[str]: response = requests.get( "https://liquipedia.net/api.php", headers=HEADER, ) wikis = response.json() time.sleep(SLEEP_DURATION) - return set(wikis["allwikis"].keys()) - - -@functools.cache -def get_wiki_api_url(wiki: str) -> str: - return f"{WIKI_BASE_URL}/{wiki}/api.php" + return frozenset(wikis["allwikis"].keys()) def get_git_deploy_reason(): @@ -55,85 +45,6 @@ def get_git_deploy_reason(): ) -def deploy_file_to_wiki( - session: requests.Session, - file_path: pathlib.Path, - file_content: str, - wiki: str, - target_page: str, - token: str, - deploy_reason: str, -) -> tuple[bool, bool]: - change_made = False - deployed = True - response = session.post( - get_wiki_api_url(wiki), - headers=HEADER, - params={"format": "json", "action": "edit"}, - data={ - "title": target_page, - "text": file_content, - "summary": f"Git: {deploy_reason}", - "bot": "true", - "recreate": "true", - "token": token, - }, - ).json() - edit_info = response.get("edit") - error_info = response.get("error") - - # Handle API errors or unexpected response structure - if error_info is not None or edit_info is None: - print(f"::warning file={str(file_path)}::failed to deploy (API error)") - details = "" - if isinstance(error_info, dict): - code = error_info.get("code") - info = error_info.get("info") - detail_parts = [] - if code: - detail_parts.append(f"code={code}") - if info: - detail_parts.append(f"info={info}") - if detail_parts: - details = " (" + ", ".join(detail_parts) + ")" - write_to_github_summary_file( - f":warning: {str(file_path)} failed to deploy due to API error{details}" - ) - deployed = False - time.sleep(SLEEP_DURATION) - return deployed, change_made - - result = edit_info.get("result") - new_rev_id = edit_info.get("newrevid") - if result == "Success": - if new_rev_id is not None: - change_made = True - if DEPLOY_TRIGGER != "push": - print(f"::warning file={str(file_path)}::File changed") - print(f"...{result}") - print("...done") - write_to_github_summary_file( - f":information_source: {str(file_path)} successfully deployed" - ) - - else: - print(f"::warning file={str(file_path)}::failed to deploy") - write_to_github_summary_file(f":warning: {str(file_path)} failed to deploy") - deployed = False - time.sleep(SLEEP_DURATION) - return deployed, change_made - - -def read_cookie_jar(wiki: str) -> http.cookiejar.FileCookieJar: - ckf = f"cookie_{wiki}.ck" - cookie_jar = http.cookiejar.LWPCookieJar(filename=ckf) - try: - cookie_jar.load(ignore_discard=True) - except OSError: - pass - return cookie_jar - - def read_file_from_path(file_path: pathlib.Path) -> str: with file_path.open("r") as file: return file.read() diff --git a/scripts/login_and_get_token.py b/scripts/login_and_get_token.py deleted file mode 100644 index d91b92afd10..00000000000 --- a/scripts/login_and_get_token.py +++ /dev/null @@ -1,64 +0,0 @@ -import functools -import os -import time - -import requests - -from deploy_util import HEADER, SLEEP_DURATION, get_wiki_api_url, read_cookie_jar - -__all__ = ["get_token"] - -WIKI_USER = os.getenv("WIKI_USER") -WIKI_PASSWORD = os.getenv("WIKI_PASSWORD") - -loggedin: set[str] = set() - - -def login(wiki: str): - if wiki in loggedin: - return - cookie_jar = read_cookie_jar(wiki) - print(f"...logging in on {wiki}") - with requests.Session() as session: - session.cookies = cookie_jar - token_response = session.post( - get_wiki_api_url(wiki), - headers=HEADER, - params={ - "format": "json", - "action": "query", - "meta": "tokens", - "type": "login", - }, - ).json() - session.post( - get_wiki_api_url(wiki), - headers=HEADER, - data={ - "lgname": WIKI_USER, - "lgpassword": WIKI_PASSWORD, - "lgtoken": token_response["query"]["tokens"]["logintoken"], - }, - params={"format": "json", "action": "login"}, - ) - loggedin.add(wiki) - cookie_jar.save(ignore_discard=True) - time.sleep(SLEEP_DURATION) - - -@functools.cache -def get_token(wiki: str) -> str: - login(wiki) - - with requests.Session() as session: - session.cookies = read_cookie_jar(wiki) - token_response = session.post( - get_wiki_api_url(wiki), - headers=HEADER, - params={ - "format": "json", - "action": "query", - "meta": "tokens", - }, - ).json() - return token_response["query"]["tokens"]["csrftoken"] diff --git a/scripts/mediawiki_session.py b/scripts/mediawiki_session.py new file mode 100644 index 00000000000..b7b80ab6e5a --- /dev/null +++ b/scripts/mediawiki_session.py @@ -0,0 +1,167 @@ +import contextlib +import functools +import http.cookiejar +import os +import pathlib +import time + +import requests + +from typing import Any, Optional + +from deploy_util import ( + HEADER, + SLEEP_DURATION, + write_to_github_summary_file, +) + +__all__ = [ + "MediaWikiSession", + "MediaWikiSessionError", +] + +ACTIONS_STEP_DEBUG = os.getenv("ACTIONS_STEP_DEBUG") == "true" +DEPLOY_TRIGGER = os.getenv("DEPLOY_TRIGGER") +WIKI_BASE_URL = os.getenv("WIKI_BASE_URL") +WIKI_USER = os.getenv("WIKI_USER") +WIKI_PASSWORD = os.getenv("WIKI_PASSWORD") + + +class MediaWikiSessionError(IOError): + pass + + +class MediaWikiSession(contextlib.AbstractContextManager): + __cookie_jar: http.cookiejar.FileCookieJar + __session: requests.Session + __wiki: str + + def __init__(self, wiki: str): + self.__wiki = wiki + self.__cookie_jar = self.__read_cookie_jar() + self.__session = requests.session() + self.__session.cookies = self.__cookie_jar + self.__session.headers.update(HEADER) + + def __read_cookie_jar(self) -> http.cookiejar.FileCookieJar: + ckf = f"cookie_{self.wiki}.ck" + cookie_jar = http.cookiejar.LWPCookieJar(filename=ckf) + with contextlib.suppress(OSError): + cookie_jar.load(ignore_discard=True) + return cookie_jar + + @functools.cache + def __get_wiki_api_url(self): + return f"{WIKI_BASE_URL}/{self.wiki}/api.php" + + def _login(self): + token_response = self.make_action( + "query", params={"meta": "tokens", "type": "login"} + ) + self.make_action( + "login", + data={ + "lgname": WIKI_USER, + "lgpassword": WIKI_PASSWORD, + "lgtoken": token_response["tokens"]["logintoken"], + }, + ) + self.__cookie_jar.save(ignore_discard=True) + self.cooldown() + + @functools.cached_property + def token(self) -> str: + self._login() + token = self.make_action("query", params={"meta": "tokens"})["tokens"][ + "csrftoken" + ] + if ACTIONS_STEP_DEBUG: + print(f"::add-mask::{token}") + return token + + @property + def wiki(self) -> str: + return self.__wiki + + def make_action( + self, action: str, params: Optional[dict] = None, data: Optional[dict] = None + ) -> dict[str, Any]: + merged_params = {"format": "json", "action": action} + if params is not None: + merged_params |= params + response = self.__session.post( + self.__get_wiki_api_url(), params=merged_params, data=data + ) + if ACTIONS_STEP_DEBUG: + print(f"params: {merged_params}") + print(f"data: {data}") + print(f"HTTP Status: {response.status_code}") + print(f'Raw response: "{response}"') + parsed_response: dict[str, Any] = response.json() + if "error" in parsed_response.keys(): + raise MediaWikiSessionError(parsed_response["error"]["info"]) + return parsed_response[action] + + def cooldown(self): + time.sleep(SLEEP_DURATION) + + def deploy_file( + self, + file_path: pathlib.Path, + file_content: str, + target_page: str, + deploy_reason: str, + ) -> tuple[bool, bool]: + try: + change_made = False + deployed = True + response = self.make_action( + "edit", + data={ + "title": target_page, + "text": file_content, + "summary": f"Git: {deploy_reason}", + "bot": "true", + "recreate": "true", + "token": self.token, + }, + ) + result = response.get("result") + new_rev_id = response.get("newrevid") + if result == "Success": + if new_rev_id is not None: + change_made = True + if DEPLOY_TRIGGER != "push": + print(f"::warning file={str(file_path)}::File changed") + print(f"...{result}") + print("...done") + write_to_github_summary_file( + f":information_source: {str(file_path)} successfully deployed" + ) + + else: + print(f"::warning file={str(file_path)}::failed to deploy") + write_to_github_summary_file( + f":warning: {str(file_path)} failed to deploy" + ) + deployed = False + return deployed, change_made + except MediaWikiSessionError as e: + print(f"::warning file={str(file_path)}::failed to deploy (API error)") + write_to_github_summary_file( + f":warning: {str(file_path)} failed to deploy due to API error: {str(e)}" + ) + deployed = False + return deployed, change_made + finally: + self.cooldown() + + def close(self): + self.__cookie_jar.save(ignore_discard=True) + self.__session.close() + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.close() diff --git a/scripts/protect.py b/scripts/protect.py index 562544b6c75..fa2455abf21 100644 --- a/scripts/protect.py +++ b/scripts/protect.py @@ -1,58 +1,90 @@ +import itertools import os import pathlib import sys -from typing import Iterable - from deploy_util import get_wikis +from mediawiki_session import MediaWikiSession from protect_page import ( - protect_non_existing_page, - protect_existing_page, + protect_non_existing_pages, + protect_existing_pages, handle_protect_errors, ) WIKI_TO_PROTECT = os.getenv("WIKI_TO_PROTECT") -def check_for_local_version(module: str, wiki: str): - if wiki == "commons": - return False - return pathlib.Path(f"./lua/wikis/{wiki}/{module}.lua").exists() +def protect_new_wiki(wiki_to_protect: str): + lua_files = itertools.chain( + pathlib.Path("./lua/wikis/commons/").rglob("*.lua"), + pathlib.Path("./lua/wikis/" + wiki_to_protect + "/").rglob("*.lua"), + ) + commons_modules: set[str] = set() + local_modules: set[str] = set() -def protect_if_has_no_local_version(module: str, wiki: str): - page = "Module:" + module - if not check_for_local_version(module, wiki): - protect_non_existing_page(page, wiki) + for file_to_protect in sorted(lua_files): + wiki = file_to_protect.parts[2] + module = "/".join(file_to_protect.parts[3:])[:-4] + page = "Module:" + module + + if wiki == wiki_to_protect: + local_modules.add(page) + elif wiki == "commons": + commons_modules.add(page) + + with MediaWikiSession(wiki_to_protect) as session: + print(f"::group::Protecting {WIKI_TO_PROTECT}") + protect_non_existing_pages(session, commons_modules - local_modules) + protect_existing_pages(session, local_modules) + print("::endgroup::") + + handle_protect_errors() def main(): - lua_files: Iterable[pathlib.Path] - if len(sys.argv[1:]) > 0: - lua_files = [pathlib.Path(arg) for arg in sys.argv[1:]] - elif WIKI_TO_PROTECT: - lua_files = pathlib.Path("./lua/wikis/").rglob("*.lua") - else: + if WIKI_TO_PROTECT: + protect_new_wiki(WIKI_TO_PROTECT) + return + elif len(sys.argv[1:]) == 0: print("Nothing to protect") exit(0) - for file_to_protect in sorted(lua_files): - print(f"::group::Checking {str(file_to_protect)}") - wiki = file_to_protect.parts[2] - module = "/".join(file_to_protect.parts[3:])[:-4] - page = "Module:" + module - if WIKI_TO_PROTECT: - if wiki == WIKI_TO_PROTECT: - protect_existing_page(page, wiki) - elif wiki == "commons": - protect_if_has_no_local_version(module, WIKI_TO_PROTECT) - elif wiki != "commons": - protect_existing_page(page, wiki) - else: # commons case - protect_existing_page(page, wiki) - for deploy_wiki in get_wikis() - {"commons"}: - protect_if_has_no_local_version(module, deploy_wiki) - print("::endgroup::") + lua_files = [pathlib.Path(arg) for arg in sys.argv[1:]] + + files_to_protect_by_wiki: dict[str, set[str]] = dict() + + for wiki, files_to_protect in itertools.groupby( + sorted(lua_files), lambda path: path.parts[2] + ): + files_to_protect_by_wiki[wiki] = set( + [ + "Module:" + "/".join(file_to_protect.parts[3:])[:-4] + for file_to_protect in files_to_protect + ] + ) + + new_commons_modules = files_to_protect_by_wiki.get("commons") + + if new_commons_modules: + for wiki in sorted(get_wikis()): + with MediaWikiSession(wiki) as session: + if wiki == "commons": + protect_existing_pages(session, new_commons_modules) + else: + new_local_modules = files_to_protect_by_wiki.get(wiki) + if new_local_modules: + protect_existing_pages(session, new_local_modules) + protect_non_existing_pages( + session, new_commons_modules - new_local_modules + ) + else: + protect_non_existing_pages(session, new_commons_modules) + else: + for wiki, new_modules in files_to_protect_by_wiki: + with MediaWikiSession(wiki) as session: + protect_existing_pages(session, new_modules) + handle_protect_errors() diff --git a/scripts/protect_page.py b/scripts/protect_page.py index 3d34c9a21ae..a7ff00b9584 100644 --- a/scripts/protect_page.py +++ b/scripts/protect_page.py @@ -1,28 +1,23 @@ -import time +from typing import Iterable, Literal -from typing import Literal -import requests - -from deploy_util import ( - HEADER, - SLEEP_DURATION, - get_wiki_api_url, - read_cookie_jar, - write_to_github_summary_file, -) -from login_and_get_token import get_token +from deploy_util import write_to_github_summary_file +from mediawiki_session import MediaWikiSession, MediaWikiSessionError __all__ = [ - "protect_non_existing_page", - "protect_existing_page", + "protect_non_existing_pages", + "protect_existing_pages", "handle_protect_errors", ] protect_errors = list() -def protect_page(page: str, wiki: str, protect_mode: Literal["edit", "create"]): +def protect_pages( + session: MediaWikiSession, + pages: Iterable[str], + protect_mode: Literal["edit", "create"], +): protect_options: str if protect_mode == "edit": protect_options = "edit=allow-only-sysop|move=allow-only-sysop" @@ -30,65 +25,60 @@ def protect_page(page: str, wiki: str, protect_mode: Literal["edit", "create"]): protect_options = "create=allow-only-sysop" else: raise ValueError(f"invalid protect mode: {protect_mode}") - print(f"...wiki = {wiki}") - print(f"...page = {page}") - token = get_token(wiki) - with requests.Session() as session: - session.cookies = read_cookie_jar(wiki) - response = session.post( - get_wiki_api_url(wiki), - headers=HEADER, - params={"format": "json", "action": "protect"}, - data={ - "title": page, - "protections": protect_options, - "reason": "Git maintained", - "expiry": "infinite", - "bot": "true", - "token": token, - }, - ).json() - - time.sleep(SLEEP_DURATION) - if response.get("error"): + print(f"::group::Protecting {session.wiki}") + for page in sorted(pages): + print(f"...page = {page}") + try: + protections = session.make_action( + "protect", + data={ + "title": page, + "protections": protect_options, + "reason": "Git maintained", + "expiry": "infinite", + "bot": "true", + "token": session.token, + }, + )["protections"] + + for protection in protections: + if protection[protect_mode] == "allow-only-sysop": + break + else: + print( + f"::warning::could not ({protect_mode}) protect {page} on {session.wiki}" + ) + protect_errors.append(f"{protect_mode}:{session.wiki}:{page}") + except MediaWikiSessionError as e: print( - f"::warning::could not ({protect_mode}) protect {page} on {wiki}: {response['error']['info']}" + f"::warning::could not ({protect_mode}) protect {page} on {session.wiki}: {str(e)}" ) - protect_errors.append(f"{protect_mode}:{wiki}:{page}") - return - protections = response["protect"].get("protections") - for protection in protections: - if protection[protect_mode] == "allow-only-sysop": - return - print(f"::warning::could not ({protect_mode}) protect {page} on {wiki}") - protect_errors.append(f"{protect_mode}:{wiki}:{page}") - - -def check_if_page_exists(page: str, wiki: str) -> bool: - with requests.Session() as session: - session.cookies = read_cookie_jar(wiki) - - result = session.post( - get_wiki_api_url(wiki), - headers=HEADER, - params={"format": "json", "action": "query"}, - data={"titles": page, "prop": "info"}, - ).json() + protect_errors.append(f"{protect_mode}:{session.wiki}:{page}") + finally: + session.cooldown() + print("::endgroup::") - time.sleep(SLEEP_DURATION) - return "-1" not in result["query"]["pages"] +def protect_non_existing_pages(session: MediaWikiSession, pages: Iterable[str]): + def filter_non_existing_pages(page: str) -> bool: + try: + result = session.make_action( + "query", + data={"titles": page, "prop": "info"}, + ) + if "-1" in result["pages"]: + return True + print(f"::warning::{page} already exists on {session.wiki}") + protect_errors.append(f"create:{session.wiki}:{page}") + return False + finally: + session.cooldown() -def protect_non_existing_page(page: str, wiki: str): - if check_if_page_exists(page, wiki): - print(f"::warning::{page} already exists on {wiki}") - protect_errors.append(f"create:{wiki}:{page}") - else: - protect_page(page, wiki, "create") + protect_pages(session, filter(filter_non_existing_pages, pages), "create") -def protect_existing_page(page: str, wiki: str): - protect_page(page, wiki, "edit") +def protect_existing_pages(session: MediaWikiSession, pages: Iterable[str]): + protect_pages(session, pages, "edit") def handle_protect_errors(): diff --git a/scripts/protect_templates.py b/scripts/protect_templates.py index 488b93be20b..2bf92cc487f 100644 --- a/scripts/protect_templates.py +++ b/scripts/protect_templates.py @@ -1,8 +1,9 @@ import os +from mediawiki_session import MediaWikiSession from protect_page import ( - protect_non_existing_page, - protect_existing_page, + protect_non_existing_pages, + protect_existing_pages, handle_protect_errors, ) @@ -11,15 +12,19 @@ def main(): with open("./templates/templatesToProtect", "r") as templates_to_protect: - for template_name in templates_to_protect.read().splitlines(): - if len(template_name.strip()) == 0: - continue - template = "Template:" + template_name - print(f"::group::Checking {WIKI_TO_PROTECT}:{template}") + templates = [ + "Template:" + template_name + for template_name in filter( + lambda template: len(template.strip()) > 0, + templates_to_protect.read().splitlines(), + ) + ] + with MediaWikiSession(WIKI_TO_PROTECT) as session: + print(f"::group::Protecting {WIKI_TO_PROTECT}") if WIKI_TO_PROTECT == "commons": - protect_existing_page(template, WIKI_TO_PROTECT) + protect_existing_pages(session, templates) else: - protect_non_existing_page(template, WIKI_TO_PROTECT) + protect_non_existing_pages(session, templates) print("::endgroup::") handle_protect_errors() diff --git a/scripts/remove_dev.py b/scripts/remove_dev.py index c8918c96652..8b0a80ecd95 100644 --- a/scripts/remove_dev.py +++ b/scripts/remove_dev.py @@ -1,89 +1,74 @@ import os -import time - -import requests from deploy_util import ( - HEADER, - SLEEP_DURATION, - get_wiki_api_url, get_wikis, - read_cookie_jar, write_to_github_summary_file, ) -from login_and_get_token import get_token, login +from mediawiki_session import MediaWikiSession, MediaWikiSessionError LUA_DEV_ENV_NAME = os.getenv("LUA_DEV_ENV_NAME") remove_errors: list[str] = list() -def remove_page(session: requests.Session, page: str, wiki: str): - print(f"deleting {wiki}:{page}") - token = get_token(wiki) - - result = session.post( - get_wiki_api_url(wiki), - headers=HEADER, - params={ - "format": "json", - "action": "delete", - }, - data={ - "title": page, - "reason": f"Remove {LUA_DEV_ENV_NAME}", - "token": token, - }, - ).json() - time.sleep(SLEEP_DURATION) - - if "delete" not in result.keys(): - print(f"::warning::could not delete {page} on {wiki}") - write_to_github_summary_file(f":warning: could not delete {page} on {wiki}") - remove_errors.append(f"{wiki}:{page}") +def remove_page(session: MediaWikiSession, page: str): + print(f"deleting {session.wiki}:{page}") - -def search_and_remove(wiki: str): - with requests.Session() as session: - search_result = session.post( - get_wiki_api_url(wiki), - headers=HEADER, - params={"format": "json", "action": "query"}, + try: + session.make_action( + "delete", data={ - "list": "search", - "srsearch": f"intitle:{LUA_DEV_ENV_NAME}", - "srnamespace": 828, - "srlimit": 5000, - "srprop": "", + "title": page, + "reason": f"Remove {LUA_DEV_ENV_NAME}", + "token": session.token, }, - ).json() - time.sleep(SLEEP_DURATION) + ) + except MediaWikiSessionError: + print(f"::warning::could not delete {page} on {session.wiki}") + write_to_github_summary_file( + f":warning: could not delete {page} on {session.wiki}" + ) + remove_errors.append(f"{session.wiki}:{page}") + finally: + session.cooldown() - # Handle API error responses and missing or empty search results safely. - if "error" in search_result: - error_info = search_result.get("error") - print(f"::warning::search API error on {wiki}: {error_info}") + +def search_and_remove(wiki: str): + with MediaWikiSession(wiki) as session: + search_result: dict + try: + search_result = session.make_action( + "query", + data={ + "list": "search", + "srsearch": f"intitle:{LUA_DEV_ENV_NAME}", + "srnamespace": 828, + "srlimit": 5000, + "srprop": "", + }, + ) + except MediaWikiSessionError as e: + print(f"::warning::search API error on {wiki}: {str(e)}") write_to_github_summary_file( - f":warning: search API error on {wiki}: {error_info}" + f":warning: search API error on {wiki}: {str(e)}" ) return + finally: + session.cooldown() - pages = search_result.get("query", {}).get("search") or [] + pages = search_result.get("search") or [] if len(pages) == 0: return - login(wiki) - session.cookies = read_cookie_jar(wiki) - for page in pages: if os.getenv("INCLUDE_SUB_ENVS") == "true" or page["title"].endswith( LUA_DEV_ENV_NAME ): - remove_page(session, page["title"], wiki) + remove_page(session, page["title"]) def main(): - for wiki in get_wikis(): + for wiki in sorted(get_wikis()): if wiki == "commons" and os.getenv("INCLUDE_COMMONS") != "true": continue search_and_remove(wiki)