From 7c9f97f21b92b57e0ee3965c5c6d518496664bc8 Mon Sep 17 00:00:00 2001
From: Daniil Fajnberg
Date: Wed, 11 May 2022 17:32:01 +0200
Subject: [PATCH] refactor entire package; add CLI

---
 .gitignore                        |   2 +
 src/ccaptchas/__main__.py         |  77 +++++--
 src/ccaptchas/config.py           |  29 ++-
 src/ccaptchas/ctc_layer.py        |  24 +++
 src/ccaptchas/infer.py            | 109 +++++-----
 src/ccaptchas/keras/__init__.py   |   0
 src/ccaptchas/keras/backend.py    |   1 +
 src/ccaptchas/keras/callbacks.py  |   1 +
 src/ccaptchas/keras/layers.py     |   1 +
 src/ccaptchas/keras/models.py     |   1 +
 src/ccaptchas/keras/optimizers.py |   1 +
 src/ccaptchas/model.py            | 133 +++++-------
 src/ccaptchas/preproc.py          | 330 ------------------------------
 src/ccaptchas/preprocess.py       | 221 ++++++++++++++++++++
 src/ccaptchas/types.py            |   8 +-
 src/ccaptchas/visualize.py        |  28 +++
 16 files changed, 485 insertions(+), 481 deletions(-)
 create mode 100644 src/ccaptchas/ctc_layer.py
 create mode 100644 src/ccaptchas/keras/__init__.py
 create mode 100644 src/ccaptchas/keras/backend.py
 create mode 100644 src/ccaptchas/keras/callbacks.py
 create mode 100644 src/ccaptchas/keras/layers.py
 create mode 100644 src/ccaptchas/keras/models.py
 create mode 100644 src/ccaptchas/keras/optimizers.py
 delete mode 100644 src/ccaptchas/preproc.py
 create mode 100644 src/ccaptchas/preprocess.py
 create mode 100644 src/ccaptchas/visualize.py

diff --git a/.gitignore b/.gitignore
index 22ce824..6d6ab23 100644
--- a/.gitignore
+++ b/.gitignore
@@ -8,3 +8,5 @@
 /dist/
 # Python cache:
 __pycache__/
+
+saved_models/
diff --git a/src/ccaptchas/__main__.py b/src/ccaptchas/__main__.py
index 5f421c7..5c516c6 100644
--- a/src/ccaptchas/__main__.py
+++ b/src/ccaptchas/__main__.py
@@ -6,16 +6,26 @@ from .config import CONFIG
 
 CMD = 'command'
+
 TRAIN = 'train'
+
 DATA_DIR = 'data_dir'
 SAVE_DIR = 'save_dir'
-FILE_EXTENSIONS = 'file_extensions'
+FILE_EXT = 'file_ext'
 BATCH_SIZE = 'batch_size'
+VALIDATION_RATIO = 'validation_ratio'
+IMG_WIDTH = 'img_width'
+IMG_HEIGHT = 'img_height'
 NUM_EPOCHS = 'num_epochs'
 EARLY_STOPPING_PATIENCE = 'early_stopping_patience'
+_PREPROCESSING_KEYS = (DATA_DIR, FILE_EXT, BATCH_SIZE, VALIDATION_RATIO, IMG_WIDTH, IMG_HEIGHT)
+_TRAINING_KEYS = (SAVE_DIR, NUM_EPOCHS, EARLY_STOPPING_PATIENCE)
 
 INFER = 'infer'
 MODEL_DIR = 'model_dir'
+IMAGES_DIR = 'images_dir'
+IMAGE_FILES = 'image_files'
+PLOT_RESULTS = 'plot_results'
 
 
 def ext_list(string: str) -> list[str]:
@@ -34,14 +44,14 @@ def parse_cli(args: Sequence[str] = None) -> dict[str, Any]:
         description="Character CAPTCHA Solver",
     )
     parser.add_argument(
-        '-E', f'--{FILE_EXTENSIONS.replace("_", "-")}',
-        default=CONFIG.DEFAULT_DATA_FILE_EXTENSIONS,
+        '-E', f'--{FILE_EXT.replace("_", "-")}',
+        default=CONFIG.DEFAULT_IMG_FILE_EXT,
         type=ext_list,
         help=f"When used in `{TRAIN}` mode, extensions of the image files to be used for training/testing the model. "
             f"When used in `{INFER}` mode, extensions of the image files to use the model on. "
-            f"Defaults to {CONFIG.DEFAULT_DATA_FILE_EXTENSIONS}."
+            f"Defaults to {CONFIG.DEFAULT_IMG_FILE_EXT}."
    )
-    subparsers = parser.add_subparsers(dest=CMD)
+    subparsers = parser.add_subparsers(dest=CMD, title="Commands")
 
     parser_train = subparsers.add_parser(TRAIN, help="trains a new model")
     parser_train.add_argument(
         DATA_DIR,
         type=Path,
         help="Directory containing the image files to be used for training/testing the model."
     )
-    parser_train.add_argument(
+    preprocessing_group = parser_train.add_argument_group("Preprocessing options")
+    training_group = parser_train.add_argument_group("Training options")
+    training_group.add_argument(
         '-s', f'--{SAVE_DIR.replace("_", "-")}',
         default=CONFIG.DEFAULT_SAVE_DIR,
         type=Path,
@@ -57,20 +69,40 @@ def parse_cli(args: Sequence[str] = None) -> dict[str, Any]:
             f"current date and time will be created there and the model will be saved in that subdirectory. "
             f"Defaults to '{CONFIG.DEFAULT_SAVE_DIR}'."
     )
-    parser_train.add_argument(
+    preprocessing_group.add_argument(
         '-b', f'--{BATCH_SIZE.replace("_", "-")}',
         default=CONFIG.DEFAULT_BATCH_SIZE,
         type=int,
         help=f"The dataset will be divided into batches; this determines the number of images in each batch. "
             f"Defaults to {CONFIG.DEFAULT_BATCH_SIZE}."
     )
-    parser_train.add_argument(
+    preprocessing_group.add_argument(
+        '-r', f'--{VALIDATION_RATIO.replace("_", "-")}',
+        default=CONFIG.DEFAULT_VALIDATION_RATIO,
+        type=float,
+        help=f"The dataset will be split into training and validation data; this argument should be a float between 0 "
+            f"and 1 determining the relative size of the validation dataset to the whole dataset. "
+            f"Defaults to {round(CONFIG.DEFAULT_VALIDATION_RATIO, 3)}."
+    )
+    preprocessing_group.add_argument(
+        '-W', f'--{IMG_WIDTH.replace("_", "-")}',
+        default=CONFIG.DEFAULT_IMG_WIDTH,
+        type=int,
+        help=f"The width of an image in pixels. Defaults to {CONFIG.DEFAULT_IMG_WIDTH}."
+    )
+    preprocessing_group.add_argument(
+        '-H', f'--{IMG_HEIGHT.replace("_", "-")}',
+        default=CONFIG.DEFAULT_IMG_HEIGHT,
+        type=int,
+        help=f"The height of an image in pixels. Defaults to {CONFIG.DEFAULT_IMG_HEIGHT}."
+    )
+    training_group.add_argument(
         '-n', f'--{NUM_EPOCHS.replace("_", "-")}',
         default=CONFIG.DEFAULT_NUM_EPOCHS,
         type=int,
         help=f"The number of training epochs. Defaults to {CONFIG.DEFAULT_NUM_EPOCHS}."
     )
-    parser_train.add_argument(
+    training_group.add_argument(
         '-p', f'--{EARLY_STOPPING_PATIENCE.replace("_", "-")}',
         default=CONFIG.DEFAULT_EARLY_STOPPING_PATIENCE,
         type=int,
@@ -85,10 +117,24 @@ def parse_cli(args: Sequence[str] = None) -> dict[str, Any]:
         type=Path,
         help="Directory containing the model to use for inference."
     )
-    parser_infer.add_argument(
-        DATA_DIR,
+    data_group = parser_infer.add_mutually_exclusive_group()
+    data_group.add_argument(
+        '-f', f'--{IMAGE_FILES.replace("_", "-")}',
         type=Path,
-        help="Directory containing the image files to use the model on."
+        nargs='*',
+        metavar='PATH',
+        help="Paths to image files to use the model on."
+    )
+    data_group.add_argument(
+        '-d', f'--{IMAGES_DIR.replace("_", "-")}',
+        type=Path,
+        metavar='PATH',
+        help="Path to directory containing the image files to use the model on."
+    )
+    parser_infer.add_argument(
+        '-p', f'--{PLOT_RESULTS.replace("_", "-")}',
+        action='store_true',
+        help="If set, a plot will be displayed, showing the images with the inferred labels."
     )
 
     return vars(parser.parse_args(args))
@@ -98,12 +144,15 @@ def main() -> None:
     cmd = kwargs.pop(CMD)
     if cmd == TRAIN:
         from .model import start
-        start(**kwargs)
+        from .preprocess import load_datasets
+        pre_kwargs = {k: kwargs.pop(k) for k in _PREPROCESSING_KEYS}
+        training_data, validation_data, vocabulary = load_datasets(**pre_kwargs)
+        start(training_data, validation_data, vocabulary, **kwargs)
     elif cmd == INFER:
         from .infer import start
         start(**kwargs)
     else:
-        raise NotImplemented
+        raise SystemExit  # Should be unreachable, since the argument parser will have raised an error earlier
 
 
 if __name__ == '__main__':
diff --git a/src/ccaptchas/config.py b/src/ccaptchas/config.py
index 5ebc3b6..4a75b92 100644
--- a/src/ccaptchas/config.py
+++ b/src/ccaptchas/config.py
@@ -5,16 +5,29 @@ class CONFIG(object):
     __slots__ = ()
 
     PROGRAM_NAME = 'ccaptchas'
-    DEFAULT_SAVE_DIR = Path('.', 'saved_models')
-    DEFAULT_DATA_FILE_EXTENSIONS = ('.jpg', '.png')
-    DEFAULT_BATCH_SIZE = 10
+    EXT_PNG, EXT_JPG = '.png', '.jpg'
+    DEFAULT_IMG_FILE_EXT = (EXT_PNG, EXT_JPG)
+
+    # StringLookup parameters:
+    DEFAULT_NUM_OOV_INDICES = 0  # assuming no out-of-vocabulary (OOV) characters will be encountered
+    DEFAULT_MASK_TOKEN = None  # assuming there will never be a value missing
+
+    # Data splitting:
+    DEFAULT_VALIDATION_RATIO = 1 / 8
+    DEFAULT_SHUFFLE_DATA = True
+
+    # Image processing:
+    DEFAULT_IMG_WIDTH, DEFAULT_IMG_HEIGHT = 250, 50  # Desired image dimensions
+
+    # Training hyper-parameters:
+    DEFAULT_BATCH_SIZE = 16
     DEFAULT_NUM_EPOCHS = 100
     DEFAULT_EARLY_STOPPING_PATIENCE = 10
-    VALIDATION_DATA_RATIO = 1 / 8
-    SHUFFLE_DATA = True
-    INPUT_LAYER_NAME_IMAGE, INPUT_LAYER_NAME_LABEL = 'image', 'label'
-    OUTPUT_LAYER_NAME = 'encoded_output'
+
+    DEFAULT_SAVE_DIR = Path('.', 'saved_models')
+    MODEL_NAME = f'{PROGRAM_NAME}_model'
+    LAYER_NAME_INPUT_IMAGE, LAYER_NAME_INPUT_LABEL = 'image', 'label'
+    LAYER_NAME_OUTPUT = 'encoded_output'
     MAX_STRING_LENGTH = 6  # Maximum number of characters in any captcha image in the dataset
-    IMG_WIDTH, IMG_HEIGHT = 250, 50  # Desired image dimensions
     VOCABULARY_FILE_NAME = '.vocabulary'
+    HISTORY_FILE_NAME = '.history.json'
diff --git a/src/ccaptchas/ctc_layer.py b/src/ccaptchas/ctc_layer.py
new file mode 100644
index 0000000..153b1ac
--- /dev/null
+++ b/src/ccaptchas/ctc_layer.py
@@ -0,0 +1,24 @@
+import tensorflow as tf
+import numpy as np
+
+from .keras.backend import ctc_batch_cost
+from .keras.layers import Layer
+
+
+class CTCLayer(Layer):
+    def __init__(self, name: str = None):
+        super().__init__(name=name)
+        self.loss_fn = ctc_batch_cost
+
+    def call(self, y_true: np.ndarray = None, y_pred: np.ndarray = None) -> np.ndarray:
+        # Compute the training-time loss value and add it
+        # to the layer using `self.add_loss()`.
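+        # Note: `ctc_batch_cost` expects the per-sample input and label lengths as tensors of shape (batch_size, 1).
+        # All samples in a batch share the same (down-sampled) output width and padded label length, so the two
+        # scalar lengths below are simply broadcast across the batch by multiplying with a ones-tensor.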
+        batch_len = tf.cast(tf.shape(y_true)[0], dtype='int64')
+        input_length = tf.cast(tf.shape(y_pred)[1], dtype='int64')
+        label_length = tf.cast(tf.shape(y_true)[1], dtype='int64')
+        input_length = input_length * tf.ones(shape=(batch_len, 1), dtype='int64')
+        label_length = label_length * tf.ones(shape=(batch_len, 1), dtype='int64')
+        loss = self.loss_fn(y_true, y_pred, input_length, label_length)
+        self.add_loss(loss)
+        # At test time, just return the computed predictions
+        return y_pred
\ No newline at end of file
diff --git a/src/ccaptchas/infer.py b/src/ccaptchas/infer.py
index 1359b6f..3b1daf6 100644
--- a/src/ccaptchas/infer.py
+++ b/src/ccaptchas/infer.py
@@ -1,63 +1,78 @@
+import sys
 from pathlib import Path
-from typing import Iterable
+from typing import Iterable, Sequence
 
 import numpy as np
 import tensorflow as tf
 from keras.api._v2.keras.models import Model, load_model
-from keras.api._v2.keras.layers.experimental.preprocessing import StringLookup
+from keras.api._v2.keras.layers import StringLookup
 from keras.api._v2.keras.backend import ctc_decode
 
 from .config import CONFIG
-from .preproc import encode_image, decode_label, plot_images
-from .types import PathT
+from .preprocess import process_image, decode_label, find_image_files, get_lookup_table
+from .types import PathT, ImgT, Array
+from .visualize import plot_images
 
 
-def images_to_input(*images) -> tf.data.Dataset:
-    array = np.array([encode_image(img) for img in images])
-    return tf.data.Dataset.from_tensor_slices(array)
+def process_predictions(predictions: tf.Tensor) -> tf.Tensor:
+    num_predictions = predictions.shape[0]  # corresponds to the number of images passed into the model for inference
+    output_width = predictions.shape[1]  # corresponds to the (down-sampled) width of an image
+    # It is worth noting that `predictions.shape[2]` corresponds to the size of the vocabulary + 1,
+    # i.e. one more than the number of distinct characters that can occur in a label.
+
+    # Since the `predictions` tensor is the output of a softmax activation function, we need to decode the values along
+    # the "width axis" from arrays of floats between 0 and 1 to single integers representing the inferred characters.
+    # (see CTC concepts)
+
+    # Construct a 1D array, each element representing the width of a single prediction, i.e. the down-sampled image width:
+    seq_lengths = np.ones(num_predictions) * output_width
+    # Retrieve the sequences of label indices inferred by the model:
+    sequences, _probabilities = ctc_decode(predictions, input_length=seq_lengths, greedy=True)
+    # Since we use a greedy approach, only one sequence per prediction is returned, so we discard the other dimension:
+    sequences = sequences[0]
+    # Now this is a 2D tensor, for which `sequences.shape[0]` corresponds to the number of samples/images,
+    # while `sequences.shape[1]` corresponds to the length of the decoded sequences (at most the down-sampled width).
+    # Assuming n characters were inferred, the first n elements of each array will be the label indices of those
+    # characters, whereas the rest of the elements will be -1, implying blank labels. Since we know the maximum length
+    # a string of characters in an image can have, we can discard all those labels that must be blank.
+    # What we are then left with is an array of relevant label indices for each image passed through the model.
+    # Using a backward lookup table, these can later be easily decoded to the actual characters.
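+    # For example, with `CONFIG.MAX_STRING_LENGTH` being 6, a (hypothetical) decoded row
+    # [17, 3, 42, 8, -1, -1, -1, ..., -1] for a four-character captcha is cut down to [17, 3, 42, 8, -1, -1].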
+    return sequences[:, :CONFIG.MAX_STRING_LENGTH]
 
 
-def decode_batch_outputs(predictions, backward_lookup_table: StringLookup) -> list[str]:
-    input_len = np.ones(predictions.shape[0]) * predictions.shape[1]
-    # Use greedy search. For complex tasks, you can use beam search
-    sequences, _ = ctc_decode(predictions, input_length=input_len, greedy=True)
-    results = sequences[0][:, :CONFIG.MAX_STRING_LENGTH]
-    # Iterate over the results and get back the text
-    return [decode_label(result, backward_lookup_table) for result in results]
-
-
-def load_inference_model(path: PathT) -> Model:
-    with open(Path(path, CONFIG.VOCABULARY_FILE_NAME), 'r') as vocab_file:
-        backward_lookup_table = StringLookup(vocabulary=list(vocab_file.read()), mask_token=None, invert=True)
-    saved_model = load_model(path)
+def load_inference_model(model_dir: PathT) -> tuple[Model, StringLookup]:
+    with open(Path(model_dir, CONFIG.VOCABULARY_FILE_NAME), 'r') as vocab_file:
+        backward_lookup = get_lookup_table(vocab_file.read(), invert=True)
+    saved_model = load_model(model_dir)
     inference_model = Model(
-        saved_model.get_layer(name=CONFIG.INPUT_LAYER_NAME_IMAGE).input,
-        saved_model.get_layer(name=CONFIG.OUTPUT_LAYER_NAME).output
+        saved_model.get_layer(name=CONFIG.LAYER_NAME_INPUT_IMAGE).input,
+        saved_model.get_layer(name=CONFIG.LAYER_NAME_OUTPUT).output
     )
-
-    def infer_and_decode(x: tf.data.Dataset) -> list[str]:
-        return decode_batch_outputs(predictions=inference_model.predict(x), backward_lookup_table=backward_lookup_table)
-
-    inference_model.infer_and_decode = infer_and_decode
-    inference_model.backward_lookup_table = backward_lookup_table
-    return inference_model
+    return inference_model, backward_lookup
 
 
-def start(model_dir: PathT, data_dir: PathT,
-          file_extensions: Iterable[str] = CONFIG.DEFAULT_DATA_FILE_EXTENSIONS) -> None:
-    data_dir = Path(data_dir)
-    file_paths = []
-    for ext in file_extensions:
-        file_paths.extend(data_dir.glob(f'*{ext}'))
-    file_paths.sort()
-    count = len(file_paths)
-    if count > 24:
-        raise ValueError("Too many files")
-    # images = []
-    # for path in file_paths:
-    #     with open(path, 'rb') as f:
-    #         images.append(f.read())
-    dataset = images_to_input(*file_paths)
-    model = load_inference_model(model_dir)
-    labels = model.infer_and_decode(dataset.batch(count))
-    plot_images(list(dataset.as_numpy_iterator()), labels=labels)
+def predict_and_decode(images: Sequence[ImgT], model: Model, backward_lookup: StringLookup) -> tuple[Array, list[str]]:
+    dataset = np.array([process_image(img) for img in images])
+    encoded_labels = process_predictions(model.predict(dataset))
+    return dataset, [decode_label(label, backward_lookup) for label in encoded_labels]
+
+
+def load_and_infer(images: Sequence[ImgT], model_dir: PathT, plot_results: bool = False) -> list[str]:
+    model, backward_lookup = load_inference_model(model_dir)
+    images, labels = predict_and_decode(images, model, backward_lookup)
+    if plot_results:
+        per_plot = 24
+        for i in range(0, len(images), per_plot):
+            plot_images(images[i:(i + per_plot)], labels=labels[i:(i + per_plot)])
+    return labels
+
+
+def start(model_dir: PathT, image_files: Sequence[Path] = (), images_dir: PathT = None,
+          file_ext: Iterable[str] = CONFIG.DEFAULT_IMG_FILE_EXT, plot_results: bool = False) -> None:
+    if images_dir is not None:
+        image_files = sorted(find_image_files(images_dir, file_ext=file_ext))
+    if not image_files:
+        image_files = [sys.stdin.buffer.read()]
+    labels = load_and_infer(image_files, model_dir, plot_results=plot_results)
+    for label in labels:
+        print(label)
diff --git a/src/ccaptchas/keras/__init__.py b/src/ccaptchas/keras/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/ccaptchas/keras/backend.py b/src/ccaptchas/keras/backend.py
new file mode 100644
index 0000000..1cd1850
--- /dev/null
+++ b/src/ccaptchas/keras/backend.py
@@ -0,0 +1 @@
+from keras.api._v2.keras.backend import *
diff --git a/src/ccaptchas/keras/callbacks.py b/src/ccaptchas/keras/callbacks.py
new file mode 100644
index 0000000..53754d0
--- /dev/null
+++ b/src/ccaptchas/keras/callbacks.py
@@ -0,0 +1 @@
+from keras.api._v2.keras.callbacks import *
diff --git a/src/ccaptchas/keras/layers.py b/src/ccaptchas/keras/layers.py
new file mode 100644
index 0000000..ae2a07e
--- /dev/null
+++ b/src/ccaptchas/keras/layers.py
@@ -0,0 +1 @@
+from keras.api._v2.keras.layers import *
diff --git a/src/ccaptchas/keras/models.py b/src/ccaptchas/keras/models.py
new file mode 100644
index 0000000..7347354
--- /dev/null
+++ b/src/ccaptchas/keras/models.py
@@ -0,0 +1 @@
+from keras.api._v2.keras.models import *
diff --git a/src/ccaptchas/keras/optimizers.py b/src/ccaptchas/keras/optimizers.py
new file mode 100644
index 0000000..34b65fc
--- /dev/null
+++ b/src/ccaptchas/keras/optimizers.py
@@ -0,0 +1 @@
+from keras.api._v2.keras.optimizers import *
diff --git a/src/ccaptchas/model.py b/src/ccaptchas/model.py
index 2c04083..449d03e 100644
--- a/src/ccaptchas/model.py
+++ b/src/ccaptchas/model.py
@@ -1,66 +1,40 @@
-import os
+import logging
+import json
 from datetime import datetime
 from pathlib import Path
-from typing import Iterable
 
-import numpy as np
 import tensorflow as tf
-from tensorflow import keras
-from tensorflow.keras import layers
 
 from .config import CONFIG
-from .preproc import DatasetsInterface
+from .ctc_layer import CTCLayer
+from .keras.callbacks import EarlyStopping, History
+from .keras.layers import Bidirectional, Conv2D, Dense, Dropout, Input, LSTM, MaxPooling2D, Reshape
+from .keras.models import Model
+from .keras.optimizers import Adam, Optimizer
 from .types import PathT
 
 
-THIS_DIR = os.path.dirname(os.path.realpath(__file__))
-# Build paths relative to this file's directory like this: os_path.join(THIS_DIR, ...)
-
-
-class CTCLayer(layers.Layer):
-    def __init__(self, name: str = None):
-        super().__init__(name=name)
-        self.loss_fn = keras.backend.ctc_batch_cost
-
-    def call(self, y_true: np.ndarray = None, y_pred: np.ndarray = None) -> np.ndarray:
-        # Compute the training-time loss value and add it
-        # to the layer using `self.add_loss()`.
-        batch_len = tf.cast(tf.shape(y_true)[0], dtype='int64')
-        input_length = tf.cast(tf.shape(y_pred)[1], dtype='int64')
-        label_length = tf.cast(tf.shape(y_true)[1], dtype='int64')
-        input_length = input_length * tf.ones(shape=(batch_len, 1), dtype='int64')
-        label_length = label_length * tf.ones(shape=(batch_len, 1), dtype='int64')
-        loss = self.loss_fn(y_true, y_pred, input_length, label_length)
-        self.add_loss(loss)
-        # At test time, just return the computed predictions
-        return y_pred
-
-
-# Factor by which the image is going to be downsampled
-# by the convolutional blocks. We will be using two
-# convolution blocks and each block will have
-# a pooling layer which downsample the features by a factor of 2.
-# Hence total downsampling factor would be 4.
-downsample_factor = 4
+log = logging.getLogger(__name__)
 
 
 def build_model(alphabet_size: int,
-                img_width: int = CONFIG.IMG_WIDTH,
-                img_height: int = CONFIG.IMG_HEIGHT,
-                optimizer: keras.optimizers.Optimizer = keras.optimizers.Adam()) -> keras.models.Model:
+                img_width: int = CONFIG.DEFAULT_IMG_WIDTH,
+                img_height: int = CONFIG.DEFAULT_IMG_HEIGHT,
+                optimizer: Optimizer = Adam()) -> Model:
+    log.info("Building model")
     # Inputs to the model
-    input_img = layers.Input(
+    input_img = Input(
         shape=(img_width, img_height, 1),
         dtype='float32',
-        name=CONFIG.INPUT_LAYER_NAME_IMAGE
+        name=CONFIG.LAYER_NAME_INPUT_IMAGE
     )
-    labels = layers.Input(
+    labels = Input(
         shape=(None, ),
         dtype='float32',
-        name=CONFIG.INPUT_LAYER_NAME_LABEL,
+        name=CONFIG.LAYER_NAME_INPUT_LABEL,
     )
     # First conv block
-    x = layers.Conv2D(
+    x = Conv2D(
         filters=32,
         kernel_size=(3, 3),
         activation='relu',
@@ -68,12 +42,12 @@ def build_model(alphabet_size: int,
         padding='same',
         name='conv1',
     )(input_img)
-    x = layers.MaxPooling2D(
+    x = MaxPooling2D(
         pool_size=(2, 2),
         name='pool1'
    )(x)
     # Second conv block
-    x = layers.Conv2D(
+    x = Conv2D(
         filters=64,
         kernel_size=(3, 3),
         activation='relu',
@@ -81,72 +55,72 @@ def build_model(alphabet_size: int,
         padding='same',
         name='conv2',
     )(x)
-    x = layers.MaxPooling2D(
+    x = MaxPooling2D(
         pool_size=(2, 2),
         name='pool2'
     )(x)
     # We have used two max. pooling layers with pool size and strides 2.
-    # Hence, downsampled feature maps are 4x smaller. The number of
+    # Hence, down-sampled feature maps are 4x smaller. The number of
     # filters in the last layer is 64. Reshape accordingly before
     # passing the output to the RNN part of the model
+    down_sample_factor = 4
     new_shape = (
-        (img_width // 4),
-        (img_height // 4) * 64
+        (img_width // down_sample_factor),
+        (img_height // down_sample_factor) * 64
     )
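+    # E.g. with the default 250x50 px images: new_shape = (250 // 4, (50 // 4) * 64) = (62, 768),
+    # i.e. 62 time steps along the image width, each carrying a 768-dimensional feature vector.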
-    x = layers.Reshape(
+    x = Reshape(
         target_shape=new_shape,
         name='reshape'
     )(x)
-    x = layers.Dense(
+    x = Dense(
         units=64,
         activation='relu',
         name='dense1'
     )(x)
-    x = layers.Dropout(rate=0.2)(x)
+    x = Dropout(rate=0.2)(x)
     # RNNs
-    x = layers.Bidirectional(
-        layers.LSTM(
+    x = Bidirectional(
+        LSTM(
             units=128,
             return_sequences=True,
             dropout=0.25,
         )
     )(x)
-    x = layers.Bidirectional(
-        layers.LSTM(
+    x = Bidirectional(
+        LSTM(
             units=64,
             return_sequences=True,
             dropout=0.25,
         )
     )(x)
     # Output layer
-    x = layers.Dense(
+    x = Dense(
         units=alphabet_size + 1,
         activation='softmax',
-        name=CONFIG.OUTPUT_LAYER_NAME,
+        name=CONFIG.LAYER_NAME_OUTPUT,
     )(x)
     # Add CTC layer for calculating CTC loss at each step
     output = CTCLayer(name='ctc_loss')(labels, x)
     # Define the model
-    model = keras.models.Model(
+    model = Model(
         inputs=[input_img, labels],
         outputs=output,
-        name='ocr_model_v1'
+        name=CONFIG.MODEL_NAME
     )
-    # Compile the model and return
+    log.debug("Compiling model")
     model.compile(optimizer=optimizer)
     return model
 
 
-def train_model(model: keras.models.Model, train_dataset: tf.data.Dataset, valid_dataset: tf.data.Dataset,
+def train_model(model: Model, train_dataset: tf.data.Dataset, valid_dataset: tf.data.Dataset,
                 num_epochs: int = CONFIG.DEFAULT_NUM_EPOCHS,
-                early_stopping_patience: int = CONFIG.DEFAULT_EARLY_STOPPING_PATIENCE) -> keras.callbacks.History:
-    # Add early stopping
-    early_stopping = keras.callbacks.EarlyStopping(
+                early_stopping_patience: int = CONFIG.DEFAULT_EARLY_STOPPING_PATIENCE) -> History:
+    early_stopping = EarlyStopping(
         monitor='val_loss',
         patience=early_stopping_patience,
         restore_best_weights=True,
     )
-    # Train the model
+    log.debug("Beginning training")
     history = model.fit(
         x=train_dataset,
         validation_data=valid_dataset,
@@ -156,23 +130,20 @@ def train_model(model: keras.models.Model, train_dataset: tf.data.Dataset, valid
     return history
 
 
-def start(data_dir: PathT, save_dir: PathT, file_extensions: Iterable[str] = CONFIG.DEFAULT_DATA_FILE_EXTENSIONS,
-          batch_size: int = CONFIG.DEFAULT_BATCH_SIZE, num_epochs: int = CONFIG.DEFAULT_NUM_EPOCHS,
+def start(training_data: tf.data.Dataset, validation_data: tf.data.Dataset, vocabulary: str,
+          save_dir: PathT = CONFIG.DEFAULT_SAVE_DIR, num_epochs: int = CONFIG.DEFAULT_NUM_EPOCHS,
           early_stopping_patience: int = CONFIG.DEFAULT_EARLY_STOPPING_PATIENCE) -> None:
     save_dir = Path(save_dir, datetime.now().strftime('%Y-%m-%d_%H-%M'))
     save_dir.mkdir(parents=True)
-    print("\nConstructing datasets\n")
-    data_interface = DatasetsInterface(batch_size=int(batch_size), data_dir=data_dir, extensions=file_extensions)
-    data_interface.split_and_make_datasets()
-    print("\nBuilding model\n")
-    model = build_model(len(data_interface.characters))
-    print("\nBeginning training\n")
-    train_model(model, data_interface.training, data_interface.validation, num_epochs=num_epochs,
-                early_stopping_patience=early_stopping_patience)
-    print("\nSaving model\n")
+    model = build_model(len(vocabulary))
+    history = train_model(model, training_data, validation_data,
+                          num_epochs=num_epochs, early_stopping_patience=early_stopping_patience)
+    log.debug("Saving model")
     model.save(save_dir)
-    print("\nSaving vocabulary\n")
-    vocabulary = ''.join(data_interface.forward_lookup_table.get_vocabulary())
-    with open(os.path.join(save_dir, CONFIG.VOCABULARY_FILE_NAME), 'w') as f:
+    log.debug("Saving vocabulary")
+    with open(Path(save_dir, CONFIG.VOCABULARY_FILE_NAME), 'w') as f:
         f.write(vocabulary)
-    print("\nAll saved!\n")
+    log.debug("Saving history")
+    with open(Path(save_dir, CONFIG.HISTORY_FILE_NAME), 'w') as f:
+        json.dump(history.history, f, indent=4)
+    log.info("All saved!")
diff --git a/src/ccaptchas/preproc.py b/src/ccaptchas/preproc.py
deleted file mode 100644
index e9a5dac..0000000
--- a/src/ccaptchas/preproc.py
+++ /dev/null
@@ -1,330 +0,0 @@
-import os
-import shutil
-from pathlib import Path
-from typing import Union, Mapping, Sequence, Iterable, Callable
-
-import matplotlib.pyplot as plt
-import numpy as np
-import tensorflow as tf
-from tensorflow.keras import layers
-
-from .config import CONFIG
-from .types import PathT
-
-
-def label_files(src_dir: PathT, dest_dir: PathT, labels: Union[PathT, Sequence[str]],
-                reverse: bool = False, extensions: Iterable[str] = None) -> None:
-    """
-    Copies files giving them new names by using specified labels.
-
-    All matching files are sorted by their file name before applying the sequence of labels to them.
-    The first file is named with the first label, the second is named with the second label, and so on.
-    If a label duplicate is encountered, a dot followed by a counter is appended to the file name
-    *preceding* the file extension, e.g. if 'some_label.jpg' exists, 'some_label.1.jpg' may be created.
-
-    The number of matching files must be greater than or equal to the number of labels.
-    Exactly one file is copied for every label; thus, after every label has been used, the operation ends.
-
-    Args:
-        src_dir:
-            Path to directory containing the files to be copied/renamed
-        dest_dir:
-            Path to destination directory
-        labels:
-            Either a sequence of labels (strings) or a path to a files containing the labels (newline separated)
-        reverse (optional):
-            Defines which file receives which label;
-            if False (default), the files in `img_dir` are sorted ascending by their file name,
-            if True, the files are sorted descending by name.
-        extensions (optional):
-            Iterable of file extensions; only files with these extensions will be considered.
-
-    """
-    extensions = '' if extensions is None else tuple(extensions)
-    file_names = [name for name in os.listdir(src_dir) if name.endswith(extensions)]
-    file_names.sort(reverse=reverse)
-    try:
-        with open(labels, 'r') as f:
-            labels = f.read().strip().split('\n')
-    except TypeError:
-        pass  # Assume, labels is already a sequence of strings
-    if not os.path.isdir(dest_dir):
-        raise NotADirectoryError(f"'{dest_dir}' is not a directory.")
-    if len(labels) > len(file_names):
-        raise IndexError(f"There are more labels ({len(labels)}) than files "
-                         f"in the source directory ({len(file_names)} matching).")
-    for idx, label in enumerate(labels):
-        file_name = file_names[idx]
-        _, ext = os.path.splitext(file_name)
-        while True:
-            new_path = os.path.join(dest_dir, label + ext)
-            if not os.path.exists(new_path):
-                shutil.copyfile(os.path.join(src_dir, file_name), new_path)
-                break
-            pre_label, n = os.path.splitext(label)
-            try:
-                n = int(n[1:])
-            except ValueError:
-                label = label + '.1'
-            else:
-                label = pre_label + '.' + str(n + 1)
-
-
-def load_images_data(data_dir: PathT, extensions: Iterable[str] = ('.jpg', '.png'), verbose: bool = True
-                     ) -> tuple[dict[str, str], str]:
-    """
-    Creates a dictionary mapping file paths (of images) to their labels.
-    Everything up to the first dot in the filename is taken to be the label;
-    this naturally excludes file extensions, but also a filename like `ABC.1.jpg` results in the label `ABC`.
-    Also creates a vocabulary of characters encountered in the file names.
-
-    Args:
-        data_dir:
-            Path-like object or string to a directory containing the desired image files
-        extensions (optional):
-            Iterable of extensions that the files considered for the resulting data should be restricted to;
-            defaults to restricting finds to JPEG and PNG files.
-        verbose (optional):
-            If True, the function will print out a summary of the findings before returning.
-
-    Returns:
-        2-tuple with the first element being a dictionary where the keys are the file paths and the values are the
-        file names (i.e. image labels) and the second element being a string of all characters present in the labels.
-    """
-    data_dir = Path(data_dir)
-    file_paths_and_labels, characters = {}, set()
-    for file_path in data_dir.iterdir():
-        if file_path.suffix not in extensions:
-            continue
-        label = file_path.name.split('.')[0]
-        for char in label:
-            characters.add(char)
-        file_paths_and_labels[str(file_path)] = label
-    if verbose:
-        print("Number of images/labels found: ", len(file_paths_and_labels))
-        print("Number of unique characters: ", len(characters))
-        print("Characters present: ", characters)
-    return file_paths_and_labels, ''.join(characters)
-
-
-def get_vocab_maps(characters: Iterable[str], num_oov_indices: int = 0, mask_token: str = None
-                   ) -> tuple[layers.StringLookup, layers.StringLookup]:
-    """
-    Constructs two table-based lookup objects that map characters to integers and back.
-
-    Details about the `StringLookup` class in the documentation:
-    https://tensorflow.org/api_docs/python/tf/keras/layers/experimental/preprocessing/StringLookup
-
-    Args:
-        characters:
-            An iterable of strings representing the vocabulary to be mapped
-        num_oov_indices (optional):
-            Passed to the `IndexLookup` constructor;
-            defines the number of out-of-vocabulary (OOV) tokens to create;
-            assuming that no OOV characters will be encountered, the default is 0.
-        mask_token (optional):
-            Passed to the `IndexLookup` constructor;
-            the token representing missing values;
-            assuming that there will never be a value missing, the default is None.
-
-    Returns:
-        2-tuple of `StringLookup` objects, the first one mapping characters to integers, the second doing the reverse.
-        By default, no OOV or missing values are assumed to be encountered,
-        and thus each index (uniquely) represents a character from the vocabulary.
-    """
-    char_to_int = layers.StringLookup(
-        vocabulary=list(characters),
-        num_oov_indices=num_oov_indices,
-        mask_token=mask_token,
-    )
-    int_to_char = layers.StringLookup(
-        vocabulary=char_to_int.get_vocabulary(),
-        mask_token=mask_token,
-        invert=True,
-    )
-    return char_to_int, int_to_char
-
-
-def encode_image(img):
-    """
-    Creates a `Tensor` object from an image file and transposes it.
-    """
-    try:
-        # 0. Read image
-        img = tf.io.read_file(str(img))
-    except ValueError:
-        pass
-    # 1. Decode and convert to grayscale
-    img = tf.io.decode_png(img, channels=1)
-    # 2. Convert to float32 in [0, 1] range
-    img = tf.image.convert_image_dtype(img, tf.float32)
-    # 3. Resize to the desired size
-    img = tf.image.resize(img, [CONFIG.IMG_HEIGHT, CONFIG.IMG_WIDTH])
-    # 4. Transpose the image because we want the time
-    # dimension to correspond to the width of the image.
-    return tf.transpose(img, perm=[1, 0, 2])
-
-
-def encode_label(label: str, forward_lookup_table: layers.StringLookup):
-    """
-    Creates a `Tensor` object from a label string by passing passing its characters through a `StringLookup` instance.
-    """
-    return forward_lookup_table(tf.strings.unicode_split(label, input_encoding='UTF-8'))
-
-
-def decode_label(tensor, backward_lookup_table: layers.StringLookup) -> str:
-    return tf.strings.reduce_join(backward_lookup_table(tensor)).numpy().decode('utf-8')
-
-
-def get_sample_encoder(forward_lookup_table: layers.StringLookup) -> Callable[[str, str], dict]:
-    """
-    Returns a function for usage in the `map(...)` method of a `Dataset` instance.
-
-    The function will accept an image path and a label and return a dictionary;
-    the dictionary values will be a tensor representing the image and a tensor representing the label;
-    the keys for each are pre-configured and will correspond to the models input layers' names.
-
-    Args:
-        forward_lookup_table:
-            Passed to the `encode_label` function; required for mapping individual characters to floats.
-
-    Returns:
-        Callable taking two strings as arguments and returning a dictionary with string keys and Tensor values.
-    """
-    def func(img_path: PathT, label: str) -> dict:
-        return {
-            CONFIG.INPUT_LAYER_NAME_IMAGE: encode_image(img_path),
-            CONFIG.INPUT_LAYER_NAME_LABEL: encode_label(label, forward_lookup_table)
-        }
-    return func
-
-
-def make_dataset(file_paths: np.ndarray, labels: np.ndarray,
-                 sample_encode_func: Callable[[str, str], dict], batch_size: int) -> tf.data.Dataset:
-    """
-    Generates a `Dataset` instance from an array of image file paths and an array of corresponding labels.
-
-    Args:
-        file_paths:
-            Array of strings, each representing a path to an image file;
-            each of those paths will be passed into the function encoding one data sample (as the first argument).
-        labels:
-            Array of strings, each representing a label for an image pointed to by a file path
-            in the `file_paths` array with the corresponding index;
-            each of those labels will be passed into the function encoding one data sample (as the second argument).
-        sample_encode_func:
-            Will be passed as the `map_func` argument into the `map(...)` method of the new `Dataset`;
-            should be a function taking two strings (image path and label) as arguments and
-            returning a dictionary of Tensors representing the image and label.
-        batch_size:
-            Will be passed as the `batch_size` argument into the `batch(...)` method of the new `Dataset`;
-            determines how the dataset will be divided into batches.
-
-    Returns:
-        A `Dataset` ready to be fed into a model's `fit(...)` method, provided that model has two input layers
-        named in accordance with the keys of the dictionary returned by the `sample_encode_func` function.
-    """
-    if file_paths.size != labels.size:
-        raise ValueError("Number of file paths must be equal to number of labels")
-
-    dataset = tf.data.Dataset.from_tensor_slices((file_paths, labels))
-    dataset = dataset.map(
-        map_func=sample_encode_func,
-        num_parallel_calls=tf.data.experimental.AUTOTUNE
-    ).batch(
-        batch_size=batch_size
-    ).prefetch(
-        buffer_size=tf.data.experimental.AUTOTUNE
-    )
-    return dataset
-
-
-def get_datasets(file_paths_and_labels: Mapping[str, str], sample_encode_func: Callable[[str, str], dict],
-                 batch_size: int, train_data_ratio: float, shuffle: bool = CONFIG.SHUFFLE_DATA
-                 ) -> tuple[tf.data.Dataset, tf.data.Dataset]:
-    """
-    Creates a training dataset and a validation dataset from a mapping of image file paths to labels.
-
-    Args:
-        file_paths_and_labels:
-            Mapping with keys being image file paths and values being labels of the corresponding images;
-            this represents the full dataset used for fitting the model.
-        sample_encode_func:
-            Will be passed as the `sample_encode_func` argument into the `make_dataset(...)` function;
-            should be a function taking two strings (image path and label) as arguments and
-            returning a dictionary of Tensors representing the image and label.
-        batch_size:
-            Will be passed as the `batch_size` argument into the `make_dataset(...)` function;
-            determines how each dataset will be divided into batches.
-        train_data_ratio:
-            Floating point value between 0 and 1 determining what ratio of the full dataset will be used for training;
-            this implies that (1 - `train_data_ratio`) will be the ratio used for validation.
-        shuffle:
-            If True, the full dataset is shuffled pseudo-randomly before being split.
-
-    Returns:
-        Two `Dataset` objects ready to be fed into a model's `fit(...)` method, provided that model has two input layers
-        named in accordance with the keys of the dictionary returned by the `sample_encode_func` function.
-    """
-    # 1. Get the total size of the dataset
-    size = len(file_paths_and_labels)
-    # 2. Make an indices array and shuffle it, if required
-    indices = np.arange(size)
-    if shuffle:
-        np.random.shuffle(indices)
-    # 3. Get the size of training samples; that will be the array cutoff index for the data-indices array
-    cutoff = int(size * train_data_ratio)
-    train_indices, valid_indices = indices[:cutoff], indices[cutoff:]
-    # 4. Split data into training and validation sets
-    file_paths, labels = np.array(list(file_paths_and_labels.keys())), np.array(list(file_paths_and_labels.values()))
-    x_train, x_valid = file_paths[train_indices], file_paths[valid_indices]
-    y_train, y_valid = labels[train_indices], labels[valid_indices]
-    # 5. Construct the actual Dataset-class objects
-    train_dataset = make_dataset(x_train, y_train, sample_encode_func, batch_size)
-    valid_dataset = make_dataset(x_valid, y_valid, sample_encode_func, batch_size)
-    return train_dataset, valid_dataset
-
-
-def plot_images(images: Sequence[np.ndarray], labels: Sequence[str] = None, num_columns: int = 4,
-                transpose: bool = True) -> None:
-    if transpose:
-        images = tf.transpose(images, perm=[0, 2, 1, 3])
-    images = images[:, :, :, 0] * 255
-    images = images.numpy().astype('uint8')
-    num_rows = len(images) // num_columns or 1
-    _, axs = plt.subplots(num_rows, num_columns, figsize=(10, 5))
-    for idx, image in enumerate(images):
-        if num_rows == 1:
-            if num_columns == 1:
-                ax = axs
-            else:
-                ax = axs[idx // num_columns]
-        else:
-            ax = axs[idx // num_columns, idx % num_columns]
-        ax.imshow(image, cmap='gray')
-        if labels is not None:
-            ax.set_title(labels[idx])
-        ax.axis('off')
-    plt.show()
-
-
-class DatasetsInterface:
-    """
-    Convenience class for loading and pre-processing the training and validation data for usage with a model.
-    """
-
-    def __init__(self, batch_size: int, data_dir: PathT,
-                 extensions: Iterable[str] = CONFIG.DEFAULT_DATA_FILE_EXTENSIONS) -> None:
-        self.file_paths_and_labels, self.characters = load_images_data(data_dir=data_dir, extensions=extensions)
-        self.forward_lookup_table, self.backward_lookup_table = get_vocab_maps(characters=self.characters)
-        self.sample_encode_func = get_sample_encoder(forward_lookup_table=self.forward_lookup_table)
-        self.batch_size = batch_size
-        self.training, self.validation = None, None
-
-    def split_and_make_datasets(self, train_data_ratio: float = (1 - CONFIG.VALIDATION_DATA_RATIO),
-                                shuffle: bool = CONFIG.SHUFFLE_DATA) -> None:
-        self.training, self.validation = get_datasets(file_paths_and_labels=self.file_paths_and_labels,
-                                                      sample_encode_func=self.sample_encode_func,
-                                                      batch_size=self.batch_size,
-                                                      train_data_ratio=train_data_ratio,
-                                                      shuffle=shuffle)
diff --git a/src/ccaptchas/preprocess.py b/src/ccaptchas/preprocess.py
new file mode 100644
index 0000000..5a6b71d
--- /dev/null
+++ b/src/ccaptchas/preprocess.py
@@ -0,0 +1,221 @@
+import logging
+from pathlib import Path
+from typing import Iterable
+
+import numpy as np
+import tensorflow as tf
+
+from .config import CONFIG
+from .keras.layers import StringLookup
+from .types import PathT, SampleEncFuncT, ImgT
+
+
+log = logging.getLogger(__name__)
+
+UTF8 = 'UTF-8'
+IMG_DECODE_MAP = {
+    CONFIG.EXT_PNG: tf.image.decode_png,
+    CONFIG.EXT_JPG: tf.image.decode_jpeg,
+}
+
+
+def find_image_files(data_dir: PathT, file_ext: Iterable[str] = CONFIG.DEFAULT_IMG_FILE_EXT) -> list[Path]:
+    data_dir = Path(data_dir)
+    if not data_dir.is_dir():
+        raise NotADirectoryError(f"'{data_dir}' is not a directory")
+    log.debug("Finding labeled image files in directory '%s'", str(data_dir))
+    img_paths = []
+    for ext in file_ext:
+        if not ext.startswith('.'):
+            ext = f'.{ext}'
+        img_paths.extend(path for path in data_dir.glob(f'*{ext}') if path.is_file())
+    log.info("Found %d image files", len(img_paths))
+    return img_paths
+
+
+def get_all_characters(img_paths: Iterable[Path]) -> str:
+    characters = set()
+    for path in img_paths:
+        characters.update(path.stem)
+    characters = ''.join(characters)
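+    # e.g. the (hypothetical) labels 'a2c' and '2b8' yield a vocabulary like '2b8ac'; set order is arbitrary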
+    log.info("Identified %d distinct characters in the file labels: '%s'", len(characters), characters)
+    return characters
+
+
+def get_lookup_table(vocabulary: Iterable[str], invert: bool = False, **kwargs) -> StringLookup:
+    """
+    Constructs a string lookup table mapping characters to integers or vice-versa.
+
+    Details about the `StringLookup` class in the documentation:
+    https://keras.io/api/layers/preprocessing_layers/categorical/string_lookup/
+
+    Args:
+        vocabulary:
+            An iterable of strings representing the vocabulary to be mapped
+        invert (optional):
+            If `True`, a backward lookup table is returned, which maps indices to characters. Otherwise, a forward
+            lookup table is returned, mapping characters to indices. Defaults to `False`.
+        **kwargs (optional):
+            Other keyword arguments to pass into the `StringLookup` constructor.
+            Defaults for `num_oov_indices` and `mask_token` are defined in the package config.
+
+    Returns:
+        `StringLookup` object with the specified properties.
+    """
+    kwargs.setdefault('num_oov_indices', CONFIG.DEFAULT_NUM_OOV_INDICES)
+    kwargs.setdefault('mask_token', CONFIG.DEFAULT_MASK_TOKEN)
+    if isinstance(vocabulary, str):
+        vocabulary = list(vocabulary)
+    return StringLookup(vocabulary=vocabulary, invert=invert, **kwargs)
+
+
+def get_vocab_maps(characters: str, **kwargs) -> tuple[StringLookup, StringLookup]:
+    """
+    Constructs two table-based lookup objects that map characters to integers and back.
+
+    See `get_lookup_table` for details.
+
+    Args:
+        characters:
+            A string of all characters in the vocabulary to be mapped; the characters should all be distinct.
+        **kwargs (optional):
+            Keyword arguments to pass into both `StringLookup` constructors.
+            Must not contain the `invert` and `vocabulary` keywords.
+            Defaults for `num_oov_indices` and `mask_token` are defined in the package config.
+
+    Returns:
+        2-tuple of `StringLookup` objects, the first one mapping characters to integers, the second doing the reverse.
+    """
+    char_to_int = get_lookup_table(characters, invert=False, **kwargs)
+    int_to_char = get_lookup_table(char_to_int.get_vocabulary(), invert=True, **kwargs)
+    log.info("Constructed vocabulary lookup tables")
+    return char_to_int, int_to_char
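+# For illustration: with the vocabulary 'abc' (and no OOV indices or mask token, per the config defaults),
+# the forward table maps 'a' -> 0, 'b' -> 1, 'c' -> 2, and the backward table maps those indices back to characters.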
+
+
+def split_data(img_paths: Iterable[Path], validation_ratio: float = CONFIG.DEFAULT_VALIDATION_RATIO,
+               shuffle: bool = CONFIG.DEFAULT_SHUFFLE_DATA) -> tuple[np.ndarray, np.ndarray]:
+    """
+    Splits an iterable of image paths into two arrays of training and validation data.
+
+    Args:
+        img_paths:
+            Iterable of paths to the image files to be used for training and validation.
+        validation_ratio:
+            Float between 0 and 1 determining what ratio of the full dataset will be used for validation;
+            this implies that (1 - `validation_ratio`) will be the ratio used for training.
+        shuffle:
+            If True, the full dataset is shuffled pseudo-randomly before being split.
+
+    Returns:
+        2-tuple of 2D numpy arrays, the first representing the training data and the second representing the validation
+        data. The first dimension of each array is the dataset dimension (i.e. the samples) and the second contains the
+        path (as a string) at index 0 and the label for each image at index 1.
+    """
+    if not 0 < validation_ratio < 1:
+        raise ValueError("`validation_ratio` must be strictly between 0 and 1")
+    paths_and_labels = np.array(tuple((str(path), path.stem) for path in img_paths))
+    # 1. Get the total size of the dataset and the corresponding cutoff index
+    size = len(paths_and_labels)
+    cutoff = int(size * (1 - validation_ratio))
+    # 2. Make an indices array and shuffle it, if required
+    indices = np.arange(size)
+    if shuffle:
+        np.random.shuffle(indices)
+    # 3. Split data into training and validation sets
+    training_data, validation_data = paths_and_labels[indices[:cutoff]], paths_and_labels[indices[cutoff:]]
+    log.info("Split data into %d images for training and %d for validation", len(training_data), len(validation_data))
+    return training_data, validation_data
+
+
+def process_image(img: ImgT, img_width: int = CONFIG.DEFAULT_IMG_WIDTH,
+                  img_height: int = CONFIG.DEFAULT_IMG_HEIGHT) -> tf.Tensor:
+    # 0. Read image
+    if isinstance(img, (str, Path)):
+        img = tf.io.read_file(str(img))
+    # 1. Decode and convert to grayscale
+    img = tf.io.decode_image(img, channels=1, expand_animations=False)
+    # 2. Convert to float32 in [0, 1] range
+    img = tf.image.convert_image_dtype(img, tf.float32)
+    # 3. Resize to the desired size
+    img = tf.image.resize(img, [img_height, img_width])
+    # 4. Transpose the image because we want the time
+    # dimension to correspond to the width of the image.
+    return tf.transpose(img, perm=[1, 0, 2])
+
+
+def encode_label(label: str, forward_lookup: StringLookup):
+    """
+    Creates a `Tensor` object from a label string by passing its characters through a `StringLookup` instance.
+    """
+    return forward_lookup(tf.strings.unicode_split(label, input_encoding=UTF8))
+
+
+def decode_label(tensor: tf.Tensor, backward_lookup: StringLookup) -> str:
+    return tf.strings.reduce_join(backward_lookup(tensor)).numpy().decode(UTF8)
+
+
+def get_sample_encode_func(forward_lookup: StringLookup, img_width: int = CONFIG.DEFAULT_IMG_WIDTH,
+                           img_height: int = CONFIG.DEFAULT_IMG_HEIGHT) -> SampleEncFuncT:
+    def encode_sample(img_path: PathT, label: str) -> dict[str, tf.Tensor]:
+        log.debug("Encoding image '%s'", str(img_path))
+        img = process_image(tf.io.read_file(img_path), img_width=img_width, img_height=img_height)
+        label = encode_label(label, forward_lookup)
+        # Return a dict, as our model expects two inputs
+        return {CONFIG.LAYER_NAME_INPUT_IMAGE: img, CONFIG.LAYER_NAME_INPUT_LABEL: label}
+    return encode_sample
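+# For illustration: `encode_sample('img/abc123.png', 'abc123')` (hypothetical file) would yield a dict like
+# {'image': <float32 Tensor of shape (width, height, 1)>, 'label': <int64 Tensor of the six character indices>}.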
+
+
+def make_dataset(data: np.ndarray, sample_encode_func: SampleEncFuncT, batch_size: int) -> tf.data.Dataset:
+    """
+    Generates a `Dataset` instance from a 2D array of image file paths and corresponding labels.
+
+    Args:
+        data:
+            A 2D numpy array representing the data and labels to turn into a dataset for training/validation.
+            The first dimension of the array is the dataset dimension (i.e. the samples) and the second contains the
+            path (as a string) at index 0 and the label for each image at index 1.
+            Each path-label-pair will be passed into the `sample_encode_func` during construction of the dataset as
+            the only two positional arguments.
+        sample_encode_func:
+            Will be passed as the `map_func` argument into the `map(...)` method of the new `Dataset`;
+            should be a function taking two strings (image path and label) as arguments and
+            returning a dictionary of Tensors representing the image and label.
+        batch_size:
+            Will be passed as the `batch_size` argument into the `batch(...)` method of the new `Dataset`;
+            determines how the dataset will be divided into batches.
+
+    Returns:
+        A `Dataset` ready to be fed into a model's `fit(...)` method, provided that model has two input layers
+        named in accordance with the keys of the dictionary returned by the `sample_encode_func` function.
+    """
+    log.info("Constructing dataset of %d samples, split into batches of %d", len(data), batch_size)
+    dataset = tf.data.Dataset.from_tensor_slices((data[:, 0], data[:, 1]))
+    dataset = dataset.map(
+        map_func=sample_encode_func,
+        num_parallel_calls=tf.data.experimental.AUTOTUNE
+    ).batch(
+        batch_size=batch_size
+    ).prefetch(
+        buffer_size=tf.data.experimental.AUTOTUNE
+    )
+    return dataset
+
+
+def load_datasets(data_dir: PathT,
+                  file_ext: Iterable[str] = CONFIG.DEFAULT_IMG_FILE_EXT,
+                  batch_size: int = CONFIG.DEFAULT_BATCH_SIZE,
+                  validation_ratio: float = CONFIG.DEFAULT_VALIDATION_RATIO,
+                  shuffle: bool = CONFIG.DEFAULT_SHUFFLE_DATA,
+                  img_width: int = CONFIG.DEFAULT_IMG_WIDTH,
+                  img_height: int = CONFIG.DEFAULT_IMG_HEIGHT) -> tuple[tf.data.Dataset, tf.data.Dataset, str]:
+    log.info("Constructing datasets")
+    img_paths = find_image_files(data_dir, file_ext=file_ext)
+    characters = get_all_characters(img_paths)
+    forward_lookup, _ = get_vocab_maps(characters)
+    arr_train, arr_valid = split_data(img_paths, validation_ratio=validation_ratio, shuffle=shuffle)
+    encode_func = get_sample_encode_func(forward_lookup, img_width=img_width, img_height=img_height)
+    ds_train = make_dataset(arr_train, encode_func, batch_size=batch_size)
+    ds_valid = make_dataset(arr_valid, encode_func, batch_size=batch_size)
+    assert characters == ''.join(forward_lookup.get_vocabulary())
+    return ds_train, ds_valid, characters
diff --git a/src/ccaptchas/types.py b/src/ccaptchas/types.py
index 6edb455..0d4b147 100644
--- a/src/ccaptchas/types.py
+++ b/src/ccaptchas/types.py
@@ -1,5 +1,11 @@
 from pathlib import Path
-from typing import Union
+from typing import Callable, Union
+
+import numpy as np
+import tensorflow as tf
 
 
 PathT = Union[Path, str]
+SampleEncFuncT = Callable[[PathT, str], dict[str, tf.Tensor]]
+ImgT = Union[PathT, bytes]
+Array = np.ndarray
diff --git a/src/ccaptchas/visualize.py b/src/ccaptchas/visualize.py
new file mode 100644
index 0000000..0550df2
--- /dev/null
+++ b/src/ccaptchas/visualize.py
@@ -0,0 +1,28 @@
+from typing import Sequence
+
+import matplotlib.pyplot as plt
+import numpy as np
+import tensorflow as tf
+
+
+def plot_images(images: Sequence[np.ndarray], labels: Sequence[str] = None, num_columns: int = 4,
+                transpose: bool = True) -> None:
+    if transpose:
+        images = tf.transpose(images, perm=[0, 2, 1, 3])
+    images = images[:, :, :, 0] * 255
+    images = images.numpy().astype('uint8')
+    # Ceiling division, so that a final, partially filled row of images still gets its own axes:
+    num_rows = -(-len(images) // num_columns)
+    _, axs = plt.subplots(num_rows, num_columns, figsize=(10, 5))
+    for idx, image in enumerate(images):
+        if num_rows == 1:
+            if num_columns == 1:
+                ax = axs
+            else:
+                ax = axs[idx % num_columns]
+        else:
+            ax = axs[idx // num_columns, idx % num_columns]
+        ax.imshow(image, cmap='gray')
+        if labels is not None:
+            ax.set_title(labels[idx])
+        ax.axis('off')
+    plt.show()
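
---
Example usage of the new CLI (a sketch based on the arguments defined in `__main__.py` above; all paths are
hypothetical, and the timestamped model subdirectory follows the `%Y-%m-%d_%H-%M` pattern used in `model.py`):

    python -m ccaptchas train ./data --save-dir ./saved_models --batch-size 16 --num-epochs 100
    python -m ccaptchas -E .png infer ./saved_models/2022-05-11_17-32 --images-dir ./captchas -p

In `infer` mode the predicted labels are printed to stdout, one per line; `-p` additionally plots the images
with their inferred labels.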