Archive

Tag Archives: gadgets

Beurer and Omron are part of my routine every morning:

  • weight (Beurer)
  • orthostatic test (6min lying/3min standing) (Windlink)
  • blood sugar (CareSensN)
  • blood pressure (twice both arms) (Omron)

After all this is done, I launch a small udev listener:

#!/usr/bin/env python
import pyudev
import subprocess

context = pyudev.Context()
monitor = pyudev.Monitor.from_netlink(context)
monitor.start()
monitor.filter_by('usb', 'usb_device')
print "Started:", monitor.started

for device in iter(monitor.poll, None):
    print device.action, device.subsystem, device
    try:
        atts = device.attributes
        vendor = atts["idVendor"]
        product = atts["idProduct"]
        name = atts["product"]
        print "%s %s:%s" % (name, vendor, product)
    except KeyError:
        continue
    if device.action !="add":
        continue
    if vendor == "04d9": #idProduct = 8010
        print "call"
        subprocess.call("./beurer.sh")
        print "done"
    if vendor == "0590": #idProduct = 0028
        print "call"
        subprocess.call("./omron.sh")
        print "done"

This will launch device specific scripts when a device is connected.
beurer.sh:

#!/bin/zsh
# Download the scale data and merge it into data/weight.csv.

csv=data/weight.csv
backup=data/backup/weight.csv.$(date +%FT%H:%M)
fresh='x.csv'

# download; on failure stop with the downloader's exit status
~/projects/usb/beurer/beurer.py >$fresh || exit $?

# keep a timestamped copy of the current file
cp $csv $backup

# merge: old rows (first line of the backup skipped) plus fresh rows,
# sorted with duplicates removed, written after the header file
tail -q -n +2 $backup | sort -u $fresh - | cat ${csv}.header - >$csv

# feedback: show what was downloaded
cat $fresh

omron.sh:

#!/bin/zsh
# Download the blood pressure data and merge it into data/pressure.csv.

csv=data/pressure.csv
backup=data/backup/pressure.csv.$(date +%FT%H:%M)
fresh='x.csv'

# download; on failure stop with the downloader's exit status
~/projects/usb/omron/omron.py >$fresh || exit $?

# keep a timestamped copy of the current file
cp $csv $backup

# merge: old rows (first line of the backup skipped) plus fresh rows,
# sorted with duplicates removed, written after the header file
tail -q -n +2 $backup | sort -u $fresh - | cat ${csv}.header - >$csv

# feedback: show what was downloaded
cat $fresh

CareSensN produces only one number, so it is easier to enter it directly. My alias for this is:

# usage: rrs <value>  -- appends "<timestamp> <value>" to sugar.csv
alias rrs='>>~/monitor/data/sugar.csv echo $(date +%FT%H:%M)'

“Reports” is next.

This upper arm blood pressure monitor has memory for the last 90 measurements. It records date, time, systolic/diastolic pressure and pulse.
It has a somewhat more complicated USB protocol using request/response packets. It also supports a wakeup request. The following Python program produces a CSV file with all variables available:

#!/usr/bin/env python

# ID 0590:0028
# MIT Elite Plus HEM-7301-ITKE7

from operator import xor
from datetime import datetime
import usb.core
import usb.util as util
from array import array
import sys

# USB ids of the monitor (ID 0590:0028)
OMRON_VID = 0x0590
OMRON_PID = 0x0028

# endpoint addresses handed to dev.read() / dev.write()
OMRON_IN_ENDPT = 0x81
OMRON_OUT_ENDPT = 0x02

# omron device modes
OMRON_NULL_MODE = (0, 0)

debug = 0

def setMode(mode):
    """Send `mode` to the device as a HID SET_REPORT control transfer.

    `mode` is passed unchanged as the data of the transfer. Returns None.
    """
    request_type = util.build_request_type(
        util.CTRL_OUT, util.CTRL_TYPE_CLASS, util.CTRL_RECIPIENT_INTERFACE)
    set_report = 9          # bRequest: REQ_HID_SET_REPORT
    feature_report = 0x300  # wValue: (HID_REPORT_TYPE_FEATURE<<8)|report id 0
    interface = 0           # wIndex: feature_interface_num
    timeout = 1000
    dev.ctrl_transfer(request_type, set_report, feature_report, interface,
                      mode, timeout)

