Intro Banner

E. 2D Image Modelling and CNN-Classification

To enhance the classification process, we introduced an innovative approach by transforming the 1D HI profiles into 2D images. This transformation was aimed at capturing asymmetries within the profiles, which are often indicative of underlying galactic processes. We generated three distinct 2D image models, each designed to highlight different features of the HI profiles. CNN techniques were then applied to these images to perform a more nuanced classification, leading to improved accuracy and deeper insights into the data. The results of this 2D analysis were compared with previous classifications, offering a comprehensive evaluation of our methodology.

Step 1: Inspection of Spectrum | CIG - AMIGA

In this part, the script processes and visualizes spectral data from a FITS file. It loads the spectrum, normalizes it, and then plots two types of visualizations. The first plot shows the normalized spectrum across all pixels. The second plot displays the spectrum split into right and left halves, with the left half inverted and negative, and also shows their combined form. The script uses matplotlib for plotting and astropy for reading FITS files. It is designed to be simple yet effective for analyzing spectral data.

# 1.---------Importation of Libraries------
import matplotlib.pyplot as plt
import numpy as np
from astropy.io import fits

# 2.---------Load and Normalize Spectrum Function------
def plot_spectra(file_path):
    # Load and normalize the spectrum
    spectrum = fits.getdata(file_path)[0]
    spectrum_normalized = (spectrum - spectrum.min()) / (spectrum.max() - spectrum.min())

    # Create the x-axis and split the spectrum
    x = np.arange(len(spectrum_normalized))
    center = len(x) // 2
    spectrum_right = spectrum_normalized[center:]
    spectrum_left = -spectrum_normalized[center-1::-1]
    x_right = np.arange(len(spectrum_right))

    # Combine right and left spectra
    spectrum_combined = spectrum_right + spectrum_left

    # Create the plot
    plt.figure(figsize=(10, 5))

    # Plot normalized spectrum
    plt.subplot(2, 1, 1)
    plt.plot(x, spectrum_normalized, label='Normalized Spectrum')
    plt.xlabel('Pixel')
    plt.ylabel('Normalized Flux')
    plt.title(f'Normalized Spectrum from {file_path}')
    plt.grid(True)
    plt.legend()

    # Plot separated and combined spectra
    plt.subplot(2, 1, 2)
    plt.plot(x_right, spectrum_right, label='Right Spectrum')
    plt.plot(np.arange(len(spectrum_left)), spectrum_left, label='Left Spectrum (inverted, negative)', linestyle='--')
    plt.plot(x_right, spectrum_combined, label='Combined Spectrum', linestyle='-.')
    plt.xlabel('Pixel')
    plt.ylabel('Normalized Flux')
    plt.title('Separated and Combined Spectra')
    plt.grid(True)
    plt.legend()

    plt.tight_layout()
    plt.show()

# 3.---------File Path and Function Call------
# Path to the FITS file
file_path = 'D:/1. JAE Intro ICU/sp_im_busyfit/A005580.fits_busyfit_2.fits'

# Call the function to plot the spectra
plot_spectra(file_path)

Ouput Example: image4.1

Step 2: 2D Image Modelling | BusyFit - CIG - AMIGA

This code processes a set of FITS files from a specified directory, focusing on files that match a particular naming convention. The code extracts spectral data from each FITS file, normalizes the spectra, and generates ring-like images based on the spectral data. It then creates combined plots that display the original spectrum, its right and left halves, and their combined form. The code saves these plots and ring images as JPEG files and also compiles them into a single PDF document. The entire process is managed by specifying an index range to select which FITS files to process, allowing for flexible analysis of large datasetTion. Three distinct 2D image models were generated for the symmetry study:

  • Model 1: A rotation of the fitted spectrum.
  • Model 2: A rotated spectrum with subtracted right and left profiles to highlight asymmetry.
  • Model 3: A normalized version of the second model, with adjusted pixel intensity to emphasize specific features. Methodology: The study applies current Machine Learning (ML) techniques and explores extrapolating this approach to the ALFALFA survey.

