# generated from daniil-berg/boilerplate-py (222 lines, 10 KiB, Python)
import logging
|
|
from pathlib import Path
|
|
from typing import Iterable
|
|
|
|
import numpy as np
|
|
import tensorflow as tf
|
|
|
|
from .config import CONFIG
|
|
from .keras.layers import StringLookup
|
|
from .types import PathT, SampleEncFuncT, ImgT
|
|
|
|
|
|
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Encoding name used when splitting label strings into characters and when decoding label bytes.
UTF8 = 'UTF-8'

# Maps the PNG/JPG file extension constants from the package config to the matching TensorFlow decode function.
IMG_DECODE_MAP = {CONFIG.EXT_PNG: tf.image.decode_png, CONFIG.EXT_JPG: tf.image.decode_jpeg}
|
|
|
|
|
|
def find_image_files(data_dir: PathT, file_ext: Iterable[str] = CONFIG.DEFAULT_IMG_FILE_EXT) -> list[Path]:
    """
    Collects the paths of all files with the given extensions located directly inside a directory.

    Args:
        data_dir:
            Path to the directory to search; sub-directories are not searched.
        file_ext (optional):
            Iterable of file extensions to look for, each with or without the leading dot.
            A single extension may also be passed as a plain string.
            Defaults to `CONFIG.DEFAULT_IMG_FILE_EXT`.

    Returns:
        List of `Path` objects, one for every regular file matching one of the extensions.

    Raises:
        NotADirectoryError: If `data_dir` is not an existing directory.
    """
    data_dir = Path(data_dir)
    if not data_dir.is_dir():
        raise NotADirectoryError(f"'{data_dir}' is not a directory")
    if isinstance(file_ext, str):
        # A bare string would otherwise be iterated character by character.
        file_ext = (file_ext,)
    # Normalize to the dotted form and drop duplicates (keeping order),
    # so that e.g. 'png' and '.png' do not cause the same files to be listed twice.
    extensions = dict.fromkeys(ext if ext.startswith('.') else f'.{ext}' for ext in file_ext)
    log.debug("Finding labeled image files in directory '%s'", str(data_dir))
    img_paths = []
    for ext in extensions:
        img_paths.extend(path for path in data_dir.glob(f'*{ext}') if path.is_file())
    log.info("Found %d image files", len(img_paths))
    return img_paths
|
|
|
|
|
|
def get_all_characters(img_paths: Iterable[Path]) -> str:
    """
    Determines the distinct characters occurring in the labels of the given image files.

    The label of an image file is its file name without the extension (the `stem` of its path).

    Args:
        img_paths:
            Iterable of paths to the labeled image files.

    Returns:
        String containing every distinct label character exactly once, in sorted order.
    """
    distinct = set()
    for path in img_paths:
        distinct.update(path.stem)
    # The iteration order of a set of strings varies between interpreter runs (hash randomization);
    # sorting makes the resulting vocabulary, and thus the character-to-index mapping, reproducible.
    characters = ''.join(sorted(distinct))
    log.info("Identified %d distinct characters in the file labels: '%s'", len(characters), characters)
    return characters
|
|
|
|
|
|
def get_lookup_table(vocabulary: Iterable[str], invert: bool = False, **kwargs) -> StringLookup:
    """
    Builds a `StringLookup` object translating characters to integers or, if inverted, integers to characters.

    Reference documentation for the `StringLookup` class:
    https://keras.io/api/layers/preprocessing_layers/categorical/string_lookup/

    Args:
        vocabulary:
            Iterable of strings making up the vocabulary to be mapped;
            a plain string is split into a list of its individual characters.
        invert (optional):
            `False` (the default) yields a forward lookup table mapping characters to indices;
            `True` yields a backward lookup table mapping indices to characters.
        **kwargs (optional):
            Further keyword arguments forwarded to the `StringLookup` constructor.
            Unless given, `num_oov_indices` and `mask_token` take the defaults defined in the package config.

    Returns:
        `StringLookup` object configured as specified.
    """
    # Config defaults come first, so that explicitly passed keyword arguments override them.
    options = {
        'num_oov_indices': CONFIG.DEFAULT_NUM_OOV_INDICES,
        'mask_token': CONFIG.DEFAULT_MASK_TOKEN,
        **kwargs,
    }
    tokens = list(vocabulary) if isinstance(vocabulary, str) else vocabulary
    return StringLookup(vocabulary=tokens, invert=invert, **options)
