Archive

Polar

The data Windlink sends over the USB is quite complicated and i have not fully decoded all the fields, but i have the HRM data.

To use Windlink you have to wake it up and set the mode. Wakeup message has a string “ROSLYN” and a lot of 0x00 (“hello0” in python code). The mode setting command seems to have parameters. I am using (“hello1” in python code):

01 a1 00 00 00 00 00 00 00 00 4e 00"

which i picked up from Polar ProTrainer (it is sent from a function fnOnlineCom_OpenConnection(0,4) in PolarWindLink.dll).

After setup the Windlink starts to send data: 2 packets in every second.
These two packets are practically identical (only one flag is different).

Packet itself has 33 bytes. Here is a sample with the legend:

0401d4e16ca14e00000000000000000000000000002081e300c454effd000be1a1
..ff....ff..hh....ttttttttttttttttttttttttttttttvvvvvvvv..........

where:

ff - some flags
hh - average pulse for this second
tt - 12bit RR times (packed into bytes 9..23)
vv - some aggregated heart rate variability value

Source code:

#!/usr/bin/env python
"""
ID 0da4:0002 Polar
"""
import usb.core
import usb.util as util
from array import array
import sys
from optparse import OptionParser, make_option
import pickle
from struct import unpack
import time

hello0="\x01\x02\x00\x00\x00\x52\x4f\x53\x4c\x59\x4e\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
hello1 ="\x01\xa1\x00\x00\x00\x00\x00\x00\x00\x00\x4e\x00" # after OpenConnection(0,4)

bye = "\x01\x06"    #39 bytes

dev = None

VID = 0x0da4
PID = 0x0002
EP_IN = 0x81
EP_OUT = 0x03

def findDev():
    global dev
    dev = usb.core.find(idVendor=VID, idProduct=PID)
    if dev is None:
        raise ValueError('Device not found')

    if dev.is_kernel_driver_active(0):
        dev.detach_kernel_driver(0)
    dev.set_configuration()
    util.claim_interface(dev, None)

def openDev():
    global dev
    for i in xrange(5):
        try:
            dev.write(EP_OUT,hello0)
            dev.read(EP_IN, 33, timeout=5000)
        except usb.core.USBError:
            print >> sys.stderr, "Open failed"
            time.sleep(1)
            continue
        break

    dev.write(EP_OUT,hello1)
    dev.write(EP_OUT,hello0)  # This has some unknown effect

def resetDev():
    global dev
    try:
        dev.write(EP_OUT,bye)
        dev.read(EP_IN, 33, timeout=5000)
    except usb.core.USBError:
        pass

previousHR = [0,[],0]

def filterAndParse(d):
    global previousHR

    if d[1]!=1 :
        return None

    flags = d[4]
    hr = d[6]
    rr = []

    # Possibly bytes 9..23 contain HRM 12bit data
    # Here we print out only 16..23 which are filled at low pulse
    x = unpack("!QL", d[16:28])
    n =x[0]
    for i in xrange(0,60,12):
        ms = n>>i & 0xfff
        if ms!=0 or i==0: # first can be 0
            rr.append(ms)
    unknown =  (x[1]>>8)

    # if two consecutive lines differ only by flags then we skip it

    drop =  flags != previousHR[0] and previousHR[1:] == [rr, x[1]]
    previousHR = [flags, rr, x[1]]
    return ( None if drop else (hr, rr, unknown) )

def scan():

    startTime = time.time()
    currentRR = 0
    sys.stdout.write("time RR pulse HRV\n")
    while 1:
        try:
            rawData = dev.read(EP_IN, 33, timeout=5000)
            if not rawData:
                break
            timestamp = "%.3f" % (time.time()-startTime)

            data = filterAndParse(rawData)
            if not data:
                continue

            rr = data[1]               # rr values

            # check for a continuation
            hr = 60000./data[0]        # calculate RR from average HR (data[0])
            if (currentRR/hr > 0.8) and (rr[0]/hr > 0.8):
                currentRR = 0
            currentRR += rr[0]

            for i in xrange(1,len(rr)):
                sys.stdout.write("%s %d %d %d\n" % (timestamp, currentRR, data[0], data[2]))
                sys.stdout.flush()
                currentRR = rr[i]

        except usb.core.USBError:
            print >> sys.stderr, "timeout"
            break

def main():

    findDev()
    openDev()
    scan()
    resetDev()

if __name__ == '__main__':
    main()

For visualization i use kst2/ because it nicely scrolls in real time.

A script to start Windlink and kst2 together:

#!/bin/zsh

W='data/hr'

HR=$W.`date +%FT%H:%M`
HR_ERR=${W}_err.`date +%FT%H:%M`
~/projects/usb/polar/polar.py >$HR 2>$HR_ERR &

if [ $? -ne 0 ]; then; exit; fi
sleep 5
kst2 -F $HR polar.kst

pkill polar.py

and it looks like this:

hr

I have Polar WindLink, Nike Sportband, Omron HEM-7301 and Beurer BG64 connected to my Linux laptop. Because all these are HID devices i used for this libusb and small python scripts. The intention is to describe them here starting from the simplest: BG64.