# Page-header residue from the hosting site ("Spaces: Sleeping"); not part of the program.
import asyncio
import base64  # added so base64 is available for the Basic auth header
import os
import sys
import tempfile
import time
from pathlib import Path

import gradio as gr
import requests
import shazamio
from pydub import AudioSegment
# SoundCloud credentials and the token are read from the environment.
# Each value is None when the corresponding variable is not set.
client_id = os.environ.get('SOUNDCLOUD_CLIENT_ID')
client_secret = os.environ.get('SOUNDCLOUD_CLIENT_SECRET')
token = os.environ.get('TOKEN')
def get_soundcloud_access_token(client_id, client_secret):
    """Request an access token using the OAuth2 client-credentials grant.

    Args:
        client_id: Client ID sent as the user part of the Basic auth header.
        client_secret: Client secret sent as the password part.

    Returns:
        The ``access_token`` value from the JSON response on success.
        On any failure a message starting with "Fehler" is returned instead
        of raising, so callers can detect errors by that prefix.
    """
    credentials = f'{client_id}:{client_secret}'
    auth_headers = {
        'Authorization': 'Basic ' + base64.b64encode(credentials.encode()).decode()
    }
    data = {
        'grant_type': 'client_credentials'
    }
    try:
        response = requests.post(
            'https://api.soundcloud.com/oauth2/token',
            headers=auth_headers,
            data=data,
            timeout=30,  # without a timeout a stalled server blocks forever
        )
        response.raise_for_status()  # turn HTTP error statuses into exceptions
        token_data = response.json()
        return token_data['access_token']
    except requests.RequestException as e:
        return f"Fehler beim Abrufen des Zugriffstokens: {str(e)}"
    except (KeyError, TypeError, ValueError) as e:
        # Response was not JSON or did not have the expected shape.
        return f"Fehler beim Abrufen des Zugriffstokens: {e!r}"