image4.1

    # 1.---------Importation of Libraries------
    import os
    import re
    import matplotlib.pyplot as plt
    import numpy as np
    from astropy.io import fits
    from matplotlib.backends.backend_pdf import PdfPages

    # 2.---------Function to Get the List of Files---------
    def get_fits_files(directory, file_pattern):
        pattern = re.compile(file_pattern)
        files = []
        for file in os.listdir(directory):
            match = pattern.match(file)
            if match:
                num = int(match.group(1))
                files.append((num, file))
        files.sort()
        return files

    # 3.---------Functions to Process FITS Files---------
    def load_fits_spectrum(file_path):
        with fits.open(file_path) as hdul:
            data = hdul[0].data
        return data

    def normalize_spectrum(spectrum):
        min_val = np.min(spectrum)
        max_val = np.max(spectrum)

        # Avoid division by zero
        if max_val - min_val == 0:
            return np.zeros_like(spectrum)  # Return an array of zeros if the spectrum is flat
        else:
            normalized = (spectrum - min_val) / (max_val - min_val)
            return normalized

    def create_ring_image_1(ax, spectrum, cmap='magma_r'):
        spectrum_length = len(spectrum)
        image_size = 2 * spectrum_length  # Image size
        image = np.zeros((image_size, image_size))  # Initialize the image
        center = image_size // 2  # Image center

        # Handling negative values in the spectrum
        if np.any(spectrum < 0):
            print("Warning: The spectrum contains negative values. Taking absolute values.")
            spectrum = np.abs(spectrum)  # Option to take absolute values

        # Normalize the spectrum between 0 and 1
        spectrum_min = np.min(spectrum)
        spectrum_range = np.max(spectrum) - spectrum_min
        if spectrum_range > 0:
            normalized_spectrum = (spectrum - spectrum_min) / spectrum_range
        else:
            normalized_spectrum = np.zeros_like(spectrum)

        for r in range(spectrum_length):
            theta = np.linspace(0, 2 * np.pi, image_size)  # Angles from 0 to 360 degrees
            x = center + r * np.cos(theta)  # x coordinates on the circle
            y = center + r * np.sin(theta)  # y coordinates on the circle
            for i in range(len(x)):
                xi = int(round(x[i]))
                yi = int(round(y[i]))
                if 0 <= xi < image_size and 0 <= yi < image_size:
                    # Scale the intensity logarithmically for better uniformity
                    scaled_intensity = normalized_spectrum[r]
                    # Ensure the values are between 0 and 1
                    image[yi, xi] = min(scaled_intensity, 1)

        # Normalize the image after value assignment
        image_min = np.min(image)
        image_max = np.max(image)
        if image_max > image_min:
            image = (image - image_min) / (image_max - image_min)

        # Display the image with a continuous colormap
        norm = plt.Normalize(vmin=0, vmax=1)
        ax.imshow(image, cmap='magma_r', norm=norm, origin='lower')
        ax.axis('off')


    def create_ring_image_2(ax, spectrum, vmin, vmax, cmap):
        spectrum_length = len(spectrum)
        image_size = 2 * spectrum_length
        image = np.zeros((image_size, image_size))
        center = image_size // 2
        for r in range(spectrum_length):
            theta = np.linspace(0, 2 * np.pi, image_size)  # Corrected line
            x = center + r * np.cos(theta)
            y = center + r * np.sin(theta)
            for i in range(len(x)):
                xi = int(round(x[i]))
                yi = int(round(y[i]))
                image[yi, xi] = spectrum[r]
        norm = plt.Normalize(vmin=vmin, vmax=vmax)
        ax.imshow(image, cmap=cmap, norm=norm, origin='lower')
        ax.axis('off')

    def create_ring_image_3(ax, spectrum, cmap):
        # Normalize image 2 to create image 3
        norm = plt.Normalize(vmin=0, vmax=1)
        spectrum_normalized = normalize_spectrum(spectrum)
        create_ring_image_2(ax, spectrum_normalized, vmin=0, vmax=1, cmap=cmap)

    def plot_and_save_figures(ax_spectrum, ax_image1, ax_image2, ax_image3, file_path, file_name, cmap, output_dir, num):
        data = load_fits_spectrum(file_path)
        if data.ndim == 2 and data.shape[0] == 1:
            spectrum = data[0]
        else:
            raise ValueError("The FITS file does not contain one-dimensional spectral data.")

        spectrum_normalized = normalize_spectrum(spectrum)
        x = np.arange(len(spectrum_normalized))
        center = len(spectrum_normalized) // 2
        spectrum_right = spectrum_normalized[center:]
        x_right = np.arange(center, len(spectrum_normalized))
        spectrum_left = -spectrum_normalized[:center][::-1]
        x_left = np.arange(center, center - len(spectrum_left), -1)
        min_length = min(len(spectrum_right), len(spectrum_left))
        spectrum_right = spectrum_right[:min_length]
        spectrum_left = spectrum_left[:min_length]
        x_right = x_right[:min_length]
        x_left = x_left[:min_length]
        spectrum_combined = spectrum_right + spectrum_left

        ax_spectrum.plot(x, spectrum_normalized, label='Normalized Spectrum', color='deepskyblue')
        ax_spectrum.plot(x_right, spectrum_right, label='Right Spectrum', color='orange')
        ax_spectrum.plot(x_right, spectrum_left, label='Left Spectrum', linestyle='--', color='green')
        ax_spectrum.plot(x_right, spectrum_combined, label='Combined Spectrum', linestyle='-.', color='red')
        ax_spectrum.set_title(file_name, fontsize=10)
        ax_spectrum.set_xlabel('Pixel', fontsize=8)
        ax_spectrum.set_ylabel('Normalized Flux', fontsize=8)
        ax_spectrum.legend(fontsize=6)
        ax_spectrum.grid(True)

        create_ring_image_1(ax_image1, spectrum_normalized, cmap=cmap)
        create_ring_image_2(ax_image2, spectrum_combined, vmin=-1, vmax=1, cmap=cmap)
        create_ring_image_3(ax_image3, spectrum_combined, cmap=cmap)

        if not os.path.exists(output_dir):
            os.makedirs(output_dir)

        num_str = f'{num:06}'

        fig1, ax1 = plt.subplots()
        ax1.imshow(ax_image1.images[0].get_array(), cmap='magma_r', vmin=0, vmax=1, origin='lower')
        ax1.axis('off')
        fig1.savefig(os.path.join(output_dir, f'CIG{num_str}_rot_im_1.jpeg'), bbox_inches='tight', pad_inches=0)
        plt.close(fig1)

        fig2, ax2 = plt.subplots()
        ax2.imshow(ax_image2.images[0].get_array(), cmap=cmap, vmin=-1, vmax=1, origin='lower')
        ax2.axis('off')
        fig2.savefig(os.path.join(output_dir, f'CIG{num_str}_rot_im_2.jpeg'), bbox_inches='tight', pad_inches=0)
        plt.close(fig2)

        fig3, ax3 = plt.subplots()
        ax3.imshow(ax_image3.images[0].get_array(), cmap=cmap, vmin=0, vmax=1, origin='lower')
        ax3.axis('off')
        fig3.savefig(os.path.join(output_dir, f'CIG{num_str}_rot_im_3.jpeg'), bbox_inches='tight', pad_inches=0)
        plt.close(fig3)

    def main(directory, output_dir, start_index, end_index, file_pattern, cmap, pdf_path):
        fits_files = get_fits_files(directory, file_pattern)
        fits_files = [file for file in fits_files if start_index <= file[0] <= end_index]
        num_files_to_process = len(fits_files)

        if num_files_to_process == 0:
            print("No FITS files found within the specified index range.")
            return

        num_rows = (num_files_to_process + 4) // 5
        num_cols = 5

        fig_spectra, axes_spectra = plt.subplots(num_rows, num_cols, figsize=(30, 20), constrained_layout=True)
        fig_image1, axes_image1 = plt.subplots(num_rows, num_cols, figsize=(20, 20), constrained_layout=True)
        fig_image2, axes_image2 = plt.subplots(num_rows, num_cols, figsize=(20, 20), constrained_layout=True)
        fig_image3, axes_image3 = plt.subplots(num_rows, num_cols, figsize=(20, 20), constrained_layout=True)

        for i, (num, file_name) in enumerate(fits_files):
            file_path = os.path.join(directory, file_name)
            print(f"Processing file: {file_path}")

            row = i // num_cols
            col = i % num_cols

            plot_and_save_figures(axes_spectra[row, col], axes_image1[row, col], axes_image2[row, col], axes_image3[row, col], file_path, file_name, cmap, output_dir, num)

            axes_image1[row, col].set_title(f'CIG{num:06}_rot_im_1.jpeg', fontsize=8)
            axes_image2[row, col].set_title(f'CIG{num:06}_rot_im_2.jpeg', fontsize=8)
            axes_image3[row, col].set_title(f'CIG{num:06}_rot_im_3.jpeg', fontsize=8)

        fig_spectra.tight_layout()
        fig_image1.tight_layout()
        fig_image2.tight_layout()
        fig_image3.tight_layout()

        with PdfPages(pdf_path) as pdf_pages:
            pdf_pages.savefig(fig_spectra)
            pdf_pages.savefig(fig_image1)
            pdf_pages.savefig(fig_image2)
            pdf_pages.savefig(fig_image3)

        plt.show()

        print(f"PDF saved at: {pdf_path}")

    # 4.---------Input Parameters---------
    directory = 'D:/1. JAE Intro ICU/CIG/ascii-files/sp_busyfit/'
    output_dir = 'D:/1. JAE Intro ICU/CIG/ascii-files/sp_busyfit/rot_2D_im'
    start_index = 0
    end_index = 20  #Not listed, rather the actual CIG Naming of Object
    file_pattern = r'CIG(\d+)_.*_busyfit_3\.fits'
    cmap = 'RdBu'
    pdf_path = os.path.join(output_dir, '2DImageModel_OutputFigures.pdf')

    # Run the main function with the specified parameters
    main(directory, output_dir, start_index, end_index, file_pattern, cmap, pdf_path)

