ccaptchas/src/ccaptchas/preproc.py

331 lines
15 KiB
Python

import os
import shutil
from pathlib import Path
from typing import Union, Mapping, Sequence, Iterable, Callable
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers
from .config import CONFIG
from .types import PathT
def label_files(src_dir: PathT, dest_dir: PathT, labels: Union[PathT, Sequence[str]],
                reverse: bool = False, extensions: Iterable[str] = None) -> None:
    """
    Copies files giving them new names by using specified labels.

    All matching files are sorted by their file name before applying the sequence of labels to them.
    The first file is named with the first label, the second is named with the second label, and so on.
    If a label duplicate is encountered, a dot followed by a counter is appended to the file name
    *preceding* the file extension, e.g. if 'some_label.jpg' exists, 'some_label.1.jpg' may be created.
    The counter is always appended to the full label, so a label that itself ends in a dot and a number
    (e.g. 'v.2') is never mistaken for an already-numbered duplicate.

    The number of matching files must be greater than or equal to the number of labels.
    Exactly one file is copied for every label; thus, after every label has been used, the operation ends.

    Args:
        src_dir:
            Path to directory containing the files to be copied/renamed;
            only regular files are considered, sub-directories are ignored.
        dest_dir:
            Path to destination directory
        labels:
            Either a sequence of labels (strings) or a path to a file containing the labels (newline separated);
            an empty labels file results in no labels (and therefore nothing being copied).
        reverse (optional):
            Defines which file receives which label;
            if False (default), the files in `src_dir` are sorted ascending by their file name,
            if True, the files are sorted descending by name.
        extensions (optional):
            Iterable of file extensions; only files with these extensions will be considered.
            A single string is treated as one extension. If omitted, all files are considered.

    Raises:
        NotADirectoryError: If `dest_dir` is not an existing directory.
        IndexError: If there are more labels than matching files in `src_dir`.
    """
    # `str.endswith('')` is always true, so the empty string means "accept every file name".
    if extensions is None:
        suffixes = ''
    elif isinstance(extensions, str):
        # A bare string must not be exploded into its individual characters.
        suffixes = (extensions,)
    else:
        suffixes = tuple(extensions)
    file_names = sorted(
        (name for name in os.listdir(src_dir)
         if name.endswith(suffixes) and os.path.isfile(os.path.join(src_dir, name))),
        reverse=reverse,
    )

    if isinstance(labels, (str, bytes, os.PathLike)):
        with open(labels, 'r') as f:
            content = f.read().strip()
        # `''.split('\n')` would produce one empty label; an empty file means no labels at all.
        labels = content.split('\n') if content else []

    if not os.path.isdir(dest_dir):
        raise NotADirectoryError(f"'{dest_dir}' is not a directory.")
    if len(labels) > len(file_names):
        raise IndexError(f"There are more labels ({len(labels)}) than files "
                         f"in the source directory ({len(file_names)} matching).")

    # `zip` stops after the last label, so surplus files are simply left alone.
    for file_name, label in zip(file_names, labels):
        ext = os.path.splitext(file_name)[1]
        # Keep the counter separate from the label instead of parsing it back out of the name.
        candidate, counter = label, 0
        while os.path.exists(os.path.join(dest_dir, candidate + ext)):
            counter += 1
            candidate = f"{label}.{counter}"
        shutil.copyfile(os.path.join(src_dir, file_name), os.path.join(dest_dir, candidate + ext))
def load_images_data(data_dir: PathT, extensions: Iterable[str] = ('.jpg', '.png'), verbose: bool = True
                     ) -> tuple[dict[str, str], str]:
    """
    Creates a dictionary mapping file paths (of images) to their labels.

    Everything up to the first dot in the filename is taken to be the label;
    this naturally excludes file extensions, but also a filename like `ABC.1.jpg` results in the label `ABC`.
    Also creates a vocabulary of characters encountered in the labels.

    Both results are deterministic: directory entries are processed in sorted order and the vocabulary
    string is sorted, so indices derived from the vocabulary stay the same between interpreter runs.

    Args:
        data_dir:
            Path-like object or string to a directory containing the desired image files
        extensions (optional):
            Iterable of extensions that the files considered for the resulting data should be restricted to;
            defaults to restricting finds to JPEG and PNG files. A single string is treated as one extension.
        verbose (optional):
            If True, the function will print out a summary of the findings before returning.

    Returns:
        2-tuple with the first element being a dictionary where the keys are the file paths and the values are the
        image labels, and the second element being a sorted string of all characters present in the labels.
    """
    data_dir = Path(data_dir)
    # Materialize once: a one-shot iterator would be exhausted by the first membership test,
    # and a bare string would otherwise be matched by substring.
    suffixes = (extensions,) if isinstance(extensions, str) else tuple(extensions)
    file_paths_and_labels, characters = {}, set()
    # `iterdir()` yields entries in arbitrary order; sorting keeps the mapping order reproducible.
    for file_path in sorted(data_dir.iterdir()):
        if file_path.suffix not in suffixes:
            continue
        label = file_path.name.split('.')[0]
        characters.update(label)
        file_paths_and_labels[str(file_path)] = label
    if verbose:
        print("Number of images/labels found: ", len(file_paths_and_labels))
        print("Number of unique characters: ", len(characters))
        print("Characters present: ", characters)
    # Set iteration order depends on string hashing, which is randomized per process; sort for stability.
    return file_paths_and_labels, ''.join(sorted(characters))
