{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Experimental setup for a coherent optical homodyne (intradyne) transmission" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import time, copy\n", "\n", "import numpy as np\n", "import scipy.signal as ssignal\n", "import matplotlib.pyplot as plt\n", "import scipy.interpolate as sinterp\n", "\n", "try:\n", " # if you installed skcomm with pypi\n", " import skcomm as sk\n", "except:\n", " # if you like to use skcomm directly from source.\n", " # Please note: If you want to import skcomm into your file using this snippet, you must ensure that the file is on a directional level below skcomm.\n", " import sys, os\n", " current_parent_folder = os.path.abspath('..')\n", " if not current_parent_folder in sys.path:\n", " sys.path.append(os.path.join(current_parent_folder))\n", " import skcomm as skc" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "symbol_rate = 12.8e9\n", "n_bits = 2**15\n", "modulation_order = 4\n", "roll_off = 0.1\n", "dac_sr = 16e9\n", "upload_samples = False\n", "hold_shot = False\n", "use_predistortion = False\n", "sinc_correction = True" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "\n", "# contruct signal\n", "sig_tx = skc.signal.Signal(n_dims=1)\n", "sig_tx.symbol_rate = symbol_rate\n", "\n", "tx_upsampling = dac_sr / sig_tx.symbol_rate[0]\n", "\n", "# generate bits\n", "sig_tx.generate_bits(n_bits=n_bits, seed=1)\n", "\n", "# set constellation (modulation format)\n", "sig_tx.generate_constellation(format='QAM', order=modulation_order)\n", "\n", "# create symbols\n", "sig_tx.mapper()\n", "\n", "# upsampling and pulseshaping\n", "\n", "sig_tx.pulseshaper(upsampling=tx_upsampling, pulseshape='rrc', roll_off=[roll_off])\n", "\n", "# sinc correction\n", "if sinc_correction:\n", " sig_tx.samples[0] = skc.pre_distortion.dac_sinc_correction(sig_tx.samples[0],\n", " f_max=1.0)\n", "\n", "# pre-equalization of AWG frequency response\n", "if use_predistortion:\n", " #filtershape = np.load('setup_files/preDistFilter.npy')\n", " filtershape = np.transpose(np.array([[-16e9, -8e9, -7.8e9, 0, 7.8e9, 8e9, 16e9], [-100, -100, 2, 0 , 2,-100,-100]]))\n", " sig_tx.samples[0] = skc.filters.filter_arbitrary(sig_tx.samples[0], \n", " filtershape, \n", " sample_rate=sig_tx.symbol_rate[0]*tx_upsampling)\n", "\n", "# format samples so that driver can handle them (range +-1)\n", "maxVal = np.max(np.abs(np.concatenate((np.real(sig_tx.samples), np.imag(sig_tx.samples)))))\n", "samples = np.asarray(sig_tx.samples) / maxVal\n", "samples = np.concatenate((np.real(samples), np.imag(samples)))\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Upload signal to arbitrary signal generator (AWG), E/O conversion, transmission, O/E conversion, sampling and download samples fron digital real-time oszilloscope" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "if upload_samples: \n", " # write samples to AWG \n", " skc.instrument_control.write_samples_Tektronix_AWG70002B(samples, ip_address='192.168.1.21', \n", " sample_rate=[dac_sr], amp_pp=[0.25, 0.25], \n", " channels=[1, 2], log_mode = False)\n", " time.sleep(2.0)\n", " \n", "if not hold_shot:\n", " # get samples from scope\n", " sr, samples = skc.instrument_control.get_samples_Tektronix_MSO6B(channels=[1, 2], \n", " ip_address='192.168.1.20',\n", " number_of_bytes = 1,\n", " log_mode = False)\n", " \n", " tmp_shot = copy.deepcopy(samples)\n", "else:\n", " samples = copy.deepcopy(tmp_shot) # see hint at https://github.com/spyder-ide/spyder/issues/11558\n", " # or, dump to and load from file:\n", " # see https://stackoverflow.com/questions/4530611/saving-and-loading-objects-and-using-pickle/4531859\n", " # import pickle\n", " # with open('tmp_shot.obj', 'wb') as ts:\n", " # pickle.dump(tmp_shot, ts)\n", " # with open('tmp_shot.obj', 'rb') as ts:\n", " # tmp_shot = pickle.load(ts)\n", "\n", "# # I-Q imbalance correction (for suppressing the DD-term for high SSI-suppression)\n", "# IQ_imbalance = -0.45 # [dB] (a pos. value means channel I is smaller than channel Q after coh. frontend, and v.v.)\n", "# samples[0] = samples[0]*10**(0.5*IQ_imbalance/20)\n", "# samples[1] = samples[1]*10**(-0.5*IQ_imbalance/20)\n", "\n", "# # remove mean of signal\n", "# samples = samples - np.mean(samples)\n", "\n", "# construct complex signal\n", "samples = samples[0] + 1j * samples[1]\n", "samples = samples - np.mean(samples)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Receiver" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Resampling" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# contruct rx signal structure\n", "sig_rx = copy.deepcopy(sig_tx)\n", "sig_rx.samples = samples\n", "sig_rx.sample_rate = sr\n", "\n", "sig_rx.plot_spectrum(tit='spectrum from scope',fNum=1)\n", "\n", "# From here: \"standard\" coherent complex baseband signal processing ############\n", "# resample to 2 sps\n", "sps_new = 2\n", "sps = sig_rx.sample_rate[0]/sig_rx.symbol_rate[0]\n", "new_length = int(sig_rx.samples[0].size/sps*sps_new)\n", "sig_rx.samples = ssignal.resample(sig_rx.samples[0], new_length, window='boxcar')\n", "sig_rx.sample_rate = sps_new*sig_rx.symbol_rate[0]\n", "\n", "sig_rx.plot_spectrum(tit='spectrum after resampling',fNum=2)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Estimate SNR" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# estimate SNR\n", "sig_range = np.asarray([-1, 1])*sig_rx.symbol_rate[0]/2*(1+roll_off)\n", "noise_range = np.asarray([-1.1, -1.05, 1.05, 1.1]) * sig_rx.symbol_rate[0]/2 * (1+roll_off)\n", "\n", "fft_temp = np.fft.fft(sig_rx.samples[0])/sig_rx.samples[0].size\n", "RBW = sig_rx.sample_rate[0]/sig_rx.samples[0].size\n", "spec = (np.abs(np.fft.fftshift(fft_temp))**2)/RBW # -> [W/Hz]\n", "freq = np.fft.fftshift(np.fft.fftfreq(sig_rx.samples[0].size, 1/sig_rx.sample_rate[0]))\n", "snr = skc.utils.estimate_snr_spectrum(freq, spec, sig_range=sig_range, \n", " noise_range=noise_range, order=1, \n", " noise_bw=sig_rx.symbol_rate[0], \n", " scaling='lin', plotting=True, fNum=3)[\"snr_dB\"]\n", "\n", "print('est. SNR (from spectrum): {:.1f} dB'.format(snr))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Frequency offset estimation / correction" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# frequency offset estimation / correction\n", "results_foe = skc.rx.frequency_offset_correction(sig_rx.samples[0], \n", " sample_rate=sig_rx.sample_rate[0],\n", " symbol_rate=sig_rx.symbol_rate[0])\n", "\n", "sig_rx.samples[0] = results_foe['samples_corrected']\n", "print('estimated frequency offset: {:.0f} MHz'.format(results_foe['estimated_fo']/1e6))\n", "\n", "\n", "# normalize samples to mean magnitude of original constellation\n", "mag_const = np.mean(abs(sig_rx.constellation[0]))\n", "mag_samples= np.mean(abs(sig_rx.samples[0]))\n", "sig_rx.samples = sig_rx.samples[0] * mag_const / mag_samples\n", "\n", "sig_rx.plot_constellation(hist=True, tit='constellation before EQ', fNum=4)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Equalizer / matched filter" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "\n", "adaptive_filter = True\n", "# either blind adaptive filter....\n", "if adaptive_filter is True: \n", " results = skc.rx.blind_adaptive_equalizer(sig_rx, n_taps=51, mu_cma=1e-4, \n", " mu_rde=1e-5, mu_dde=0.5, decimate=True, \n", " return_info=True, stop_adapting=-1, \n", " start_rde=5000*0, start_dde=5000*0,\n", " exec_mode='auto')\n", " \n", " sig_rx = results['sig']\n", " h = results['h'][0]\n", " eps = results['eps'][0]\n", " # plot error evolution\n", " f = plt.figure(5)\n", " f.clear()\n", " plt.plot(np.abs(eps))\n", " plt.title('evolution of equalizer error')\n", " plt.xlabel('time / symbols')\n", " plt.ylabel('error /a.u.')\n", " plt.grid(visible=True)\n", " plt.show() \n", " # plot evolution of filters frequency response\n", " f = plt.figure(6)\n", " f.clear()\n", " ax = plt.subplot(projection='3d')\n", " f = np.fft.fftshift(np.fft.fftfreq(h[0,:].size, d=1/sig_rx.sample_rate[0]))\n", " outsymbs = [0, 1000, 5000, 10000, 20000, 30000, h[:,0].size-1] \n", " for outsymb in outsymbs:\n", " plt.plot(f, np.ones(f.size)*outsymb, np.abs(np.fft.fftshift(np.fft.fft(h[int(outsymb),:]))))\n", " plt.title('evolution of equalizer frequency response')\n", " plt.xlabel('frequency / Hz')\n", " plt.ylabel('time / symbols') \n", " plt.grid(visible=True)\n", " plt.show()\n", " # plot last filter frequency response\n", " f = plt.figure(7)\n", " f.clear()\n", " f = np.fft.fftshift(np.fft.fftfreq(h[0,:].size, d=1/sig_rx.sample_rate[0]))\n", " plt.plot(f, np.abs(np.fft.fftshift(np.fft.fft(h[-1,:]))))\n", " plt.title('last equalizer frequency response')\n", " plt.xlabel('frequency / Hz')\n", " plt.ylabel('amplitude a.u.')\n", " plt.grid(visible=True)\n", " plt.show() \n", " \n", " # cut away init symbols\n", " sps = int(sig_rx.sample_rate[0]/sig_rx.symbol_rate[0])\n", " cut = 10000\n", " sig_rx.samples = sig_rx.samples[0][int(cut)*sps:]\n", "\n", "# ... or matched filtering\n", "else:\n", " # Rx matched filter\n", " sig_rx.raised_cosine_filter(roll_off=roll_off,root_raised=True) \n", " \n", " # crop samples here, if necessary\n", " sps = int(sig_rx.sample_rate[0] / sig_rx.symbol_rate[0])\n", " crop = 10*sps\n", " if crop != 0:\n", " sig_rx.samples = sig_rx.samples[0][crop:-crop]\n", " else:\n", " sig_rx.samples = sig_rx.samples[0]\n", " \n", " # sampling phase / clock adjustment\n", " BLOCK_SIZE = -1 # size of one block in SYMBOLS... -1 for only one block\n", " sig_rx.sampling_clock_adjustment(BLOCK_SIZE)\n", " \n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Decimation" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# sampling (if necessary)\n", "START_SAMPLE = 0\n", "sps = sig_rx.sample_rate[0] / sig_rx.symbol_rate[0] # CHECK FOR INTEGER SPS!!!\n", "sig_rx.samples = sig_rx.samples[0][START_SAMPLE::int(sps)]\n", "sig_rx.plot_constellation(0, hist=True, tit='constellation after EQ',fNum=8)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Carrier phase estimation / recovery" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "\n", "# CPE\n", "viterbi = True\n", "# ...either VV\n", "if viterbi:\n", " cpe_results = skc.rx.carrier_phase_estimation_VV(sig_rx.samples[0], n_taps=31, \n", " filter_shape='wiener', mth_power=4, \n", " rho=.001)\n", " sig_rx.samples = cpe_results['rec_symbols']\n", " est_phase = cpe_results['phi_est'].real\n", "# ...or BPS\n", "else:\n", " cpe_results = skc.rx.carrier_phase_estimation_bps(sig_rx.samples[0], sig_rx.constellation[0], \n", " n_taps=15, n_test_phases=45, const_symmetry=np.pi/2,\n", " exec_mode='auto')\n", " sig_rx.samples = cpe_results['samples_corrected']\n", " est_phase = cpe_results['est_phase_noise']\n", "\n", "f = plt.figure(9)\n", "f.clear()\n", "plt.plot(est_phase)\n", "plt.title('estimated phase noise')\n", "plt.xlabel('time / symbols')\n", "plt.ylabel('phase / rad')\n", "plt.grid(visible=True)\n", "plt.show()\n", "\n", "sig_rx.plot_constellation(hist=True, tit='constellation after CPE',fNum=10)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Synchronization and estimation of quality metric" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "\n", "# delay and phase ambiguity estimation and compensation\n", "sig_rx = skc.rx.symbol_sequence_sync(sig_rx, dimension=-1)[\"sig\"]\n", " \n", "# calc EVM\n", "evm = skc.utils.calc_evm(sig_rx, norm='max')\n", "print(\"EVM: {:2.2%}\".format(evm[0]))\n", "\n", "# estimate SNR\n", "snr = skc.utils.estimate_SNR_evm(sig_rx, norm='rms', method='data_aided', opt=False)\n", "print(\"est. SNR: {:.2f} dB\".format(snr[0]))\n", "\n", "# decision and demapper\n", "sig_rx.decision()\n", "sig_rx.demapper()\n", "\n", "# BER counting\n", "ber_res = skc.rx.count_errors(sig_rx.bits[0], sig_rx.samples[0])\n", "print('BER = {:.2e}'.format(ber_res['ber']))\n" ] } ], "metadata": { "kernelspec": { "display_name": "noelle311", "language": "python", "name": "python3" }, "language_info": { "name": "python", "version": "3.11.4" }, "nbsphinx": { "execute": "never" } }, "nbformat": 4, "nbformat_minor": 2 }