diff --git a/tools/pynuttx/nxgdb/circbuf.py b/tools/pynuttx/nxgdb/circbuf.py new file mode 100644 index 0000000000..3f0750ba4a --- /dev/null +++ b/tools/pynuttx/nxgdb/circbuf.py @@ -0,0 +1,195 @@ +############################################################################ +# tools/pynuttx/nxgdb/circbuf.py +# +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. The +# ASF licenses this file to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance with the +# License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +############################################################################ + +import argparse +from typing import Generator + +import gdb + +from . import utils +from .protocols import circbuf as p + + +class CircBuf(utils.Value, p.CircBuf): + def __init__( + self, obj: gdb.Value | utils.Value, datatype: gdb.Type | None = None + ) -> None: + circbuf_s = utils.lookup_type("struct circbuf_s") + if obj.type.code == gdb.TYPE_CODE_INT: + obj = obj.cast(circbuf_s.pointer()) + + if obj.type.code == gdb.TYPE_CODE_PTR: + obj.cast(circbuf_s.pointer()) + obj = obj.dereference() + + super().__init__(obj) + + # datatype must not be a pointer, because we are going to construct value from memory + if not datatype: + datatype = utils.lookup_type("char") + + if isinstance(datatype, str): + datatype = utils.lookup_type(datatype) + + if datatype.code == gdb.TYPE_CODE_PTR: + datatype = datatype.target() + + self.datatype = datatype + + def __str__(self) -> str: + return ( + f"(struct circbuf_s *){hex(self.address)} base: {self.base} " + f"size: {self.size} head: {self.head} tail: {self.tail}" + ) + + @property + def size(self) -> int: + return int(self["size"]) + + @property + def used(self) -> int: + return int(self["head"]) - int(self["tail"]) + + @property + def space(self) -> int: + return self.size - self.used + + @property + def is_inited(self) -> bool: + return bool(self["base"]) + + @property + def is_empty(self) -> bool: + return self.used == 0 + + @property + def is_full(self) -> bool: + return not self.space + + def _peekat(self, pos, len) -> memoryview: + if len > self.size: + return None + + pos = pos % self.size + total = len + if pos + len > self.size: + len = self.size - pos + + memory = gdb.selected_inferior().read_memory(self.base + pos, len) + if len < total: + memory += gdb.selected_inferior().read_memory(self.base, total - len) + return memory + + @property + def history(self) -> Generator[utils.Value, None, None]: + """Iterate over the history data in the circbuf_s, from oldest to newest""" + if not self.base or not self.size: + # Uninitialized buffer + return [] + + head = int(self.head) + size = int(self.size) + sizeof = self.datatype.sizeof + + if head < size: + # The buffer is never wrapped, read from the beginning + offset = 0 + end = head + else: + # The buffer is wrapped, read from the head + offset = head % size + end = offset + size + + while offset < end: + memory = self._peekat(offset, sizeof) + value = gdb.Value(memory, self.datatype) + yield value + offset += sizeof + + @property + def unread(self) -> Generator[utils.Value, None, None]: + """Return all unread data in circle buffer""" + if not self.base or not self.size: + return [] + + # Read from tail towards head for all data. + tail = int(self.tail) + head = int(self.head) + sizeof = self.datatype.sizeof + offset = tail + while offset < head: + memory = self._peekat(offset, sizeof) + yield gdb.Value(memory, self.datatype) + offset += sizeof + + +class CircBufInfo(gdb.Command): + """Print circbuf_s information""" + + def __init__(self): + super().__init__("circbuf", gdb.COMMAND_USER) + + def invoke(self, arg: str, from_tty: bool) -> None: + parser = argparse.ArgumentParser(description="Dump circle buffer information") + parser.add_argument( + "--type", + type=str, + help="The data type the circbuf_s contains", + default=None, + ) + parser.add_argument( + "--history", + action="store_true", + help="Dump the history data in the circbuf_s", + ) + parser.add_argument( + "--unread", + action="store_true", + help="Dump the unread data in the circbuf_s", + ) + parser.add_argument( + "address", + type=str, + help="The address of the circubuf_s", + ) + + try: + args = parser.parse_args(gdb.string_to_argv(arg)) + except SystemExit: + gdb.write("Invalid arguments\n") + return + + entry = utils.parse_and_eval(args.address) + circbuf = CircBuf(entry, datatype=args.type) + + print(circbuf) # Dump buffer basic information + + if args.history: + dumpdata = circbuf.history + elif args.unread: + dumpdata = circbuf.unread + else: + dumpdata = [] + + print(f"Dumping data with type {args.type}") + for i, data in enumerate(dumpdata): + print(f"{i}: {data.format_string(styling=True)}") diff --git a/tools/pynuttx/nxgdb/protocols/circbuf.py b/tools/pynuttx/nxgdb/protocols/circbuf.py new file mode 100644 index 0000000000..85b3b68819 --- /dev/null +++ b/tools/pynuttx/nxgdb/protocols/circbuf.py @@ -0,0 +1,35 @@ +############################################################################ +# tools/pynuttx/nxgdb/protocols/circbuf.py +# +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. The +# ASF licenses this file to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance with the +# License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +############################################################################ + +from __future__ import annotations + +from .value import Value + + +class CircBuf(Value): + """struct circbuf_s""" + + base: Value + size: Value + head: Value + tail: Value + external: Value diff --git a/tools/pynuttx/nxgdb/protocols/uorb.py b/tools/pynuttx/nxgdb/protocols/uorb.py new file mode 100644 index 0000000000..49740a6e7e --- /dev/null +++ b/tools/pynuttx/nxgdb/protocols/uorb.py @@ -0,0 +1,103 @@ +############################################################################ +# tools/pynuttx/nxgdb/protocols/uorb.py +# +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. The +# ASF licenses this file to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance with the +# License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +############################################################################ + +from __future__ import annotations + +from .value import Value + + +class OrbMetadata(Value): + """struct orb_metadata_s""" + + o_name: Value + o_size: Value + o_format: Value + + +class SensorMeta(Value): + """struct sensor_meta_s""" + + esize: Value + name: Value + + +class SensorState(Value): + """struct sensor_state_s""" + + esize: Value + nbuffer: Value + min_latency: Value + min_interval: Value + nsubscribers: Value + nadvertisers: Value + generation: Value + priv: Value + + +class SensorUState(Value): + """struct sensor_ustate_s""" + + esize: Value + latency: Value + interval: Value + generation: Value + + +class SensorUpper(Value): + """struct sensor_upperhalf_s""" + + lower: Value + state: SensorState + timing: Value + buffer: Value + lock: Value + userlist: Value + + +class SensorLower(Value): + """struct sensor_lowerhalf_s""" + + type: Value + nbuffer: Value + uncalibrated: Value + ops: Value + push_event: Value + notify_event: Value + + sensor_lock: Value + sensor_unlock: Value + priv: Value + persist: Value + + +class SensorUser(Value): + """struct sensor_user_s""" + + node: Value + fds: Value + role: Value + changed: Value + event: Value + flushing: Value + buffersem: Value + bufferpos: Value + state: SensorUState diff --git a/tools/pynuttx/nxgdb/uorb.py b/tools/pynuttx/nxgdb/uorb.py new file mode 100644 index 0000000000..3e0c611edb --- /dev/null +++ b/tools/pynuttx/nxgdb/uorb.py @@ -0,0 +1,158 @@ +############################################################################ +# tools/pynuttx/nxgdb/uorb.py +# +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. The +# ASF licenses this file to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance with the +# License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +############################################################################ + +from __future__ import annotations + +import argparse +import re +from typing import List + +import gdb + +from . import fs, utils +from .circbuf import CircBuf +from .protocols import uorb as p + + +class Sensor(utils.Value, p.SensorUpper): + """struct sensor_upperhalf_s and enhancement""" + + inode_s = utils.lookup_type("struct inode") + sensor_upperhalf_s = utils.lookup_type("struct sensor_upperhalf_s") + + def __init__(self, inode: gdb.Value | utils.Value, path=None): + # inode must be type of struct inode * + if inode.type.code != gdb.TYPE_CODE_PTR: + raise ValueError(f"Expect pointer type, got {inode.type}") + + super().__init__(inode["i_private"].cast(self.sensor_upperhalf_s.pointer())) + self.inode = inode + self._path = path + + def __repr__(self) -> str: + state = self.state + return f"{hex(self)} {self.topicname} {state.nsubscribers} subscribers, {state.nadvertisers} advertisers" + + def details(self) -> str: + state = self.state + return f"nbuffer: {state.nbuffer}, latency: {state.min_latency}, interval: {state.min_interval}" + + def __str__(self) -> str: + return self.__repr__() + + @property + def path(self): + return self._path or fs.inode_getpath(self.inode) + + @property + def nsubscribers(self) -> int: + return int(self.state["nsubscribers"]) + + @property + def nadvertisers(self) -> int: + return int(self.state["nadvertisers"]) + + @property + def topicname(self): + name = self.path.split("/")[-1] + name = re.sub(r"(\d$)", "", name) + name = re.sub(r"(_uncal$)", "", name) + return name + + @property + def metadata(self) -> p.OrbMetadata: + return utils.gdb_eval_or_none(f"g_orb_{self.topicname}") + + @property + def datatype(self) -> gdb.Type: + """Return the datatype of the topic like struct sensor_accel""" + return utils.lookup_type(f"struct {self.topicname}") + + @property + def circbuf(self) -> CircBuf: + if not self.datatype: + return None + return CircBuf(self.buffer, datatype=self.datatype.pointer()) + + +def get_topic_inodes(topic: str = None) -> List[fs.Inode]: + nodes = ( + (node, path) + for node, path in fs.foreach_inode() + if path.startswith("/dev/uorb/") and (not topic or topic in path) + ) + return nodes + + +def get_topics(topic: str = None) -> List[Sensor]: + nodes = get_topic_inodes(topic) + return (Sensor(node, path=path) for node, path in nodes) + + +class uORBDump(gdb.Command): + """Dump uORB topics""" + + formatter = "{:<20} {:<24} {:<6} {:<6} {:<6} {:<6} {:<12} {:<12} {:<20}" + header = ( + "Address", + "Topic", + "Subs", + "Ads", + "esize", + "nbuf", + "latency", + "interval", + "Circbuf", + ) + + def __init__(self): + super().__init__("uorb", gdb.COMMAND_USER) + + def invoke(self, arg: str, from_tty: bool) -> None: + parser = argparse.ArgumentParser(description=self.__doc__) + parser.add_argument( + "--topic", + type=str, + help="The topic name to dump, e.g. 'sensor_accel'", + default=None, + ) + + try: + args = parser.parse_args(gdb.string_to_argv(arg)) + except SystemExit: + return + + print(self.formatter.format(*self.header)) + for topic in get_topics(topic=args.topic): + print( + self.formatter.format( + hex(topic), + topic.topicname, + topic.nsubscribers, + topic.nadvertisers, + topic.state.esize, + topic.state.nbuffer, + topic.state.min_latency, + topic.state.min_interval, + hex(topic.buffer.address), + ) + )