Skip to content

Python

Script for parsing CSI

parseFeitCSI.py
'''
Function which parses FeitCSI data.
usage: parseFeitCSI("path/to/FeitCSI/file")
returned data structure sample:
[
    {
        header: {
            "csi_size":416,
            "ftm_clock":144084200,
            "timestamp":1701251234567890,
            "num_rx":2,
            "num_tx":1,
            "num_subcarriers":52,
            "rssi1":67,
            "rssi2":68,
            "source_mac":(0, 7, 14, 124, 232, 48),
            "source_mac_string":"00:07:0e:7c:e8:30",
            "rate_flags":49415,
            "rate_format":"LEGACY_OFDM",
            "channel_width":"20",
            "mcs":7,
            "antenna_a":true,
            "antenna_b":true,
            "ldpc":false,
            "ss":1,
            "beamforming":false
        },
        csi_matrix: complex matrix[num_subcarriers, num_rx, num_tx]
    },
    {
        header: ...,
        csi_matrix: ...
    }, 
    ...
]
'''
import numpy as np
import struct


def parseFeitCSI(fileName):
    """Parse a FeitCSI capture file into a list of CSI records.

    The file is read as a sequence of records, each made of a fixed
    272-byte header followed by ``csi_size`` bytes of CSI samples
    (``csi_size`` is taken from the header itself). Multi-byte fields are
    decoded with the native byte order, as ``struct`` does by default.

    Args:
        fileName: Path of the binary FeitCSI file to read.

    Returns:
        A list with one dict per record, in file order:
            "header": dict of decoded header fields (see ``parseHeader``).
            "csi_matrix": complex numpy array indexed as
                [subcarrier, rx, tx].
        An empty file yields an empty list.

    Raises:
        struct.error: If the file ends in the middle of a header, or holds
            fewer sample bytes than the header's dimensions require.
    """
    HEADER_SIZE = 272
    BYTES_PER_SAMPLE = 4  # one int16 real part + one int16 imaginary part

    # Bit layout of the 32-bit "rate_flags" header word.
    RATE_MCS_MOD_TYPE_POS = 8
    RATE_MCS_MOD_TYPE_MSK = (0x7 << RATE_MCS_MOD_TYPE_POS)
    RATE_MCS_CCK_MSK = (0 << RATE_MCS_MOD_TYPE_POS)
    RATE_MCS_LEGACY_OFDM_MSK = (1 << RATE_MCS_MOD_TYPE_POS)
    RATE_MCS_HT_MSK = (2 << RATE_MCS_MOD_TYPE_POS)
    RATE_MCS_VHT_MSK = (3 << RATE_MCS_MOD_TYPE_POS)
    RATE_MCS_HE_MSK = (4 << RATE_MCS_MOD_TYPE_POS)
    RATE_MCS_EHT_MSK = (5 << RATE_MCS_MOD_TYPE_POS)
    RATE_MCS_CHAN_WIDTH_POS = 11
    RATE_MCS_CHAN_WIDTH_MSK = (0x7 << RATE_MCS_CHAN_WIDTH_POS)
    RATE_MCS_CHAN_WIDTH_20 = (0 << RATE_MCS_CHAN_WIDTH_POS)
    RATE_MCS_CHAN_WIDTH_40 = (1 << RATE_MCS_CHAN_WIDTH_POS)
    RATE_MCS_CHAN_WIDTH_80 = (2 << RATE_MCS_CHAN_WIDTH_POS)
    RATE_MCS_CHAN_WIDTH_160 = (3 << RATE_MCS_CHAN_WIDTH_POS)
    RATE_MCS_CHAN_WIDTH_320 = (4 << RATE_MCS_CHAN_WIDTH_POS)
    RATE_HT_MCS_CODE_MSK = 7
    RATE_MCS_ANT_A_POS = 14
    RATE_MCS_ANT_A_MSK = (1 << RATE_MCS_ANT_A_POS)
    RATE_MCS_ANT_B_POS = 15
    RATE_MCS_ANT_B_MSK = (1 << RATE_MCS_ANT_B_POS)
    # LDPC, SS and beamforming are three independent one-bit flags on
    # consecutive bits (iwlwifi rate_n_flags v2 layout). Giving them the
    # same position would make the three decoded fields always agree.
    RATE_MCS_LDPC_POS = 16
    RATE_MCS_LDPC_MSK = (1 << RATE_MCS_LDPC_POS)
    RATE_MCS_SS_POS = 17
    RATE_MCS_SS_MSK = (1 << RATE_MCS_SS_POS)
    RATE_MCS_BEAMF_POS = 18
    RATE_MCS_BEAMF_MSK = (1 << RATE_MCS_BEAMF_POS)

    # Masked field value -> label reported in the parsed header.
    RATE_FORMATS = {
        RATE_MCS_CCK_MSK: "CCK",
        RATE_MCS_LEGACY_OFDM_MSK: "LEGACY_OFDM",
        RATE_MCS_HT_MSK: "HT",
        RATE_MCS_VHT_MSK: "VHT",
        RATE_MCS_HE_MSK: "HE",
        RATE_MCS_EHT_MSK: "EHT",
    }
    CHANNEL_WIDTHS = {
        RATE_MCS_CHAN_WIDTH_20: "20",
        RATE_MCS_CHAN_WIDTH_40: "40",
        RATE_MCS_CHAN_WIDTH_80: "80",
        RATE_MCS_CHAN_WIDTH_160: "160",
        RATE_MCS_CHAN_WIDTH_320: "320",
    }

    def parseHeader(data):
        """Decode one 272-byte header into a dict of named fields."""
        header = {}
        header["csi_size"] = struct.unpack_from("I", data, 0)[0]
        header["ftm_clock"] = struct.unpack_from("I", data, 8)[0]
        header["timestamp"] = struct.unpack_from("Q", data, 12)[0]
        header["num_rx"] = data[46]
        header["num_tx"] = data[47]
        header["num_subcarriers"] = struct.unpack_from("I", data, 52)[0]
        header["rssi1"] = struct.unpack_from("I", data, 60)[0]
        header["rssi2"] = struct.unpack_from("I", data, 64)[0]
        mac = struct.unpack_from("BBBBBB", data, 68)
        header["source_mac"] = mac
        header["source_mac_string"] = "%02x:%02x:%02x:%02x:%02x:%02x" % mac
        header["rate_flags"] = struct.unpack_from("I", data, 92)[0]

        flags = header["rate_flags"]
        header["rate_format"] = RATE_FORMATS.get(
            flags & RATE_MCS_MOD_TYPE_MSK, "unknown")
        header["channel_width"] = CHANNEL_WIDTHS.get(
            flags & RATE_MCS_CHAN_WIDTH_MSK, "unknown")
        header["mcs"] = flags & RATE_HT_MCS_CODE_MSK
        header["antenna_a"] = bool(flags & RATE_MCS_ANT_A_MSK)
        header["antenna_b"] = bool(flags & RATE_MCS_ANT_B_MSK)
        header["ldpc"] = bool(flags & RATE_MCS_LDPC_MSK)
        header["ss"] = 2 if flags & RATE_MCS_SS_MSK else 1
        header["beamforming"] = bool(flags & RATE_MCS_BEAMF_MSK)
        return header

    def parseCsiData(data, header):
        """Decode the interleaved int16 (real, imag) samples of one record.

        Samples are stored with the subcarrier index varying fastest, then
        tx, then rx; the result is indexed as [subcarrier, rx, tx].
        """
        num_sub = header["num_subcarriers"]
        num_rx = header["num_rx"]
        num_tx = header["num_tx"]
        csi_matrix = np.zeros((num_sub, num_rx, num_tx), dtype=complex)

        num_samples = num_sub * num_rx * num_tx
        if num_samples == 0:
            return csi_matrix

        needed = num_samples * BYTES_PER_SAMPLE
        if len(data) < needed:
            raise struct.error(
                "truncated CSI data: need %d bytes, got %d"
                % (needed, len(data)))

        # One bulk conversion instead of two struct.unpack calls per sample;
        # bytes beyond the matrix dimensions are ignored.
        iq = np.frombuffer(data, dtype=np.int16, count=2 * num_samples)
        iq = iq.reshape(num_rx, num_tx, num_sub, 2)
        csi_matrix.real = iq[..., 0].transpose(2, 0, 1)
        csi_matrix.imag = iq[..., 1].transpose(2, 0, 1)
        return csi_matrix

    # Read everything up front so the file is closed before parsing starts.
    with open(fileName, mode='rb') as file:
        fileContent = file.read()

    total = len(fileContent)
    step = 0
    output = []
    while step < total:
        if total - step < HEADER_SIZE:
            raise struct.error(
                "truncated header at offset %d: need %d bytes, got %d"
                % (step, HEADER_SIZE, total - step))
        header = parseHeader(fileContent[step:step + HEADER_SIZE])
        step += HEADER_SIZE
        csi_size = header["csi_size"]
        csi_matrix = parseCsiData(fileContent[step:step + csi_size], header)
        step += csi_size
        output.append({"header": header, "csi_matrix": csi_matrix})
    return output

