[3.13] Add tests for Bluetooth RFCOMM, HCI and SCO (GH-132023) (GH-132071)

(cherry picked from commit 2ccd6aae4d)
This commit is contained in:
Serhiy Storchaka 2025-04-05 15:01:35 +03:00 committed by GitHub
parent 9de2823b5a
commit a5ed92025b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 124 additions and 0 deletions

View File

@ -1,4 +1,5 @@
import unittest
from unittest import mock
from test import support
from test.support import (
is_apple, os_helper, refleak_helper, socket_helper, threading_helper
@ -179,6 +180,17 @@ def _have_socket_bluetooth():
return True
def _have_socket_bluetooth_l2cap():
"""Check whether BTPROTO_L2CAP sockets are supported on this host."""
try:
s = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_L2CAP)
except (AttributeError, OSError):
return False
else:
s.close()
return True
def _have_socket_hyperv():
"""Check whether AF_HYPERV sockets are supported on this host."""
try:
@ -239,6 +251,8 @@ def downgrade_malformed_data_warning():
HAVE_SOCKET_BLUETOOTH = _have_socket_bluetooth()
HAVE_SOCKET_BLUETOOTH_L2CAP = _have_socket_bluetooth_l2cap()
HAVE_SOCKET_HYPERV = _have_socket_hyperv()
# Size in bytes of the int type
@ -2622,6 +2636,116 @@ def testCreateScoSocket(self):
with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_SCO) as s:
pass
@unittest.skipUnless(HAVE_SOCKET_BLUETOOTH_L2CAP, 'Bluetooth L2CAP sockets required for this test')
def testBindBrEdrL2capSocket(self):
with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_L2CAP) as f:
# First user PSM in BR/EDR L2CAP
psm = 0x1001
f.bind((socket.BDADDR_ANY, psm))
addr = f.getsockname()
self.assertEqual(addr, (socket.BDADDR_ANY, psm))
@unittest.skipUnless(HAVE_SOCKET_BLUETOOTH_L2CAP, 'Bluetooth L2CAP sockets required for this test')
def testBadL2capAddr(self):
with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_L2CAP) as f:
with self.assertRaises(OSError):
f.bind((socket.BDADDR_ANY, 0, 0))
with self.assertRaises(OSError):
f.bind((socket.BDADDR_ANY,))
with self.assertRaises(OSError):
f.bind(socket.BDADDR_ANY)
with self.assertRaises(OSError):
f.bind((socket.BDADDR_ANY.encode(), 0x1001))
def testBindRfcommSocket(self):
with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM) as s:
channel = 0
try:
s.bind((socket.BDADDR_ANY, channel))
except OSError as err:
if sys.platform == 'win32' and err.winerror == 10050:
self.skipTest(str(err))
raise
addr = s.getsockname()
self.assertEqual(addr, (mock.ANY, channel))
self.assertRegex(addr[0], r'(?i)[0-9a-f]{2}(?::[0-9a-f]{2}){4}')
if sys.platform != 'win32':
self.assertEqual(addr, (socket.BDADDR_ANY, channel))
with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM) as s:
s.bind(addr)
addr2 = s.getsockname()
self.assertEqual(addr2, addr)
def testBadRfcommAddr(self):
with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM) as s:
channel = 0
with self.assertRaises(OSError):
s.bind((socket.BDADDR_ANY.encode(), channel))
with self.assertRaises(OSError):
s.bind((socket.BDADDR_ANY,))
with self.assertRaises(OSError):
s.bind((socket.BDADDR_ANY, channel, 0))
with self.assertRaises(OSError):
s.bind((socket.BDADDR_ANY + '\0', channel))
with self.assertRaises(OSError):
s.bind(('invalid', channel))
@unittest.skipUnless(hasattr(socket, 'BTPROTO_HCI'), 'Bluetooth HCI sockets required for this test')
def testBindHciSocket(self):
with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_RAW, socket.BTPROTO_HCI) as s:
if sys.platform.startswith(('netbsd', 'dragonfly', 'freebsd')):
s.bind(socket.BDADDR_ANY.encode())
addr = s.getsockname()
self.assertEqual(addr, socket.BDADDR_ANY)
else:
dev = 0
s.bind((dev,))
addr = s.getsockname()
self.assertEqual(addr, dev)
@unittest.skipUnless(hasattr(socket, 'BTPROTO_HCI'), 'Bluetooth HCI sockets required for this test')
def testBadHciAddr(self):
with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_RAW, socket.BTPROTO_HCI) as s:
if sys.platform.startswith(('netbsd', 'dragonfly', 'freebsd')):
with self.assertRaises(OSError):
s.bind(socket.BDADDR_ANY)
with self.assertRaises(OSError):
s.bind((socket.BDADDR_ANY.encode(),))
if sys.platform.startswith('freebsd'):
with self.assertRaises(ValueError):
s.bind(socket.BDADDR_ANY.encode() + b'\0')
with self.assertRaises(ValueError):
s.bind(socket.BDADDR_ANY.encode() + b' '*100)
with self.assertRaises(OSError):
s.bind(b'invalid')
else:
dev = 0
with self.assertRaises(OSError):
s.bind(())
with self.assertRaises(OSError):
s.bind((dev, 0))
with self.assertRaises(OSError):
s.bind(dev)
with self.assertRaises(OSError):
s.bind(socket.BDADDR_ANY.encode())
@unittest.skipUnless(hasattr(socket, 'BTPROTO_SCO'), 'Bluetooth SCO sockets required for this test')
def testBindScoSocket(self):
with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_SCO) as s:
s.bind(socket.BDADDR_ANY.encode())
addr = s.getsockname()
self.assertEqual(addr, socket.BDADDR_ANY)
@unittest.skipUnless(hasattr(socket, 'BTPROTO_SCO'), 'Bluetooth SCO sockets required for this test')
def testBadScoAddr(self):
with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_SCO) as s:
with self.assertRaises(OSError):
s.bind(socket.BDADDR_ANY)
with self.assertRaises(OSError):
s.bind((socket.BDADDR_ANY.encode(),))
with self.assertRaises(OSError):
s.bind(b'invalid')
@unittest.skipUnless(HAVE_SOCKET_HYPERV,
'Hyper-V sockets required for this test.')