1D TV Denoising algorithms#

Open In Colab

Implementation of 1D TVD using Maximization and Minimization algorithm and Iterative clipping algorithm. Reference

import numpy as np
import math
import matplotlib.pyplot as plt
from scipy.sparse import csr_matrix, spdiags, diags, csc_matrix, eye
from scipy.sparse.linalg import spsolve
from scipy.linalg import solve
from scipy.fftpack import fft, ifft
import time
s = np.loadtxt(
    "https://eeweb.engineering.nyu.edu/iselesni/lecture_notes/TVDmm/TVD_software/blocks.txt"
)
y = np.loadtxt(
    "https://eeweb.engineering.nyu.edu/iselesni/lecture_notes/TVDmm/TVD_software/blocks_noisy.txt"
)
---------------------------------------------------------------------------
FileNotFoundError                         Traceback (most recent call last)
Cell In[2], line 1
----> 1 s = np.loadtxt(
      2     "https://eeweb.engineering.nyu.edu/iselesni/lecture_notes/TVDmm/TVD_software/blocks.txt"
      3 )
      4 y = np.loadtxt(
      5     "https://eeweb.engineering.nyu.edu/iselesni/lecture_notes/TVDmm/TVD_software/blocks_noisy.txt"
      6 )

File /usr/share/miniconda/envs/L96M2lines/lib/python3.9/site-packages/numpy/lib/npyio.py:1338, in loadtxt(fname, dtype, comments, delimiter, converters, skiprows, usecols, unpack, ndmin, encoding, max_rows, quotechar, like)
   1335 if isinstance(delimiter, bytes):
   1336     delimiter = delimiter.decode('latin1')
-> 1338 arr = _read(fname, dtype=dtype, comment=comment, delimiter=delimiter,
   1339             converters=converters, skiplines=skiprows, usecols=usecols,
   1340             unpack=unpack, ndmin=ndmin, encoding=encoding,
   1341             max_rows=max_rows, quote=quotechar)
   1343 return arr

File /usr/share/miniconda/envs/L96M2lines/lib/python3.9/site-packages/numpy/lib/npyio.py:975, in _read(fname, delimiter, comment, quote, imaginary_unit, usecols, skiplines, max_rows, converters, ndmin, unpack, dtype, encoding)
    973     fname = os.fspath(fname)
    974 if isinstance(fname, str):
--> 975     fh = np.lib._datasource.open(fname, 'rt', encoding=encoding)
    976     if encoding is None:
    977         encoding = getattr(fh, 'encoding', 'latin1')

File /usr/share/miniconda/envs/L96M2lines/lib/python3.9/site-packages/numpy/lib/_datasource.py:193, in open(path, mode, destpath, encoding, newline)
    156 """
    157 Open `path` with `mode` and return the file object.
    158 
   (...)
    189 
    190 """
    192 ds = DataSource(destpath)
--> 193 return ds.open(path, mode, encoding=encoding, newline=newline)

File /usr/share/miniconda/envs/L96M2lines/lib/python3.9/site-packages/numpy/lib/_datasource.py:533, in DataSource.open(self, path, mode, encoding, newline)
    530     return _file_openers[ext](found, mode=mode,
    531                               encoding=encoding, newline=newline)
    532 else:
--> 533     raise FileNotFoundError(f"{path} not found.")

FileNotFoundError: https://eeweb.engineering.nyu.edu/iselesni/lecture_notes/TVDmm/TVD_software/blocks.txt not found.
N = 256
# N : signal length
sigma = 0.5;  # sigma : standard deviation of noise
fig, axs = plt.subplots(1, 2, figsize=(15, 5))
axs[0].plot(s)
axs[0].set_title("original signal")
axs[1].plot(y)
axs[1].set_title("noisy signal");

Using linear systems#

def tvd_mm(y, lam, Nit):
    start = time.time()
    # Cost function history
    cost = np.zeros(Nit)

    # Signal length
    N = y.shape[0]

    # Create sparse difference matrix
    I = np.eye(N)
    D = spdiags(np.vstack((np.ones((1, N)), -1 * np.ones((1, N)))), [0, 1], N - 1, N)
    DDT = D @ D.T

    # Initialization
    x = y.copy()
    Dx = D @ x
    Dy = D @ y

    for k in range(Nit):
        # Compute banded matrix F
        F = csc_matrix(diags(np.abs(Dx).flatten() / lam, 0)) + DDT
        # Solve linear system to get updated signal x
        x = y - D.T @ spsolve(F, Dy)

        # Update variables
        Dx = D @ x
        cost[k] = 0.5 * np.sum(np.abs(x - y) ** 2) + lam * np.sum(np.abs(Dx))

    end = time.time()

    return x.flatten(), cost, end - start
