Archive

USB

The data Windlink sends over the USB is quite complicated and i have not fully decoded all the fields, but i have the HRM data.

To use Windlink you have to wake it up and set the mode. Wakeup message has a string “ROSLYN” and a lot of 0x00 (“hello0” in python code). The mode setting command seems to have parameters. I am using (“hello1” in python code):

01 a1 00 00 00 00 00 00 00 00 4e 00"

which i picked up from Polar ProTrainer (it is sent from a function fnOnlineCom_OpenConnection(0,4) in PolarWindLink.dll).

After setup the Windlink starts to send data: 2 packets in every second.
These two packets are practically identical (only one flag is different).

Packet itself has 33 bytes. Here is a sample with the legend:

0401d4e16ca14e00000000000000000000000000002081e300c454effd000be1a1
..ff....ff..hh....ttttttttttttttttttttttttttttttvvvvvvvv..........

where:

ff - some flags
hh - average pulse for this second
tt - 12bit RR times (packed into bytes 9..23)
vv - some aggregated heart rate variability value

Source code:

#!/usr/bin/env python
"""
ID 0da4:0002 Polar
"""
import usb.core
import usb.util as util
from array import array
import sys
from optparse import OptionParser, make_option
import pickle
from struct import unpack
import time

hello0="\x01\x02\x00\x00\x00\x52\x4f\x53\x4c\x59\x4e\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
hello1 ="\x01\xa1\x00\x00\x00\x00\x00\x00\x00\x00\x4e\x00" # after OpenConnection(0,4)

bye = "\x01\x06"    #39 bytes

dev = None

VID = 0x0da4
PID = 0x0002
EP_IN = 0x81
EP_OUT = 0x03

def findDev():
    global dev
    dev = usb.core.find(idVendor=VID, idProduct=PID)
    if dev is None:
        raise ValueError('Device not found')

    if dev.is_kernel_driver_active(0):
        dev.detach_kernel_driver(0)
    dev.set_configuration()
    util.claim_interface(dev, None)

def openDev():
    global dev
    for i in xrange(5):
        try:
            dev.write(EP_OUT,hello0)
            dev.read(EP_IN, 33, timeout=5000)
        except usb.core.USBError:
            print >> sys.stderr, "Open failed"
            time.sleep(1)
            continue
        break

    dev.write(EP_OUT,hello1)
    dev.write(EP_OUT,hello0)  # This has some unknown effect

def resetDev():
    global dev
    try:
        dev.write(EP_OUT,bye)
        dev.read(EP_IN, 33, timeout=5000)
    except usb.core.USBError:
        pass

previousHR = [0,[],0]

def filterAndParse(d):
    global previousHR

    if d[1]!=1 :
        return None

    flags = d[4]
    hr = d[6]
    rr = []

    # Possibly bytes 9..23 contain HRM 12bit data
    # Here we print out only 16..23 which are filled at low pulse
    x = unpack("!QL", d[16:28])
    n =x[0]
    for i in xrange(0,60,12):
        ms = n>>i & 0xfff
        if ms!=0 or i==0: # first can be 0
            rr.append(ms)
    unknown =  (x[1]>>8)

    # if two consecutive lines differ only by flags then we skip it

    drop =  flags != previousHR[0] and previousHR[1:] == [rr, x[1]]
    previousHR = [flags, rr, x[1]]
    return ( None if drop else (hr, rr, unknown) )

def scan():

    startTime = time.time()
    currentRR = 0
    sys.stdout.write("time RR pulse HRV\n")
    while 1:
        try:
            rawData = dev.read(EP_IN, 33, timeout=5000)
            if not rawData:
                break
            timestamp = "%.3f" % (time.time()-startTime)

            data = filterAndParse(rawData)
            if not data:
                continue

            rr = data[1]               # rr values

            # check for a continuation
            hr = 60000./data[0]        # calculate RR from average HR (data[0])
            if (currentRR/hr > 0.8) and (rr[0]/hr > 0.8):
                currentRR = 0
            currentRR += rr[0]

            for i in xrange(1,len(rr)):
                sys.stdout.write("%s %d %d %d\n" % (timestamp, currentRR, data[0], data[2]))
                sys.stdout.flush()
                currentRR = rr[i]

        except usb.core.USBError:
            print >> sys.stderr, "timeout"
            break