Output example:

image4.2 image4.3 image4.4 image4.5 PDF saved at: D:/rot_2D_im\2DImageModel_OutputFigures.pdf

Step 3: Asymmetry Classification | CIG - AMIGA

This script is designed for image classification tasks using convolutional neural networks (CNNs). It processes images from a specified directory, builds a CNN model, and tunes hyperparameters (like test size, random state, and epochs) to find the best configuration. The script also includes steps for data visualization, success rate calculation, and model performance evaluation. The results, including classification success rates and visualizations, are saved in a specified results directory. The script is highly configurable, with key variables set at the beginning for easy adaptation to different datasets and scenarios.

The Symmetry Study: - Classifications made for a selected model (Models 1, 2 or 3) based on: _X.jpeg termination of selected model files. - Comparison: The resulting classification was compared with a profile classification previously conducted by the AMIGA scientific group (Espada, 2011).

Example of parametrization:

    file_suffix = "_rot_im_2.jpeg"
    ...
    num_groups = 3
    test_size_options = [0.7, 0.5, 0.3]
    random_state_options = [42, 99]
    epochs_options = [50, 80, 120]
    model_configs = 
        [32, 64, 128],      # Simple model
        [32, 64, 128, 256], # Medium model
        [32, 64, 128, 256, 512] # Complex model