def get_vocab_maps(characters: Iterable[str], num_oov_indices: int = 0, mask_token: str = None
                   ) -> tuple[layers.StringLookup, layers.StringLookup]:
    """
    Constructs two table-based lookup objects that map characters to integers and back.

    Details about the `StringLookup` class in the documentation:
    https://www.tensorflow.org/api_docs/python/tf/keras/layers/StringLookup

    Args:
        characters:
            An iterable of strings representing the vocabulary to be mapped
        num_oov_indices (optional):
            Passed to both `StringLookup` constructors;
            defines the number of out-of-vocabulary (OOV) tokens to create;
            assuming that no OOV characters will be encountered, the default is 0.
        mask_token (optional):
            Passed to both `StringLookup` constructors;
            the token representing missing values;
            assuming that there will never be a value missing, the default is None.

    Returns:
        2-tuple of `StringLookup` objects, the first one mapping characters to integers, the second doing the reverse.
        By default, no OOV or missing values are assumed to be encountered,
        and thus each index (uniquely) represents a character from the vocabulary.
    """
    char_to_int = layers.StringLookup(
        vocabulary=list(characters),
        num_oov_indices=num_oov_indices,
        mask_token=mask_token,
    )
    # The inverse table must reserve exactly the same special (mask/OOV) slots as the forward table.
    # Left at its own default, it would reserve an OOV slot the forward table does not have
    # (when `num_oov_indices` is 0) and every decoded index would be shifted by one.
    int_to_char = layers.StringLookup(
        vocabulary=char_to_int.get_vocabulary(),
        num_oov_indices=num_oov_indices,
        mask_token=mask_token,
        invert=True,
    )
    return char_to_int, int_to_char
def encode_image(img):
    """
    Creates a `Tensor` object from an image file and transposes it.

    Args:
        img:
            Location of the image file; either a string, a path-like object (e.g. `pathlib.Path`)
            or a string tensor, such as the one handed to a function used in `Dataset.map(...)`.

    Returns:
        A float32 tensor with values in the [0, 1] range and the shape
        (`CONFIG.IMG_WIDTH`, `CONFIG.IMG_HEIGHT`, 1), i.e. with the width as the first dimension.
    """
    # Only path-like objects need unwrapping. Calling `str()` on everything would turn a tensor
    # into its textual representation (e.g. 'Tensor("args_0:0", ...)') instead of the path it holds.
    if isinstance(img, os.PathLike):
        img = os.fspath(img)
    try:
        # 0. Read image
        img = tf.io.read_file(img)
    except ValueError:
        # NOTE(review): deliberately best-effort; presumably meant for input that is not a file name,
        # which is then passed on to the decoder unchanged -- confirm the intended use case.
        pass
    # 1. Decode and convert to grayscale
    img = tf.io.decode_png(img, channels=1)
    # 2. Convert to float32 in [0, 1] range
    img = tf.image.convert_image_dtype(img, tf.float32)
    # 3. Resize to the desired size
    img = tf.image.resize(img, [CONFIG.IMG_HEIGHT, CONFIG.IMG_WIDTH])
    # 4. Transpose the image because we want the time
    # dimension to correspond to the width of the image.
    return tf.transpose(img, perm=[1, 0, 2])
