mubaraknumann commited on
Commit
704902c
·
verified ·
1 Parent(s): b319248

Upload test_model_streamlit.py

Browse files
Files changed (1) hide show
  1. test_model_streamlit.py +259 -0
test_model_streamlit.py ADDED
@@ -0,0 +1,259 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import streamlit as st
2
+ import tensorflow as tf
3
+ from tensorflow import keras
4
+ from tensorflow.keras import layers # For custom layer definitions
5
+ import numpy as np
6
+ from PIL import Image
7
+ import json
8
+ import os
9
+
10
+ # --- RepVGGBlock Class Definition (Latest Verified Version) ---
11
+ # Users will need this definition if it's a custom layer in your model.
12
+ class RepVGGBlock(layers.Layer):
13
+ def __init__(self, in_channels, out_channels, kernel_size=3, stride=1,
14
+ groups=1, deploy=False, use_se=False, **kwargs):
15
+ super(RepVGGBlock, self).__init__(**kwargs)
16
+ self.config_initial_in_channels = in_channels
17
+ self.config_out_channels = out_channels
18
+ self.config_kernel_size = kernel_size
19
+ self.config_strides_val = stride
20
+ self.config_groups = groups
21
+ self._deploy_mode_internal = deploy
22
+ self.config_use_se = use_se
23
+ self.actual_in_channels = None
24
+
25
+ self.rbr_dense_conv = layers.Conv2D(
26
+ filters=self.config_out_channels, kernel_size=self.config_kernel_size,
27
+ strides=self.config_strides_val, padding='same',
28
+ groups=self.config_groups, use_bias=False, name=self.name + '_dense_conv'
29
+ )
30
+ self.rbr_dense_bn = layers.BatchNormalization(name=self.name + '_dense_bn')
31
+ self.rbr_1x1_conv = layers.Conv2D(
32
+ filters=self.config_out_channels, kernel_size=1,
33
+ strides=self.config_strides_val, padding='valid',
34
+ groups=self.config_groups, use_bias=False, name=self.name + '_1x1_conv'
35
+ )
36
+ self.rbr_1x1_bn = layers.BatchNormalization(name=self.name + '_1x1_bn')
37
+ self.rbr_identity_bn = None
38
+ self.rbr_reparam = layers.Conv2D(
39
+ filters=self.config_out_channels, kernel_size=self.config_kernel_size,
40
+ strides=self.config_strides_val, padding='same',
41
+ groups=self.config_groups, use_bias=True, name=self.name + '_reparam_conv'
42
+ )
43
+
44
+ def build(self, input_shape):
45
+ self.actual_in_channels = input_shape[-1]
46
+ if self.config_initial_in_channels is None:
47
+ self.config_initial_in_channels = self.actual_in_channels
48
+ elif self.config_initial_in_channels != self.actual_in_channels:
49
+ raise ValueError(f"Input channel mismatch for layer {self.name}: Expected {self.config_initial_in_channels}, got {self.actual_in_channels}")
50
+
51
+ if self.rbr_identity_bn is None and \
52
+ self.actual_in_channels == self.config_out_channels and self.config_strides_val == 1:
53
+ self.rbr_identity_bn = layers.BatchNormalization(name=self.name + '_identity_bn')
54
+
55
+ super(RepVGGBlock, self).build(input_shape)
56
+
57
+ if not self.rbr_dense_conv.built: self.rbr_dense_conv.build(input_shape)
58
+ if not self.rbr_dense_bn.built: self.rbr_dense_bn.build(self.rbr_dense_conv.compute_output_shape(input_shape))
59
+ if not self.rbr_1x1_conv.built: self.rbr_1x1_conv.build(input_shape)
60
+ if not self.rbr_1x1_bn.built: self.rbr_1x1_bn.build(self.rbr_1x1_conv.compute_output_shape(input_shape))
61
+ if self.rbr_identity_bn is not None and not self.rbr_identity_bn.built:
62
+ self.rbr_identity_bn.build(input_shape)
63
+ if not self.rbr_reparam.built:
64
+ self.rbr_reparam.build(input_shape)
65
+
66
+ def call(self, inputs):
67
+ if self._deploy_mode_internal:
68
+ return self.rbr_reparam(inputs)
69
+ else:
70
+ out_dense = self.rbr_dense_bn(self.rbr_dense_conv(inputs))
71
+ out_1x1 = self.rbr_1x1_bn(self.rbr_1x1_conv(inputs))
72
+ if self.rbr_identity_bn is not None:
73
+ out_identity = self.rbr_identity_bn(inputs)
74
+ return out_dense + out_1x1 + out_identity
75
+ else: return out_dense + out_1x1
76
+
77
+ def _fuse_bn_tensor(self, conv_layer, bn_layer): # Not called during inference with deploy=True model
78
+ kernel = conv_layer.kernel; dtype = kernel.dtype; out_channels = kernel.shape[-1]
79
+ gamma = getattr(bn_layer, 'gamma', tf.ones(out_channels, dtype=dtype))
80
+ beta = getattr(bn_layer, 'beta', tf.zeros(out_channels, dtype=dtype))
81
+ running_mean = getattr(bn_layer, 'moving_mean', tf.zeros(out_channels, dtype=dtype))
82
+ running_var = getattr(bn_layer, 'moving_variance', tf.ones(out_channels, dtype=dtype))
83
+ epsilon = bn_layer.epsilon; std = tf.sqrt(running_var + epsilon)
84
+ fused_kernel = kernel * (gamma / std)
85
+ if conv_layer.use_bias: fused_bias = beta + (gamma * (conv_layer.bias - running_mean)) / std
86
+ else: fused_bias = beta - (running_mean * gamma) / std
87
+ return fused_kernel, fused_bias
88
+
89
+ def reparameterize(self): # Not called during inference with deploy=True model
90
+ if self._deploy_mode_internal: return
91
+ branches_to_check = [self.rbr_dense_conv, self.rbr_dense_bn, self.rbr_1x1_conv, self.rbr_1x1_bn]
92
+ if self.rbr_identity_bn: branches_to_check.append(self.rbr_identity_bn)
93
+ for branch_layer in branches_to_check:
94
+ if not branch_layer.built: raise Exception(f"ERROR: Branch layer {branch_layer.name} for {self.name} not built.")
95
+ kernel_dense, bias_dense = self._fuse_bn_tensor(self.rbr_dense_conv, self.rbr_dense_bn)
96
+ kernel_1x1_unpadded, bias_1x1 = self._fuse_bn_tensor(self.rbr_1x1_conv, self.rbr_1x1_bn)
97
+ pad_amount = self.config_kernel_size // 2
98
+ kernel_1x1_padded = tf.pad(kernel_1x1_unpadded, [[pad_amount,pad_amount],[pad_amount,pad_amount],[0,0],[0,0]])
99
+ final_kernel = kernel_dense + kernel_1x1_padded; final_bias = bias_dense + bias_1x1
100
+ if self.rbr_identity_bn is not None:
101
+ running_mean_id = self.rbr_identity_bn.moving_mean; running_var_id = self.rbr_identity_bn.moving_variance
102
+ gamma_id = self.rbr_identity_bn.gamma; beta_id = self.rbr_identity_bn.beta
103
+ epsilon_id = self.rbr_identity_bn.epsilon; std_id = tf.sqrt(running_var_id + epsilon_id)
104
+ kernel_id_scaler = gamma_id / std_id
105
+ bias_id_term = beta_id - (running_mean_id * gamma_id) / std_id
106
+ identity_kernel_np = np.zeros((self.config_kernel_size,self.config_kernel_size,self.actual_in_channels,self.config_out_channels),dtype=np.float32)
107
+ for i in range(self.actual_in_channels): identity_kernel_np[pad_amount,pad_amount,i,i] = kernel_id_scaler[i].numpy()
108
+ kernel_id_final = tf.convert_to_tensor(identity_kernel_np, dtype=tf.float32)
109
+ final_kernel += kernel_id_final; final_bias += bias_id_term
110
+ if not self.rbr_reparam.built: raise Exception(f"CRITICAL ERROR: {self.rbr_reparam.name} not built before set_weights.")
111
+ self.rbr_reparam.set_weights([final_kernel, final_bias]); self._deploy_mode_internal = True
112
+
113
+ def get_config(self):
114
+ config = super(RepVGGBlock, self).get_config()
115
+ config.update({
116
+ "in_channels": self.config_initial_in_channels, "out_channels": self.config_out_channels,
117
+ "kernel_size": self.config_kernel_size, "stride": self.config_strides_val,
118
+ "groups": self.config_groups, "deploy": self._deploy_mode_internal, "use_se": self.config_use_se
119
+ }); return config
120
+ @classmethod
121
+ def from_config(cls, config): return cls(**config)
122
+ # --- End of RepVGGBlock ---
123
+
124
+ # --- NECALayer Class Definition (Verified Version) ---
125
+ class NECALayer(layers.Layer):
126
+ def __init__(self, channels, gamma=2, b=1, **kwargs):
127
+ super(NECALayer, self).__init__(**kwargs)
128
+ self.channels = channels; self.gamma = gamma; self.b = b
129
+ tf_channels = tf.cast(self.channels, tf.float32)
130
+ k_float = (tf.math.log(tf_channels) / tf.math.log(2.0) + self.b) / self.gamma
131
+ k_int = tf.cast(tf.round(k_float), tf.int32)
132
+ if tf.equal(k_int % 2, 0): self.k_scalar_val = k_int + 1
133
+ else: self.k_scalar_val = k_int
134
+ self.k_scalar_val = tf.maximum(1, self.k_scalar_val)
135
+ kernel_size_for_conv1d = (int(self.k_scalar_val.numpy()),)
136
+ self.gap = layers.GlobalAveragePooling2D(keepdims=True)
137
+ self.conv1d = layers.Conv1D(filters=1, kernel_size=kernel_size_for_conv1d, padding='same', use_bias=False, name=self.name + '_eca_conv1d')
138
+ self.sigmoid = layers.Activation('sigmoid')
139
+ def call(self, inputs):
140
+ if self.channels != inputs.shape[-1]: raise ValueError(f"Input channels {inputs.shape[-1]} != layer channels {self.channels} for {self.name}")
141
+ x = self.gap(inputs); x = tf.squeeze(x, axis=[1,2]); x = tf.expand_dims(x, axis=-1)
142
+ x = self.conv1d(x); x = tf.squeeze(x, axis=-1); attention = self.sigmoid(x)
143
+ return inputs * tf.reshape(attention, [-1, 1, 1, self.channels])
144
+ def get_config(self):
145
+ config = super(NECALayer, self).get_config()
146
+ config.update({"channels": self.channels, "gamma": self.gamma, "b": self.b}); return config
147
+ @classmethod
148
+ def from_config(cls, config): return cls(**config)
149
+ # --- End of NECALayer ---
150
+
151
+
152
+ # --- Streamlit App Configuration ---
153
+ MODEL_FILENAME = 'genera_cic_v1.keras'
154
+ LABEL_MAPPING_FILENAME = 'label_mapping.json'
155
+ IMG_WIDTH = 299
156
+ IMG_HEIGHT = 299
157
+
158
+ st.set_page_config(page_title="Genera Cloud Classifier", layout="wide")
159
+
160
+ # --- Load Model and Label Mapping (Cached for performance) ---
161
+ @st.cache_resource
162
+ def load_keras_model(model_path):
163
+ """Loads the Keras model with custom layer definitions."""
164
+ if not os.path.exists(model_path):
165
+ st.error(f"Model file not found: {model_path}")
166
+ st.error(f"Please ensure '{model_path}' is in the same directory as this script, or update the path.")
167
+ return None
168
+ try:
169
+ custom_objects = {'RepVGGBlock': RepVGGBlock, 'NECALayer': NECALayer}
170
+ model = tf.keras.models.load_model(model_path, custom_objects=custom_objects, compile=False)
171
+ print("Model loaded successfully.")
172
+ return model
173
+ except Exception as e:
174
+ st.error(f"Error loading Keras model from '{model_path}': {e}")
175
+ st.error("Make sure the custom layer definitions (RepVGGBlock, NECALayer) are correct and match the saved model.")
176
+ return None
177
+
178
+ @st.cache_data
179
+ def load_label_map(mapping_path):
180
+ """Loads the label mapping from a JSON file."""
181
+ if not os.path.exists(mapping_path):
182
+ st.error(f"Label mapping file not found: {mapping_path}")
183
+ st.error(f"Please ensure '{mapping_path}' is in the same directory as this script, or update the path.")
184
+ return None
185
+ try:
186
+ with open(mapping_path, 'r') as f:
187
+ label_data = json.load(f)
188
+ # Ensure int_to_label keys are integers, as they might be saved as strings in JSON
189
+ int_to_label = {int(k): v for k, v in label_data['int_to_label'].items()}
190
+ return int_to_label
191
+ except Exception as e:
192
+ st.error(f"Error loading label mapping from '{mapping_path}': {e}")
193
+ return None
194
+
195
+ # Load resources
196
+ model = load_keras_model(MODEL_FILENAME)
197
+ int_to_label = load_label_map(LABEL_MAPPING_FILENAME)
198
+
199
+ # --- Image Preprocessing Function ---
200
+ def preprocess_for_prediction(image_pil, target_size=(IMG_HEIGHT, IMG_WIDTH)):
201
+ """Prepares a PIL image for model prediction."""
202
+ img = image_pil.convert('RGB') # Ensure 3 channels
203
+ img_resized = img.resize(target_size)
204
+ img_array = np.array(img_resized, dtype=np.float32)
205
+ img_array = img_array / 255.0 # Normalize to [0, 1]
206
+ img_array = np.expand_dims(img_array, axis=0) # Add batch dimension
207
+ return img_array
208
+
209
+ # --- Streamlit App UI ---
210
+ st.title("☁️ Genera - Cloud Classifier 🌥️")
211
+ st.markdown("Upload an image of the sky, and this app will predict the dominant cloud genus.")
212
+
213
+ # Check if model and labels loaded successfully before proceeding
214
+ if model is None or int_to_label is None:
215
+ st.error("Application cannot start due to errors loading model or label mapping. Please check the console/logs for details.")
216
+ else:
217
+ uploaded_file = st.file_uploader("Choose a cloud image...", type=["jpg", "jpeg", "png"])
218
+
219
+ if uploaded_file is not None:
220
+ try:
221
+ image_pil = Image.open(uploaded_file)
222
+
223
+ col1, col2 = st.columns(2)
224
+ with col1:
225
+ st.image(image_pil, caption='Uploaded Image.', use_container_width=True)
226
+
227
+ # Preprocess and predict
228
+ with st.spinner('Analyzing the sky...'):
229
+ processed_image_tensor = preprocess_for_prediction(image_pil)
230
+ predictions = model.predict(processed_image_tensor)
231
+ pred_probabilities = predictions[0] # Get probabilities for the single uploaded image
232
+
233
+ with col2:
234
+ st.subheader("🔍 Prediction Results:")
235
+ # Display top N predictions with confidence
236
+ top_n = 5 # Show top 5 predictions
237
+ # Get indices of sorted probabilities (highest first)
238
+ sorted_indices = np.argsort(pred_probabilities)[::-1]
239
+
240
+ for i in range(min(top_n, len(pred_probabilities))):
241
+ class_index = sorted_indices[i]
242
+ class_name = int_to_label.get(class_index, f"Unknown Class ({class_index})")
243
+ confidence = pred_probabilities[class_index]
244
+ st.markdown(f"**{class_name}**: `{confidence*100:.2f}%`")
245
+
246
+ # Highlight the top prediction
247
+ top_pred_idx = sorted_indices[0]
248
+ top_class_name = int_to_label.get(top_pred_idx, "Unknown Class")
249
+ top_confidence = pred_probabilities[top_pred_idx]
250
+ st.success(f"**Top Prediction: {top_class_name} ({top_confidence*100:.2f}%)**")
251
+
252
+ except Exception as e:
253
+ st.error(f"An error occurred during image processing or prediction: {e}")
254
+ st.error("Please ensure the uploaded file is a valid image format (JPG, JPEG, PNG).")
255
+ else:
256
+ st.info("Please upload an image to classify.")
257
+
258
+ st.markdown("---")
259
+ st.markdown("Developed as part of the Personalized Weather Intelligence project.")