# Source code for iio_attr
#!/usr/bin/env python
"""
Copyright (C) 2020 Analog Devices, Inc.
Author: Cristian Iacob <cristian.iacob@analog.com>
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; either version 2
of the License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
"""
import sys
import argparse
import iio
def _str_match(string, other_string, ignore_case):
    """
    Compare two strings for equality, optionally ignoring case.

    Args:
        string: type=str or None
            First operand.
        other_string: type=str or None
            Second operand.
        ignore_case: type=bool
            When true, both operands are lower-cased before comparing.

    Returns:
        type=bool
            True when the operands are equal. A None operand only ever
            equals another None.
    """
    if string is None or other_string is None:
        # Device and channel names may be None; calling .lower() on them
        # would raise AttributeError, so settle the comparison here.
        return string is other_string
    if ignore_case:
        return string.lower() == other_string.lower()
    return string == other_string
class Arguments:
    """Class for parsing the input arguments."""

    def __init__(self, argv=None):
        """
        Arguments class constructor.

        Args:
            argv: type=list of str or None
                Command-line tokens to parse, without the program name.
                When None, argparse takes them from sys.argv.
        """
        self.device = None
        self.channel = None
        self.attr = None
        # Optional trailing [value] token of the selected attribute option.
        self.buffer = None
        self.search_context = False
        self.search_device = False
        self.search_channel = False
        self.search_buffer = False
        self.search_debug = False
        self.detect_context = None
        self.arg_uri = None
        self.input_only = None
        self.output_only = None
        self.scan_only = None
        self.ignore_case = None
        self.quiet = None
        self.parser = argparse.ArgumentParser(description="iio_attr")
        self._add_required_mutex_group()
        self._add_help_group()
        self._add_context_group()
        self._add_channel_group()
        args = self.parser.parse_args(argv)
        self._read_arguments(args)

    def _add_required_mutex_group(self):
        """Register the attribute selectors; exactly one of them is required."""
        self._required_mutex_group = self.parser.add_mutually_exclusive_group(required=True)
        self._required_mutex_group.add_argument(
            "-d", "--device-attr",
            type=str, metavar="", nargs="*",
            help="Usage: [device] [attr] [value]\nRead/Write device attributes")
        self._required_mutex_group.add_argument(
            "-c", "--channel-attr",
            type=str, metavar="", nargs="*",
            help="Usage: [device] [channel] [attr] [value]\n"
                 "Read/Write channel attributes.")
        self._required_mutex_group.add_argument(
            "-B", "--buffer-attr",
            type=str, metavar="", nargs="*",
            help="Usage: [device] [attr] [value]\nRead/Write buffer attributes.")
        self._required_mutex_group.add_argument(
            "-D", "--debug-attr",
            type=str, metavar="", nargs="*",
            help="Usage: [device] [attr] [value]\nRead/Write debug attributes.")
        self._required_mutex_group.add_argument(
            "-C", "--context-attr",
            type=str, metavar="", nargs="*",
            help="Usage: [attr]\nRead IIO context attributes.")

    def _add_help_group(self):
        """Register the general-purpose switches."""
        self._help_group = self.parser.add_argument_group("General")
        self._help_group.add_argument("-I", "--ignore-case", action="store_true",
                                      help="Ignore case distinctions.")
        self._help_group.add_argument("-q", "--quiet", action="store_true",
                                      help="Return result only.")

    def _add_context_group(self):
        """Register the options selecting which context to connect to."""
        self._context_group = self.parser.add_argument_group("Context connection")
        self._context_group.add_argument("-u", "--uri", metavar="", type=str, nargs=1,
                                         help="Use the context at the provided URI.")
        self._context_group.add_argument("-a", "--auto", action="store_true",
                                         help="Use the first context found.")

    def _add_channel_group(self):
        """Register the channel filtering switches."""
        self._channel_group = self.parser.add_argument_group("Channel qualifiers")
        self._channel_group.add_argument("-i", "--input-channel", action="store_true",
                                         help="Filter Input Channels only.")
        self._channel_group.add_argument("-o", "--output-channel", action="store_true",
                                         help="Filter Output Channels only.")
        self._channel_group.add_argument("-s", "--scan-channel", action="store_true",
                                         help="Filter Scan Channels only.")

    def _read_optional_arguments(self, args):
        """Copy the switches and the URI from the parsed namespace."""
        self.detect_context = args.auto
        # nargs=1 yields a one-element list, hence the [0].
        self.arg_uri = args.uri[0] if args.uri is not None else None
        self.input_only = args.input_channel
        self.output_only = args.output_channel
        self.scan_only = args.scan_channel
        self.ignore_case = args.ignore_case
        self.quiet = args.quiet

    def _read_positionals(self, values, fields, kind):
        """
        Spread the tokens of one attribute option over the named fields.

        Args:
            values: type=list of str
                Tokens that followed the option.
            fields: type=tuple of str
                Attribute names to fill, in positional order. Fields
                without a matching token are set to None.
            kind: type=str
                Word inserted in the error message ("device", "channel", ...).

        Exits with status 1 when more tokens than fields were supplied.
        """
        if len(values) > len(fields):
            print("Too many options for searching for " + kind + " attributes")
            sys.exit(1)
        for position, field in enumerate(fields):
            setattr(self, field, values[position] if position < len(values) else None)

    def _read_device_arguments(self, args):
        """Handle -d/--device-attr: [device] [attr] [value]."""
        self._read_positionals(args.device_attr, ("device", "attr", "buffer"), "device")
        self.search_device = True

    def _read_channel_arguments(self, args):
        """Handle -c/--channel-attr: [device] [channel] [attr] [value]."""
        self._read_positionals(args.channel_attr,
                               ("device", "channel", "attr", "buffer"), "channel")
        self.search_channel = True

    def _read_buffer_arguments(self, args):
        """Handle -B/--buffer-attr: [device] [attr] [value]."""
        self._read_positionals(args.buffer_attr, ("device", "attr", "buffer"), "buffer")
        self.search_buffer = True

    def _read_debug_arguments(self, args):
        """Handle -D/--debug-attr: [device] [attr] [value]."""
        self._read_positionals(args.debug_attr, ("device", "attr", "buffer"), "debug")
        self.search_debug = True

    def _read_context_arguments(self, args):
        """Handle -C/--context-attr: [attr]."""
        self._read_positionals(args.context_attr, ("attr",), "context")
        self.search_context = True

    def _read_arguments(self, args):
        """Transfer the parsed namespace onto this object."""
        self._read_optional_arguments(args)
        # An option given without tokens parses to [], an absent one to None,
        # so the checks below must test against None rather than truthiness.
        if args.device_attr is not None:
            self._read_device_arguments(args)
        if args.channel_attr is not None:
            self._read_channel_arguments(args)
        if args.buffer_attr is not None:
            self._read_buffer_arguments(args)
        if args.debug_attr is not None:
            self._read_debug_arguments(args)
        if args.context_attr is not None:
            self._read_context_arguments(args)