def _put(b, timeout=2000):
    """Write packet `b` to the OUT endpoint and return dev.write()'s result."""
    # NOTE(review): the third positional argument (0) looks like the
    # interface number of older PyUSB releases - confirm against the
    # installed version.
    return dev.write(OMRON_OUT_ENDPT, b, 0, timeout)

def _get(validate=0, timeout=2000):
    """Read one 8-byte packet from the IN endpoint and return its payload.

    The first byte of a packet is the payload length (1..7). A length byte
    of 8 means "not ready" and the read is repeated.

    validate -- when true, the payload must start with the bytes 'O', 'K'.
    timeout  -- passed through to dev.read().

    Raises ValueError('Invalid byte count') for a length outside 1..7 and
    ValueError('Bad response') when validation fails.
    """
    while True:
        packet = dev.read(OMRON_IN_ENDPT, 8, 0, timeout)
        count = packet[0]
        if count != 8:
            break
        # count == 8: not ready, wait

    if not 1 <= count <= 7:
        raise ValueError('Invalid byte count')

    if validate and not (packet[1] == ord('O') and packet[2] == ord('K')):
        raise ValueError('Bad response')
    return packet[1:count + 1]

def _check(a):
    if reduce(xor, a[3:], 0) :
        raise ValueError('Bad checksum')
    return a

def wakeup():
    """Write zero packets until OK is received.

    First one pending packet is read and thrown away (errors ignored).
    Then the wake-up packet is sent up to five times, stopping as soon as
    `_get` accepts a response starting with "OK".

    This is best effort: the function returns None whether or not the
    device answered, and errors raised by `_put` still propagate.
    """
    wake_packet = (7, 0, 0, 0, 0, 0, 0, 0) * 2

    # `except Exception` rather than a bare `except` so that Ctrl-C
    # (KeyboardInterrupt) and SystemExit are no longer swallowed.
    try:
        _get(0, 500)
    except Exception:
        pass

    for _ in range(5):
        _put(wake_packet)
        try:
            _get(1, 500)
        except Exception:
            continue  # no valid answer yet, try again
        break

def shutdown():
    """Send the END command and read one reply packet (not validated)."""
    end_command = array('B', "\005END00")
    _put(end_command)
    _get()

def getNoOfMeasurements():
    """Send the CNT command and return item 4 of the checked two-packet reply.

    The caller uses the value as the number of stored measurements.
    """
    _put(array('B', "\005CNT00"))
    head = _get(1)
    tail = _get()
    reply = _check(head + tail)
    return reply[4]

def getMeasurement(i):
    """Request measurement number `i` and return the checked reply.

    The reply is the concatenation of three packets; the first one must
    start with "OK".
    """
    # the last byte repeats `i` - presumably the XOR checksum of 0, 0, i
    request = array('B', (7, ord('M'), ord('E'), ord('S'), 0, 0, i, i))
    _put(request)
    reply = _get(1)
    reply = reply + _get()
    reply = reply + _get()
    return _check(reply)

def delete():
    """Send the MCL command and expect an "OK" reply.

    Author's note: fails with "NO".
    """
    command = array('B', "\005MCL00")
    _put(command)
    _get(1)

def check_time():
    """Compare the device clock with the local clock; warn on stderr.

    Sends the GCL command and builds a datetime from the reply. A
    difference of more than 300 seconds produces a warning; nothing is
    returned.
    """
    _put(array('B', "\005GCL00"))
    # reply is 10 bytes: O K \x00 YY MM DD HH NN SS CRC
    reply = _check(_get(1) + _get())
    year = 2000 + reply[3]
    device_time = datetime(year, reply[4], reply[5],
                           reply[6], reply[7], reply[8])
    drift = abs(device_time - datetime.now()).total_seconds()
    if drift > 300:  # enough to detect DST
        print >> sys.stderr, "WARNING! Time difference is", drift, "sec."

# Locate the monitor; the functions above use the module-level `dev`.
dev = usb.core.find(idVendor=OMRON_VID, idProduct=OMRON_PID)
if dev is None:
    raise ValueError('Device not found')

