# ff-performance-tests/py/feed_forward_tf.py
import os
import sys
import warnings
from functools import partial
from timeit import timeit
from typing import Optional, Sequence

import numpy as np
import tensorflow as tf
from tensorflow.python.ops.gen_math_ops import mat_mul, sigmoid
from tensorflow.python.ops.gen_nn_ops import bias_add
from tensorflow.python.ops.resource_variable_ops import ResourceVariable
from keras.engine.keras_tensor import KerasTensor
from keras.api._v2.keras.callbacks import Callback
from keras.api._v2.keras.constraints import Constraint
from keras.api._v2.keras.layers import Dense, Input
from keras.api._v2.keras.models import Model

from feed_forward import load_test_data

F32 = 'float32'


def init_params(data: np.ndarray, shape: Sequence[int], dtype: Optional[str] = None) -> tf.Tensor:
    """
    Helper function for initializing layer parameters with specific data.
    """
    shape = tuple(shape)
    assert shape == data.shape, f"{shape} != {data.shape}"
    assert dtype == data.dtype, f"{dtype} != {data.dtype}"
    return tf.convert_to_tensor(data)
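
# Keras invokes kernel/bias initializers as fn(shape, dtype); binding the data
# array with functools.partial (as done in the tests below) turns init_params
# into a valid initializer that simply returns that data:
#
#     w_init = partial(init_params, w_data)  # later called as w_init(shape, dtype)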


def simple_layer_test() -> None:
    """
    This is just to get a feel for the objects involved.
    """
    inputs = Input(shape=(3, ))
    w_data = np.array([[1.0, -0.5, 0.0],
                       [3.0, 2.0, -5.0]], dtype=F32)
    b_data = np.array([-1.0, 2.0], dtype=F32)
    w_init = partial(init_params, w_data.T)
    b_init = partial(init_params, b_data.T)

    class Const(Constraint):
        """Zeroes out the masked weight entries after every optimizer update."""

        def __init__(self, zero_mask: np.ndarray) -> None:
            self.mask = zero_mask

        def __call__(self, weights: ResourceVariable) -> ResourceVariable:
            weights.assign(weights - self.mask * weights)
            return weights

    layer = Dense(
        units=2,
        activation='sigmoid',
        kernel_initializer=w_init,
        bias_initializer=b_init,
        kernel_constraint=Const(w_data.T == 0)
    )(inputs)
    assert isinstance(layer, KerasTensor)
    model = Model(inputs=inputs, outputs=layer)
    w_tensor = model.trainable_variables[0]
    b_tensor = model.trainable_variables[1]
    assert isinstance(w_tensor, ResourceVariable)
    assert isinstance(b_tensor, ResourceVariable)
    assert np.equal(w_tensor.numpy().T, w_data).all()
    assert np.equal(b_tensor.numpy().T, b_data).all()
    model.compile(optimizer='adam', loss='categorical_crossentropy')
    x = np.array([[1.0, -2.0, 0.2]], dtype=F32)
    print("input", x[0])
    y = model(x)
    assert isinstance(y, tf.Tensor)
    print("output", np.array(y)[0])
    assert y[0][1] == 0.5
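    # The assertion above holds exactly: the second unit's pre-activation is
    # 3.0*1.0 + 2.0*(-2.0) + (-5.0)*0.2 + 2.0 = 0.0, and sigmoid(0) is exactly 0.5.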

    samples = np.array([[1., 1., 1.], [2., 2., 2.], [3., 3., 3.]], dtype=F32)
    labels = np.array([[0., 1.], [0., 2.], [3., 0.]], dtype=F32)

    class CB(Callback):
        def on_train_batch_begin(self, batch, logs=None):
            print(f"...start of batch {batch}; model weights:")
            print(self.model.trainable_variables[0].numpy())

    model.fit(samples, labels, batch_size=1, callbacks=[CB()], verbose=0)
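    # Because of the kernel_constraint above, the kernel entries that start
    # out as zero should stay zero in every per-batch printout.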


def build_model(input_shape: Sequence[int], *layers: tuple[np.ndarray, np.ndarray]) -> Model:
    """
    Takes loaded test data and constructs a finished model from it.
    """
    inputs = Input(shape=input_shape)
    layer = inputs
    for i, (weights, biases) in enumerate(layers, start=1):
        layer = Dense(
            name=f"layer_{i}",
            units=len(biases),
            activation='sigmoid',
            kernel_initializer=partial(init_params, weights.T),
            bias_initializer=partial(init_params, biases.T)
        )(layer)
    model = Model(inputs=inputs, outputs=layer)
    model.compile()
    return model
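
# A minimal usage sketch (hypothetical shapes, not the actual test data): each
# weight matrix is (units, input_dim), matching the transposes taken above.
#
#     rng = np.random.default_rng(0)
#     w1 = rng.standard_normal((4, 3)).astype(F32)
#     b1 = np.zeros(4, dtype=F32)
#     model = build_model((3, ), (w1, b1))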


def setup() -> tuple[tf.Tensor, Model]:
    """
    Loads test data, builds a model from it, converts inputs to a `tf.Tensor`.
    """
    # Tensorflow logs:
    # tf.debugging.set_log_device_placement(True)
    os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
    # Shut up numpy warning:
    warnings.filterwarnings('ignore', 'elementwise comparison failed', FutureWarning, module='numpy')
    inp, layers = load_test_data()
    model = build_model(inp.shape, *layers)
    # Add a leading batch dimension before handing the input to the model.
    return tf.convert_to_tensor(np.expand_dims(inp, axis=0)), model


def feed_forward_naive(x: np.ndarray, model: Model) -> tf.Tensor:
    return model.call(x)


def layer_func(x: tf.Tensor, layer: Dense) -> tf.Tensor:
    # Apply one Dense layer via the raw generated ops: x @ kernel + bias, then sigmoid.
    return sigmoid(bias_add(mat_mul(x, layer.kernel), layer.bias))


def feed_forward(x: np.ndarray, model: Model) -> tf.Tensor:
    # Tried to optimize by skipping a bunch of Python code in the standard `call()` method.
    # Made no noticeable difference in execution time. Probably the bottleneck is actually
    # the data transfer back and forth between the host and the device (GPU).
    for layer in model.layers[1:]:  # layers[0] is the InputLayer
        x = layer_func(x, layer)
    return x


# To test if execution times change at all:
# feed_forward = feed_forward_naive
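
# One further experiment (not tried here, just a thought): wrapping the loop in
# a tf.function should largely remove per-op Python dispatch overhead, which
# would help tell Python overhead apart from host<->device transfer costs:
#
#     fast_forward = tf.function(feed_forward)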


def main(n: int) -> None:
    t = timeit(
        stmt='feed_forward(inp, model)',
        setup='from __main__ import feed_forward, setup; '
              'inp, model = setup()',
        number=n
    )
    print(round(t, 5))
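

# Example invocation (with the `main` call below enabled):
#
#     python feed_forward_tf.py 1000
#
# runs the forward pass 1000 times and prints the total time in seconds.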
if __name__ == '__main__':
    simple_layer_test()
    # main(int(sys.argv[1]))