{ "cells": [ { "attachments": {}, "cell_type": "markdown", "metadata": { "colab_type": "text", "id": "view-in-github" }, "source": [ "# 1D TV Denoising algorithms\n", "\n", "\"Open" ] }, { "cell_type": "markdown", "metadata": { "id": "-1HztvHt3fd2" }, "source": [ "Implementation of 1D TVD using Maximization and Minimization algorithm and Iterative clipping algorithm. [Reference](https://eeweb.engineering.nyu.edu/iselesni/lecture_notes/TVDmm/TVD_software/html/Example1.html)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "thbGfv-dTYB2", "tags": [] }, "outputs": [], "source": [ "import numpy as np\n", "import math\n", "import matplotlib.pyplot as plt\n", "from scipy.sparse import csr_matrix, spdiags, diags, csc_matrix, eye\n", "from scipy.sparse.linalg import spsolve\n", "from scipy.linalg import solve\n", "from scipy.fftpack import fft, ifft\n", "import time" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "A-vVH1H6VPsZ", "tags": [] }, "outputs": [], "source": [ "s = np.loadtxt(\n", " \"https://eeweb.engineering.nyu.edu/iselesni/lecture_notes/TVDmm/TVD_software/blocks.txt\"\n", ")\n", "y = np.loadtxt(\n", " \"https://eeweb.engineering.nyu.edu/iselesni/lecture_notes/TVDmm/TVD_software/blocks_noisy.txt\"\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "S85cB8m2Wv0Y", "tags": [] }, "outputs": [], "source": [ "N = 256\n", "# N : signal length\n", "sigma = 0.5; # sigma : standard deviation of noise" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "colab": { "base_uri": "https://localhost:8080/", "height": 485 }, "id": "0jh9m866XFbL", "outputId": "dffe4693-a3c5-4dfe-c503-5f83e4314762", "tags": [] }, "outputs": [], "source": [ "fig, axs = plt.subplots(1, 2, figsize=(15, 5))\n", "axs[0].plot(s)\n", "axs[0].set_title(\"original signal\")\n", "axs[1].plot(y)\n", "axs[1].set_title(\"noisy signal\");" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "id": "vaUVQfM3iYCX" }, "source": [ "## Using linear systems" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "e369Dv-DXKv8" }, "outputs": [], "source": [ "def tvd_mm(y, lam, Nit):\n", " start = time.time()\n", " # Cost function history\n", " cost = np.zeros(Nit)\n", "\n", " # Signal length\n", " N = y.shape[0]\n", "\n", " # Create sparse difference matrix\n", " I = np.eye(N)\n", " D = spdiags(np.vstack((np.ones((1, N)), -1 * np.ones((1, N)))), [0, 1], N - 1, N)\n", " DDT = D @ D.T\n", "\n", " # Initialization\n", " x = y.copy()\n", " Dx = D @ x\n", " Dy = D @ y\n", "\n", " for k in range(Nit):\n", " # Compute banded matrix F\n", " F = csc_matrix(diags(np.abs(Dx).flatten() / lam, 0)) + DDT\n", " # Solve linear system to get updated signal x\n", " x = y - D.T @ spsolve(F, Dy)\n", "\n", " # Update variables\n", " Dx = D @ x\n", " cost[k] = 0.5 * np.sum(np.abs(x - y) ** 2) + lam * np.sum(np.abs(Dx))\n", "\n", " end = time.time()\n", "\n", " return x.flatten(), cost, end - start" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "colab": { "base_uri": "https://localhost:8080/", "height": 298 }, "id": "2MDXGjyPdUCB", "outputId": "6c38f1cc-dcc4-4fa1-b032-f455a3a5a404" }, "outputs": [], "source": [ "lam = 1.5\n", "max_iter = 256\n", "x_mm, cost_mm, time_taken = tvd_mm(y, lam, max_iter)\n", "fig, axs = plt.subplots(1, 4, figsize=(30, 5))\n", "axs[0].plot(s)\n", "axs[0].set_title(\"original signal\")\n", "axs[1].plot(y)\n", "axs[1].set_title(\"noisy signal\")\n", "axs[2].plot(x_mm)\n", "axs[2].set_title(\"recovered signal\")\n", "axs[3].plot(cost_mm)\n", "axs[3].set_title(\"cost graph\")\n", "print(f\"Time taken = {time_taken}\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "eCPgt3R7kWwj" }, "outputs": [], "source": [ "def tvd_ic(y, lam, Nit):\n", " start = time.time()\n", " y = y.reshape((1, -1)) # row vector\n", " J = np.zeros(Nit) # objective function\n", " N = y.shape[1]\n", " z = np.zeros((1, N - 1))\n", " alpha = 3\n", " T = lam / 2\n", " x = y.copy()\n", " for k in range(Nit):\n", " inter = np.concatenate((-z[:, 0], -np.diff(z.flatten()), z[:, -1]))\n", "\n", " x = y - inter # y - D' z\n", "\n", " J[k] = 0.5 * np.sum(np.abs(x - y) ** 2) + lam * np.sum(np.abs(np.diff(x)))\n", "\n", " z = z + 1 / alpha * np.diff(x) # z + 1/alpha D z\n", "\n", " z = np.clip(z, -T, T) # clip(z,T)\n", "\n", " end = time.time()\n", " return x.flatten(), J, end - start" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "colab": { "base_uri": "https://localhost:8080/", "height": 298 }, "id": "3jUg5lx1khK4", "outputId": "fe659ee7-6a46-4dd6-cdd7-1624ae86596c" }, "outputs": [], "source": [ "max_iter = 256\n", "x_ic, cost_ic, time_taken = tvd_ic(y, 1.5 * lam, max_iter)\n", "fig, axs = plt.subplots(1, 4, figsize=(30, 5))\n", "axs[0].plot(s)\n", "axs[0].set_title(\"original signal\")\n", "axs[1].plot(y)\n", "axs[1].set_title(\"noisy signal\")\n", "axs[2].plot(x_ic)\n", "axs[2].set_title(\"recovered signal\")\n", "axs[3].plot(cost_ic)\n", "axs[3].set_title(\"cost graph\")\n", "print(f\"Time taken = {time_taken}\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "x3bQgV-pPstF" }, "outputs": [], "source": [ "soft_thresh = lambda v, t: np.maximum(np.abs(v) - t, 0.0) * np.sign(v)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "gQao7kCCzNmj" }, "outputs": [], "source": [ "# ver3\n", "def tvd_admm(y, lam, Nit, mu1=1, mu2=1):\n", " start = time.time()\n", " y = np.expand_dims(y, axis=1)\n", " cost = np.zeros(Nit) # objective function\n", " # Signal length\n", " N = y.shape[0]\n", " T = 0.5 * lam / mu2\n", "\n", " e = np.ones(N - 1)\n", " Dmtx = spdiags([e, -e], [0, 1], N - 1, N) # sparse matrix\n", " F = mu1 * eye(N) + mu2 * (Dmtx.T @ Dmtx) # sparse matrix\n", "\n", " D = lambda x: np.diff(x, axis=0) # D\n", " DT = lambda x: np.concatenate((-x[:1,], -np.diff(x, axis=0), x[-1:,])) # D'\n", "\n", " # initializations\n", " # u1 = y.copy()\n", " d1 = np.zeros((N, N))\n", " d2 = np.zeros((N - 1, 1))\n", " x = np.zeros((N, 1))\n", " for k in range(Nit):\n", " v1 = (y - d1 + mu1 * x) / (1 + mu1)\n", " v2 = soft_thresh(D(x) + d2, T) - d2\n", " x = spsolve(F, mu1 * v1 + mu2 * DT(v2)) # sparse system solve\n", " d1 = x - v1\n", " d2 = D(x) - v2\n", " # x = np.expand_dims(x,axis=1)\n", " cost[k] = np.sum(np.abs(x - y) ** 2) + lam * np.sum(np.abs(D(x)))\n", "\n", " end = time.time()\n", " return x.flatten()[::256], cost, end - start" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "colab": { "base_uri": "https://localhost:8080/", "height": 1000 }, "id": "ggIx9BqM1e4b", "outputId": "b915c085-c822-45d6-fed2-5b6ac7e41639" }, "outputs": [], "source": [ "max_iter = 256\n", "x_admm, cost_admm, time_taken = tvd_admm(y, lam, max_iter)\n", "fig, axs = plt.subplots(1, 4, figsize=(30, 5))\n", "axs[0].plot(s)\n", "axs[0].set_title(\"original signal\")\n", "axs[1].plot(y)\n", "axs[1].set_title(\"noisy signal\")\n", "axs[2].plot(x_admm)\n", "axs[2].set_title(\"recovered signal\")\n", "axs[3].plot(cost_admm)\n", "axs[3].set_title(\"cost graph\")\n", "print(f\"Time taken = {time_taken}\")" ] } ], "metadata": { "colab": { "authorship_tag": "ABX9TyNYZq3Zfwo1T+YqD4Oz/Zf/", "include_colab_link": true, "provenance": [] }, "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.16" } }, "nbformat": 4, "nbformat_minor": 4 }