The script:


    # 1.---------Importation of Libraries------
    import os
    import numpy as np
    import matplotlib.pyplot as plt
    from matplotlib.artist import Artist, allow_rasterization
    import tensorflow as tf
    from tensorflow.keras.models import Sequential
    from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout, BatchNormalization
    from tensorflow.keras.preprocessing import image
    import matplotlib.pyplot as plt
    from sklearn.model_selection import train_test_split
    import pandas as pd
    from tensorflow.keras.optimizers import AdamW

    # 2.---------Configuration Variables------
    file_suffix = "_rot_im_2.jpeg"  #Selected 2D Image Model
    directory = 'D:/1. JAE Intro ICU/CIG/ascii-files/sp_busyfit/rot_im_busyfit_3' # Select Output Directory
    results_directory = os.path.join(directory, 'results_TDE5_2') # Select output file
    num_groups = 3  #Selec Number of Groups to be Classified, related to "espada2011_table1"
    espada2011_table1 = pd.read_csv(os.path.join(directory, 'Espada2011_Table1_5.csv'), delimiter=';') # Select Comparison Clasification

    # 3.---------Image Loading Function------
    def load_images(directory, file_suffix):
        images = []
        file_names = []
        for file in os.listdir(directory):
            if file.endswith(file_suffix):
                img_path = os.path.join(directory, file)
                img = image.load_img(img_path, target_size=(128, 128))
                img_array = image.img_to_array(img)
                images.append(img_array)
                file_names.append(file)
        images = np.array(images)
        return images, file_names

    # 4.---------Image Preprocessing Function------
    def preprocess_images(images):
        images = images / 255.0
        return images

    # 5.---------Model Building Function------
    def build_model(num_groups, layer_config):
        model = Sequential()
        model.add(Conv2D(layer_config[0], (3, 3), activation='relu', input_shape=(128, 128, 3)))
        model.add(BatchNormalization())
        model.add(MaxPooling2D((2, 2)))
        model.add(Dropout(0.25))

        for filters in layer_config[1:]:
            model.add(Conv2D(filters, (3, 3), activation='relu'))
            model.add(BatchNormalization())
            model.add(MaxPooling2D((2, 2)))
            model.add(Dropout(0.25))

        model.add(Flatten())
        model.add(Dense(512, activation='relu'))
        model.add(BatchNormalization())
        model.add(Dropout(0.5))
        model.add(Dense(num_groups, activation='softmax'))

        optimizer = AdamW(learning_rate=0.001, weight_decay=1e-5)
        model.compile(optimizer=optimizer, loss='sparse_categorical_crossentropy', metrics=['accuracy'])

        return model

    # 6.---------Image Classification Function------
    def classify_images(images, model):
        predictions = model.predict(images)
        predicted_classes = np.argmax(predictions, axis=1)
        return predicted_classes

    # 7.---------Visualization Function------
    def visualize_random_spectra(images, file_names, predicted_classes, num_samples=60):
        if num_samples > len(images):
            raise ValueError(f'The requested number of samples ({num_samples}) exceeds the test set size ({len(images)}).')

        # Ensure we don't exceed the maximum allowed subplots (10 rows * 6 columns = 60)
        num_samples = min(num_samples, 60)

        indices = np.random.choice(len(images), num_samples, replace=False)
        plt.figure(figsize=(20, 20))
        for i, idx in enumerate(indices):
            plt.subplot(10, 6, i + 1)
            plt.imshow(images[idx])
            plt.title(f'{file_names[idx]} - Class {predicted_classes[idx]}')
            plt.axis('off')
        plt.tight_layout()
        plt.show()

    # 8.---------Label Assignment Function------
    def assign_labels(num_samples, num_groups):
        labels = np.random.randint(0, num_groups, num_samples)
        return labels

    # 9.---------Main Function (Execution of the Experiment)------
    def main():
        os.makedirs(results_directory, exist_ok=True)  # Create the directory if it doesn't exist

        images, file_names = load_images(directory, file_suffix)
        images = preprocess_images(images)
        labels = assign_labels(len(images), num_groups)

        test_size_options = [0.7, 0.5, 0.3]
        random_state_options = [42, 99]
        epochs_options = [50, 80, 120]
        model_configs = [
            [32, 64, 128],      # Simple model
            [32, 64, 128, 256], # Medium model
            [32, 64, 128, 256, 512] # Complex model
        ]

        best_success_rate = 0
        best_model_config = None
        best_test_size = None
        best_random_state = None
        best_epochs = None

        success_rates = []  # List to store success rates for all combinations
        configurations = []  # List to store configuration details

        # 10.---------Hyperparameter Tuning Loop------
        for test_size in test_size_options:
            for random_state in random_state_options:
                for epochs in epochs_options:
                    for model_config in model_configs:
                        X_train, X_test, y_train, y_test, file_names_train, file_names_test = train_test_split(
                            images, labels, file_names, test_size=test_size, random_state=random_state
                        )

                        model = build_model(num_groups, model_config)
                        model.fit(X_train, y_train, epochs=epochs, validation_data=(X_test, y_test))

                        predicted_classes = classify_images(X_test, model)

                        model_config_str = '_'.join(map(str, model_config))
                        save_path = os.path.join(results_directory, f'classification_results_{model_config_str}_testsize{test_size}_epochs{epochs}_rs{random_state}.xlsx')
                        results_df = pd.DataFrame({'File Name': file_names_test, 'Class': predicted_classes})
                        results_df.to_excel(save_path, index=False)
                        print(f'Results saved to {os.path.abspath(save_path)}')

                        classification_results = results_df

                        classification_results['cig'] = classification_results['File Name'].str.extract(r'CIG(\d+)_')[0].astype(float)
                        classification_results = classification_results.dropna(subset=['cig'])
                        classification_results['cig'] = classification_results['cig'].astype(int)

                        merged_df = pd.merge(classification_results, espada2011_table1, on='cig', how='inner')
                        merged_df['coincidence'] = (merged_df['Class_x'] == merged_df['Class_y']).astype(int)

                        # Definir transformaciones basadas en el valor de num_groups
                        if num_groups == 2:
                            transformations = {
                                'Class_y_v1': lambda x: x,
                                'Class_y_v2': lambda x: x.map({0: 1, 1: 0}),
                            }
                        elif num_groups == 3:
                            transformations = {
                                'Class_y_v1': lambda x: x,
                                'Class_y_v2': lambda x: x.map({0: 1, 1: 2, 2: 0}),
                                'Class_y_v3': lambda x: x.map({0: 2, 1: 0, 2: 1}),
                                'Class_y_v4': lambda x: x.map({0: 0, 1: 2, 2: 1}),
                                'Class_y_v5': lambda x: x.map({0: 1, 1: 0, 2: 2}),
                                'Class_y_v6': lambda x: x.map({0: 2, 1: 1, 2: 0})
                            }
                        else:
                            raise ValueError("El valor de num_groups no es vĂ¡lido. Debe ser 2 o 3.")

                        results = []

                        # 11.---------Transformation and Success Rate Calculation------
                        for label, transform in transformations.items():
                            merged_df[label] = transform(merged_df['Class_y'])
                            coincidence_col = (merged_df['Class_x'] == merged_df[label]).astype(int)
                            success_rate = coincidence_col.mean()
                            # Check if all classes are the same
                            unique_classes = merged_df['Class_x'].nunique()
                            results.append({
                                'Transformation': label,
                                'Success Rate': success_rate,
                                'Total Matches': coincidence_col.sum(),
                                'Total Samples': len(merged_df),
                                'Unique Classes': unique_classes
                            })

                        results_df = pd.DataFrame(results)
                        # Exclude models where all samples are classified the same
                        filtered_results_df = results_df[results_df['Unique Classes'] > 1]

                        if not filtered_results_df.empty:
                            best_transformation = filtered_results_df.loc[filtered_results_df['Success Rate'].idxmax()]
                        else:
                            continue  # Skip if all classifications are the same

                        # Store the success rate and configuration for the final plot
                        success_rates.append(best_transformation['Success Rate'])
                        configurations.append(f'Model:{model_config_str} TestSize:{test_size} Epochs:{epochs} RS:{random_state}')

                        if best_transformation['Success Rate'] > best_success_rate:
                            best_success_rate = best_transformation['Success Rate']
                            best_model_config = model_config
                            best_test_size = test_size
                            best_random_state = random_state
                            best_epochs = epochs

                        results_save_path = os.path.join(results_directory, f'transformation_results_{model_config_str}_testsize{test_size}_epochs{epochs}_rs{random_state}.xlsx')
                        with pd.ExcelWriter(results_save_path) as writer:
                            filtered_results_df.to_excel(writer, sheet_name='Transformation_Summary', index=False)
                            merged_df.to_excel(writer, sheet_name='Merged_Data', index=False)

                        print(f'Results file saved at: {os.path.abspath(results_save_path)}')

        print(f"Best configuration: Model Config: {best_model_config}, Test Size: {best_test_size}, Random State: {best_random_state}, Epochs: {best_epochs}, Success Rate: {best_success_rate}")

        # 12.---------Plot of Success Rates Across All Configurations------
        plt.figure(figsize=(12, 8))
        plt.barh(configurations, success_rates, color='skyblue')
        plt.xlabel('Success Rate')
        plt.ylabel('Configurations')
        plt.title('Success Rate Across Different Model Configurations')
        plt.tight_layout()
        plot_image_path = os.path.join(results_directory, 'success_rate_across_configurations.jpeg')
        plt.savefig(plot_image_path, format='jpeg')
        plt.show()

        # 13.---------Visualization of Samples for the Best Configuration------
        X_train, X_test, y_train, y_test, file_names_train, file_names_test = train_test_split(
            images, labels, file_names, test_size=best_test_size, random_state=best_random_state
        )

        model = build_model(num_groups, best_model_config)
        model.fit(X_train, y_train, epochs=best_epochs, validation_data=(X_test, y_test))
        predicted_classes = classify_images(X_test, model)
        num_samples = min(300, len(X_test))
        visualize_random_spectra(X_test, file_names_test, predicted_classes, num_samples=num_samples)

    if __name__ == "__main__":
        main()