# Take interface 0 away from the kernel driver if it holds it.
if dev.is_kernel_driver_active(0):
    dev.detach_kernel_driver(0)
# do reset and set active conf. Must be done before claim.
dev.set_configuration()
util.claim_interface(dev, None)

setMode(OMRON_NULL_MODE)  # mode value seems to have no effect
wakeup()
check_time()

# One output line per stored measurement: timestamp followed by the three
# values at reply positions 11, 12 and 13.
count = getNoOfMeasurements()
for index in range(count):
    record = getMeasurement(index)
    fields = (2000 + record[3], record[4], record[5],
              record[6], record[7], record[8],
              record[11], record[12], record[13])
    line = "%d-%02d-%02dT%02d:%02d:%02d %d %d %d" % fields
    print(line)

shutdown()

To call it enter at command prompt:

./omron.py > pressure.csv

This scale can measure your weight and the percentage of different body components. The control panel records 12 variables:

  1. weight kg
  2. body fat %
  3. water %
  4. muscles %
  5. bones kg
  6. date
  7. upper body fat % (not for BG64)
  8. lower body fat % (not for BG64)
  9. upper body muscles % (not for BG64)
  10. lower body muscles % (not for BG64)
  11. BMR
  12. AMR

All values are 16 bit words.
Up to 10 users can store their data for 32 measurements.
Because the date has no time, only one measurement per day is possible.

The following python program produces a CSV file with all variables available on BG64 for the first user (can be specified as a command parameter):

#!/usr/bin/env python

"""
ID 04d9:8010 Holtek Semiconductor, Inc. 
"""
import usb.core
import usb.util as util
from array import array
import sys
from optparse import OptionParser, make_option
import pickle
from struct import unpack

# Device handle shared by the functions below; set by openDev().
dev = None

# USB ids of the scale (ID 04d9:8010)
VID = 0x04d9
PID = 0x8010

# HID class requests (bRequest values)
REQ_HID_GET_REPORT = 0x01
REQ_HID_GET_IDLE = 0x02
REQ_HID_GET_PROTOCOL = 0x03
REQ_HID_SET_REPORT = 0x09
REQ_HID_SET_IDLE = 0x0A
REQ_HID_SET_PROTOCOL = 0x0B

# HID report types, already shifted into the high byte of wValue
HID_REPORT_TYPE_INPUT = 0x0100
HID_REPORT_TYPE_OUTPUT = 0x0200
HID_REPORT_TYPE_FEATURE = 0x0300

def openDev():
    """Find the scale, free it from the kernel driver and claim it.

    The handle is stored in the module-level `dev`.
    Raises ValueError('Device not found') when no matching device exists.
    """
    global dev
    dev = usb.core.find(idVendor=VID, idProduct=PID)
    if dev is None:
        raise ValueError('Device not found')

    interface = 0
    if dev.is_kernel_driver_active(interface):
        dev.detach_kernel_driver(interface)
    # do reset and set active conf. Must be done before claim.
    dev.set_configuration()
    util.claim_interface(dev, None)

def setReport(reportId, data):
    """Send `data` as HID output report `reportId`.

    Returns whatever dev.ctrl_transfer() returns.
    """
    request_type = util.build_request_type(
        util.CTRL_OUT, util.CTRL_TYPE_CLASS, util.CTRL_RECIPIENT_INTERFACE)
    report = HID_REPORT_TYPE_OUTPUT | reportId
    interface = 0
    timeout = 3000  # 1000 was too small for C_FREE
    return dev.ctrl_transfer(request_type, REQ_HID_SET_REPORT, report,
                             interface, data, timeout)

