import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
from tensorflow.keras import layers, models, regularizers, optimizers, callbacks


class AUNET:
    """Attention-based forecaster: multi-head self-attention over a sliding
    input window, followed by a regularized dense stack and a linear head
    that emits `forecast_horizon` values per sample."""

    def __init__(self, input_length=30, forecast_horizon=7, feature_dim=None,
                 lr=0.000352, dropout_rate=0.261, hidden_size=128, num_heads=8,
                 patience=10, batch_size=32, epochs=50, verbose=1):
        self.input_length = input_length
        self.forecast_horizon = forecast_horizon
        self.feature_dim = feature_dim
        self.lr = lr
        self.dropout_rate = dropout_rate
        self.hidden_size = hidden_size
        self.num_heads = num_heads
        self.patience = patience
        self.batch_size = batch_size
        self.epochs = epochs
        self.verbose = verbose
        self.model = None
        self.history = None

    def _build_model(self):
        inputs = layers.Input(shape=(self.input_length, self.feature_dim))
        # Self-attention across the input window
        x = layers.MultiHeadAttention(num_heads=self.num_heads,
                                      key_dim=self.hidden_size,
                                      dropout=self.dropout_rate)(inputs, inputs)
        # Two regularized dense blocks
        for _ in range(2):
            x = layers.Dense(self.hidden_size, activation="relu",
                             kernel_regularizer=regularizers.l2(1.95e-6))(x)
            x = layers.BatchNormalization()(x)
            x = layers.Dropout(self.dropout_rate)(x)
        # Flatten before the forecast head: applying Dense(forecast_horizon)
        # per time step and flattening afterwards would produce
        # input_length * forecast_horizon outputs and break the loss shape.
        x = layers.Flatten()(x)
        outputs = layers.Dense(self.forecast_horizon)(x)
        model = models.Model(inputs, outputs)
        model.compile(optimizer=optimizers.Adam(learning_rate=self.lr), loss='mse')
        return model

    def fit(self, X, y, validation_split=0.2):
        # Infer the feature dimension from the training data if not given
        if self.feature_dim is None:
            self.feature_dim = X.shape[2]
        self.model = self._build_model()
        es = callbacks.EarlyStopping(monitor='val_loss', patience=self.patience,
                                     restore_best_weights=True)
        self.history = self.model.fit(X, y,
                                      epochs=self.epochs,
                                      batch_size=self.batch_size,
                                      validation_split=validation_split,
                                      callbacks=[es],
                                      verbose=self.verbose)
        return self

    def predict(self, X):
        return self.model.predict(X)

    def evaluate(self, X, y_true, scaler_y=None):
        y_pred = self.predict(X)
        # Undo target scaling so metrics are reported in the original units
        if scaler_y is not None:
            y_pred = scaler_y.inverse_transform(y_pred)
            y_true = scaler_y.inverse_transform(y_true)
        mae = np.mean(np.abs(y_true - y_pred))
        rmse = np.sqrt(np.mean((y_true - y_pred) ** 2))
        r2 = 1 - np.sum((y_true - y_pred) ** 2) / np.sum((y_true - np.mean(y_true)) ** 2)
        return {"MAE": mae, "RMSE": rmse, "R2": r2}

    def plot_learning_curve(self):
        if self.history:
            plt.figure(figsize=(10, 5))
            plt.plot(self.history.history['loss'], label='Train Loss')
            if 'val_loss' in self.history.history:
                plt.plot(self.history.history['val_loss'], label='Validation Loss')
            plt.xlabel('Epoch')
            plt.ylabel('Loss')
            plt.title('Learning Curve')
            plt.legend()
            plt.grid(True)
            plt.show()

    def plot_predictions(self, y_true, y_pred, scaler_y=None, title='Prediction vs Actual'):
        if scaler_y is not None:
            y_pred = scaler_y.inverse_transform(y_pred)
            y_true = scaler_y.inverse_transform(y_true)
        plt.figure(figsize=(12, 6))
        plt.plot(y_true.flatten(), label='Actual')
        plt.plot(y_pred.flatten(), label='Predicted')
        plt.title(title)
        plt.xlabel('Time Step')
        plt.ylabel('Value')
        plt.legend()
        plt.grid(True)
        plt.show()
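

# A minimal usage sketch, not part of the original file: it exercises
# fit/predict/evaluate on synthetic data. Shapes follow the conventions
# above -- X is (samples, input_length, features), y is
# (samples, forecast_horizon). The tiny epoch count is only to keep this
# smoke test fast.
if __name__ == "__main__":
    rng = np.random.default_rng(0)
    X = rng.normal(size=(200, 30, 4)).astype("float32")
    y = rng.normal(size=(200, 7)).astype("float32")

    net = AUNET(input_length=30, forecast_horizon=7, epochs=2, verbose=0)
    net.fit(X, y)
    print(net.evaluate(X, y))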