# ccaptchas/src/ccaptchas/preprocess.py
# (file-viewer metadata: 222 lines, 10 KiB, Python)

import logging
from pathlib import Path
from typing import Iterable
import numpy as np
import tensorflow as tf
from .config import CONFIG
from .keras.layers import StringLookup
from .types import PathT, SampleEncFuncT, ImgT
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Encoding name used when splitting label strings into characters and when decoding predicted labels.
UTF8 = 'UTF-8'
# Maps the image file extensions defined in the package config to the matching TensorFlow decoder function.
# NOTE(review): not referenced by any function visible in this module; `process_image` uses the generic
# `tf.io.decode_image` instead -- confirm whether other modules import this mapping.
IMG_DECODE_MAP = {
    CONFIG.EXT_PNG: tf.image.decode_png,
    CONFIG.EXT_JPG: tf.image.decode_jpeg,
}
def find_image_files(data_dir: PathT, file_ext: Iterable[str] = CONFIG.DEFAULT_IMG_FILE_EXT) -> list[Path]:
    """
    Collects the image files located directly inside a directory (sub-directories are not searched).

    Args:
        data_dir:
            Path to the directory to search.
        file_ext (optional):
            File extensions to look for, each with or without a leading dot.
            A single string is treated as one extension rather than as an iterable of characters.
            Defaults to the extensions defined in the package config.

    Returns:
        List of paths to the matching regular files, grouped by extension in the order the extensions were given
        and sorted by path within each extension, so that the result is reproducible across platforms.

    Raises:
        NotADirectoryError: If `data_dir` is not an existing directory.
    """
    data_dir = Path(data_dir)
    if not data_dir.is_dir():
        raise NotADirectoryError(f"'{data_dir}' is not a directory")
    if isinstance(file_ext, str):
        # A bare string would otherwise be iterated character by character.
        file_ext = (file_ext,)
    log.debug("Finding labeled image files in directory '%s'", str(data_dir))
    img_paths = []
    seen_ext = set()
    for ext in file_ext:
        if not ext.startswith('.'):
            ext = f'.{ext}'
        # Equivalent extensions (e.g. 'png' and '.png') must not add the same files twice.
        if ext in seen_ext:
            continue
        seen_ext.add(ext)
        # `glob` yields files in arbitrary, OS-dependent order; sort for determinism.
        img_paths.extend(sorted(path for path in data_dir.glob(f'*{ext}') if path.is_file()))
    log.info("Found %d image files", len(img_paths))
    return img_paths
def get_all_characters(img_paths: Iterable[Path]) -> str:
    """
    Determines the set of distinct characters that occur in the labels of the given image files.

    The label of an image is its file name without the extension (the path's `stem`).

    Args:
        img_paths:
            Iterable of paths to the labeled image files.

    Returns:
        A string containing every distinct label character exactly once, in sorted order.
        Sorting keeps the resulting vocabulary (and thus any character-to-index mapping built from it)
        identical between runs; the iteration order of a set of strings is not stable across interpreter runs.
    """
    distinct = set()
    for path in img_paths:
        distinct.update(path.stem)
    characters = ''.join(sorted(distinct))
    log.info("Identified %d distinct characters in the file labels: '%s'", len(characters), characters)
    return characters
def get_lookup_table(vocabulary: Iterable[str], invert: bool = False, **kwargs) -> StringLookup:
    """
    Builds a `StringLookup` table that translates characters into integer indices, or indices back into characters.

    Reference for the `StringLookup` class:
    https://keras.io/api/layers/preprocessing_layers/categorical/string_lookup/

    Args:
        vocabulary:
            The strings that make up the vocabulary. A plain string is split into its individual characters.
        invert (optional):
            `False` (the default) yields a forward table (characters to indices);
            `True` yields a backward table (indices to characters).
        **kwargs (optional):
            Additional keyword arguments forwarded to the `StringLookup` constructor.
            If `num_oov_indices` or `mask_token` are omitted, their values are taken from the package config.

    Returns:
        The configured `StringLookup` object.
    """
    # Config defaults first, so that explicitly passed keyword arguments take precedence.
    options = {
        'num_oov_indices': CONFIG.DEFAULT_NUM_OOV_INDICES,
        'mask_token': CONFIG.DEFAULT_MASK_TOKEN,
        **kwargs,
    }
    tokens = list(vocabulary) if isinstance(vocabulary, str) else vocabulary
    return StringLookup(vocabulary=tokens, invert=invert, **options)
def get_vocab_maps(characters: str, **kwargs) -> tuple[StringLookup, StringLookup]:
    """
    Builds a matching pair of lookup tables: characters to integers and integers back to characters.

    Both tables are created via `get_lookup_table`; the backward table is built from the vocabulary reported by
    the forward table, so the two always agree.

    Args:
        characters:
            String made up of all (distinct) characters of the vocabulary.
        **kwargs (optional):
            Keyword arguments forwarded to both `StringLookup` constructors.
            `invert` and `vocabulary` must not be among them.
            If `num_oov_indices` or `mask_token` are omitted, their values are taken from the package config.

    Returns:
        2-tuple `(forward, backward)` of `StringLookup` objects; the first maps characters to integers,
        the second maps integers to characters.
    """
    forward = get_lookup_table(characters, invert=False, **kwargs)
    learned_vocabulary = forward.get_vocabulary()
    backward = get_lookup_table(learned_vocabulary, invert=True, **kwargs)
    log.info("Constructed vocabulary lookup tables")
    return forward, backward
