Experimental setup for a coherent optical homodyne (intradyne) transmission
[ ]:
import time, copy
import numpy as np
import scipy.signal as ssignal
import matplotlib.pyplot as plt
import scipy.interpolate as sinterp
try:
# if you installed skcomm with pypi
import skcomm as sk
except:
# if you like to use skcomm directly from source.
# Please note: If you want to import skcomm into your file using this snippet, you must ensure that the file is on a directional level below skcomm.
import sys, os
current_parent_folder = os.path.abspath('..')
if not current_parent_folder in sys.path:
sys.path.append(os.path.join(current_parent_folder))
import skcomm as skc
[ ]:
symbol_rate = 12.8e9
n_bits = 2**15
modulation_order = 4
roll_off = 0.1
dac_sr = 16e9
upload_samples = False
hold_shot = False
use_predistortion = False
sinc_correction = True
[ ]:
# contruct signal
sig_tx = skc.signal.Signal(n_dims=1)
sig_tx.symbol_rate = symbol_rate
tx_upsampling = dac_sr / sig_tx.symbol_rate[0]
# generate bits
sig_tx.generate_bits(n_bits=n_bits, seed=1)
# set constellation (modulation format)
sig_tx.generate_constellation(format='QAM', order=modulation_order)
# create symbols
sig_tx.mapper()
# upsampling and pulseshaping
sig_tx.pulseshaper(upsampling=tx_upsampling, pulseshape='rrc', roll_off=[roll_off])
# sinc correction
if sinc_correction:
sig_tx.samples[0] = skc.pre_distortion.dac_sinc_correction(sig_tx.samples[0],
f_max=1.0)
# pre-equalization of AWG frequency response
if use_predistortion:
#filtershape = np.load('setup_files/preDistFilter.npy')
filtershape = np.transpose(np.array([[-16e9, -8e9, -7.8e9, 0, 7.8e9, 8e9, 16e9], [-100, -100, 2, 0 , 2,-100,-100]]))
sig_tx.samples[0] = skc.filters.filter_arbitrary(sig_tx.samples[0],
filtershape,
sample_rate=sig_tx.symbol_rate[0]*tx_upsampling)
# format samples so that driver can handle them (range +-1)
maxVal = np.max(np.abs(np.concatenate((np.real(sig_tx.samples), np.imag(sig_tx.samples)))))
samples = np.asarray(sig_tx.samples) / maxVal
samples = np.concatenate((np.real(samples), np.imag(samples)))
Upload signal to arbitrary signal generator (AWG), E/O conversion, transmission, O/E conversion, sampling and download samples fron digital real-time oszilloscope
[ ]:
if upload_samples:
# write samples to AWG
skc.instrument_control.write_samples_Tektronix_AWG70002B(samples, ip_address='192.168.1.21',
sample_rate=[dac_sr], amp_pp=[0.25, 0.25],
channels=[1, 2], log_mode = False)
time.sleep(2.0)
if not hold_shot:
# get samples from scope
sr, samples = skc.instrument_control.get_samples_Tektronix_MSO6B(channels=[1, 2],
ip_address='192.168.1.20',
number_of_bytes = 1,
log_mode = False)
tmp_shot = copy.deepcopy(samples)
else:
samples = copy.deepcopy(tmp_shot) # see hint at https://github.com/spyder-ide/spyder/issues/11558
# or, dump to and load from file:
# see https://stackoverflow.com/questions/4530611/saving-and-loading-objects-and-using-pickle/4531859
# import pickle
# with open('tmp_shot.obj', 'wb') as ts:
# pickle.dump(tmp_shot, ts)
# with open('tmp_shot.obj', 'rb') as ts:
# tmp_shot = pickle.load(ts)
# # I-Q imbalance correction (for suppressing the DD-term for high SSI-suppression)
# IQ_imbalance = -0.45 # [dB] (a pos. value means channel I is smaller than channel Q after coh. frontend, and v.v.)
# samples[0] = samples[0]*10**(0.5*IQ_imbalance/20)
# samples[1] = samples[1]*10**(-0.5*IQ_imbalance/20)
# # remove mean of signal
# samples = samples - np.mean(samples)
# construct complex signal
samples = samples[0] + 1j * samples[1]
samples = samples - np.mean(samples)
Receiver
Resampling
[ ]:
# contruct rx signal structure
sig_rx = copy.deepcopy(sig_tx)
sig_rx.samples = samples
sig_rx.sample_rate = sr
sig_rx.plot_spectrum(tit='spectrum from scope',fNum=1)
# From here: "standard" coherent complex baseband signal processing ############
# resample to 2 sps
sps_new = 2
sps = sig_rx.sample_rate[0]/sig_rx.symbol_rate[0]
new_length = int(sig_rx.samples[0].size/sps*sps_new)
sig_rx.samples = ssignal.resample(sig_rx.samples[0], new_length, window='boxcar')
sig_rx.sample_rate = sps_new*sig_rx.symbol_rate[0]
sig_rx.plot_spectrum(tit='spectrum after resampling',fNum=2)
Estimate SNR
[ ]:
# estimate SNR
sig_range = np.asarray([-1, 1])*sig_rx.symbol_rate[0]/2*(1+roll_off)
noise_range = np.asarray([-1.1, -1.05, 1.05, 1.1]) * sig_rx.symbol_rate[0]/2 * (1+roll_off)
fft_temp = np.fft.fft(sig_rx.samples[0])/sig_rx.samples[0].size
RBW = sig_rx.sample_rate[0]/sig_rx.samples[0].size
spec = (np.abs(np.fft.fftshift(fft_temp))**2)/RBW # -> [W/Hz]
freq = np.fft.fftshift(np.fft.fftfreq(sig_rx.samples[0].size, 1/sig_rx.sample_rate[0]))
snr = skc.utils.estimate_snr_spectrum(freq, spec, sig_range=sig_range,
noise_range=noise_range, order=1,
noise_bw=sig_rx.symbol_rate[0],
scaling='lin', plotting=True, fNum=3)["snr_dB"]
print('est. SNR (from spectrum): {:.1f} dB'.format(snr))
Frequency offset estimation / correction
[ ]:
# frequency offset estimation / correction
results_foe = skc.rx.frequency_offset_correction(sig_rx.samples[0],
sample_rate=sig_rx.sample_rate[0],
symbol_rate=sig_rx.symbol_rate[0])
sig_rx.samples[0] = results_foe['samples_corrected']
print('estimated frequency offset: {:.0f} MHz'.format(results_foe['estimated_fo']/1e6))
# normalize samples to mean magnitude of original constellation
mag_const = np.mean(abs(sig_rx.constellation[0]))
mag_samples= np.mean(abs(sig_rx.samples[0]))
sig_rx.samples = sig_rx.samples[0] * mag_const / mag_samples
sig_rx.plot_constellation(hist=True, tit='constellation before EQ', fNum=4)
Equalizer / matched filter
[ ]:
adaptive_filter = True
# either blind adaptive filter....
if adaptive_filter is True:
results = skc.rx.blind_adaptive_equalizer(sig_rx, n_taps=51, mu_cma=1e-4,
mu_rde=1e-5, mu_dde=0.5, decimate=True,
return_info=True, stop_adapting=-1,
start_rde=5000*0, start_dde=5000*0,
exec_mode='auto')
sig_rx = results['sig']
h = results['h'][0]
eps = results['eps'][0]
# plot error evolution
f = plt.figure(5)
f.clear()
plt.plot(np.abs(eps))
plt.title('evolution of equalizer error')
plt.xlabel('time / symbols')
plt.ylabel('error /a.u.')
plt.grid(visible=True)
plt.show()
# plot evolution of filters frequency response
f = plt.figure(6)
f.clear()
ax = plt.subplot(projection='3d')
f = np.fft.fftshift(np.fft.fftfreq(h[0,:].size, d=1/sig_rx.sample_rate[0]))
outsymbs = [0, 1000, 5000, 10000, 20000, 30000, h[:,0].size-1]
for outsymb in outsymbs:
plt.plot(f, np.ones(f.size)*outsymb, np.abs(np.fft.fftshift(np.fft.fft(h[int(outsymb),:]))))
plt.title('evolution of equalizer frequency response')
plt.xlabel('frequency / Hz')
plt.ylabel('time / symbols')
plt.grid(visible=True)
plt.show()
# plot last filter frequency response
f = plt.figure(7)
f.clear()
f = np.fft.fftshift(np.fft.fftfreq(h[0,:].size, d=1/sig_rx.sample_rate[0]))
plt.plot(f, np.abs(np.fft.fftshift(np.fft.fft(h[-1,:]))))
plt.title('last equalizer frequency response')
plt.xlabel('frequency / Hz')
plt.ylabel('amplitude a.u.')
plt.grid(visible=True)
plt.show()
# cut away init symbols
sps = int(sig_rx.sample_rate[0]/sig_rx.symbol_rate[0])
cut = 10000
sig_rx.samples = sig_rx.samples[0][int(cut)*sps:]
# ... or matched filtering
else:
# Rx matched filter
sig_rx.raised_cosine_filter(roll_off=roll_off,root_raised=True)
# crop samples here, if necessary
sps = int(sig_rx.sample_rate[0] / sig_rx.symbol_rate[0])
crop = 10*sps
if crop != 0:
sig_rx.samples = sig_rx.samples[0][crop:-crop]
else:
sig_rx.samples = sig_rx.samples[0]
# sampling phase / clock adjustment
BLOCK_SIZE = -1 # size of one block in SYMBOLS... -1 for only one block
sig_rx.sampling_clock_adjustment(BLOCK_SIZE)
Decimation
[ ]:
# sampling (if necessary)
START_SAMPLE = 0
sps = sig_rx.sample_rate[0] / sig_rx.symbol_rate[0] # CHECK FOR INTEGER SPS!!!
sig_rx.samples = sig_rx.samples[0][START_SAMPLE::int(sps)]
sig_rx.plot_constellation(0, hist=True, tit='constellation after EQ',fNum=8)
Carrier phase estimation / recovery
[ ]:
# CPE
viterbi = True
# ...either VV
if viterbi:
cpe_results = skc.rx.carrier_phase_estimation_VV(sig_rx.samples[0], n_taps=31,
filter_shape='wiener', mth_power=4,
rho=.001)
sig_rx.samples = cpe_results['rec_symbols']
est_phase = cpe_results['phi_est'].real
# ...or BPS
else:
cpe_results = skc.rx.carrier_phase_estimation_bps(sig_rx.samples[0], sig_rx.constellation[0],
n_taps=15, n_test_phases=45, const_symmetry=np.pi/2,
exec_mode='auto')
sig_rx.samples = cpe_results['samples_corrected']
est_phase = cpe_results['est_phase_noise']
f = plt.figure(9)
f.clear()
plt.plot(est_phase)
plt.title('estimated phase noise')
plt.xlabel('time / symbols')
plt.ylabel('phase / rad')
plt.grid(visible=True)
plt.show()
sig_rx.plot_constellation(hist=True, tit='constellation after CPE',fNum=10)
Synchronization and estimation of quality metric
[ ]:
# delay and phase ambiguity estimation and compensation
sig_rx = skc.rx.symbol_sequence_sync(sig_rx, dimension=-1)["sig"]
# calc EVM
evm = skc.utils.calc_evm(sig_rx, norm='max')
print("EVM: {:2.2%}".format(evm[0]))
# estimate SNR
snr = skc.utils.estimate_SNR_evm(sig_rx, norm='rms', method='data_aided', opt=False)
print("est. SNR: {:.2f} dB".format(snr[0]))
# decision and demapper
sig_rx.decision()
sig_rx.demapper()
# BER counting
ber_res = skc.rx.count_errors(sig_rx.bits[0], sig_rx.samples[0])
print('BER = {:.2e}'.format(ber_res['ber']))