class ContextBuilder:
    """Class for creating the requested context."""

    def __init__(self, arguments):
        """
        Class constructor.

        Args:
            arguments: type=Arguments
                Contains the input arguments.
        """
        self.ctx = None
        self.arguments = arguments

    def _auto(self):
        """
        Scan for contexts and connect when exactly one is found.

        Raises an Exception when the scan finds nothing. When several
        contexts are found their URIs are listed and the program exits.
        """
        # The scan result is used as a mapping keyed by URI.
        contexts = iio.scan_contexts()
        if not contexts:
            raise Exception("No IIO context found.\n")
        if len(contexts) > 1:
            print("Multiple contexts found. Please select one using --uri!")
            for uri in contexts:
                print(uri)
            sys.exit(0)
        uri, _ = contexts.popitem()
        self.ctx = iio.Context(_context=uri)
        return self

    def _uri(self):
        """Connect to the context at the URI given on the command line."""
        self.ctx = iio.Context(_context=self.arguments.arg_uri)
        return self

    def _default(self):
        """Connect using iio.Context() without an explicit URI."""
        self.ctx = iio.Context()
        return self

    def create(self):
        """
        Create the requested context.

        Auto-detection takes precedence over an explicit URI; without
        either, the default context is used.

        Returns:
            type=iio.Context
                The created context.

        Raises:
            Exception: when creating the context raises FileNotFoundError;
                the original error is kept as the cause.
        """
        try:
            if self.arguments.detect_context:
                self._auto()
            elif self.arguments.arg_uri:
                self._uri()
            else:
                self._default()
        except FileNotFoundError as err:
            raise Exception("Unable to create IIO context!\n") from err
        return self.ctx