def main():

    findDev()
    openDev()
    scan()
    resetDev()

if __name__ == '__main__':
    main()

For visualization i use kst2/ because it nicely scrolls in real time.

A script to start Windlink and kst2 together:

#!/bin/zsh

W='data/hr'

HR=$W.`date +%FT%H:%M`
HR_ERR=${W}_err.`date +%FT%H:%M`
~/projects/usb/polar/polar.py >$HR 2>$HR_ERR &

if [ $? -ne 0 ]; then; exit; fi
sleep 5
kst2 -F $HR polar.kst

pkill polar.py

and it looks like this:

hr

To visualize the collected data i use R and knitr. R has very convenient statistical tools and powerful graphics. knitr creates from the R output a nice HTML page.
The original idea was to analyse HRV data and R is an adequate choice for this. But because i am not doing any intensive training and recovery is not my problem the only variable influencing my decisions is my weight. The flexibility of my setup is still handy. I find myself quite often modifying the reports to match my interests.
The main R/knitr program is:


Weight tracking `r date()`
==============================

```{r echo=F,warning=F,highlight=T}
library("lubridate")
options(width=100)
old.t=trellis.par.set(theEconomist.theme(box = "transparent"))
old.l=lattice.options(theEconomist.opts())
trellis.par.set(reference.line=list(lwd=0.3))
opts_chunk$set( fig.width=12, fig.height=7, tidy=F, error=T, warning=T)
```

Regression line for last 4*7 days <b style="color:blue">blue</b>,
7 days <b style="color:red">red</b>.

```{r weight,echo=F,fig=T}
weight=read.table( file="data/weight.csv", sep=" ", header=T)
weight$date = as.POSIXct( weight$date, format="%Y-%m-%dT%H:%M:%S" )
wday = wday(weight$date,T)
weight=data.frame( weight,
  wday=factor( wday, levels=c("Mon","Tues","Wed","Thurs","Fri","Sat","Sun")), 
  wend=(wday=="Sat") | (wday=="Sun"))

xlim=c(as.POSIXct("2011-07-01"), Sys.time()+(30*3600*24))

p = function(x,y,...) {
  panel.abline(73.4, 0, col="#eeeeee", lwd=70)
  panel.lmline(tail(x,4*7), tail(y,4*7), col="blue")
  panel.lmline(tail(x,7), tail(y,7), col="red")
  panel.xyplot(x,y,col=ifelse(weight$wend,"red","darkblue"),...)
}
print((xyplot( 
  kg~date, main="Weight", data=weight, panel=p, 
  xlim=xlim, ylim=c(70,96),
  xlab=NULL, ylab=NULL, 
  pch=".", cex="3", type=c("p","smooth"),span=0.1 )))
```

```{r weightdiff,child="weightdiff.Rmd"}
```

```{r sugar,echo=F,fig=TRUE}
sugar = read.table( file="data/sugar.csv",  sep=" ", header=T)
sugar$date = as.POSIXct( sugar$date, format="%Y-%m-%dT%H:%M")
sugar= sugar[ hour( sugar$date)<10 & hour(sugar$date)>5,] #filter: keep morning only

pSugar = function(x,y,...) {
  panel.grid(v=FALSE,h=-1,...)
  panel.xyplot(x,y,...)
}

print((xyplot(
  glu ~ date, main="Sugar", data=sugar, panel=pSugar,
  xlim=xlim, ylim=c(4,8),
  xlab=NULL, ylab=NULL, type=c("p","smooth"), span=0.1)))
```

