Skip to content

Commit

Permalink
Added initial code
Browse files Browse the repository at this point in the history
  • Loading branch information
Fescron committed Apr 7, 2024
1 parent 35f2eda commit 7117773
Show file tree
Hide file tree
Showing 9 changed files with 928 additions and 4 deletions.
5 changes: 3 additions & 2 deletions LICENSE
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
MIT License
# MIT License

Copyright (c) 2024 Brecht Van Eeckhoudt
- Copyright (c) 2024 Fescron (Brecht Van Eeckhoudt)
- Copyright (c) 2022 didim99

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
Expand Down
91 changes: 89 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,89 @@
# witrn-ui-bokeh
Python Bokeh utility for reading, plotting and storing data from modern WITRN USB-meters using a web-browser
# WITRN UI BOKEH

The code on this repository builds upon the existing Python driver-functionality of [witrn-ui](https://github.com/didim99/witrn-ui) by [didim99](https://github.com/didim99), by adding real-time plotting (and logging) functionality using [Bokeh](http://bokeh.org/). The result allows the user to control a WITRN USB-meter to be controlled using a web-browser.

Currently the code has only been tested using a WITRN C4. Functionality was tried to be added for the U2P but on first glance its protocol (see [this](https://wiki.cuvoodoo.info/doku.php?id=web-u2) and [this](https://git.cuvoodoo.info/kingkevin/web-u2/src/branch/master/u2_usb.c)) seems to be different.

<br>

## 1 - Installation

### 1.1 - Install required packages

```bash
pip install -r requirements.txt
```

**NOTE:** The `requirements.txt` file is generated by running `pipreqs` (`pip install pipreqs`) in the project directory. `~=` means *compatible version*.

<br>

### 1.2 - Configure platform-dependent functionality

#### 1.2.1 - Linux

Copy the udev-rules-file to the correct location.

```bash
sudo cp udev/90-usbmeter.rules /etc/udev/rules.d/90-usbmeter.rules
```

<br>

Restart the udev management tool.

```bash
sudo udevadm control --reload-rules
sudo udevadm trigger
```

<br>

#### 1.2.2 - Windows

Use [Zadig](https://zadig.akeo.ie/) to replace Windows's default HID-driver to `libusbK`.

1. Run `zadig-2.8.exe`
2. Click on `Options` > `List All Devices`
3. Select the `WITRN.C4` device (in case the [Witrn C4](https://aliexpress.com/item/1005004748597690.html) is used)
4. Select `libusbK (v3.0.7.0)` above the `Replace Driver` button
5. Click on `Replace Driver`
6. Confirm the driver replacement operation, wait for the tool to finish

<br>

## 2 - Running

Running `witrn-ui-bokeh.py` *as-is* with Python won't do anything. It is supposed to be launched using [Bokeh server](https://docs.bokeh.org/en/latest/docs/user_guide/server/server_introduction.html#ug-server-introduction) (`bokeh serve`), which creates (and opens, on [http://localhost:5006/witrn-ui-bokeh](http://localhost:5006/witrn-ui-bokeh)) an interactive web application that connects front-end UI events to running Python code. The correct usage is shown below.

```bash
bokeh serve --show witrn-ui-bokeh.py
```

<br>

Once the web-interface opens, use the following steps.

1. Select a **Device** using the dropdown (`C4` selected by default).
- Depending on the way the current will flow/flows through the USB-meter, disable (default enabled) the **Invert Current Sign** logic using the corresponding toggle-switch.
2. Click on <kbd>Open Connection</kbd>
- Change the **Plot Period \[ms\]** to a value greater than `0` if the measurement-throughput is too/unnecessary high (the tool defaults to (try to) plot all incoming measurements, a non-zero value only plots incoming measurements at the defined period)
- Click on <kbd>Clear Plot</kbd> to clear all of the already plotted measurements (the tool only keeps `6000` (configurable, using `MAX_X_POINTS` in [witrn-ui-bokeh.py](witrn-ui-bokeh.py)) data-points on the X-axis (per data-line) before discarding old data, in an attempt to maintain a somewhat responsive real-time application)
- Click on an item in the **Legend** to enable or disable the display of certain lines on the plot
3. Use the tooltips on the side (activated when hovering over the plot) to zoom in or out on the measurements, ...
4. Change the plot- and file- **Title** and **Log Period \[ms\]** using the corresponding fields (if necessary), click on <kbd>Start Logging</kbd> to start saving values to a `.csv`-file.
- The generated file has the following syntax (`DateTime` corresponds to the time at which the button has been pressed): `YYYY-MM-DD_HH-MM-SS_<Title>.csv`

<br>

Usage sample:

<img src="docs/witrn-ui-bokeh.gif" alt="witrn-ui-bokeh usage example">

<br>

## 3 - TODO

- Add functionality to re-plot stored CSV-data
- Add functionality to "download" offline recordings from the USB-meter
- Implement U2P functionality
Binary file added docs/witrn-ui-bokeh.gif
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
88 changes: 88 additions & 0 deletions driver/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
# Original code by didim99, modified by Fescron

from threading import Thread, Event
from typing import Callable
import usb.core
import usb.util
from usb.core import USBError, Device as USBDevice
from driver.protocol import DeviceInfo, KnownDevice, HIDPacket


class USBMeter:
_info: DeviceInfo
_device: USBDevice

_running: Event
_recv_thread: Thread
_recv_cb: Callable
_error_cb: Callable

def __init__(self, _info: KnownDevice):
self._info = _info.value
self._running = Event()

def recv_callback(self, callback: Callable):
self._recv_cb = callback

def error_callback(self, callback: Callable):
self._error_cb = callback

def connect(self):
self._device = usb.core.find(idVendor=self._info.idVendor,
idProduct=self._info.idProduct)
if self._device is None:
raise IOError(f"Device {self._info} not found!")

try:
if self._device.is_kernel_driver_active(0):
try:
self._device.detach_kernel_driver(0)
print("Kernel driver detached")
except USBError as e:
raise IOError("Could not detach kernel driver") from e
else:
print("No kernel driver attached")
except NotImplementedError as e:
print(f"Not implemented: '{e}', proceeding")

try:
usb.util.claim_interface(self._device, 0)
print("Claimed device")
except USBError as e:
raise IOError("Could not claim the device") from e
try:
self._device.reset()
except usb.core.USBError as e:
raise IOError("Could not set configuration") from e

def disconnect(self):
try:
usb.util.release_interface(self._device, 0)
print("Un-claimed device")
except USBError as e:
raise IOError("Could not un-claim the device") from e

def start_read(self):
self._running.set()
name = self._info.devName + " reader thread"
self._recv_thread = Thread(name=name, target=self._reader_loop)
self._recv_thread.start()

def stop_read(self):
self._running.clear()

def _reader_loop(self):
while self._running.is_set():
try:
data = self._device.read(self._info.endpoint, 64)
if not data:
continue

data = HIDPacket(data)
if self._recv_cb is not None:
self._recv_cb(data)
except USBError as e:
self._running.clear()
if self._error_cb is not None:
self._error_cb(e)
break
142 changes: 142 additions & 0 deletions driver/binutils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
# Original code by Daniel Brodie, modified by didim99
# See: https://code.activestate.com/recipes/576666/

import struct


def hexify(data, sep=' '):
return sep.join([format(x, '02x') for x in data])


def preargs(cls):
def _pre_init(*args1, **kwargs1):
def _my_init(*args2, **kwargs2):
args = args1 + args2
kwargs1.update(kwargs2)
return cls(*args, **kwargs1)
return _my_init
return _pre_init


class BinaryMetaType(type):
def __getitem__(self, val):
return Array(self, val)


class BinaryType(metaclass=BinaryMetaType):
def __init__(self, **kwargs):
self._kwargs = kwargs

def to_binary(self, val):
pass

def from_binary(self, binary):
pass


class SimpleBinaryType(BinaryType):
def __init__(self, fmt, **kwargs):
super().__init__(**kwargs)
self._struct = struct.Struct(fmt)

def to_binary(self, val):
return self._struct.pack(val)

def from_binary(self, binary):
return (self._struct.size,
self._struct.unpack(binary[:self._struct.size])[0])


@preargs
class Array(BinaryType):
def __init__(self, arr_type, arr_len, **kwargs):
super().__init__(**kwargs)
self._arr_type, self._arr_len = arr_type(**kwargs), arr_len

def to_binary(self, val):
res = []
for i, v in enumerate(val):
res.append(self._arr_type.to_binary(v))
if i+1 == self._arr_len:
break
return b''.join(res)

def from_binary(self, binary):
res = []
ssum = 0
for i in range(self._arr_len):
s, v = self._arr_type.from_binary(binary[ssum:])
ssum += s
res.append(v)
return ssum, res


class Byte(SimpleBinaryType):
def __init__(self, **kwargs):
super().__init__('B', **kwargs)


class Word(SimpleBinaryType):
def __init__(self, **kwargs):
super().__init__('H', **kwargs)


class Dword(SimpleBinaryType):
def __init__(self, **kwargs):
super().__init__('I', **kwargs)


class Float(SimpleBinaryType):
def __init__(self, **kwargs):
super().__init__('f', **kwargs)


class BinaryBuilder(dict):
def __init__(self, **kwargs):
super().__init__()
self.members = []
self._kwargs = kwargs

def __setitem__(self, key, value):
if key.startswith('__'):
return
if not callable(value):
return
if key not in self:
self.members.append((key, value(**self._kwargs)))
super().__setitem__(key, value)


class Binary(type):
@classmethod
def __prepare__(mcs, cls, bases, **kwargs):
# In the future kwargs can contain things such as endianity
# and alignment
return BinaryBuilder(**kwargs)

def __new__(mcs, name, bases, class_dict):
# There are nicer ways of doing this, but as a hack it works
def fixupdict(d):
@classmethod
def to_binary(clas, datadict):
res = []
for k, v in clas.members:
res.append(v.to_binary(datadict[k]))
return b''.join(res)

@classmethod
def from_binary(cls, bytes_in):
res = {}
ssum = 0
for k, v in cls.members:
i, _d = v.from_binary(bytes_in[ssum:])
ssum += i
res[k] = _d
return ssum, res

nd = {'to_binary': to_binary,
'from_binary': from_binary,
'members': d.members}
return nd

return super().__new__(mcs, name, bases, fixupdict(class_dict))
Loading

0 comments on commit 7117773

Please sign in to comment.