Experimental setup for a coherent optical heterodyne transmission

Setup for an offline laboratory experiment, where the elecrical signals are generated in Python, uploaded to an arbitrary waveform generator (AWG), modulated to an optical wave, coherently received, digitized with an real-time oszilloscope and further processed with Python DSP.

Load required packages

[ ]:
import time, copy
import numpy as np
import scipy.signal as ssignal
import matplotlib.pyplot as plt
try:
    # if you installed skcomm with pypi
    import skcomm  as sk
except:
    # if you like to use skcomm directly from source.
    # Please note: If you want to import skcomm into your file using this snippet, you must ensure that the file is on a directional level below skcomm.
    import sys, os
    current_parent_folder = os.path.abspath('..')
    if not current_parent_folder in sys.path:
        sys.path.append(os.path.join(current_parent_folder))
    import skcomm as skc

Set parameters

[ ]:
# Tx
# signal parameters
symbol_rate = 3.2e9
n_bits = 2**15
modulation_order = 4
roll_off = 0.1
dac_sr = 16e9
f_if_nom = 2e9
awg_ipaddress = '192.168.1.21'
scope_ipaddress = '192.168.1.20'
upload_samples = True
hold_shot = False
use_predistortion = False
sinc_correction = False


Transmitter

[ ]:
# contruct signal
sig_tx = skc.signal.Signal(n_dims=1)
sig_tx.symbol_rate = symbol_rate

tx_upsampling = dac_sr / sig_tx.symbol_rate[0]

# generate bits
sig_tx.generate_bits(n_bits=n_bits, seed=1)

# set constellation (modulation format)
sig_tx.generate_constellation(format='QAM', order=modulation_order)

# create symbols
sig_tx.mapper()

# upsampling and pulseshaping
sig_tx.pulseshaper(upsampling=tx_upsampling, pulseshape='rrc', roll_off=[roll_off])

# generate DAC samples (analytical signalg at IF)

f_granularity = 1 / sig_tx.samples[0].size * sig_tx.sample_rate[0]
f_if = round(f_if_nom / f_granularity) * f_granularity
print('intermediate frequency: {} MHz'.format(f_if/1e6))
t = np.arange(0, np.size(sig_tx.samples[0])) / sig_tx.sample_rate

# upmixing to IF
sig_tx.samples[0] = sig_tx.samples[0] * np.exp(1j * 2 * np.pi * f_if * t)
sig_tx.center_frequency = f_if

# sinc correction
if sinc_correction:
    sig_tx.samples[0] = skc.pre_distortion.dac_sinc_correction(sig_tx.samples[0],
                                                                f_max=1.0)

# pre-equalization of AWG frequency response
if use_predistortion:
    filtershape = np.load('setup_files/preDistFilter.npy')
    sig_tx.samples[0] = skc.filters.filter_arbitrary(sig_tx.samples[0],
                                                      filtershape,
                                                      sample_rate=sig_tx.symbol_rate[0]*tx_upsampling)

# format samples so that driver can handle them (range +-1)
maxVal = np.max(np.abs(np.concatenate((np.real(sig_tx.samples), np.imag(sig_tx.samples)))))
samples = np.asarray(sig_tx.samples) / maxVal
samples = np.concatenate((np.real(samples), np.imag(samples)))

Channel

Upload signal to arbitrary signal generator (AWG), E/O conversion, transmission, O/E conversion, sampling and download samples fron digital real-time oszilloscope

[ ]:
# Experiment
if upload_samples:
    # write samples to AWG
    skc.instrument_control.write_samples_Tektronix_AWG70002B(samples, ip_address=awg_ipaddress,
                                                    sample_rate=[dac_sr], amp_pp=[0.25, 0.25],
                                                    channels=[1, 2], log_mode = False)
    time.sleep(2.0)


