246 lines
7.3 KiB
Python
246 lines
7.3 KiB
Python
#!/usr/bin/python
|
|
|
|
import sys
|
|
import os
|
|
import pprint
|
|
import struct
|
|
|
|
|
|
# Change path so we find Xlib
|
|
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
|
|
|
|
def dummy_buffer(str, x, y = getattr(sys, 'maxint', sys.maxsize)):
    """Slice-based stand-in for the ``buffer`` builtin.

    Returns ``str[x:y]``, delegating to whatever slicing ``str`` implements.

    Parameters:
        str -- the object to slice (the name shadows the builtin, but is
               kept because it is part of the existing signature)
        x   -- start offset
        y   -- stop offset; defaults to ``sys.maxint``, meaning "to the end"

    ``sys.maxint`` only exists on Python 2.  The default is looked up with a
    fallback to ``sys.maxsize`` so that defining this function no longer
    raises AttributeError on interpreters without ``sys.maxint``; on
    Python 2 the default value is unchanged.
    """
    return str[x:y]
|
|
|
|
# Install dummy_buffer under the builtin name ``buffer``.  This runs before
# the Xlib imports below, so a ``buffer(...)`` call that resolves through the
# builtins ends up in dummy_buffer.
# NOTE(review): presumably the Xlib protocol code calls buffer() on the data
# it parses -- confirm.  Also assumes ``__builtins__`` is the module object,
# as it is when this file is run as a script.
__builtins__.buffer = dummy_buffer
|
|
|
|
from Xlib.protocol import display, request, rq, event
|
|
from Xlib import error
|
|
|
|
# We don't want any fancy dictwrapper, just plain mappings
|
|
# Identity function in place of rq.DictWrapper: whatever is handed to it comes
# back unchanged, so the code below can use del/update/item access on the
# parsed results directly.
rq.DictWrapper = lambda x: x
|
|
|
|
class BugFile(object):
    """Splits a recorded session file into a client and a server byte stream.

    The file is a sequence of records.  Each record starts with a control
    line -- one source letter ('C' or 'S') followed by a decimal byte count
    and a line terminator -- and is followed by exactly that many bytes of
    payload.  Payloads are appended to the buffer of their source and handed
    out on demand by read_client() / read_server().
    """

    def __init__(self, file):
        """Wrap *file*, an object providing readline() and read(n)."""
        self.file = file
        self.cbuf = ''
        self.sbuf = ''

    def read_client(self, bytes):
        """Return up to *bytes* bytes of client data.

        Records are read until enough client data is buffered or the file
        is exhausted, so the result is shorter than requested only at end
        of input.
        """
        while True:
            if len(self.cbuf) >= bytes or not self.file:
                break
            self.read_next()

        chunk, self.cbuf = self.cbuf[:bytes], self.cbuf[bytes:]
        return chunk

    def read_server(self, bytes):
        """Return up to *bytes* bytes of server data.

        Mirror image of read_client(), working on the server buffer.
        """
        while True:
            if len(self.sbuf) >= bytes or not self.file:
                break
            self.read_next()

        chunk, self.sbuf = self.sbuf[:bytes], self.sbuf[bytes:]
        return chunk

    def read_next(self):
        """Read one record and append its payload to the matching buffer.

        At end of file, self.file is set to None, which stops the read
        loops.  Raises ValueError when the control line's byte count is not
        a number or its source letter is neither 'C' nor 'S'.
        """
        header = self.file.readline()
        if header == '':
            # End of input: drop the file so callers stop asking for more.
            self.file = None
            return

        origin = header[0]
        # Everything between the source letter and the final character
        # (the line terminator) is the payload size.
        size = int(header[1:-1])
        payload = self.file.read(size)

        # The payload is consumed before the source letter is validated.
        if origin not in ('C', 'S'):
            raise ValueError('Bad control line: %s' % header)

        if origin == 'C':
            self.cbuf += payload
        else:
            self.sbuf += payload
