Experimental setup for a coherent optical heterodyne transmission
Setup for an offline laboratory experiment, where the elecrical signals are generated in Python, uploaded to an arbitrary waveform generator (AWG), modulated to an optical wave, coherently received, digitized with an real-time oszilloscope and further processed with Python DSP.
Load required packages
[ ]:
import time, copy
import numpy as np
import scipy.signal as ssignal
import matplotlib.pyplot as plt
try:
# if you installed skcomm with pypi
import skcomm as sk
except:
# if you like to use skcomm directly from source.
# Please note: If you want to import skcomm into your file using this snippet, you must ensure that the file is on a directional level below skcomm.
import sys, os
current_parent_folder = os.path.abspath('..')
if not current_parent_folder in sys.path:
sys.path.append(os.path.join(current_parent_folder))
import skcomm as skc
Set parameters
[ ]:
# Tx
# signal parameters
symbol_rate = 3.2e9
n_bits = 2**15
modulation_order = 4
roll_off = 0.1
dac_sr = 16e9
f_if_nom = 2e9
awg_ipaddress = '192.168.1.21'
scope_ipaddress = '192.168.1.20'
upload_samples = True
hold_shot = False
use_predistortion = False
sinc_correction = False
Transmitter
[ ]:
# contruct signal
sig_tx = skc.signal.Signal(n_dims=1)
sig_tx.symbol_rate = symbol_rate
tx_upsampling = dac_sr / sig_tx.symbol_rate[0]
# generate bits
sig_tx.generate_bits(n_bits=n_bits, seed=1)
# set constellation (modulation format)
sig_tx.generate_constellation(format='QAM', order=modulation_order)
# create symbols
sig_tx.mapper()
# upsampling and pulseshaping
sig_tx.pulseshaper(upsampling=tx_upsampling, pulseshape='rrc', roll_off=[roll_off])
# generate DAC samples (analytical signalg at IF)
f_granularity = 1 / sig_tx.samples[0].size * sig_tx.sample_rate[0]
f_if = round(f_if_nom / f_granularity) * f_granularity
print('intermediate frequency: {} MHz'.format(f_if/1e6))
t = np.arange(0, np.size(sig_tx.samples[0])) / sig_tx.sample_rate
# upmixing to IF
sig_tx.samples[0] = sig_tx.samples[0] * np.exp(1j * 2 * np.pi * f_if * t)
sig_tx.center_frequency = f_if
# sinc correction
if sinc_correction:
sig_tx.samples[0] = skc.pre_distortion.dac_sinc_correction(sig_tx.samples[0],
f_max=1.0)
# pre-equalization of AWG frequency response
if use_predistortion:
filtershape = np.load('setup_files/preDistFilter.npy')
sig_tx.samples[0] = skc.filters.filter_arbitrary(sig_tx.samples[0],
filtershape,
sample_rate=sig_tx.symbol_rate[0]*tx_upsampling)
# format samples so that driver can handle them (range +-1)
maxVal = np.max(np.abs(np.concatenate((np.real(sig_tx.samples), np.imag(sig_tx.samples)))))
samples = np.asarray(sig_tx.samples) / maxVal
samples = np.concatenate((np.real(samples), np.imag(samples)))
Channel
Upload signal to arbitrary signal generator (AWG), E/O conversion, transmission, O/E conversion, sampling and download samples fron digital real-time oszilloscope
[ ]:
# Experiment
if upload_samples:
# write samples to AWG
skc.instrument_control.write_samples_Tektronix_AWG70002B(samples, ip_address=awg_ipaddress,
sample_rate=[dac_sr], amp_pp=[0.25, 0.25],
channels=[1, 2], log_mode = False)
time.sleep(2.0)
if not hold_shot:
# get samples from scope
sr, samples = skc.instrument_control.get_samples_Tektronix_MSO6B(channels=[1, 2],
ip_address=scope_ipaddress,
number_of_bytes = 1,
log_mode = False)
tmp_shot = copy.deepcopy(samples)
else:
samples = copy.deepcopy(tmp_shot) # see hint at https://github.com/spyder-ide/spyder/issues/11558
# or, dump to and load from file:
# see https://stackoverflow.com/questions/4530611/saving-and-loading-objects-and-using-pickle/4531859
# import pickle
# with open('tmp_shot.obj', 'wb') as ts:
# pickle.dump(tmp_shot, ts)
# with open('tmp_shot.obj', 'rb') as ts:
# tmp_shot = pickle.load(ts)
Receiver
Preparation of received samples
[ ]:
# subtration of pos. and neg. detector
samples = samples[0] - samples[1]
# remove mean of signal
samples = samples - np.mean(samples)
# Rx
# contruct rx signal structure
sig_rx = copy.deepcopy(sig_tx)
sig_rx.samples = samples
sig_rx.sample_rate = sr
# resampling to the same sample rate as at the transmitter
sr_dsp = sig_tx.sample_rate[0]
# watch out, that this is really an integer, otherwise the samplerate is asynchronous with the data afterwards!!!
len_dsp = sr_dsp / sig_rx.sample_rate[0] * np.size(samples)
if len_dsp % 1:
raise ValueError('DSP samplerate results in asynchronous sampling of the data symbols')
sig_rx.samples = ssignal.resample(sig_rx.samples[0], num=int(len_dsp), window=None)
sig_rx.sample_rate = sr_dsp
sig_rx.plot_spectrum(tit='received spectrum before IF downmixing')
Downmixing from intermediate frequency and resampling
[ ]:
# IQ-Downmixing and (ideal) lowpass filtering
t = skc.utils.create_time_axis(sig_rx.sample_rate[0], np.size(sig_rx.samples[0]))
samples_r = sig_rx.samples[0] * np.cos(2 * np.pi * f_if * t)
fc = sig_tx.symbol_rate[0] / 2 * (1 + roll_off) * 1.1 # cuttoff frequency of filter
fc = fc/(sig_rx.sample_rate[0]/2) # normalization to the sampling frequency
tmp = skc.filters.ideal_lp(samples_r, fc)
samples_r = tmp['samples_out']
samples_i = sig_rx.samples[0] * np.sin(2 * np.pi * f_if * t)
tmp = skc.filters.ideal_lp(samples_i, fc)
samples_i = tmp['samples_out']
sig_rx.samples[0] = samples_r - 1j * samples_i
# From here: "standard" coherent complex baseband signal processing ############
# resample to 2 sps
sps_new = 2
sps = sig_rx.sample_rate[0]/sig_rx.symbol_rate[0]
new_length = int(sig_rx.samples[0].size/sps*sps_new)
sig_rx.samples = ssignal.resample(sig_rx.samples[0], new_length, window='boxcar')
sig_rx.sample_rate = sps_new*sig_rx.symbol_rate[0]
# normalize samples to mean magnitude of original constellation
mag_const = np.mean(abs(sig_rx.constellation[0]))
mag_samples = np.mean(abs(sig_rx.samples[0]))
sig_rx.samples = sig_rx.samples[0] * mag_const / mag_samples
sig_rx.plot_constellation(hist=True, tit='constellation before EQ')
Equalization (or matched filtering)
[ ]:
adaptive_filter = True
# either blind adaptive filter....
if adaptive_filter is True:
results = skc.rx.blind_adaptive_equalizer(sig_rx, n_taps=31, mu_cma=1e-3,
mu_rde=1e-5, mu_dde=0.5, decimate=False,
return_info=True, stop_adapting=-1,
start_rde=5000*0, start_dde=5000*0)
sig_rx = results['sig']
h = results['h'][0]
eps = results['eps'][0]
# plot error evolution
plt.plot(np.abs(eps))
plt.title('evolution of equalizer error')
plt.xlabel('time / symbols')
plt.ylabel('error /a.u.')
plt.show()
# plot last filter frequency response
plt.plot(np.abs(np.fft.fftshift(np.fft.fft(h[-1,:]))))
plt.title('last equalizer frequency response')
plt.xlabel('frequency / Hz')
plt.ylabel('amplitude a.u.')
plt.show()
# plot evolution of filters frequency response
plt.figure()
ax = plt.subplot(projection='3d')
f = np.fft.fftshift(np.fft.fftfreq(h[0,:].size, d=1/sig_rx.sample_rate[0]))
outsymbs = [0, 1000, 5000, 10000, 20000, 30000, h[:,0].size-1]
for outsymb in outsymbs:
plt.plot(f, np.ones(f.size)*outsymb, np.abs(np.fft.fftshift(np.fft.fft(h[int(outsymb),:]))))
plt.title('evolution of equalizer frequency response')
plt.xlabel('frequency / Hz')
plt.ylabel('time / symbols')
plt.show()
# cut away init symbols
sps = int(sig_rx.sample_rate[0]/sig_rx.symbol_rate[0])
cut = 5000
sig_rx.samples = sig_rx.samples[0][int(cut)*sps:]
# ... or matched filtering
else:
# Rx matched filter
sig_rx.raised_cosine_filter(roll_off=roll_off,root_raised=True)
# crop samples here, if necessary
sps = int(sig_rx.sample_rate[0] / sig_rx.symbol_rate[0])
crop = 10*sps
if crop != 0:
sig_rx.samples = sig_rx.samples[0][crop:-crop]
else:
sig_rx.samples = sig_rx.samples[0]
# sampling phase / clock adjustment
block_size = -1 # size of one block in SYMBOLS... -1 for only one block
sig_rx.sampling_clock_adjustment(block_size)
Decimation
[ ]:
# sampling (if necessary)
start_sample = 0
sps = sig_rx.sample_rate[0] / sig_rx.symbol_rate[0] # CHECK FOR INTEGER SPS!!!
sig_rx.samples = sig_rx.samples[0][start_sample::int(sps)]
sig_rx.plot_constellation(0, hist=True, tit='constellation after EQ')
Carrier Phase Estimation (CPE)
[ ]:
# CPE
viterbi = False
# ...either VV
if viterbi:
cpe_results = skc.rx.carrier_phase_estimation_VV(sig_rx.samples[0], n_taps=51,
filter_shape='wiener', mth_power=4,
rho=.05)
sig_rx.samples = cpe_results['rec_symbols']
est_phase = cpe_results['phi_est'].real
# ...or BPS
else:
cpe_results = skc.rx.carrier_phase_estimation_bps(sig_rx.samples[0], sig_rx.constellation[0],
n_taps=15, n_test_phases=15, const_symmetry=np.pi/2)
sig_rx.samples = cpe_results['samples_corrected']
est_phase = cpe_results['est_phase_noise']
plt.plot(est_phase)
plt.title('estimated phase noise')
plt.xlabel('time / symbols')
plt.ylabel('phase / rad')
plt.grid()
plt.show()
sig_rx.plot_constellation(hist=True, tit='constellation after CPE')
Synchronization and determination of quality metricies
[ ]:
# delay and phase ambiguity estimation and compensation
sig_rx = skc.rx.symbol_sequence_sync(sig_rx, dimension=-1)["sig"]
# calc EVM
evm = skc.utils.calc_evm(sig_rx, norm='max')
print("EVM: {:2.2%}".format(evm[0]))
# estimate SNR
snr = skc.utils.estimate_SNR_evm(sig_rx, norm='rms', method='data_aided', opt=False)
print("est. SNR: {:.2f} dB".format(snr[0]))
# decision and demapper
sig_rx.decision()
sig_rx.demapper()
# BER counting
ber_res = skc.rx.count_errors(sig_rx.bits[0], sig_rx.samples[0])
print('BER = {}'.format(ber_res['ber']))