Source code for iio_writedev

#!/usr/bin/env python
"""
Copyright (C) 2020 Analog Devices, Inc.
Author: Cristian Iacob <cristian.iacob@analog.com>

This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; either version 2
of the License, or (at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
"""

import time
import sys
import argparse
import iio


class Arguments:
    """Command-line options of iio_writedev, parsed and normalized."""

    def __init__(self):
        """Build the parser, parse the command line and store the results."""
        self.parser = argparse.ArgumentParser(description="iio_writedev")
        self._add_parser_arguments()
        parsed = self.parser.parse_args()

        # Context selection: empty or missing values collapse to None.
        self.network = parsed.network or None
        self.arg_uri = parsed.uri or None
        self.scan_for_context = parsed.auto

        # Numeric options: a missing (or zero) value falls back to the default.
        self.buffer_size = parsed.buffer_size or 256
        self.num_samples = parsed.samples or 0
        self.timeout = parsed.timeout or 0

        self.cyclic = parsed.cyclic

        # "device" is declared with nargs=1, so argparse hands back a list.
        self.device_name = parsed.device[0]
        self.channels = parsed.channel

    def _add_parser_arguments(self):
        """Register every option and positional argument on ``self.parser``."""
        # Options that take a value: (short flag, long flag, type, help text).
        valued_options = (
            (
                "-n",
                "--network",
                str,
                "Use the network backend with the provided hostname.",
            ),
            ("-u", "--uri", str, "Use the context with the provided URI."),
            (
                "-b",
                "--buffer-size",
                int,
                "Size of the capture buffer. Default is 256.",
            ),
            (
                "-s",
                "--samples",
                int,
                "Number of samples to capture, 0 = infinite. Default is 0.",
            ),
            (
                "-T",
                "--timeout",
                int,
                "Buffer timeout in milliseconds. 0 = no timeout",
            ),
        )
        for short_flag, long_flag, value_type, help_text in valued_options:
            self.parser.add_argument(
                short_flag,
                long_flag,
                type=value_type,
                metavar="",
                help=help_text,
            )

        # Boolean switches: (short flag, long flag, help text).
        switches = (
            (
                "-a",
                "--auto",
                "Scan for available contexts and if only one is available use it.",
            ),
            ("-c", "--cyclic", "Use cyclic buffer mode."),
        )
        for short_flag, long_flag, help_text in switches:
            self.parser.add_argument(
                short_flag, long_flag, action="store_true", help=help_text
            )

        # Positionals: exactly one device, then any number of channels.
        self.parser.add_argument("device", type=str, nargs=1)
        self.parser.add_argument("channel", type=str, nargs="*")
class ContextBuilder:
    """Class for creating the requested context."""

    def __init__(self, arguments):
        """
        Class constructor.

        Args:
            arguments: type=Arguments
                Contains the input arguments.
        """
        self.ctx = None
        self.arguments = arguments

    def _timeout(self):
        """Copy the requested timeout onto the context when it is non-negative."""
        if self.arguments.timeout >= 0:
            self.ctx.timeout = self.arguments.timeout
        return self

    def _auto(self):
        """
        Use the only context reported by ``iio.scan_contexts()``.

        Raises an Exception when the scan finds nothing. When several
        contexts are found, their URIs are printed and the process exits.
        """
        contexts = iio.scan_contexts()
        if not contexts:
            raise Exception("No IIO context found.\n")
        if len(contexts) == 1:
            uri, _ = contexts.popitem()
            self.ctx = iio.Context(_context=uri)
        else:
            print("Multiple contexts found. Please select one using --uri!")
            for uri in contexts:
                print(uri)
            sys.exit(0)
        return self

    def _uri(self):
        """Create the context from the URI given on the command line."""
        self.ctx = iio.Context(_context=self.arguments.arg_uri)
        return self

    def _network(self):
        """Create a network context for the hostname given on the command line."""
        self.ctx = iio.NetworkContext(self.arguments.network)
        return self

    def _default(self):
        """Create the context by calling ``iio.Context()`` without arguments."""
        self.ctx = iio.Context()
        return self

    def create(self):
        """
        Create the requested context.

        Selection order: automatic scan, explicit URI, network hostname,
        then ``iio.Context()`` with no arguments. The timeout from the
        arguments is applied to the context before it is returned.

        Raises:
            Exception: when creating the context fails with FileNotFoundError.
        """
        try:
            if self.arguments.scan_for_context:
                self._auto()
            elif self.arguments.arg_uri:
                self._uri()
            # The hostname is stored by Arguments as ``network``.
            elif self.arguments.network:
                self._network()
            else:
                self._default()
        except FileNotFoundError as err:
            # Keep the original error attached as the cause for debugging.
            raise Exception("Unable to create IIO context!\n") from err

        self._timeout()
        return self.ctx
