Files
2024-12-23 18:21:10 +02:00

185 lines
6.4 KiB
Python

import json
import os
import uuid
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime

import librosa
import numpy as np
import pandas as pd
import requests
import yt_dlp
from librosa.feature.rhythm import tempo
from tqdm import tqdm
class NoOpLogger:
    """Logger stand-in that silently discards every message it receives.

    It exposes ``debug``, ``warning`` and ``error`` so an instance can be
    supplied as the ``logger`` option of ``yt_dlp.YoutubeDL``.
    """

    def debug(self, msg):
        """Ignore a debug-level message."""
        return None

    def warning(self, msg):
        """Ignore a warning-level message."""
        return None

    def error(self, msg):
        """Ignore an error-level message."""
        return None
# Constants
# Cookie file handed to yt_dlp as its "cookiefile" option when downloading.
COOKIES_PATH = "Downloader/secret/youtube_cookies.txt"
TEMP_AUDIO_DIR = "temp_audio" # dir to store temporary audio files in
# Parquet file the __main__ pipeline writes the collected rows to.
OUTPUT_FILE = "output.parquet"
# File that log_error() appends its timestamped messages to.
ERROR_LOG_FILE = "err.log"
# Number of worker threads used by the __main__ pipeline's ThreadPoolExecutor.
MAX_WORKERS = 10
DOWNLOAD_LONG = False # Set to True to allow downloading songs over 15 minutes
# Ensure temporary directory exists.
# NOTE: this runs as a module-level side effect (i.e. on import as well as
# when executed as a script); the __main__ block removes the directory again.
os.makedirs(TEMP_AUDIO_DIR, exist_ok=True)
# Function to log errors
def log_error(message: str):
    """Append a timestamped line to the error log file.

    Args:
        message: Text to record. It is written after a
            ``[YYYY-MM-DD HH:MM:SS]`` prefix built from the local time.
    """
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    # Messages embed song titles and URLs, which are frequently non-ASCII.
    # An explicit UTF-8 encoding keeps the write from raising
    # UnicodeEncodeError when the platform's default encoding is narrower;
    # this function is called from exception handlers, so it must not fail.
    with open(ERROR_LOG_FILE, "a", encoding="utf-8") as log_file:
        log_file.write(f"[{timestamp}] {message}\n")
def get_youtube_music_info(url: str):
    """Look up a video's title and duration without downloading it.

    Args:
        url: Video URL handed to ``yt_dlp``.

    Returns:
        A dict with the keys ``'title'`` and ``'duration'`` (seconds).
        ``'title'`` falls back to ``'No title found'`` and ``'duration'`` to
        ``0`` when yt_dlp does not provide a usable value. If the lookup
        raises, the failure is logged and
        ``{'title': 'Unknown Title', 'duration': 0}`` is returned instead.
    """
    ydl_opts = {
        'quiet': True,
        'no_warnings': True,
        'skip_download': True,
        'logger': NoOpLogger(),  # Suppress all yt_dlp logs
    }
    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(url, download=False)
        # ``dict.get(key, default)`` only substitutes the default when the key
        # is *missing*. A key that is present with value None would leak
        # through and break callers that compare the duration or call string
        # methods on the title, so fall back with ``or`` instead.
        return {
            'title': info.get('title') or 'No title found',
            'duration': info.get('duration') or 0,  # duration in seconds
        }
    except Exception as e:
        log_error(f"Failed to retrieve info for URL {url}: {e}")
        return {'title': 'Unknown Title', 'duration': 0}
def download_audio(video_url, output_path, cookies_path):
    """Download a video's best audio stream and convert it to WAV.

    Args:
        video_url: URL of the video to download.
        output_path: Destination path *without* a file extension; the
            converted audio is expected at ``output_path + ".wav"``. A value
            that already contains the ``%(ext)s`` placeholder is used as the
            output template unchanged.
        cookies_path: Path passed to yt_dlp as its ``cookiefile`` option.

    Raises:
        Exception: Whatever yt_dlp raises; it is logged and re-raised.
    """
    # Give yt_dlp an explicit extension placeholder. Without it the
    # downloaded container and the extracted WAV share one extension-less
    # name, and the final filename depends on how the post-processor
    # rewrites the extension (e.g. when the stem itself contains a dot).
    if "%(ext)s" in output_path:
        outtmpl = output_path
    else:
        outtmpl = f"{output_path}.%(ext)s"

    ydl_opts = {
        "format": "bestaudio/best",
        "cookiefile": cookies_path,
        "postprocessors": [
            {"key": "FFmpegExtractAudio", "preferredcodec": "wav"}
        ],
        "outtmpl": outtmpl,
        "logger": NoOpLogger(),  # Suppress all yt_dlp logs
    }
    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([video_url])
    except Exception as e:
        log_error(f"Failed to download audio for {video_url}: {e}")
        raise