def encode_label(label: str, forward_lookup_table: layers.StringLookup):
    """
    Turns a label string into a `Tensor` by splitting it into its individual (UTF-8) characters
    and passing those through the given `StringLookup` instance.

    Args:
        label:
            The label text to encode
        forward_lookup_table:
            Lookup object that is called with the split characters

    Returns:
        Whatever `forward_lookup_table` returns for the sequence of characters of `label`.
    """
    characters = tf.strings.unicode_split(label, input_encoding='UTF-8')
    return forward_lookup_table(characters)
def decode_label(tensor, backward_lookup_table: layers.StringLookup) -> str:
    """
    Turns an encoded label back into a Python string.

    Args:
        tensor:
            The encoded label; it is passed to `backward_lookup_table` as is.
        backward_lookup_table:
            Lookup object that is called with `tensor`; its output is joined into a single string.

    Returns:
        The joined lookup results, decoded as UTF-8.
    """
    characters = backward_lookup_table(tensor)
    joined = tf.strings.reduce_join(characters)
    return joined.numpy().decode('utf-8')
def get_sample_encoder(forward_lookup_table: layers.StringLookup) -> Callable[[str, str], dict]:
    """
    Builds the per-sample encoding function intended for the `map(...)` method of a `Dataset` instance.

    The returned function takes an image path and a label and produces a dictionary with two entries:
    the encoded image under the key `CONFIG.INPUT_LAYER_NAME_IMAGE` and
    the encoded label under the key `CONFIG.INPUT_LAYER_NAME_LABEL`.

    Args:
        forward_lookup_table:
            Handed to `encode_label` for every sample; used there to look up the label's characters.

    Returns:
        Callable taking two strings as arguments and returning a dictionary with string keys and Tensor values.
    """
    def encode_sample(img_path: PathT, label: str) -> dict:
        image_tensor = encode_image(img_path)
        label_tensor = encode_label(label, forward_lookup_table)
        return {
            CONFIG.INPUT_LAYER_NAME_IMAGE: image_tensor,
            CONFIG.INPUT_LAYER_NAME_LABEL: label_tensor,
        }

    return encode_sample
def make_dataset(file_paths: np.ndarray, labels: np.ndarray,
                 sample_encode_func: Callable[[str, str], dict], batch_size: int) -> tf.data.Dataset:
    """
    Builds a mapped, batched and prefetching `Dataset` from image file paths and their labels.

    Args:
        file_paths:
            Array of strings, each being a path to an image file;
            every path becomes the first argument of one `sample_encode_func` call.
        labels:
            Array of strings holding the label for the file path at the same index in `file_paths`;
            every label becomes the second argument of one `sample_encode_func` call.
        sample_encode_func:
            Used as the `map_func` of the dataset's `map(...)` method;
            expected to take an image path and a label and
            to return a dictionary of Tensors representing the image and the label.
        batch_size:
            Used as the `batch_size` of the dataset's `batch(...)` method.

    Returns:
        The resulting `Dataset`; mapping parallelism and the prefetch buffer size are left to auto-tuning.

    Raises:
        ValueError: If `file_paths` and `labels` do not have the same number of elements.
    """
    if file_paths.size != labels.size:
        raise ValueError("Number of file paths must be equal to number of labels")
    autotune = tf.data.experimental.AUTOTUNE
    samples = tf.data.Dataset.from_tensor_slices((file_paths, labels))
    encoded = samples.map(map_func=sample_encode_func, num_parallel_calls=autotune)
    batched = encoded.batch(batch_size=batch_size)
    return batched.prefetch(buffer_size=autotune)
def get_datasets(file_paths_and_labels: Mapping[str, str], sample_encode_func: Callable[[str, str], dict],
                 batch_size: int, train_data_ratio: float, shuffle: bool = CONFIG.SHUFFLE_DATA
                 ) -> tuple[tf.data.Dataset, tf.data.Dataset]:
    """
    Splits a mapping of image file paths to labels into a training and a validation dataset.

    Args:
        file_paths_and_labels:
            The full data: keys are image file paths, values are the labels of the corresponding images.
        sample_encode_func:
            Forwarded to `make_dataset(...)` for both datasets;
            expected to take an image path and a label and
            to return a dictionary of Tensors representing the image and the label.
        batch_size:
            Forwarded to `make_dataset(...)` for both datasets.
        train_data_ratio:
            Fraction of the samples that goes into the training dataset (rounded down to whole samples);
            all remaining samples go into the validation dataset.
        shuffle:
            If True, the sample order is shuffled (using NumPy's global random state) before splitting.

    Returns:
        2-tuple of `Dataset` objects: the training dataset first, the validation dataset second.
    """
    total = len(file_paths_and_labels)

    # Work on an index array so that paths and labels are always reordered together.
    order = np.arange(total)
    if shuffle:
        np.random.shuffle(order)

    # Everything before the cutoff is used for training, the rest for validation.
    split_at = int(total * train_data_ratio)

    all_paths = np.array(list(file_paths_and_labels.keys()))
    all_labels = np.array(list(file_paths_and_labels.values()))

    def subset(chosen: np.ndarray) -> tf.data.Dataset:
        return make_dataset(all_paths[chosen], all_labels[chosen], sample_encode_func, batch_size)

    train_dataset = subset(order[:split_at])
    valid_dataset = subset(order[split_at:])
    return train_dataset, valid_dataset
