kobkrit's picture
Upload folder using huggingface_hub
bf25e2b verified
#!/usr/bin/env python3
"""
Dataset Generator for BaZi Fortune Telling
Uses Google Gemini Pro 2.5 with thinking to generate high-quality training data
Supports multithreaded processing for faster generation
"""
import json
import random
import time
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
import os
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Lock
import threading
# Add parent directory to path to import modules
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from bazi_calculator import BaziCalculator, compute_birth_pillars
from bazi_favorable_elements import calculate_favorable_elements
# Import Google Gemini
try:
from google import genai
from google.genai import types
except ImportError:
print("Please install google-genai: pip install google-genai")
sys.exit(1)
# Gemini API Configuration
GEMINI_API_KEY = "AIzaSyCqe3vjvPlo1lt_hpQ4nqAC0-_1omva1oc"
os.environ["GEMINI_API_KEY"] = GEMINI_API_KEY
# Configuration for multithreading
MAX_WORKERS = 10 # Number of concurrent threads for API calls
RATE_LIMIT_DELAY = 0.5 # Delay between API calls to avoid rate limiting
# Thread-safe locks
file_lock = Lock()
progress_lock = Lock()
print_lock = Lock()
api_call_lock = Lock()
last_api_call_time = 0
# Global context cache for system prompt
context_cache = None
cache_lock = Lock()
def format_bazi_production_style(person: Dict, bazi_data: Dict) -> str:
"""Format BaZi data in production website style"""
lines = []
birth_date = bazi_data['birth_date']
pillars = bazi_data['birth_pillars']
fav_elems = bazi_data['favorable_elements']
future_data = bazi_data['future_data']
calculator = bazi_data['calculator']
# Current date for query
current_date = datetime.now()
# Header
lines.append("=== BaZi Data ===")
lines.append(f"Name: {person['name']}")
lines.append(f"Birth: {birth_date.strftime('%Y-%m-%d %H:%M:%S')}")
lines.append(f"Gender: {person['gender']}")
lines.append(f"Query: {current_date.strftime('%Y-%m-%d %H:%M')}")
# Calculate age
age = current_date.year - birth_date.year
if current_date.month < birth_date.month or (current_date.month == birth_date.month and current_date.day < birth_date.day):
age -= 1
lines.append(f"Age: {age}")
# Life Stem (Day Master)
day_stem = pillars['day'][0]
# Determine strength (simplified)
stem_strength = "Strong" if len(fav_elems['favorable']) > len(fav_elems['unfavorable']) else "Weak"
lines.append(f"Life Stem: {day_stem} ({stem_strength})")
# Favorable/Unfavorable elements
lines.append(f"Favorable: {', '.join(fav_elems['favorable'])}")
lines.append(f"Unfavorable: {', '.join(fav_elems['unfavorable'])}")
# Four Pillars with hidden stems
lines.append("\n=== Four Pillars ===")
# Get hidden stems
from bazi_calculator import BaziCalculator
calc_temp = BaziCalculator(birth_date, person['gender'].lower(), pillars)
# Year pillar
year_hidden = calc_temp.BRANCH_HIDDEN_STEMS.get(pillars['year'][1], [])
lines.append(f"Year : {pillars['year'][0]}{pillars['year'][1]} (hidden: {', '.join(year_hidden)})")
# Month pillar
month_hidden = calc_temp.BRANCH_HIDDEN_STEMS.get(pillars['month'][1], [])
lines.append(f"Month: {pillars['month'][0]}{pillars['month'][1]} (hidden: {', '.join(month_hidden)})")
# Day pillar
day_hidden = calc_temp.BRANCH_HIDDEN_STEMS.get(pillars['day'][1], [])
lines.append(f"Day : {pillars['day'][0]}{pillars['day'][1]} (hidden: {', '.join(day_hidden)})")
# Hour pillar
hour_hidden = calc_temp.BRANCH_HIDDEN_STEMS.get(pillars['hour'][1], [])
lines.append(f"Hour : {pillars['hour'][0]}{pillars['hour'][1]} (hidden: {', '.join(hour_hidden)})")
# Current Luck Pillar
lines.append("\n=== Current Luck Pillar ===")
for lp in future_data['luck_pillars']:
if lp['start_age'] <= age <= lp['end_age']:
lines.append(f"Age {age}: {lp['heaven']}{lp['earth']}")
break
# Current Period
lines.append("\n=== Current Period ===")
# Annual
for ap in future_data['annual_pillars']:
if ap['year'] == current_date.year:
lines.append(f"Annual: {ap['heaven']}{ap['earth']}")
break
# Monthly
current_month_key = f"{current_date.year}-{current_date.month:02d}"
if current_month_key in future_data['monthly_pillars']:
mp = future_data['monthly_pillars'][current_month_key]
lines.append(f"Monthly: {mp['heaven']}{mp['earth']}")
# Daily
current_day_key = current_date.strftime("%Y-%m-%d")
if current_day_key in future_data['daily_pillars']:
dp = future_data['daily_pillars'][current_day_key]
lines.append(f"Daily: {dp['heaven']}{dp['earth']}")
# Hourly
current_hour = current_date.hour
# Map to Chinese hour
if current_hour == 0:
chinese_hour = 23
elif current_hour % 2 == 0:
chinese_hour = current_hour - 1
else:
chinese_hour = current_hour
if chinese_hour in future_data['hourly_pillars']:
hp = future_data['hourly_pillars'][chinese_hour]
lines.append(f"Hourly: {hp['heaven']}{hp['earth']}")
# Future 100 Years
lines.append("\n=== Future 100 Years ===")
# Luck Pillars
lines.append("\n[Luck Pillars]")
for lp in future_data['luck_pillars']:
lines.append(f"Age {lp['start_age']}-{lp['end_age']}: {lp['heaven']}{lp['earth']}")
# Annual Pillars - ALL 100 years
lines.append("\n[Annual Pillars]")
for ap in future_data['annual_pillars']:
lines.append(f"{ap['year']}: {ap['heaven']}{ap['earth']}")
# Monthly Pillars - Show many months
lines.append("\n[Monthly Pillars]")
monthly_keys = sorted(list(future_data['monthly_pillars'].keys()))
for key in monthly_keys:
mp = future_data['monthly_pillars'][key]
lines.append(f"{key}: {mp['heaven']}{mp['earth']}")
# Daily Pillars - Show many days
lines.append("\n[Daily Pillars]")
daily_keys = sorted(list(future_data['daily_pillars'].keys()))
for key in daily_keys:
dp = future_data['daily_pillars'][key]
lines.append(f"{key}: {dp['heaven']}{dp['earth']}")
lines.append("\n[Loaded with AiMing v3]")
return '\n'.join(lines)
# Load system prompt
def load_system_prompt() -> str:
"""Load the system prompt from prompt-v3.txt"""
script_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.dirname(script_dir)
prompt_path = os.path.join(parent_dir, 'prompt-v3.txt')
with open(prompt_path, 'r', encoding='utf-8') as f:
return f.read()
def generate_random_birthdays(count: int = 10) -> List[Dict]:
"""Generate random birthdays with varied demographics
Args:
count: Number of random birthdays to generate
"""
birthdays = []
# Define name pools - expanded for more variety
male_names = ["สมชาย", "วิชัย", "ประยุทธ์", "สมศักดิ์", "วีระ", "ณัฐวุฒิ", "ธนากร", "พงษ์ศักดิ์", "อภิชาติ", "จักรพันธ์",
"สุรชัย", "ประสิทธิ์", "สมพงษ์", "วิทยา", "นิพนธ์", "สุทธิพงษ์", "อนันต์", "ชัยวัฒน์", "ธีระ", "พิชัย"]
female_names = ["สมหญิง", "มาลี", "สุดาพร", "วิไลวรรณ", "พรทิพย์", "นันทนา", "จิราภรณ์", "ศิริพร", "อรวรรณ", "ปิยะนุช",
"วรรณา", "สุภาพร", "รัตนา", "อารีย์", "ปาริชาติ", "สุนิสา", "กาญจนา", "ดวงใจ", "ชนิดา", "พิมพ์ใจ"]
# Generate diverse birth years (1960-2005)
# If count > 46 (years available), allow duplicates
year_range = list(range(1960, 2006))
if count <= len(year_range):
years = random.sample(year_range, count)
else:
# For counts > 46, we allow some duplicate years
years = random.choices(year_range, k=count)
for i in range(count):
gender = random.choice(["Male", "Female"])
name = random.choice(male_names if gender == "Male" else female_names)
# Random month and day
month = random.randint(1, 12)
if month in [1, 3, 5, 7, 8, 10, 12]:
day = random.randint(1, 31)
elif month in [4, 6, 9, 11]:
day = random.randint(1, 30)
else: # February
day = random.randint(1, 28)
# Random time (distributed across all 12 Chinese hours)
hour = random.randint(0, 23)
minute = random.randint(0, 59)
birthdays.append({
"name": f"{name}_{i+1}",
"gender": gender,
"year": years[i],
"month": month,
"day": day,
"hour": hour,
"minute": minute,
"is_twin": False
})
return birthdays
def generate_questions() -> List[str]:
"""Generate 10 diverse questions for BaZi consultation"""
question_templates = [
# General fortune (expanded)
"ดวงนี้เป็นอย่างไร",
"ช่วงไหนในชีวิตจะประสบความสำเร็จ",
"มีอุปสรรคอะไรที่ต้องระวังบ้าง",
"ดวงชะตาโดยรวมปีนี้เป็นอย่างไร",
"ชีวิตโดยรวมจะเป็นอย่างไร",
"อนาคตของฉันจะเป็นอย่างไร",
"ปีนี้จะเป็นปีที่ดีไหม",
"ช่วงชีวิตไหนจะเจอปัญหามาก",
"จะมีชีวิตที่ดีขึ้นเมื่อไหร่",
"ดวงชะตาในระยะยาวเป็นอย่างไร",
"ชีวิตจะมีความสุขไหม",
"จะพบเจอสิ่งดีๆ เมื่อไหร่",
"มีเคราะห์อะไรรออยู่ไหม",
"ดวงจะดีขึ้นเมื่อไหร่",
"ชีวิตจะราบรื่นไหม",
"ปีไหนจะเป็นปีทอง",
"จะมีโชคเมื่อไหร่",
"ควรระวังอะไรในชีวิต",
"จะมีอะไรเปลี่ยนแปลงไหม",
"ดวงชะตาตอนแก่เป็นอย่างไร",
# Career questions (expanded)
"ช่วงนี้งานการเป็นอย่างไรบ้าง มีโอกาสก้าวหน้าไหม",
"อยากเปลี่ยนงานใหม่ ช่วงไหนเหมาะสม",
"อยากทำธุรกิจส่วนตัว เหมาะไหม ควรทำธุรกิจประเภทไหน",
"จะได้เลื่อนตำแหน่งไหม",
"งานที่ทำอยู่จะมั่นคงไหม",
"ควรลาออกจากงานไหม",
"จะตกงานไหม",
"อาชีพอะไรเหมาะกับดวงนี้",
"ทำงานกับคนอื่นดีหรือทำงานคนเดียวดี",
"จะประสบความสำเร็จในอาชีพไหม",
"เจ้านายเมตตาไหม",
"ควรทำงานรับราชการหรือเอกชน",
"จะมีคนอิจฉาในที่ทำงานไหม",
"ควรเปลี่ยนสายงานไหม",
"จะได้ทำงานที่ชอบไหม",
"หุ้นส่วนธุรกิจจะซื่อสัตย์ไหม",
"ควรทำงานในประเทศหรือต่างประเทศ",
"จะมีปัญหาในที่ทำงานไหม",
"ควรรับงานฟรีแลนซ์ไหม",
"ธุรกิจจะอยู่รอดไหม",
"ควรขยายธุรกิจไหม",
"จะมีคู่แข่งทางธุรกิจไหม",
"ลูกค้าจะพอใจไหม",
"ควรร่วมทุนกับใครไหม",
"จะได้โบนัสไหม",
"งานที่สมัครจะได้ไหม",
"ควรทำอาชีพเสริมไหม",
"จะเป็นผู้บริหารไหม",
"ควรลงทุนในธุรกิจอะไร",
"จะมีรายได้เสริมไหม",
# Love and relationships (expanded)
"เรื่องความรักเป็นอย่างไรบ้าง จะเจอคนที่ใช่เมื่อไหร่",
"ความรักกับแฟนปัจจุบันจะไปได้ไกลไหม",
"ปีนี้มีโอกาสแต่งงานไหม",
"จะได้เจอแฟนไหม เมื่อไหร่",
"แฟนจะนอกใจไหม",
"จะมีคนมาชอบไหม",
"จะเลิกกับแฟนไหม",
"ความรักจะลงเอยด้วยดีไหม",
"คู่ที่เหมาะสมเป็นคนแบบไหน",
"จะมีกี่ครั้งในชีวิตที่รักจริง",
"จะโสดไปตลอดชีวิตไหม",
"ควรแต่งงานกับคนนี้ไหม",
"จะหย่าร้างไหม",
"จะกลับมาคืนดีกันไหม",
"คนที่แอบชอบจะมาบอกรักไหม",
"จะมีคนที่สามไหม",
"ควรรอคนเก่าไหม",
"จะเจอรักแท้ไหม",
"แฟนจะดีกับเราไหม",
"จะมีปัญหาเรื่องความรักไหม",
"ควรเปิดใจให้คนใหม่ไหม",
"จะอกหักไหม",
"คนที่ชอบจะชอบเราไหม",
"จะแต่งงานกี่ครั้ง",
"คู่ชีวิตจะมาจากไหน",
"จะมีลูกกับคนรักไหม",
"ความรักระยะไกลจะสำเร็จไหม",
"จะเจอคนรักที่ทำงานไหม",
"ควรคบคนอายุเท่าไหร่",
"จะมีความรักออนไลน์ไหม",
# Health (expanded)
"สุขภาพช่วงนี้ต้องระวังอะไรบ้าง",
"มีปัญหาสุขภาพเรื้อรัง จะหายไหม",
"ปีนี้จะป่วยไหม",
"เดือนนี้อาการป่วยจะดีขึ้นไหม",
"ดวงนี้ป่วยเป็นโรคอะไร",
"จะมีอายุยืนไหม",
"ควรระวังโรคอะไรเป็นพิเศษ",
"การผ่าตัดจะสำเร็จไหม",
"ควรไปหาหมอไหม",
"สุขภาพจิตจะแข็งแรงไหม",
"จะมีอุบัติเหตุไหม",
"จะมีปัญหาเรื่องน้ำหนักไหม",
"ควรออกกำลังกายแบบไหน",
"จะมีปัญหาการนอนไหม",
"สายตาจะมีปัญหาไหม",
"ฟันจะมีปัญหาไหม",
"จะมีอาการแพ้ไหม",
"ควรกินอาหารแบบไหน",
"จะเครียดมากไหม",
"จะมีปัญหาผิวหนังไหม",
"ระบบย่อยอาหารจะมีปัญหาไหม",
"จะปวดหัวบ่อยไหม",
"จะมีปัญหากระดูกไหม",
"หัวใจจะแข็งแรงไหม",
"จะมีปัญหาความดันไหม",
"จะเป็นโรคติดต่อไหม",
"จะมีปัญหาฮอร์โมนไหม",
"ควรตรวจสุขภาพเมื่อไหร่",
"จะติดเชื้อไหม",
"จะมีปัญหาการหายใจไหม",
# Wealth and finance (expanded)
"การเงินปีนี้เป็นอย่างไร จะมีโชคลาภไหม",
"อยากลงทุนหุ้น/คริปโต เหมาะไหม",
"หนี้สินเยอะ จะหมดเมื่อไหร่",
"ดวงนี้รวยเมื่อไหร่",
"ดวงนี้รวยไหม",
"จะถูกหวยไหม",
"ควรลงทุนอะไร",
"จะล้มละลายไหม",
"เงินจะพอใช้ไหม",
"จะมีคนมาหลอกเงินไหม",
"ควรให้คนอื่นยืมเงินไหม",
"จะได้มรดกไหม",
"ควรซื้อบ้านหรือเช่าบ้าน",
"การลงทุนจะได้ผลตอบแทนดีไหม",
"จะมีเงินเก็บไหม",
"ควรฝากเงินหรือลงทุน",
"จะได้เงินคืนจากลูกหนี้ไหม",
"ควรซื้อทองไหม",
"จะมีรายได้พิเศษไหม",
"ควรเล่นพนันไหม",
"จะขาดทุนไหม",
"ควรกู้เงินไหม",
"จะมีเงินใช้ตอนแก่ไหม",
"ควรทำประกันไหม",
"จะมีปัญหาภาษีไหม",
"ควรลงทุนอสังหาริมทรัพย์ไหม",
"จะได้โบนัสพิเศษไหม",
"ควรเก็บเงินสดไหม",
"จะมีหนี้นอกระบบไหม",
"ควรขายทรัพย์สินไหม",
# Family (expanded)
"ลูกๆ จะเรียนหนังสือเก่งไหม",
"ความสัมพันธ์ในครอบครัวจะราบรื่นไหม",
"จะมีลูกไหม",
"พ่อแม่จะมีสุขภาพดีไหม",
"พี่น้องจะสามัคคีกันไหม",
"จะมีปัญหากับญาติไหม",
"ลูกจะกตัญญูไหม",
"ควรมีลูกกี่คน",
"จะได้อยู่กับครอบครัวไหม",
"จะมีปัญหากับพ่อตาแม่ยายไหม",
"ลูกจะประสบความสำเร็จไหม",
"จะต้องดูแลพ่อแม่ไหม",
"พี่น้องจะช่วยเหลือไหม",
"จะมีปัญหาเรื่องมรดกไหม",
"ครอบครัวจะอบอุ่นไหม",
"จะมีใครในครอบครัวป่วยไหม",
"ลูกจะได้เรียนสูงไหม",
"จะมีปัญหากับคู่สมรสของลูกไหม",
"หลานจะน่ารักไหม",
"จะได้เลี้ยงหลานไหม",
# Education and learning (expanded)
"จะสอบติดไหม",
"ควรเรียนต่อไหม",
"สาขาอะไรเหมาะกับฉัน",
"จะจบการศึกษาไหม",
"จะได้ทุนการศึกษาไหม",
"จะเรียนเก่งไหม",
"ควรเรียนในประเทศหรือต่างประเทศ",
"จะสอบตกไหม",
"ควรเรียนพิเศษไหม",
"จะมีปัญหากับอาจารย์ไหม",
"จะมีเพื่อนที่ดีไหม",
"ควรย้ายโรงเรียนไหม",
"จะได้เกียรตินิยมไหม",
"ควรเรียนภาษาอะไร",
"จะมีปัญหาการเรียนไหม",
"ควรเรียนปริญญาโทไหม",
"จะได้ไปแลกเปลี่ยนไหม",
"ควรเรียนออนไลน์ไหม",
"จะมีปัญหากับเพื่อนร่วมชั้นไหม",
"ควรเข้ามหาวิทยาลัยไหน",
# Specific timing (expanded)
"เดือนหน้าจะมีเรื่องดีๆ เข้ามาไหม",
"ปีหน้าควรระวังเรื่องอะไรบ้าง",
"วันไหนในเดือนนี้เป็นวันมงคล",
"พรุ่งนี้จะเป็นวันที่ดีไหม",
"สัปดาห์นี้จะมีข่าวดีไหม",
"ช่วงไหนของปีจะดีที่สุด",
"เดือนไหนควรระมัดระวังเป็นพิเศษ",
"วันนี้ควรทำอะไร",
"คืนนี้จะนอนหลับสบายไหม",
"เช้านี้จะมีเรื่องดีไหม",
"บ่ายนี้ควรระวังอะไร",
"เย็นนี้จะเจอใคร",
"วันจันทร์จะเป็นอย่างไร",
"สุดสัปดาห์นี้จะสนุกไหม",
"เดือนนี้จะได้เงินไหม",
"ปีนี้จะมีอะไรเปลี่ยนแปลง",
"ไตรมาสนี้ธุรกิจจะดีไหม",
"ฤดูนี้สุขภาพจะเป็นอย่างไร",
"ช่วงปิดเทอมจะไปไหนไหม",
"วันเกิดปีนี้จะมีความสุขไหม",
# Decision making (expanded)
"กำลังตัดสินใจเรื่องสำคัญ ควรเลือกทางไหน",
"จะย้ายบ้านดีไหม ช่วงไหนเหมาะสม",
"อยากไปเรียนต่อต่างประเทศ เหมาะไหม",
"ควรซื้อรถไหม",
"ควรขายที่ดินไหม",
"ควรเริ่มทำอะไรใหม่ๆ ไหม",
"ควรรับข้อเสนอนี้ไหม",
"ควรไปหรือไม่ไป",
"ควรพูดความจริงไหม",
"ควรให้อภัยไหม",
"ควรเสี่ยงไหม",
"ควรรอหรือรีบทำ",
"ควรเลือกข้อ A หรือ B",
"ควรลงทะเบียนไหม",
"ควรเซ็นสัญญาไหม",
"ควรไว้ใจคนนี้ไหม",
"ควรบอกเลิกไหม",
"ควรเริ่มใหม่ไหม",
"ควรยอมรับไหม",
"ควรปฏิเสธไหม",
# Travel and relocation (expanded)
"จะได้ไปต่างประเทศไหม",
"ควรย้ายไปอยู่ที่อื่นไหม",
"การเดินทางจะปลอดภัยไหม",
"ทิศไหนเหมาะกับดวง",
"ควรอยู่ภาคไหนของประเทศ",
"จะได้ไปเที่ยวไหม",
"ควรไปเที่ยวประเทศไหน",
"จะมีปัญหาระหว่างเดินทางไหม",
"ควรเดินทางคนเดียวหรือกับคนอื่น",
"จะพลาดเที่ยวบินไหม",
"จะทำหนังสือเดินทางหายไหม",
"ควรพักโรงแรมหรือบ้านเพื่อน",
"จะมีอุบัติเหตุทางรถไหม",
"ควรขับรถเองหรือนั่งรถคนอื่น",
"จะติดอยู่ต่างประเทศไหม",
# Legal and disputes (expanded)
"คดีความจะชนะไหม",
"จะมีปัญหาทางกฎหมายไหม",
"ควรฟ้องร้องไหม",
"จะเจอคนโกงไหม",
"จะมีปัญหากับเจ้าหน้าที่ไหม",
"จะถูกฟ้องไหม",
"ควรยอมความไหม",
"จะมีปัญหาเอกสารไหม",
"ทนายความจะช่วยได้ไหม",
"จะต้องขึ้นศาลไหม",
"จะมีปัญหาสัญญาไหม",
"จะถูกหลอกลวงไหม",
"จะมีคนมาเอาเปรียบไหม",
"ควรแจ้งความไหม",
"จะได้ความยุติธรรมไหม",
# Spiritual and luck (expanded)
"ธาตุอะไรเสริมดวง ควรใส่สีอะไร",
"เลขอะไรเป็นเลขมงคล",
"ควรบูชาพระเครื่องไหม",
"ควรทำบุญอะไร",
"มีกรรมเก่าอะไรติดตัวไหม",
"ควรแก้ชงอย่างไร",
"วันไหนเป็นวันดีของฉัน",
"ควรตั้งชื่อลูกว่าอะไร",
"ควรไหว้พระวันไหน",
"ควรสวดมนต์ไหม",
"จะมีสิ่งศักดิ์สิทธิ์คุ้มครองไหม",
"ควรใส่เครื่องรางไหม",
"ควรไปวัดไหนทำบุญ",
"จะมีบุญจากชาติก่อนไหม",
"ควรถือศีลไหม",
"จะมีเทพเจ้าช่วยไหม",
"ควรบนบานไหม",
"จะสมหวังที่บนไว้ไหม",
"ควรเลี้ยงสัตว์อะไร",
"ต้นไม้อะไรเสริมดวง",
# Personal development (expanded)
"จะพัฒนาตัวเองได้อย่างไร",
"จุดแข็งของฉันคืออะไร",
"จุดอ่อนที่ควรแก้ไขคืออะไร",
"จะมีชื่อเสียงไหม",
"จะประสบความสำเร็จในชีวิตไหม",
"ควรทำอะไรเพื่อเปลี่ยนชีวิต",
"จะเป็นคนสำคัญไหม",
"จะมีคนนับถือไหม",
"ควรเรียนรู้อะไรเพิ่ม",
"จะมีความสามารถพิเศษไหม",
"ควรฝึกทักษะอะไร",
"จะเป็นผู้นำไหม",
"จะมีคนตามไหม",
"ควรเปลี่ยนนิสัยอะไร",
"จะมีความคิดสร้างสรรค์ไหม",
"ควรทำงานอดิเรกอะไร",
"จะเก่งด้านไหน",
"ควรออกจากคอมฟอร์ทโซนไหม",
"จะมีพรสวรรค์ด้านไหน",
"ควรฝึกสมาธิไหม",
# Crisis and problems (expanded)
"จะผ่านวิกฤตนี้ไปได้ไหม",
"ปัญหาจะจบเมื่อไหร่",
"จะมีคนช่วยเหลือไหม",
"ควรขอความช่วยเหลือจากใคร",
"สิ่งที่กังวลจะเกิดขึ้นจริงไหม",
"จะแก้ปัญหาได้ไหม",
"ควรหนีปัญหาไหม",
"จะมีทางออกไหม",
"ปัญหาจะซ้ำไหม",
"จะมีปัญหาใหม่ไหม",
"ควรเผชิญหน้ากับปัญหาไหม",
"จะมีคนมาสร้างปัญหาไหม",
"ปัญหาจะใหญ่ขึ้นไหม",
"ควรปล่อยวางไหม",
"จะเครียดมากไหม",
# Business specific
"ควรเปิดร้านอาหารไหม",
"ธุรกิจออนไลน์จะสำเร็จไหม",
"ควรขายของผ่านโซเชียลไหม",
"จะมีลูกค้าเยอะไหม",
"คู่แข่งจะมากไหม",
"ควรทำธุรกิจครอบครัวไหม",
"จะขาดทุนในปีแรกไหม",
"ควรขอสินเชื่อธุรกิจไหม",
"พนักงานจะซื่อสัตย์ไหม",
"ควรเปิดสาขาไหม",
# Technology and modern life
"ควรเรียนเขียนโปรแกรมไหม",
"จะประสบความสำเร็จในสายไอทีไหม",
"ควรทำช่อง YouTube ไหม",
"จะดังในโซเชียลมีเดียไหม",
"ควรเป็นอินฟลูเอนเซอร์ไหม",
"จะถูกแฮกข้อมูลไหม",
"ควรลงทุนใน NFT ไหม",
"จะติดเกมไหม",
"ควรทำงาน Work from home ไหม",
"จะมีปัญหากับเทคโนโลยีไหม",
# Lifestyle choices
"ควรเป็นมังสวิรัติไหม",
"ควรงดเหล้าไหม",
"ควรเลิกบุหรี่ไหม",
"ควรทำศัลยกรรมไหม",
"ควรลดน้ำหนักไหม",
"ควรย้อมผมไหม",
"ควรเลี้ยงสัตว์ไหม",
"ควรอยู่คอนโดหรือบ้าน",
"ควรมีรถไหม",
"ควรใช้ชีวิตแบบมินิมอลไหม",
# Social relationships
"จะมีเพื่อนแท้ไหม",
"เพื่อนจะทรยศไหม",
"ควรตัดขาดกับเพื่อนคนนี้ไหม",
"จะมีศัตรูไหม",
"คนรอบข้างจริงใจไหม",
"จะถูกนินทาไหม",
"จะมีคนอิจฉาไหม",
"ควรให้โอกาสเพื่อนไหม",
"จะโดนหักหลังไหม",
"ควรไว้ใจใครไหม",
# Special occasions
"งานแต่งจะสำเร็จไหม",
"ควรจัดงานใหญ่หรือเล็ก",
"จะมีคนมาร่วมงานเยอะไหม",
"ควรเลื่อนงานไหม",
"งานบวชจะราบรื่นไหม",
"ควรทำพิธีขึ้นบ้านใหม่ไหม",
"จะได้รับเชิญไปงานไหม",
"ควรไปงานศพไหม",
"วันเกิดปีนี้จะพิเศษไหม",
"ควรฉลองอะไรไหม",
# Retirement and elderly
"จะเกษียณสุขสบายไหม",
"ควรเกษียณเมื่อไหร่",
"จะมีเงินพอใช้ตอนแก่ไหม",
"ลูกจะดูแลไหม",
"ควรอยู่บ้านพักคนชราไหม",
"จะแก่อย่างมีความสุขไหม",
"จะมีโรคประจำตัวไหม",
"ควรทำพินัยกรรมไหม",
"จะได้เห็นหลานโตไหม",
"ควรย้ายไปอยู่ต่างจังหวัดไหม",
]
# Randomly select 3 questions
return random.sample(question_templates, 4)
def get_bazi_data(person: Dict) -> Dict:
"""Calculate BaZi data manually"""
# Prepare birth datetime
birth_date = datetime(
person['year'], person['month'], person['day'],
person['hour'], person['minute']
)
# Calculate BaZi pillars manually
birth_pillars = compute_birth_pillars(birth_date)
# Calculate favorable elements
fav_elements = calculate_favorable_elements(birth_pillars)
# Generate future BaZi data
calculator = BaziCalculator(
birth_datetime=birth_date,
gender=person['gender'],
birth_pillars=birth_pillars
)
# Import the generate function from api_v3
from api_v3 import generate_future_bazi_data
future_data = generate_future_bazi_data(calculator, years_ahead=100)
return {
'birth_pillars': birth_pillars,
'favorable_elements': fav_elements,
'future_data': future_data,
'birth_date': birth_date,
'calculator': calculator
}
def create_or_get_context_cache(system_prompt: str) -> str:
"""Create or retrieve context cache for the system prompt
Returns:
cache_name: The name/ID of the cached context
"""
global context_cache
with cache_lock:
if context_cache is not None:
with print_lock:
print(" Using existing context cache for system prompt")
return context_cache
try:
client = genai.Client(api_key=GEMINI_API_KEY)
# Create context cache with the system prompt
with print_lock:
print(" Creating context cache for system prompt (this saves API costs)...")
# Create cache with system instruction using the correct API format
# Contents is required even when using system_instruction
cache = client.caches.create(
model="gemini-2.5-pro", # Use explicit version with context caching support
config=types.CreateCachedContentConfig(
display_name='bazi_system_prompt', # Identifier for the cache
system_instruction=system_prompt, # The long system prompt to cache
contents=[
types.Content(
role="user",
parts=[types.Part(text="System prompt loaded.")],
)
], # Minimal content required by API
ttl="72000s", # Cache for 20 hour (in seconds string format)
)
)
context_cache = cache.name
with print_lock:
print(f" Context cache created: {context_cache}")
print(f" Cache will save ~{len(system_prompt)//4} tokens per request")
return context_cache
except Exception as e:
with print_lock:
print(f" Warning: Could not create context cache: {e}")
print(" Falling back to regular API calls (higher cost)")
return None
def call_gemini_api_with_cache(user_prompt: str, cache_name: str = None, system_prompt: str = None, max_retries: int = 3) -> Tuple[str, Optional[str]]:
"""Call Google Gemini using context caching for cost savings
Returns:
Tuple of (response_text, new_cache_name or None)
"""
client = genai.Client(api_key=GEMINI_API_KEY)
for attempt in range(max_retries):
try:
# Generate response using cached context if available
if cache_name:
try:
# Use cached context - must use same model as cache
model = "gemini-2.5-pro"
# Prepare content for the user prompt
contents = [
types.Content(
role="user",
parts=[types.Part(text=user_prompt)],
)
]
config = types.GenerateContentConfig(
temperature=0.7,
top_k=40,
top_p=0.95,
max_output_tokens=7500,
cached_content=cache_name # Reference the cached content
)
# Generate with cached context
response = client.models.generate_content(
model=model,
contents=contents, # Use Content object format
config=config
)
# Get the text from response
if response.text:
return response.text, cache_name # Return existing cache name
else:
raise Exception("No response text generated")
except Exception as cache_error:
# If cache fails, try to recreate it
if "PERMISSION_DENIED" in str(cache_error) or "not found" in str(cache_error).lower():
with print_lock:
print(f" Cache expired or not found, recreating cache...")
# Try to recreate cache if system_prompt is provided
if system_prompt:
new_cache = create_or_get_context_cache(system_prompt)
if new_cache:
cache_name = new_cache
with print_lock:
print(f" Cache recreated successfully: {new_cache}")
# Retry with new cache
continue
# If no system_prompt or cache creation failed, use regular API
cache_name = None
else:
raise cache_error
# If cache_name is None or cache failed, use regular API
if not cache_name:
# Fallback to regular call without caching
contents = [
types.Content(
role="user",
parts=[
types.Part(text=user_prompt),
],
),
]
config = types.GenerateContentConfig(
temperature=0.7,
top_k=40,
top_p=0.95,
max_output_tokens=7500,
)
# Generate response (collecting all chunks)
response_text = ""
for chunk in client.models.generate_content_stream(
model="gemini-2.5-pro", # Use 2.5 pro for non-cached
contents=contents,
config=config,
):
if chunk.text:
response_text += chunk.text
if response_text:
return response_text, None # No cache used
else:
raise Exception("No response text generated")
except Exception as e:
error_str = str(e)
if attempt < max_retries - 1 and ("503" in error_str or "overloaded" in error_str.lower()):
wait_time = (attempt + 1) * 2
print(f" API overloaded. Retrying in {wait_time} seconds...")
import time
time.sleep(wait_time)
else:
raise Exception(f"Gemini API error: {e}")
# Keep old function for compatibility
def call_gemini_api(prompt: str, max_retries: int = 3) -> str:
"""Legacy function - calls the new cached version"""
response, _ = call_gemini_api_with_cache(prompt, None, None, max_retries)
return response
def generate_dataset_entry(person: Dict, question: str, system_prompt: str, entry_id: str, current: int, total: int, cache_name: str = None) -> Tuple[str, Optional[Dict]]:
"""Generate a single dataset entry (thread-safe) with context caching
Returns:
Tuple of (entry_id, entry_dict or None)
"""
global last_api_call_time, context_cache
try:
with print_lock:
print(f" [{current}/{total}] Thread-{threading.current_thread().name}: Processing {question[:50]}...")
# Get BaZi data (this is computational, no API call)
bazi_data = get_bazi_data(person)
# Format BaZi data in production style
bazi_formatted = format_bazi_production_style(person, bazi_data)
# Format user prompt in production style
user_prompt = f"""{bazi_formatted}
Question: {question}"""
# Rate limiting for API calls
with api_call_lock:
current_time = time.time()
if last_api_call_time > 0:
elapsed = current_time - last_api_call_time
if elapsed < RATE_LIMIT_DELAY:
time.sleep(RATE_LIMIT_DELAY - elapsed)
last_api_call_time = time.time()
# Get response from Gemini using context caching
if cache_name:
# When using cache, only send the user prompt
response, new_cache = call_gemini_api_with_cache(user_prompt, cache_name, system_prompt)
# Update global cache if it was recreated
if new_cache and new_cache != cache_name:
with cache_lock:
context_cache = new_cache
with print_lock:
print(f" Updated global cache to: {new_cache}")
else:
# Fallback to combined prompt
full_prompt = f"{system_prompt}\n\n---\n\n{user_prompt}"
response, _ = call_gemini_api_with_cache(full_prompt, None, None)
with print_lock:
print(f" Thread-{threading.current_thread().name}: Successfully generated response")
# Format as OpenAI training data
return (entry_id, {
"messages": [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
{"role": "assistant", "content": response}
]
})
except Exception as e:
with print_lock:
print(f" Thread-{threading.current_thread().name}: Error - {str(e)}")
return (entry_id, None)
def write_entry_to_file(output_file: str, entry: Dict) -> None:
"""Thread-safe writing to file"""
with file_lock:
with open(output_file, 'a', encoding='utf-8') as f:
f.write(json.dumps(entry, ensure_ascii=False) + '\n')
f.flush()
def save_progress(progress_file: str, processed_entries: set) -> None:
"""Thread-safe progress saving"""
with progress_lock:
with open(progress_file, 'w') as pf:
json.dump({'processed': list(processed_entries)}, pf)
def main():
"""Main function to generate the dataset with multithreading and context caching"""
print(f"Starting dataset generation with {MAX_WORKERS} worker threads...")
print(f"Using Google Gemini Context Caching to reduce API costs\n")
# Load system prompt
system_prompt = load_system_prompt()
print(f"System prompt loaded: {len(system_prompt)} characters")
# Create context cache for the system prompt to save costs
cache_name = create_or_get_context_cache(system_prompt)
if cache_name:
print(f"✓ Context caching enabled - will save ~{len(system_prompt)//4} tokens per request\n")
else:
print("⚠ Context caching disabled - using regular API calls\n")
# Output file - use absolute paths based on script location
script_dir = os.path.dirname(os.path.abspath(__file__))
output_file = os.path.join(script_dir, "bazi_training_data.jsonl")
progress_file = os.path.join(script_dir, ".generation_progress.json")
# Load progress if exists
processed_entries = set()
if os.path.exists(progress_file):
try:
with open(progress_file, 'r') as f:
progress_data = json.load(f)
processed_entries = set(progress_data.get('processed', []))
print(f"Resuming from previous run. Already processed: {len(processed_entries)} entries")
except:
processed_entries = set()
# Count existing entries in JSONL
existing_count = 0
if os.path.exists(output_file):
with open(output_file, 'r', encoding='utf-8') as f:
existing_count = sum(1 for line in f if line.strip())
print(f"Found {existing_count} existing entries in {output_file}")
# Generate random birthdays with seed for consistency
random.seed(18) # Fixed seed for reproducible birthdays
birthdays = generate_random_birthdays(500)
print(f"Generated {len(birthdays)} random birthdays")
# Prepare all tasks
tasks = []
current = 0
total = len(birthdays) * 4 # 4 questions per birthday
for person in birthdays:
# Use seed based on person for consistent questions
person_seed = hash(f"{person['name']}_{person['year']}_{person['month']}_{person['day']}")
random.seed(person_seed)
questions = generate_questions()
for question in questions:
current += 1
# Create unique ID for this entry
entry_id = f"{person['name']}_{person['year']}{person['month']:02d}{person['day']:02d}_{hash(question)}"
# Skip if already processed
if entry_id not in processed_entries:
tasks.append((person, question, system_prompt, entry_id, current, total))
print(f"\nTotal tasks to process: {len(tasks)}")
print(f"Already processed: {len(processed_entries)}")
print(f"Remaining: {len(tasks)}")
if not tasks:
print("No new tasks to process. Exiting.")
return
# Process tasks with thread pool
generated_count = 0
failed_count = 0
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
# Submit all tasks
future_to_task = {}
for task in tasks:
person, question, system_prompt, entry_id, idx, total = task
# Use the latest cache_name (might be updated by other threads)
current_cache = context_cache if context_cache else cache_name
future = executor.submit(generate_dataset_entry, person, question, system_prompt, entry_id, idx, total, current_cache)
future_to_task[future] = (person, question, entry_id)
# Process completed tasks
for future in as_completed(future_to_task):
person, question, entry_id = future_to_task[future]
try:
result_entry_id, entry = future.result(timeout=600) # 10 minute timeout per task
if entry:
# Write to file
write_entry_to_file(output_file, entry)
# Update progress
with progress_lock:
processed_entries.add(result_entry_id)
generated_count += 1
# Save progress after EVERY entry (not just every 10)
save_progress(progress_file, processed_entries)
with print_lock:
print(f"✓ Saved entry #{existing_count + generated_count}: {person['name']} - {question[:30]}...")
else:
failed_count += 1
with print_lock:
print(f"✗ Failed: {person['name']} - {question[:30]}...")
except Exception as e:
failed_count += 1
with print_lock:
print(f"✗ Error processing {person['name']}: {str(e)}")
# Final progress save
if generated_count > 0:
save_progress(progress_file, processed_entries)
print(f"\n" + "="*60)
print(f"Dataset generation complete!")
print(f"Generated: {generated_count} new entries")
print(f"Failed: {failed_count} entries")
print(f"Total entries in file: {existing_count + generated_count}")
print(f"Saved to: {output_file}")
# Clean up progress file if completed successfully
if len(processed_entries) >= total:
if os.path.exists(progress_file):
os.remove(progress_file)
print("Cleaned up progress file (generation complete)")
if __name__ == "__main__":
main()