def extract_audio_features(audio_path):
    """Load an audio file and summarise it as a handful of librosa features.

    Args:
        audio_path: Path of the audio file to analyse. It is loaded at its
            native sampling rate (``sr=None``).

    Returns:
        A dict with the keys ``"tempo"`` (first element of the tempo
        estimate), ``"mfcc"`` (13 coefficients), ``"spectral_contrast"`` and
        ``"chroma_stft"``. Each of the last three is the feature matrix
        transposed and averaged along axis 0, i.e. one mean per feature row.

    Raises:
        Exception: Any loading or extraction error is logged and re-raised.
    """
    def _row_means(matrix):
        # Collapse the second axis of a librosa feature matrix to its mean.
        return np.mean(matrix.T, axis=0)

    try:
        samples, sample_rate = librosa.load(audio_path, sr=None)

        estimated_tempo = tempo(y=samples, sr=sample_rate)[0]
        mfcc_matrix = librosa.feature.mfcc(y=samples, sr=sample_rate, n_mfcc=13)
        contrast_matrix = librosa.feature.spectral_contrast(y=samples, sr=sample_rate)
        chroma_matrix = librosa.feature.chroma_stft(y=samples, sr=sample_rate)

        return {
            "tempo": estimated_tempo,
            "mfcc": _row_means(mfcc_matrix),
            "spectral_contrast": _row_means(contrast_matrix),
            "chroma_stft": _row_means(chroma_matrix),
        }
    except Exception as e:
        log_error(f"Failed to extract features from {audio_path}: {e}")
        raise
def fetch_metadata(title, artist="Unknown", timeout=10):
    """Query MusicBrainz for a recording matching *title*.

    Only the first recording in the response is used.

    Args:
        title: Text sent as the MusicBrainz ``query`` parameter.
        artist: Artist name used in the fallback result and in log messages;
            it is not part of the query.
        timeout: Seconds to wait for MusicBrainz before giving up.

    Returns:
        A dict with the keys ``"title"``, ``"artist"``, ``"release_date"`` and
        ``"genres"``. When the request fails, returns a non-200 status, or
        yields no recordings, the problem is logged and a fallback built from
        the arguments (``release_date=None``, ``genres=[]``) is returned.
    """
    fallback = {"title": title, "artist": artist, "release_date": None, "genres": []}
    try:
        # Without a timeout a stalled connection would block this worker
        # thread indefinitely.
        # NOTE(review): MusicBrainz asks API clients to send an identifying
        # User-Agent header; none is set here - confirm and add one.
        response = requests.get(
            "https://musicbrainz.org/ws/2/recording/",
            params={"query": title, "fmt": "json"},
            timeout=timeout,
        )
        if response.status_code != 200:
            # Report HTTP failures as such rather than as "no results".
            log_error(
                f"MusicBrainz returned HTTP {response.status_code} for {title} by {artist}"
            )
            return fallback

        results = response.json().get("recordings", [])
        if not results:
            log_error(f"No results from MusicBrainz for {title} by {artist}")
            return fallback

        best = results[0]
        # An empty or missing "artist-credit" list must not discard the rest
        # of the match; the artist is simply reported as None.
        credits = best.get("artist-credit") or [{}]
        return {
            "title": best.get("title"),
            "artist": credits[0].get("artist", {}).get("name"),
            "release_date": best.get("first-release-date"),
            "genres": best.get("tags", []),
        }
    except Exception as e:
        log_error(f"Failed to fetch metadata for {title}: {e}")
    return fallback
def _remove_temp_files(prefix):
    """Delete every file in TEMP_AUDIO_DIR whose name starts with *prefix*."""
    try:
        leftovers = [
            name for name in os.listdir(TEMP_AUDIO_DIR) if name.startswith(prefix)
        ]
    except OSError:
        # Directory already gone: nothing to clean up.
        return
    for name in leftovers:
        try:
            os.remove(os.path.join(TEMP_AUDIO_DIR, name))
        except OSError as e:
            log_error(f"Failed to remove temporary file {name}: {e}")