|
|
|
|
|
|
def get_vocab_maps(characters: str, **kwargs) -> tuple[StringLookup, StringLookup]:
    """
    Creates a pair of lookup tables: one mapping characters to integers and one mapping them back.

    Both tables are created with `get_lookup_table`; see that function for details.

    Args:
        characters:
            String of all characters in the vocabulary to be mapped; the characters should all be distinct.
        **kwargs (optional):
            Keyword arguments forwarded to both `StringLookup` constructors.
            Must not contain the `invert` and `vocabulary` keywords.
            Defaults for `num_oov_indices` and `mask_token` are defined in the package config.

    Returns:
        2-tuple of `StringLookup` objects; the first maps characters to integers, the second does the reverse.
    """
    forward = get_lookup_table(characters, invert=False, **kwargs)
    # The backward table is built from the vocabulary reported by the forward table, not from `characters` directly.
    backward = get_lookup_table(forward.get_vocabulary(), invert=True, **kwargs)
    log.info("Constructed vocabulary lookup tables")
    return forward, backward
|
|
|
|
|
|
def split_data(img_paths: Iterable[Path], validation_ratio: float = CONFIG.DEFAULT_VALIDATION_RATIO,
               shuffle: bool = CONFIG.DEFAULT_SHUFFLE_DATA) -> tuple[np.ndarray, np.ndarray]:
    """
    Splits an iterable of image paths into two arrays of training and validation data.

    Args:
        img_paths:
            Iterable of paths to the image files to be used for training and validation.
        validation_ratio:
            Float strictly between 0 and 1 determining what ratio of the full dataset will be used for validation;
            this implies that (1 - `validation_ratio`) will be the ratio used for training.
        shuffle:
            If True, the full dataset is shuffled pseudo-randomly before being split.

    Returns:
        2-tuple of 2D numpy arrays, the first representing the training data and the second representing the validation
        data. The first dimension of each array is the dataset dimension (i.e. the samples) and the second contains the
        path (as a string) at index 0 and the label for each image at index 1.

    Raises:
        ValueError: If `validation_ratio` is not strictly between 0 and 1.
    """
    if not 0 < validation_ratio < 1:
        raise ValueError(f"validation_ratio must be strictly between 0 and 1, got {validation_ratio!r}")
    # The explicit dtype and reshape keep the array two-dimensional (shape `(0, 2)`) even if no paths were provided.
    paths_and_labels = np.array([(str(path), path.stem) for path in img_paths], dtype=str).reshape(-1, 2)
    # 1. Get the total size of the dataset and the index at which to split it.
    size = len(paths_and_labels)
    # `1 - validation_ratio` is subject to floating point error (e.g. `10 * (1 - 0.9)` gives `0.9999999999999998`);
    # rounding before truncating prevents the training set from ending up one sample short in such cases.
    cutoff = int(round(size * (1 - validation_ratio), 6))
    # 2. Make an indices array and shuffle it, if required
    indices = np.arange(size)
    if shuffle:
        np.random.shuffle(indices)
    # 3. Split data into training and validation sets
    training_data, validation_data = paths_and_labels[indices[:cutoff]], paths_and_labels[indices[cutoff:]]
    log.info("Split data into %d images for training and %d for validation", len(training_data), len(validation_data))
    return training_data, validation_data
