gh-133447: Add basic color to `sqlite3` CLI (#133461)

This commit is contained in:
Stan Ulbrych 2025-05-10 08:59:01 +01:00 committed by GitHub
parent 116a9f9b37
commit 30b1d8f11d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 36 additions and 13 deletions

View File

@ -10,9 +10,10 @@
from argparse import ArgumentParser from argparse import ArgumentParser
from code import InteractiveConsole from code import InteractiveConsole
from textwrap import dedent from textwrap import dedent
from _colorize import get_theme, theme_no_color
def execute(c, sql, suppress_errors=True): def execute(c, sql, suppress_errors=True, theme=theme_no_color):
"""Helper that wraps execution of SQL code. """Helper that wraps execution of SQL code.
This is used both by the REPL and by direct execution from the CLI. This is used both by the REPL and by direct execution from the CLI.
@ -25,11 +26,15 @@ def execute(c, sql, suppress_errors=True):
for row in c.execute(sql): for row in c.execute(sql):
print(row) print(row)
except sqlite3.Error as e: except sqlite3.Error as e:
t = theme.traceback
tp = type(e).__name__ tp = type(e).__name__
try: try:
print(f"{tp} ({e.sqlite_errorname}): {e}", file=sys.stderr) tp += f" ({e.sqlite_errorname})"
except AttributeError: except AttributeError:
print(f"{tp}: {e}", file=sys.stderr) pass
print(
f"{t.type}{tp}{t.reset}: {t.message}{e}{t.reset}", file=sys.stderr
)
if not suppress_errors: if not suppress_errors:
sys.exit(1) sys.exit(1)
@ -37,10 +42,11 @@ def execute(c, sql, suppress_errors=True):
class SqliteInteractiveConsole(InteractiveConsole): class SqliteInteractiveConsole(InteractiveConsole):
"""A simple SQLite REPL.""" """A simple SQLite REPL."""
def __init__(self, connection): def __init__(self, connection, use_color=False):
super().__init__() super().__init__()
self._con = connection self._con = connection
self._cur = connection.cursor() self._cur = connection.cursor()
self._use_color = use_color
def runsource(self, source, filename="<input>", symbol="single"): def runsource(self, source, filename="<input>", symbol="single"):
"""Override runsource, the core of the InteractiveConsole REPL. """Override runsource, the core of the InteractiveConsole REPL.
@ -48,6 +54,8 @@ def runsource(self, source, filename="<input>", symbol="single"):
Return True if more input is needed; buffering is done automatically. Return True if more input is needed; buffering is done automatically.
Return False if input is a complete statement ready for execution. Return False if input is a complete statement ready for execution.
""" """
theme = get_theme(force_no_color=not self._use_color)
if not source or source.isspace(): if not source or source.isspace():
return False return False
if source[0] == ".": if source[0] == ".":
@ -61,12 +69,13 @@ def runsource(self, source, filename="<input>", symbol="single"):
case "": case "":
pass pass
case _ as unknown: case _ as unknown:
self.write("Error: unknown command or invalid arguments:" t = theme.traceback
f' "{unknown}".\n') self.write(f'{t.type}Error{t.reset}:{t.message} unknown'
f'command or invalid arguments: "{unknown}".\n{t.reset}')
else: else:
if not sqlite3.complete_statement(source): if not sqlite3.complete_statement(source):
return True return True
execute(self._cur, source) execute(self._cur, source, theme=theme)
return False return False
@ -113,17 +122,21 @@ def main(*args):
Each command will be run using execute() on the cursor. Each command will be run using execute() on the cursor.
Type ".help" for more information; type ".quit" or {eofkey} to quit. Type ".help" for more information; type ".quit" or {eofkey} to quit.
""").strip() """).strip()
sys.ps1 = "sqlite> "
sys.ps2 = " ... " theme = get_theme()
s = theme.syntax
sys.ps1 = f"{s.prompt}sqlite> {s.reset}"
sys.ps2 = f"{s.prompt} ... {s.reset}"
con = sqlite3.connect(args.filename, isolation_level=None) con = sqlite3.connect(args.filename, isolation_level=None)
try: try:
if args.sql: if args.sql:
# SQL statement provided on the command-line; execute it directly. # SQL statement provided on the command-line; execute it directly.
execute(con, args.sql, suppress_errors=False) execute(con, args.sql, suppress_errors=False, theme=theme)
else: else:
# No SQL provided; start the REPL. # No SQL provided; start the REPL.
console = SqliteInteractiveConsole(con) console = SqliteInteractiveConsole(con, use_color=True)
try: try:
import readline # noqa: F401 import readline # noqa: F401
except ImportError: except ImportError:

View File

@ -8,10 +8,11 @@
captured_stdout, captured_stdout,
captured_stderr, captured_stderr,
captured_stdin, captured_stdin,
force_not_colorized, force_not_colorized_test_class,
) )
@force_not_colorized_test_class
class CommandLineInterface(unittest.TestCase): class CommandLineInterface(unittest.TestCase):
def _do_test(self, *args, expect_success=True): def _do_test(self, *args, expect_success=True):
@ -37,7 +38,6 @@ def expect_failure(self, *args):
self.assertEqual(out, "") self.assertEqual(out, "")
return err return err
@force_not_colorized
def test_cli_help(self): def test_cli_help(self):
out = self.expect_success("-h") out = self.expect_success("-h")
self.assertIn("usage: ", out) self.assertIn("usage: ", out)
@ -69,6 +69,7 @@ def test_cli_on_disk_db(self):
self.assertIn("(0,)", out) self.assertIn("(0,)", out)
@force_not_colorized_test_class
class InteractiveSession(unittest.TestCase): class InteractiveSession(unittest.TestCase):
MEMORY_DB_MSG = "Connected to a transient in-memory database" MEMORY_DB_MSG = "Connected to a transient in-memory database"
PS1 = "sqlite> " PS1 = "sqlite> "
@ -190,6 +191,14 @@ def test_interact_on_disk_file(self):
out, _ = self.run_cli(TESTFN, commands=("SELECT count(t) FROM t;",)) out, _ = self.run_cli(TESTFN, commands=("SELECT count(t) FROM t;",))
self.assertIn("(0,)\n", out) self.assertIn("(0,)\n", out)
def test_color(self):
with unittest.mock.patch("_colorize.can_colorize", return_value=True):
out, err = self.run_cli(commands="TEXT\n")
self.assertIn("\x1b[1;35msqlite> \x1b[0m", out)
self.assertIn("\x1b[1;35m ... \x1b[0m\x1b", out)
out, err = self.run_cli(commands=("sel;",))
self.assertIn('\x1b[1;35mOperationalError (SQLITE_ERROR)\x1b[0m: '
'\x1b[35mnear "sel": syntax error\x1b[0m', err)
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()

View File

@ -0,0 +1 @@
Add basic color to :mod:`sqlite3` CLI interface.