|
|
|
|
|
|
class ParseString(object):
    """String-like, forward-only view over a pull-style data source.

    *datafunc* is called with a byte count and returns up to that many
    bytes.  Data is fetched lazily: indexing or slicing pulls just enough to
    satisfy the request and keeps it buffered in self.data.

    An open-ended slice (``ps[i:]``) hands the data source over to a new
    ParseString that begins at offset *i*.  After that the old object can
    only serve what it already buffered; asking it for more raises
    RuntimeError.
    """

    # Stop value that also means "to the end": dummy_buffer's default, and
    # what Python 2's old __getslice__ protocol passed for ``ps[i:]``.
    # sys.maxint does not exist on Python 3, hence the fallback.
    _OPEN_END = getattr(sys, 'maxint', sys.maxsize)

    def __init__(self, datafunc):
        self.get_data = datafunc
        self.data = ''

    def _fill(self, size):
        """Buffer at least *size* bytes, as far as the source can supply them.

        Raises RuntimeError if more data is needed but the source has
        already been handed over to a successor.
        """
        missing = size - len(self.data)
        if missing <= 0:
            return

        if not self.get_data:
            raise RuntimeError('attempt to allocate more data after returning a new ParseString')

        self.data = self.data + self.get_data(missing)

    def _hand_over(self, start):
        """Return a new ParseString that continues at offset *start*.

        Raises RuntimeError if the source was already handed over, and
        ValueError for a negative *start*.
        """
        if not self.get_data:
            raise RuntimeError('attempt to allocate another ParseString')

        if start < 0:
            raise ValueError('bad slice indices: [%d:]' % start)

        # Consume bytes the caller skipped without reading (e.g. padding),
        # so that the successor really begins at offset `start`.
        gap = start - len(self.data)
        if gap > 0:
            self.data = self.data + self.get_data(gap)

        successor = ParseString(self.get_data)
        # Bytes already buffered beyond `start` belong to the successor.
        successor.data = self.data[start:]
        self.get_data = None
        return successor

    def __getitem__(self, key):
        """Return one item (integer key) or a bounded/open-ended slice.

        Bounded slices return a plain string, possibly shorter than asked
        for at end of input; open-ended slices return a new ParseString.
        Negative indices are rejected with ValueError.  Any slice step is
        ignored.
        """
        if not isinstance(key, slice):
            if key < 0:
                raise ValueError('bad string index: {0}'.format(key))

            self._fill(key + 1)
            return self.data[key]

        # replacement of __getslice__
        # Unlike __getslice__, __getitem__ receives None for an omitted
        # bound, so normalise before comparing.
        start = key.start
        if start is None:
            start = 0
        stop = key.stop

        if stop is None or stop == self._OPEN_END:
            return self._hand_over(start)

        if start < 0 or stop < 0 or start > stop:
            raise ValueError('bad slice indices: [%d:%d]' % (start, stop))

        self._fill(stop)
        return self.data[start:stop]
|
|
|
|
class DummyDisplay(object):
    """Placeholder display object passed to the parse_binary() calls.

    It offers a single method and never supplies a resource class.
    """

    def get_resource_class(self, name):
        """Return None for every *name*."""
        return None
