import logging
import numpy as np
import scipy.signal as ssignal
import matplotlib.pyplot as plt
#import scipy.interpolate as sinterp
import skcomm as skc
logging.basicConfig()
skc_log = logging.getLogger('skcomm')
skc_log.setLevel(logging.INFO)
plt.ion()
############### PARAMS ########################
symbol_rate = 32e9
sample_rate_tx = 2*symbol_rate #16e9#12.8e9
sample_rate_rx = 2*symbol_rate
mod_order = 4
roll_off = 0.1
linewidth = 200e3 * 1
f_offset = 300e6 * 1
cfo_comp = True
snr_seed_X = 123
snr_seed_Y = 456
SNR = 10
# dbg_plt = False
dbg_plt = False
dbg_dim = 0 # starts with 0, # -1 means last dim
dbg_frame = 0 # starts with 0; -1 means last frame
snrs = np.arange(0.0,1.0,1.0) + SNR #np.arange(-5,15,1)#np.arange(2,13,2.0) #np.arange(10,11,1.0)#np.arange(0,13,1.0) #np.arange(5,6,1.0)#np.arange(100,101,1)#
bersX = np.ones_like(snrs,dtype=np.float64)
bersY = np.ones_like(snrs,dtype=np.float64)
for snr_idx, snr in enumerate(snrs):
skc_log.info(f'\n============\n SNR: {snr}')
############## TRANSMITTER ############################################################################################################################################
# contruct signal
sig_tx = skc.signal.Signal(n_dims=2)
sig_tx.symbol_rate = symbol_rate
# generate bits
sig_tx.generate_bits(n_bits=int(2**15*np.log2(mod_order)), seed=[1975,1979])
# sig_tx.bits=[np.tile(True,int(2**15*np.log2(mod_order))),np.tile(True,int(2**15*np.log2(mod_order)))]
# set constellation (modulation format)
sig_tx.generate_constellation(format='QAM', order=mod_order)
# create symbols
sig_tx.mapper()
sig_tx.samples = sig_tx.symbols[:]
sig_tx.sample_rate = sig_tx.symbol_rate[:]
ce_seed = [165,165] #[833,833] # equal seeds for PAPR optimization
pilot_offset = [0,1]
for dim in np.arange(sig_tx.n_dims):
# insert frame sync header
sig_tx = skc.tx.insert_frame_sync_seq(sig_tx, dim=dim, n_symb=32, fs_header_len=1024,
mean_power=1*np.mean(np.abs(sig_tx.constellation[dim])**2),
seed=10+dim)
# # insert CE header
n_fft = 128; n_pilots=64; n_repetitions=16
# n_fft = 8; n_pilots= 4; n_repetitions = 8
sig_tx = skc.tx.insert_ch_est_seq(sig_tx, dim=dim, offset=sig_tx.info[dim]['fs_header']['length'],
n_fft=n_fft, n_pilots=n_pilots, pilot_offset=pilot_offset[dim], n_repetitions=n_repetitions,
mean_power=1*np.mean(np.abs(sig_tx.constellation[dim])**2), seed=ce_seed[dim])
# insert CPE pilots
sig_tx = skc.tx.insert_cpe_pilots(sig_tx, dim=dim, offset=sig_tx.info[dim]['fs_header']['length']+sig_tx.info[dim]['ce_header']['length'],
pilot_distance=100, mean_power=1*np.max(np.abs(sig_tx.constellation[dim])**2),
seed=12+dim)
skc_log.info(f"""header lengths:
header length: {sig_tx.info[dim]['fs_header']['length']+sig_tx.info[dim]['ce_header']['length']:d} symbols,
overall frame length: {sig_tx.samples[dim].size:d} symbols,
overhead: {(sig_tx.info[dim]['fs_header']['length']+sig_tx.info[dim]['ce_header']['length'])/sig_tx.samples[dim].size*100:.2f}%""")
# resample to AWG (Tx) sample rate
upsam = sample_rate_tx/symbol_rate
sr = symbol_rate * upsam
sig_tx.samples[dim] = skc.tx.pulseshaper(sig_tx.samples[dim], upsampling=upsam, pulseshape='rrc', roll_off=roll_off)
sig_tx.sample_rate[dim] = sample_rate_tx
# emulate DAC (quantization and normalization)
n_quantize = None
if n_quantize:
maxVal = np.max(np.abs(np.concatenate((np.real(sig_tx.samples[dim]), np.imag(sig_tx.samples[dim])))))
sig_tx.samples[dim] = sig_tx.samples[dim]/maxVal
# scale i and Q independently ("Load and Norm")???
maxVal_r = np.max(np.abs(np.real(sig_tx.samples[dim])))
maxVal_i = np.max(np.abs(np.imag(sig_tx.samples[dim])))
maxVal = np.max([maxVal_r,maxVal_i])
sig_tx.samples[dim] = sig_tx.samples[dim].real/maxVal + 1j*sig_tx.samples[dim].imag/maxVal
sig_tx.samples[dim] = sig_tx.samples[dim] * 2**(n_quantize-1)
sig_tx.samples[dim] = np.round(sig_tx.samples[dim])
sig_tx.samples[dim] = sig_tx.samples[dim] / 2**(n_quantize-1)
# repeat signal (emulate Scope)
repetitions = 4.2 #4.2 # TODO: non-cyclic shift
rem_samples = int((repetitions%1)*sig_tx.samples[dim].size)
sig_tx.samples[dim] = np.concatenate((np.tile(sig_tx.samples[dim], int(repetitions//1)), sig_tx.samples[dim][:rem_samples]), axis=0)
if dbg_plt:
skc.visualizer.plot_signal(np.abs(sig_tx.samples[dbg_dim]), sample_rate=sig_tx.sample_rate[0], fNum=205,tit=f'Signal before channel (abs) (dim = {dbg_dim})')
sig_tx.plot_spectrum(fNum=206,dimension=dbg_dim, tit=f'Spectrum before channel (dim = {dbg_dim})')
############## CHANNEL ############################################################################################################################################
# b, a = ssignal.bessel(5, 6e9, btype='low', analog=False, output='ba', norm='phase', fs=sample_rate_tx)
# signal = ssignal.lfilter(b,a,signal)
sig_tx.samples[0] = skc.filters.filter_arbitrary(sig_tx.samples[0], FILTER=np.asarray([[0, 6.4e9, 10e9],[0, 0*-1, 0*-5],[0,0* -90,0*-180]]).T,sample_rate=sample_rate_tx)
sig_tx.samples[1] = skc.filters.filter_arbitrary(sig_tx.samples[1], FILTER=np.asarray([[0, 6.4e9, 10e9],[0, 0*-1, 0*-5],[0,0*-180,-0*370]]).T,sample_rate=sample_rate_tx)
# (pol rotation): linear unitary combination of X and Y pol
rng = np.random.default_rng(seed=None)
alpha = 0 * np.pi/180 #1*0.5*np.pi/2 #0*rng.random()*2*np.pi*1 #4*22.5
phi = 0 * rng.random()*2*np.pi*1
tmp = sig_tx.samples[0]
sig_tx.samples[0] = np.cos(alpha) * sig_tx.samples[0]*np.exp(-1j*phi) + np.sin(alpha)*sig_tx.samples[1]
sig_tx.samples[1] = -np.sin(alpha)*tmp + np.cos(alpha)*sig_tx.samples[1]*np.exp(1j*phi)
# add noise
sig_tx.samples[0] = skc.channel.set_snr(sig_tx.samples[0],snr_dB=snr,sps=upsam,seed=snr_seed_X)
sig_tx.samples[1] = skc.channel.set_snr(sig_tx.samples[1],snr_dB=snr,sps=upsam,seed=snr_seed_Y)
## noise as CW tones on frequency sub-grid of repeated pilot tones => doesn't work :(
# rng = np.random.default_rng(seed=123)
# noise_0 = np.fft.ifft(sig_tx.samples[0].std()/10**(snr/20) * np.exp(1j*2*np.pi*np.random.uniform(low=0,high=1,size=sig_tx.samples[0].shape))) * np.sqrt(sig_tx.samples[0].size * sig_tx.sample_rate[0]/sig_tx.symbol_rate[0])
# sig_tx.samples[0] += noise_0
# rng = np.random.default_rng(seed=345)
# noise_1 = np.fft.ifft(sig_tx.samples[1].std()/10**(snr/20) * np.exp(1j*2*np.pi*np.random.uniform(low=0,high=1,size=sig_tx.samples[1].shape))) * np.sqrt(sig_tx.samples[1].size * sig_tx.sample_rate[1]/sig_tx.symbol_rate[1])
# sig_tx.samples[1] += noise_1
# add CFO
if f_offset != 0:
sig_tx.samples[0] = skc.channel.add_frequency_offset(sig_tx.samples[0],sample_rate=sr,f_offset=f_offset)
sig_tx.samples[1] = skc.channel.add_frequency_offset(sig_tx.samples[1],sample_rate=sr,f_offset=f_offset)
if linewidth != 0.0:
#seed = np.random.randint(low=1, high=1000)
seed = 789
pn_res = skc.channel.add_phase_noise(sig_tx.samples[0],s_rate=sr, linewidth=linewidth, seed=seed)
sig_tx.samples[0] = pn_res['samples']
# pn_signature = pn_res['phaseAcc']
pn_res = skc.channel.add_phase_noise(sig_tx.samples[1],s_rate=sr, linewidth=linewidth, seed=seed)
sig_tx.samples[1] = pn_res['samples']
# pn_signature = pn_res['phaseAcc']
if dbg_plt:
sig_tx.plot_spectrum(fNum=211,dimension=dbg_dim, tit=f'Spectrum after channel (dim = {dbg_dim})')
# add delay
channel_delay_0 = 150
sig_tx.samples[0] = np.roll(sig_tx.samples[0], channel_delay_0)
channel_delay_1 = 200
sig_tx.samples[1] = np.roll(sig_tx.samples[1], channel_delay_1)
############## RECEIVER ############################################################################################################################################
sig_rx = sig_tx.copy()
# # normalize to mean constellation
# sig_rx.samples[0] = sig_rx.samples[0] * np.sqrt(np.mean(np.abs(sig_rx.constellation[0])**2)/np.mean(np.abs(sig_rx.samples[0])**2))
# sig_rx.samples[1] = sig_rx.samples[1] * np.sqrt(np.mean(np.abs(sig_rx.constellation[1])**2)/np.mean(np.abs(sig_rx.samples[1])**2))
# resample to 2 samples per symbol (Rx sample rate)
upsam = sample_rate_rx/sample_rate_tx
sr = sr * upsam
for dim in np.arange(sig_rx.n_dims):
sig_rx.samples[dim] = ssignal.resample(sig_rx.samples[dim],int(sig_rx.samples[dim].size*upsam))
sig_rx.sample_rate[dim] = sample_rate_rx
if dbg_plt:
sig_rx.plot_spectrum(fNum=251, dimension=dbg_dim, tit=f'Spectrum after Rx resampling (dimension {dbg_dim})')
frame_len = sig_rx.info[0]['fs_header']['length'] + sig_rx.info[0]['ce_header']['length'] + sig_rx.symbols[0].size + sig_rx.info[0]['cpe_pilot_info']['cpe_pilots'].size
results_da_EQ = skc.rx.da_equalizer(sig_tx, sig_rx, cfo_comp=cfo_comp, decimate=True, dbg_plt=dbg_plt, dbg_frame=dbg_frame, dbg_dim=dbg_dim)
sig_rx = results_da_EQ['sig_rx']
# exit()
# pilot_based - CPE
sig_rx, _ = skc.rx.pilot_based_cpe_wrapper(sig_rx, pilot_average=5, tap_len=128, test_phases=91, remove_filter_effects=True, debug=False, fNum=4000)
if dbg_plt:
modMax = np.max(np.concatenate((np.abs(sig_rx.constellation[0].real), np.abs(sig_rx.constellation[0].imag))))
# skc.visualizer.plot_constellation(sig_rx.samples[dbg_dim],fNum=530,hist=True,axMax=modMax*1.5,tit=f'received equalized constellation after CPE (dimension {dbg_dim})')
skc.visualizer.plot_constellation(sig_rx.samples[0],fNum=530,hist=True,axMax=modMax*1.5,tit=f'received equalized constellation after CPE (X Pol)')
skc.visualizer.plot_constellation(sig_rx.samples[1],fNum=531,hist=True,axMax=modMax*1.5,tit=f'received equalized constellation after CPE (Y Pol)')
# delay and phase ambiguity estimation and compensation --> only necessary if signal is cropped after pilot based CPE in line 213 (sig_rx.samples[dim] = sig_rx.samples[dim][filter_len:-filter_len])
# sig_rx = skc.rx.symbol_sequence_sync(sig_rx, dimension=-1)["sig"]
evm = skc.utils.calc_evm(sig_rx)
skc_log.info(f'EVM X: {evm[0]*100:.2f}%, EVM Y: {evm[1]*100:.2f}%')
# decision and demapper
sig_rx.decision()
sig_rx.demapper()
# BER counting
ber_res_X = skc.rx.count_errors(sig_rx.bits[0], sig_rx.samples[0])
ber_res_Y = skc.rx.count_errors(sig_rx.bits[1], sig_rx.samples[1])
skc_log.info(f"BER X = {ber_res_X['ber']:.2e} ({ber_res_X['err_idx'].sum():d} error(s)), BER Y = {ber_res_Y['ber']:.2e} ({ber_res_Y['err_idx'].sum():d} error(s)), mean BER = {(ber_res_X['ber']+ber_res_Y['ber'])/2:.2e} ({ber_res_X['err_idx'].sum()+ber_res_Y['err_idx'].sum():d} error(s))")
if dbg_plt:
plt.figure(654448)
if dbg_dim == 0:
plt.plot(ber_res_X['err_idx'][::int(np.log2(mod_order))],label='BER error idx X')
elif dbg_dim==1:
plt.plot(ber_res_Y['err_idx'][::int(np.log2(mod_order))],label='BER error idx Y')
plt.axvline(sig_tx.symbols[dbg_dim].size,ls='--')
plt.axvline(sig_tx.symbols[dbg_dim].size*2,ls='--')
plt.legend()
monitor_num=0
# skc.visualizer.place_figures(monitor_num=monitor_num)
bersX[snr_idx] = ber_res_X['ber']
bersY[snr_idx] = ber_res_Y['ber']
plt.figure(600)
plt.semilogy(snrs, bersX, 'o', label='BER X')
plt.semilogy(snrs, bersY, 'o', label='BER Y')
plt.semilogy(snrs, skc.utils.ber_awgn(value=snrs, modulation_format='QAM', modulation_order=mod_order, type='SNR',
symbol_rate=symbol_rate, PDM=False), label='theory')
plt.legend()
plt.grid(visible=True,which='both')
plt.ylim([1e-6, 1])
plt.figure(601)
plt.plot(snrs,skc.utils.ber2q(bersX), 'o', label='Q²_dB (X)')
plt.plot(snrs,skc.utils.ber2q(bersY), 'o', label='Q²_dB (Y)')
plt.plot(snrs,skc.utils.ber2q(skc.utils.ber_awgn(value=snrs, modulation_format='QAM', modulation_order=mod_order, type='SNR',
symbol_rate=symbol_rate, PDM=False)), label='theory')
plt.legend()
plt.grid(visible=True,which='both')
plt.ylim([0, skc.utils.ber2q(1e-6)])