import os
import sys
import warnings
from functools import partial
from timeit import timeit
from typing import Optional, Sequence

import numpy as np
import tensorflow as tf
from tensorflow.python.ops.gen_math_ops import mat_mul, sigmoid
from tensorflow.python.ops.gen_nn_ops import bias_add
from tensorflow.python.ops.resource_variable_ops import ResourceVariable

from keras.engine.keras_tensor import KerasTensor
from keras.api._v2.keras.callbacks import Callback
from keras.api._v2.keras.constraints import Constraint
from keras.api._v2.keras.layers import Dense, Input
from keras.api._v2.keras.models import Model

from feed_forward import load_test_data

F32 = 'float32'


def init_params(data: np.ndarray, shape: Sequence[int], dtype: Optional[str] = None) -> tf.Tensor:
    """
    Helper function for initializing layer parameters with specific data.

    Intended to be wrapped with `functools.partial` (binding `data`) and
    passed to a Keras layer as `kernel_initializer` / `bias_initializer`;
    Keras then calls it with the `shape` and `dtype` it expects.

    The asserts verify that the layer requests exactly the shape/dtype of
    the pre-baked data.
    NOTE(review): `dtype == data.dtype` compares a Keras-supplied dtype
    (string or DType) against a numpy dtype; this comparison is what raises
    the "elementwise comparison failed" FutureWarning filtered in `setup()`.
    """
    shape = tuple(shape)
    assert shape == data.shape, f"{shape} != {data.shape}"
    assert dtype == data.dtype, f"{dtype} != {data.dtype}"
    return tf.convert_to_tensor(data)


def simple_layer_test() -> None:
    """
    This is just to get a feel for the objects involved.
    """
    inputs = Input(shape=(3, ))
    # Two units, three inputs; one weight is deliberately zero so the
    # constraint below has a pruned connection to keep pruned.
    w_data = np.array([[1.0, -0.5, 0.0],
                       [3.0, 2.0, -5.0]], dtype=F32)
    b_data = np.array([-1.0, 2.0], dtype=F32)
    # Keras stores kernels as (inputs, units), hence the transposes.
    w_init = partial(init_params, w_data.T)
    b_init = partial(init_params, b_data.T)

    class Const(Constraint):
        """Zeroes out the masked weights after each optimizer step, i.e.
        keeps pruned connections pruned during training."""

        def __init__(self, zero_mask: np.ndarray) -> None:
            self.mask = zero_mask

        def __call__(self, weights: ResourceVariable) -> tf.Tensor:
            # Keras contract: a constraint returns the projected value and
            # the optimizer assigns it back to the variable.  The original
            # additionally called `weights.assign(...)` here, which is a
            # redundant side effect; returning the projection suffices.
            return weights - self.mask * weights

    layer = Dense(units=2,
                  activation='sigmoid',
                  kernel_initializer=w_init,
                  bias_initializer=b_init,
                  kernel_constraint=Const(w_data.T == 0))(inputs)
    assert isinstance(layer, KerasTensor)
    model = Model(inputs=inputs, outputs=layer)
    w_tensor = model.trainable_variables[0]
    b_tensor = model.trainable_variables[1]
    assert isinstance(w_tensor, ResourceVariable)
    assert isinstance(b_tensor, ResourceVariable)
    # The initializers must have been called with exactly our data:
    assert np.equal(w_tensor.numpy().T, w_data).all()
    assert np.equal(b_tensor.numpy().T, b_data).all()
    model.compile(optimizer='adam', loss='categorical_crossentropy')
    x = np.array([[1.0, -2.0, 0.2]], dtype=F32)
    print("input", x[0])
    y = model(x)
    assert isinstance(y, tf.Tensor)
    print("output", np.array(y)[0])
    # Unit 1: 1*3 + (-2)*2 + 0.2*(-5) + 2 == 0, and sigmoid(0) == 0.5.
    assert y[0][1] == 0.5
    samples = np.array([[1., 1., 1.],
                        [2., 2., 2.],
                        [3., 3., 3.]], dtype=F32)
    labels = np.array([[0., 1.],
                       [0., 2.],
                       [3., 0.]], dtype=F32)

    class CB(Callback):
        """Prints the kernel before each batch so the constraint's effect
        (masked weights staying zero) is visible during training."""

        def on_train_batch_begin(self, batch, logs=None):
            print(f"...start of batch {batch}; model weights:")
            print(self.model.trainable_variables[0].numpy())

    model.fit(samples, labels, batch_size=1, callbacks=[CB()], verbose=0)


def build_model(input_shape: Sequence[int], *layers: tuple[np.ndarray, np.ndarray]) -> Model:
    """
    Takes loaded test data and constructs a finished model from it.

    :param input_shape: shape of one input sample (without the batch dim)
    :param layers: per-layer `(weights, biases)` pairs; `weights` arrives as
        (units, inputs) and is transposed into Keras' (inputs, units) layout
    :return: a compiled `Model` of sigmoid-activated `Dense` layers
    """
    inputs = Input(shape=input_shape)
    layer = inputs
    for i, (weights, biases) in enumerate(layers, start=1):
        layer = Dense(
            name=f"layer_{i}",
            units=len(biases),
            activation='sigmoid',
            kernel_initializer=partial(init_params, weights.T),
            bias_initializer=partial(init_params, biases.T)
        )(layer)
    model = Model(inputs=inputs, outputs=layer)
    model.compile()
    return model


def setup() -> tuple[tf.Tensor, Model]:
    """
    Loads test data, builds a model from it, converts inputs to a `tf.Tensor`.
    """
    # Tensorflow logs:
    # tf.debugging.set_log_device_placement(True)
    # NOTE(review): setting TF_CPP_MIN_LOG_LEVEL after tensorflow is already
    # imported typically has no effect — confirm, or export it before launch.
    os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
    # Shut up numpy warning (raised by the dtype assert in `init_params`):
    warnings.filterwarnings('ignore', 'elementwise comparison failed', FutureWarning, module='numpy')
    inp, layers = load_test_data()
    model = build_model(inp.shape, *layers)
    # Add the leading batch dimension the model expects:
    return tf.convert_to_tensor(np.expand_dims(inp, axis=0)), model


def feed_forward_naive(x: np.ndarray, model: Model) -> tf.Tensor:
    """Reference implementation: the ordinary Keras forward pass."""
    return model.call(x)


def layer_func(x: tf.Tensor, layer: Dense) -> tf.Tensor:
    """sigmoid(x @ W + b) via the raw TF ops a `Dense` layer boils down to."""
    return sigmoid(bias_add(mat_mul(x, layer.kernel), layer.bias))


def feed_forward(x: np.ndarray, model: Model) -> tf.Tensor:
    """Manual forward pass over the model's `Dense` layers.

    Tried to optimize by skipping a bunch of Python code in the standard
    `call()` method.  Made no noticeable difference in execution time.
    Probably the bottleneck is actually the data transfer back and forth
    between the host and the device (GPU).
    """
    for layer in model.layers[1:]:  # layers[0] is the Input layer
        x = layer_func(x, layer)
    return x


# To test if execution times change at all:
# feed_forward = feed_forward_naive


def main(n: int) -> None:
    """Times `n` forward passes (setup excluded) and prints total seconds."""
    t = timeit(
        stmt='feed_forward(inp, model)',
        setup='from __main__ import feed_forward, setup; '
              'inp, model = setup()',
        number=n
    )
    print(round(t, 5))


if __name__ == '__main__':
    simple_layer_test()
    # main(int(sys.argv[1]))