def split_data(img_paths: Iterable[Path], validation_ratio: float = CONFIG.DEFAULT_VALIDATION_RATIO,
               shuffle: bool = CONFIG.DEFAULT_SHUFFLE_DATA) -> tuple[np.ndarray, np.ndarray]:
    """
    Splits an iterable of image paths into two arrays of training and validation data.

    Args:
        img_paths:
            Iterable of paths to the image files to be used for training and validation.
        validation_ratio:
            Float strictly between 0 and 1 determining what ratio of the full dataset will be used for validation;
            this implies that (1 - `validation_ratio`) will be the ratio used for training.
        shuffle:
            If True, the full dataset is shuffled pseudo-randomly (using numpy's global random state)
            before being split.

    Returns:
        2-tuple of 2D numpy arrays, the first representing the training data and the second representing the validation
        data. The first dimension of each array is the dataset dimension (i.e. the samples) and the second contains the
        path (as a string) at index 0 and the label for each image at index 1.
        Both arrays are 2D with a second dimension of size 2 even when `img_paths` is empty.

    Raises:
        ValueError: If `validation_ratio` is not strictly between 0 and 1.
    """
    if not 0 < validation_ratio < 1:
        raise ValueError(f"`validation_ratio` must be strictly between 0 and 1, got {validation_ratio!r}")
    # The reshape is a no-op for non-empty input; for empty input it turns shape (0,) into (0, 2),
    # so that column indexing (`data[:, 0]`) keeps working downstream.
    paths_and_labels = np.array([(str(path), path.stem) for path in img_paths], dtype=str).reshape(-1, 2)
    # 1. Get the total size of the dataset and the index at which to split it
    size = len(paths_and_labels)
    cutoff = int(size * (1 - validation_ratio))
    # 2. Make an indices array and shuffle it, if required
    indices = np.arange(size)
    if shuffle:
        np.random.shuffle(indices)
    # 3. Split data into training and validation sets
    training_data, validation_data = paths_and_labels[indices[:cutoff]], paths_and_labels[indices[cutoff:]]
    log.info("Split data into %d images for training and %d for validation", len(training_data), len(validation_data))
    return training_data, validation_data
def process_image(img: ImgT, img_width: int = CONFIG.DEFAULT_IMG_WIDTH,
                  img_height: int = CONFIG.DEFAULT_IMG_HEIGHT) -> tf.Tensor:
    """
    Turns an image into a resized, single-channel float tensor with the width as its first dimension.

    Args:
        img:
            Either a path to the image file (`str` or `Path`), which is read from disk first,
            or the already loaded, still encoded image contents.
        img_width (optional):
            Width to resize the image to. Defaults to the value from the package config.
        img_height (optional):
            Height to resize the image to. Defaults to the value from the package config.

    Returns:
        The decoded image as a `float32` tensor, resized to `img_height` x `img_width` and then transposed
        so that its dimensions are ordered (width, height, channel).
    """
    # Paths are loaded from disk; anything else is handed to the decoder unchanged.
    encoded = tf.io.read_file(str(img)) if isinstance(img, (str, Path)) else img
    # Decode as a single-channel (grayscale) image; animations are not expanded into frames.
    decoded = tf.io.decode_image(encoded, channels=1, expand_animations=False)
    # Convert to float32 in the [0, 1] range.
    as_float = tf.image.convert_image_dtype(decoded, tf.float32)
    resized = tf.image.resize(as_float, [img_height, img_width])
    # Swap height and width, because the time dimension is meant to correspond to the image width.
    return tf.transpose(resized, perm=[1, 0, 2])
def encode_label(label: str, forward_lookup: StringLookup):
    """
    Converts a label string into a `Tensor` by splitting it into its characters
    and passing those through the given `StringLookup` instance.
    """
    label_chars = tf.strings.unicode_split(label, input_encoding=UTF8)
    return forward_lookup(label_chars)
def decode_label(tensor: tf.Tensor, backward_lookup: StringLookup) -> str:
    """
    Converts an encoded label back into a string by passing the tensor through the given `StringLookup` instance
    and joining the resulting characters.
    """
    label_chars = backward_lookup(tensor)
    joined = tf.strings.reduce_join(label_chars)
    return joined.numpy().decode(UTF8)
