-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathsmoother.py
99 lines (84 loc) · 3.38 KB
/
smoother.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# vim: sta:et:sw=2:ts=2:sts=2
# Written by Antonio Loquercio
import numpy as np
import scipy.stats as st
import pdb
import tensorflow as tf
def layer(op):
  '''Decorator that turns an operation method into a chainable layer.

  The wrapped method is called as ``op(self, layer_input, *args, **kwargs)``
  where ``layer_input`` is taken from ``self.terminals``: the single terminal
  itself when there is exactly one, otherwise a list copy of all terminals.
  The result is stored in ``self.layers`` under the layer name and then fed
  back through ``self.feed`` so it becomes the input of the next layer.

  The owning object must provide ``terminals``, ``layers``,
  ``get_unique_name(prefix)`` and ``feed(*args)``.

  Args:
    op: The operation to wrap. It receives the owning object, the layer
      input, and a ``name`` keyword argument.

  Returns:
    A method that performs ``op`` and returns ``self`` for chained calls.

  Raises:
    RuntimeError: (from the wrapper) if ``self.terminals`` is empty.
  '''
  # Imported here so the decorator is self-contained and does not rely on a
  # module-level import being present.
  import functools

  # Preserve the wrapped operation's __name__/__doc__ on the decorated method.
  @functools.wraps(op)
  def layer_decorated(self, *args, **kwargs):
    # Automatically set a name if not provided.
    name = kwargs.setdefault('name', self.get_unique_name(op.__name__))
    # Figure out the layer inputs.
    if not self.terminals:
      raise RuntimeError('No input variables found for layer %s.' % name)
    if len(self.terminals) == 1:
      layer_input = self.terminals[0]
    else:
      layer_input = list(self.terminals)
    # Perform the operation and get the output.
    layer_output = op(self, layer_input, *args, **kwargs)
    # Add to layer LUT.
    self.layers[name] = layer_output
    # This output is now the input for the next layer.
    self.feed(layer_output)
    # Return self for chained calls.
    return self

  return layer_decorated
class Smoother(object):
  '''Smooths an input tensor with a fixed Gaussian depthwise convolution.

  On construction the network is built immediately: the layer registered
  under the key ``'data'`` in ``inputs`` is fed to a single ``conv`` layer
  named ``'smoothing'``. The result is available through ``get_output()``.

  Args:
    inputs: Mapping (or iterable of key/value pairs) of layer names to
      tensors; it is copied into ``self.layers`` and must contain ``'data'``.
      ``conv`` reads the channel count from shape index 3, so the tensor
      needs at least four dimensions.
    filter_size: Side length of the square Gaussian kernel.
    sigma: Passed to ``gauss_kernel`` as ``nsig``, i.e. it controls the range
      of the normal distribution sampled by the kernel.
  '''

  # ``basestring`` exists only on Python 2; fall back to ``str`` on Python 3
  # so that ``feed`` can still recognise layer names instead of raising
  # NameError.
  try:
    _string_types = basestring
  except NameError:
    _string_types = str

  def __init__(self, inputs, filter_size, sigma):
    self.inputs = inputs
    # Outputs of the most recently fed/computed layer(s).
    self.terminals = []
    # Lookup table from layer name to its output, seeded with the inputs.
    self.layers = dict(inputs)
    self.filter_size = filter_size
    self.sigma = sigma
    self.setup()

  def setup(self):
    '''Builds the network: feed ``'data'`` into one smoothing convolution.'''
    self.feed('data').conv(name='smoothing')

  def get_unique_name(self, prefix):
    '''Returns ``'<prefix>_<n>'`` where n is one more than the number of
    registered layer names that start with ``prefix``.'''
    ident = sum(layer_name.startswith(prefix) for layer_name in self.layers) + 1
    return '%s_%d' % (prefix, ident)

  def feed(self, *args):
    '''Sets the inputs for the next layer, replacing the current terminals.

    Args:
      *args: One or more layers. A string is looked up by name in
        ``self.layers``; anything else is used as-is.

    Returns:
      ``self``, so calls can be chained.

    Raises:
      KeyError: If a string argument is not a known layer name.
    '''
    assert len(args) != 0, 'feed() requires at least one layer'
    self.terminals = []
    for fed_layer in args:
      if isinstance(fed_layer, self._string_types):
        try:
          fed_layer = self.layers[fed_layer]
        except KeyError:
          raise KeyError('Unknown layer name fed: %s' % fed_layer)
      self.terminals.append(fed_layer)
    return self

  def gauss_kernel(self, kernlen=21, nsig=3, channels=1):
    '''Builds a normalised 2-D Gaussian filter for a depthwise convolution.

    The 1-D weights are the differences of the standard normal CDF over
    ``kernlen + 1`` evenly spaced points spanning
    ``[-nsig - interval/2, nsig + interval/2]``. The 2-D kernel is the
    element-wise square root of their outer product, scaled to sum to 1.

    Args:
      kernlen: Side length of the square kernel.
      nsig: Half-width of the sampled range of the normal distribution.
      channels: Number of times the kernel is repeated along axis 2.

    Returns:
      A float32 numpy array of shape ``(kernlen, kernlen, channels, 1)``.
    '''
    interval = (2 * nsig + 1.) / kernlen
    x = np.linspace(-nsig - interval / 2., nsig + interval / 2., kernlen + 1)
    kern1d = np.diff(st.norm.cdf(x))
    kernel_raw = np.sqrt(np.outer(kern1d, kern1d))
    kernel = kernel_raw / kernel_raw.sum()
    out_filter = np.array(kernel, dtype=np.float32)
    out_filter = out_filter.reshape((kernlen, kernlen, 1, 1))
    # Same 2-D kernel for every input channel.
    out_filter = np.repeat(out_filter, channels, axis=2)
    return out_filter

  def make_gauss_var(self, name, size, sigma, c_i):
    '''Creates a ``tf.Variable`` initialised with the Gaussian kernel.

    Args:
      name: Name given to the variable.
      size: Kernel side length (``kernlen`` of ``gauss_kernel``).
      sigma: Forwarded as ``nsig`` to ``gauss_kernel``.
      c_i: Number of input channels the kernel is repeated for.

    Returns:
      The created variable; its creation is placed under the ``/cpu:0``
      device scope.
    '''
    # NOTE(review): tf.Variable is presumably trainable by default here, so an
    # optimizer could update the kernel -- confirm this is intended.
    with tf.device("/cpu:0"):
      kernel = self.gauss_kernel(size, sigma, c_i)
      var = tf.Variable(tf.convert_to_tensor(kernel), name=name)
    return var

  def get_output(self):
    '''Returns the smoother output.'''
    return self.terminals[-1]

  @layer
  def conv(self,
           input,
           name,
           padding='SAME'):
    '''Applies the Gaussian kernel to ``input`` channel by channel.

    Args:
      input: Tensor to smooth; the channel count is read from shape index 3.
      name: Layer name, used as the variable scope for the kernel.
      padding: Padding mode forwarded to ``tf.nn.depthwise_conv2d``.

    Returns:
      The result of the stride-1 depthwise convolution.
    '''
    # Get the number of channels in the input
    c_i = input.get_shape().as_list()[3]
    with tf.variable_scope(name):
      kernel = self.make_gauss_var('gauss_weight', self.filter_size,
                                   self.sigma, c_i)
      # Convolution for a given input and kernel
      output = tf.nn.depthwise_conv2d(input, kernel, [1, 1, 1, 1],
                                      padding=padding)
    return output