gh-133490: Fix syntax highlighting for remote PDB (#133494)

This commit is contained in:
Matt Wozniski 2025-05-06 05:44:49 -04:00 committed by GitHub
parent 120c9d42f2
commit fd37f1a8ad
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 158 additions and 7 deletions

View File

@ -23,9 +23,9 @@
BUILTINS = {str(name) for name in dir(builtins) if not name.startswith('_')}
def THEME():
def THEME(**kwargs):
# Not cached: the user can modify the theme inside the interactive session.
return _colorize.get_theme().syntax
return _colorize.get_theme(**kwargs).syntax
class Span(NamedTuple):
@ -254,7 +254,10 @@ def is_soft_keyword_used(*tokens: TI | None) -> bool:
def disp_str(
buffer: str, colors: list[ColorSpan] | None = None, start_index: int = 0
buffer: str,
colors: list[ColorSpan] | None = None,
start_index: int = 0,
force_color: bool = False,
) -> tuple[CharBuffer, CharWidths]:
r"""Decompose the input buffer into a printable variant with applied colors.
@ -295,7 +298,7 @@ def disp_str(
# move past irrelevant spans
colors.pop(0)
theme = THEME()
theme = THEME(force_color=force_color)
pre_color = ""
post_color = ""
if colors and colors[0].span.start < start_index:

View File

@ -1069,7 +1069,7 @@ def handle_command_def(self, line):
def _colorize_code(self, code):
if self.colorize:
colors = list(_pyrepl.utils.gen_colors(code))
chars, _ = _pyrepl.utils.disp_str(code, colors=colors)
chars, _ = _pyrepl.utils.disp_str(code, colors=colors, force_color=True)
code = "".join(chars)
return code

View File

@ -3,6 +3,7 @@
import itertools
import json
import os
import re
import signal
import socket
import subprocess
@ -12,9 +13,9 @@
import threading
import unittest
import unittest.mock
from contextlib import closing, contextmanager, redirect_stdout, ExitStack
from contextlib import closing, contextmanager, redirect_stdout, redirect_stderr, ExitStack
from pathlib import Path
from test.support import is_wasi, os_helper, requires_subprocess, SHORT_TIMEOUT
from test.support import is_wasi, cpython_only, force_color, requires_subprocess, SHORT_TIMEOUT
from test.support.os_helper import temp_dir, TESTFN, unlink
from typing import Dict, List, Optional, Tuple, Union, Any
@ -1431,5 +1432,152 @@ def test_multi_line_commands(self):
self.assertIn("Function returned: 42", stdout)
self.assertEqual(process.returncode, 0)
def _supports_remote_attaching():
from contextlib import suppress
PROCESS_VM_READV_SUPPORTED = False
try:
from _remote_debugging import PROCESS_VM_READV_SUPPORTED
except ImportError:
pass
return PROCESS_VM_READV_SUPPORTED
@unittest.skipIf(not sys.is_remote_debug_enabled(), "Remote debugging is not enabled")
@unittest.skipIf(sys.platform != "darwin" and sys.platform != "linux" and sys.platform != "win32",
"Test only runs on Linux, Windows and MacOS")
@unittest.skipIf(sys.platform == "linux" and not _supports_remote_attaching(),
"Testing on Linux requires process_vm_readv support")
@cpython_only
@requires_subprocess()
class PdbAttachTestCase(unittest.TestCase):
def setUp(self):
# Create a server socket that will wait for the debugger to connect
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.bind(('127.0.0.1', 0)) # Let OS assign port
self.sock.listen(1)
self.port = self.sock.getsockname()[1]
self._create_script()
def _create_script(self, script=None):
# Create a file for subprocess script
script = textwrap.dedent(
f"""
import socket
import time
def foo():
return bar()
def bar():
return baz()
def baz():
x = 1
# Trigger attach
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('127.0.0.1', {self.port}))
sock.close()
count = 0
while x == 1 and count < 100:
count += 1
time.sleep(0.1)
return x
result = foo()
print(f"Function returned: {{result}}")
"""
)
self.script_path = TESTFN + "_connect_test.py"
with open(self.script_path, 'w') as f:
f.write(script)
def tearDown(self):
self.sock.close()
try:
unlink(self.script_path)
except OSError:
pass
def do_integration_test(self, client_stdin):
process = subprocess.Popen(
[sys.executable, self.script_path],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
self.addCleanup(process.stdout.close)
self.addCleanup(process.stderr.close)
# Wait for the process to reach our attachment point
self.sock.settimeout(10)
conn, _ = self.sock.accept()
conn.close()
client_stdin = io.StringIO(client_stdin)
client_stdout = io.StringIO()
client_stderr = io.StringIO()
self.addCleanup(client_stdin.close)
self.addCleanup(client_stdout.close)
self.addCleanup(client_stderr.close)
self.addCleanup(process.wait)
with (
unittest.mock.patch("sys.stdin", client_stdin),
redirect_stdout(client_stdout),
redirect_stderr(client_stderr),
unittest.mock.patch("sys.argv", ["pdb", "-p", str(process.pid)]),
):
try:
pdb.main()
except PermissionError:
self.skipTest("Insufficient permissions for remote execution")
process.wait()
server_stdout = process.stdout.read()
server_stderr = process.stderr.read()
if process.returncode != 0:
print("server failed")
print(f"server stdout:\n{server_stdout}")
print(f"server stderr:\n{server_stderr}")
self.assertEqual(process.returncode, 0)
return {
"client": {
"stdout": client_stdout.getvalue(),
"stderr": client_stderr.getvalue(),
},
"server": {
"stdout": server_stdout,
"stderr": server_stderr,
},
}
def test_attach_to_process_without_colors(self):
with force_color(False):
output = self.do_integration_test("ll\nx=42\n")
self.assertEqual(output["client"]["stderr"], "")
self.assertEqual(output["server"]["stderr"], "")
self.assertEqual(output["server"]["stdout"], "Function returned: 42\n")
self.assertIn("while x == 1", output["client"]["stdout"])
self.assertNotIn("\x1b", output["client"]["stdout"])
def test_attach_to_process_with_colors(self):
with force_color(True):
output = self.do_integration_test("ll\nx=42\n")
self.assertEqual(output["client"]["stderr"], "")
self.assertEqual(output["server"]["stderr"], "")
self.assertEqual(output["server"]["stdout"], "Function returned: 42\n")
self.assertIn("\x1b", output["client"]["stdout"])
self.assertNotIn("while x == 1", output["client"]["stdout"])
self.assertIn("while x == 1", re.sub("\x1b[^m]*m", "", output["client"]["stdout"]))
if __name__ == "__main__":
unittest.main()