def get_sample_encode_func(forward_lookup: StringLookup, img_width: int = CONFIG.DEFAULT_IMG_WIDTH,
                           img_height: int = CONFIG.DEFAULT_IMG_HEIGHT) -> SampleEncFuncT:
    """
    Creates a function that encodes a single sample (image path and label) for a fixed lookup table and image size.

    Args:
        forward_lookup:
            `StringLookup` instance used to encode the label characters.
        img_width (optional):
            Width every image is resized to. Defaults to the value from the package config.
        img_height (optional):
            Height every image is resized to. Defaults to the value from the package config.

    Returns:
        A function taking an image path and a label and returning a dictionary with the processed image and the
        encoded label, keyed by the input layer names defined in the package config.
    """
    def encode_sample(img_path: PathT, label: str) -> dict[str, tf.Tensor]:
        log.debug("Encoding image '%s'", str(img_path))
        file_contents = tf.io.read_file(img_path)
        image_tensor = process_image(file_contents, img_width=img_width, img_height=img_height)
        label_tensor = encode_label(label, forward_lookup)
        # A dictionary is returned, because the model expects two inputs.
        return {
            CONFIG.LAYER_NAME_INPUT_IMAGE: image_tensor,
            CONFIG.LAYER_NAME_INPUT_LABEL: label_tensor,
        }

    return encode_sample
def make_dataset(data: np.ndarray, sample_encode_func: SampleEncFuncT, batch_size: int) -> tf.data.Dataset:
    """
    Builds a batched, prefetching `Dataset` from an array of image paths and their labels.

    Args:
        data:
            2D numpy array holding the samples to turn into a dataset for training/validation.
            The first dimension is the dataset dimension (i.e. the samples); along the second dimension,
            index 0 holds the image path (as a string) and index 1 holds the label.
            Every path-label-pair is passed to `sample_encode_func` as its only two positional arguments.
        sample_encode_func:
            Passed as the `map_func` argument to the `map(...)` method of the new `Dataset`;
            should accept two strings (image path and label) and
            return a dictionary of Tensors representing the image and the label.
        batch_size:
            Passed as the `batch_size` argument to the `batch(...)` method of the new `Dataset`;
            determines how the dataset is divided into batches.

    Returns:
        A `Dataset` that can be fed into a model's `fit(...)` method, provided the model has two input layers
        named in accordance with the keys of the dictionary returned by `sample_encode_func`.
    """
    log.info("Constructing dataset of %d samples, split into batches of %d", len(data), batch_size)
    autotune = tf.data.experimental.AUTOTUNE
    paths, labels = data[:, 0], data[:, 1]
    samples = tf.data.Dataset.from_tensor_slices((paths, labels))
    encoded = samples.map(map_func=sample_encode_func, num_parallel_calls=autotune)
    batched = encoded.batch(batch_size=batch_size)
    return batched.prefetch(buffer_size=autotune)
def load_datasets(data_dir: PathT,
                  file_ext: Iterable[str] = CONFIG.DEFAULT_IMG_FILE_EXT,
                  batch_size: int = CONFIG.DEFAULT_BATCH_SIZE,
                  validation_ratio: float = CONFIG.DEFAULT_VALIDATION_RATIO,
                  shuffle: bool = CONFIG.DEFAULT_SHUFFLE_DATA,
                  img_width: int = CONFIG.DEFAULT_IMG_WIDTH,
                  img_height: int = CONFIG.DEFAULT_IMG_HEIGHT) -> tuple[tf.data.Dataset, tf.data.Dataset, str]:
    """
    Builds the training and validation datasets from a directory of labeled image files.

    The label of each image is its file name without the extension. The vocabulary is derived from all labels found.

    Args:
        data_dir:
            Path to the directory containing the labeled image files.
        file_ext (optional):
            File extensions of the images to load; passed to `find_image_files`.
        batch_size (optional):
            Batch size of both datasets; passed to `make_dataset`.
        validation_ratio (optional):
            Ratio of the samples to use for validation; passed to `split_data`.
        shuffle (optional):
            Whether to shuffle the samples before splitting them; passed to `split_data`.
        img_width (optional):
            Width every image is resized to.
        img_height (optional):
            Height every image is resized to.
        All optional arguments default to the corresponding values from the package config.

    Returns:
        3-tuple of the training dataset, the validation dataset, and a string of all distinct label characters.

    Raises:
        NotADirectoryError: If `data_dir` is not a directory (raised by `find_image_files`).
        FileNotFoundError: If no matching image files were found in `data_dir`.
        ValueError: If `validation_ratio` is not between 0 and 1 (raised by `split_data`).
    """
    log.info("Constructing datasets")
    img_paths = find_image_files(data_dir, file_ext=file_ext)
    if not img_paths:
        # Without this check an empty directory only fails later with an unrelated-looking error.
        raise FileNotFoundError(f"No image files found in directory '{data_dir}'")
    characters = get_all_characters(img_paths)
    forward_lookup, _ = get_vocab_maps(characters)
    # Sanity check performed before the (comparatively expensive) dataset construction:
    # the lookup table must contain exactly the label characters, in the same order.
    assert characters == ''.join(forward_lookup.get_vocabulary()), \
        "Vocabulary of the lookup table does not match the label characters"
    arr_train, arr_valid = split_data(img_paths, validation_ratio=validation_ratio, shuffle=shuffle)
    encode_func = get_sample_encode_func(forward_lookup, img_width=img_width, img_height=img_height)
    ds_train = make_dataset(arr_train, encode_func, batch_size=batch_size)
    ds_valid = make_dataset(arr_valid, encode_func, batch_size=batch_size)
    return ds_train, ds_valid, characters