|
|
|
|
|
|
def process_image(img: ImgT, img_width: int = CONFIG.DEFAULT_IMG_WIDTH,
                  img_height: int = CONFIG.DEFAULT_IMG_HEIGHT) -> tf.Tensor:
    """
    Turns an image into a single-channel float tensor of fixed size with swapped height and width dimensions.

    Args:
        img:
            Either a path (`str` or `Path`) to an image file, in which case the file is read first,
            or the encoded image contents in a form that `tf.io.decode_image` accepts.
        img_width (optional):
            Width to resize the image to. Defaults to `CONFIG.DEFAULT_IMG_WIDTH`.
        img_height (optional):
            Height to resize the image to. Defaults to `CONFIG.DEFAULT_IMG_HEIGHT`.

    Returns:
        The decoded image as a `float32` tensor, resized to `img_height` by `img_width` and then transposed
        such that its first two dimensions are swapped.
    """
    # A path is replaced by the raw contents of the file it points to.
    contents = tf.io.read_file(str(img)) if isinstance(img, (str, Path)) else img
    # Decode into a single channel (grayscale) without expanding animations.
    decoded = tf.io.decode_image(contents, channels=1, expand_animations=False)
    # Convert to float32 in the [0, 1] range.
    as_float = tf.image.convert_image_dtype(decoded, tf.float32)
    resized = tf.image.resize(as_float, [img_height, img_width])
    # Swap the first two dimensions, because the time dimension should correspond to the width of the image.
    return tf.transpose(resized, perm=[1, 0, 2])
|
|
|
|
|
|
def encode_label(label: str, forward_lookup: StringLookup):
    """
    Creates a `Tensor` object from a label string by passing its characters through a `StringLookup` instance.

    The label is first split into its individual UTF-8 characters, which are then fed into `forward_lookup`.
    """
    characters = tf.strings.unicode_split(label, input_encoding=UTF8)
    return forward_lookup(characters)
|
|
|
|
|
|
def decode_label(tensor: tf.Tensor, backward_lookup: StringLookup) -> str:
    """
    Creates a label string from a `Tensor` object by passing it through a `StringLookup` instance.

    The output of `backward_lookup` is joined into a single string tensor, whose value is decoded as UTF-8.
    """
    characters = backward_lookup(tensor)
    joined = tf.strings.reduce_join(characters)
    return joined.numpy().decode(UTF8)
|
|
|
|
|
|
def get_sample_encode_func(forward_lookup: StringLookup, img_width: int = CONFIG.DEFAULT_IMG_WIDTH,
                           img_height: int = CONFIG.DEFAULT_IMG_HEIGHT) -> SampleEncFuncT:
    """
    Creates a function that encodes a single sample, i.e. an image path together with its label.

    Args:
        forward_lookup:
            `StringLookup` object used by the returned function to encode the label.
        img_width (optional):
            Image width passed on to `process_image`. Defaults to `CONFIG.DEFAULT_IMG_WIDTH`.
        img_height (optional):
            Image height passed on to `process_image`. Defaults to `CONFIG.DEFAULT_IMG_HEIGHT`.

    Returns:
        Function taking an image path and a label and returning a dictionary with the processed image under the
        key `CONFIG.LAYER_NAME_INPUT_IMAGE` and the encoded label under the key `CONFIG.LAYER_NAME_INPUT_LABEL`.
    """
    def encode_sample(img_path: PathT, label: str) -> dict[str, tf.Tensor]:
        log.debug("Encoding image '%s'", str(img_path))
        raw_image = tf.io.read_file(img_path)
        image_tensor = process_image(raw_image, img_width=img_width, img_height=img_height)
        label_tensor = encode_label(label, forward_lookup)
        # A dict is returned, because the model is expecting two inputs.
        return {CONFIG.LAYER_NAME_INPUT_IMAGE: image_tensor, CONFIG.LAYER_NAME_INPUT_LABEL: label_tensor}

    return encode_sample