Output example:

image4.6 Best configuration: Model Config: [32, 64, 128, 256], Test Size: 0.3, Random State: 42, Epochs: 120, Success Rate: 0.5666666666666667 image4.7 image4.8

Step 4: Asymmetry Classification - Spectrums | CIG - AMIGA

In this part we performs automated data analysis focused on classifying and clustering ASCII-encoded images. It loads and preprocesses image data, applies multiple clustering methods (e.g., K-Means, DBSCAN) and classification algorithms (e.g., KNN, SVM, Random Forest), and evaluates their performance. The script also merges results with an external dataset to assess the accuracy of predictions, testing different configurations and transformations to find the optimal model. The results, including success rates and visualizations of random samples, are saved to Excel and plotted for further analysis.

    import os
    import numpy as np
    import pandas as pd
    from sklearn.preprocessing import StandardScaler
    from sklearn.cluster import KMeans, SpectralClustering, DBSCAN, AgglomerativeClustering
    from sklearn.mixture import GaussianMixture
    from sklearn.neighbors import KNeighborsClassifier
    from sklearn.svm import SVC
    from sklearn.ensemble import RandomForestClassifier
    import matplotlib.pyplot as plt
    from sklearn.model_selection import train_test_split
    from tslearn.shapelets import LearningShapelets

    # 1.---------Variable Configuration------
    file_suffix = ".asc"  # ASCII file extension
    directory = 'D:/1. JAE Intro ICU/CIG/ascii-files'
    results_directory = os.path.join(directory, 'results_Shapelets_TDE5_3')  # Results directory
    num_groups = 3  # Number of groups for classification
    espada2011_table1 = pd.read_csv(os.path.join(directory, 'sp_busyfit/rot_im_busyfit_3/Espada2011_Table1_5.csv'), delimiter=';')

    # 2.---------Function to Load ASCII Files------
    def load_fits_data(directory, file_suffix):
        images = []
        file_names = []
        for file in os.listdir(directory):
            if file.endswith(file_suffix):
                file_path = os.path.join(directory, file)
                try:
                    data = np.loadtxt(file_path)
                    if data.ndim == 2:  # Check if data has 2 dimensions
                        images.append(data.flatten())  # Flatten data to 1D
                        file_names.append(file)
                except Exception as e:
                    print(f"Error loading {file}: {e}")
        return images, file_names

    # 3.---------Function for Preprocessing Files------
    def preprocess_images(images, target_length=None):
        if target_length is None:
            target_length = max(len(image) for image in images)

        scaler = StandardScaler()
        images_scaled = []
        for image in images:
            image = np.pad(image, (0, max(0, target_length - len(image))), 'constant')[:target_length]
            image_scaled = scaler.fit_transform(image.reshape(-1, 1)).flatten()
            images_scaled.append(image_scaled)

        return images_scaled

    # 4.---------Function for Visualization------
    def visualize_random_spectra(images, file_names, predicted_classes, num_samples=60):
        if num_samples > len(images):
            raise ValueError(f'The requested number of samples ({num_samples}) exceeds the test set size ({len(images)}).')

        num_samples = min(num_samples, 60)

        indices = np.random.choice(len(images), num_samples, replace=False)
        plt.figure(figsize=(20, 20))
        for i, idx in enumerate(indices):
            plt.subplot(10, 6, i + 1)
            plt.plot(images[idx])  # Visualize as spectrum
            plt.title(f'{file_names[idx]} - Class {predicted_classes[idx]}')
            plt.axis('off')
        plt.tight_layout()
        plt.show()

    # 5.---------Function to Assign Labels------
    def assign_labels(num_samples, num_groups):
        labels = np.random.randint(0, num_groups, num_samples)
        return labels

    # 6.---------Function to Apply Clustering and Classification Models------
    def apply_clustering(images, method):
        if method == 'kmeans':
            model = KMeans(n_clusters=num_groups, random_state=42)
        elif method == 'spectral':
            model = SpectralClustering(n_clusters=num_groups, random_state=42)
        elif method == 'dbscan':
            model = DBSCAN(eps=0.5, min_samples=5)
        elif method == 'agglomerative':
            model = AgglomerativeClustering(n_clusters=num_groups)
        elif method == 'gaussian_mixture':
            model = GaussianMixture(n_components=num_groups, random_state=42)

        predicted_classes = model.fit_predict(images)

        return predicted_classes

    def apply_classification(X_train, y_train, X_test, method):
        if method == 'knn':
            model = KNeighborsClassifier(n_neighbors=5)
        elif method == 'svm':
            model = SVC(kernel='linear', probability=True)
        elif method == 'random_forest':
            model = RandomForestClassifier(n_estimators=100, random_state=42)
        elif method == 'shapelets':
            model = LearningShapelets(n_shapelets_per_size={100: 10, 50: 8, 30: 5, 10: 3}, max_iter=350, batch_size=10, scale=True)

        model.fit(X_train, y_train)
        predicted_classes = model.predict(X_test)

        return predicted_classes

    # 7.---------Main Function (Experiment Execution)------
    def main():
        os.makedirs(results_directory, exist_ok=True)  # Create directory if it doesn't exist

        images, file_names = load_fits_data(directory, file_suffix)
        images = preprocess_images(images)
        labels = assign_labels(len(images), num_groups)

        test_size_options = [0.7, 0.5, 0.3]
        random_state_options = [42, 99]
        methods = ['kmeans', 'spectral', 'dbscan', 'agglomerative', 'gaussian_mixture', 'knn', 'svm', 'random_forest', 'shapelets']

        best_success_rate = 0
        best_method = None
        best_test_size = None
        best_random_state = None

        success_rates = []
        configurations = []

        for test_size in test_size_options:
            for random_state in random_state_options:
                for method in methods:
                    X_train, X_test, y_train, y_test, file_names_train, file_names_test = train_test_split(
                        images, labels, file_names, test_size=test_size, random_state=random_state
                    )

                    if method in ['kmeans', 'spectral', 'dbscan', 'agglomerative', 'gaussian_mixture']:
                        predicted_classes = apply_clustering(X_test, method)
                    else:
                        predicted_classes = apply_classification(X_train, y_train, X_test, method)

                    results_df = pd.DataFrame({'File Name': file_names_test, 'Class': predicted_classes})
                    save_path = os.path.join(results_directory, f'classification_results_{method}_testsize{test_size}_rs{random_state}.xlsx')
                    results_df.to_excel(save_path, index=False)
                    print(f'Results saved to {os.path.abspath(save_path)}')

                    classification_results = results_df
                    classification_results['cig'] = classification_results['File Name'].str.extract(r'CIG(\d+)_')[0].astype(float)
                    classification_results = classification_results.dropna(subset=['cig'])
                    classification_results['cig'] = classification_results['cig'].astype(int)

                    merged_df = pd.merge(classification_results, espada2011_table1, on='cig', how='inner')
                    merged_df['coincidence'] = (merged_df['Class_x'] == merged_df['Class_y']).astype(int)

                    transformations = {
                        'Class_y_v1': lambda x: x,
                        'Class_y_v2': lambda x: x.map({0: 1, 1: 2, 2: 0}),
                        'Class_y_v3': lambda x: x.map({0: 2, 1: 0, 2: 1}),
                        'Class_y_v4': lambda x: x.map({0: 0, 1: 2, 2: 1}),
                        'Class_y_v5': lambda x: x.map({0: 1, 1: 0, 2: 2}),
                        'Class_y_v6': lambda x: x.map({0: 2, 1: 1, 2: 0})
                    }

                    results = []

                    for label, transform in transformations.items():
                        merged_df[label] = transform(merged_df['Class_y'])
                        coincidence_col = (merged_df['Class_x'] == merged_df[label]).astype(int)
                        success_rate = coincidence_col.mean()
                        results.append({
                            'Transformation': label,
                            'Success Rate': success_rate,
                            'Total Matches': coincidence_col.sum(),
                            'Total Samples': len(merged_df)
                        })

                    results_df = pd.DataFrame(results)
                    best_transformation = results_df.loc[results_df['Success Rate'].idxmax()]

                    success_rates.append(best_transformation['Success Rate'])
                    configurations.append(f'Method:{method} TestSize:{test_size} RS:{random_state}')

                    if best_transformation['Success Rate'] > best_success_rate:
                        best_success_rate = best_transformation['Success Rate']
                        best_method = method
                        best_test_size = test_size
                        best_random_state = random_state

                    results_save_path = os.path.join(results_directory, f'transformation_results_{method}_testsize{test_size}_rs{random_state}.xlsx')
                    with pd.ExcelWriter(results_save_path) as writer:
                        results_df.to_excel(writer, sheet_name='Transformation_Summary', index=False)
                        merged_df.to_excel(writer, sheet_name='Merged_Data', index=False)

                    print(f'Results file saved at: {os.path.abspath(results_save_path)}')

        print(f"Best configuration: Method: {best_method}, Test Size: {best_test_size}, Random State: {best_random_state}, Success Rate: {best_success_rate}")

        plt.figure(figsize=(12, 8))
        plt.barh(configurations, success_rates, color='skyblue')
        plt.xlabel('Success Rate')
        plt.ylabel('Configurations')
        plt.title('Success Rate Across Different Configurations')
        plt.tight_layout()
        plot_image_path = os.path.join(results_directory, 'success_rate_across_configurations.jpeg')
        plt.savefig(plot_image_path, format='jpeg')
        plt.show()

        X_train, X_test, y_train, y_test, file_names_train, file_names_test = train_test_split(
            images, labels, file_names, test_size=best_test_size, random_state=best_random_state
        )

        predicted_classes = apply_classification(X_train, y_train, X_test, best_method)
        visualize_random_spectra(X_test, file_names_test, predicted_classes, num_samples=min(300, len(X_test)))

    if __name__ == "__main__":
        main()