|
|
|
|
class ParseXbug(object):
    """Decode a recorded X session and pretty-print every item in it.

    All work is done by the constructor: it reads the recording from
    *infile*, writes one entry per request, reply, error and event to
    *outfile*, and returns when the server data is exhausted.

    Output is driven by the server stream.  Before a server item is
    printed, client requests are printed up to the sequence number that
    item carries, so each reply or error appears right after the request
    it belongs to.
    """

    # Serial larger than any real request count: asks get_requests() to
    # print everything the client still has.  (sys.maxint is Python 2 only.)
    _DRAIN_ALL = getattr(sys, 'maxint', sys.maxsize)

    def __init__(self, infile = sys.stdin, outfile = sys.stdout):
        """Parse the whole recording in *infile* and print it to *outfile*.

        Raises ValueError if the connection setup reply has an unknown
        status, or if a server item refers to a request the client data
        does not contain.
        """
        bf = BugFile(infile)
        self.cdata = ParseString(bf.read_client)
        sdata = ParseString(bf.read_server)

        self.outfile = outfile
        self.xpprint = pprint.PrettyPrinter(indent = 2, stream = outfile)

        self.disp = DummyDisplay()

        sdata = self._parse_setup(sdata)
        if sdata is None:
            # The server refused the connection; nothing follows.
            return

        self.last_serial = 0
        self.last_request = None
        # Full-width value of the last sequence number seen from the server.
        self._server_serial = 0

        self._parse_server_items(sdata)

    def _parse_setup(self, sdata):
        """Print the connection setup exchange.

        Returns the server data following the setup reply, or None when the
        server refused the connection.
        """
        # Parse client setup request
        r, self.cdata = display.ConnectionSetupRequest._request.parse_binary(self.cdata, self.disp)

        self.print_xbug('request', 'ConnectionSetup', r)

        # Parse server reply
        r, sdata = display.ConnectionSetupRequest._reply.parse_binary(sdata, self.disp)

        # additional_length counts 4-byte units following the fixed part.
        extra = r['additional_length'] * 4
        del r['additional_length']

        extradata = sdata[:extra]
        sdata = sdata[extra:]

        if r['status'] == 0:
            r['reason'] = extradata[:r['reason_length']]
            del r['status']
            del r['reason_length']
            self.print_xbug('error', 'ConnectionSetup', r)
            return None

        elif r['status'] == 1:
            details, _rest = display.ConnectionSetupRequest._success_reply.parse_binary(extradata, self.disp)
            del r['status']
            del r['reason_length']
            r.update(details)
            self.print_xbug('reply', 'ConnectionSetup', r)

        else:
            raise ValueError('bad connection setup reply status: %d' % r['status'])

        return sdata

    def _parse_server_items(self, sdata):
        """Print every error, reply and event in *sdata*, with their requests."""
        while 1:
            # Get next server item, always at least 32 bytes
            d = sdata[:32]
            if len(d) != 32:
                # Print out remaining requests
                try:
                    self.get_requests(self._DRAIN_ALL)
                except ValueError:
                    # Deliberate: get_requests() signals "no more client
                    # data" this way, which is the expected way to finish.
                    pass
                return

            sdata = sdata[32:]

            # Check type
            t = ord(d[0])

            # Error
            if t == 0:
                # Code is second byte
                code = ord(d[1])
                # Fetch error class
                estruct = error.xerror_class.get(code, error.XError)
                r, d = estruct._fields.parse_binary(d, self.disp)
                del r['type']

                self.get_requests(self._widen_serial(r['sequence_number']))
                self.print_xbug('error', estruct.__name__, r)

            # Reply
            elif t == 1:
                # Get sequence number, and read corresponding request
                sno = struct.unpack('=H', d[2:4])[0]
                self.get_requests(self._widen_serial(sno))

                # Get entire reply length
                rlen = int(struct.unpack('=L', d[4:8])[0]) * 4
                d = d + sdata[:rlen]
                sdata = sdata[rlen:]

                if self.last_request:
                    r, d = self.last_request._reply.parse_binary(d, self.disp)
                    self.print_xbug('reply', self.last_request.__name__, r)
                else:
                    self.print_xbug('reply', 'Unknown',
                                    { 'sequence_number': sno })

            # Some event
            else:
                estruct = event.event_class.get(t, event.AnyEvent)
                r, d = estruct._fields.parse_binary(d, self.disp)

                # The core KeymapNotify event has no sequence number on the
                # wire.  NOTE(review): assumes such a struct simply lacks
                # the 'sequence_number' key -- confirm against
                # Xlib.protocol.event.
                try:
                    wire_serial = r['sequence_number']
                except KeyError:
                    wire_serial = None

                if wire_serial is not None:
                    self.get_requests(self._widen_serial(wire_serial))
                self.print_xbug('event', estruct.__name__, r)

    def _widen_serial(self, wire_serial):
        """Extend a 16-bit wire sequence number to the full request count.

        Sequence numbers are 16 bits wide in server items, while
        self.last_serial counts requests without bound.  Server items
        arrive in non-decreasing sequence order, so the full value is the
        previous one advanced by the 16-bit distance to *wire_serial*.
        Without this, nothing past request 65535 would be matched to its
        reply.
        """
        base = self._server_serial
        self._server_serial = base + ((wire_serial - base) & 0xffff)
        return self._server_serial

    def get_requests(self, serial):
        """Print client requests until request number *serial* was printed.

        Updates self.last_serial and self.last_request (the request class
        of the last request read, or None for an unknown opcode).  Raises
        ValueError('client request missing') when the client data ends
        before *serial* is reached.
        """
        # Get request length
        while self.last_serial < serial:
            # Bytes 2-3 of a request hold its length in 4-byte units.
            d = self.cdata[2:4]
            if len(d) != 2:
                raise ValueError('client request missing')

            rlen = struct.unpack('=H', d)[0] * 4
            d = self.cdata[:rlen]
            if len(d) != rlen:
                raise ValueError('client request missing')

            self.cdata = self.cdata[rlen:]

            opcode = ord(d[0])
            self.last_request = request.major_codes.get(opcode)
            self.last_serial = self.last_serial + 1

            if self.last_request:
                r, d = self.last_request._request.parse_binary(d, self.disp)
                r['sequence_number'] = self.last_serial
                self.print_xbug('request', self.last_request.__name__, r)

            else:
                self.print_xbug('request', 'Unknown (%d)' % opcode,
                                { 'sequence_number': self.last_serial })

    def print_xbug(self, rtype, name, data):
        """Write one entry: a '<rtype>: <name>' header, *data*, a blank line."""
        self.outfile.write('%-8s %s\n' % (rtype + ':', name))
        self.xpprint.pprint(data)
        self.outfile.write('\n')
|
|
|
|
if __name__ == '__main__':
    # Constructing ParseXbug does all the work: with its default arguments it
    # reads the recording from stdin and prints the decoded items to stdout.
    ParseXbug()
|