diff --git a/test/pytests/test_main_regression.py b/test/pytests/test_main_regression.py index 501d5965..c813530c 100644 --- a/test/pytests/test_main_regression.py +++ b/test/pytests/test_main_regression.py @@ -10,10 +10,6 @@ * migrating individual tests if content moves out of main.py * migrating individual tests to test_main.py after assessment of quality * removing and rewriting these tests if contracts change - -For example, since the generation of these tests, main_modes/repl.py was -created, and all tests here touching the REPL functionality should in -principle be removed. """ from __future__ import annotations @@ -25,9 +21,7 @@ import itertools import os from pathlib import Path -import random import sys -import time from types import ModuleType, SimpleNamespace from typing import Any, Callable, Literal, cast @@ -39,8 +33,6 @@ from mycli import main import mycli.key_bindings -import mycli.main_modes.repl -from mycli.packages import key_binding_utils from mycli.packages.sqlresult import SQLResult @@ -80,47 +72,6 @@ def format_output(self, rows: Any, header: Any, format_name: str | None = None, return ['plain output'] -class FakeApp: - def __init__(self, text: str = '', render_counter: int = 0) -> None: - self.current_buffer = SimpleNamespace(text=text) - self.render_counter = render_counter - self.invalidated = False - self.ttimeoutlen: float | None = None - - def invalidate(self) -> None: - self.invalidated = True - - -class FakePromptOutput: - def __init__(self, columns: int = 80, rows: int = 24) -> None: - self.columns = columns - self.rows = rows - self.bell_count = 0 - - def get_size(self) -> SimpleNamespace: - return SimpleNamespace(columns=self.columns, rows=self.rows) - - def bell(self) -> None: - self.bell_count += 1 - - -class FakePromptSession: - def __init__(self, responses: list[Any] | None = None, columns: int = 80, rows: int = 24) -> None: - self.responses = list(responses or []) - self.output = FakePromptOutput(columns=columns, rows=rows) - self.app = FakeApp() - self.prompt_calls: list[dict[str, Any]] = [] - - def prompt(self, **kwargs: Any) -> str: - self.prompt_calls.append(dict(kwargs)) - if not self.responses: - raise EOFError() - response = self.responses.pop(0) - if isinstance(response, BaseException): - raise response - return response - - class FakeCursorBase: def __init__( self, @@ -627,7 +578,7 @@ def test_change_db_handles_empty_same_new_and_backticks(monkeypatch: pytest.Monk assert titles_called['count'] == 2 -def test_execute_from_file_and_change_prompt_format(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None: +def test_execute_from_file(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None: cli = make_bare_mycli() class FakeSQLExecute: @@ -654,9 +605,6 @@ def run(self, query: str) -> list[SQLResult]: ran = list(main.MyCli.execute_from_file(cli, str(sql_file))) assert ran[0].status == 'drop table test;' - assert main.MyCli.change_prompt_format(cli, '')[0].status == 'Missing required argument, format.' - assert main.MyCli.change_prompt_format(cli, '\\u@\\h> ')[0].status == 'Changed prompt format to \\u@\\h> ' - def test_initialize_logging_covers_none_bad_path_and_file_handler(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None: cli = make_bare_mycli() @@ -664,18 +612,15 @@ def test_initialize_logging_covers_none_bad_path_and_file_handler(tmp_path: Path cli.echo = lambda message, **kwargs: echo_calls.append(message) # type: ignore[assignment] cli.config = {'main': {'log_file': str(tmp_path / 'mycli.log'), 'log_level': 'NONE'}} monkeypatch.setattr(main, 'dir_path_exists', lambda path: True) - monkeypatch.setattr(mycli.main_modes.repl, 'dir_path_exists', lambda path: True) main.MyCli.initialize_logging(cli) cli.config = {'main': {'log_file': str(tmp_path / 'missing' / 'mycli.log'), 'log_level': 'INFO'}} monkeypatch.setattr(main, 'dir_path_exists', lambda path: False) - monkeypatch.setattr(mycli.main_modes.repl, 'dir_path_exists', lambda path: False) main.MyCli.initialize_logging(cli) assert echo_calls[-1].startswith('Error: Unable to open the log file') cli.config = {'main': {'log_file': str(tmp_path / 'mycli.log'), 'log_level': 'INFO'}} monkeypatch.setattr(main, 'dir_path_exists', lambda path: True) - monkeypatch.setattr(mycli.main_modes.repl, 'dir_path_exists', lambda path: True) main.MyCli.initialize_logging(cli) @@ -1024,39 +969,6 @@ def __int__(self) -> int: assert any('Invalid port number' in msg for msg in echo_calls) -def test_handle_editor_clip_and_output_timing(monkeypatch: pytest.MonkeyPatch) -> None: - cli = make_bare_mycli() - monkeypatch.setattr(key_binding_utils, 'PromptSession', FakePromptSession) - cli.prompt_session = cast(Any, FakePromptSession(responses=[KeyboardInterrupt(), 'edited sql'])) - cli.get_last_query = lambda: 'last query' # type: ignore[assignment] - monkeypatch.setattr(main.special, 'editor_command', lambda text: text.endswith(r'\e')) - monkeypatch.setattr(main.special, 'get_filename', lambda text: 'query.sql') - monkeypatch.setattr(main.special, 'get_editor_query', lambda text: 'select 1') - monkeypatch.setattr(main.special, 'open_external_editor', lambda filename, sql: ('edited sql', None)) - assert mycli.main_modes.repl.handle_editor_command(cli, r'select 1\e', None, lambda: None) == 'edited sql' - - monkeypatch.setattr(main.special, 'open_external_editor', lambda filename, sql: ('', 'boom')) - with pytest.raises(RuntimeError, match='boom'): - mycli.main_modes.repl.handle_editor_command(cli, r'select 1\e', None, lambda: None) - - monkeypatch.setattr(main.special, 'clip_command', lambda text: True) - monkeypatch.setattr(main.special, 'get_clip_query', lambda text: None) - monkeypatch.setattr(main.special, 'copy_query_to_clipboard', lambda sql: None) - assert mycli.main_modes.repl.handle_clip_command(cli, r'select 1\clip') is True - - monkeypatch.setattr(main.special, 'copy_query_to_clipboard', lambda sql: 'clipboard failed') - with pytest.raises(RuntimeError, match='clipboard failed'): - mycli.main_modes.repl.handle_clip_command(cli, r'select 1\clip') - - monkeypatch.setattr(main.special, 'clip_command', lambda text: False) - assert mycli.main_modes.repl.handle_clip_command(cli, 'select 1') is False - - printed: list[tuple[Any, Any]] = [] - monkeypatch.setattr(main, 'print_formatted_text', lambda text, style=None: printed.append((text, style))) - main.MyCli.output_timing(cli, 'Time: 1.000s', is_warnings_style=True) - assert printed[-1][1] == cli.ptoolkit_style - - def test_format_sqlresult_run_query_reserved_space_and_last_query(monkeypatch: pytest.MonkeyPatch) -> None: cli = make_bare_mycli() cli.main_formatter = DummyFormatter() @@ -1093,7 +1005,7 @@ def test_format_sqlresult_run_query_reserved_space_and_last_query(monkeypatch: p assert main.MyCli.get_last_query(cli) == 'select 1' -def test_reconnect_logging_output_titles_prompt(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None: +def test_reconnect_logging_and_output(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None: cli = make_bare_mycli() sqlexecute = object.__new__(main.SQLExecute) @@ -1144,15 +1056,6 @@ def failing_connect() -> None: assert 'select 1' in contents assert 'hello' in contents - cli.prompt_lines = 0 - prompt_session = FakePromptSession() - prompt_session.app.render_counter = 3 - cli.prompt_session = cast(Any, prompt_session) - monkeypatch.setattr(mycli.main_modes.repl, 'get_prompt', lambda mycli, string, render_counter: 'line1\nline2') - monkeypatch.setattr(main, 'get_prompt', lambda mycli, string, render_counter: 'line1\nline2') - monkeypatch.setattr(main.special, 'is_timing_enabled', lambda: True) - assert main.MyCli.get_output_margin(cli, 'status\nline') == 13 - printed_status: list[Any] = [] echoed_lines: list[str] = [] monkeypatch.setattr(main.special, 'is_redirected', lambda: True) @@ -1160,32 +1063,13 @@ def failing_connect() -> None: monkeypatch.setattr(main.special, 'write_once', lambda text: None) monkeypatch.setattr(main.special, 'write_pipe_once', lambda text: None) monkeypatch.setattr(main.special, 'is_pager_enabled', lambda: False) + monkeypatch.setattr(main.MyCli, 'get_output_margin', lambda self, status=None: 1) monkeypatch.setattr(click, 'secho', lambda line, **kwargs: echoed_lines.append(str(line))) monkeypatch.setattr(main, 'print_formatted_text', lambda text, style=None: printed_status.append((text, style))) main.MyCli.output(cli, itertools.chain(['row 1']), SQLResult(status='status')) assert echoed_lines == [] assert printed_status - cli.prompt_session = None - assert main.to_plain_text(mycli.main_modes.repl.get_custom_toolbar(cli, 'fmt')) == '' - cli.prompt_session = cast(Any, SimpleNamespace(app=None)) - assert main.to_plain_text(mycli.main_modes.repl.get_custom_toolbar(cli, 'fmt')) == '' - - monkeypatch.setattr(mycli.main_modes.repl.sys.stderr, 'isatty', lambda: False) - cli.prompt_session = cast(Any, FakePromptSession()) - cli.terminal_tab_title_format = 'tab' - cli.terminal_window_title_format = 'window' - cli.multiplex_window_title_format = 'mux-window' - cli.multiplex_pane_title_format = 'mux-pane' - mycli.main_modes.repl.set_external_terminal_tab_title(cli) - mycli.main_modes.repl.set_external_terminal_window_title(cli) - monkeypatch.delenv('TMUX', raising=False) - mycli.main_modes.repl.set_external_multiplex_window_title(cli) - mycli.main_modes.repl.set_external_multiplex_pane_title(cli) - monkeypatch.setenv('TMUX', '1') - monkeypatch.setattr(mycli.main_modes.repl.subprocess, 'run', lambda *args, **kwargs: (_ for _ in ()).throw(FileNotFoundError())) - mycli.main_modes.repl.set_external_multiplex_window_title(cli) - def test_reconnect_first_and_second_passes(monkeypatch: pytest.MonkeyPatch) -> None: cli = make_bare_mycli() @@ -1231,45 +1115,7 @@ def fake_reset_connection_id() -> None: assert 'Reconnected successfully.' in echoes -def test_get_prompt_and_completion_helper_fallbacks(monkeypatch: pytest.MonkeyPatch) -> None: - cli = make_bare_mycli() - sqlexecute = object.__new__(main.SQLExecute) - sqlexecute.user = 'alice' - sqlexecute.host = '127.0.0.1' - sqlexecute.dbname = 'db' - sqlexecute.port = 3307 - sqlexecute.socket = '/tmp/mysql.sock' - sqlexecute.server_info = cast(Any, SimpleNamespace(species=SimpleNamespace(name='TiDB'))) - sqlexecute.conn = None - cli.sqlexecute = cast(Any, sqlexecute) - cli.login_path = 'prod' - cli.login_path_as_host = True - cli.dsn_alias = 'dsn' - prompt = mycli.main_modes.repl.get_prompt(cli, r'\h|\H|\A|\y|\Y|\T|\w|\W', 0) - assert prompt == 'prod|prod|dsn|(none)|(none)|(none)|(none)|' - - class PromptCursor: - def __enter__(self) -> 'PromptCursor': - return self - - def __exit__(self, exc_type: Any, exc: Any, tb: Any) -> Literal[False]: - return False - - class PromptConnection: - def cursor(self) -> PromptCursor: - return PromptCursor() - - sqlexecute.conn = cast(Any, PromptConnection()) - cli.login_path_as_host = False - monkeypatch.setattr(mycli.main_modes.repl, 'get_uptime', lambda cur: 123) - monkeypatch.setattr(mycli.main_modes.repl, 'format_uptime', lambda uptime: f'uptime:{uptime}') - monkeypatch.setattr(mycli.main_modes.repl, 'get_ssl_version', lambda cur: 'TLSv1.3') - monkeypatch.setattr(mycli.main_modes.repl, 'get_warning_count', lambda cur: 7) - prompt = mycli.main_modes.repl.get_prompt(cli, r'\H|\y|\Y|\T|\w|\W', 1) - assert prompt == '127.0.0.1|123|uptime:123|TLSv1.3|7|7' - - -def test_format_sqlresult_string_paths_and_close_and_title_early_returns(monkeypatch: pytest.MonkeyPatch) -> None: +def test_format_sqlresult_string_paths_and_close() -> None: cli = make_bare_mycli() closed: list[bool] = [] cli.sqlexecute = cast(Any, SimpleNamespace(close=lambda: closed.append(True))) @@ -1289,18 +1135,6 @@ def format_output(self, rows: Any, header: Any, format_name: str | None = None, assert list(main.MyCli.format_sqlresult(cli, result, max_width=10)) == ['short', 'second'] assert list(main.MyCli.format_sqlresult(cli, result, max_width=2)) == ['vertical-a', 'vertical-b'] - cli.prompt_session = None - cli.terminal_tab_title_format = 'tab' - cli.terminal_window_title_format = 'window' - cli.multiplex_window_title_format = 'mux-window' - cli.multiplex_pane_title_format = 'mux-pane' - monkeypatch.setenv('TMUX', '1') - monkeypatch.setattr(mycli.main_modes.repl.sys.stderr, 'isatty', lambda: True) - mycli.main_modes.repl.set_external_terminal_tab_title(cli) - mycli.main_modes.repl.set_external_terminal_window_title(cli) - mycli.main_modes.repl.set_external_multiplex_window_title(cli) - mycli.main_modes.repl.set_external_multiplex_pane_title(cli) - def test_output_uses_stdout_and_pager_paths(monkeypatch: pytest.MonkeyPatch) -> None: cli = make_bare_mycli() @@ -1331,17 +1165,12 @@ def test_output_uses_stdout_and_pager_paths(monkeypatch: pytest.MonkeyPatch) -> assert paged_lines[-2:] == ['row1\n', 'row2\n'] -def test_format_sqlresult_output_and_prompt_helpers_cover_extra_branches(monkeypatch: pytest.MonkeyPatch) -> None: +def test_format_sqlresult_output_covers_extra_branches(monkeypatch: pytest.MonkeyPatch) -> None: cli = make_bare_mycli() - real_get_prompt = mycli.main_modes.repl.get_prompt cli.main_formatter = DummyFormatter() cli.redirect_formatter = DummyFormatter() cli.get_reserved_space = lambda: 1 # type: ignore[assignment] - cli.prompt_lines = 0 - cli.prompt_session = None - monkeypatch.setattr(main, 'get_prompt', lambda mycli, string, render_counter: 'a\nb') monkeypatch.setattr(main, 'Cursor', FakeCursorBase) - monkeypatch.setattr(main.special, 'is_timing_enabled', lambda: False) rows = FakeCursorBase(rows=[], rowcount=0, description=[('id', 3, None, None, None, None, None)]) result = SQLResult( header=['id'], @@ -1364,6 +1193,7 @@ def test_format_sqlresult_output_and_prompt_helpers_cover_extra_branches(monkeyp monkeypatch.setattr(main.special, 'write_pipe_once', lambda text: None) monkeypatch.setattr(main.special, 'is_redirected', lambda: False) monkeypatch.setattr(main.special, 'is_pager_enabled', lambda: True) + monkeypatch.setattr(main.MyCli, 'get_output_margin', lambda self, status=None: 1) monkeypatch.setattr(click, 'echo_via_pager', lambda gen: paged_lines.extend(list(gen))) monkeypatch.setattr(click, 'secho', lambda line, **kwargs: printed_lines.append(str(line))) monkeypatch.setattr(main, 'print_formatted_text', lambda text, style=None: status_prints.append(text)) @@ -1376,31 +1206,6 @@ def test_format_sqlresult_output_and_prompt_helpers_cover_extra_branches(monkeyp assert printed_lines[-1] == 'short' assert status_prints - assert main.MyCli.get_output_margin(cli, 'ok\nnext') == 5 - - cli.terminal_tab_title_format = '' - cli.terminal_window_title_format = '' - cli.multiplex_window_title_format = '' - cli.multiplex_pane_title_format = '' - mycli.main_modes.repl.set_external_terminal_tab_title(cli) - mycli.main_modes.repl.set_external_terminal_window_title(cli) - mycli.main_modes.repl.set_external_multiplex_window_title(cli) - mycli.main_modes.repl.set_external_multiplex_pane_title(cli) - - cli.sqlexecute = SimpleNamespace( - server_info=SimpleNamespace(species=SimpleNamespace(name='MySQL')), - host=None, - user=None, - dbname=None, - port=3306, - socket=None, - conn=None, - ) - monkeypatch.setattr(main, 'get_prompt', real_get_prompt) - prompt = mycli.main_modes.repl.get_prompt(cli, '\\h \\H \\y \\Y \\T \\w \\W', 0) - assert main.DEFAULT_HOST in prompt - assert '(none)' in prompt - def test_main_handles_click_exception_without_exit_code(monkeypatch: pytest.MonkeyPatch) -> None: class NoExitCode(click.ClickException): @@ -1423,46 +1228,6 @@ def test_filtered_sys_argv_covers_help_and_passthrough(monkeypatch: pytest.Monke assert main.filtered_sys_argv() == ['-h', 'db.example'] -def test_completion_helpers_title_helpers_thanks_tips(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None: - cli = make_bare_mycli() - cli.completer = cast(Any, SimpleNamespace(keyword_casing='auto', get_completions=lambda document, event: ['done'])) - entered_lock = {'count': 0} - - cli._completer_lock = cast(Any, ReusableLock(lambda: entered_lock.__setitem__('count', entered_lock['count'] + 1))) - prompt_session = FakePromptSession() - prompt_session.app.current_buffer.text = '' - cli.prompt_session = cast(Any, prompt_session) - monkeypatch.setattr(mycli.main_modes.repl, 'get_prompt', lambda mycli, string, render_counter: f'title:{string}') - monkeypatch.setattr(mycli.main_modes.repl, 'sanitize_terminal_title', lambda title: title.upper()) - monkeypatch.setattr(mycli.main_modes.repl.sys.stderr, 'isatty', lambda: True) - printed: list[str] = [] - monkeypatch.setattr(builtins, 'print', lambda *args, **kwargs: printed.append(args[0])) - monkeypatch.setattr(mycli.main_modes.repl.subprocess, 'run', lambda *args, **kwargs: None) - monkeypatch.setenv('TMUX', '1') - cli.terminal_tab_title_format = 'tab' - cli.terminal_window_title_format = 'window' - cli.multiplex_window_title_format = 'mux-window' - cli.multiplex_pane_title_format = 'mux-pane' - mycli.main_modes.repl.set_all_external_titles(cli) - assert printed[0].startswith('\x1b]1;TITLE:TAB') - assert printed[1].startswith('\x1b]2;TITLE:WINDOW') - assert printed[2].startswith('\x1b]2;TITLE:MUX-PANE') - monkeypatch.setattr(mycli.main_modes.repl.sys.stderr, 'isatty', lambda: False) - mycli.main_modes.repl.set_external_multiplex_pane_title(cli) - - cli.prompt_session.app.current_buffer.text = 'in progress' - assert mycli.main_modes.repl.get_custom_toolbar(cli, 'x') == cli.last_custom_toolbar_message - cli.prompt_session.app.current_buffer.text = '' - assert 'title:x' in str(mycli.main_modes.repl.get_custom_toolbar(cli, 'x')) - - new_completer = cast(Any, SimpleNamespace(get_completions=lambda document, event: ['done'])) - main.MyCli._on_completions_refreshed(cli, new_completer) - assert cli.completer is new_completer - assert prompt_session.app.invalidated is True - assert list(main.MyCli.get_completions(cli, 'select', 6)) == ['done'] - assert entered_lock['count'] >= 2 - - def test_main_wrapper_and_edit_and_execute(monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.setattr(main, 'filtered_sys_argv', lambda: ['--help']) monkeypatch.setattr(main.click_entrypoint, 'main', lambda *args, **kwargs: None) @@ -1601,17 +1366,6 @@ def test_click_entrypoint_branches_with_dummy_mycli(monkeypatch: pytest.MonkeyPa assert dummy.main_formatter.format_name == 'csv' assert dummy.run_query_calls[-1][0] == 'select 1' - dummy_class = make_dummy_mycli_class(config={'main': {}, 'alias_dsn': {}}) - monkeypatch.setattr(main, 'MyCli', dummy_class) - monkeypatch.setattr(main.sys, 'stdin', SimpleNamespace(isatty=lambda: True)) - cli_args = main.CliArgs() - assert main.click_entrypoint.callback is not None - cast(Any, main.click_entrypoint.callback).__wrapped__(cli_args) - dummy = dummy_class.last_instance - assert dummy is not None - assert dummy.run_cli_called is True - assert dummy.close_called is True - def test_click_entrypoint_password_file_and_dsn_early_branches(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None: runner = CliRunner() @@ -1973,733 +1727,3 @@ def fake_refresh(reset: bool = False) -> list[SQLResult]: 'keyword_casing': 'upper', } assert result[0].status == 'Auto-completion refresh started in the background.' - - -def test_run_cli_bootstraps_and_processes_a_simple_query(monkeypatch: pytest.MonkeyPatch) -> None: - cli = make_bare_mycli() - cli.smart_completion = True - cli.key_bindings = 'emacs' - cli.config = {'history_file': '~/.mycli-history-testing'} - refresh_resets: list[bool] = [] - - def fake_refresh_completions(reset: bool = False) -> list[SQLResult]: - refresh_resets.append(reset) - return [SQLResult(status='refresh')] - - cli.refresh_completions = fake_refresh_completions # type: ignore[assignment] - echo_calls: list[str] = [] - cli.echo = lambda message, **kwargs: echo_calls.append(str(message)) # type: ignore[assignment] - outputs: list[list[str]] = [] - cli.output = lambda formatted, result, is_warnings_style=False: outputs.append(list(formatted)) # type: ignore[assignment] - cli.handle_editor_command = lambda text, inputhook, loaded_message_fn: text # type: ignore[assignment] - cli.handle_clip_command = lambda text: False # type: ignore[assignment] - cli.log_query = lambda text: None # type: ignore[assignment] - cli.log_output = lambda text: None # type: ignore[assignment] - cli.set_all_external_titles = lambda: None # type: ignore[assignment] - cli.format_sqlresult = lambda result, **kwargs: iter(['formatted']) # type: ignore[assignment] - cli.query_history = [] - prompt_session = FakePromptSession(responses=['select 1', EOFError()]) - - class FakeRunSQLExecute: - def __init__(self) -> None: - self.server_info = SimpleNamespace(species=SimpleNamespace(name='MySQL')) - self.dbname = 'db' - self.connection_id = 0 - self.host = 'localhost' - self.port = 3306 - self.user = 'root' - - def run(self, text: str) -> list[SQLResult]: - return [SQLResult(status='SELECT 1', header=['a'], rows=[(1,)])] - - monkeypatch.setattr(main, 'SQLExecute', FakeRunSQLExecute) - sqlexecute = FakeRunSQLExecute() - cli.sqlexecute = cast(Any, sqlexecute) - monkeypatch.setattr(main, 'SQLExecute', FakeRunSQLExecute) - monkeypatch.setattr(mycli.main_modes.repl, 'PromptSession', lambda **kwargs: prompt_session) - monkeypatch.setattr(mycli.main_modes.repl, 'mycli_bindings', lambda mycli: 'bindings') - monkeypatch.setattr(mycli.main_modes.repl, 'create_toolbar_tokens_func', lambda *args: 'toolbar') - monkeypatch.setattr(main, 'style_factory_ptoolkit', lambda *args, **kwargs: 'style') - monkeypatch.setattr(main, 'dir_path_exists', lambda path: False) - monkeypatch.setattr(mycli.main_modes.repl, 'dir_path_exists', lambda path: False) - monkeypatch.setattr(mycli.main_modes.repl, 'cli_is_multiline', lambda mycli: False) - monkeypatch.setattr(main.special, 'set_expanded_output', lambda value: None) - monkeypatch.setattr(main.special, 'set_forced_horizontal_output', lambda value: None) - monkeypatch.setattr(main.special, 'is_llm_command', lambda text: False) - monkeypatch.setattr(main.special, 'is_expanded_output', lambda: False) - monkeypatch.setattr(main.special, 'is_redirected', lambda: False) - monkeypatch.setattr(main.special, 'is_timing_enabled', lambda: False) - monkeypatch.setattr(main.special, 'write_tee', lambda *args, **kwargs: None) - monkeypatch.setattr(main.special, 'unset_once_if_written', lambda *args, **kwargs: None) - monkeypatch.setattr(main.special, 'flush_pipe_once_if_written', lambda *args, **kwargs: None) - monkeypatch.setattr(main.special, 'close_tee', lambda: None) - monkeypatch.setattr(mycli.main_modes.repl, 'is_redirect_command', lambda text: False) - monkeypatch.setattr(main, 'confirm_destructive_query', lambda keywords, text: None) - monkeypatch.setattr(mycli.main_modes.repl, 'need_completion_refresh', lambda text: False) - monkeypatch.setattr(mycli.main_modes.repl, 'is_dropping_database', lambda text, dbname: False) - main.MyCli.run_cli(cli) - assert refresh_resets == [False] - assert outputs == [['formatted']] - assert cli.query_history[-1].query == 'select 1' - assert echo_calls[0].startswith('Error: Unable to open the history file') - assert prompt_session.app.ttimeoutlen == cli.emacs_ttimeoutlen - - -def test_run_cli_delegates_to_main_repl(monkeypatch: pytest.MonkeyPatch) -> None: - cli = make_bare_mycli() - calls: list[Any] = [] - monkeypatch.setattr(main, 'main_repl', lambda target: calls.append(target)) - main.MyCli.run_cli(cli) - assert calls == [cli] - - -def test_run_cli_large_select_asks_for_confirmation(monkeypatch: pytest.MonkeyPatch) -> None: - cli = make_bare_mycli() - cli.config = {'history_file': '~/.mycli-history-testing'} - cli.handle_editor_command = lambda text, inputhook, loaded_message_fn: text # type: ignore[assignment] - cli.handle_clip_command = lambda text: False # type: ignore[assignment] - cli.log_query = lambda text: None # type: ignore[assignment] - cli.log_output = lambda text: None # type: ignore[assignment] - cli.set_all_external_titles = lambda: None # type: ignore[assignment] - cli.format_sqlresult = lambda result, **kwargs: iter(['formatted']) # type: ignore[assignment] - echoed: list[str] = [] - cli.echo = lambda message, **kwargs: echoed.append(str(message)) # type: ignore[assignment] - prompt_session = FakePromptSession(responses=['select * from t', EOFError()]) - monkeypatch.setattr(mycli.main_modes.repl, 'PromptSession', lambda **kwargs: prompt_session) - monkeypatch.setattr(mycli.main_modes.repl, 'mycli_bindings', lambda mycli: 'bindings') - monkeypatch.setattr(mycli.main_modes.repl, 'create_toolbar_tokens_func', lambda *args: 'toolbar') - monkeypatch.setattr(main, 'style_factory_ptoolkit', lambda *args, **kwargs: 'style') - monkeypatch.setattr(main, 'dir_path_exists', lambda path: True) - monkeypatch.setattr(mycli.main_modes.repl, 'dir_path_exists', lambda path: True) - monkeypatch.setattr(mycli.main_modes.repl, 'cli_is_multiline', lambda mycli: False) - monkeypatch.setattr(mycli.main_modes.repl, 'Cursor', FakeCursorBase) - monkeypatch.setattr(main.special, 'set_expanded_output', lambda value: None) - monkeypatch.setattr(main.special, 'set_forced_horizontal_output', lambda value: None) - monkeypatch.setattr(main.special, 'is_llm_command', lambda text: False) - monkeypatch.setattr(main.special, 'is_expanded_output', lambda: False) - monkeypatch.setattr(main.special, 'is_redirected', lambda: False) - monkeypatch.setattr(main.special, 'is_timing_enabled', lambda: False) - monkeypatch.setattr(main.special, 'write_tee', lambda *args, **kwargs: None) - monkeypatch.setattr(main.special, 'unset_once_if_written', lambda *args, **kwargs: None) - monkeypatch.setattr(main.special, 'flush_pipe_once_if_written', lambda *args, **kwargs: None) - monkeypatch.setattr(main.special, 'close_tee', lambda: None) - monkeypatch.setattr(mycli.main_modes.repl, 'is_redirect_command', lambda text: False) - monkeypatch.setattr(main, 'confirm_destructive_query', lambda keywords, text: None) - monkeypatch.setattr(mycli.main_modes.repl, 'need_completion_refresh', lambda text: False) - monkeypatch.setattr(mycli.main_modes.repl, 'is_dropping_database', lambda text, dbname: False) - monkeypatch.setattr(mycli.main_modes.repl, 'confirm', lambda text: False) - rows = FakeCursorBase(rows=[(1,)], rowcount=1001, description=[('id', 3)], warning_count=0) - - class FakeRunSQLExecute: - def __init__(self) -> None: - self.server_info = SimpleNamespace(species=SimpleNamespace(name='MySQL')) - self.dbname = 'db' - self.connection_id = 0 - - def run(self, text: str) -> list[SQLResult]: - return [SQLResult(status='SELECT 1', header=['id'], rows=cast(Any, rows))] - - monkeypatch.setattr(main, 'SQLExecute', FakeRunSQLExecute) - cli.sqlexecute = cast(Any, FakeRunSQLExecute()) - main.MyCli.run_cli(cli) - assert any('The result set has more than 1000 rows.' in line for line in echoed) - assert any('Aborted!' in line for line in echoed) - - -def test_run_cli_outputs_warnings_and_timing(monkeypatch: pytest.MonkeyPatch) -> None: - cli = make_bare_mycli() - cli.config = {'history_file': '~/.mycli-history-testing'} - cli.handle_editor_command = lambda text, inputhook, loaded_message_fn: text # type: ignore[assignment] - cli.handle_clip_command = lambda text: False # type: ignore[assignment] - cli.log_query = lambda text: None # type: ignore[assignment] - cli.log_output = lambda text: None # type: ignore[assignment] - cli.set_all_external_titles = lambda: None # type: ignore[assignment] - cli.beep_after_seconds = 0.0 - cli.show_warnings = True - rendered: list[list[str]] = [] - cli.output = lambda formatted, result, is_warnings_style=False: rendered.append(list(formatted)) # type: ignore[assignment] - timings: list[tuple[str, bool]] = [] - cli.output_timing = lambda timing, is_warnings_style=False: timings.append((timing, is_warnings_style)) # type: ignore[assignment] - cli.format_sqlresult = lambda result, **kwargs: iter([result.status_plain or 'row']) # type: ignore[assignment] - prompt_session = FakePromptSession(responses=['select 1', EOFError()]) - monkeypatch.setattr(mycli.main_modes.repl, 'PromptSession', lambda **kwargs: prompt_session) - monkeypatch.setattr(mycli.main_modes.repl, 'mycli_bindings', lambda mycli: 'bindings') - monkeypatch.setattr(mycli.main_modes.repl, 'create_toolbar_tokens_func', lambda *args: 'toolbar') - monkeypatch.setattr(main, 'style_factory_ptoolkit', lambda *args, **kwargs: 'style') - monkeypatch.setattr(main, 'dir_path_exists', lambda path: True) - monkeypatch.setattr(mycli.main_modes.repl, 'dir_path_exists', lambda path: True) - monkeypatch.setattr(mycli.main_modes.repl, 'cli_is_multiline', lambda mycli: False) - monkeypatch.setattr(mycli.main_modes.repl, 'Cursor', FakeCursorBase) - monkeypatch.setattr(main.special, 'set_expanded_output', lambda value: None) - monkeypatch.setattr(main.special, 'set_forced_horizontal_output', lambda value: None) - monkeypatch.setattr(main.special, 'is_llm_command', lambda text: False) - monkeypatch.setattr(main.special, 'is_expanded_output', lambda: False) - monkeypatch.setattr(main.special, 'is_redirected', lambda: False) - monkeypatch.setattr(main.special, 'is_timing_enabled', lambda: True) - monkeypatch.setattr(main.special, 'write_tee', lambda *args, **kwargs: None) - monkeypatch.setattr(main.special, 'unset_once_if_written', lambda *args, **kwargs: None) - monkeypatch.setattr(main.special, 'flush_pipe_once_if_written', lambda *args, **kwargs: None) - monkeypatch.setattr(main.special, 'close_tee', lambda: None) - monkeypatch.setattr(mycli.main_modes.repl, 'is_redirect_command', lambda text: False) - monkeypatch.setattr(main, 'confirm_destructive_query', lambda keywords, text: None) - monkeypatch.setattr(mycli.main_modes.repl, 'need_completion_refresh', lambda text: False) - monkeypatch.setattr(mycli.main_modes.repl, 'is_dropping_database', lambda text, dbname: False) - warning_rows = FakeCursorBase(rows=[('Level', 1, 'Message')], rowcount=1, description=[('id', 3)], warning_count=1) - main_result = SQLResult(status='SELECT 1', header=['id'], rows=cast(Any, warning_rows)) - warning_result = SQLResult(status='Warning', header=['level'], rows=[('Warning',)]) - - class FakeRunSQLExecute: - def __init__(self) -> None: - self.server_info = SimpleNamespace(species=SimpleNamespace(name='MySQL')) - self.dbname = 'db' - self.connection_id = 0 - self.host = 'localhost' - self.port = 3306 - self.user = 'root' - - def run(self, text: str) -> list[SQLResult]: - if text == 'SHOW WARNINGS': - return [warning_result] - return [main_result] - - monkeypatch.setattr(main, 'SQLExecute', FakeRunSQLExecute) - cli.sqlexecute = cast(Any, FakeRunSQLExecute()) - main.MyCli.run_cli(cli) - assert rendered[0] == ['SELECT 1'] - assert rendered[1] == ['Warning'] - assert any(item[1] is False for item in timings) - assert any(item[1] is True for item in timings) - - -def test_run_cli_prompt_rendering_startup_modes_and_goodbye(monkeypatch: pytest.MonkeyPatch) -> None: - cli = make_bare_mycli() - cli.less_chatty = False - cli.toolbar_format = 'default' - cli.wider_completion_menu = True - cli.key_bindings = 'vi' - cli.vi_ttimeoutlen = 9.0 - cli.multiline_continuation_char = '>' - cli.max_len_prompt = 5 - cli.config = {'history_file': '~/.mycli-history-testing'} - monkeypatch.setattr( - mycli.main_modes.repl, - 'get_prompt', - lambda mycli, string, render_counter: '0123456789' if string == cli.default_prompt else 'a\nb', - ) - monkeypatch.setattr(mycli.main_modes.repl, 'set_all_external_titles', lambda mycli: None) - toolbar_help: list[bool] = [] - prints: list[str] = [] - prompt_messages: list[str] = [] - continuations: list[Any] = [] - - class InspectPromptSession(FakePromptSession): - def prompt(self, **kwargs: Any) -> str: - prompt_messages.append(main.to_plain_text(kwargs['message']())) - self.app.current_buffer.text = 'typing' - prompt_messages.append(main.to_plain_text(kwargs['message']())) - raise EOFError() - - prompt_session = InspectPromptSession() - - class FakeRunSQLExecute: - def __init__(self) -> None: - self.server_info = 'Server' - self.dbname = 'db' - self.connection_id = 0 - self.host = 'localhost' - self.port = 3306 - self.user = 'root' - - monkeypatch.setattr(main, 'SQLExecute', FakeRunSQLExecute) - cli.sqlexecute = cast(Any, FakeRunSQLExecute()) - - def fake_prompt_session(**kwargs: Any) -> InspectPromptSession: - continuations.append(kwargs['prompt_continuation'](4, 0, 0)) - cli.multiline_continuation_char = '' - continuations.append(kwargs['prompt_continuation'](4, 0, 0)) - cli.multiline_continuation_char = None # type: ignore[assignment] - continuations.append(kwargs['prompt_continuation'](4, 0, 0)) - return prompt_session - - monkeypatch.setattr(mycli.main_modes.repl, 'PromptSession', fake_prompt_session) - monkeypatch.setattr(mycli.main_modes.repl, 'mycli_bindings', lambda mycli: 'bindings') - - def fake_create_toolbar_tokens(mycli: Any, show_help: Any, fmt: str, custom_toolbar: Any) -> str: - toolbar_help.append(show_help()) - return 'toolbar' - - monkeypatch.setattr(mycli.main_modes.repl, 'create_toolbar_tokens_func', fake_create_toolbar_tokens) - monkeypatch.setattr(main, 'style_factory_ptoolkit', lambda *args, **kwargs: 'style') - monkeypatch.setattr(main, 'dir_path_exists', lambda path: True) - monkeypatch.setattr(mycli.main_modes.repl, 'dir_path_exists', lambda path: True) - monkeypatch.setattr(mycli.main_modes.repl, 'cli_is_multiline', lambda mycli: False) - monkeypatch.setattr(main.special, 'close_tee', lambda: None) - monkeypatch.setattr(random, 'random', lambda: 0.4) - monkeypatch.setattr(builtins, 'print', lambda *args, **kwargs: prints.append(' '.join(str(x) for x in args))) - echoed: list[str] = [] - cli.echo = lambda message, **kwargs: echoed.append(str(message)) # type: ignore[assignment] - main.MyCli.run_cli(cli) - assert toolbar_help == [True] - assert prints[0] == 'Server' - assert any('Thanks to the contributor' in line for line in prints) - assert prompt_messages == ['a\nb', 'a\nb'] - assert continuations == [[('class:continuation', ' > ')], [('class:continuation', '')], [('class:continuation', ' ')]] - assert prompt_session.app.ttimeoutlen == 9.0 - assert echoed[-1] == 'Goodbye!' - - -def test_run_cli_llm_paths_and_finish_iteration(monkeypatch: pytest.MonkeyPatch) -> None: - cli = make_bare_mycli() - cli.config = {'history_file': '~/.mycli-history-testing'} - cli.llm_prompt_field_truncate = 0 - cli.llm_prompt_section_truncate = 0 - cli.log_query = lambda text: None # type: ignore[assignment] - cli.log_output = lambda text: None # type: ignore[assignment] - cli.set_all_external_titles = lambda: None # type: ignore[assignment] - outputs: list[list[str]] = [] - cli.output = lambda formatted, result, is_warnings_style=False: outputs.append(list(formatted)) # type: ignore[assignment] - cli.format_sqlresult = lambda result, **kwargs: iter([result.status_plain or 'row']) # type: ignore[assignment] - timings: list[str] = [] - cli.output_timing = lambda timing, is_warnings_style=False: timings.append(timing) # type: ignore[assignment] - click_output: list[str] = [] - monkeypatch.setattr(click, 'echo', lambda message='', **kwargs: click_output.append(str(message))) - - class LLMConnection: - def cursor(self) -> str: - return 'cursor' - - class FakeRunSQLExecute: - def __init__(self) -> None: - self.server_info = SimpleNamespace(species=SimpleNamespace(name='MySQL')) - self.dbname = 'db' - self.connection_id = 0 - self.conn = LLMConnection() - self.host = 'localhost' - self.port = 3306 - self.user = 'root' - - def run(self, text: str) -> Iterator[SQLResult]: - return iter([SQLResult(status=f'ran:{text}')]) - - monkeypatch.setattr(main, 'SQLExecute', FakeRunSQLExecute) - cli.sqlexecute = cast(Any, FakeRunSQLExecute()) - prompt_session = FakePromptSession(responses=['\\llm ask', 'select 1', '\\llm finish', '\\llm empty', '\\llm err', EOFError()]) - monkeypatch.setattr(mycli.main_modes.repl, 'PromptSession', lambda **kwargs: prompt_session) - monkeypatch.setattr(mycli.main_modes.repl, 'mycli_bindings', lambda mycli: 'bindings') - monkeypatch.setattr(mycli.main_modes.repl, 'create_toolbar_tokens_func', lambda *args: 'toolbar') - monkeypatch.setattr(main, 'style_factory_ptoolkit', lambda *args, **kwargs: 'style') - monkeypatch.setattr(main, 'dir_path_exists', lambda path: True) - monkeypatch.setattr(mycli.main_modes.repl, 'dir_path_exists', lambda path: True) - monkeypatch.setattr(mycli.main_modes.repl, 'cli_is_multiline', lambda mycli: False) - monkeypatch.setattr(main.special, 'set_expanded_output', lambda value: None) - monkeypatch.setattr(main.special, 'set_forced_horizontal_output', lambda value: None) - monkeypatch.setattr(main.special, 'is_expanded_output', lambda: False) - monkeypatch.setattr(main.special, 'is_redirected', lambda: False) - monkeypatch.setattr(main.special, 'is_timing_enabled', lambda: True) - monkeypatch.setattr(main.special, 'write_tee', lambda *args, **kwargs: None) - monkeypatch.setattr(main.special, 'unset_once_if_written', lambda *args, **kwargs: None) - monkeypatch.setattr(main.special, 'flush_pipe_once_if_written', lambda *args, **kwargs: None) - monkeypatch.setattr(main.special, 'close_tee', lambda: None) - monkeypatch.setattr(mycli.main_modes.repl, 'is_redirect_command', lambda text: False) - monkeypatch.setattr(main, 'confirm_destructive_query', lambda keywords, text: None) - monkeypatch.setattr(mycli.main_modes.repl, 'need_completion_refresh', lambda text: False) - monkeypatch.setattr(mycli.main_modes.repl, 'is_dropping_database', lambda text, dbname: False) - monkeypatch.setattr(main.special, 'is_llm_command', lambda text: text.startswith('\\llm')) - - def fake_handle_llm(text: str, cur: Any, dbname: str, field_truncate: int, section_truncate: int) -> tuple[str, str, float]: - if text == '\\llm ask': - return ('context', 'select 1', 1.25) - if text == '\\llm finish': - raise main.special.FinishIteration(iter([SQLResult(status='llm-finished')])) - if text == '\\llm empty': - raise main.special.FinishIteration(None) - raise RuntimeError('llm boom') - - monkeypatch.setattr(main.special, 'handle_llm', fake_handle_llm) - cli.echo = lambda message, **kwargs: click_output.append(str(message)) # type: ignore[assignment] - main.MyCli.run_cli(cli) - assert click_output[:3] == ['LLM Response:', 'context', '---'] - assert any('Time: 1.25 seconds' in timing for timing in timings) - assert ['ran:select 1'] in outputs - assert ['llm-finished'] in outputs - assert any('llm boom' in line for line in click_output) - - -def test_run_cli_reconnect_and_exception_paths(monkeypatch: pytest.MonkeyPatch) -> None: - cli = make_bare_mycli() - cli.config = {'history_file': '~/.mycli-history-testing'} - cli.log_query = lambda text: None # type: ignore[assignment] - cli.log_output = lambda text: None # type: ignore[assignment] - cli.set_all_external_titles = lambda: None # type: ignore[assignment] - cli.output = lambda formatted, result, is_warnings_style=False: None # type: ignore[assignment] - cli.format_sqlresult = lambda result, **kwargs: iter([result.status_plain or 'row']) # type: ignore[assignment] - cli.handle_editor_command = lambda text, inputhook, loaded_message_fn: text # type: ignore[assignment] - cli.handle_clip_command = lambda text: False # type: ignore[assignment] - prompt_session = FakePromptSession( - responses=[ - 'iface', - 'op-reconnect', - 'op-error', - 'generic', - 'nyi', - 'dropdb', - EOFError(), - ] - ) - echoes: list[str] = [] - cli.echo = lambda message, **kwargs: echoes.append(str(message)) # type: ignore[assignment] - refresh_calls: list[bool] = [] - - def fake_refresh_completions(reset: bool = False) -> list[SQLResult]: - refresh_calls.append(reset) - return [SQLResult(status='refresh')] - - cli.refresh_completions = fake_refresh_completions # type: ignore[assignment] - reconnect_calls: list[str] = [] - reconnect_results = iter([True, True]) - - def fake_reconnect(database: str = '') -> bool: - reconnect_calls.append(database) - return next(reconnect_results) - - cli.reconnect = fake_reconnect # type: ignore[assignment] - - class FakeRunSQLExecute: - def __init__(self) -> None: - self.server_info = SimpleNamespace(species=SimpleNamespace(name='MySQL')) - self.dbname: str | None = 'db' - self.connection_id = 0 - self.conn = SimpleNamespace() - self.calls: list[str] = [] - self.host = 'localhost' - self.port = 3306 - self.user = 'root' - - def connect(self) -> None: - self.calls.append('connect') - - def run(self, text: str) -> Iterator[SQLResult]: - self.calls.append(text) - if text == 'iface' and self.calls.count('iface') == 1: - raise pymysql.err.InterfaceError() - if text == 'op-reconnect' and self.calls.count('op-reconnect') == 1: - raise pymysql.OperationalError(2003, 'lost') - if text == 'op-error': - raise pymysql.OperationalError(9999, 'bad op') - if text == 'generic': - raise RuntimeError('boom') - if text == 'nyi': - raise NotImplementedError() - return iter([SQLResult(status='DROP 1') if text == 'dropdb' else SQLResult(status=f'ok:{text}')]) - - monkeypatch.setattr(main, 'SQLExecute', FakeRunSQLExecute) - sqlexecute = FakeRunSQLExecute() - cli.sqlexecute = cast(Any, sqlexecute) - monkeypatch.setattr(mycli.main_modes.repl, 'PromptSession', lambda **kwargs: prompt_session) - monkeypatch.setattr(mycli.main_modes.repl, 'mycli_bindings', lambda mycli: 'bindings') - monkeypatch.setattr(mycli.main_modes.repl, 'create_toolbar_tokens_func', lambda *args: 'toolbar') - monkeypatch.setattr(main, 'style_factory_ptoolkit', lambda *args, **kwargs: 'style') - monkeypatch.setattr(main, 'dir_path_exists', lambda path: True) - monkeypatch.setattr(mycli.main_modes.repl, 'dir_path_exists', lambda path: True) - monkeypatch.setattr(mycli.main_modes.repl, 'cli_is_multiline', lambda mycli: False) - monkeypatch.setattr(main.special, 'set_expanded_output', lambda value: None) - monkeypatch.setattr(main.special, 'set_forced_horizontal_output', lambda value: None) - monkeypatch.setattr(main.special, 'is_llm_command', lambda text: False) - monkeypatch.setattr(main.special, 'is_expanded_output', lambda: False) - monkeypatch.setattr(main.special, 'is_redirected', lambda: False) - monkeypatch.setattr(main.special, 'is_timing_enabled', lambda: False) - monkeypatch.setattr(main.special, 'write_tee', lambda *args, **kwargs: None) - monkeypatch.setattr(main.special, 'unset_once_if_written', lambda *args, **kwargs: None) - monkeypatch.setattr(main.special, 'flush_pipe_once_if_written', lambda *args, **kwargs: None) - monkeypatch.setattr(main.special, 'close_tee', lambda: None) - monkeypatch.setattr(mycli.main_modes.repl, 'is_redirect_command', lambda text: False) - monkeypatch.setattr(main, 'confirm_destructive_query', lambda keywords, text: None) - monkeypatch.setattr(mycli.main_modes.repl, 'need_completion_refresh', lambda text: text == 'dropdb') - monkeypatch.setattr(mycli.main_modes.repl, 'need_completion_reset', lambda text: True) - monkeypatch.setattr(mycli.main_modes.repl, 'is_dropping_database', lambda text, dbname: text == 'dropdb') - - main.MyCli.run_cli(cli) - assert reconnect_calls == ['', ''] - assert any('bad op' in line for line in echoes) - assert any('boom' in line for line in echoes) - assert 'Not Yet Implemented.' in echoes - assert sqlexecute.dbname is None - assert refresh_calls == [True] - - -def test_run_cli_additional_interrupt_empty_and_cancel_paths(monkeypatch: pytest.MonkeyPatch) -> None: - cli = make_bare_mycli() - cli.config = {'history_file': '~/.mycli-history-testing'} - cli.log_query = lambda text: None # type: ignore[assignment] - cli.log_output = lambda text: None # type: ignore[assignment] - cli.set_all_external_titles = lambda: None # type: ignore[assignment] - cli.output = lambda formatted, result, is_warnings_style=False: None # type: ignore[assignment] - cli.format_sqlresult = lambda result, **kwargs: iter([result.status_plain or 'row']) # type: ignore[assignment] - cli.handle_editor_command = lambda text, inputhook, loaded_message_fn: text # type: ignore[assignment] - cli.handle_clip_command = lambda text: False # type: ignore[assignment] - cli.llm_prompt_field_truncate = 0 - cli.llm_prompt_section_truncate = 0 - echoes: list[str] = [] - cli.echo = lambda message, **kwargs: echoes.append(str(message)) # type: ignore[assignment] - prompt_session = FakePromptSession( - responses=[ - KeyboardInterrupt(), - ' ', - '\\llm stop', - 'cancel-ok', - 'cancel-missing-id', - 'eof-run', - ] - ) - - class FakeRunSQLExecute: - def __init__(self) -> None: - self.server_info = SimpleNamespace(species=SimpleNamespace(name='MySQL')) - self.dbname = 'db' - self.connection_id = 0 - self.conn = SimpleNamespace(cursor=lambda: 'cursor') - self.host = 'localhost' - self.port = 3306 - self.user = 'root' - - def connect(self) -> None: - return None - - def run(self, text: str) -> Iterator[SQLResult]: - if text == 'cancel-ok': - self.connection_id = 7 - raise KeyboardInterrupt() - if text == 'kill 7': - return iter([SQLResult(status='OK')]) - if text == 'cancel-missing-id': - self.connection_id = 0 - raise KeyboardInterrupt() - if text == 'eof-run': - raise EOFError() - return iter([SQLResult(status=f'ok:{text}')]) - - monkeypatch.setattr(mycli.main_modes.repl, 'PromptSession', lambda **kwargs: prompt_session) - monkeypatch.setattr(mycli.main_modes.repl, 'mycli_bindings', lambda mycli: 'bindings') - monkeypatch.setattr(mycli.main_modes.repl, 'create_toolbar_tokens_func', lambda *args: 'toolbar') - monkeypatch.setattr(main, 'style_factory_ptoolkit', lambda *args, **kwargs: 'style') - monkeypatch.setattr(main, 'dir_path_exists', lambda path: True) - monkeypatch.setattr(mycli.main_modes.repl, 'dir_path_exists', lambda path: True) - monkeypatch.setattr(mycli.main_modes.repl, 'cli_is_multiline', lambda mycli: False) - monkeypatch.setattr(main.special, 'set_expanded_output', lambda value: None) - monkeypatch.setattr(main.special, 'set_forced_horizontal_output', lambda value: None) - monkeypatch.setattr(main.special, 'is_expanded_output', lambda: False) - monkeypatch.setattr(main.special, 'is_redirected', lambda: False) - monkeypatch.setattr(main.special, 'is_timing_enabled', lambda: False) - monkeypatch.setattr(main.special, 'write_tee', lambda *args, **kwargs: None) - monkeypatch.setattr(main.special, 'unset_once_if_written', lambda *args, **kwargs: None) - monkeypatch.setattr(main.special, 'flush_pipe_once_if_written', lambda *args, **kwargs: None) - monkeypatch.setattr(main.special, 'close_tee', lambda: None) - monkeypatch.setattr(mycli.main_modes.repl, 'is_redirect_command', lambda text: False) - monkeypatch.setattr(main, 'confirm_destructive_query', lambda keywords, text: None) - monkeypatch.setattr(mycli.main_modes.repl, 'need_completion_refresh', lambda text: False) - monkeypatch.setattr(mycli.main_modes.repl, 'is_dropping_database', lambda text, dbname: False) - monkeypatch.setattr(main.special, 'is_llm_command', lambda text: text.startswith('\\llm')) - monkeypatch.setattr(main.special, 'handle_llm', lambda *args, **kwargs: (_ for _ in ()).throw(KeyboardInterrupt())) - monkeypatch.setattr(main, 'SQLExecute', FakeRunSQLExecute) - cli.sqlexecute = cast(Any, FakeRunSQLExecute()) - main.MyCli.run_cli(cli) - assert 'Cancelled query id: 7' in echoes - assert 'Did not get a connection id, skip cancelling query' in echoes - - -def test_run_cli_interface_and_operational_reconnect_false(monkeypatch: pytest.MonkeyPatch) -> None: - cli = make_bare_mycli() - cli.config = {'history_file': '~/.mycli-history-testing'} - cli.log_query = lambda text: None # type: ignore[assignment] - cli.log_output = lambda text: None # type: ignore[assignment] - cli.set_all_external_titles = lambda: None # type: ignore[assignment] - cli.output = lambda formatted, result, is_warnings_style=False: None # type: ignore[assignment] - cli.format_sqlresult = lambda result, **kwargs: iter([result.status_plain or 'row']) # type: ignore[assignment] - cli.handle_editor_command = lambda text, inputhook, loaded_message_fn: text # type: ignore[assignment] - cli.handle_clip_command = lambda text: False # type: ignore[assignment] - cli.reconnect = lambda database='': False # type: ignore[assignment] - prompt_session = FakePromptSession(responses=['iface', 'oplost', EOFError()]) - - class FakeRunSQLExecute: - def __init__(self) -> None: - self.server_info = SimpleNamespace(species=SimpleNamespace(name='MySQL')) - self.dbname = 'db' - self.connection_id = 0 - self.host = 'localhost' - self.port = 3306 - self.user = 'root' - - def run(self, text: str) -> Iterator[SQLResult]: - if text == 'iface': - raise pymysql.err.InterfaceError() - raise pymysql.OperationalError(2003, 'lost') - - monkeypatch.setattr(mycli.main_modes.repl, 'PromptSession', lambda **kwargs: prompt_session) - monkeypatch.setattr(mycli.main_modes.repl, 'mycli_bindings', lambda mycli: 'bindings') - monkeypatch.setattr(mycli.main_modes.repl, 'create_toolbar_tokens_func', lambda *args: 'toolbar') - monkeypatch.setattr(main, 'style_factory_ptoolkit', lambda *args, **kwargs: 'style') - monkeypatch.setattr(main, 'dir_path_exists', lambda path: True) - monkeypatch.setattr(mycli.main_modes.repl, 'dir_path_exists', lambda path: True) - monkeypatch.setattr(mycli.main_modes.repl, 'cli_is_multiline', lambda mycli: False) - monkeypatch.setattr(main.special, 'set_expanded_output', lambda value: None) - monkeypatch.setattr(main.special, 'set_forced_horizontal_output', lambda value: None) - monkeypatch.setattr(main.special, 'is_llm_command', lambda text: False) - monkeypatch.setattr(main.special, 'is_expanded_output', lambda: False) - monkeypatch.setattr(main.special, 'is_redirected', lambda: False) - monkeypatch.setattr(main.special, 'is_timing_enabled', lambda: False) - monkeypatch.setattr(main.special, 'write_tee', lambda *args, **kwargs: None) - monkeypatch.setattr(main.special, 'unset_once_if_written', lambda *args, **kwargs: None) - monkeypatch.setattr(main.special, 'flush_pipe_once_if_written', lambda *args, **kwargs: None) - monkeypatch.setattr(main.special, 'close_tee', lambda: None) - monkeypatch.setattr(mycli.main_modes.repl, 'is_redirect_command', lambda text: False) - monkeypatch.setattr(main, 'confirm_destructive_query', lambda keywords, text: None) - monkeypatch.setattr(mycli.main_modes.repl, 'need_completion_refresh', lambda text: False) - monkeypatch.setattr(mycli.main_modes.repl, 'is_dropping_database', lambda text, dbname: False) - monkeypatch.setattr(main, 'SQLExecute', FakeRunSQLExecute) - cli.sqlexecute = cast(Any, FakeRunSQLExecute()) - main.MyCli.run_cli(cli) - - -def test_run_cli_watch_beep_auto_vertical_and_cancel_failure_paths(monkeypatch: pytest.MonkeyPatch) -> None: - cli = make_bare_mycli() - cli.config = {'history_file': '~/.mycli-history-testing'} - cli.auto_vertical_output = True - cli.beep_after_seconds = 0.1 - cli.log_query = lambda text: None # type: ignore[assignment] - cli.log_output = lambda text: None # type: ignore[assignment] - cli.set_all_external_titles = lambda: None # type: ignore[assignment] - cli.handle_editor_command = lambda text, inputhook, loaded_message_fn: text # type: ignore[assignment] - cli.handle_clip_command = lambda text: False # type: ignore[assignment] - echoes: list[str] = [] - cli.echo = lambda message, **kwargs: echoes.append(str(message)) # type: ignore[assignment] - recorded_widths: list[int | None] = [] - - def fake_format_watch(result: Any, **kwargs: Any) -> Iterator[str]: - recorded_widths.append(kwargs.get('max_width')) - return iter(['row']) - - cli.format_sqlresult = fake_format_watch # type: ignore[assignment] - cli.output = lambda formatted, result, is_warnings_style=False: None # type: ignore[assignment] - cli.output_timing = lambda timing, is_warnings_style=False: None # type: ignore[assignment] - prompt_session = FakePromptSession(responses=['watch good', 'cancel-fail', 'cancel-error', EOFError()], columns=91) - - class FakeRunSQLExecute: - def __init__(self) -> None: - self.server_info = SimpleNamespace(species=SimpleNamespace(name='MySQL')) - self.dbname = 'db' - self.connection_id = 0 - self.conn = SimpleNamespace() - self.host = 'localhost' - self.port = 3306 - self.user = 'root' - - def connect(self) -> None: - return None - - def run(self, text: str) -> Iterator[SQLResult]: - if text == 'watch good': - return iter([ - SQLResult(status='watch', command={'name': 'watch', 'seconds': '1'}), - SQLResult(status='watch', command={'name': 'watch', 'seconds': '1'}), - ]) - if text == 'cancel-fail': - self.connection_id = 8 - raise KeyboardInterrupt() - if text == 'kill 8': - return iter([SQLResult(status='failed')]) - if text == 'cancel-error': - self.connection_id = 9 - raise KeyboardInterrupt() - if text == 'kill 9': - raise RuntimeError('kill failed') - return iter([]) - - monkeypatch.setattr(main, 'SQLExecute', FakeRunSQLExecute) - cli.sqlexecute = cast(Any, FakeRunSQLExecute()) - monkeypatch.setattr(mycli.main_modes.repl, 'PromptSession', lambda **kwargs: prompt_session) - monkeypatch.setattr(mycli.main_modes.repl, 'mycli_bindings', lambda mycli: 'bindings') - monkeypatch.setattr(mycli.main_modes.repl, 'create_toolbar_tokens_func', lambda *args: 'toolbar') - monkeypatch.setattr(main, 'style_factory_ptoolkit', lambda *args, **kwargs: 'style') - monkeypatch.setattr(main, 'dir_path_exists', lambda path: True) - monkeypatch.setattr(mycli.main_modes.repl, 'dir_path_exists', lambda path: True) - monkeypatch.setattr(mycli.main_modes.repl, 'cli_is_multiline', lambda mycli: False) - monkeypatch.setattr(main.special, 'set_expanded_output', lambda value: None) - monkeypatch.setattr(main.special, 'set_forced_horizontal_output', lambda value: None) - monkeypatch.setattr(main.special, 'is_llm_command', lambda text: False) - monkeypatch.setattr(main.special, 'is_expanded_output', lambda: False) - monkeypatch.setattr(main.special, 'is_redirected', lambda: False) - monkeypatch.setattr(main.special, 'is_timing_enabled', lambda: False) - monkeypatch.setattr(main.special, 'write_tee', lambda *args, **kwargs: None) - monkeypatch.setattr(main.special, 'unset_once_if_written', lambda *args, **kwargs: None) - monkeypatch.setattr(main.special, 'flush_pipe_once_if_written', lambda *args, **kwargs: None) - monkeypatch.setattr(main.special, 'close_tee', lambda: None) - monkeypatch.setattr(mycli.main_modes.repl, 'is_redirect_command', lambda text: False) - monkeypatch.setattr(main, 'confirm_destructive_query', lambda keywords, text: None) - monkeypatch.setattr(mycli.main_modes.repl, 'need_completion_refresh', lambda text: False) - monkeypatch.setattr(mycli.main_modes.repl, 'is_dropping_database', lambda text, dbname: False) - monkeypatch.setattr(time, 'time', iter([0.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0]).__next__) - main.MyCli.run_cli(cli) - assert recorded_widths[:2] == [91, 91] - assert '' in echoes - assert prompt_session.output.bell_count >= 1 - assert any('Failed to confirm query cancellation' in line for line in echoes) - assert any('Encountered error while cancelling query' in line for line in echoes) - - -def test_run_cli_auto_vertical_uses_default_width_when_prompt_session_is_cleared(monkeypatch: pytest.MonkeyPatch) -> None: - cli = make_bare_mycli() - cli.config = {'history_file': '~/.mycli-history-testing'} - cli.auto_vertical_output = True - cli.log_query = lambda text: None # type: ignore[assignment] - cli.log_output = lambda text: None # type: ignore[assignment] - cli.set_all_external_titles = lambda: None # type: ignore[assignment] - cli.handle_editor_command = lambda text, inputhook, loaded_message_fn: text # type: ignore[assignment] - cli.handle_clip_command = lambda text: False # type: ignore[assignment] - widths: list[int | None] = [] - - def fake_format_default_width(result: Any, **kwargs: Any) -> Iterator[str]: - widths.append(kwargs.get('max_width')) - return iter(['row']) - - cli.format_sqlresult = fake_format_default_width # type: ignore[assignment] - prompt_session = FakePromptSession(responses=['select 1', EOFError()]) - cli.output = lambda formatted, result, is_warnings_style=False: setattr(cli, 'prompt_session', prompt_session) # type: ignore[assignment] - - class FakeRunSQLExecute: - def __init__(self) -> None: - self.server_info = SimpleNamespace(species=SimpleNamespace(name='MySQL')) - self.dbname = 'db' - self.connection_id = 0 - self.host = 'localhost' - self.port = 3306 - self.user = 'root' - - def run(self, text: str) -> Iterator[SQLResult]: - cli.prompt_session = None - return iter([SQLResult(status='ok')]) - - monkeypatch.setattr(main, 'SQLExecute', FakeRunSQLExecute) - cli.sqlexecute = cast(Any, FakeRunSQLExecute()) - monkeypatch.setattr(mycli.main_modes.repl, 'PromptSession', lambda **kwargs: prompt_session) - monkeypatch.setattr(mycli.main_modes.repl, 'mycli_bindings', lambda mycli: 'bindings') - monkeypatch.setattr(mycli.main_modes.repl, 'create_toolbar_tokens_func', lambda *args: 'toolbar') - monkeypatch.setattr(main, 'style_factory_ptoolkit', lambda *args, **kwargs: 'style') - monkeypatch.setattr(main, 'dir_path_exists', lambda path: True) - monkeypatch.setattr(mycli.main_modes.repl, 'dir_path_exists', lambda path: True) - monkeypatch.setattr(mycli.main_modes.repl, 'cli_is_multiline', lambda mycli: False) - monkeypatch.setattr(main.special, 'set_expanded_output', lambda value: None) - monkeypatch.setattr(main.special, 'set_forced_horizontal_output', lambda value: None) - monkeypatch.setattr(main.special, 'is_llm_command', lambda text: False) - monkeypatch.setattr(main.special, 'is_expanded_output', lambda: False) - monkeypatch.setattr(main.special, 'is_redirected', lambda: False) - monkeypatch.setattr(main.special, 'is_timing_enabled', lambda: False) - monkeypatch.setattr(main.special, 'write_tee', lambda *args, **kwargs: None) - monkeypatch.setattr(main.special, 'unset_once_if_written', lambda *args, **kwargs: None) - monkeypatch.setattr(main.special, 'flush_pipe_once_if_written', lambda *args, **kwargs: None) - monkeypatch.setattr(main.special, 'close_tee', lambda: None) - monkeypatch.setattr(mycli.main_modes.repl, 'is_redirect_command', lambda text: False) - monkeypatch.setattr(main, 'confirm_destructive_query', lambda keywords, text: None) - monkeypatch.setattr(mycli.main_modes.repl, 'need_completion_refresh', lambda text: False) - monkeypatch.setattr(mycli.main_modes.repl, 'is_dropping_database', lambda text, dbname: False) - main.MyCli.run_cli(cli) - assert widths == [main.DEFAULT_WIDTH]