def read():
    """Download the scale memory and return it as 120 rows of 32 words.

    Opens the device, discards one pending input report if there is one,
    sends output report 9 and then reads 128 reports of 64 bytes. The
    first 120 reports are decoded as 32 big-endian unsigned 16-bit words
    each; the remaining 8 are read but not decoded.

    Returns:
        list of 120 tuples, each holding 32 ints.

    Exits the process with status 1, after a message on stderr, when a
    read returns nothing, when one of the first 120 reports starts with
    0xff, or when a decoded row fails the sanity check below.
    """
    openDev()
    try:
        dev.read(0x81, 64)  # flush
    except usb.core.USBError:
        pass  # nothing to flush

    # presumably the command that makes the scale send its memory
    setReport(9, array('B', (0x10, 0, 0, 0, 0, 0, 0, 0)))

    # Every user can have up to 12 variables
    # and additional 8*64 for user data
    r = []
    for i in xrange(120 + 8):
        x = dev.read(0x81, 64)
        if not x:
            print >> sys.stderr, "Read failed"
            # sys.exit instead of the site helper exit(), which is meant
            # for the interactive interpreter
            sys.exit(1)
        if x[0] == 0xff and (i < 120):
            print >> sys.stderr, "Invalid data", x
            sys.exit(1)
        r.append(x)

    # Each variable can have up to 32 values
    frmt = "!" + "H" * 32
    s = []
    for i in xrange(120):
        x = unpack(frmt, r[i])

        # Rows 6..9 (0-based) are not available on my scale and must be
        # all zero; every other row must not contain 0xffff.
        # NOTE(review): `i` is the absolute row index, so the all-zero
        # test only covers the first user's rows - confirm whether
        # `i % 12` was intended.
        if i > 5 and i < 10:
            err = [item for item in x if item != 0]
        else:
            err = [item for item in x if item == 0xffff]
        if err:
            print >> sys.stderr, "INVALID:", err
            sys.exit(1)

        s.append(x)
    return s

def printUser(s, user):
    """Print one line per stored measurement of a user.

    s    -- table as returned by read(): rows of 32 values, 12 rows per user
    user -- 0-based user index

    Transposes the user's 12x32 block to 32x12. Output stops at the first
    measurement whose date word is zero.
    """
    base = 12 * user
    for slot in xrange(32):
        packed = s[base + 5][slot]  # date
        if not packed:
            break
        # date word layout: bits 9 and up = years since 1920,
        # bits 5..8 = month, bits 0..4 = day
        year = 1920 + (packed >> 9)
        month = (packed >> 5) & 0xf
        day = packed & 0x1f
        # rows 0..4 are stored in tenths
        tenths = [s[base + k][slot] / 10. for k in range(5)]
        row10 = s[base + 10][slot]
        row11 = s[base + 11][slot]
        line = "{:d}-{:02d}-{:02d}T07:00:00 {:.1f} {:.1f} {:.1f} {:.1f} {:.1f} {:d} {:d}".format(
            year, month, day,
            tenths[0], tenths[1], tenths[2], tenths[3], tenths[4],
            row10, row11)
        print(line)

def main():
    """Command-line entry point: print the measurements of one user.

    Options:
        -c, --cached   use the data saved in dump.pickle instead of
                       reading the scale
        -u, --user N   1-based user number (default: 1)

    Without --cached the scale is read and the raw table is saved to
    dump.pickle for later --cached runs. An invalid --user value ends the
    program through parser.error().
    """
    option_list = [
        make_option("-c", "--cached", action="store_true", dest="cached"),
        make_option('-u', '--user', action='store', dest='user'),
    ]

    parser = OptionParser(option_list=option_list, usage='Usage: %prog [options]')
    options, args = parser.parse_args()

    # Parse the user number before touching the device so that a typo
    # fails fast.
    user = 0
    if options.user:
        try:
            user = int(options.user) - 1
        except ValueError:
            parser.error("--user must be an integer, got %r" % options.user)

    # Pickle data belongs in files opened in binary mode.
    if options.cached:
        # NOTE: pickle.load must only be used on trusted files; dump.pickle
        # is expected to be this script's own cache.
        with open("dump.pickle", "rb") as f:
            s = pickle.load(f)
    else:
        s = read()
        with open("dump.pickle", "wb") as f:
            pickle.dump(s, f)

    # Each user owns 12 rows. Without this check, user 0 or a negative
    # number would silently index rows from the end of the table.
    users = len(s) // 12
    if not 0 <= user < users:
        parser.error("--user must be between 1 and %d" % users)

    printUser(s, user)


if __name__ == '__main__':
    main()


To call it enter at command prompt:

./beurer.py > user1.csv

I have Polar WindLink, Nike Sportband, Omron HEM-7301 and Beurer BG64 connected to my Linux laptop. Because all these are HID devices, I used libusb and small Python scripts for this. The intention is to describe them here, starting from the simplest: BG64.