151 lines
5.0 KiB
Python
151 lines
5.0 KiB
Python
import os
|
|
import sys
|
|
import warnings
|
|
from functools import partial
|
|
from timeit import timeit
|
|
from typing import Optional, Sequence
|
|
|
|
import numpy as np
|
|
import tensorflow as tf
|
|
from tensorflow.python.ops.gen_math_ops import mat_mul, sigmoid
|
|
from tensorflow.python.ops.gen_nn_ops import bias_add
|
|
from tensorflow.python.ops.resource_variable_ops import ResourceVariable
|
|
from keras.engine.keras_tensor import KerasTensor
|
|
from keras.api._v2.keras.callbacks import Callback
|
|
from keras.api._v2.keras.constraints import Constraint
|
|
from keras.api._v2.keras.layers import Dense, Input
|
|
from keras.api._v2.keras.models import Model
|
|
|
|
from feed_forward import load_test_data
|
|
|
|
|
|
# Name of the dtype used for the hand-written numpy arrays in this file.
F32 = 'float32'
|
|
|
|
|
|
def init_params(data: np.ndarray, shape: Sequence[int], dtype: Optional[str] = None) -> tf.Tensor:
    """
    Helper function for initializing layer parameters with specific data.

    In this file it is bound to its `data` with `functools.partial` and the result is passed
    as `kernel_initializer`/`bias_initializer` of a `Dense` layer.

    Args:
        data: The parameter values to initialize with.
        shape: The requested shape; must be equal to `data.shape`.
        dtype: The requested data type; must be equal to `data.dtype` if given.
            `None` (the default) means no particular type was requested and skips the check.

    Returns:
        `data` converted to a tensor.

    Raises:
        AssertionError: If the requested shape or data type does not match that of `data`.
    """
    shape = tuple(shape)
    assert shape == data.shape, f"{shape} != {data.shape}"
    # Without the `None` guard the default argument could never pass this check reliably.
    assert dtype is None or dtype == data.dtype, f"{dtype} != {data.dtype}"
    return tf.convert_to_tensor(data)
|
|
|
|
|
|
def simple_layer_test() -> None:
    """
    This is just to get a feel for the objects involved.

    Builds a model with a single sigmoid `Dense` layer from hand-written weights and biases,
    checks that those parameters show up unchanged in the model, runs one forward pass and
    finally calls `fit` with batch size 1 while printing the kernel at the start of every batch.
    """

    class Const(Constraint):
        """Constraint that forces the weights selected by a boolean mask to zero."""

        def __init__(self, zero_mask: np.ndarray) -> None:
            self.mask = zero_mask

        def __call__(self, weights: ResourceVariable) -> ResourceVariable:
            # Subtracting the masked share of the weights leaves zeros at the masked positions.
            weights.assign(weights - self.mask * weights)
            return weights

    class CB(Callback):
        """Prints the first trainable variable of the model before each training batch."""

        def on_train_batch_begin(self, batch, logs=None):
            print(f"...start of batch {batch}; model weights:")
            print(self.model.trainable_variables[0].numpy())

    # Hand-written layer parameters; they are handed to the initializers in transposed form.
    w_data = np.array([[1.0, -0.5, 0.0],
                       [3.0, 2.0, -5.0]], dtype=F32)
    b_data = np.array([-1.0, 2.0], dtype=F32)
    kernel_init = partial(init_params, w_data.T)
    bias_init = partial(init_params, b_data.T)

    inputs = Input(shape=(3, ))
    dense = Dense(
        units=2,
        activation='sigmoid',
        kernel_initializer=kernel_init,
        bias_initializer=bias_init,
        # Mask marks every weight that starts out as zero.
        kernel_constraint=Const(w_data.T == 0),
    )
    outputs = dense(inputs)
    assert isinstance(outputs, KerasTensor)
    model = Model(inputs=inputs, outputs=outputs)

    # The model's parameters must be exactly the data we initialized them with.
    kernel_var, bias_var = model.trainable_variables[:2]
    assert isinstance(kernel_var, ResourceVariable)
    assert isinstance(bias_var, ResourceVariable)
    assert (kernel_var.numpy().T == w_data).all()
    assert (bias_var.numpy().T == b_data).all()

    model.compile(optimizer='adam', loss='categorical_crossentropy')

    # Forward pass; for the second unit: 3*1 + 2*(-2) + (-5)*0.2 + 2 = 0 and sigmoid(0) = 0.5.
    x = np.array([[1.0, -2.0, 0.2]], dtype=F32)
    print("input", x[0])
    y = model(x)
    assert isinstance(y, tf.Tensor)
    print("output", np.array(y)[0])
    assert y[0][1] == 0.5

    # A few training steps, one sample per batch, with the kernel printed before each of them.
    samples = np.array([[1., 1., 1.], [2., 2., 2.], [3., 3., 3.]], dtype=F32)
    labels = np.array([[0., 1.], [0., 2.], [3., 0.]], dtype=F32)
    model.fit(samples, labels, batch_size=1, callbacks=[CB()], verbose=0)
