tools/gdb: use iterator for list

Signed-off-by: xuxingliang <xuxingliang@xiaomi.com>
This commit is contained in:
xuxingliang 2024-09-16 20:42:45 +08:00 committed by Xiang Xiao
parent e7c2e7c576
commit 5a82e21edb
4 changed files with 127 additions and 82 deletions

View file

@ -20,6 +20,8 @@
#
############################################################################
import argparse
import gdb
import utils
@ -28,31 +30,95 @@ sq_queue_type = utils.lookup_type("sq_queue_t")
dq_queue_type = utils.lookup_type("dq_queue_t")
def list_for_each(head):
"""Iterate over a list"""
if head.type == list_node_type.pointer():
head = head.dereference()
elif head.type != list_node_type:
raise TypeError("Must be struct list_node not {}".format(head.type))
class NxList:
def __init__(self, list, container_type=None, member=None, reverse=False):
"""Initialize the list iterator. Optionally specify the container type and member name."""
if head["next"] == 0:
gdb.write(
"list_for_each: Uninitialized list '{}' treated as empty\n".format(
head.address
)
if not list:
raise ValueError("The head cannot be None.\n")
if list.type.code != gdb.TYPE_CODE_PTR:
list = list.address # Make sure list is a pointer.
if container_type and not member:
raise ValueError("Must specify the member name in container.\n")
self.list = list
self.reverse = reverse
self.container_type = container_type
self.member = member
self.current = self._get_first()
def _get_first(self):
"""Get the initial node based on the direction of traversal."""
prev = self.list["prev"]
next = self.list["next"]
first = prev if self.reverse else next
return first if first and first != self.list else None
def _get_next(self, node):
# for(node = (list)->next; node != (list); node = node->next)
return node["next"] if node["next"] != self.list else None
def _get_prev(self, node):
# for(node = (list)->next; node != (list); node = node->prev)
return node["prev"] if node["prev"] != self.list else None
def __iter__(self):
return self
def __next__(self):
if self.current is None:
raise StopIteration
node = self.current
self.current = self._get_prev(node) if self.reverse else self._get_next(node)
return (
utils.container_of(node, self.container_type, self.member)
if self.container_type
else node
)
return
node = head["next"].dereference()
while node.address != head.address:
yield node.address
node = node["next"].dereference()
def list_for_each_entry(head, gdbtype, member):
"""Iterate over a list of structs"""
for node in list_for_each(head):
yield utils.container_of(node, gdbtype, member)
class NxSQueue(NxList):
def __init__(self, list, container_type=None, member=None, reverse=False):
"""Initialize the singly linked list iterator. Optionally specify the container type and member name."""
if reverse:
raise ValueError(
"Reverse iteration is not supported for singly linked lists.\n"
)
super().__init__(list, container_type, member, reverse)
def _get_first(self):
# for ((p) = (q)->head; (p) != NULL; (p) = (p)->flink)
return self.list["head"] or None
def _get_next(self, node):
# if not node["flink"], then return None, to indicate end of list
return node["flink"] or None
class NxDQueue(NxList):
def __init__(self, list, container_type=None, member=None, reverse=False):
"""Initialize the doubly linked list iterator. Optionally specify the container type and member name."""
super().__init__(list, container_type, member, reverse)
def _get_first(self):
head = self.list["head"]
tail = self.list["tail"]
first = head if not self.reverse else tail
return first or None
def _get_next(self, node):
# for ((p) = (q)->head; (p) != NULL; (p) = (p)->flink)
return node["flink"] or None
def _get_prev(self, node):
# for ((p) = (q)->tail; (p) != NULL; (p) = (p)->blink)
return node["blink"] or None
def list_check(head):
@ -121,25 +187,6 @@ def list_check(head):
return
def sq_for_every(sq, entry=None):
"""Iterate over a singly linked list from the head or specified entry"""
if sq.type == sq_queue_type.pointer():
sq = sq.dereference()
elif sq.type != sq_queue_type:
gdb.write("Must be struct sq_queue not {}".format(sq.type))
return
if sq["head"] == 0:
return
if not entry:
entry = sq["head"].dereference()
while entry.address:
yield entry.address
entry = entry["flink"].dereference()
def sq_is_empty(sq):
"""Check if a singly linked list is empty"""
if sq.type == sq_queue_type.pointer():
@ -252,21 +299,20 @@ class ForeachListEntry(gdb.Command):
def invoke(self, arg, from_tty):
argv = gdb.string_to_argv(arg)
if len(argv) != 3:
gdb.write(
"list_for_every_entry takes three arguments" "head, type, member\n"
)
gdb.write("eg: list_for_every_entry &g_list 'struct type' 'node '\n")
parser = argparse.ArgumentParser(description="Iterate the items in list")
parser.add_argument("head", type=str, help="List head")
parser.add_argument("type", type=str, help="Container type")
parser.add_argument("member", type=str, help="Member name in container")
try:
args = parser.parse_args(argv)
except SystemExit:
gdb.write("Invalid arguments\n")
return
i = 0
for entry in list_for_each_entry(
gdb.parse_and_eval(argv[0]), gdb.lookup_type(argv[1]).pointer(), argv[2]
):
gdb.write(f"{i}: ({argv[1]} *){entry}\n")
gdb.execute(f"print *({argv[1]} *){entry}")
i += 1
ListCheck()
ForeachListEntry()
pointer = gdb.parse_and_eval(args.head)
container_type = gdb.lookup_type(args.type)
member = args.member
list = NxList(pointer, container_type, member)
for i, entry in enumerate(list):
entry = entry.dereference()
gdb.write(f"{i}: {entry.format_string(styling=True)}\n")

View file

@ -26,7 +26,7 @@ import time
import gdb
import utils
from lists import sq_for_every
from lists import NxSQueue
from utils import get_long_type, get_symbol_value, lookup_type, read_ulong
MM_ALLOC_BIT = 0x1
@ -227,7 +227,7 @@ def mempool_foreach(pool):
yield buf
nblk -= 1
for entry in sq_for_every(pool["equeue"]):
for entry in NxSQueue(pool["equeue"]):
nblk = (pool["expandsize"] - sq_entry_type.sizeof) / blocksize
base = int(entry) - nblk * blocksize
while nblk > 0:
@ -354,12 +354,12 @@ class Memdump(gdb.Command):
"""Dump the mempool memory"""
for pool in mempool_multiple_foreach(mpool):
if pid == PID_MM_FREE:
for entry in sq_for_every(pool["queue"]):
for entry in NxSQueue(pool["queue"]):
gdb.write("%12u%#*x\n" % (pool["blocksize"], self.align, entry))
self.aordblks += 1
self.uordblks += mempool_realblocksize(pool)
for entry in sq_for_every(pool["iqueue"]):
for entry in NxSQueue(pool["iqueue"]):
gdb.write("%12u%#*x\n" % (pool["blocksize"], self.align, entry))
self.aordblks += 1
self.uordblks += mempool_realblocksize(pool)

View file

@ -20,7 +20,7 @@
import gdb
import utils
from lists import dq_for_every, sq_for_every
from lists import NxDQueue, NxSQueue
socket = utils.import_check(
"socket", errmsg="No socket module found, please try gdb-multiarch instead.\n"
@ -66,29 +66,25 @@ def socket_for_each_entry(proto):
readahead = conn["readahead"]
"""
sock_gdbtype = gdb.lookup_type("struct socket_conn_s").pointer()
conn_gdbtype = gdb.lookup_type("struct %s_conn_s" % proto).pointer()
for node in dq_for_every(gdb.parse_and_eval("g_active_%s_connections" % proto)):
g_active_connections = gdb.parse_and_eval("g_active_%s_connections" % proto)
for node in NxDQueue(g_active_connections, "struct socket_conn_s", "node"):
# udp_conn_s::socket_conn_s sconn
yield utils.container_of(
utils.container_of(
node, sock_gdbtype, "node"
), # struct socket_conn_s::dq_entry_t node
conn_gdbtype,
node,
"struct %s_conn_s" % proto,
"sconn",
) # udp_conn_s::socket_conn_s sconn
)
def wrbuffer_inqueue_size(queue=None, protocol="tcp"):
"""Calculate the total size of all iob in the write queue of a udp connection"""
total = 0
if queue:
wrb_gdbtype = gdb.lookup_type("struct %s_wrbuffer_s" % protocol).pointer()
for entry in sq_for_every(queue):
entry = utils.container_of(entry, wrb_gdbtype, "wb_node")
total += entry["wb_iob"]["io_pktlen"]
return total
if not queue:
return 0
type = "struct %s_wrbuffer_s" % protocol
node = "wb_node"
return sum(entry["wb_iob"]["io_pktlen"] for entry in NxSQueue(queue, type, node))
def tcp_ofoseg_bufsize(conn):

View file

@ -81,15 +81,18 @@ def get_long_type():
return long_type
def offset_of(typeobj, field):
def offset_of(typeobj: gdb.Type, field: str) -> Union[int, None]:
"""Return the offset of a field in a structure"""
element = gdb.Value(0).cast(typeobj)
return int(str(element[field].address).split()[0], 16)
for f in typeobj.fields():
if f.name == field:
return f.bitpos // 8 if f.bitpos is not None else None
return None
def container_of(ptr, typeobj, member):
def container_of(ptr: gdb.Value, typeobj: gdb.Type, member: str) -> gdb.Value:
"""Return pointer to containing data structure"""
return (ptr.cast(get_long_type()) - offset_of(typeobj, member)).cast(typeobj)
return gdb.Value(ptr.address - offset_of(typeobj, member)).cast(typeobj.pointer())
class ContainerOf(gdb.Function):