Script for parsing FTM

parseFeitCSIftm.py
'''
Function which parses FeitCSI FTM data.
usage: parseFeitCSIftm("path/to/FeitCSI/ftm/file")
returned data structure sample:
[
    {
        "burstIndex": 123,
        "numFtmrAttempts": 123,
        "numFtmrSuccesses": 123,
        "numBurstsExp": 123,
        "burstDuration": 123,
        "ftmsPerBurst": 123,
        "rssiAvg": 123,
        "rssiSpread": 123,
        "rttAvg": 123,
        "rttVariance": 123,
        "rttSpread": 123,
        "distAvg": 123,
        "distVariance": 123,
        "distSpread": 123,
        "timestamp": 123,
    },
    {
        FTM data...
    }, 
    ...
]
'''
import numpy as np
import struct


def parseFeitCSIftm(fileName):
    """Parse a FeitCSI FTM file into a list of per-record dicts.

    The file is consumed in consecutive 79-byte chunks; each chunk is
    decoded field by field at fixed byte offsets, using the native byte
    order (the ``struct`` default).

    Args:
        fileName: Path of the binary FeitCSI FTM file to read.

    Returns:
        A list of dicts, one per chunk and in file order, with the keys
        "burstIndex", "numFtmrAttempts", "numFtmrSuccesses",
        "numBurstsExp", "burstDuration", "ftmsPerBurst", "rssiAvg",
        "rssiSpread", "rttAvg", "rttVariance", "rttSpread", "distAvg",
        "distVariance", "distSpread" and "timestamp".
    """
    record_len = 79

    def read_u32(chunk, offset):
        # 4-byte unsigned integer starting at ``offset``.
        return struct.unpack("I", chunk[offset:offset + 4])[0]

    def read_u64(chunk, offset):
        # 8-byte unsigned integer starting at ``offset``.
        return struct.unpack("Q", chunk[offset:offset + 8])[0]

    def decode_record(chunk):
        # Entries are evaluated top to bottom, so fields are read in
        # ascending offset order.
        return {
            "burstIndex": read_u32(chunk, 0),
            "numFtmrAttempts": read_u32(chunk, 4),
            "numFtmrSuccesses": read_u32(chunk, 8),
            "numBurstsExp": read_u32(chunk, 12),
            # Single-byte fields: indexing bytes yields an int directly.
            "burstDuration": chunk[16],
            "ftmsPerBurst": chunk[17],
            "rssiAvg": chunk[18],
            "rssiSpread": read_u32(chunk, 19),
            "rttAvg": read_u64(chunk, 23),
            "rttVariance": read_u64(chunk, 31),
            "rttSpread": read_u64(chunk, 39),
            "distAvg": read_u64(chunk, 47),
            "distVariance": read_u64(chunk, 55),
            "distSpread": read_u64(chunk, 63),
            "timestamp": read_u64(chunk, 71),
        }

    with open(fileName, mode='rb') as handle:
        raw = handle.read()

    return [
        decode_record(raw[start:start + record_len])
        for start in range(0, len(raw), record_len)
    ]