|
|
|
|
|
|
def build_model(input_shape: Sequence[int], *layers: tuple[np.ndarray, np.ndarray]) -> Model:
    """
    Takes loaded test data and constructs a finished model from it.

    Every `(weights, biases)` pair in `layers` becomes one sigmoid `Dense` layer named
    `layer_<i>` (counting from 1) with as many units as there are biases. The layers are
    chained in the order given, starting from an `Input` of shape `input_shape`.

    Returns:
        The compiled model.
    """
    inputs = Input(shape=input_shape)
    outputs = inputs
    for number, (weights, biases) in enumerate(layers, start=1):
        dense = Dense(
            name=f"layer_{number}",
            units=len(biases),
            activation='sigmoid',
            # Both parameter arrays are handed to the initializer in transposed form.
            kernel_initializer=partial(init_params, weights.T),
            bias_initializer=partial(init_params, biases.T),
        )
        outputs = dense(outputs)
    model = Model(inputs=inputs, outputs=outputs)
    model.compile()
    return model
|
|
|
|
|
|
def setup() -> tuple[tf.Tensor, Model]:
    """
    Loads test data, builds a model from it, converts inputs to a `tf.Tensor`.

    Returns:
        The input with an extra leading axis of length 1, as a tensor, and the model built
        from the loaded layers.
    """
    # Tensorflow logs:
    # tf.debugging.set_log_device_placement(True)
    # NOTE(review): tensorflow is already imported at this point -- confirm the setting still takes effect.
    os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'

    # Shut up numpy warning:
    warnings.filterwarnings(
        action='ignore',
        message='elementwise comparison failed',
        category=FutureWarning,
        module='numpy',
    )

    inp, layers = load_test_data()
    model = build_model(inp.shape, *layers)
    batched_input = np.expand_dims(inp, axis=0)
    return tf.convert_to_tensor(batched_input), model
|
|
|
|
|
|
def feed_forward_naive(x: np.ndarray, model: Model) -> tf.Tensor:
    """
    Reference implementation: pushes `x` through the model's own `call()` method.
    """
    output = model.call(x)
    return output
|
|
|
|
|
|
def layer_func(x: tf.Tensor, layer: Dense) -> tf.Tensor:
    """
    Applies one dense sigmoid layer to `x` using the raw ops directly.

    Computes `sigmoid(x @ kernel + bias)` with the kernel and bias of the given layer.
    """
    weighted = mat_mul(x, layer.kernel)
    shifted = bias_add(weighted, layer.bias)
    return sigmoid(shifted)
|
|
|
|
|
|
def feed_forward(x: np.ndarray, model: Model) -> tf.Tensor:
    """
    Pushes `x` through all layers of the model except the first one via `layer_func`.

    This was an attempt to optimize by skipping a bunch of Python code in the standard `call()`
    method. It made no noticeable difference in execution time. Probably the bottleneck is
    actually the data transfer back and forth between the host and the device (GPU).
    """
    activation = x
    # The first entry of `model.layers` is skipped; only the remaining layers are applied.
    for dense in model.layers[1:]:
        activation = layer_func(activation, dense)
    return activation
|
|
|
|
|
|
# To test if execution times change at all:
|
|
# feed_forward = feed_forward_naive
|
|
|
|
|
|
def main(n: int) -> None:
    """
    Times `n` calls of `feed_forward` on the loaded test data and prints the total duration.

    The model and input are created once in the `timeit` setup code and are therefore not part
    of the measurement. The printed value is rounded to 5 decimal places.
    """
    setup_code = 'from __main__ import feed_forward, setup; inp, model = setup()'
    elapsed = timeit(stmt='feed_forward(inp, model)', setup=setup_code, number=n)
    print(round(elapsed, 5))
|
|
|
|
|
|
if __name__ == '__main__':
    # By default only the exploratory single-layer test is run.
    simple_layer_test()
    # To run the timing benchmark instead, pass the number of repetitions as the first argument:
    # main(int(sys.argv[1]))
|