|
|
import gradio as gr |
|
|
from contextlib import contextmanager |
|
|
from ultralytics import YOLO |
|
|
import cv2 |
|
|
import numpy as np |
|
|
from PIL import Image |
|
|
import torch |
|
|
from transformers import TrOCRProcessor, VisionEncoderDecoderModel |
|
|
from datetime import datetime |
|
|
from tensorflow.keras.models import load_model |
|
|
import os |
|
|
import tempfile |
|
|
|
|
|
import sqlite3 |
|
|
from sqlite3 import Error |
|
|
import re |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
cnn_logo_model = load_model('logo_model_cnn.h5') |
|
|
|
|
|
|
|
|
color_model = load_model("vehicle_color.h5") |
|
|
color_classes = ['black', 'blue', 'brown', 'green', 'pink', 'red', 'silver', 'white', 'yellow'] |
|
|
print(f"Color model input shape: {color_model.input_shape}") |
|
|
|
|
|
|
|
|
logo_classes = [ |
|
|
'Alfa romeo', 'Audi', 'BMW', 'Chevrolet', 'Citroen', 'Dacia', 'Daewoo', |
|
|
'Dodge', 'Ferrari', 'Fiat', 'Ford', 'Honda', 'Hyundai', 'Jaguar', 'Jeep', |
|
|
'Kia', 'Lada', 'Lancia', 'Land rover', 'Lexus', 'Maserati', 'Mazda', |
|
|
'Mercedes', 'Mitsubishi', 'Nissan', 'Opel', 'Peugeot', 'Porsche', |
|
|
'Renault', 'Rover', 'Saab', 'Seat', 'Skoda', 'Subaru', 'Suzuki', |
|
|
'Tata', 'Tesla', 'Toyota', 'Volkswagen', 'Volvo' |
|
|
] |
|
|
|
|
|
|
|
|
model_orientation = YOLO("direction_best.pt") |
|
|
model_plate_detection = YOLO("plate_detection.pt") |
|
|
model_logo_detection = YOLO("car_logo_detection.pt") |
|
|
model_characters = YOLO("character_detetion.pt") |
|
|
model_vehicle = YOLO("vehicle_recognition.pt") |
|
|
|
|
|
|
|
|
|
|
|
trocr_model = VisionEncoderDecoderModel.from_pretrained("microsoft/trocr-base-printed") |
|
|
trocr_processor = TrOCRProcessor.from_pretrained("microsoft/trocr-base-printed") |
|
|
|
|
|
model_per_brand = { |
|
|
'nissan': load_model("nissan_model_final2.keras"), |
|
|
'chevrolet': load_model("chevrolet_model_final2.keras"), |
|
|
} |
|
|
|
|
|
model_labels = { |
|
|
'nissan': ['nissan-altima', 'nissan-armada', 'nissan-datsun', 'nissan-maxima', 'nissan-navara', 'nissan-patrol', 'nissan-sunny'], |
|
|
'chevrolet': ['chevrolet-aveo', 'chevrolet-impala', 'chevrolet-malibu', 'chevrolet-silverado', 'chevrolet-tahoe', 'chevrolet-traverse'], |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
CATEGORIES = { |
|
|
'1': "Passenger vehicles", |
|
|
'2': "Trucks", |
|
|
'3': "Vans", |
|
|
'4': "Coaches and buses", |
|
|
'5': "Road tractors", |
|
|
'6': "Other tractors", |
|
|
'7': "Special vehicles", |
|
|
'8': "Trailers and semi-trailers", |
|
|
'9': "Motorcycles" |
|
|
} |
|
|
|
|
|
WILAYAS = { |
|
|
"01": "Adrar", "02": "Chlef", "03": "Laghouat", "04": "Oum El Bouaghi", |
|
|
"05": "Batna", "06": "Béjaïa", "07": "Biskra", "08": "Béchar", |
|
|
"09": "Blida", "10": "Bouira", "11": "Tamanrasset", "12": "Tébessa", |
|
|
"13": "Tlemcen", "14": "Tiaret", "15": "Tizi Ouzou", "16": "Alger", |
|
|
"17": "Djelfa", "18": "Jijel", "19": "Sétif", "20": "Saïda", |
|
|
"21": "Skikda", "22": "Sidi Bel Abbès", "23": "Annaba", "24": "Guelma", |
|
|
"25": "Constantine", "26": "Médéa", "27": "Mostaganem", "28": "MSila", |
|
|
"29": "Mascara", "30": "Ouargla", "31": "Oran", "32": "El Bayadh", |
|
|
"33": "Illizi", "34": "Bordj Bou Arreridj", "35": "Boumerdès", |
|
|
"36": "El Tarf", "37": "Tindouf", "38": "Tissemsilt", "39": "El Oued", |
|
|
"40": "Khenchela", "41": "Souk Ahras", "42": "Tipaza", "43": "Mila", |
|
|
"44": "Aïn Defla", "45": "Naâma", "46": "Aïn Témouchent", |
|
|
"47": "Ghardaïa", "48": "Relizane", |
|
|
"49": "El M'Ghair", "50": "El Menia", |
|
|
"51": "Ouled Djellal", "52": "Bordj Badji Mokhtar", |
|
|
"53": "Béni Abbès", "54": "Timimoun", |
|
|
"55": "Touggourt", "56": "Djanet", |
|
|
"57": "In Salah", "58": "In Guezzam" |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
shared_results = { |
|
|
"original_image": None, |
|
|
"img_rgb": None, |
|
|
"img_draw": None, |
|
|
"plate_crop_img": None, |
|
|
"logo_crop_img": None, |
|
|
"plate_with_chars_img": None, |
|
|
"trocr_char_list": [], |
|
|
"trocr_combined_text": "", |
|
|
"classification_result": "", |
|
|
"label_color": "", |
|
|
"label_orientation": "", |
|
|
"vehicle_type": "", |
|
|
"vehicle_model": "", |
|
|
"vehicle_brand": "", |
|
|
"logo_recognition_results": [], |
|
|
"current_frame": None, |
|
|
"video_path": None, |
|
|
"video_processing": False, |
|
|
"frame_count": 0, |
|
|
"total_frames": 0, |
|
|
"original_video_dimensions": None, |
|
|
"corrected_orientation": False, |
|
|
"vehicle_box": None, |
|
|
"vehicle_detected": False, |
|
|
"detection_boxes": { |
|
|
"plate": None, |
|
|
"logo": None, |
|
|
"color": None, |
|
|
"orientation": None |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def save_complete_results(plate_info, color, model, orientation, vehicle_type, brand): |
|
|
"""Sauvegarde toutes les informations dans resultats.txt""" |
|
|
with open("/content/drive/MyDrive/resultats.txt", "a", encoding="utf-8") as f: |
|
|
f.write("\n" + "="*60 + "\n") |
|
|
f.write(f"ANALYSIS CARRIED OUT ON : {datetime.now().strftime('%d/%m/%Y %H:%M:%S')}\n") |
|
|
f.write("="*60 + "\n\n") |
|
|
|
|
|
|
|
|
f.write("PLATE INFORMATION:\n") |
|
|
f.write("-"*50 + "\n") |
|
|
if plate_info: |
|
|
f.write(f"Numéro complet: {plate_info.get('matricule_complet', 'N/A')}\n") |
|
|
f.write(f"Wilaya: {plate_info.get('wilaya', ('', 'N/A'))[1]} ({plate_info.get('wilaya', ('', ''))[0]})\n") |
|
|
f.write(f"Année: {plate_info.get('annee', 'N/A')}\n") |
|
|
f.write(f"Catégorie: {plate_info.get('categorie', ('', 'N/A'))[1]} ({plate_info.get('categorie', ('', ''))[0]})\n") |
|
|
f.write(f"Série: {plate_info.get('serie', 'N/A')}\n") |
|
|
else: |
|
|
f.write("Aucune information de plaque disponible\n") |
|
|
|
|
|
|
|
|
f.write("\nCARACTÉRISTIQUES VÉHICULE:\n") |
|
|
f.write("-"*50 + "\n") |
|
|
f.write(f"Couleur: {color if color else 'Not detected'}\n") |
|
|
f.write(f"Marque: {brand if brand else 'Not detected'}\n") |
|
|
f.write(f"Modèle: {model if model else 'Not detected'}\n") |
|
|
f.write(f"Orientation: {orientation if orientation else 'Not detected'}\n") |
|
|
f.write(f"Type de véhicule: {vehicle_type if vehicle_type else 'Not detected'}\n") |
|
|
f.write("\n" + "="*60 + "\n\n") |
|
|
|
|
|
def format_vehicle_type(class_name): |
|
|
"""Formate les noms des classes de véhicules pour l'affichage""" |
|
|
vehicle_types = { |
|
|
'car': 'CAR', |
|
|
'truck': 'TRUCK', |
|
|
'bus': 'BUS', |
|
|
'motorcycle': 'MOTORCYCLE', |
|
|
'van': 'VAN', |
|
|
|
|
|
} |
|
|
return vehicle_types.get(class_name.lower(), class_name.upper()) |
|
|
|
|
|
|
|
|
def preprocess_image(image): |
|
|
return image |
|
|
|
|
|
|
|
|
|
|
|
def verify_color_model(): |
|
|
"""Vérifier que le modèle de couleur fonctionne correctement""" |
|
|
try: |
|
|
|
|
|
test_img = np.zeros((128, 128, 3), dtype=np.uint8) |
|
|
test_img[:,:,0] = 255 |
|
|
|
|
|
|
|
|
cv2.imwrite("/tmp/test_red.jpg", test_img) |
|
|
color, confidence = predict_color("/tmp/test_red.jpg") |
|
|
|
|
|
print(f"Test modèle couleur - Devrait être 'red': {color} ({confidence}%)") |
|
|
|
|
|
|
|
|
print(f"Classes disponibles: {color_classes}") |
|
|
|
|
|
|
|
|
print(f"Forme d'entrée attendue: {color_model.input_shape}") |
|
|
except Exception as e: |
|
|
print(f"Échec du test du modèle couleur: {e}") |
|
|
|
|
|
|
|
|
verify_color_model() |
|
|
|
|
|
|
|
|
def is_algerian_plate(text): |
|
|
digits_only = ''.join(c for c in text if c.isdigit()) |
|
|
if len(digits_only) < 5: |
|
|
return False |
|
|
wilaya_code = digits_only[-2:] |
|
|
return wilaya_code.isdigit() and 1 <= int(wilaya_code) <= 58 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def classify_plate(text): |
|
|
"""Classification complète du numéro de plaque algérienne""" |
|
|
try: |
|
|
|
|
|
clean_text = ''.join(c for c in text if c.isalnum()).upper() |
|
|
|
|
|
if len(clean_text) < 7 or not is_algerian_plate(clean_text): |
|
|
return None |
|
|
|
|
|
matricule_complet = clean_text |
|
|
position = clean_text[:-5] |
|
|
middle = clean_text[-5:-2] |
|
|
wilaya_code = clean_text[-2:] |
|
|
|
|
|
if not middle.isdigit() or not wilaya_code.isdigit(): |
|
|
return None |
|
|
|
|
|
categorie = middle[0] |
|
|
annee = f"20{middle[1:]}" if middle[1:].isdigit() else "Unknown" |
|
|
wilaya = WILAYAS.get(wilaya_code, "Wilaya Unknown") |
|
|
vehicle_type = CATEGORIES.get(categorie, "Category Unknown") |
|
|
|
|
|
return { |
|
|
'matricule_complet': matricule_complet, |
|
|
'wilaya': (wilaya_code, wilaya), |
|
|
'annee': annee, |
|
|
'categorie': (categorie, vehicle_type), |
|
|
'serie': position |
|
|
} |
|
|
except Exception as e: |
|
|
print(f"Classification error: {str(e)}") |
|
|
return None |
|
|
|
|
|
|
|
|
def predict_brand(image): |
|
|
"""Prédire la marque de voiture à partir de l'image en utilisant le modèle CNN""" |
|
|
try: |
|
|
img = Image.fromarray(image).resize((224, 224)) |
|
|
img_array = np.array(img) / 255.0 |
|
|
img_array = np.expand_dims(img_array, axis=0) |
|
|
|
|
|
predictions = cnn_logo_model.predict(img_array) |
|
|
predicted_class = np.argmax(predictions[0]) |
|
|
confidence = predictions[0][predicted_class] |
|
|
|
|
|
if confidence < 0.5: |
|
|
return "Brand not detected (confidence too low)" |
|
|
|
|
|
brand = logo_classes[predicted_class] |
|
|
return f"{brand} (confiance: {confidence:.2f})" |
|
|
except Exception as e: |
|
|
print(f"Error predicting brand: {str(e)}") |
|
|
return "Detection error" |
|
|
|
|
|
def predict_color(img_input): |
|
|
"""Fonction pour prédire la couleur du véhicule en utilisant le modèle CNN""" |
|
|
try: |
|
|
|
|
|
if isinstance(img_input, str): |
|
|
img = Image.open(img_input).convert('RGB').resize((128, 128)) |
|
|
elif isinstance(img_input, np.ndarray): |
|
|
if len(img_input.shape) == 2: |
|
|
img = Image.fromarray(cv2.cvtColor(img_input, cv2.COLOR_GRAY2RGB)).resize((128, 128)) |
|
|
else: |
|
|
img = Image.fromarray(cv2.cvtColor(img_input, cv2.COLOR_BGR2RGB)).resize((128, 128)) |
|
|
elif isinstance(img_input, Image.Image): |
|
|
img = img_input.convert('RGB').resize((128, 128)) |
|
|
else: |
|
|
return "Inconnu", 0.0 |
|
|
|
|
|
|
|
|
img_array = np.array(img) / 255.0 |
|
|
img_array = np.expand_dims(img_array, axis=0) |
|
|
|
|
|
|
|
|
if img_array.shape[1:] != (128, 128, 3): |
|
|
return "Inconnu", 0.0 |
|
|
|
|
|
|
|
|
prediction = color_model.predict(img_array, verbose=0) |
|
|
predicted_index = np.argmax(prediction) |
|
|
predicted_label = color_classes[predicted_index] |
|
|
confidence = np.max(prediction) * 100 |
|
|
|
|
|
return predicted_label, confidence |
|
|
except Exception as e: |
|
|
print(f"Erreur lors de la prédiction de couleur: {e}") |
|
|
return "Inconnu", 0.0 |
|
|
|
|
|
|
|
|
def recognize_logo(cropped_logo): |
|
|
"""Reconnaître la marque à partir d'un logo détecté""" |
|
|
try: |
|
|
if cropped_logo.size == 0: |
|
|
return "Logo too small for analysis" |
|
|
|
|
|
resized_logo = cv2.resize(np.array(cropped_logo), (128, 128)) |
|
|
rgb_logo = cv2.cvtColor(resized_logo, cv2.COLOR_BGR2RGB) |
|
|
normalized_logo = rgb_logo / 255.0 |
|
|
input_logo = np.expand_dims(normalized_logo, axis=0) |
|
|
|
|
|
predictions = cnn_logo_model.predict(input_logo, verbose=0) |
|
|
pred_index = np.argmax(predictions[0]) |
|
|
pred_label = logo_classes[pred_index] |
|
|
pred_conf = predictions[0][pred_index] |
|
|
|
|
|
if pred_conf < 0.5: |
|
|
return f"Uncertain brand: {pred_label} ({pred_conf:.2f})" |
|
|
|
|
|
return f"{pred_label} (confiance: {pred_conf:.2f})" |
|
|
except Exception as e: |
|
|
print(f"Logo recognition error: {str(e)}") |
|
|
return "Parse error" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def recognize_model(brand, logo_crop): |
|
|
"""Reconnaître le modèle spécifique d'une voiture à partir de son logo""" |
|
|
try: |
|
|
|
|
|
clean_brand = brand.split('(')[0].strip().lower() if '(' in brand else brand.lower() |
|
|
|
|
|
if clean_brand not in model_per_brand: |
|
|
return f"Model detection not available for {brand}" |
|
|
|
|
|
if logo_crop.size == 0: |
|
|
return "Image too small for analysis" |
|
|
|
|
|
model_recognizer = model_per_brand[clean_brand] |
|
|
model_input_height, model_input_width = model_recognizer.input_shape[1:3] |
|
|
|
|
|
|
|
|
resized_model = cv2.resize(np.array(logo_crop), (model_input_width, model_input_height)) |
|
|
normalized_model = resized_model / 255.0 |
|
|
input_model = np.expand_dims(normalized_model, axis=0) |
|
|
|
|
|
|
|
|
model_predictions = model_recognizer.predict(input_model, verbose=0) |
|
|
model_index = np.argmax(model_predictions[0]) |
|
|
|
|
|
|
|
|
if clean_brand in model_labels and model_index < len(model_labels[clean_brand]): |
|
|
model_name = model_labels[clean_brand][model_index] |
|
|
return model_name |
|
|
else: |
|
|
return f"Model {model_index} (no label available)" |
|
|
|
|
|
except Exception as e: |
|
|
print(f"Model recognition error: {str(e)}") |
|
|
return "Detection error" |
|
|
|
|
|
def draw_detection_boxes(image): |
|
|
"""Dessiner toutes les boîtes de détection sur l'image""" |
|
|
img_draw = image.copy() |
|
|
|
|
|
|
|
|
if shared_results["vehicle_box"]: |
|
|
x1, y1, x2, y2 = shared_results["vehicle_box"] |
|
|
cv2.rectangle(img_draw, (x1, y1), (x2, y2), (0, 165, 255), 2) |
|
|
vehicle_type = shared_results.get("vehicle_type", "VEHICLE") |
|
|
cv2.putText(img_draw, vehicle_type, (x1, y1 - 10), |
|
|
cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 165, 255), 2) |
|
|
|
|
|
if shared_results["detection_boxes"]["plate"]: |
|
|
x1, y1, x2, y2 = shared_results["detection_boxes"]["plate"] |
|
|
cv2.rectangle(img_draw, (x1, y1), (x2, y2), (0, 255, 0), 2) |
|
|
cv2.putText(img_draw, "PLATE", (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2) |
|
|
|
|
|
|
|
|
if shared_results["detection_boxes"]["logo"]: |
|
|
x1, y1, x2, y2 = shared_results["detection_boxes"]["logo"] |
|
|
cv2.rectangle(img_draw, (x1, y1), (x2, y2), (255, 0, 0), 2) |
|
|
cv2.putText(img_draw, "LOGO", (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255, 0, 0), 2) |
|
|
|
|
|
|
|
|
if shared_results["vehicle_model"]: |
|
|
model_text = shared_results["vehicle_model"].split("(")[0].strip() if "(" in shared_results["vehicle_model"] else shared_results["vehicle_model"] |
|
|
cv2.putText(img_draw, f"Model: {model_text}", (x1, y2 + 20), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 0, 0), 2) |
|
|
|
|
|
|
|
|
if shared_results["detection_boxes"]["color"]: |
|
|
x1, y1, x2, y2 = shared_results["detection_boxes"]["color"] |
|
|
cv2.rectangle(img_draw, (x1, y1), (x2, y2), (0, 0, 255), 2) |
|
|
cv2.putText(img_draw, "COLOR", (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 0, 255), 2) |
|
|
|
|
|
|
|
|
if shared_results["label_color"]: |
|
|
cv2.putText(img_draw, f"{shared_results['label_color']}", (x1, y2 + 20), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2) |
|
|
|
|
|
|
|
|
if shared_results["detection_boxes"]["orientation"]: |
|
|
x1, y1, x2, y2 = shared_results["detection_boxes"]["orientation"] |
|
|
cv2.rectangle(img_draw, (x1, y1), (x2, y2), (255, 255, 0), 2) |
|
|
cv2.putText(img_draw, "ORIENTATION", (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255, 255, 0), 2) |
|
|
|
|
|
|
|
|
if shared_results["label_orientation"]: |
|
|
cv2.putText(img_draw, f"{shared_results['label_orientation']}", (x1, y2 + 20), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 0), 2) |
|
|
|
|
|
return img_draw |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def load_input(input_data): |
|
|
"""Charger une image ou une vidéo et préparer le premier frame""" |
|
|
if isinstance(input_data, str): |
|
|
if input_data.lower().endswith(('.png', '.jpg', '.jpeg')): |
|
|
|
|
|
return load_image(input_data) |
|
|
else: |
|
|
|
|
|
return load_video(input_data) |
|
|
else: |
|
|
return load_image(input_data) |
|
|
|
|
|
|
|
|
def load_image(image_path): |
|
|
"""Charger et préparer l'image de base""" |
|
|
if isinstance(image_path, str): |
|
|
img = cv2.imread(image_path) |
|
|
else: |
|
|
img = cv2.cvtColor(image_path, cv2.COLOR_RGB2BGR) |
|
|
|
|
|
if img is None: |
|
|
raise gr.Error("Failed to read image") |
|
|
|
|
|
|
|
|
img_processed = preprocess_image(img) |
|
|
|
|
|
img_rgb = cv2.cvtColor(img_processed, cv2.COLOR_BGR2RGB) |
|
|
img_draw = img_rgb.copy() |
|
|
|
|
|
shared_results["original_image"] = img |
|
|
shared_results["img_rgb"] = img_rgb |
|
|
shared_results["img_draw"] = img_draw |
|
|
shared_results["video_processing"] = False |
|
|
shared_results["corrected_orientation"] = False |
|
|
|
|
|
|
|
|
shared_results["detection_boxes"] = { |
|
|
"plate": None, |
|
|
"logo": None, |
|
|
"color": None, |
|
|
"orientation": None |
|
|
} |
|
|
|
|
|
return Image.fromarray(img_rgb) |
|
|
|
|
|
|
|
|
def load_video(video_path): |
|
|
"""Charger une vidéo et préparer le premier frame""" |
|
|
cap = cv2.VideoCapture(video_path) |
|
|
if not cap.isOpened(): |
|
|
raise gr.Error("Video playback failed") |
|
|
|
|
|
|
|
|
shared_results["video_path"] = video_path |
|
|
shared_results["video_processing"] = True |
|
|
shared_results["frame_count"] = 0 |
|
|
shared_results["total_frames"] = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) |
|
|
|
|
|
|
|
|
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) |
|
|
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) |
|
|
shared_results["original_video_dimensions"] = (width, height) |
|
|
|
|
|
|
|
|
success, frame = cap.read() |
|
|
cap.release() |
|
|
|
|
|
if not success: |
|
|
raise gr.Error("Failed to play first frame of video") |
|
|
|
|
|
|
|
|
frame_processed = preprocess_image(frame) |
|
|
|
|
|
img_rgb = cv2.cvtColor(frame_processed, cv2.COLOR_BGR2RGB) |
|
|
img_draw = img_rgb.copy() |
|
|
|
|
|
shared_results["original_image"] = frame |
|
|
shared_results["img_rgb"] = img_rgb |
|
|
shared_results["img_draw"] = img_draw |
|
|
shared_results["current_frame"] = frame_processed |
|
|
shared_results["corrected_orientation"] = False |
|
|
|
|
|
|
|
|
shared_results["detection_boxes"] = { |
|
|
"plate": None, |
|
|
"logo": None, |
|
|
"color": None, |
|
|
"orientation": None |
|
|
} |
|
|
|
|
|
return Image.fromarray(img_rgb) |
|
|
|
|
|
def get_next_video_frame(): |
|
|
"""Obtenir le frame suivant de la vidéo en cours""" |
|
|
if not shared_results["video_processing"] or not shared_results["video_path"]: |
|
|
return None |
|
|
|
|
|
cap = cv2.VideoCapture(shared_results["video_path"]) |
|
|
if not cap.isOpened(): |
|
|
return None |
|
|
|
|
|
|
|
|
shared_results["frame_count"] += 1 |
|
|
cap.set(cv2.CAP_PROP_POS_FRAMES, shared_results["frame_count"]) |
|
|
|
|
|
success, frame = cap.read() |
|
|
cap.release() |
|
|
|
|
|
if not success: |
|
|
|
|
|
shared_results["frame_count"] = 0 |
|
|
cap = cv2.VideoCapture(shared_results["video_path"]) |
|
|
success, frame = cap.read() |
|
|
cap.release() |
|
|
if not success: |
|
|
return None |
|
|
|
|
|
|
|
|
frame = cv2.resize(frame, shared_results["original_video_dimensions"]) |
|
|
|
|
|
|
|
|
frame_processed = preprocess_image(frame) |
|
|
|
|
|
img_rgb = cv2.cvtColor(frame_processed, cv2.COLOR_BGR2RGB) |
|
|
img_draw = img_rgb.copy() |
|
|
|
|
|
shared_results["original_image"] = frame |
|
|
shared_results["img_rgb"] = img_rgb |
|
|
shared_results["img_draw"] = img_draw |
|
|
shared_results["current_frame"] = frame_processed |
|
|
shared_results["corrected_orientation"] = False |
|
|
|
|
|
|
|
|
shared_results["detection_boxes"] = { |
|
|
"plate": None, |
|
|
"logo": None, |
|
|
"color": None, |
|
|
"orientation": None |
|
|
} |
|
|
|
|
|
return Image.fromarray(img_rgb) |
|
|
|
|
|
|
|
|
def detect_vehicle(): |
|
|
"""Détecter le véhicule principal dans l'image""" |
|
|
if shared_results["img_rgb"] is None: |
|
|
return "Veuillez d'abord charger une image/vidéo", None, "" |
|
|
|
|
|
img_to_process = shared_results["img_rgb"] |
|
|
if shared_results.get("corrected_orientation", False): |
|
|
height, width = img_to_process.shape[:2] |
|
|
if height > width: |
|
|
img_to_process = cv2.rotate(img_to_process, cv2.ROTATE_90_CLOCKWISE) |
|
|
|
|
|
results_vehicle = model_vehicle(img_to_process) |
|
|
img_with_boxes = img_to_process.copy() |
|
|
vehicle_detected = False |
|
|
vehicle_type = "" |
|
|
highest_conf = 0 |
|
|
|
|
|
for r in results_vehicle: |
|
|
if r.boxes: |
|
|
for box in r.boxes: |
|
|
conf = box.conf.item() |
|
|
if conf < 0.5: |
|
|
continue |
|
|
|
|
|
if conf > highest_conf: |
|
|
highest_conf = conf |
|
|
x1, y1, x2, y2 = map(int, box.xyxy[0]) |
|
|
cls = int(box.cls[0]) |
|
|
vehicle_type = model_vehicle.names[cls].upper() |
|
|
|
|
|
|
|
|
cv2.rectangle(img_with_boxes, (x1, y1), (x2, y2), (0, 165, 255), 2) |
|
|
cv2.putText(img_with_boxes, f"{vehicle_type} {conf:.2f}", (x1, y1 - 10), |
|
|
cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 165, 255), 2) |
|
|
|
|
|
shared_results["vehicle_box"] = (x1, y1, x2, y2) |
|
|
shared_results["vehicle_detected"] = True |
|
|
shared_results["vehicle_type"] = vehicle_type |
|
|
vehicle_detected = True |
|
|
|
|
|
shared_results["img_draw"] = img_with_boxes |
|
|
|
|
|
if vehicle_detected: |
|
|
return f"{vehicle_type} détecté (confiance: {highest_conf:.2f})", Image.fromarray(img_with_boxes), vehicle_type |
|
|
else: |
|
|
shared_results["vehicle_box"] = None |
|
|
shared_results["vehicle_detected"] = False |
|
|
return "Aucun véhicule détecté (confiance trop faible)", Image.fromarray(img_with_boxes), "" |
|
|
|
|
|
|
|
|
def detect_color(): |
|
|
"""Détecter la couleur du véhicule en utilisant le modèle CNN""" |
|
|
if shared_results["img_rgb"] is None: |
|
|
return "Please upload an image/video", None |
|
|
|
|
|
try: |
|
|
|
|
|
if shared_results["vehicle_detected"] and shared_results["vehicle_box"]: |
|
|
x1, y1, x2, y2 = shared_results["vehicle_box"] |
|
|
vehicle_roi = shared_results["img_rgb"][y1:y2, x1:x2] |
|
|
else: |
|
|
vehicle_roi = shared_results["img_rgb"] |
|
|
|
|
|
|
|
|
vehicle_pil = Image.fromarray(vehicle_roi) |
|
|
|
|
|
|
|
|
color, confidence = predict_color(vehicle_pil) |
|
|
|
|
|
|
|
|
shared_results["label_color"] = f"{color} ({confidence:.1f}%)" |
|
|
|
|
|
|
|
|
img_with_boxes = shared_results["img_draw"].copy() |
|
|
|
|
|
if shared_results["vehicle_detected"] and shared_results["vehicle_box"]: |
|
|
x1, y1, x2, y2 = shared_results["vehicle_box"] |
|
|
shared_results["detection_boxes"]["color"] = (x1, y1, x2, y2) |
|
|
cv2.rectangle(img_with_boxes, (x1, y1), (x2, y2), (0, 255, 0), 2) |
|
|
cv2.putText(img_with_boxes, "Color", (x1, y1-10), |
|
|
cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0,255,0), 2) |
|
|
cv2.putText(img_with_boxes, f"{color} ({confidence:.1f}%)", (x1, y2+20), |
|
|
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0,255,0), 2) |
|
|
|
|
|
shared_results["img_draw"] = img_with_boxes |
|
|
|
|
|
return f"Color: {color} ({confidence:.1f}%)", Image.fromarray(img_with_boxes) |
|
|
|
|
|
except Exception as e: |
|
|
print(f"Color detection error: {e}") |
|
|
return f"Color detection failed: {str(e)}", Image.fromarray(shared_results["img_draw"]) |
|
|
def detect_orientation(): |
|
|
"""Détecter l'orientation du véhicule""" |
|
|
if shared_results["img_rgb"] is None: |
|
|
return "Please upload an image/video" |
|
|
|
|
|
|
|
|
img_to_process = shared_results["img_rgb"] |
|
|
if shared_results["video_processing"]: |
|
|
|
|
|
height, width = img_to_process.shape[:2] |
|
|
if height > width: |
|
|
img_to_process = cv2.rotate(img_to_process, cv2.ROTATE_90_CLOCKWISE) |
|
|
shared_results["corrected_orientation"] = True |
|
|
|
|
|
results_orientation = model_orientation(img_to_process) |
|
|
for r in results_orientation: |
|
|
if hasattr(r, 'boxes') and r.boxes and hasattr(r.boxes, 'cls') and len(r.boxes.cls) > 0: |
|
|
cls = int(r.boxes.cls[0]) |
|
|
shared_results["label_orientation"] = r.names[cls] |
|
|
|
|
|
|
|
|
box = r.boxes.xyxy[0].cpu().numpy() |
|
|
x1, y1, x2, y2 = map(int, box) |
|
|
shared_results["detection_boxes"]["orientation"] = (x1, y1, x2, y2) |
|
|
|
|
|
|
|
|
img_with_boxes = draw_detection_boxes(shared_results["img_rgb"]) |
|
|
shared_results["img_draw"] = img_with_boxes |
|
|
|
|
|
return f"Orientation: {shared_results['label_orientation']}" if shared_results['label_orientation'] else "Orientation not detected", Image.fromarray(img_with_boxes) |
|
|
|
|
|
def detect_logo_and_model(): |
|
|
"""Détecter et reconnaître le logo et le modèle du véhicule""" |
|
|
if shared_results["img_rgb"] is None: |
|
|
return "Please upload an image first", None, None, None, None |
|
|
|
|
|
shared_results["logo_recognition_results"] = [] |
|
|
img_draw = shared_results["img_draw"].copy() |
|
|
detected_model = "Model not detected" |
|
|
|
|
|
try: |
|
|
results_logo = model_logo_detection(shared_results["img_rgb"]) |
|
|
if results_logo and results_logo[0].boxes: |
|
|
for box in results_logo[0].boxes: |
|
|
x1, y1, x2, y2 = map(int, box.xyxy[0]) |
|
|
cv2.rectangle(img_draw, (x1, y1), (x2, y2), (255, 0, 0), 2) |
|
|
|
|
|
logo_crop = shared_results["img_rgb"][y1:y2, x1:x2] |
|
|
shared_results["logo_crop_img"] = Image.fromarray(logo_crop) |
|
|
|
|
|
|
|
|
logo_recognition = recognize_logo(shared_results["logo_crop_img"]) |
|
|
shared_results["logo_recognition_results"].append(logo_recognition) |
|
|
|
|
|
cv2.putText(img_draw, "LOGO", (x1, y1 - 10), |
|
|
cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255,0,0), 2) |
|
|
|
|
|
|
|
|
if logo_recognition and "not detected" not in logo_recognition.lower(): |
|
|
try: |
|
|
brand = logo_recognition.split('(')[0].strip().lower() |
|
|
detected_model = recognize_model(brand, shared_results["logo_crop_img"]) |
|
|
|
|
|
|
|
|
cv2.putText(img_draw, f"Modèle: {detected_model}", (x1, y2 + 20), |
|
|
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2) |
|
|
except Exception as e: |
|
|
print(f"Model recognition failed: {str(e)}") |
|
|
detected_model = "Model detection failed" |
|
|
|
|
|
shared_results["vehicle_model"] = detected_model |
|
|
|
|
|
|
|
|
if not shared_results["vehicle_brand"] or "not detected" in shared_results["vehicle_brand"].lower(): |
|
|
global_brand = predict_brand(shared_results["img_rgb"]) |
|
|
if global_brand and "not detected" not in global_brand.lower(): |
|
|
shared_results["vehicle_brand"] = global_brand |
|
|
|
|
|
except Exception as e: |
|
|
print(f"Error in logo and model detection: {str(e)}") |
|
|
shared_results["vehicle_brand"] = "Detection error" |
|
|
shared_results["vehicle_model"] = "Detection error" |
|
|
|
|
|
logo_results_text = " | ".join(shared_results["logo_recognition_results"]) if shared_results["logo_recognition_results"] else "No logo recognized" |
|
|
|
|
|
return ( |
|
|
f"Brand: {shared_results['vehicle_brand']}" if shared_results['vehicle_brand'] else "Brand not detected", |
|
|
f"Model: {shared_results['vehicle_model']}" if shared_results['vehicle_model'] else "Model not detected", |
|
|
f"Logo recognition: {logo_results_text}", |
|
|
Image.fromarray(img_draw), |
|
|
shared_results["logo_crop_img"] |
|
|
) |
|
|
|
|
|
def detect_plate(): |
|
|
"""Détecter la plaque d'immatriculation et reconnaître les caractères""" |
|
|
if shared_results["img_rgb"] is None: |
|
|
return "Please upload an image/video", None, None, None |
|
|
|
|
|
shared_results["trocr_char_list"] = [] |
|
|
shared_results["trocr_combined_text"] = "" |
|
|
img_to_process = shared_results["img_rgb"] |
|
|
|
|
|
|
|
|
if shared_results.get("corrected_orientation", False): |
|
|
height, width = img_to_process.shape[:2] |
|
|
if height > width: |
|
|
img_to_process = cv2.rotate(img_to_process, cv2.ROTATE_90_CLOCKWISE) |
|
|
|
|
|
|
|
|
if shared_results["vehicle_detected"] and shared_results["vehicle_box"]: |
|
|
vx1, vy1, vx2, vy2 = shared_results["vehicle_box"] |
|
|
roi = img_to_process[vy1:vy2, vx1:vx2] |
|
|
results_plate = model_plate_detection(roi) |
|
|
else: |
|
|
results_plate = model_plate_detection(img_to_process) |
|
|
|
|
|
if results_plate and results_plate[0].boxes: |
|
|
for box in results_plate[0].boxes: |
|
|
|
|
|
if shared_results["vehicle_detected"] and shared_results["vehicle_box"]: |
|
|
vx1, vy1, vx2, vy2 = shared_results["vehicle_box"] |
|
|
rx1, ry1, rx2, ry2 = map(int, box.xyxy[0]) |
|
|
|
|
|
x1 = vx1 + rx1 |
|
|
y1 = vy1 + ry1 |
|
|
x2 = vx1 + rx2 |
|
|
y2 = vy1 + ry2 |
|
|
else: |
|
|
x1, y1, x2, y2 = map(int, box.xyxy[0]) |
|
|
|
|
|
shared_results["detection_boxes"]["plate"] = (x1, y1, x2, y2) |
|
|
plate_crop = img_to_process[y1:y2, x1:x2] |
|
|
shared_results["plate_crop_img"] = Image.fromarray(plate_crop) |
|
|
plate_for_char_draw = plate_crop.copy() |
|
|
|
|
|
|
|
|
results_chars = model_characters(plate_crop) |
|
|
char_boxes = [] |
|
|
for r in results_chars: |
|
|
if r.boxes: |
|
|
for box in r.boxes: |
|
|
x1c, y1c, x2c, y2c = map(int, box.xyxy[0]) |
|
|
char_boxes.append(((x1c, y1c, x2c, y2c), x1c)) |
|
|
|
|
|
char_boxes.sort(key=lambda x: x[1]) |
|
|
|
|
|
for i, (coords, _) in enumerate(char_boxes): |
|
|
x1c, y1c, x2c, y2c = coords |
|
|
char_crop = plate_crop[y1c:y2c, x1c:x2c] |
|
|
char_pil = Image.fromarray(char_crop).convert("RGB") |
|
|
|
|
|
try: |
|
|
inputs = trocr_processor(images=char_pil, return_tensors="pt").pixel_values |
|
|
generated_ids = trocr_model.generate(inputs) |
|
|
predicted_char = trocr_processor.batch_decode(generated_ids, skip_special_tokens=True)[0].strip() |
|
|
shared_results["trocr_char_list"].append(predicted_char) |
|
|
except Exception as e: |
|
|
shared_results["trocr_char_list"].append("?") |
|
|
|
|
|
cv2.rectangle(plate_for_char_draw, (x1c, y1c), (x2c, y2c), (255, 0, 255), 1) |
|
|
cv2.putText(plate_for_char_draw, predicted_char, (x1c, y1c - 5), |
|
|
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 0, 255), 1) |
|
|
|
|
|
shared_results["plate_with_chars_img"] = Image.fromarray(plate_for_char_draw) |
|
|
shared_results["trocr_combined_text"] = ''.join(shared_results["trocr_char_list"]) |
|
|
break |
|
|
|
|
|
|
|
|
img_with_boxes = draw_detection_boxes(shared_results["img_rgb"]) |
|
|
shared_results["img_draw"] = img_with_boxes |
|
|
|
|
|
return ( |
|
|
Image.fromarray(img_with_boxes), |
|
|
shared_results["plate_crop_img"], |
|
|
shared_results["plate_with_chars_img"], |
|
|
shared_results["trocr_char_list"] |
|
|
) |
|
|
|
|
|
def is_empty_plate(cropped_plate_image): |
|
|
"""Détecte si la plaque est visuellement vide (espace blanc)""" |
|
|
if cropped_plate_image is None: |
|
|
return True |
|
|
|
|
|
|
|
|
if isinstance(cropped_plate_image, Image.Image): |
|
|
plate_img = np.array(cropped_plate_image) |
|
|
else: |
|
|
plate_img = cropped_plate_image |
|
|
|
|
|
|
|
|
gray = cv2.cvtColor(plate_img, cv2.COLOR_RGB2GRAY) |
|
|
|
|
|
|
|
|
_, thresholded = cv2.threshold(gray, 220, 255, cv2.THRESH_BINARY_INV) |
|
|
|
|
|
|
|
|
non_white_pixels = cv2.countNonZero(thresholded) |
|
|
|
|
|
|
|
|
total_pixels = gray.shape[0] * gray.shape[1] |
|
|
return non_white_pixels < (0.01 * total_pixels) |
|
|
|
|
|
|
|
|
def classify_plate_number(): |
|
|
"""Classifier le numéro de plaque détecté uniquement si elle est algérienne""" |
|
|
if not shared_results["trocr_combined_text"]: |
|
|
return "No plate text to classify", "", "❌ No plate detected", "" |
|
|
|
|
|
text = shared_results["trocr_combined_text"] |
|
|
|
|
|
if not is_algerian_plate(text): |
|
|
return "Non-Algerian license plate detected", "Type not detected", "❌ Non-Algerian", "" |
|
|
|
|
|
classified_plate = classify_plate(text) |
|
|
if classified_plate: |
|
|
shared_results["classified_plate"] = classified_plate |
|
|
|
|
|
shared_results["classification_result"] = f"Plate: {classified_plate['matricule_complet']}\n" |
|
|
shared_results["classification_result"] += f"Wilaya: {classified_plate['wilaya'][1]} ({classified_plate['wilaya'][0]})\n" |
|
|
shared_results["classification_result"] += f"Year: {classified_plate['annee']}\n" |
|
|
shared_results["classification_result"] += f"Category: {classified_plate['categorie'][1]} ({classified_plate['categorie'][0]})\n" |
|
|
shared_results["classification_result"] += f"Serie: {classified_plate['serie']}\n" |
|
|
|
|
|
shared_results["vehicle_type"] = classified_plate['categorie'][1] |
|
|
|
|
|
save_complete_results( |
|
|
plate_info=classified_plate, |
|
|
color=shared_results["label_color"], |
|
|
model=shared_results["vehicle_model"], |
|
|
orientation=shared_results["label_orientation"], |
|
|
vehicle_type=shared_results["vehicle_type"], |
|
|
brand=shared_results["vehicle_brand"] |
|
|
) |
|
|
|
|
|
return ( |
|
|
shared_results["classification_result"], |
|
|
f"Type: {shared_results['vehicle_type']}" if shared_results['vehicle_type'] else "Type not detected", |
|
|
"✅ Algerian plate", |
|
|
"Classification successful" |
|
|
) |
|
|
else: |
|
|
return "Unable to classify the plate", "Type not detected", "❌ Invalid plate", "" |
|
|
|
|
|
def next_frame(): |
|
|
"""Passer au frame suivant dans une vidéo""" |
|
|
if not shared_results["video_processing"] or not shared_results["video_path"]: |
|
|
return ( |
|
|
"No video being processed", |
|
|
None, |
|
|
None, |
|
|
None, |
|
|
None, |
|
|
None, |
|
|
None, |
|
|
None, |
|
|
None |
|
|
) |
|
|
|
|
|
cap = cv2.VideoCapture(shared_results["video_path"]) |
|
|
if not cap.isOpened(): |
|
|
return ( |
|
|
"Video playback error", |
|
|
None, None, None, None, None, None, None, None |
|
|
) |
|
|
|
|
|
|
|
|
shared_results["frame_count"] += 1 |
|
|
cap.set(cv2.CAP_PROP_POS_FRAMES, shared_results["frame_count"]) |
|
|
success, frame = cap.read() |
|
|
cap.release() |
|
|
|
|
|
if not success: |
|
|
|
|
|
shared_results["frame_count"] = 0 |
|
|
cap = cv2.VideoCapture(shared_results["video_path"]) |
|
|
success, frame = cap.read() |
|
|
cap.release() |
|
|
if not success: |
|
|
return ( |
|
|
"Error reading first frame", |
|
|
None, None, None, None, None, None, None, None |
|
|
) |
|
|
|
|
|
|
|
|
frame = cv2.resize(frame, shared_results["original_video_dimensions"]) |
|
|
|
|
|
|
|
|
img_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) |
|
|
img_draw = img_rgb.copy() |
|
|
|
|
|
|
|
|
shared_results.update({ |
|
|
"original_image": frame, |
|
|
"img_rgb": img_rgb, |
|
|
"img_draw": img_draw, |
|
|
"current_frame": frame, |
|
|
"corrected_orientation": False, |
|
|
"label_color": "", |
|
|
"label_orientation": "", |
|
|
"vehicle_type": "", |
|
|
"vehicle_model": "", |
|
|
"vehicle_brand": "", |
|
|
"logo_recognition_results": [], |
|
|
"trocr_char_list": [], |
|
|
"trocr_combined_text": "", |
|
|
"classification_result": "", |
|
|
"vehicle_box": None, |
|
|
"vehicle_detected": False, |
|
|
"detection_boxes": { |
|
|
"plate": None, |
|
|
"logo": None, |
|
|
"color": None, |
|
|
"orientation": None |
|
|
}, |
|
|
"plate_crop_img": None, |
|
|
"logo_crop_img": None, |
|
|
"plate_with_chars_img": None |
|
|
}) |
|
|
|
|
|
|
|
|
return ( |
|
|
Image.fromarray(img_rgb), |
|
|
f"Frame {shared_results['frame_count']}/{shared_results['total_frames']} - Ready for analysis", |
|
|
None, |
|
|
None, |
|
|
None, |
|
|
None, |
|
|
None, |
|
|
None |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
TIME_PATTERN = re.compile(r'^([01]?[0-9]|2[0-3]):[0-5][0-9]-([01]?[0-9]|2[0-3]):[0-5][0-9]$') |
|
|
|
|
|
def init_database(): |
|
|
"""Initialiser la base de données SQLite""" |
|
|
try: |
|
|
conn = sqlite3.connect('/content/drive/MyDrive/vehicle_database.db') |
|
|
cursor = conn.cursor() |
|
|
|
|
|
|
|
|
cursor.execute(''' |
|
|
CREATE TABLE IF NOT EXISTS vehicles ( |
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT, |
|
|
plate_number TEXT UNIQUE NOT NULL, |
|
|
brand TEXT, |
|
|
model TEXT, |
|
|
color TEXT, |
|
|
orientation TEXT, |
|
|
vehicle_type TEXT, |
|
|
access_status TEXT, |
|
|
time_slot TEXT, |
|
|
registration_date TEXT, |
|
|
last_access_date TEXT |
|
|
) |
|
|
''') |
|
|
|
|
|
conn.commit() |
|
|
return True |
|
|
except Error as e: |
|
|
print(f"Database error: {e}") |
|
|
return False |
|
|
finally: |
|
|
if conn: |
|
|
conn.close() |
|
|
|
|
|
def save_vehicle(plate_info, color, model, brand, status, time_slot): |
|
|
"""Enregistrer un véhicule dans la base de données""" |
|
|
try: |
|
|
conn = sqlite3.connect('vehicle_database.db') |
|
|
cursor = conn.cursor() |
|
|
|
|
|
|
|
|
cursor.execute('SELECT plate_number FROM vehicles WHERE plate_number = ?', |
|
|
(plate_info['matricule_complet'],)) |
|
|
exists = cursor.fetchone() |
|
|
|
|
|
current_date = datetime.now().strftime('%Y-%m-%d %H:%M:%S') |
|
|
|
|
|
if exists: |
|
|
|
|
|
cursor.execute(''' |
|
|
UPDATE vehicles SET |
|
|
brand = ?, |
|
|
model = ?, |
|
|
color = ?, |
|
|
orientation = ?, |
|
|
vehicle_type = ?, |
|
|
access_status = ?, |
|
|
time_slot = ?, |
|
|
last_access_date = ? |
|
|
WHERE plate_number = ? |
|
|
''', ( |
|
|
brand, |
|
|
model, |
|
|
color, |
|
|
shared_results.get("label_orientation", "Unknown"), |
|
|
plate_info['categorie'][1], |
|
|
status, |
|
|
time_slot, |
|
|
current_date, |
|
|
plate_info['matricule_complet'] |
|
|
)) |
|
|
else: |
|
|
|
|
|
cursor.execute(''' |
|
|
INSERT INTO vehicles ( |
|
|
plate_number, brand, model, color, orientation, |
|
|
vehicle_type, access_status, time_slot, registration_date, last_access_date |
|
|
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) |
|
|
''', ( |
|
|
plate_info['matricule_complet'], |
|
|
brand, |
|
|
model, |
|
|
color, |
|
|
shared_results.get("label_orientation", "Unknown"), |
|
|
plate_info['categorie'][1], |
|
|
status, |
|
|
time_slot, |
|
|
current_date, |
|
|
current_date |
|
|
)) |
|
|
|
|
|
conn.commit() |
|
|
return True, "Vehicle information saved successfully" |
|
|
except Error as e: |
|
|
return False, f"Database error: {e}" |
|
|
finally: |
|
|
if conn: |
|
|
conn.close() |
|
|
|
|
|
def check_vehicle(plate_number): |
|
|
"""Vérifier si un véhicule existe dans la base""" |
|
|
try: |
|
|
conn = sqlite3.connect('vehicle_database.db') |
|
|
cursor = conn.cursor() |
|
|
|
|
|
cursor.execute(''' |
|
|
SELECT plate_number, brand, model, access_status, time_slot |
|
|
FROM vehicles WHERE plate_number = ? |
|
|
''', (plate_number,)) |
|
|
|
|
|
vehicle = cursor.fetchone() |
|
|
|
|
|
if vehicle: |
|
|
return True, f"Vehicle found:\nPlate: {vehicle[0]}\nBrand: {vehicle[1]}\nModel: {vehicle[2]}" |
|
|
return False, "This vehicle is not registered" |
|
|
except Error as e: |
|
|
return False, f"Database error: {e}" |
|
|
finally: |
|
|
if conn: |
|
|
conn.close() |
|
|
|
|
|
def is_access_allowed(plate_number): |
|
|
"""Vérifier si l'accès est autorisé pour ce véhicule""" |
|
|
try: |
|
|
conn = sqlite3.connect('vehicle_database.db') |
|
|
cursor = conn.cursor() |
|
|
|
|
|
cursor.execute(''' |
|
|
SELECT access_status, time_slot FROM vehicles WHERE plate_number = ? |
|
|
''', (plate_number,)) |
|
|
|
|
|
result = cursor.fetchone() |
|
|
|
|
|
if not result: |
|
|
return False |
|
|
|
|
|
status, time_slot = result |
|
|
|
|
|
|
|
|
if status != "Authorized": |
|
|
return False |
|
|
|
|
|
|
|
|
if time_slot and time_slot != "24/24": |
|
|
if time_slot == "Custom...": |
|
|
|
|
|
return False |
|
|
|
|
|
current_time = datetime.now().time() |
|
|
|
|
|
if "-" in time_slot: |
|
|
start_str, end_str = time_slot.split("-") |
|
|
start_time = datetime.strptime(start_str.strip(), "%H:%M").time() |
|
|
end_time = datetime.strptime(end_str.strip(), "%H:%M").time() |
|
|
|
|
|
if start_time <= current_time <= end_time: |
|
|
return True |
|
|
return False |
|
|
|
|
|
return True |
|
|
except Error as e: |
|
|
print(f"Access check error: {e}") |
|
|
return False |
|
|
finally: |
|
|
if conn: |
|
|
conn.close() |
|
|
|
|
|
def get_all_vehicles(): |
|
|
"""Récupérer tous les véhicules enregistrés""" |
|
|
try: |
|
|
conn = sqlite3.connect('vehicle_database.db') |
|
|
cursor = conn.cursor() |
|
|
|
|
|
cursor.execute(''' |
|
|
SELECT |
|
|
plate_number, brand, model, color, orientation, |
|
|
vehicle_type, access_status, time_slot, registration_date |
|
|
FROM vehicles |
|
|
ORDER BY registration_date DESC |
|
|
''') |
|
|
|
|
|
columns = [description[0] for description in cursor.description] |
|
|
vehicles = cursor.fetchall() |
|
|
|
|
|
return columns, vehicles |
|
|
except Error as e: |
|
|
print(f"Database error: {e}") |
|
|
return [], [] |
|
|
finally: |
|
|
if conn: |
|
|
conn.close() |
|
|
|
|
|
def export_database(): |
|
|
"""Exporter toute la base de données dans un fichier SQL""" |
|
|
try: |
|
|
|
|
|
with tempfile.NamedTemporaryFile(suffix=".sql", delete=False) as tmp: |
|
|
|
|
|
conn = sqlite3.connect('vehicle_database.db') |
|
|
with open(tmp.name, 'w') as f: |
|
|
for line in conn.iterdump(): |
|
|
f.write(f'{line}\n') |
|
|
conn.close() |
|
|
return gr.File(value=tmp.name, visible=True) |
|
|
except Exception as e: |
|
|
print(f"Export error: {e}") |
|
|
return gr.File(visible=False) |
|
|
|
|
|
def init_database(): |
|
|
"""Initialiser la base de données SQLite de manière robuste""" |
|
|
conn = None |
|
|
try: |
|
|
conn = sqlite3.connect('vehicle_database.db') |
|
|
cursor = conn.cursor() |
|
|
|
|
|
|
|
|
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='vehicles'") |
|
|
if not cursor.fetchone(): |
|
|
|
|
|
cursor.execute(''' |
|
|
CREATE TABLE vehicles ( |
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT, |
|
|
plate_number TEXT UNIQUE NOT NULL, |
|
|
brand TEXT, |
|
|
model TEXT, |
|
|
color TEXT, |
|
|
orientation TEXT, |
|
|
vehicle_type TEXT, |
|
|
access_status TEXT, |
|
|
time_slot TEXT, |
|
|
registration_date TEXT, |
|
|
last_access_date TEXT |
|
|
) |
|
|
''') |
|
|
conn.commit() |
|
|
print("✅ Table 'vehicles' créée avec succès") |
|
|
return True |
|
|
except Error as e: |
|
|
print(f"❌ Erreur base de données: {e}") |
|
|
return False |
|
|
finally: |
|
|
if conn: |
|
|
conn.close() |
|
|
def process_video_frame(frame): |
|
|
"""Traiter un frame vidéo avec toutes les détections""" |
|
|
|
|
|
shared_results["img_rgb"] = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) |
|
|
shared_results["img_draw"] = shared_results["img_rgb"].copy() |
|
|
|
|
|
|
|
|
detect_vehicle() |
|
|
detect_color() |
|
|
detect_orientation() |
|
|
detect_logo_and_model() |
|
|
detect_plate() |
|
|
|
|
|
|
|
|
return shared_results["img_draw"] |
|
|
|
|
|
def save_modified_video(): |
|
|
"""Sauvegarder la vidéo annotée avec toutes les détections""" |
|
|
if not shared_results.get("video_path"): |
|
|
raise gr.Error("Aucune vidéo chargée") |
|
|
|
|
|
|
|
|
cap = cv2.VideoCapture(shared_results["video_path"]) |
|
|
if not cap.isOpened(): |
|
|
raise gr.Error("Impossible de lire la vidéo source") |
|
|
|
|
|
fps = cap.get(cv2.CAP_PROP_FPS) |
|
|
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) |
|
|
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) |
|
|
|
|
|
|
|
|
temp_dir = tempfile.gettempdir() |
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
|
|
output_path = os.path.join(temp_dir, f"annotated_{timestamp}.mp4") |
|
|
|
|
|
fourcc = cv2.VideoWriter_fourcc(*'mp4v') |
|
|
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height)) |
|
|
|
|
|
frame_count = 0 |
|
|
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) |
|
|
progress = gr.Progress() |
|
|
|
|
|
try: |
|
|
while True: |
|
|
ret, frame = cap.read() |
|
|
if not ret: |
|
|
break |
|
|
|
|
|
progress(frame_count / total_frames, f"Traitement du frame {frame_count}/{total_frames}") |
|
|
|
|
|
|
|
|
if frame_count in shared_results.get("modified_frames", {}): |
|
|
annotated_frame = np.array(shared_results["modified_frames"][frame_count]) |
|
|
else: |
|
|
|
|
|
shared_results["img_rgb"] = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) |
|
|
shared_results["img_draw"] = shared_results["img_rgb"].copy() |
|
|
shared_results["frame_count"] = frame_count |
|
|
|
|
|
|
|
|
detect_vehicle() |
|
|
detect_color() |
|
|
detect_orientation() |
|
|
detect_logo_and_model() |
|
|
detect_plate() |
|
|
|
|
|
annotated_frame = shared_results["img_draw"] |
|
|
|
|
|
|
|
|
out.write(cv2.cvtColor(annotated_frame, cv2.COLOR_RGB2BGR)) |
|
|
frame_count += 1 |
|
|
|
|
|
except Exception as e: |
|
|
raise gr.Error(f"Erreur lors de la sauvegarde: {str(e)}") |
|
|
finally: |
|
|
cap.release() |
|
|
out.release() |
|
|
|
|
|
|
|
|
if not os.path.exists(output_path): |
|
|
raise gr.Error("Échec de la création de la vidéo") |
|
|
|
|
|
return output_path |
|
|
|
|
|
def process_and_save_video(): |
|
|
"""Traiter et sauvegarder la vidéo annotée""" |
|
|
if not shared_results.get("video_path"): |
|
|
raise gr.Error("Aucune vidéo chargée") |
|
|
|
|
|
|
|
|
output_path = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False).name |
|
|
|
|
|
cap = cv2.VideoCapture(shared_results["video_path"]) |
|
|
fps = cap.get(cv2.CAP_PROP_FPS) |
|
|
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) |
|
|
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) |
|
|
|
|
|
fourcc = cv2.VideoWriter_fourcc(*'mp4v') |
|
|
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height)) |
|
|
|
|
|
frame_count = 0 |
|
|
while True: |
|
|
ret, frame = cap.read() |
|
|
if not ret: |
|
|
break |
|
|
|
|
|
|
|
|
if frame_count in shared_results.get("modified_frames", {}): |
|
|
annotated_frame = np.array(shared_results["modified_frames"][frame_count]) |
|
|
out.write(cv2.cvtColor(annotated_frame, cv2.COLOR_RGB2BGR)) |
|
|
else: |
|
|
out.write(frame) |
|
|
|
|
|
frame_count += 1 |
|
|
|
|
|
cap.release() |
|
|
out.release() |
|
|
|
|
|
return output_path |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
with gr.Blocks(title="🚗 Système de Reconnaissance de Véhicules Algériens", theme="soft") as demo: |
|
|
|
|
|
with gr.Tab("Accueil"): |
|
|
with gr.Column(): |
|
|
|
|
|
gr.Markdown("# 🚗 An Intelligent Vehicle Recognition System for Access Control in Algeria") |
|
|
gr.Markdown(""" |
|
|
** 🚗 OPENIVRS : Advanced solution for the detection and identification of Algerian vehicles.** |
|
|
*Technologies used: YOLO, CNN, TrOCR, and image processing.* |
|
|
""") |
|
|
|
|
|
|
|
|
with gr.Row(): |
|
|
|
|
|
with gr.Column(scale=1): |
|
|
welcome_img = gr.Image( |
|
|
value="/content/drive/MyDrive/system.png", |
|
|
label="Illustration of the system", |
|
|
interactive=False |
|
|
) |
|
|
|
|
|
|
|
|
with gr.Column(scale=1): |
|
|
gr.Markdown(""" |
|
|
### 🔧 Key Features: |
|
|
- 🚘 Algerian license plate detection. |
|
|
- 🚗🔤 Vehicle make and model recognition. |
|
|
- 🎨🧭 Color classification and orientation. |
|
|
- 🗄️🔐 Access management via database. |
|
|
- 📤📊 Data export for analysis. |
|
|
""") |
|
|
|
|
|
|
|
|
with gr.Tab("Vehicle Detection", id="detection"): |
|
|
gr.Markdown("# 🚗 Vehicle Detection and Recognition") |
|
|
gr.Markdown("Analyze Vehicle Characteristics from Images") |
|
|
|
|
|
with gr.Row(): |
|
|
with gr.Column(): |
|
|
input_type = gr.Radio(["Image", "Video"], label="Entry type", value="Image", interactive=True) |
|
|
file_input = gr.File(label="Drop an Image Here - or - Click to Upload", |
|
|
file_types=["image", "video"]) |
|
|
load_btn = gr.Button("Upload Image", variant="primary") |
|
|
|
|
|
|
|
|
video_player = gr.Video( |
|
|
visible=False, |
|
|
label="Aperçu vidéo", |
|
|
interactive=False, |
|
|
height=150 |
|
|
) |
|
|
|
|
|
frame_gallery = gr.Gallery(visible=False, label="Select a frame", columns=4) |
|
|
frame_slider = gr.Slider(visible=False, interactive=True, label="Selected frame") |
|
|
load_frame_btn = gr.Button(visible=False, value="Load the selected frame", variant="secondary") |
|
|
|
|
|
with gr.Row(): |
|
|
detect_vehicle_btn = gr.Button("Vehicle Detection", variant="secondary") |
|
|
detect_color_btn = gr.Button("Color Detection", variant="secondary") |
|
|
|
|
|
with gr.Row(): |
|
|
detect_orientation_btn = gr.Button("Orientation Detection", variant="secondary") |
|
|
detect_logo_btn = gr.Button("Brand and Model", variant="secondary") |
|
|
|
|
|
with gr.Row(): |
|
|
detect_plate_btn = gr.Button("License Plate Detection", variant="secondary") |
|
|
classify_plate_btn = gr.Button("Classify License Plate", variant="secondary") |
|
|
|
|
|
with gr.Row(): |
|
|
next_frame_btn = gr.Button("Next Frame", visible=False) |
|
|
save_video_btn = gr.Button("Save Video", visible=True, variant="primary") |
|
|
|
|
|
with gr.Row(): |
|
|
saved_video = gr.Video(label="annotated video saved", visible= True, interactive=False) |
|
|
|
|
|
|
|
|
|
|
|
with gr.Column(): |
|
|
original_image = gr.Image(label="Original Image") |
|
|
processed_image = gr.Image(label="Annotated Image") |
|
|
status_output = gr.Textbox(label="Statuts") |
|
|
|
|
|
with gr.Tab("Vehicle"): |
|
|
vehicle_type_output = gr.Textbox(label="Type de véhicule") |
|
|
|
|
|
with gr.Tab("Color"): |
|
|
color_output = gr.Textbox(label="Color detection") |
|
|
|
|
|
with gr.Tab("Orientation"): |
|
|
orientation_output = gr.Textbox(label="Orientation detection") |
|
|
|
|
|
with gr.Tab("Brand & Model"): |
|
|
with gr.Column(): |
|
|
logo_output = gr.Textbox(label="Brand detection") |
|
|
model_output = gr.Textbox(label="model recognition") |
|
|
logo_image = gr.Image(label="detected logo") |
|
|
|
|
|
with gr.Tab("Plate"): |
|
|
with gr.Column(): |
|
|
plate_image = gr.Image(label="Detected Plate") |
|
|
plate_chars_image = gr.Image(label="plate with characters") |
|
|
plate_chars_list = gr.Textbox(label="Detected characters") |
|
|
|
|
|
with gr.Tab("Classification"): |
|
|
with gr.Column(): |
|
|
plate_classification = gr.Textbox(label="Plate Details") |
|
|
vehicle_type_output = gr.Textbox(label="Type de véhicule") |
|
|
with gr.Row(): |
|
|
algerian_check_output = gr.Textbox(label="Origine", scale=2) |
|
|
action_output = gr.Textbox(label="Action recommandée", scale=3) |
|
|
|
|
|
|
|
|
|
|
|
with gr.Tab("Access Management", id="access"): |
|
|
with gr.Column(): |
|
|
check_btn = gr.Button("🔍 Verify Vehicle", variant="primary") |
|
|
save_btn = gr.Button("💾 Register", interactive=False, variant="primary") |
|
|
|
|
|
with gr.Row(visible=False) as access_form: |
|
|
with gr.Column(): |
|
|
access_status = gr.Radio( |
|
|
["Authorized", "Not Authorized"], |
|
|
label="Access Status" |
|
|
) |
|
|
time_range = gr.Dropdown( |
|
|
["24/24", "8:00-16:00", "9:00-17:00", "Custom..."], |
|
|
label="Time Slot" |
|
|
) |
|
|
custom_time = gr.Textbox( |
|
|
visible=False, |
|
|
placeholder="HH:MM-HH:MM", |
|
|
label="Enter Time Slot" |
|
|
) |
|
|
save_btn = gr.Button("Confirm Registration", variant="primary") |
|
|
|
|
|
access_output = gr.Textbox(label="Verification Result") |
|
|
|
|
|
|
|
|
with gr.Tab("Database", id="database"): |
|
|
with gr.Column(): |
|
|
with gr.Row(): |
|
|
refresh_db_btn = gr.Button("🔄 Refresh", variant="secondary") |
|
|
export_csv_btn = gr.Button("📤 Export CSV", variant="secondary") |
|
|
export_db_btn = gr.Button("💾 Exporter DB", variant="secondary") |
|
|
|
|
|
db_table = gr.Dataframe( |
|
|
headers=["Plaque ", "Marque", "Modèle", "Couleur", "Orientation", "Type", "Statut", "Plage horaire", "Date"], |
|
|
datatype=["str", "str", "str", "str", "str", "str", "str"], |
|
|
interactive=False, |
|
|
label="Registered Vehicles" |
|
|
) |
|
|
|
|
|
csv_output = gr.File(label="Exported File", visible=False) |
|
|
|
|
|
def update_input_visibility(input_type): |
|
|
if input_type == "Video": |
|
|
return gr.Button(visible=True) |
|
|
else: |
|
|
return gr.Button(visible=False) |
|
|
|
|
|
input_type.change( |
|
|
fn=update_input_visibility, |
|
|
inputs=input_type, |
|
|
outputs=next_frame_btn |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
def extract_video_frames(video_path, num_frames=12): |
|
|
"""Extraire plusieurs frames d'une vidéo pour la sélection""" |
|
|
cap = cv2.VideoCapture(video_path) |
|
|
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) |
|
|
frames = [] |
|
|
|
|
|
|
|
|
for i in range(num_frames): |
|
|
frame_pos = int(i * (total_frames / num_frames)) |
|
|
cap.set(cv2.CAP_PROP_POS_FRAMES, frame_pos) |
|
|
ret, frame = cap.read() |
|
|
if ret: |
|
|
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) |
|
|
frames.append((frame_pos, Image.fromarray(frame_rgb))) |
|
|
|
|
|
cap.release() |
|
|
return frames |
|
|
|
|
|
|
|
|
|
|
|
def process_load(input_type, files): |
|
|
if files is None: |
|
|
raise gr.Error("Veuillez sélectionner un fichier") |
|
|
|
|
|
file_path = files.name if hasattr(files, 'name') else files |
|
|
|
|
|
if input_type == "Image": |
|
|
if not file_path.lower().endswith(('.png', '.jpg', '.jpeg')): |
|
|
raise gr.Error("Veuillez sélectionner une image valide (PNG, JPG, JPEG)") |
|
|
return ( |
|
|
load_image(file_path), |
|
|
"Image chargée - Cliquez sur les boutons pour analyser", |
|
|
gr.Button(visible=False), |
|
|
gr.Gallery(visible=False), |
|
|
gr.Slider(visible=False), |
|
|
gr.Button(visible=False), |
|
|
gr.Video(visible=False) |
|
|
) |
|
|
else: |
|
|
if not file_path.lower().endswith(('.mp4', '.avi', '.mov')): |
|
|
raise gr.Error("Veuillez sélectionner une vidéo valide (MP4, AVI, MOV)") |
|
|
|
|
|
frames = extract_video_frames(file_path) |
|
|
shared_results["video_path"] = file_path |
|
|
shared_results["video_frames"] = frames |
|
|
|
|
|
return ( |
|
|
None, |
|
|
f"Vidéo chargée - {len(frames)} frames extraits", |
|
|
gr.Button(visible=True), |
|
|
gr.Gallery(visible=True, value=[(img, f"Frame {pos}") for pos, img in frames]), |
|
|
gr.Slider(visible=True, maximum=len(frames)-1, value=0, step=1, label="Frame sélectionné"), |
|
|
gr.Button(visible=True, value="Charger le frame sélectionné"), |
|
|
gr.Video(visible=True, value=file_path, height=150) |
|
|
) |
|
|
|
|
|
|
|
|
def load_selected_frame(selected_frame_idx): |
|
|
if not shared_results.get("video_frames"): |
|
|
raise gr.Error("No video loaded") |
|
|
|
|
|
frame_pos, frame_img = shared_results["video_frames"][selected_frame_idx] |
|
|
|
|
|
|
|
|
cap = cv2.VideoCapture(shared_results["video_path"]) |
|
|
cap.set(cv2.CAP_PROP_POS_FRAMES, frame_pos) |
|
|
ret, frame = cap.read() |
|
|
cap.release() |
|
|
|
|
|
if not ret: |
|
|
raise gr.Error("Error reading the selected frame") |
|
|
|
|
|
|
|
|
img_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) |
|
|
img_draw = img_rgb.copy() |
|
|
|
|
|
|
|
|
shared_results.update({ |
|
|
"original_image": frame, |
|
|
"img_rgb": img_rgb, |
|
|
"img_draw": img_draw, |
|
|
"current_frame": frame, |
|
|
"corrected_orientation": False, |
|
|
"frame_count": frame_pos, |
|
|
"video_processing": True |
|
|
}) |
|
|
|
|
|
return ( |
|
|
Image.fromarray(img_rgb), |
|
|
f"Frame {frame_pos} loaded - Ready for analysis", |
|
|
gr.Button(visible=True) |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
def toggle_time_range(choice): |
|
|
"""Afficher/masquer le champ personnalisé""" |
|
|
if choice == "Custom...": |
|
|
return gr.Textbox(visible=True) |
|
|
return gr.Textbox(visible=False) |
|
|
|
|
|
def verify_vehicle(): |
|
|
"""Vérifier l'existence du véhicule""" |
|
|
if not shared_results["trocr_combined_text"]: |
|
|
raise gr.Error("No License Plate Detected") |
|
|
|
|
|
plate_info = classify_plate(shared_results["trocr_combined_text"]) |
|
|
if not plate_info: |
|
|
raise gr.Error("Invalid License Plate") |
|
|
|
|
|
exists, message = check_vehicle(plate_info['matricule_complet']) |
|
|
|
|
|
if exists: |
|
|
allowed = "✅ ACCESS ALLOWED" if is_access_allowed(plate_info['matricule_complet']) else "❌ ACCESS DENIED" |
|
|
return { |
|
|
access_output: f"{message}\n{allowed}", |
|
|
access_form: gr.update(visible=False), |
|
|
save_btn: gr.update(interactive=False) |
|
|
} |
|
|
else: |
|
|
return { |
|
|
access_output: message, |
|
|
access_form: gr.update(visible=True), |
|
|
save_btn: gr.update(interactive=True) |
|
|
} |
|
|
|
|
|
def save_vehicle_info(status, time_choice, custom_time_input): |
|
|
"""Enregistrer les informations du véhicule""" |
|
|
if not shared_results.get("classified_plate"): |
|
|
raise gr.Error("No License Plate Information Available") |
|
|
|
|
|
plate_info = shared_results["classified_plate"] |
|
|
|
|
|
|
|
|
if time_choice == "Custom...": |
|
|
if not TIME_PATTERN.match(custom_time_input): |
|
|
raise gr.Error("Invalid Time Format Use HH:MM-HH:MM") |
|
|
time_range = custom_time_input |
|
|
else: |
|
|
time_range = time_choice |
|
|
|
|
|
|
|
|
brand = shared_results.get("vehicle_brand", "Unknown") |
|
|
model = shared_results.get("vehicle_model", "Unknown") |
|
|
|
|
|
|
|
|
success, message = save_vehicle( |
|
|
plate_info, |
|
|
shared_results.get("label_color", "Unknown"), |
|
|
model, |
|
|
brand, |
|
|
status, |
|
|
time_range |
|
|
) |
|
|
|
|
|
if not success: |
|
|
raise gr.Error(message) |
|
|
|
|
|
return { |
|
|
access_output: message, |
|
|
access_form: gr.update(visible=False), |
|
|
save_btn: gr.update(interactive=False) |
|
|
} |
|
|
|
|
|
def refresh_database(): |
|
|
"""Actualiser le tableau de la base de données""" |
|
|
columns, vehicles = get_all_vehicles() |
|
|
if vehicles: |
|
|
return gr.Dataframe(value=vehicles, headers=columns) |
|
|
raise gr.Error("No vehicles found or read error") |
|
|
|
|
|
def export_to_csv(): |
|
|
"""Exporter la base de données en CSV""" |
|
|
columns, vehicles = get_all_vehicles() |
|
|
if not vehicles: |
|
|
raise gr.Error("No vehicles to export") |
|
|
|
|
|
|
|
|
with tempfile.NamedTemporaryFile(suffix=".csv", delete=False) as tmp: |
|
|
with open(tmp.name, 'w', encoding='utf-8') as f: |
|
|
|
|
|
f.write(",".join(columns) + "\n") |
|
|
|
|
|
|
|
|
for vehicle in vehicles: |
|
|
f.write(",".join(str(v) if v is not None else "" for v in vehicle) + "\n") |
|
|
|
|
|
return gr.File(value=tmp.name, visible=True) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
load_btn.click( |
|
|
fn=process_load, |
|
|
inputs=[input_type, file_input], |
|
|
outputs=[original_image, status_output, next_frame_btn] |
|
|
) |
|
|
|
|
|
|
|
|
load_btn.click( |
|
|
fn=process_load, |
|
|
inputs=[input_type, file_input], |
|
|
outputs=[ |
|
|
original_image, |
|
|
status_output, |
|
|
next_frame_btn, |
|
|
frame_gallery, |
|
|
frame_slider, |
|
|
load_frame_btn, |
|
|
video_player |
|
|
] |
|
|
) |
|
|
|
|
|
load_frame_btn.click( |
|
|
fn=load_selected_frame, |
|
|
inputs=[frame_slider], |
|
|
outputs=[original_image, status_output, next_frame_btn] |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
detect_vehicle_btn.click( |
|
|
fn=detect_vehicle, |
|
|
outputs=[status_output, processed_image, vehicle_type_output] |
|
|
) |
|
|
|
|
|
detect_color_btn.click( |
|
|
fn=detect_color, |
|
|
outputs=[color_output, processed_image] |
|
|
) |
|
|
|
|
|
detect_orientation_btn.click( |
|
|
fn=detect_orientation, |
|
|
outputs=[orientation_output, processed_image] |
|
|
) |
|
|
|
|
|
detect_logo_btn.click( |
|
|
fn=detect_logo_and_model, |
|
|
outputs=[logo_output, model_output, logo_output, processed_image, logo_image] |
|
|
) |
|
|
|
|
|
detect_plate_btn.click( |
|
|
fn=detect_plate, |
|
|
outputs=[processed_image, plate_image, plate_chars_image, plate_chars_list] |
|
|
) |
|
|
|
|
|
classify_plate_btn.click( |
|
|
fn=classify_plate_number, |
|
|
outputs=[ |
|
|
plate_classification, |
|
|
vehicle_type_output, |
|
|
algerian_check_output, |
|
|
action_output |
|
|
] |
|
|
) |
|
|
|
|
|
|
|
|
next_frame_btn.click( |
|
|
fn=next_frame, |
|
|
outputs=[original_image, status_output, |
|
|
color_output, orientation_output, |
|
|
logo_output, model_output, |
|
|
plate_classification, vehicle_type_output] |
|
|
) |
|
|
save_video_btn.click( |
|
|
fn=process_and_save_video, |
|
|
outputs=saved_video |
|
|
) |
|
|
|
|
|
|
|
|
time_range.change( |
|
|
fn=toggle_time_range, |
|
|
inputs=time_range, |
|
|
outputs=custom_time |
|
|
) |
|
|
|
|
|
check_btn.click( |
|
|
fn=verify_vehicle, |
|
|
outputs=[access_output, access_form, save_btn] |
|
|
) |
|
|
|
|
|
save_btn.click( |
|
|
fn=save_vehicle_info, |
|
|
inputs=[access_status, time_range, custom_time], |
|
|
outputs=[access_output, access_form, save_btn] |
|
|
) |
|
|
|
|
|
|
|
|
refresh_db_btn.click( |
|
|
fn=refresh_database, |
|
|
outputs=db_table |
|
|
) |
|
|
|
|
|
export_csv_btn.click( |
|
|
fn=export_to_csv, |
|
|
outputs=csv_output |
|
|
) |
|
|
|
|
|
|
|
|
def load_initial_data(): |
|
|
init_database() |
|
|
columns, vehicles = get_all_vehicles() |
|
|
return vehicles if vehicles else [] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if not init_database(): |
|
|
print("Erreur lors de l'initialisation de la base de données") |
|
|
else: |
|
|
print("Base de données initialisée avec succès") |
|
|
|
|
|
if __name__ == "__main__": |
|
|
demo.launch() |