```{r pressure,echo=F,fig=TRUE}
pressure = read.table( file="data/pressure.csv", sep=" ", header=T)
pressure$date = as.POSIXct( pressure$date, format="%Y-%m-%dT%H:%M:%S")
pressure = pressure[ hour( pressure$date )<10,] #filter: keep morning only

pPressure = function(x,y,...) {
  panel.grid(v=FALSE,h=-1,...)
  #panel.average(trunc(x,"day"),pressure$sys,horizontal=F)
  panel.xyplot(x,y,...)
}

print((xyplot(
  sys+di+fr ~ date, main="Blood pressure", data=pressure, panel=pPressure,
  xlim=xlim,
  xlab=NULL, ylab=NULL, pch=".", cex=3, type=c("p","smooth"), 
  col=c("green","red","black","blue"), span=0.05)))
```

```{r ortho,child="orthostatic.Rmd", eval=F}
```

```{r finish,echo=F}
trellis.par.set(old.t)
lattice.options(old.l)
```

(It references two unimportant include files weightdiff.Rmd and orthostatic.Rmd.)

This script produces this Sample report (converted with wkhtmltopdf).

Beurer and Omron are part of my routine every morning:

  • weight (Beurer)
  • orthostatic test (6min lying/3min standing) (Windlink)
  • blood sugar (CareSensN)
  • blood pressure (twice both arms) (Omron)

After all this is done i launch a small udev listener:

#!/usr/bin/env python
import pyudev
import subprocess

context = pyudev.Context()
monitor = pyudev.Monitor.from_netlink(context)
monitor.start()
monitor.filter_by('usb', 'usb_device')
print "Started:", monitor.started

for device in iter(monitor.poll, None):
    print device.action, device.subsystem, device
    try:
        atts = device.attributes
        vendor = atts["idVendor"]
        product = atts["idProduct"]
        name = atts["product"]
        print "%s %s:%s" % (name, vendor, product)
    except KeyError:
        continue
    if device.action !="add":
        continue
    if vendor == "04d9": #idProduct = 8010
        print "call"
        subprocess.call("./beurer.sh")
        print "done"
    if vendor == "0590": #idProduct = 0028
        print "call"
        subprocess.call("./omron.sh")
        print "done"

This will launch device specific scripts when a device is connected.
beurer.sh:

#!/bin/zsh

W=data/weight.csv
OLD=data/backup/weight.csv.`date +%FT%H:%M`
T='x.csv'
~/projects/usb/beurer/beurer.py >$T || exit $?

# backup
cp $W $OLD

# merge
tail -q -n +2 $OLD | sort -u $T - | cat $W.header - >$W

#feedback
cat $T

omron.sh

#!/bin/zsh

W=data/pressure.csv
OLD=data/backup/pressure.csv.`date +%FT%H:%M`
T='x.csv'

~/projects/usb/omron/omron.py >$T || exit $?

# backup
cp $W $OLD

# merge
tail -q -n +2 $OLD | sort -u $T - | cat $W.header - >$W

#feedback
cat $T

CareSensN is producing only one number and it is easier to enter it directly. My alias for this is:

alias rrs='>>~/monitor/data/sugar.csv     echo `date +%FT%H:%M`'

“Reports” is next.

This upper arm blood pressure monitor has memory for last 90 measurements. It records date, time, systolic/diastolic pressure and pulse.
It has somewhat more complicated USB protocol using request/response packets. It has also support for wakeup request.The following python program produces a CSV file with all variables available:

#!/usr/bin/env python

# ID 0590:0028
# MIT Elite Plus HEM-7301-ITKE7

from operator import xor
from datetime import datetime
import usb.core
import usb.util as util
from array import array
import sys

# omron device modes
OMRON_NULL_MODE = 0,0
OMRON_VID = 0x0590
OMRON_PID = 0x0028
OMRON_IN_ENDPT  = 0x81
OMRON_OUT_ENDPT = 0x02
debug = 0