class Information:
    """Class for receiving the requested information about the attributes."""

    def __init__(self, arguments, context):
        """
        Class constructor.

        Args:
            arguments: type=Arguments
                Contains the input arguments.
            context: type=iio.Context
                The created context.
        """
        self.arguments = arguments
        self.context = context

    def write_information(self):
        """Write the requested information."""
        self._context_information()
        args = self.arguments
        if args.search_device or args.search_channel or \
                args.search_buffer or args.search_debug:
            self._devices_information()

    # ------------------------------------------------------------------
    # Matching helpers
    # ------------------------------------------------------------------
    def _device_matches(self, dev):
        """Return True when dev carries the device name that was requested."""
        requested = self.arguments.device
        # Devices may be unnamed; such a device can never match a name.
        if requested is None or dev.name is None:
            return False
        return _str_match(dev.name, requested, self.arguments.ignore_case)

    def _channel_matches(self, channel):
        """Return True when the requested channel equals the channel id or name."""
        requested = self.arguments.channel
        ignore_case = self.arguments.ignore_case
        if _str_match(channel.id, requested, ignore_case):
            return True
        return channel.name is not None and _str_match(channel.name, requested, ignore_case)

    def _attr_selected(self, key):
        """Return True when no attribute was requested or key is the requested one."""
        requested = self.arguments.attr
        return requested is None or _str_match(key, requested, self.arguments.ignore_case)

    # ------------------------------------------------------------------
    # Context and device listing
    # ------------------------------------------------------------------
    def _context_information(self):
        """Print the context attributes when they were asked for.

        Exits with status 1 when no context is available.
        """
        if self.context is None:
            print("Unable to create IIO context")
            sys.exit(1)
        if not self.arguments.search_context:
            return
        attrs = self.context.attrs
        if self.arguments.attr is None and len(attrs) > 0:
            print("IIO context with " + str(len(attrs)) + " attributes:")
        for key, value in attrs.items():
            if self._attr_selected(key):
                print(key + ": " + value)

    def _devices_information(self):
        """Walk every device of the context and print what was requested."""
        if self.arguments.device is None:
            print("IIO context has " + str(len(self.context.devices)) + " devices:")
        for dev in self.context.devices:
            self._device_information(dev)
            self._device_attributes_information(dev)
            self._buffer_attributes_information(dev)
            self._debug_attributes_information(dev)

    def _device_information(self, dev):
        """Print the device summary (listing mode) and visit its channels."""
        requested = self.arguments.device
        if requested is not None and not self._device_matches(dev):
            return
        if requested is None:
            # Listing mode: the line is left open so that the matching
            # "found N ..." message completes it.
            print("\t" + dev.id + ":", end="")
            if dev.name is not None:
                print(" " + dev.name, end="")
            print(", ", end="")
            if self.arguments.search_channel:
                print("found " + str(len(dev.channels)) + " channels")
        for channel in dev.channels:
            self._channel_information(dev, channel)

    # ------------------------------------------------------------------
    # Channels
    # ------------------------------------------------------------------
    def _channel_information(self, dev, channel):
        """Print one channel of dev if it passes every active filter."""
        args = self.arguments
        if not args.search_channel or args.device is None:
            return
        if args.input_only and channel.output:
            return
        if args.output_only and not channel.output:
            return
        if args.scan_only and not channel.scan_element:
            return
        if not self._device_matches(dev):
            return
        if args.channel is not None and not self._channel_matches(channel):
            return

        type_name = "output" if channel.output else "input"
        if (not args.scan_only and args.channel is None) or \
                (args.scan_only and channel.scan_element):
            print("dev '" + dev.name + "', channel '" + channel.id + "'", end="")
            if channel.name is not None:
                print(", id '" + channel.name + "'", end="")
            print(" (" + type_name, end="")
            if channel.scan_element:
                self._scan_channel_information(channel)
            else:
                print("), ", end="")
        self._channel_attributes_information(dev, channel)

    def _scan_channel_information(self, channel):
        """
        Print the scan index and data format of a scan-element channel.

        The text continues the " (<type>" fragment already written by the
        caller and closes its parenthesis. It ends with a newline in
        scan-only mode and with ", " otherwise.
        """
        fmt = channel.data_format
        sign = "s" if fmt.is_signed else "u"
        if fmt.is_fully_defined:
            sign = sign.upper()
        # The repeat count is only shown when the sample is repeated.
        repeat = "X" + str(fmt.repeat) if fmt.repeat > 1 else ""
        # Evaluated separately: a conditional expression binds looser than
        # "+", so inlining it would swallow the rest of the concatenation.
        endianness = "b" if fmt.is_be else "l"
        print(", index: " + str(channel.index)
              + ", format: " + endianness + "e:" + sign + str(fmt.bits)
              + "/" + str(fmt.length) + repeat
              + ">>" + str(fmt.shift) + ")",
              end="\n" if self.arguments.scan_only else ", ")

    def _channel_attributes_information(self, dev, channel):
        """Print the attribute count, or each selected attribute, of a channel."""
        if self.arguments.channel is None:
            print("found " + str(len(channel.attrs)) + " channel-specific attributes")
            return
        for key in channel.attrs:
            if self._attr_selected(key):
                self._channel_attribute_information(dev, channel, key)

    def _channel_attribute_information(self, dev, channel, attr):
        """Read (and optionally write) one channel attribute."""
        type_name = "output" if channel.output else "input"
        label = "dev '" + dev.name + "', channel '" + channel.id + "' (" + type_name + "), "
        if channel.name is not None:
            label += "id '" + channel.name + "', "
        label += "attr '" + attr + "', "
        self._attribute_information(channel.attrs, attr, label, "value ")

    # ------------------------------------------------------------------
    # Device, buffer and debug attributes
    # ------------------------------------------------------------------
    def _device_attributes_information(self, dev):
        """Print the attribute count, or each selected attribute, of a device."""
        if not self.arguments.search_device:
            return
        if self.arguments.device is None:
            print("found " + str(len(dev.attrs)) + " device attributes")
            return
        # Only the requested device is reported (and possibly written to).
        if not self._device_matches(dev):
            return
        for key in dev.attrs:
            if self._attr_selected(key):
                self._device_attribute_information(dev, key)

    def _device_attribute_information(self, dev, attr):
        """Read (and optionally write) one device attribute."""
        label = "dev '" + dev.name + "', attr '" + attr + "', value: "
        self._attribute_information(dev.attrs, attr, label)

    def _buffer_attributes_information(self, dev):
        """Print the count, or each selected buffer attribute, of a device."""
        if not self.arguments.search_buffer:
            return
        if self.arguments.device is None:
            print("found " + str(len(dev.buffer_attrs)) + " buffer attributes")
            return
        if not self._device_matches(dev):
            return
        for key in dev.buffer_attrs:
            if self._attr_selected(key):
                self._buffer_attribute_information(dev, key)

    def _buffer_attribute_information(self, dev, attr):
        """Read (and optionally write) one buffer attribute."""
        label = "dev '" + dev.name + "', buffer attr '" + attr + "', value: "
        self._attribute_information(dev.buffer_attrs, attr, label)

    def _debug_attributes_information(self, dev):
        """Print the count, or each selected debug attribute, of a device."""
        if not self.arguments.search_debug:
            return
        if self.arguments.device is None:
            print("found " + str(len(dev.debug_attrs)) + " debug attributes")
            return
        if not self._device_matches(dev):
            return
        for key in dev.debug_attrs:
            # Honours --ignore-case like the other attribute kinds.
            if self._attr_selected(key):
                self._debug_attribute_information(dev, key)

    def _debug_attribute_information(self, dev, attr):
        """Read (and optionally write) one debug attribute."""
        label = "dev '" + dev.name + "', debug attr '" + attr + "', value: "
        self._attribute_information(dev.debug_attrs, attr, label)

    # ------------------------------------------------------------------
    # Shared read/write routine
    # ------------------------------------------------------------------
    def _attribute_information(self, attrs, attr, label, value_prefix=""):
        """
        Print an attribute, write the pending value if any, then print again.

        Args:
            attrs: mapping from attribute name to an object with a
                readable and writable "value".
            attr: type=str
                Name of the attribute inside attrs.
            label: type=str
                Text printed in front of the value unless quiet is set.
            value_prefix: type=str
                Text placed right before the quoted value.

        The pending value (arguments.buffer) is cleared after the write,
        so it is written at most once per run.
        """
        args = self.arguments
        # In quiet mode only the value read back after the write is shown.
        if args.buffer is None or not args.quiet:
            self._print_attribute(attrs, attr, label, value_prefix)
        if args.buffer is not None:
            attrs[attr].value = args.buffer
            if not args.quiet:
                print("wrote " + str(len(args.buffer)) + " bytes to " + attr)
            args.buffer = None
            self._print_attribute(attrs, attr, label, value_prefix)

    def _print_attribute(self, attrs, attr, label, value_prefix):
        """Print one attribute value, or the error raised while reading it."""
        quiet = self.arguments.quiet
        if not quiet:
            print(label, end="")
        try:
            value = attrs[attr].value
        except OSError as err:
            print("ERROR: " + err.strerror + " (-" + str(err.errno) + ")")
        else:
            print(value if quiet else value_prefix + "'" + value + "'")
def main():
    """Parse the command line, open the context and print what was asked."""
    arguments = Arguments()
    context = ContextBuilder(arguments).create()
    Information(arguments, context).write_information()
# Run the tool only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()