class BufferBuilder:
    """Looks up the device, enables its channels and allocates the buffer."""

    def __init__(self, ctx, arguments):
        """
        Store what is needed to build the buffer later.

        Args:
            ctx: type=iio.Context
                Context in which the device is searched.
            arguments: type=Arguments
                Parsed command-line arguments.
        """
        self.arguments = arguments
        self.ctx = ctx
        self.dev = None

    def _device(self):
        """Resolve the device by name; raise if the context has no such device."""
        wanted = self.arguments.device_name
        self.dev = self.ctx.find_device(wanted)
        if self.dev is None:
            raise Exception("Device %s not found!" % self.arguments.device_name)
        return self

    def _channels(self):
        """Enable the channels listed by index, or all of them if none is listed."""
        requested = self.arguments.channels
        if len(requested) > 0:
            # Lazy lookup, so each channel is enabled before the next index
            # is resolved.
            targets = (self.dev.channels[int(index)] for index in requested)
        else:
            targets = self.dev.channels

        for target in targets:
            target.enabled = True
        return self

    def create(self):
        """Select the device, enable its channels and return a new IIO buffer."""
        self._device()._channels()

        size = self.arguments.buffer_size
        cyclic = self.arguments.cyclic
        created = iio.Buffer(self.dev, size, cyclic)

        if created is None:
            raise Exception("Unable to create buffer!\n")
        return created
class DataWriter:
    """Class for writing samples to the device."""

    def __init__(self, ctx, arguments):
        """
        Class constructor.

        Args:
            ctx: type=iio.Context
                Current context.
            arguments: type=Arguments
                Contains the input arguments.
        """
        buffer_builder = BufferBuilder(ctx, arguments)
        self.buffer = buffer_builder.create()
        self.device = buffer_builder.dev
        self.arguments = arguments

    def write(self):
        """
        Push data read from standard input into the buffer.

        Reads raw bytes from ``sys.stdin.buffer`` one chunk at a time,
        writes each chunk to the buffer and pushes it. With a sample count
        of 0 the loop runs until standard input is exhausted; otherwise it
        stops once that many samples have been pushed. In cyclic mode the
        first chunk is pushed once and the method then sleeps forever.

        Exits the process with status 0 when standard input ends before a
        chunk is complete (the incomplete chunk is not pushed).

        Raises:
            Exception: when the buffer reports that 0 bytes were written.
        """
        remaining_samples = self.arguments.num_samples

        while True:
            buffer_len = len(self.buffer)
            if remaining_samples == 0:
                chunk_len = buffer_len
            else:
                chunk_len = min(
                    buffer_len, remaining_samples * self.device.sample_size
                )

            # Accumulate raw bytes directly; a read may return fewer bytes
            # than requested, so keep reading until the chunk is complete.
            data = bytearray()
            while len(data) < chunk_len:
                piece = sys.stdin.buffer.read(chunk_len - len(data))
                if len(piece) == 0:
                    sys.exit(0)
                data += piece

            if self.buffer.write(data) == 0:
                raise Exception("Unable to push buffer!")

            if chunk_len < buffer_len:
                # Final, shorter chunk: submit only the samples just written
                # so stale data left in the rest of the buffer is not sent.
                self.buffer.push(chunk_len // self.device.sample_size)
            else:
                self.buffer.push()

            # Cyclic mode: nothing more to push, just keep the process alive.
            while self.arguments.cyclic:
                time.sleep(1)

            if remaining_samples > 0:
                remaining_samples -= chunk_len // self.device.sample_size
                if remaining_samples == 0:
                    break
def main():
    """Entry point: parse arguments, create the context and write the data."""
    options = Arguments()
    context = ContextBuilder(options).create()
    DataWriter(context, options).write()


if __name__ == "__main__":
    main()