def setMode(mode):
    r = dev.ctrl_transfer(
        util.build_request_type(util.CTRL_OUT, util.CTRL_TYPE_CLASS, util.CTRL_RECIPIENT_INTERFACE), # bmRequestType,
        9,                 # bRequest: REQ_HID_SET_REPORT
        0x300,             # wValue=0: (HID_REPORT_TYPE_FEATURE<<8)|feature_report_id,
        0,                 # wIndex=0: feature_interface_num
        mode,              # data_or_wLength=None:  feature_report
        1000               # timeout = None
    )

def _put(b, timeout=2000):
    r = dev.write(OMRON_OUT_ENDPT,b,0,timeout)
    return r

def _get(validate=0, timeout=2000):
    while 1:
        r = dev.read(OMRON_IN_ENDPT,8,0,timeout)
        if r[0] == 8:
            continue  #not ready, wait
        if r[0]<1 or r[0]>7:
            raise ValueError('Invalid byte count')

        if validate and ( r[1]!=ord('O') or r[2]!=ord('K')):
            raise ValueError('Bad response')
        return r[1:r[0]+1]

def _check(a):
    if reduce(xor, a[3:], 0) :
        raise ValueError('Bad checksum')
    return a

def wakeup():
    "Write zero packets until OK is received"
    try:
        r=_get(0,500)
    except: pass
    for i in range(5):
        _put((7,0,0,0,0,0,0,0,7,0,0,0,0,0,0,0))
        try:
           r=_get(1,500)
           break
        except: pass

def shutdown():
    _put(array('B',"\005END00"))
    _get()

def getNoOfMeasurements():
    _put(array('B',"\005CNT00"))
    return _check(_get(1) + _get())[4]

def getMeasurement(i):
    _put(array('B', (7,ord('M'),ord('E'),ord('S'),0,0,i,i)))
    return _check(_get(1) + _get() + _get())

def delete(): # fails with "NO"
    _put(array('B',"\005MCL00"))
    r=_get(1)

def check_time():
    _put(array('B',"\005GCL00"))
    r= _check(_get(1)+_get()) # returns 10 bytes: O K \x00 YY MM DD HH NN SS CRC
    t = datetime(2000+r[3],r[4],r[5],r[6],r[7],r[8] )
    t = abs(t-t.now()).total_seconds()
    if t > 300: # enough to detect DST
        print >> sys.stderr, "WARNING! Time difference is", t, "sec."

dev = usb.core.find(idVendor=OMRON_VID, idProduct=OMRON_PID)
if dev is None:
    raise ValueError('Device not found')

if dev.is_kernel_driver_active(0):
    dev.detach_kernel_driver(0)
dev.set_configuration() # do reset and set active conf. Must be done before claim.
util.claim_interface(dev, None)

setMode(OMRON_NULL_MODE) # mode value seems to have no effect
wakeup()
check_time()
cnt = getNoOfMeasurements()
for i in range(cnt):
    r = getMeasurement(i)
    print "%d-%02d-%02dT%02d:%02d:%02d %d %d %d" % (2000+r[3], r[4],r[5],r[6],r[7],r[8],r[11],r[12],r[13])
    #sys.stdout.flush()

shutdown()

To call it enter at command prompt:

./omron.py > pressure.csv

This scale can measure your weight and percentage of different body components. The control panel is recording 12 variables:

  1. weight kg
  2. body fat %
  3. water %
  4. muscles %
  5. bones kg
  6. date
  7. upper body fat % (not for BG64)
  8. lower body fat % (not for BG64)
  9. upper body muscles % (not for BG64)
  10. lower body muscles % (not for BG64)
  11. BMR
  12. AMR

All values are 16 bit words.
Up to 10 users can store thier data for 32 measurements.
Beacause date has no time only one measurement per day is possible.

The following python program produces a CSV file with all variables available on BG64 for the first user (can be specified as a command parameter):

#!/usr/bin/env python

"""
ID 04d9:8010 Holtek Semiconductor, Inc. 
"""
import usb.core
import usb.util as util
from array import array
import sys
from optparse import OptionParser, make_option
import pickle
from struct import unpack

dev = None

VID = 0x04d9
PID = 0x8010