def process_song(video_url):
    """Download one song, extract its audio features and merge in metadata.

    Args:
        video_url: URL of the video to process.

    Returns:
        A flat dict combining the ``fetch_metadata`` result, one
        ``mfcc_<i>`` / ``spectral_contrast_<i>`` / ``chroma_stft_<i>`` entry
        per feature value, and ``"tempo"``. Returns ``None`` when the song is
        skipped for being longer than 15 minutes (unless ``DOWNLOAD_LONG`` is
        set) or when any processing step fails; both cases are logged.
    """
    info = get_youtube_music_info(video_url)
    title = info['title']
    # Guard against a None duration so the comparison below cannot raise
    # outside the try block and abort the whole pool.
    duration = info['duration'] or 0  # duration in seconds

    # Check if the song exceeds the allowed length
    if not DOWNLOAD_LONG and duration > 15 * 60:
        # The URL is part of the log line itself so that no unterminated
        # fragment is left in the log for the next entry to be glued onto.
        log_error(
            f"Skipped {title} (Duration: {duration / 60:.2f} minutes) - too long. "
            f"URL: {video_url}"
        )
        return None

    # Name the temporary files with a random token instead of the title:
    # titles may contain path separators, dots or ".wav", and two songs with
    # the same title processed concurrently would overwrite each other.
    token = uuid.uuid4().hex
    audio_stem = os.path.join(TEMP_AUDIO_DIR, token)
    audio_path = f"{audio_stem}.wav"
    try:
        download_audio(video_url, audio_stem, COOKIES_PATH)
        audio_features = extract_audio_features(audio_path)
        metadata = fetch_metadata(title)
        data = {
            **metadata,
            **{f"mfcc_{i}": val for i, val in enumerate(audio_features["mfcc"])},
            **{f"spectral_contrast_{i}": val for i, val in enumerate(audio_features["spectral_contrast"])},
            **{f"chroma_stft_{i}": val for i, val in enumerate(audio_features["chroma_stft"])},
            "tempo": audio_features["tempo"],
        }
        return data
    except Exception as e:
        log_error(f"Failed to process song {title} from URL {video_url}: {e}")
        return None
    finally:
        # Also removes partial downloads and intermediate containers left
        # behind by a failed run, not just the final WAV.
        _remove_temp_files(token)
def read_urls_from_json(data_dir):
    """Collect URLs from every ``*.json`` file directly inside *data_dir*.

    A file whose top-level value is a list contributes all of its items; a
    file whose top-level value is a dict contributes its ``"url"`` entry if
    present. Any other content is ignored. Files that cannot be read or
    parsed are logged and skipped.

    Args:
        data_dir: Directory to scan (not recursive).

    Returns:
        A list of the collected entries with falsy values (empty strings,
        ``None``) removed, in filename order.

    Raises:
        OSError: If *data_dir* itself cannot be listed.
    """
    urls = []
    # os.listdir returns entries in arbitrary order; sorting makes the
    # processing order reproducible between runs and platforms.
    for filename in sorted(os.listdir(data_dir)):
        if not filename.endswith(".json"):
            continue
        file_path = os.path.join(data_dir, filename)
        try:
            # Read as UTF-8 explicitly instead of relying on the locale's
            # default encoding.
            with open(file_path, "r", encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError;
            # OSError covers unreadable entries. One bad file must not abort
            # the whole scan.
            log_error(f"Failed to read JSON file {file_path}: {e}")
            continue
        if isinstance(data, list):
            urls.extend(data)
        elif isinstance(data, dict) and "url" in data:
            urls.append(data["url"])
    return [url for url in urls if url]
# Script entry point: read URLs from ./data, process them on a thread pool,
# and write the successfully processed rows to OUTPUT_FILE as parquet.
if __name__ == "__main__":
    try:
        songs = read_urls_from_json('data')
        results = []
        with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
            with tqdm(total=len(songs), desc="Processing songs", unit="song") as pbar:
                # executor.map yields results in input order; advance the
                # progress bar as each one becomes available.
                for result in executor.map(process_song, songs):
                    results.append(result)
                    pbar.update(1)
        # process_song returns None for skipped or failed songs.
        processed_data = [result for result in results if result is not None]
        df = pd.DataFrame(processed_data)
        df.to_parquet(OUTPUT_FILE, engine="pyarrow", index=False)
    except Exception as e:
        log_error(f"Pipeline failed: {e}")
    finally:
        # os.rmdir only removes empty directories. Leftover files must not
        # turn cleanup into an unhandled exception that masks the run's
        # real outcome, so log the problem instead.
        try:
            os.rmdir(TEMP_AUDIO_DIR)
        except FileNotFoundError:
            pass
        except OSError as e:
            log_error(f"Could not remove temporary directory {TEMP_AUDIO_DIR}: {e}")