if not hold_shot:
    # get samples from scope
    sr, samples = skc.instrument_control.get_samples_Tektronix_MSO6B(channels=[1, 2],
                                                                    ip_address=scope_ipaddress,
                                                                    number_of_bytes = 1,
                                                                    log_mode = False)

    tmp_shot = copy.deepcopy(samples)
else:
    samples = copy.deepcopy(tmp_shot) # see hint at https://github.com/spyder-ide/spyder/issues/11558
    # or, dump to and load from file:
    # see https://stackoverflow.com/questions/4530611/saving-and-loading-objects-and-using-pickle/4531859
    # import pickle
    # with open('tmp_shot.obj', 'wb') as ts:
    #   pickle.dump(tmp_shot, ts)
    # with open('tmp_shot.obj', 'rb') as ts:
    #   tmp_shot = pickle.load(ts)

Receiver

Preparation of received samples

[ ]:

# subtration of pos. and neg. detector samples = samples[0] - samples[1] # remove mean of signal samples = samples - np.mean(samples) # Rx # contruct rx signal structure sig_rx = copy.deepcopy(sig_tx) sig_rx.samples = samples sig_rx.sample_rate = sr # resampling to the same sample rate as at the transmitter sr_dsp = sig_tx.sample_rate[0] # watch out, that this is really an integer, otherwise the samplerate is asynchronous with the data afterwards!!! len_dsp = sr_dsp / sig_rx.sample_rate[0] * np.size(samples) if len_dsp % 1: raise ValueError('DSP samplerate results in asynchronous sampling of the data symbols') sig_rx.samples = ssignal.resample(sig_rx.samples[0], num=int(len_dsp), window=None) sig_rx.sample_rate = sr_dsp sig_rx.plot_spectrum(tit='received spectrum before IF downmixing')

Downmixing from intermediate frequency and resampling

[ ]:

# IQ-Downmixing and (ideal) lowpass filtering t = skc.utils.create_time_axis(sig_rx.sample_rate[0], np.size(sig_rx.samples[0])) samples_r = sig_rx.samples[0] * np.cos(2 * np.pi * f_if * t) fc = sig_tx.symbol_rate[0] / 2 * (1 + roll_off) * 1.1 # cuttoff frequency of filter fc = fc/(sig_rx.sample_rate[0]/2) # normalization to the sampling frequency tmp = skc.filters.ideal_lp(samples_r, fc) samples_r = tmp['samples_out'] samples_i = sig_rx.samples[0] * np.sin(2 * np.pi * f_if * t) tmp = skc.filters.ideal_lp(samples_i, fc) samples_i = tmp['samples_out'] sig_rx.samples[0] = samples_r - 1j * samples_i # From here: "standard" coherent complex baseband signal processing ############ # resample to 2 sps sps_new = 2 sps = sig_rx.sample_rate[0]/sig_rx.symbol_rate[0] new_length = int(sig_rx.samples[0].size/sps*sps_new) sig_rx.samples = ssignal.resample(sig_rx.samples[0], new_length, window='boxcar') sig_rx.sample_rate = sps_new*sig_rx.symbol_rate[0] # normalize samples to mean magnitude of original constellation mag_const = np.mean(abs(sig_rx.constellation[0])) mag_samples = np.mean(abs(sig_rx.samples[0])) sig_rx.samples = sig_rx.samples[0] * mag_const / mag_samples sig_rx.plot_constellation(hist=True, tit='constellation before EQ')

Equalization (or matched filtering)

[ ]:

adaptive_filter = True # either blind adaptive filter.... if adaptive_filter is True: results = skc.rx.blind_adaptive_equalizer(sig_rx, n_taps=31, mu_cma=1e-3, mu_rde=1e-5, mu_dde=0.5, decimate=False, return_info=True, stop_adapting=-1, start_rde=5000*0, start_dde=5000*0) sig_rx = results['sig'] h = results['h'][0] eps = results['eps'][0] # plot error evolution plt.plot(np.abs(eps)) plt.title('evolution of equalizer error') plt.xlabel('time / symbols') plt.ylabel('error /a.u.') plt.show() # plot last filter frequency response plt.plot(np.abs(np.fft.fftshift(np.fft.fft(h[-1,:])))) plt.title('last equalizer frequency response') plt.xlabel('frequency / Hz') plt.ylabel('amplitude a.u.') plt.show() # plot evolution of filters frequency response plt.figure() ax = plt.subplot(projection='3d') f = np.fft.fftshift(np.fft.fftfreq(h[0,:].size, d=1/sig_rx.sample_rate[0])) outsymbs = [0, 1000, 5000, 10000, 20000, 30000, h[:,0].size-1] for outsymb in outsymbs: plt.plot(f, np.ones(f.size)*outsymb, np.abs(np.fft.fftshift(np.fft.fft(h[int(outsymb),:])))) plt.title('evolution of equalizer frequency response') plt.xlabel('frequency / Hz') plt.ylabel('time / symbols') plt.show() # cut away init symbols sps = int(sig_rx.sample_rate[0]/sig_rx.symbol_rate[0]) cut = 5000 sig_rx.samples = sig_rx.samples[0][int(cut)*sps:] # ... or matched filtering else: # Rx matched filter sig_rx.raised_cosine_filter(roll_off=roll_off,root_raised=True) # crop samples here, if necessary sps = int(sig_rx.sample_rate[0] / sig_rx.symbol_rate[0]) crop = 10*sps if crop != 0: sig_rx.samples = sig_rx.samples[0][crop:-crop] else: sig_rx.samples = sig_rx.samples[0] # sampling phase / clock adjustment block_size = -1 # size of one block in SYMBOLS... -1 for only one block sig_rx.sampling_clock_adjustment(block_size)

Decimation

[ ]:
# sampling (if necessary)
start_sample = 0
sps = sig_rx.sample_rate[0] / sig_rx.symbol_rate[0] # CHECK FOR INTEGER SPS!!!
sig_rx.samples = sig_rx.samples[0][start_sample::int(sps)]
sig_rx.plot_constellation(0, hist=True, tit='constellation after EQ')

Carrier Phase Estimation (CPE)

[ ]:
# CPE
viterbi = False
# ...either VV
if viterbi:
    cpe_results = skc.rx.carrier_phase_estimation_VV(sig_rx.samples[0], n_taps=51,
                                                      filter_shape='wiener', mth_power=4,
                                                      rho=.05)
    sig_rx.samples = cpe_results['rec_symbols']
    est_phase = cpe_results['phi_est'].real
# ...or BPS
else:
    cpe_results = skc.rx.carrier_phase_estimation_bps(sig_rx.samples[0], sig_rx.constellation[0],
                                               n_taps=15, n_test_phases=15, const_symmetry=np.pi/2)
    sig_rx.samples = cpe_results['samples_corrected']
    est_phase = cpe_results['est_phase_noise']

plt.plot(est_phase)
plt.title('estimated phase noise')
plt.xlabel('time / symbols')
plt.ylabel('phase / rad')
plt.grid()
plt.show()

sig_rx.plot_constellation(hist=True, tit='constellation after CPE')

Synchronization and determination of quality metricies

[ ]:

# delay and phase ambiguity estimation and compensation sig_rx = skc.rx.symbol_sequence_sync(sig_rx, dimension=-1)["sig"] # calc EVM evm = skc.utils.calc_evm(sig_rx, norm='max') print("EVM: {:2.2%}".format(evm[0])) # estimate SNR snr = skc.utils.estimate_SNR_evm(sig_rx, norm='rms', method='data_aided', opt=False) print("est. SNR: {:.2f} dB".format(snr[0])) # decision and demapper sig_rx.decision() sig_rx.demapper() # BER counting ber_res = skc.rx.count_errors(sig_rx.bits[0], sig_rx.samples[0]) print('BER = {}'.format(ber_res['ber']))