|
|
|
|
|
|
def make_dataset(data: np.ndarray, sample_encode_func: SampleEncFuncT, batch_size: int) -> tf.data.Dataset:
    """
    Generates a `Dataset` instance from a 2D array holding image file paths and their corresponding labels.

    Args:
        data:
            A 2D numpy array representing the data and labels to turn into a dataset for training/validation.
            The first dimension of the array is the dataset dimension (i.e. the samples) and the second contains the
            path (as a string) at index 0 and the label for each image at index 1.
            Each path-label-pair will be passed into the `sample_encode_func` during construction of the dataset as
            the only two positional arguments.
        sample_encode_func:
            Will be passed as the `map_func` argument into the `map(...)` method of the new `Dataset`;
            should be a function taking two strings (image path and label) as arguments and
            returning a dictionary of Tensors representing the image and label.
        batch_size:
            Will be passed as the `batch_size` argument into the `batch(...)` method of the new `Dataset`;
            determines how the dataset will be divided into batches.

    Returns:
        A `Dataset` ready to be fed into a model's `fit(...)` method, provided that model has two input layers
        named in accordance with the keys of the dictionary returned by the `sample_encode_func` function.
    """
    log.info("Constructing dataset of %d samples, split into batches of %d", len(data), batch_size)
    # Column 0 holds the image paths, column 1 the labels.
    paths, labels = data[:, 0], data[:, 1]
    samples = tf.data.Dataset.from_tensor_slices((paths, labels))
    encoded = samples.map(map_func=sample_encode_func, num_parallel_calls=tf.data.experimental.AUTOTUNE)
    batched = encoded.batch(batch_size=batch_size)
    return batched.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
|
|
|
|
|
|
def load_datasets(data_dir: PathT,
                  file_ext: Iterable[str] = CONFIG.DEFAULT_IMG_FILE_EXT,
                  batch_size: int = CONFIG.DEFAULT_BATCH_SIZE,
                  validation_ratio: float = CONFIG.DEFAULT_VALIDATION_RATIO,
                  shuffle: bool = CONFIG.DEFAULT_SHUFFLE_DATA,
                  img_width: int = CONFIG.DEFAULT_IMG_WIDTH,
                  img_height: int = CONFIG.DEFAULT_IMG_HEIGHT) -> tuple[tf.data.Dataset, tf.data.Dataset, str]:
    """
    Constructs the training and validation datasets from the labeled image files in a directory.

    Args:
        data_dir:
            Path to the directory containing the image files; passed to `find_image_files`.
        file_ext (optional):
            File extensions of the image files to use; passed to `find_image_files`.
        batch_size (optional):
            Batch size of both datasets; passed to `make_dataset`.
        validation_ratio (optional):
            Ratio of the data to be used for validation; passed to `split_data`.
        shuffle (optional):
            Whether to shuffle the data before splitting it; passed to `split_data`.
        img_width (optional):
            Image width passed to `get_sample_encode_func`.
        img_height (optional):
            Image height passed to `get_sample_encode_func`.

    Returns:
        3-tuple of the training dataset, the validation dataset, and the string of all characters
        found in the image labels.
    """
    log.info("Constructing datasets")
    img_paths = find_image_files(data_dir, file_ext=file_ext)
    characters = get_all_characters(img_paths)
    # Only the forward lookup table (characters to integers) is needed for encoding the labels.
    char_to_int = get_vocab_maps(characters)[0]
    train_array, valid_array = split_data(img_paths, validation_ratio=validation_ratio, shuffle=shuffle)
    encode_sample = get_sample_encode_func(char_to_int, img_width=img_width, img_height=img_height)
    train_dataset = make_dataset(train_array, encode_sample, batch_size=batch_size)
    valid_dataset = make_dataset(valid_array, encode_sample, batch_size=batch_size)
    # Sanity check: the vocabulary of the lookup table must consist of exactly the identified characters, in order.
    assert characters == ''.join(char_to_int.get_vocabulary())
    return train_dataset, valid_dataset, characters
|