# define HID constants

REQ_HID_GET_REPORT   = 0x01 
REQ_HID_GET_IDLE     = 0x02 
REQ_HID_GET_PROTOCOL = 0x03 

REQ_HID_SET_REPORT   = 0x09 
REQ_HID_SET_IDLE     = 0x0A 
REQ_HID_SET_PROTOCOL = 0x0B 

HID_REPORT_TYPE_INPUT   = 1<<8
HID_REPORT_TYPE_OUTPUT  = 2<<8
HID_REPORT_TYPE_FEATURE = 3<<8

def openDev():
    global dev
    dev = usb.core.find(idVendor=VID, idProduct=PID)
    if dev is None:
        raise ValueError('Device not found')

    if dev.is_kernel_driver_active(0):
        dev.detach_kernel_driver(0)
    dev.set_configuration() # do reset and set active conf. Must be done before claim.
    util.claim_interface(dev, None)

def setReport(reportId, data):
    r = dev.ctrl_transfer(
        util.build_request_type(util.CTRL_OUT, util.CTRL_TYPE_CLASS, util.CTRL_RECIPIENT_INTERFACE), # bmRequestType,
        REQ_HID_SET_REPORT,
        HID_REPORT_TYPE_OUTPUT | reportId,
        0,                 # feature_interface_num
        data,              # data
        3000               # timeout (1000 was too small for C_FREE)
    )
    return r

def read():
    openDev()
    try:
        dev.read(0x81,64) #flush 
    except usb.core.USBError:
        pass
    
    setReport(9, array('B',(0x10,0,0,0,0,0,0,0)))

    # Every user can have upto 12 variables
    # and additional 8*64 for user data
    r = []
    for i in xrange(120+8):
        x = dev.read(0x81,64)
        if not x:
            print >> sys.stderr, "Read failed"
            exit(1)
        if x[0]==0xff and (i<120):
            print >> sys.stderr,"Invalid data",x
            exit(1)
        r.append(x)    

    # Each variable can have upto 32 values
    frmt = "!" + "H"*32
    s = []
    for i in xrange(120) :
        x = unpack(frmt, r[i])
        
        # Variables 5 .. 10 are not available on my scale
        if i>5 and i<10:
            err = [item for item in x if item!=0]
        else:
            err = [item for item in x if item==0xffff]
        if len(err):
            print >> sys.stderr, "INVALID:", err
            exit(1)

        s.append(x)
    return s

def printUser(s, user):
    # Transpose from 12x32 to 32x12 and format

    j = 12*user
    for i in xrange(32):
        x = s[j+5][i] # date
        if not x:
           break
        print "{:d}-{:02d}-{:02d}T07:00:00 {:.1f} {:.1f} {:.1f} {:.1f} {:.1f} {:d} {:d}".format(
           1920+(x>>9), x>>5&0xf, x&0x1f,
           s[j+0][i]/10., s[j+1][i]/10., s[j+2][i]/10., s[j+3][i]/10., s[j+4][i]/10., s[j+10][i], s[j+11][i])

def main():

    option_list = [
        make_option("-c", "--cached", action="store_true", dest="cached"),
        #make_option("-w", "--write",action="store",      dest="output_file"),
        make_option('-u', '--user', action='store',      dest='user'),
    ]

    parser = OptionParser(option_list=option_list, usage='Usage: %prog [options]')
    options, args = parser.parse_args()

    if options.cached:
        with open("dump.pickle") as f: s = pickle.load(f)
    else:
        s = read()
        with open("dump.pickle","w") as f: pickle.dump(s,f)

    user = 0
    if options.user:
        user=int(options.user)-1

    printUser(s, user)


if __name__ == '__main__':
    main()


To call it enter at command prompt:

./beurer.py > user1.csv

I have Polar WindLink, Nike Sportband, Omron HEM-7301 and Beurer BG64 connected to my Linux laptop. Because all these are HID devices i used for this libusb and small python scripts. The intention is to describe them here starting from the simplest: BG64.