lam = 1.5
max_iter = 256
x_mm, cost_mm, time_taken = tvd_mm(y, lam, max_iter)
fig, axs = plt.subplots(1, 4, figsize=(30, 5))
axs[0].plot(s)
axs[0].set_title("original signal")
axs[1].plot(y)
axs[1].set_title("noisy signal")
axs[2].plot(x_mm)
axs[2].set_title("recovered signal")
axs[3].plot(cost_mm)
axs[3].set_title("cost graph")
print(f"Time taken = {time_taken}")
def tvd_ic(y, lam, Nit):
    start = time.time()
    y = y.reshape((1, -1))  # row vector
    J = np.zeros(Nit)  # objective function
    N = y.shape[1]
    z = np.zeros((1, N - 1))
    alpha = 3
    T = lam / 2
    x = y.copy()
    for k in range(Nit):
        inter = np.concatenate((-z[:, 0], -np.diff(z.flatten()), z[:, -1]))

        x = y - inter  # y - D' z

        J[k] = 0.5 * np.sum(np.abs(x - y) ** 2) + lam * np.sum(np.abs(np.diff(x)))

        z = z + 1 / alpha * np.diff(x)  # z + 1/alpha D z

        z = np.clip(z, -T, T)  # clip(z,T)

    end = time.time()
    return x.flatten(), J, end - start
max_iter = 256
x_ic, cost_ic, time_taken = tvd_ic(y, 1.5 * lam, max_iter)
fig, axs = plt.subplots(1, 4, figsize=(30, 5))
axs[0].plot(s)
axs[0].set_title("original signal")
axs[1].plot(y)
axs[1].set_title("noisy signal")
axs[2].plot(x_ic)
axs[2].set_title("recovered signal")
axs[3].plot(cost_ic)
axs[3].set_title("cost graph")
print(f"Time taken = {time_taken}")
soft_thresh = lambda v, t: np.maximum(np.abs(v) - t, 0.0) * np.sign(v)
# ver3
def tvd_admm(y, lam, Nit, mu1=1, mu2=1):
    start = time.time()
    y = np.expand_dims(y, axis=1)
    cost = np.zeros(Nit)  # objective function
    # Signal length
    N = y.shape[0]
    T = 0.5 * lam / mu2

    e = np.ones(N - 1)
    Dmtx = spdiags([e, -e], [0, 1], N - 1, N)  # sparse matrix
    F = mu1 * eye(N) + mu2 * (Dmtx.T @ Dmtx)  # sparse matrix

    D = lambda x: np.diff(x, axis=0)  # D
    DT = lambda x: np.concatenate((-x[:1,], -np.diff(x, axis=0), x[-1:,]))  # D'

    # initializations
    # u1 = y.copy()
    d1 = np.zeros((N, N))
    d2 = np.zeros((N - 1, 1))
    x = np.zeros((N, 1))
    for k in range(Nit):
        v1 = (y - d1 + mu1 * x) / (1 + mu1)
        v2 = soft_thresh(D(x) + d2, T) - d2
        x = spsolve(F, mu1 * v1 + mu2 * DT(v2))  # sparse system solve
        d1 = x - v1
        d2 = D(x) - v2
        # x = np.expand_dims(x,axis=1)
        cost[k] = np.sum(np.abs(x - y) ** 2) + lam * np.sum(np.abs(D(x)))

    end = time.time()
    return x.flatten()[::256], cost, end - start
max_iter = 256
x_admm, cost_admm, time_taken = tvd_admm(y, lam, max_iter)
fig, axs = plt.subplots(1, 4, figsize=(30, 5))
axs[0].plot(s)
axs[0].set_title("original signal")
axs[1].plot(y)
axs[1].set_title("noisy signal")
axs[2].plot(x_admm)
axs[2].set_title("recovered signal")
axs[3].plot(cost_admm)
axs[3].set_title("cost graph")
print(f"Time taken = {time_taken}")