1D TV Denoising algorithms#
Implementation of 1D TVD using Maximization and Minimization algorithm and Iterative clipping algorithm. Reference
import numpy as np
import math
import matplotlib.pyplot as plt
from scipy.sparse import csr_matrix, spdiags, diags, csc_matrix, eye
from scipy.sparse.linalg import spsolve
from scipy.linalg import solve
from scipy.fftpack import fft, ifft
import time
s = np.loadtxt(
"https://eeweb.engineering.nyu.edu/iselesni/lecture_notes/TVDmm/TVD_software/blocks.txt"
)
y = np.loadtxt(
"https://eeweb.engineering.nyu.edu/iselesni/lecture_notes/TVDmm/TVD_software/blocks_noisy.txt"
)
---------------------------------------------------------------------------
FileNotFoundError Traceback (most recent call last)
Cell In[2], line 1
----> 1 s = np.loadtxt(
2 "https://eeweb.engineering.nyu.edu/iselesni/lecture_notes/TVDmm/TVD_software/blocks.txt"
3 )
4 y = np.loadtxt(
5 "https://eeweb.engineering.nyu.edu/iselesni/lecture_notes/TVDmm/TVD_software/blocks_noisy.txt"
6 )
File /usr/share/miniconda/envs/L96M2lines/lib/python3.9/site-packages/numpy/lib/npyio.py:1338, in loadtxt(fname, dtype, comments, delimiter, converters, skiprows, usecols, unpack, ndmin, encoding, max_rows, quotechar, like)
1335 if isinstance(delimiter, bytes):
1336 delimiter = delimiter.decode('latin1')
-> 1338 arr = _read(fname, dtype=dtype, comment=comment, delimiter=delimiter,
1339 converters=converters, skiplines=skiprows, usecols=usecols,
1340 unpack=unpack, ndmin=ndmin, encoding=encoding,
1341 max_rows=max_rows, quote=quotechar)
1343 return arr
File /usr/share/miniconda/envs/L96M2lines/lib/python3.9/site-packages/numpy/lib/npyio.py:975, in _read(fname, delimiter, comment, quote, imaginary_unit, usecols, skiplines, max_rows, converters, ndmin, unpack, dtype, encoding)
973 fname = os.fspath(fname)
974 if isinstance(fname, str):
--> 975 fh = np.lib._datasource.open(fname, 'rt', encoding=encoding)
976 if encoding is None:
977 encoding = getattr(fh, 'encoding', 'latin1')
File /usr/share/miniconda/envs/L96M2lines/lib/python3.9/site-packages/numpy/lib/_datasource.py:193, in open(path, mode, destpath, encoding, newline)
156 """
157 Open `path` with `mode` and return the file object.
158
(...)
189
190 """
192 ds = DataSource(destpath)
--> 193 return ds.open(path, mode, encoding=encoding, newline=newline)
File /usr/share/miniconda/envs/L96M2lines/lib/python3.9/site-packages/numpy/lib/_datasource.py:533, in DataSource.open(self, path, mode, encoding, newline)
530 return _file_openers[ext](found, mode=mode,
531 encoding=encoding, newline=newline)
532 else:
--> 533 raise FileNotFoundError(f"{path} not found.")
FileNotFoundError: https://eeweb.engineering.nyu.edu/iselesni/lecture_notes/TVDmm/TVD_software/blocks.txt not found.
N = 256
# N : signal length
sigma = 0.5; # sigma : standard deviation of noise
fig, axs = plt.subplots(1, 2, figsize=(15, 5))
axs[0].plot(s)
axs[0].set_title("original signal")
axs[1].plot(y)
axs[1].set_title("noisy signal");
Using linear systems#
def tvd_mm(y, lam, Nit):
start = time.time()
# Cost function history
cost = np.zeros(Nit)
# Signal length
N = y.shape[0]
# Create sparse difference matrix
I = np.eye(N)
D = spdiags(np.vstack((np.ones((1, N)), -1 * np.ones((1, N)))), [0, 1], N - 1, N)
DDT = D @ D.T
# Initialization
x = y.copy()
Dx = D @ x
Dy = D @ y
for k in range(Nit):
# Compute banded matrix F
F = csc_matrix(diags(np.abs(Dx).flatten() / lam, 0)) + DDT
# Solve linear system to get updated signal x
x = y - D.T @ spsolve(F, Dy)
# Update variables
Dx = D @ x
cost[k] = 0.5 * np.sum(np.abs(x - y) ** 2) + lam * np.sum(np.abs(Dx))
end = time.time()
return x.flatten(), cost, end - start
lam = 1.5
max_iter = 256
x_mm, cost_mm, time_taken = tvd_mm(y, lam, max_iter)
fig, axs = plt.subplots(1, 4, figsize=(30, 5))
axs[0].plot(s)
axs[0].set_title("original signal")
axs[1].plot(y)
axs[1].set_title("noisy signal")
axs[2].plot(x_mm)
axs[2].set_title("recovered signal")
axs[3].plot(cost_mm)
axs[3].set_title("cost graph")
print(f"Time taken = {time_taken}")
def tvd_ic(y, lam, Nit):
start = time.time()
y = y.reshape((1, -1)) # row vector
J = np.zeros(Nit) # objective function
N = y.shape[1]
z = np.zeros((1, N - 1))
alpha = 3
T = lam / 2
x = y.copy()
for k in range(Nit):
inter = np.concatenate((-z[:, 0], -np.diff(z.flatten()), z[:, -1]))
x = y - inter # y - D' z
J[k] = 0.5 * np.sum(np.abs(x - y) ** 2) + lam * np.sum(np.abs(np.diff(x)))
z = z + 1 / alpha * np.diff(x) # z + 1/alpha D z
z = np.clip(z, -T, T) # clip(z,T)
end = time.time()
return x.flatten(), J, end - start
max_iter = 256
x_ic, cost_ic, time_taken = tvd_ic(y, 1.5 * lam, max_iter)
fig, axs = plt.subplots(1, 4, figsize=(30, 5))
axs[0].plot(s)
axs[0].set_title("original signal")
axs[1].plot(y)
axs[1].set_title("noisy signal")
axs[2].plot(x_ic)
axs[2].set_title("recovered signal")
axs[3].plot(cost_ic)
axs[3].set_title("cost graph")
print(f"Time taken = {time_taken}")
soft_thresh = lambda v, t: np.maximum(np.abs(v) - t, 0.0) * np.sign(v)
# ver3
def tvd_admm(y, lam, Nit, mu1=1, mu2=1):
start = time.time()
y = np.expand_dims(y, axis=1)
cost = np.zeros(Nit) # objective function
# Signal length
N = y.shape[0]
T = 0.5 * lam / mu2
e = np.ones(N - 1)
Dmtx = spdiags([e, -e], [0, 1], N - 1, N) # sparse matrix
F = mu1 * eye(N) + mu2 * (Dmtx.T @ Dmtx) # sparse matrix
D = lambda x: np.diff(x, axis=0) # D
DT = lambda x: np.concatenate((-x[:1,], -np.diff(x, axis=0), x[-1:,])) # D'
# initializations
# u1 = y.copy()
d1 = np.zeros((N, N))
d2 = np.zeros((N - 1, 1))
x = np.zeros((N, 1))
for k in range(Nit):
v1 = (y - d1 + mu1 * x) / (1 + mu1)
v2 = soft_thresh(D(x) + d2, T) - d2
x = spsolve(F, mu1 * v1 + mu2 * DT(v2)) # sparse system solve
d1 = x - v1
d2 = D(x) - v2
# x = np.expand_dims(x,axis=1)
cost[k] = np.sum(np.abs(x - y) ** 2) + lam * np.sum(np.abs(D(x)))
end = time.time()
return x.flatten()[::256], cost, end - start
max_iter = 256
x_admm, cost_admm, time_taken = tvd_admm(y, lam, max_iter)
fig, axs = plt.subplots(1, 4, figsize=(30, 5))
axs[0].plot(s)
axs[0].set_title("original signal")
axs[1].plot(y)
axs[1].set_title("noisy signal")
axs[2].plot(x_admm)
axs[2].set_title("recovered signal")
axs[3].plot(cost_admm)
axs[3].set_title("cost graph")
print(f"Time taken = {time_taken}")