def plot_images(images: Sequence[np.ndarray], labels: Sequence[str] = None, num_columns: int = 4,
                transpose: bool = True) -> None:
    """
    Displays a batch of single-channel images in a grid, optionally with a title above each image.

    Only the first channel of every image is shown; pixel values are multiplied by 255 and
    converted to unsigned 8-bit integers before plotting.

    Args:
        images:
            Batch of images with four dimensions (batch, two spatial dimensions, channels);
            may be a tensor, an array or a sequence of equally shaped arrays.
        labels (optional):
            Titles for the images; `labels[i]` is placed above `images[i]`.
        num_columns (optional):
            Number of images per row; as many rows as needed are created.
        transpose (optional):
            If True (default), the two spatial dimensions of every image are swapped before plotting,
            e.g. to undo the transposition applied by `encode_image`.
    """
    # Convert up front so that slicing and `.numpy()` below also work for arrays/lists when `transpose` is False.
    images = tf.convert_to_tensor(images)
    if transpose:
        images = tf.transpose(images, perm=[0, 2, 1, 3])
    images = images[:, :, :, 0] * 255
    images = images.numpy().astype('uint8')
    # Ceiling division: a partially filled last row still needs its own row of axes.
    num_rows = max(1, -(-len(images) // num_columns))
    # `squeeze=False` always yields a 2-D array of axes, so one indexing scheme covers every grid shape.
    _, axs = plt.subplots(num_rows, num_columns, figsize=(10, 5), squeeze=False)
    for idx, image in enumerate(images):
        ax = axs[idx // num_columns, idx % num_columns]
        ax.imshow(image, cmap='gray')
        if labels is not None:
            ax.set_title(labels[idx])
    # Hide the axis decorations everywhere, including cells that did not receive an image.
    for ax in axs.flat:
        ax.axis('off')
    plt.show()
class DatasetsInterface:
    """
    Convenience class bundling the steps needed to get from a directory of labeled images
    to training and validation datasets for usage with a model.
    """

    def __init__(self, batch_size: int, data_dir: PathT,
                 extensions: Iterable[str] = CONFIG.DEFAULT_DATA_FILE_EXTENSIONS) -> None:
        """
        Loads the image paths/labels from `data_dir` and prepares the lookup tables and the sample encoder.

        Args:
            batch_size:
                Stored and later used as the batch size of both datasets.
            data_dir:
                Directory passed to `load_images_data`.
            extensions (optional):
                File extensions passed to `load_images_data`.
        """
        images_data = load_images_data(data_dir=data_dir, extensions=extensions)
        self.file_paths_and_labels, self.characters = images_data
        lookup_tables = get_vocab_maps(characters=self.characters)
        self.forward_lookup_table, self.backward_lookup_table = lookup_tables
        self.sample_encode_func = get_sample_encoder(forward_lookup_table=self.forward_lookup_table)
        self.batch_size = batch_size
        # Both datasets stay unset until `split_and_make_datasets` has been called.
        self.training = None
        self.validation = None

    def split_and_make_datasets(self, train_data_ratio: float = (1 - CONFIG.VALIDATION_DATA_RATIO),
                                shuffle: bool = CONFIG.SHUFFLE_DATA) -> None:
        """
        Creates the training and validation datasets via `get_datasets` and
        stores them in the `training` and `validation` attributes.

        Args:
            train_data_ratio (optional):
                Passed to `get_datasets`; defaults to one minus the configured validation data ratio.
            shuffle (optional):
                Passed to `get_datasets`; defaults to the configured value.
        """
        datasets = get_datasets(
            file_paths_and_labels=self.file_paths_and_labels,
            sample_encode_func=self.sample_encode_func,
            batch_size=self.batch_size,
            train_data_ratio=train_data_ratio,
            shuffle=shuffle,
        )
        self.training, self.validation = datasets