{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Experimental setup for a coherent optical heterodyne transmission" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Setup for an offline laboratory experiment, where the electrical signals are generated in Python, uploaded to an arbitrary waveform generator (AWG), modulated onto an optical wave, coherently received, digitized with a real-time oscilloscope and further processed with Python DSP." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Load required packages" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import time, copy\n", "import numpy as np\n", "import scipy.signal as ssignal\n", "import matplotlib.pyplot as plt\n", "# NOTE: both branches must bind the package to the alias 'skc', which is used throughout this notebook\n", "try:\n", " # if you installed skcomm with pypi\n", " import skcomm as skc\n", "except ImportError:\n", " # if you like to use skcomm directly from source.\n", " # Please note: If you want to import skcomm into your file using this snippet, you must ensure that the file is located one directory level below skcomm.\n", " import sys, os\n", " current_parent_folder = os.path.abspath('..')\n", " if current_parent_folder not in sys.path:\n", "     sys.path.append(current_parent_folder)\n", " import skcomm as skc" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Set parameters" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Tx \n", "# signal parameters\n", "symbol_rate = 3.2e9\n", "n_bits = 2**15\n", "modulation_order = 4\n", "roll_off = 0.1\n", "dac_sr = 16e9\n", "f_if_nom = 2e9\n", "awg_ipaddress = '192.168.1.21'\n", "scope_ipaddress = '192.168.1.20'\n", "upload_samples = True\n", "hold_shot = False\n", "use_predistortion = False \n", "sinc_correction = False\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Transmitter" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# 
construct signal\n", "sig_tx = skc.signal.Signal(n_dims=1)\n", "sig_tx.symbol_rate = symbol_rate\n", "\n", "tx_upsampling = dac_sr / sig_tx.symbol_rate[0]\n", "\n", "# generate bits\n", "sig_tx.generate_bits(n_bits=n_bits, seed=1)\n", "\n", "# set constellation (modulation format)\n", "sig_tx.generate_constellation(format='QAM', order=modulation_order)\n", "\n", "# create symbols\n", "sig_tx.mapper()\n", "\n", "# upsampling and pulseshaping\n", "sig_tx.pulseshaper(upsampling=tx_upsampling, pulseshape='rrc', roll_off=[roll_off])\n", "\n", "# generate DAC samples (analytical signal at IF)\n", "\n", "# snap the IF to the frequency grid of the sample block (multiple of sample_rate / block length)\n", "f_granularity = 1 / sig_tx.samples[0].size * sig_tx.sample_rate[0]\n", "f_if = round(f_if_nom / f_granularity) * f_granularity\n", "print('intermediate frequency: {} MHz'.format(f_if/1e6))\n", "# time axis of dimension 0 (scalar sample rate, as everywhere else in this notebook)\n", "t = np.arange(0, np.size(sig_tx.samples[0])) / sig_tx.sample_rate[0]\n", "\n", "# upmixing to IF\n", "sig_tx.samples[0] = sig_tx.samples[0] * np.exp(1j * 2 * np.pi * f_if * t)\n", "sig_tx.center_frequency = f_if\n", "\n", "# sinc correction\n", "if sinc_correction:\n", " sig_tx.samples[0] = skc.pre_distortion.dac_sinc_correction(sig_tx.samples[0],\n", " f_max=1.0)\n", "\n", "# pre-equalization of AWG frequency response\n", "if use_predistortion:\n", " filtershape = np.load('setup_files/preDistFilter.npy')\n", " sig_tx.samples[0] = skc.filters.filter_arbitrary(sig_tx.samples[0], \n", " filtershape, \n", " sample_rate=sig_tx.symbol_rate[0]*tx_upsampling)\n", "\n", "# format samples so that driver can handle them (range +-1)\n", "maxVal = np.max(np.abs(np.concatenate((np.real(sig_tx.samples), np.imag(sig_tx.samples)))))\n", "samples = np.asarray(sig_tx.samples) / maxVal\n", "samples = np.concatenate((np.real(samples), np.imag(samples)))\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Channel" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Upload signal to arbitrary waveform generator (AWG), E/O conversion, transmission, O/E conversion, sampling and download 
samples from digital real-time oscilloscope" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Experiment \n", "if upload_samples: \n", " # write samples to AWG \n", " skc.instrument_control.write_samples_Tektronix_AWG70002B(samples, ip_address=awg_ipaddress, \n", " sample_rate=[dac_sr], amp_pp=[0.25, 0.25], \n", " channels=[1, 2], log_mode = False)\n", " time.sleep(2.0)\n", "\n", "\n", "if not hold_shot:\n", " # get samples from scope\n", " sr, samples = skc.instrument_control.get_samples_Tektronix_MSO6B(channels=[1, 2], \n", " ip_address=scope_ipaddress,\n", " number_of_bytes = 1,\n", " log_mode = False)\n", " \n", " tmp_shot = copy.deepcopy(samples)\n", "else:\n", " samples = copy.deepcopy(tmp_shot) # see hint at https://github.com/spyder-ide/spyder/issues/11558\n", " # or, dump to and load from file:\n", " # see https://stackoverflow.com/questions/4530611/saving-and-loading-objects-and-using-pickle/4531859\n", " # import pickle\n", " # with open('tmp_shot.obj', 'wb') as ts:\n", " # pickle.dump(tmp_shot, ts)\n", " # with open('tmp_shot.obj', 'rb') as ts:\n", " # tmp_shot = pickle.load(ts)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Receiver" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Preparation of received samples" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "\n", "# subtraction of pos. and neg. 
detector\n", "# (results are kept in new variables: the scope capture in 'samples' stays untouched, so this cell can be re-run)\n", "samples_diff = samples[0] - samples[1]\n", "\n", "# remove mean of signal\n", "samples_rx = samples_diff - np.mean(samples_diff)\n", "\n", "\n", "# Rx \n", "\n", "# construct rx signal structure\n", "sig_rx = copy.deepcopy(sig_tx)\n", "sig_rx.samples = samples_rx\n", "sig_rx.sample_rate = sr\n", "\n", "# resampling to the same sample rate as at the transmitter\n", "sr_dsp = sig_tx.sample_rate[0]\n", "\n", "# watch out, that this is really an integer, otherwise the samplerate is asynchronous with the data afterwards!!!\n", "len_dsp = sr_dsp / sig_rx.sample_rate[0] * np.size(samples_rx)\n", "if len_dsp % 1:\n", " raise ValueError('DSP samplerate results in asynchronous sampling of the data symbols')\n", "sig_rx.samples = ssignal.resample(sig_rx.samples[0], num=int(len_dsp), window=None)\n", "sig_rx.sample_rate = sr_dsp\n", "sig_rx.plot_spectrum(tit='received spectrum before IF downmixing')\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Downmixing from intermediate frequency and resampling" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "\n", "# IQ-Downmixing and (ideal) lowpass filtering\n", "t = skc.utils.create_time_axis(sig_rx.sample_rate[0], np.size(sig_rx.samples[0]))\n", "samples_r = sig_rx.samples[0] * np.cos(2 * np.pi * f_if * t)\n", "fc = sig_tx.symbol_rate[0] / 2 * (1 + roll_off) * 1.1 # cutoff frequency of filter\n", "fc = fc/(sig_rx.sample_rate[0]/2) # normalization to the sampling frequency\n", "tmp = skc.filters.ideal_lp(samples_r, fc)\n", "samples_r = tmp['samples_out']\n", "samples_i = sig_rx.samples[0] * np.sin(2 * np.pi * f_if * t)\n", "tmp = skc.filters.ideal_lp(samples_i, fc)\n", "samples_i = tmp['samples_out']\n", "sig_rx.samples[0] = samples_r - 1j * samples_i\n", "\n", "# From here: \"standard\" coherent complex baseband signal processing ############\n", "# resample to 2 sps\n", "sps_new = 2\n", "sps = sig_rx.sample_rate[0]/sig_rx.symbol_rate[0]\n", "new_length = 
int(sig_rx.samples[0].size/sps*sps_new)\n", "sig_rx.samples = ssignal.resample(sig_rx.samples[0], new_length, window='boxcar')\n", "sig_rx.sample_rate = sps_new*sig_rx.symbol_rate[0]\n", "\n", "# normalize samples to mean magnitude of original constellation\n", "mag_const = np.mean(abs(sig_rx.constellation[0]))\n", "mag_samples = np.mean(abs(sig_rx.samples[0]))\n", "sig_rx.samples = sig_rx.samples[0] * mag_const / mag_samples\n", "\n", "sig_rx.plot_constellation(hist=True, tit='constellation before EQ')\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Equalization (or matched filtering)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "\n", "adaptive_filter = True\n", "# either blind adaptive filter....\n", "if adaptive_filter is True: \n", " results = skc.rx.blind_adaptive_equalizer(sig_rx, n_taps=31, mu_cma=1e-3, \n", " mu_rde=1e-5, mu_dde=0.5, decimate=False, \n", " return_info=True, stop_adapting=-1, \n", " start_rde=5000*0, start_dde=5000*0)\n", " \n", " sig_rx = results['sig']\n", " h = results['h'][0]\n", " eps = results['eps'][0]\n", " # plot error evolution\n", " plt.plot(np.abs(eps))\n", " plt.title('evolution of equalizer error')\n", " plt.xlabel('time / symbols')\n", " plt.ylabel('error /a.u.')\n", " plt.show()\n", " # frequency axis of the equalizer taps (shared by both frequency response plots below)\n", " f = np.fft.fftshift(np.fft.fftfreq(h[0,:].size, d=1/sig_rx.sample_rate[0]))\n", " # plot last filter frequency response (over f, so that the x-axis really is in Hz)\n", " plt.plot(f, np.abs(np.fft.fftshift(np.fft.fft(h[-1,:]))))\n", " plt.title('last equalizer frequency response')\n", " plt.xlabel('frequency / Hz')\n", " plt.ylabel('amplitude a.u.')\n", " plt.show() \n", " # plot evolution of filters frequency response\n", " plt.figure()\n", " ax = plt.subplot(projection='3d')\n", " # only use filter snapshots which exist for the actual number of recorded filter updates\n", " outsymbs = [idx for idx in (0, 1000, 5000, 10000, 20000, 30000, h[:,0].size-1) if idx < h[:,0].size]\n", " for outsymb in outsymbs:\n", "     plt.plot(f, np.ones(f.size)*outsymb, np.abs(np.fft.fftshift(np.fft.fft(h[int(outsymb),:]))))\n", " plt.title('evolution of equalizer frequency 
response')\n", " plt.xlabel('frequency / Hz')\n", " plt.ylabel('time / symbols') \n", " plt.show() \n", " \n", " # cut away init symbols\n", " sps = int(sig_rx.sample_rate[0]/sig_rx.symbol_rate[0])\n", " cut = 5000\n", " sig_rx.samples = sig_rx.samples[0][int(cut)*sps:]\n", "\n", "# ... or matched filtering\n", "else:\n", " # Rx matched filter\n", " sig_rx.raised_cosine_filter(roll_off=roll_off,root_raised=True) \n", " \n", " # crop samples here, if necessary\n", " sps = int(sig_rx.sample_rate[0] / sig_rx.symbol_rate[0])\n", " crop = 10*sps\n", " if crop != 0:\n", "     sig_rx.samples = sig_rx.samples[0][crop:-crop]\n", " else:\n", "     sig_rx.samples = sig_rx.samples[0]\n", " \n", " # sampling phase / clock adjustment\n", " block_size = -1 # size of one block in SYMBOLS... -1 for only one block\n", " sig_rx.sampling_clock_adjustment(block_size)\n", " \n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Decimation" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# sampling (if necessary)\n", "start_sample = 0\n", "sps = sig_rx.sample_rate[0] / sig_rx.symbol_rate[0]\n", "# decimation by slicing requires an integer number of samples per symbol\n", "if sps % 1:\n", " raise ValueError('non-integer number of samples per symbol, decimation by slicing is not possible')\n", "sig_rx.samples = sig_rx.samples[0][start_sample::int(sps)]\n", "sig_rx.plot_constellation(0, hist=True, tit='constellation after EQ')\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Carrier Phase Estimation (CPE)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# CPE\n", "viterbi = False\n", "# ...either VV\n", "if viterbi:\n", " cpe_results = skc.rx.carrier_phase_estimation_VV(sig_rx.samples[0], n_taps=51, \n", " filter_shape='wiener', mth_power=4, \n", " rho=.05)\n", " sig_rx.samples = cpe_results['rec_symbols']\n", " est_phase = cpe_results['phi_est'].real\n", "# ...or BPS\n", "else:\n", " cpe_results = skc.rx.carrier_phase_estimation_bps(sig_rx.samples[0], sig_rx.constellation[0], \n", " n_taps=15, n_test_phases=15, const_symmetry=np.pi/2)\n", " 
sig_rx.samples = cpe_results['samples_corrected']\n", " est_phase = cpe_results['est_phase_noise']\n", " \n", "plt.plot(est_phase)\n", "plt.title('estimated phase noise')\n", "plt.xlabel('time / symbols')\n", "plt.ylabel('phase / rad')\n", "plt.grid()\n", "plt.show()\n", "\n", "sig_rx.plot_constellation(hist=True, tit='constellation after CPE')\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Synchronization and determination of quality metrics" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "\n", "# delay and phase ambiguity estimation and compensation\n", "sig_rx = skc.rx.symbol_sequence_sync(sig_rx, dimension=-1)[\"sig\"]\n", " \n", "# calc EVM\n", "evm = skc.utils.calc_evm(sig_rx, norm='max')\n", "print(\"EVM: {:2.2%}\".format(evm[0]))\n", "\n", "# estimate SNR\n", "snr = skc.utils.estimate_SNR_evm(sig_rx, norm='rms', method='data_aided', opt=False)\n", "print(\"est. SNR: {:.2f} dB\".format(snr[0]))\n", "\n", "# decision and demapper\n", "sig_rx.decision()\n", "sig_rx.demapper()\n", "\n", "# BER counting\n", "ber_res = skc.rx.count_errors(sig_rx.bits[0], sig_rx.samples[0])\n", "print('BER = {}'.format(ber_res['ber']))" ] } ], "metadata": { "kernelspec": { "display_name": "noelle311", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.11.4" }, "nbsphinx": { "execute": "never" } }, "nbformat": 4, "nbformat_minor": 2 }