def download_audio(streaming_url, output_path, headers):
    """Stream the resource at ``streaming_url`` into the file ``output_path``.

    Args:
        streaming_url: URL to fetch.
        output_path: Destination file path; opened in binary write mode.
        headers: HTTP headers sent with the request.

    Raises:
        Exception: If the HTTP request fails; the message starts with
            "Fehler beim Herunterladen des Audios" and the original
            ``requests`` exception is attached as the cause.
    """
    try:
        # The context manager releases the connection even when the body is
        # only partially consumed or writing fails.
        with requests.get(streaming_url, headers=headers, stream=True, timeout=30) as response:
            response.raise_for_status()  # turn HTTP error statuses into exceptions
            with open(output_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
    except requests.RequestException as e:
        raise Exception(f"Fehler beim Herunterladen des Audios: {str(e)}") from e
async def identify_track(shazam, audio_chunk, temp_dir):
    """Export one audio chunk to a WAV file and ask Shazam to recognize it.

    Args:
        shazam: Object providing an awaitable ``recognize_file(path)``.
        audio_chunk: Object providing ``export(path, format=...)``.
        temp_dir: Existing directory in which the temporary WAV is written.

    Returns:
        A dict with ``title`` and ``subtitle`` when a track was recognized,
        ``None`` when the result contains no track, or an error message
        string (starting with "Fehler") when anything raised.
    """
    try:
        temp_file = os.path.join(temp_dir, 'temp_chunk.wav')
        # Export to the very file that is recognized below; exporting to a
        # directory path fails and leaves nothing to recognize.
        exported = audio_chunk.export(temp_file, format='wav')
        # export() may hand back an open file object; close it so the handle
        # is not leaked across the many chunks of a long recording.
        if hasattr(exported, 'close'):
            exported.close()
        result = await shazam.recognize_file(temp_file)
        if result and 'track' in result:
            track_data = result['track']
            return {
                'title': track_data['title'],
                'subtitle': track_data['subtitle']
            }
        return None
    except Exception as e:
        # Best effort: a failing chunk is reported as text, not raised.
        return f"Fehler bei der Identifizierung des Tracks: {str(e)}"
def _chunk_start_offsets(total_ms, chunk_ms, step_ms):
    """Return start offsets (ms) of full-length windows, or [0] for short audio."""
    starts = list(range(0, total_ms - chunk_ms + 1, step_ms))
    if not starts and total_ms > 0:
        # Recording is shorter than one window: analyze it as a single chunk.
        starts = [0]
    return starts


def _format_timestamp(offset_ms):
    """Format a millisecond offset as MM:SS, or H:MM:SS from one hour on."""
    minutes, seconds = divmod(int(offset_ms // 1000), 60)
    hours, minutes = divmod(minutes, 60)
    if hours:
        return f"{hours}:{minutes:02d}:{seconds:02d}"
    return f"{minutes:02d}:{seconds:02d}"


async def process_dj_set(track_url, progress=gr.Progress()):
    """Download a SoundCloud track and build a timestamped tracklist.

    The track URL is resolved through the SoundCloud API, the audio is
    downloaded, split into overlapping 30-second windows, and each window
    is passed to ``identify_track``.

    Args:
        track_url: Public SoundCloud URL of the recording.
        progress: Gradio progress tracker; called with the completed fraction.

    Returns:
        A 3-tuple ``(tracklist_text, tracklist_file_path, status_message)``.
        On failure the text is empty, the path is ``None`` and the status
        message describes the error.
    """
    # All three variables are required, although `token` itself is not used
    # anywhere in this function.
    if not client_id or not client_secret or not token:
        return "", None, "Umgebungsvariablen sind nicht gesetzt."

    # Fetch the access token; errors come back as a "Fehler..." string.
    access_token = get_soundcloud_access_token(client_id, client_secret)
    if isinstance(access_token, str) and access_token.startswith("Fehler"):
        return "", None, access_token
    headers = {
        'Authorization': f'Bearer {access_token}'
    }

    status_messages = []
    tracklist = []

    # NOTE(review): the requests calls below are blocking and run on the
    # event loop; acceptable for one user, worth revisiting under load.
    with tempfile.TemporaryDirectory() as temp_dir:
        output_path = os.path.join(temp_dir, 'track.wav')

        # Resolve the track URL; `params` URL-encodes the user-supplied value.
        try:
            resolve_response = requests.get(
                'https://api.soundcloud.com/resolve.json',
                params={'url': track_url},
                headers=headers,
                timeout=30,
            )
            resolve_response.raise_for_status()
            track_data = resolve_response.json()
            streaming_url = track_data['stream_url']
            status_messages.append("Track-URL erfolgreich aufgelöst.")
        except requests.RequestException as e:
            return "", None, f"Fehler beim Auflösen der Track-URL: {str(e)}"
        except (KeyError, TypeError, ValueError) as e:
            # Response was not JSON or carried no stream URL.
            return "", None, f"Fehler beim Auflösen der Track-URL: {e!r}"

        # Download the audio.
        try:
            download_audio(streaming_url, output_path, headers)
            status_messages.append("Audio erfolgreich heruntergeladen.")
        except Exception as e:
            prefix = "Fehler beim Herunterladen des Audios: "
            message = str(e)
            # download_audio may already have added the prefix; avoid doubling it.
            if not message.startswith(prefix):
                message = prefix + message
            return "", None, message

        # Load the audio. from_file does not force a container format, so a
        # non-WAV download can still be decoded.
        try:
            audio = AudioSegment.from_file(output_path)
            status_messages.append("Audio erfolgreich geladen.")
        except Exception as e:
            return "", None, f"Fehler beim Laden des Audios: {str(e)}"

        chunk_duration = 30000  # 30 seconds
        overlap = 10000  # 10 seconds
        starts = _chunk_start_offsets(len(audio), chunk_duration, chunk_duration - overlap)

        shazam = shazamio.Shazam()

        # Identify each window; slices are cut on demand rather than all up front.
        for index, start_time in enumerate(starts, start=1):
            chunk = audio[start_time:start_time + chunk_duration]
            track_info = await identify_track(shazam, chunk, temp_dir)
            if isinstance(track_info, dict):
                timestamp = _format_timestamp(start_time)
                tracklist.append(f"{timestamp} - {track_info['title']} von {track_info['subtitle']}")
            elif isinstance(track_info, str):
                status_messages.append(track_info)
            progress(index / len(starts))

    # The working directory above is removed on exit, so the tracklist is
    # written to a separate directory that outlives this call and can still
    # be served for download.
    tracklist_output = "\n".join(tracklist)
    tracklist_path = os.path.join(tempfile.mkdtemp(prefix='tracklist_'), 'tracklist.txt')
    with open(tracklist_path, 'w', encoding='utf-8') as f:
        f.write(tracklist_output)

    status_message = "\n".join(status_messages)
    if not tracklist:
        status_message += "\nKeine Tracks identifiziert."
    return tracklist_output, tracklist_path, status_message
# Center the main column and cap its width.
css = """
#col-container {
    margin: 0 auto;
    max-width: 640px;
}
"""

# NOTE(review): the nesting below is reconstructed from statement order
# because the original indentation was lost; confirm against the live layout.
with gr.Blocks(css=css) as demo:
    with gr.Column(elem_id="col-container"):
        gr.Markdown("# SoundCloud DJ Set Track Identifier")
        # Receives the status message returned by process_dj_set.
        status_text = gr.Markdown(elem_id="status_text")
        with gr.Row():
            track_url = gr.Text(
                label="SoundCloud DJ Set URL",
                show_label=False,
                max_lines=1,
                placeholder="Geben Sie die SoundCloud DJ-Set-URL ein",
                container=False,
            )
            run_button = gr.Button("Verarbeiten", scale=0, variant="primary")
        result = gr.Textbox(label="Trackliste", show_label=False)
        with gr.Accordion("Trackliste herunterladen", open=False):
            download_button = gr.File(label="Herunterladen")
        gr.Examples(examples=["https://soundcloud.com/your-track-url"], inputs=[track_url])
    # Outputs map, in order, to the 3-tuple returned by process_dj_set.
    run_button.click(process_dj_set, inputs=track_url, outputs=[result, download_button, status_text])

if __name__ == "__main__":
    demo.launch(share=False)