From be3aebaf0757e554ad5d3f148ff37ea50ef5c311 Mon Sep 17 00:00:00 2001
From: Daniil Fajnberg
Date: Sun, 8 May 2022 20:53:05 +0200
Subject: [PATCH] initial

---
 requirements/common.txt                     |   3 +
 setup.cfg                                   |   8 +-
 src/{package_name => ccaptchas}/__init__.py |   0
 src/ccaptchas/__main__.py                   | 110 +++++++
 src/ccaptchas/config.py                     |  20 ++
 src/ccaptchas/infer.py                      |  59 ++++
 src/ccaptchas/model.py                      | 179 +++++++++++
 src/ccaptchas/preproc.py                    | 330 ++++++++++++++++++++
 src/ccaptchas/types.py                      |   5 +
 9 files changed, 711 insertions(+), 3 deletions(-)
 rename src/{package_name => ccaptchas}/__init__.py (100%)
 create mode 100644 src/ccaptchas/__main__.py
 create mode 100644 src/ccaptchas/config.py
 create mode 100644 src/ccaptchas/infer.py
 create mode 100644 src/ccaptchas/model.py
 create mode 100644 src/ccaptchas/preproc.py
 create mode 100644 src/ccaptchas/types.py

diff --git a/requirements/common.txt b/requirements/common.txt
index e69de29..6bb6c73 100644
--- a/requirements/common.txt
+++ b/requirements/common.txt
@@ -0,0 +1,3 @@
+tensorflow
+numpy
+matplotlib
diff --git a/setup.cfg b/setup.cfg
index 67201f9..80acfa8 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -1,8 +1,8 @@
 [metadata]
 name = ccaptchas
 version = 0.0.1
-author = Daniil
-author_email = mail@placeholder123.to
+author = Daniil Fajnberg
+author_email = mail@daniil.fajnberg.de
 description = Character CAPTCHA Solver
 long_description = file: README.md
 long_description_content_type = text/markdown
@@ -19,7 +19,9 @@ package_dir =
 packages = find:
 python_requires = >=3
 install_requires =
-    ...
+    tensorflow
+    numpy
+    matplotlib
 
 [options.extras_require]
 dev =
diff --git a/src/package_name/__init__.py b/src/ccaptchas/__init__.py
similarity index 100%
rename from src/package_name/__init__.py
rename to src/ccaptchas/__init__.py
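A note on the CLI module that follows: `parse_cli` accepts an explicit argument list, so it can be exercised without touching `sys.argv`. A minimal sketch (the argument values here are illustrative only):

    from ccaptchas.__main__ import parse_cli

    # Passing an explicit argv is handy for testing the parser in isolation.
    kwargs = parse_cli(['train', './data', '--batch-size', '16', '--num-epochs', '50'])
    assert kwargs['command'] == 'train'
    assert kwargs['batch_size'] == 16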
diff --git a/src/ccaptchas/__main__.py b/src/ccaptchas/__main__.py
new file mode 100644
index 0000000..5f421c7
--- /dev/null
+++ b/src/ccaptchas/__main__.py
@@ -0,0 +1,110 @@
+from argparse import ArgumentParser
+from pathlib import Path
+from typing import Any, Optional, Sequence
+
+from .config import CONFIG
+
+
+CMD = 'command'
+TRAIN = 'train'
+DATA_DIR = 'data_dir'
+SAVE_DIR = 'save_dir'
+FILE_EXTENSIONS = 'file_extensions'
+BATCH_SIZE = 'batch_size'
+NUM_EPOCHS = 'num_epochs'
+EARLY_STOPPING_PATIENCE = 'early_stopping_patience'
+
+INFER = 'infer'
+MODEL_DIR = 'model_dir'
+
+
+def ext_list(string: str) -> list[str]:
+    out = []
+    for ext in string.split(','):
+        ext = ext.strip()
+        if not ext.startswith('.'):
+            raise ValueError(f"Extension '{ext}' must start with a dot")
+        out.append(ext)
+    return out
+
+
+def parse_cli(args: Optional[Sequence[str]] = None) -> dict[str, Any]:
+    parser = ArgumentParser(
+        prog=CONFIG.PROGRAM_NAME,
+        description="Character CAPTCHA Solver",
+    )
+    parser.add_argument(
+        '-E', f'--{FILE_EXTENSIONS.replace("_", "-")}',
+        default=CONFIG.DEFAULT_DATA_FILE_EXTENSIONS,
+        type=ext_list,
+        help=f"When used in `{TRAIN}` mode, extensions of the image files to be used for training/testing the model. "
+             f"When used in `{INFER}` mode, extensions of the image files to use the model on. "
+             f"Defaults to {CONFIG.DEFAULT_DATA_FILE_EXTENSIONS}."
+    )
+    subparsers = parser.add_subparsers(dest=CMD)
+
+    parser_train = subparsers.add_parser(TRAIN, help="trains a new model")
+    parser_train.add_argument(
+        DATA_DIR,
+        type=Path,
+        help="Directory containing the image files to be used for training/testing the model."
+    )
+    parser_train.add_argument(
+        '-s', f'--{SAVE_DIR.replace("_", "-")}',
+        default=CONFIG.DEFAULT_SAVE_DIR,
+        type=Path,
+        help=f"Directory in which to save trained models. A subdirectory for each training session named with the "
+             f"current date and time will be created there and the model will be saved in that subdirectory. "
+             f"Defaults to '{CONFIG.DEFAULT_SAVE_DIR}'."
+    )
+    parser_train.add_argument(
+        '-b', f'--{BATCH_SIZE.replace("_", "-")}',
+        default=CONFIG.DEFAULT_BATCH_SIZE,
+        type=int,
+        help=f"The dataset will be divided into batches; this determines the number of images in each batch. "
+             f"Defaults to {CONFIG.DEFAULT_BATCH_SIZE}."
+    )
+    parser_train.add_argument(
+        '-n', f'--{NUM_EPOCHS.replace("_", "-")}',
+        default=CONFIG.DEFAULT_NUM_EPOCHS,
+        type=int,
+        help=f"The number of training epochs. Defaults to {CONFIG.DEFAULT_NUM_EPOCHS}."
+    )
+    parser_train.add_argument(
+        '-p', f'--{EARLY_STOPPING_PATIENCE.replace("_", "-")}',
+        default=CONFIG.DEFAULT_EARLY_STOPPING_PATIENCE,
+        type=int,
+        help=f"The number of training epochs with no improvement over a previously achieved optimum to allow before "
+             f"stopping training early (i.e. without completing all epochs). "
+             f"Defaults to {CONFIG.DEFAULT_EARLY_STOPPING_PATIENCE}."
+    )
+
+    parser_infer = subparsers.add_parser(INFER, help="uses an existing model to make inferences")
+    parser_infer.add_argument(
+        MODEL_DIR,
+        type=Path,
+        help="Directory containing the model to use for inference."
+    )
+    parser_infer.add_argument(
+        DATA_DIR,
+        type=Path,
+        help="Directory containing the image files to use the model on."
+    )
+    return vars(parser.parse_args(args))
+
+
+def main() -> None:
+    kwargs = parse_cli()
+    cmd = kwargs.pop(CMD)
+    if cmd == TRAIN:
+        from .model import start
+        start(**kwargs)
+    elif cmd == INFER:
+        from .infer import start
+        start(**kwargs)
+    else:
+        raise NotImplementedError(f"Unknown command: {cmd}")
+
+
+if __name__ == '__main__':
+    main()
diff --git a/src/ccaptchas/config.py b/src/ccaptchas/config.py
new file mode 100644
index 0000000..5ebc3b6
--- /dev/null
+++ b/src/ccaptchas/config.py
@@ -0,0 +1,20 @@
+from pathlib import Path
+
+
+class CONFIG(object):
+    __slots__ = ()
+    PROGRAM_NAME = 'ccaptchas'
+
+    DEFAULT_SAVE_DIR = Path('.', 'saved_models')
+    DEFAULT_DATA_FILE_EXTENSIONS = ('.jpg', '.png')
+    DEFAULT_BATCH_SIZE = 10
+    DEFAULT_NUM_EPOCHS = 100
+    DEFAULT_EARLY_STOPPING_PATIENCE = 10
+
+    VALIDATION_DATA_RATIO = 1 / 8
+    SHUFFLE_DATA = True
+    INPUT_LAYER_NAME_IMAGE, INPUT_LAYER_NAME_LABEL = 'image', 'label'
+    OUTPUT_LAYER_NAME = 'encoded_output'
+    MAX_STRING_LENGTH = 6  # Maximum number of characters in any captcha image in the dataset
+    IMG_WIDTH, IMG_HEIGHT = 250, 50  # Desired image dimensions
+    VOCABULARY_FILE_NAME = '.vocabulary'
diff --git a/src/ccaptchas/infer.py b/src/ccaptchas/infer.py
new file mode 100644
index 0000000..1359b6f
--- /dev/null
+++ b/src/ccaptchas/infer.py
@@ -0,0 +1,59 @@
+from pathlib import Path
+from typing import Iterable
+
+import numpy as np
+import tensorflow as tf
+from tensorflow.keras.models import Model, load_model
+from tensorflow.keras.layers.experimental.preprocessing import StringLookup
+from tensorflow.keras.backend import ctc_decode
+
+from .config import CONFIG
+from .preproc import encode_image, decode_label, plot_images
+from .types import PathT
+
+
+def images_to_input(*images) -> tf.data.Dataset:
+    array = np.array([encode_image(img) for img in images])
+    return tf.data.Dataset.from_tensor_slices(array)
+
+
+def decode_batch_outputs(predictions, backward_lookup_table: StringLookup) -> list[str]:
+    input_len = np.ones(predictions.shape[0]) * predictions.shape[1]
+    # Use greedy search. For complex tasks, you can use beam search
+    sequences, _ = ctc_decode(predictions, input_length=input_len, greedy=True)
+    results = sequences[0][:, :CONFIG.MAX_STRING_LENGTH]
+    # Iterate over the results and get back the text
+    return [decode_label(result, backward_lookup_table) for result in results]
+
+
+def load_inference_model(path: PathT) -> Model:
+    with open(Path(path, CONFIG.VOCABULARY_FILE_NAME), 'r') as vocab_file:
+        backward_lookup_table = StringLookup(vocabulary=list(vocab_file.read()), mask_token=None, invert=True)
+    saved_model = load_model(path)
+    inference_model = Model(
+        saved_model.get_layer(name=CONFIG.INPUT_LAYER_NAME_IMAGE).input,
+        saved_model.get_layer(name=CONFIG.OUTPUT_LAYER_NAME).output
+    )
+
+    def infer_and_decode(x: tf.data.Dataset) -> list[str]:
+        return decode_batch_outputs(predictions=inference_model.predict(x), backward_lookup_table=backward_lookup_table)
+
+    inference_model.infer_and_decode = infer_and_decode
+    inference_model.backward_lookup_table = backward_lookup_table
+    return inference_model
+
+
+def start(model_dir: PathT, data_dir: PathT,
+          file_extensions: Iterable[str] = CONFIG.DEFAULT_DATA_FILE_EXTENSIONS) -> None:
+    data_dir = Path(data_dir)
+    file_paths = []
+    for ext in file_extensions:
+        file_paths.extend(data_dir.glob(f'*{ext}'))
+    file_paths.sort()
+    count = len(file_paths)
+    if count > 24:
+        raise ValueError(f"Too many files ({count}); at most 24 images can be plotted at once")
+    dataset = images_to_input(*file_paths)
+    model = load_inference_model(model_dir)
+    labels = model.infer_and_decode(dataset.batch(count))
+    plot_images(list(dataset.as_numpy_iterator()), labels=labels)
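The greedy CTC decoding used in `decode_batch_outputs` above collapses repeated symbols and blank frames along the time axis. A toy illustration with random "predictions" (the shapes are made up; the extra vocabulary slot stands in for the CTC blank):

    import numpy as np
    from tensorflow.keras.backend import ctc_decode

    preds = np.random.rand(2, 30, 20).astype('float32')  # (batch, time steps, alphabet + blank)
    input_len = np.ones(preds.shape[0]) * preds.shape[1]
    sequences, _ = ctc_decode(preds, input_length=input_len, greedy=True)
    print(sequences[0].shape)  # (2, <=30); rows are padded with -1 where sequences end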
diff --git a/src/ccaptchas/model.py b/src/ccaptchas/model.py
new file mode 100644
index 0000000..2c04083
--- /dev/null
+++ b/src/ccaptchas/model.py
@@ -0,0 +1,179 @@
+import os
+from datetime import datetime
+from pathlib import Path
+from typing import Iterable
+
+import tensorflow as tf
+from tensorflow import keras
+from tensorflow.keras import layers
+
+from .config import CONFIG
+from .preproc import DatasetsInterface
+from .types import PathT
+
+
+THIS_DIR = os.path.dirname(os.path.realpath(__file__))
+# Build paths relative to this file's directory like this: os.path.join(THIS_DIR, ...)
+
+
+class CTCLayer(layers.Layer):
+    def __init__(self, name: str = None):
+        super().__init__(name=name)
+        self.loss_fn = keras.backend.ctc_batch_cost
+
+    def call(self, y_true: tf.Tensor = None, y_pred: tf.Tensor = None) -> tf.Tensor:
+        # Compute the training-time loss value and add it
+        # to the layer using `self.add_loss()`.
+        batch_len = tf.cast(tf.shape(y_true)[0], dtype='int64')
+        input_length = tf.cast(tf.shape(y_pred)[1], dtype='int64')
+        label_length = tf.cast(tf.shape(y_true)[1], dtype='int64')
+        input_length = input_length * tf.ones(shape=(batch_len, 1), dtype='int64')
+        label_length = label_length * tf.ones(shape=(batch_len, 1), dtype='int64')
+        loss = self.loss_fn(y_true, y_pred, input_length, label_length)
+        self.add_loss(loss)
+        # At test time, just return the computed predictions
+        return y_pred
+
+
+# Factor by which the image is going to be downsampled
+# by the convolutional blocks. We will be using two
+# convolution blocks and each block will have
+# a pooling layer which downsamples the features by a factor of 2.
+# Hence the total downsampling factor is 4.
+downsample_factor = 4
+
+
+def build_model(alphabet_size: int,
+                img_width: int = CONFIG.IMG_WIDTH,
+                img_height: int = CONFIG.IMG_HEIGHT,
+                optimizer: keras.optimizers.Optimizer = None) -> keras.models.Model:
+    # Create a fresh default optimizer per call; a single shared instance must not be reused across models
+    optimizer = optimizer or keras.optimizers.Adam()
+    # Inputs to the model
+    input_img = layers.Input(
+        shape=(img_width, img_height, 1),
+        dtype='float32',
+        name=CONFIG.INPUT_LAYER_NAME_IMAGE
+    )
+    labels = layers.Input(
+        shape=(None, ),
+        dtype='float32',
+        name=CONFIG.INPUT_LAYER_NAME_LABEL,
+    )
+    # First conv block
+    x = layers.Conv2D(
+        filters=32,
+        kernel_size=(3, 3),
+        activation='relu',
+        kernel_initializer='he_normal',
+        padding='same',
+        name='conv1',
+    )(input_img)
+    x = layers.MaxPooling2D(
+        pool_size=(2, 2),
+        name='pool1'
+    )(x)
+    # Second conv block
+    x = layers.Conv2D(
+        filters=64,
+        kernel_size=(3, 3),
+        activation='relu',
+        kernel_initializer='he_normal',
+        padding='same',
+        name='conv2',
+    )(x)
+    x = layers.MaxPooling2D(
+        pool_size=(2, 2),
+        name='pool2'
+    )(x)
+    # We have used two max-pooling layers with pool size and stride 2.
+    # Hence, downsampled feature maps are 4x smaller. The number of
+    # filters in the last layer is 64. Reshape accordingly before
+    # passing the output to the RNN part of the model
+    new_shape = (
+        (img_width // 4),
+        (img_height // 4) * 64
+    )
+    x = layers.Reshape(
+        target_shape=new_shape,
+        name='reshape'
+    )(x)
+    x = layers.Dense(
+        units=64,
+        activation='relu',
+        name='dense1'
+    )(x)
+    x = layers.Dropout(rate=0.2)(x)
+    # RNNs
+    x = layers.Bidirectional(
+        layers.LSTM(
+            units=128,
+            return_sequences=True,
+            dropout=0.25,
+        )
+    )(x)
+    x = layers.Bidirectional(
+        layers.LSTM(
+            units=64,
+            return_sequences=True,
+            dropout=0.25,
+        )
+    )(x)
+    # Output layer
+    x = layers.Dense(
+        units=alphabet_size + 1,
+        activation='softmax',
+        name=CONFIG.OUTPUT_LAYER_NAME,
+    )(x)
+    # Add CTC layer for calculating CTC loss at each step
+    output = CTCLayer(name='ctc_loss')(labels, x)
+    # Define the model
+    model = keras.models.Model(
+        inputs=[input_img, labels],
+        outputs=output,
+        name='ocr_model_v1'
+    )
+    # Compile the model and return
+    model.compile(optimizer=optimizer)
+    return model
+
+
+def train_model(model: keras.models.Model, train_dataset: tf.data.Dataset, valid_dataset: tf.data.Dataset,
+                num_epochs: int = CONFIG.DEFAULT_NUM_EPOCHS,
+                early_stopping_patience: int = CONFIG.DEFAULT_EARLY_STOPPING_PATIENCE) -> keras.callbacks.History:
+    # Add early stopping
+    early_stopping = keras.callbacks.EarlyStopping(
+        monitor='val_loss',
+        patience=early_stopping_patience,
+        restore_best_weights=True,
+    )
+    # Train the model
+    history = model.fit(
+        x=train_dataset,
+        validation_data=valid_dataset,
+        epochs=num_epochs,
+        callbacks=[early_stopping],
+    )
+    return history
+
+
+def start(data_dir: PathT, save_dir: PathT, file_extensions: Iterable[str] = CONFIG.DEFAULT_DATA_FILE_EXTENSIONS,
+          batch_size: int = CONFIG.DEFAULT_BATCH_SIZE, num_epochs: int = CONFIG.DEFAULT_NUM_EPOCHS,
+          early_stopping_patience: int = CONFIG.DEFAULT_EARLY_STOPPING_PATIENCE) -> None:
+    save_dir = Path(save_dir, datetime.now().strftime('%Y-%m-%d_%H-%M'))
+    save_dir.mkdir(parents=True)
+    print("\nConstructing datasets\n")
+    data_interface = DatasetsInterface(batch_size=int(batch_size), data_dir=data_dir, extensions=file_extensions)
+    data_interface.split_and_make_datasets()
+    print("\nBuilding model\n")
+    model = build_model(len(data_interface.characters))
+    print("\nBeginning training\n")
+    train_model(model, data_interface.training, data_interface.validation, num_epochs=num_epochs,
+                early_stopping_patience=early_stopping_patience)
+    print("\nSaving model\n")
+    model.save(save_dir)
+    print("\nSaving vocabulary\n")
+    vocabulary = ''.join(data_interface.forward_lookup_table.get_vocabulary())
+    with open(os.path.join(save_dir, CONFIG.VOCABULARY_FILE_NAME), 'w') as f:
+        f.write(vocabulary)
+    print("\nAll saved!\n")
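The model above is compiled without a `loss` argument because `CTCLayer` registers the CTC loss itself via `add_loss()`; the 'label' input exists only to feed that loss. For inference, infer.py above cuts out the 'image' -> 'encoded_output' subgraph. A sketch of that extraction (the alphabet size is arbitrary; a freshly built model stands in for one restored after training):

    from tensorflow import keras
    from ccaptchas.model import build_model

    trained = build_model(alphabet_size=36)  # stand-in for a trained model
    prediction_model = keras.models.Model(
        trained.get_layer('image').input,            # CONFIG.INPUT_LAYER_NAME_IMAGE
        trained.get_layer('encoded_output').output,  # CONFIG.OUTPUT_LAYER_NAME
    )
    prediction_model.summary()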
diff --git a/src/ccaptchas/preproc.py b/src/ccaptchas/preproc.py
new file mode 100644
index 0000000..e9a5dac
--- /dev/null
+++ b/src/ccaptchas/preproc.py
@@ -0,0 +1,330 @@
+import os
+import shutil
+from pathlib import Path
+from typing import Union, Mapping, Sequence, Iterable, Callable
+
+import matplotlib.pyplot as plt
+import numpy as np
+import tensorflow as tf
+from tensorflow.keras import layers
+
+from .config import CONFIG
+from .types import PathT
+
+
+def label_files(src_dir: PathT, dest_dir: PathT, labels: Union[PathT, Sequence[str]],
+                reverse: bool = False, extensions: Iterable[str] = None) -> None:
+    """
+    Copies files, giving them new names by using the specified labels.
+
+    All matching files are sorted by their file name before applying the sequence of labels to them.
+    The first file is named with the first label, the second is named with the second label, and so on.
+    If a label duplicate is encountered, a dot followed by a counter is appended to the file name
+    *preceding* the file extension, e.g. if 'some_label.jpg' exists, 'some_label.1.jpg' may be created.
+
+    The number of matching files must be greater than or equal to the number of labels.
+    Exactly one file is copied for every label; thus, after every label has been used, the operation ends.
+
+    Args:
+        src_dir:
+            Path to the directory containing the files to be copied/renamed
+        dest_dir:
+            Path to the destination directory
+        labels:
+            Either a sequence of labels (strings) or a path to a file containing the labels (newline-separated)
+        reverse (optional):
+            Defines which file receives which label;
+            if False (default), the files in `src_dir` are sorted ascending by their file name,
+            if True, the files are sorted descending by name.
+        extensions (optional):
+            Iterable of file extensions; only files with these extensions will be considered.
+
+    """
+    extensions = '' if extensions is None else tuple(extensions)
+    file_names = [name for name in os.listdir(src_dir) if name.endswith(extensions)]
+    file_names.sort(reverse=reverse)
+    try:
+        with open(labels, 'r') as f:
+            labels = f.read().strip().split('\n')
+    except TypeError:
+        pass  # Assume `labels` is already a sequence of strings
+    if not os.path.isdir(dest_dir):
+        raise NotADirectoryError(f"'{dest_dir}' is not a directory.")
+    if len(labels) > len(file_names):
+        raise IndexError(f"There are more labels ({len(labels)}) than files "
+                         f"in the source directory ({len(file_names)} matching).")
+    for idx, label in enumerate(labels):
+        file_name = file_names[idx]
+        _, ext = os.path.splitext(file_name)
+        while True:
+            new_path = os.path.join(dest_dir, label + ext)
+            if not os.path.exists(new_path):
+                shutil.copyfile(os.path.join(src_dir, file_name), new_path)
+                break
+            pre_label, n = os.path.splitext(label)
+            try:
+                n = int(n[1:])
+            except ValueError:
+                label = label + '.1'
+            else:
+                label = pre_label + '.' + str(n + 1)
+
+
+def load_images_data(data_dir: PathT, extensions: Iterable[str] = ('.jpg', '.png'), verbose: bool = True
+                     ) -> tuple[dict[str, str], str]:
+    """
+    Creates a dictionary mapping file paths (of images) to their labels.
+
+    Everything up to the first dot in the filename is taken to be the label;
+    this naturally excludes file extensions, but also means a filename like `ABC.1.jpg` results in the label `ABC`.
+    Also creates a vocabulary of characters encountered in the file names.
+
+    Args:
+        data_dir:
+            Path-like object or string to a directory containing the desired image files
+        extensions (optional):
+            Iterable of extensions that the files considered for the resulting data should be restricted to;
+            defaults to restricting finds to JPEG and PNG files.
+        verbose (optional):
+            If True, the function will print out a summary of the findings before returning.
+
+    Returns:
+        2-tuple with the first element being a dictionary where the keys are the file paths and the values are the
+        file names (i.e. image labels) and the second element being a string of all characters present in the labels.
+    """
+    data_dir = Path(data_dir)
+    file_paths_and_labels, characters = {}, set()
+    for file_path in data_dir.iterdir():
+        if file_path.suffix not in extensions:
+            continue
+        label = file_path.name.split('.')[0]
+        for char in label:
+            characters.add(char)
+        file_paths_and_labels[str(file_path)] = label
+    if verbose:
+        print("Number of images/labels found: ", len(file_paths_and_labels))
+        print("Number of unique characters: ", len(characters))
+        print("Characters present: ", characters)
+    return file_paths_and_labels, ''.join(characters)
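Given the dot-based naming convention in `load_images_data`, the duplicate-label suffixes added by `label_files` fold back into the same label. A hypothetical directory illustrates this (the file names are invented for the example):

    from ccaptchas.preproc import load_images_data

    # Suppose data/ contains: AB12CD.png, AB12CD.1.png, 7XK9.jpg
    file_paths_and_labels, characters = load_images_data('data', verbose=False)
    # -> labels 'AB12CD', 'AB12CD' and '7XK9'; `characters` holds their unique symbols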
+
+
+def get_vocab_maps(characters: Iterable[str], num_oov_indices: int = 0, mask_token: str = None
+                   ) -> tuple[layers.StringLookup, layers.StringLookup]:
+    """
+    Constructs two table-based lookup objects that map characters to integers and back.
+
+    Details about the `StringLookup` class in the documentation:
+    https://tensorflow.org/api_docs/python/tf/keras/layers/experimental/preprocessing/StringLookup
+
+    Args:
+        characters:
+            An iterable of strings representing the vocabulary to be mapped
+        num_oov_indices (optional):
+            Passed to the `IndexLookup` constructor;
+            defines the number of out-of-vocabulary (OOV) tokens to create;
+            assuming that no OOV characters will be encountered, the default is 0.
+        mask_token (optional):
+            Passed to the `IndexLookup` constructor;
+            the token representing missing values;
+            assuming that there will never be a value missing, the default is None.
+
+    Returns:
+        2-tuple of `StringLookup` objects, the first one mapping characters to integers, the second doing the reverse.
+        By default, no OOV or missing values are assumed to be encountered,
+        and thus each index (uniquely) represents a character from the vocabulary.
+    """
+    char_to_int = layers.StringLookup(
+        vocabulary=list(characters),
+        num_oov_indices=num_oov_indices,
+        mask_token=mask_token,
+    )
+    int_to_char = layers.StringLookup(
+        vocabulary=char_to_int.get_vocabulary(),
+        mask_token=mask_token,
+        invert=True,
+    )
+    return char_to_int, int_to_char
+
+
+def encode_image(img):
+    """
+    Creates a `Tensor` object from an image file and transposes it.
+    """
+    try:
+        # 0. Read image
+        img = tf.io.read_file(str(img))
+    except ValueError:
+        pass  # Assume `img` already holds encoded image bytes
+    # 1. Decode (handles both PNG and JPEG) and convert to grayscale
+    img = tf.io.decode_image(img, channels=1, expand_animations=False)
+    # 2. Convert to float32 in [0, 1] range
+    img = tf.image.convert_image_dtype(img, tf.float32)
+    # 3. Resize to the desired size
+    img = tf.image.resize(img, [CONFIG.IMG_HEIGHT, CONFIG.IMG_WIDTH])
+    # 4. Transpose the image because we want the time
+    # dimension to correspond to the width of the image.
+    return tf.transpose(img, perm=[1, 0, 2])
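A sample produced by `encode_image` has shape (IMG_WIDTH, IMG_HEIGHT, 1), width first, because the image's width serves as the time axis for the recurrent layers. A quick self-contained shape check (it writes a synthetic PNG first, so no sample data is assumed):

    import tensorflow as tf
    from ccaptchas.preproc import encode_image

    tf.io.write_file('blank.png', tf.io.encode_png(tf.zeros([50, 250, 1], dtype=tf.uint8)))
    print(encode_image('blank.png').shape)  # (250, 50, 1), i.e. (IMG_WIDTH, IMG_HEIGHT, 1)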
+
+
+def encode_label(label: str, forward_lookup_table: layers.StringLookup):
+    """
+    Creates a `Tensor` object from a label string by passing its characters through a `StringLookup` instance.
+    """
+    return forward_lookup_table(tf.strings.unicode_split(label, input_encoding='UTF-8'))
+
+
+def decode_label(tensor, backward_lookup_table: layers.StringLookup) -> str:
+    return tf.strings.reduce_join(backward_lookup_table(tensor)).numpy().decode('utf-8')
+
+
+def get_sample_encoder(forward_lookup_table: layers.StringLookup) -> Callable[[str, str], dict]:
+    """
+    Returns a function for usage in the `map(...)` method of a `Dataset` instance.
+
+    The function will accept an image path and a label and return a dictionary;
+    the dictionary values will be a tensor representing the image and a tensor representing the label;
+    the keys for each are pre-configured and will correspond to the model's input layers' names.
+
+    Args:
+        forward_lookup_table:
+            Passed to the `encode_label` function; required for mapping individual characters to floats.
+
+    Returns:
+        Callable taking two strings as arguments and returning a dictionary with string keys and Tensor values.
+    """
+    def func(img_path: PathT, label: str) -> dict:
+        return {
+            CONFIG.INPUT_LAYER_NAME_IMAGE: encode_image(img_path),
+            CONFIG.INPUT_LAYER_NAME_LABEL: encode_label(label, forward_lookup_table)
+        }
+    return func
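The lookup pair from `get_vocab_maps` is the contract between training and inference: `encode_label` turns a string into indices for `fit()`, and `decode_label` reverses the mapping after CTC decoding. A round trip with a toy vocabulary (the characters are illustrative only):

    from ccaptchas.preproc import get_vocab_maps, encode_label, decode_label

    forward, backward = get_vocab_maps('abc123')
    print(decode_label(encode_label('cab1', forward), backward))  # prints: cab1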
+ """ + if file_paths.size != labels.size: + raise ValueError("Number of file paths must be equal to number of labels") + + dataset = tf.data.Dataset.from_tensor_slices((file_paths, labels)) + dataset = dataset.map( + map_func=sample_encode_func, + num_parallel_calls=tf.data.experimental.AUTOTUNE + ).batch( + batch_size=batch_size + ).prefetch( + buffer_size=tf.data.experimental.AUTOTUNE + ) + return dataset + + +def get_datasets(file_paths_and_labels: Mapping[str, str], sample_encode_func: Callable[[str, str], dict], + batch_size: int, train_data_ratio: float, shuffle: bool = CONFIG.SHUFFLE_DATA + ) -> tuple[tf.data.Dataset, tf.data.Dataset]: + """ + Creates a training dataset and a validation dataset from a mapping of image file paths to labels. + + Args: + file_paths_and_labels: + Mapping with keys being image file paths and values being labels of the corresponding images; + this represents the full dataset used for fitting the model. + sample_encode_func: + Will be passed as the `sample_encode_func` argument into the `make_dataset(...)` function; + should be a function taking two strings (image path and label) as arguments and + returning a dictionary of Tensors representing the image and label. + batch_size: + Will be passed as the `batch_size` argument into the `make_dataset(...)` function; + determines how each dataset will be divided into batches. + train_data_ratio: + Floating point value between 0 and 1 determining what ratio of the full dataset will be used for training; + this implies that (1 - `train_data_ratio`) will be the ratio used for validation. + shuffle: + If True, the full dataset is shuffled pseudo-randomly before being split. + + Returns: + Two `Dataset` objects ready to be fed into a model's `fit(...)` method, provided that model has two input layers + named in accordance with the keys of the dictionary returned by the `sample_encode_func` function. + """ + # 1. Get the total size of the dataset + size = len(file_paths_and_labels) + # 2. Make an indices array and shuffle it, if required + indices = np.arange(size) + if shuffle: + np.random.shuffle(indices) + # 3. Get the size of training samples; that will be the array cutoff index for the data-indices array + cutoff = int(size * train_data_ratio) + train_indices, valid_indices = indices[:cutoff], indices[cutoff:] + # 4. Split data into training and validation sets + file_paths, labels = np.array(list(file_paths_and_labels.keys())), np.array(list(file_paths_and_labels.values())) + x_train, x_valid = file_paths[train_indices], file_paths[valid_indices] + y_train, y_valid = labels[train_indices], labels[valid_indices] + # 5. 
+
+
+def plot_images(images: Sequence[np.ndarray], labels: Sequence[str] = None, num_columns: int = 4,
+                transpose: bool = True) -> None:
+    if transpose:
+        images = tf.transpose(images, perm=[0, 2, 1, 3])
+    images = images[:, :, :, 0] * 255
+    images = images.numpy().astype('uint8')
+    num_rows = (len(images) + num_columns - 1) // num_columns  # ceiling division, so every image gets an axis
+    _, axs = plt.subplots(num_rows, num_columns, figsize=(10, 5))
+    for idx, image in enumerate(images):
+        if num_rows == 1:
+            if num_columns == 1:
+                ax = axs
+            else:
+                ax = axs[idx % num_columns]
+        else:
+            ax = axs[idx // num_columns, idx % num_columns]
+        ax.imshow(image, cmap='gray')
+        if labels is not None:
+            ax.set_title(labels[idx])
+        ax.axis('off')
+    plt.show()
+
+
+class DatasetsInterface:
+    """
+    Convenience class for loading and pre-processing the training and validation data for usage with a model.
+    """
+
+    def __init__(self, batch_size: int, data_dir: PathT,
+                 extensions: Iterable[str] = CONFIG.DEFAULT_DATA_FILE_EXTENSIONS) -> None:
+        self.file_paths_and_labels, self.characters = load_images_data(data_dir=data_dir, extensions=extensions)
+        self.forward_lookup_table, self.backward_lookup_table = get_vocab_maps(characters=self.characters)
+        self.sample_encode_func = get_sample_encoder(forward_lookup_table=self.forward_lookup_table)
+        self.batch_size = batch_size
+        self.training, self.validation = None, None
+
+    def split_and_make_datasets(self, train_data_ratio: float = (1 - CONFIG.VALIDATION_DATA_RATIO),
+                                shuffle: bool = CONFIG.SHUFFLE_DATA) -> None:
+        self.training, self.validation = get_datasets(file_paths_and_labels=self.file_paths_and_labels,
+                                                      sample_encode_func=self.sample_encode_func,
+                                                      batch_size=self.batch_size,
+                                                      train_data_ratio=train_data_ratio,
+                                                      shuffle=shuffle)
diff --git a/src/ccaptchas/types.py b/src/ccaptchas/types.py
new file mode 100644
index 0000000..6edb455
--- /dev/null
+++ b/src/ccaptchas/types.py
@@ -0,0 +1,5 @@
+from pathlib import Path
+from typing import Union
+
+
+PathT = Union[Path, str]
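Taken together, a full session with these modules might look as follows (the directory names are placeholders; the timestamped subdirectory is created by `start` in model.py):

    from ccaptchas.model import start as train_start
    from ccaptchas.infer import start as infer_start

    train_start(data_dir='data', save_dir='saved_models')
    # ...then, pointing at the subdirectory created above:
    infer_start(model_dir='saved_models/2022-05-08_20-53', data_dir='new_captchas')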