Python
parseFeitCSI.py
'''
Function which parse FeitCSI data.
usage: parseFeitCSI("path/to/FeitCSI/file")
returned data structure sample:
[
{
header: {
"csi_size":416,
"ftm_clock":144084200,
"num_rx":2,
"num_tx":1,
"num_subcarriers":52,
"rssi1":67,
"rssi2":68,
"source_mac":(0, 7, 14, 124, 232, 48),
"source_mac_string":"00:07:0e:7c:e8:30",
"rate_flags":49415,
"rate_format":"LEGACY_OFDM",
"channel_width":"20",
"mcs":7,
"antenna_a":true,
"antenna_b":true,
"ldpc":false,
"ss":1,
"beamforming":false
},
data: complex matrix[num_subcarriers, num_rx, num_tx]
},
{
header: ...,
data: ...
},
...
]
'''
import numpy as np
import struct
def parseFeitCSI(fileName):
RATE_MCS_MOD_TYPE_POS = 8
RATE_MCS_MOD_TYPE_MSK = (0x7 << RATE_MCS_MOD_TYPE_POS)
RATE_MCS_CCK_MSK = (0 << RATE_MCS_MOD_TYPE_POS)
RATE_MCS_LEGACY_OFDM_MSK = (1 << RATE_MCS_MOD_TYPE_POS)
RATE_MCS_HT_MSK = (2 << RATE_MCS_MOD_TYPE_POS)
RATE_MCS_VHT_MSK = (3 << RATE_MCS_MOD_TYPE_POS)
RATE_MCS_HE_MSK = (4 << RATE_MCS_MOD_TYPE_POS)
RATE_MCS_EHT_MSK = (5 << RATE_MCS_MOD_TYPE_POS)
RATE_MCS_CHAN_WIDTH_POS = 11
RATE_MCS_CHAN_WIDTH_MSK = (0x7 << RATE_MCS_CHAN_WIDTH_POS)
RATE_MCS_CHAN_WIDTH_20_VAL = 0
RATE_MCS_CHAN_WIDTH_20 = (RATE_MCS_CHAN_WIDTH_20_VAL << RATE_MCS_CHAN_WIDTH_POS)
RATE_MCS_CHAN_WIDTH_40_VAL = 1
RATE_MCS_CHAN_WIDTH_40 = (RATE_MCS_CHAN_WIDTH_40_VAL << RATE_MCS_CHAN_WIDTH_POS)
RATE_MCS_CHAN_WIDTH_80_VAL = 2
RATE_MCS_CHAN_WIDTH_80 = (RATE_MCS_CHAN_WIDTH_80_VAL << RATE_MCS_CHAN_WIDTH_POS)
RATE_MCS_CHAN_WIDTH_160_VAL = 3
RATE_MCS_CHAN_WIDTH_160 = (RATE_MCS_CHAN_WIDTH_160_VAL << RATE_MCS_CHAN_WIDTH_POS)
RATE_MCS_CHAN_WIDTH_320_VAL = 4
RATE_MCS_CHAN_WIDTH_320 = (RATE_MCS_CHAN_WIDTH_320_VAL << RATE_MCS_CHAN_WIDTH_POS)
RATE_HT_MCS_CODE_MSK = 7
RATE_MCS_ANT_A_POS = 14
RATE_MCS_ANT_A_MSK = (1 << RATE_MCS_ANT_A_POS)
RATE_MCS_ANT_B_POS = 15
RATE_MCS_ANT_B_MSK = (1 << RATE_MCS_ANT_B_POS)
RATE_MCS_LDPC_POS = 16
RATE_MCS_LDPC_MSK = (1 << RATE_MCS_LDPC_POS)
RATE_MCS_SS_POS = 16
RATE_MCS_SS_MSK = (1 << RATE_MCS_SS_POS)
RATE_MCS_BEAMF_POS = 16
RATE_MCS_BEAMF_MSK = (1 << RATE_MCS_BEAMF_POS)
with open(fileName, mode='rb') as file:
def parseHeader(data):
header = {}
header["csi_size"] = struct.unpack("I", data[0:4])[0]
header["ftm_clock"] = struct.unpack("I", data[8:12])[0]
header["num_rx"] = data[46]
header["num_tx"] = data[47]
header["num_subcarriers"] = struct.unpack("I", data[52:56])[0]
header["rssi1"] = struct.unpack("I", data[60:64])[0]
header["rssi2"] = struct.unpack("I", data[64:68])[0]
header["source_mac"] = struct.unpack("BBBBBB", data[68:74])
header["source_mac_string"] = "%02x:%02x:%02x:%02x:%02x:%02x" % struct.unpack("BBBBBB", data[68:74])
header["rate_flags"] = struct.unpack("I", data[92:96])[0]
rate_format = header["rate_flags"] & RATE_MCS_MOD_TYPE_MSK
if rate_format == RATE_MCS_CCK_MSK:
rate_format = "CCK"
elif rate_format == RATE_MCS_LEGACY_OFDM_MSK:
rate_format = "LEGACY_OFDM"
elif rate_format == RATE_MCS_VHT_MSK:
rate_format = "VHT"
elif rate_format == RATE_MCS_HT_MSK:
rate_format = "HT"
elif rate_format == RATE_MCS_HE_MSK:
rate_format = "HE"
elif rate_format == RATE_MCS_EHT_MSK:
rate_format = "EHT"
else:
rate_format = "unknown"
header["rate_format"] = rate_format
channel_width = header["rate_flags"] & RATE_MCS_CHAN_WIDTH_MSK
if channel_width == RATE_MCS_CHAN_WIDTH_20:
channel_width = "20"
elif channel_width == RATE_MCS_CHAN_WIDTH_40:
channel_width = "40"
elif channel_width == RATE_MCS_CHAN_WIDTH_80:
channel_width = "80"
elif channel_width == RATE_MCS_CHAN_WIDTH_160:
channel_width = "160"
elif channel_width == RATE_MCS_CHAN_WIDTH_320:
channel_width = "320"
else:
channel_width = "unknown"
header["channel_width"] = channel_width
header["mcs"] = header["rate_flags"] & RATE_HT_MCS_CODE_MSK
header["antenna_a"] = True if header["rate_flags"] & RATE_MCS_ANT_A_MSK else False
header["antenna_b"] = True if header["rate_flags"] & RATE_MCS_ANT_B_MSK else False
header["ldpc"] = True if header["rate_flags"] & RATE_MCS_LDPC_MSK else False
header["ss"] = 2 if header["rate_flags"] & RATE_MCS_SS_MSK else 1
header["beamforming"] = True if header["rate_flags"] & RATE_MCS_BEAMF_MSK else False
return header
def parseCsiData(data, header):
csi_matrix = np.zeros((header["num_subcarriers"], header["num_rx"], header["num_tx"]), dtype=complex)
pos = 0
for j in range(header["num_rx"]):
for k in range(header["num_tx"]):
for n in range(header["num_subcarriers"]):
real = struct.unpack("h", data[pos:pos + 2])[0]
imag = struct.unpack("h", data[pos + 2:pos + 4])[0]
pos += 4
csi_matrix[n, j, k] = complex(real, imag)
return csi_matrix
fileContent = file.read()
step = 0
output = []
while (len(fileContent) > step):
data = {}
data["header"] = parseHeader(fileContent[step:(step+272)])
step += 272
data["csi_matrix"] = parseCsiData(fileContent[step:(step + data["header"]["csi_size"])], data["header"])
step += data["header"]["csi_size"]
output.append(data)
return output