|
import re
import urllib.request
from io import StringIO
from urllib.parse import quote, urlparse

import numpy as np
import pandas as pd
import pyttsx3
import requests
import speech_recognition as sr
from bs4 import BeautifulSoup
from googlesearch import search
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
|
|
|
class HybridChatBot:
    """Chatbot that answers from a TF-IDF question index, with web search and voice I/O.

    Known questions are stored lower-cased in ``qa_pairs`` and vectorised with
    ``TfidfVectorizer``; a user message is answered with the stored answer of
    the most similar question when the cosine similarity exceeds
    ``similarity_threshold``.  Messages containing a search trigger phrase are
    routed to ``web_search`` instead.
    """

    # Phrases (matched case-insensitively) that turn a message into a web search.
    _SEARCH_TRIGGER_RE = re.compile(r"search the web for|find online", re.IGNORECASE)
    # Messages that end the interactive session.
    EXIT_COMMANDS = ("exit", "stop")
    # Seconds to wait for the dataset download before giving up.
    REQUEST_TIMEOUT = 10
    # Seconds to wait for each page fetched during a web search.
    PAGE_TIMEOUT = 5
    # Upper bound on bytes read per page so one huge document cannot stall a search.
    MAX_PAGE_BYTES = 1_000_000
    # Number of leading words kept from each fetched page.
    MAX_PAGE_WORDS = 200
    # Upper bound on the length of a web-search reply.
    MAX_RESPONSE_CHARS = 2000

    _MENU = (
        "[1] Send text message",
        "[2] Speak to the bot",
        "[3] Switch input mode",
        "[4] Teach the bot a new answer",
        "[5] Web search",
        "[6] Exit",
    )

    def __init__(self, dataset_url=None, similarity_threshold=0.5):
        """Set up the vectoriser, speech engines and (optionally) load a dataset.

        Args:
            dataset_url: URL of a ``.csv`` or ``.json`` file with ``question``
                and ``answer`` columns.  When given, it is downloaded and the
                model is trained immediately.
            similarity_threshold: Minimum cosine similarity (exclusive) for a
                stored question to count as a match.
        """
        self.dataset_url = dataset_url
        self.similarity_threshold = similarity_threshold
        self.qa_pairs = {}
        # Questions in the exact row order of ``self.X``; captured by train().
        self._questions = []
        self.vectorizer = TfidfVectorizer()
        self.X = None
        self.recognizer = sr.Recognizer()
        self.engine = pyttsx3.init()

        voices = self.engine.getProperty('voices')
        # Keep the engine's default voice when the platform reports none.
        if voices:
            self.engine.setProperty('voice', voices[0].id)
        self.engine.setProperty('rate', 150)

        if dataset_url:
            self.load_dataset()
            self.train()

    @staticmethod
    def _normalize_question(question):
        """Return the dictionary key used for *question* (trimmed, lower-cased)."""
        return str(question).strip().lower()

    @staticmethod
    def _is_missing(value):
        """Return True for scalar null values (``None``, ``NaN``, ``pd.NA``)."""
        return bool(pd.api.types.is_scalar(value) and pd.isna(value))

    @staticmethod
    def _is_http_url(url):
        """Return True when *url* is an absolute ``http``/``https`` URL."""
        parts = urlparse(str(url))
        return parts.scheme in ("http", "https") and bool(parts.netloc)

    @staticmethod
    def _format_search_result(index, result):
        """Render one search result as the numbered text block shown to the user."""
        return f"\n{index}. {result['content']}\n(Source: {result['url']})\n"

    @classmethod
    def _extract_search_query(cls, user_input):
        """Return the query of a web-search request, or ``None`` for other messages.

        The trigger phrase is removed case-insensitively, matching how it is
        detected, so "Search the web for cats" yields ``"cats"``.  The result
        may be an empty string when the user gave a trigger but no query.
        """
        if not cls._SEARCH_TRIGGER_RE.search(user_input):
            return None
        return cls._SEARCH_TRIGGER_RE.sub("", user_input).strip()

    def _merge_dataframe(self, data):
        """Merge the ``question``/``answer`` columns of *data* into ``qa_pairs``.

        Rows with a missing question or answer, or a blank question, are
        skipped.  Non-string questions are converted with ``str``.

        Returns:
            The number of rows merged.

        Raises:
            ValueError: If either required column is absent.
        """
        missing = {"question", "answer"} - set(data.columns)
        if missing:
            raise ValueError(
                f"dataset is missing required column(s): {', '.join(sorted(missing))}"
            )

        merged = 0
        for question, answer in zip(data["question"], data["answer"]):
            if self._is_missing(question) or self._is_missing(answer):
                continue
            key = self._normalize_question(question)
            if not key:
                continue
            self.qa_pairs[key] = answer
            merged += 1
        return merged

    def load_dataset(self):
        """Download ``self.dataset_url`` and merge its rows into ``qa_pairs``.

        Supports CSV and JSON, chosen by the extension of the URL path (query
        string ignored, case-insensitive).  Failures are reported on stdout
        and never raised, so a bad dataset cannot prevent the bot from starting.
        """
        if not self.dataset_url:
            print("No dataset URL configured")
            return

        try:
            response = requests.get(self.dataset_url, timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()

            path = urlparse(self.dataset_url).path.lower()
            if path.endswith('.csv'):
                data = pd.read_csv(StringIO(response.text))
            elif path.endswith('.json'):
                data = pd.read_json(StringIO(response.text))
            else:
                print("File format not supported")
                return

            self._merge_dataframe(data)
            print(f"Loaded {len(self.qa_pairs)} question-answer pairs")
        except Exception as e:  # best-effort boundary: report and carry on
            print(f"Error loading dataset: {e}")

    def train(self):
        """Fit the TF-IDF model on the questions currently in ``qa_pairs``.

        On success ``self.X`` holds one row per entry of ``self._questions``.
        If the vectoriser rejects the data, ``self.X`` is reset to ``None``
        and the error is printed instead of raised.
        """
        if not self.qa_pairs:
            print("No data available for training!")
            return

        questions = list(self.qa_pairs)
        try:
            self.X = self.vectorizer.fit_transform(questions)
        except ValueError as e:
            # Presumably raised when no question yields a usable token
            # (e.g. only an empty string was taught) -- confirm against sklearn.
            self.X = None
            self._questions = []
            print(f"Training failed: {e}")
            return

        self._questions = questions
        print("Model trained on loaded data")

    def add_qa_pair(self, question, answer):
        """Store *answer* under the normalised *question* and retrain the model."""
        self.qa_pairs[self._normalize_question(question)] = answer
        self.train()

    def _fetch_page_text(self, url):
        """Download *url* and return the first ``MAX_PAGE_WORDS`` words of its text."""
        req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'})
        with urllib.request.urlopen(req, timeout=self.PAGE_TIMEOUT) as response:
            html = response.read(self.MAX_PAGE_BYTES)

        soup = BeautifulSoup(html, 'html.parser')
        # Drop non-content elements so they do not pollute the extracted text.
        for tag in soup(["script", "style", "iframe", "nav", "footer"]):
            tag.decompose()

        text = soup.get_text(separator=' ', strip=True)
        return ' '.join(text.split()[:self.MAX_PAGE_WORDS])

    def web_search(self, query, num_results=3):
        """Search the web for *query* and return text excerpts of the result pages.

        Args:
            query: Search terms; a blank query returns ``[]`` without searching.
            num_results: Number of results requested from the search engine.

        Returns:
            A list of ``{'url': ..., 'content': ...}`` dicts, one per page that
            could be fetched.  ``None`` when the search itself failed before
            any page was collected.
        """
        query = (query or "").strip()
        if not query:
            return []

        print(f"\nSearching the web: {query}")
        search_results = []
        try:
            for url in search(query, num_results=num_results, lang='en'):
                # Only fetch web pages; never follow file:// or relative links.
                if not self._is_http_url(url):
                    continue
                try:
                    content = self._fetch_page_text(url)
                except Exception as e:  # one bad page must not abort the search
                    print(f"Error processing {url}: {e}")
                    continue
                search_results.append({'url': url, 'content': content})
        except Exception as e:
            print(f"Search error: {e}")
            # Keep whatever was gathered before the search engine failed.
            return search_results or None

        return search_results

    def get_response(self, user_input):
        """Return the bot's reply to *user_input*.

        Web-search requests are handled first because they do not depend on
        the trained model; everything else is matched against the stored
        questions.
        """
        query = self._extract_search_query(user_input)
        if query is not None:
            if not query:
                return "Please tell me what to search for."
            search_results = self.web_search(query)
            if not search_results:
                return "Couldn't find any information online."
            entries = (
                self._format_search_result(i, result)
                for i, result in enumerate(search_results, 1)
            )
            response = "Here's what I found online:\n" + "".join(entries)
            return response[:self.MAX_RESPONSE_CHARS]

        if not self.qa_pairs or self.X is None:
            return "I'm not trained yet. Please add questions and answers."

        user_vec = self.vectorizer.transform([user_input.lower()])
        similarities = cosine_similarity(user_vec, self.X)
        best_match_idx = int(np.argmax(similarities))

        if similarities[0, best_match_idx] > self.similarity_threshold:
            return self.qa_pairs[self._questions[best_match_idx]]
        return "I don't know the answer to this question. Would you like me to search online? (Say 'search the web for...')"

    def text_to_speech(self, text):
        """Speak *text* through the TTS engine; empty text is ignored."""
        if text is None or text == "":
            return
        self.engine.say(str(text))
        self.engine.runAndWait()

    def speech_to_text(self):
        """Record one phrase from the microphone and return it as text.

        Returns:
            The recognised text, or ``None`` when no microphone is available,
            nothing was said within the timeout, or recognition failed.
        """
        try:
            microphone = sr.Microphone()
        except (OSError, AttributeError) as e:
            # Presumably OSError = no input device, AttributeError = PyAudio
            # missing -- confirm against the SpeechRecognition docs.
            print(f"Microphone not available: {e}")
            return None

        try:
            with microphone as source:
                print("\nSpeak now...")
                self.recognizer.adjust_for_ambient_noise(source)
                audio = self.recognizer.listen(source, timeout=5)
        except sr.WaitTimeoutError:
            print("Timeout expired")
            return None
        except OSError as e:
            print(f"Microphone not available: {e}")
            return None

        # Recognition is a network call; the microphone is already released here.
        try:
            text = self.recognizer.recognize_google(audio, language="en-US")
        except sr.UnknownValueError:
            print("Speech not recognized")
            return None
        except sr.RequestError:
            print("Recognition service error")
            return None

        print(f"Recognized: {text}")
        return text

    def _converse(self, user_input, fallback, spoken_fallback=None):
        """Answer one user message.

        Args:
            user_input: The message typed or spoken by the user.
            fallback: Text printed when ``get_response`` yields nothing.
            spoken_fallback: Text spoken in that case; ``None`` stays silent.

        Returns:
            ``False`` when the message was an exit command, otherwise ``True``.
        """
        if user_input.strip().lower() in self.EXIT_COMMANDS:
            return False

        response = self.get_response(user_input)
        if response:
            print(f"\nBot: {response}")
            self.text_to_speech(response)
        else:
            print(f"\nBot: {fallback}")
            if spoken_fallback is not None:
                self.text_to_speech(spoken_fallback)
        return True

    def run(self):
        """Run the interactive menu loop until the user exits.

        Ctrl-C and end-of-input (Ctrl-D / closed stdin) both shut down cleanly.
        """
        print("\n" + "=" * 50)
        print("WELCOME TO INTELLIGENT CHATBOT".center(50))
        print("=" * 50)

        # NOTE(review): the mode is only displayed; options 1 and 2 pick the
        # input channel regardless of it.  Confirm the intended behaviour.
        current_mode = "text"
        while True:
            print("\n" + "-" * 50)
            print(f"Current input mode: {current_mode.upper()}")
            for line in self._MENU:
                print(line)

            try:
                choice = input("Choose action (1-6): ").strip()

                if choice == "1":
                    user_input = input("\nYour message: ")
                    if not self._converse(
                        user_input,
                        "I don't know what to say. Would you like to teach me?",
                    ):
                        break

                elif choice == "2":
                    user_input = self.speech_to_text()
                    if user_input and not self._converse(
                        user_input,
                        "I don't know how to respond to that.",
                        spoken_fallback="I don't know how to respond to that",
                    ):
                        break

                elif choice == "3":
                    current_mode = "voice" if current_mode == "text" else "text"
                    print(f"\nMode changed to: {current_mode.upper()}")

                elif choice == "4":
                    print("\nTeaching the bot:")
                    question = input("Enter question: ").strip()
                    answer = input("Enter answer: ").strip()
                    if not question or not answer:
                        print("Question and answer must not be empty.")
                    else:
                        self.add_qa_pair(question, answer)
                        # train() leaves X unset when the vectoriser rejects the data.
                        if self.X is None:
                            print("Could not train on that question.")
                        else:
                            print("Bot successfully trained!")

                elif choice == "5":
                    query = input("\nEnter search query: ")
                    search_results = self.web_search(query)
                    if search_results:
                        print("\nSearch results:")
                        for i, result in enumerate(search_results, 1):
                            print(self._format_search_result(i, result))
                    else:
                        print("\nNothing found.")

                elif choice == "6":
                    print("\nShutting down...")
                    break

                else:
                    print("\nPlease choose an option between 1 and 6")

            except (KeyboardInterrupt, EOFError):
                print("\nShutting down...")
                break
|
|
|
if __name__ == "__main__":
    # Script entry point: build a bot from the hosted Q&A dataset (loaded and
    # trained inside the constructor), then hand control to the menu loop.
    DATASET_URL = "https://raw.githubusercontent.com/user/repo/main/qa_data.csv"

    bot = HybridChatBot(dataset_url=DATASET_URL)
    bot.run()