IPTV-Updates/patches/v2.7.4/vod_metadata_service_fixed.py
IPTV Server Updates, commit 4f515bbd61: Release v2.7.4 - Critical VOD System Fixes
- Fixed SQLAlchemy import issues in VOD models
- Fixed TMDB/OMDB API authentication and rate limiting
- Fixed VOD directory path resolution and permission errors
- Fixed rental system transaction handling
- Added HLS streaming support for VOD content
- Implemented Redis caching for performance
- Added watch progress tracking
- Enhanced search with multi-field support
- Added health check endpoint

This patch resolves critical production issues in the VOD system.
2025-09-21 22:19:16 +00:00


"""
VOD Metadata Service - Enhanced with proper error handling and caching
"""
import asyncio
import aiohttp
import json
import re
import logging
import hashlib
import os
from typing import Any, Dict, List, Optional, Union
from datetime import datetime
from urllib.parse import urljoin
from bs4 import BeautifulSoup
from sqlalchemy.orm import Session
from redis import Redis
from vod_models import VODContent, VODGenre, VODContentGenre, VODCast, ContentType
logger = logging.getLogger(__name__)

class MetadataProvider:
    """Base class for metadata providers with enhanced error handling"""

    def __init__(self, config: Dict = None):
        self.config = config or {}
        self.session = None
        self.timeout = aiohttp.ClientTimeout(total=10)
        self.retry_count = 3

    async def __aenter__(self):
        self.session = aiohttp.ClientSession(timeout=self.timeout)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()

    async def search(self, title: str, year: Optional[int] = None,
                     content_type: str = 'movie') -> List[Dict]:
        """Search for content by title"""
        raise NotImplementedError

    async def get_details(self, external_id: str) -> Optional[Dict]:
        """Get detailed information by external ID"""
        raise NotImplementedError

    async def _make_request(self, url: str, params: Dict = None, headers: Dict = None) -> Optional[Dict]:
        """Make HTTP request with retry logic"""
        for attempt in range(self.retry_count):
            try:
                async with self.session.get(url, params=params, headers=headers) as response:
                    if response.status == 200:
                        if 'application/json' in response.headers.get('Content-Type', ''):
                            return await response.json()
                        else:
                            return {'text': await response.text()}
                    elif response.status == 429:  # Rate limit
                        logger.warning(f"Rate limit hit, waiting {2 ** attempt} seconds...")
                        await asyncio.sleep(2 ** attempt)
                    elif response.status == 401:
                        logger.error("Authentication failed - check API key")
                        return None
                    else:
                        logger.warning(f"Request failed with status {response.status}")
                        return None
            except asyncio.TimeoutError:
                logger.warning(f"Request timeout on attempt {attempt + 1}")
                if attempt < self.retry_count - 1:
                    await asyncio.sleep(1)
            except Exception as e:
                logger.error(f"Request error: {e}")
                if attempt < self.retry_count - 1:
                    await asyncio.sleep(1)
        return None
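
# Usage sketch (illustrative): concrete providers are async context managers,
# so a typical call site looks roughly like:
#
#   async with IMDBProvider() as provider:
#       results = await provider.search("Inception", year=2010)
#       if results:
#           details = await provider.get_details(results[0]['imdb_id'])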

class IMDBProvider(MetadataProvider):
    """IMDB metadata provider with enhanced scraping"""

    def __init__(self, config: Dict = None):
        super().__init__(config)
        self.base_url = "https://www.imdb.com"
        self.search_url = "https://www.imdb.com/find"
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive',
        }

    async def search(self, title: str, year: Optional[int] = None,
                     content_type: str = 'movie') -> List[Dict]:
        """Search IMDB for content"""
        try:
            query = f"{title} {year}" if year else title
            params = {'q': query, 'ref_': 'nv_sr_sm'}
            result = await self._make_request(self.search_url, params=params, headers=self.headers)
            if result and 'text' in result:
                return self._parse_search_results(result['text'], content_type)
            return []
        except Exception as e:
            logger.error(f"IMDB search error: {e}")
            return []

    def _parse_search_results(self, html: str, content_type: str) -> List[Dict]:
        """Parse IMDB search results safely"""
        results = []
        try:
            soup = BeautifulSoup(html, 'html.parser')
            result_sections = soup.find_all('section', {'data-testid': 'find-results-section-title'})
            for section in result_sections:
                items = section.find_all('li', class_='find-result-item')
                for item in items[:5]:
                    try:
                        result = self._parse_search_item(item, content_type)
                        if result:
                            results.append(result)
                    except Exception as e:
                        logger.debug(f"Failed to parse search item: {e}")
                        continue
        except Exception as e:
            logger.error(f"Failed to parse IMDB search results: {e}")
        return results

    def _parse_search_item(self, item, content_type: str) -> Optional[Dict]:
        """Parse individual search result item"""
        try:
            link_elem = item.find('a')
            if not link_elem:
                return None
            href = link_elem.get('href', '')
            imdb_id = self._extract_imdb_id(href)
            if not imdb_id:
                return None
            title_elem = link_elem.find('img')
            title = title_elem.get('alt', '') if title_elem else link_elem.get_text(strip=True)
            year = None
            result_text = item.get_text()
            year_match = re.search(r'\((\d{4})\)', result_text)
            if year_match:
                year = int(year_match.group(1))
            type_info = self._determine_content_type(result_text, href)
            return {
                'imdb_id': imdb_id,
                'title': title,
                'year': year,
                'type': type_info,
                'url': urljoin(self.base_url, href)
            }
        except Exception as e:
            logger.debug(f"Failed to parse search item: {e}")
            return None

    def _extract_imdb_id(self, href: str) -> Optional[str]:
        """Extract IMDB ID from href"""
        match = re.search(r'/title/(tt\d+)/', href)
        return match.group(1) if match else None

    def _determine_content_type(self, text: str, href: str) -> str:
        """Determine content type from search result"""
        text_lower = text.lower()
        if 'tv series' in text_lower or 'tv mini' in text_lower:
            return 'tv_series'
        elif 'episode' in text_lower:
            return 'episode'
        elif 'documentary' in text_lower:
            return 'documentary'
        else:
            return 'movie'

    async def get_details(self, imdb_id: str) -> Optional[Dict]:
        """Get detailed information from IMDB"""
        try:
            url = f"{self.base_url}/title/{imdb_id}/"
            result = await self._make_request(url, headers=self.headers)
            if result and 'text' in result:
                return self._parse_details(result['text'], imdb_id)
            return None
        except Exception as e:
            logger.error(f"IMDB details error: {e}")
            return None

    def _parse_details(self, html: str, imdb_id: str) -> Optional[Dict]:
        """Parse IMDB title page for detailed information"""
        try:
            soup = BeautifulSoup(html, 'html.parser')
            details = {
                'imdb_id': imdb_id,
                'source': 'imdb'
            }
            # Title
            title_elem = soup.find('h1', {'data-testid': 'hero-title-block__title'})
            if title_elem:
                details['title'] = title_elem.get_text(strip=True)
            # Year
            year_elem = soup.find('a', href=re.compile(r'releaseinfo'))
            if year_elem:
                year_text = year_elem.get_text(strip=True)
                year_match = re.search(r'(\d{4})', year_text)
                if year_match:
                    details['release_year'] = int(year_match.group(1))
            # Rating
            rating_elem = soup.find('span', class_='sc-7ab21ed2-1')
            if rating_elem:
                try:
                    details['imdb_rating'] = float(rating_elem.get_text(strip=True))
                except ValueError:
                    pass
            # Runtime
            runtime_elem = soup.find('li', {'data-testid': 'title-techspec_runtime'})
            if runtime_elem:
                runtime_text = runtime_elem.get_text(strip=True)
                runtime_match = re.search(r'(\d+)\s*min', runtime_text)
                if runtime_match:
                    details['runtime_minutes'] = int(runtime_match.group(1))
            # Genres
            genres = []
            genre_elems = soup.find_all('a', href=re.compile(r'/search/title.*genre'))
            for elem in genre_elems:
                genre = elem.get_text(strip=True)
                if genre and genre not in genres:
                    genres.append(genre)
            details['genres'] = genres[:5]
            # Plot
            plot_elem = soup.find('span', {'data-testid': 'plot-xl'})
            if plot_elem:
                details['description'] = plot_elem.get_text(strip=True)
            # Cast
            cast = []
            cast_section = soup.find('section', {'data-testid': 'title-cast'})
            if cast_section:
                cast_items = cast_section.find_all('div', {'data-testid': 'title-cast-item'})
                for item in cast_items[:10]:
                    name_elem = item.find('a', {'data-testid': 'title-cast-item__actor'})
                    if name_elem:
                        name = name_elem.get_text(strip=True)
                        char_elem = item.find('a', {'data-testid': 'cast-item-characters-link'})
                        character = char_elem.get_text(strip=True) if char_elem else None
                        cast.append({
                            'name': name,
                            'role': 'actor',
                            'character': character
                        })
            details['cast'] = cast
            # Poster
            poster_elem = soup.find('img', class_='ipc-image')
            if poster_elem and poster_elem.get('src'):
                poster_url = poster_elem['src']
                poster_url = re.sub(r'_V1_.*?\.jpg', '_V1_.jpg', poster_url)
                details['poster_url'] = poster_url
            return details
        except Exception as e:
            logger.error(f"Failed to parse IMDB details: {e}")
            return None
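
# Illustrative: IMDBProvider._extract_imdb_id('/title/tt1375666/?ref_=fn_al_tt_1')
# returns 'tt1375666', while a non-title link such as '/name/nm0000138/' yields None.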

class TMDBProvider(MetadataProvider):
    """The Movie Database (TMDB) provider with proper API handling"""

    def __init__(self, config: Dict = None):
        super().__init__(config)
        # Prefer an explicit config value, then fall back to the environment
        self.api_key = (config or {}).get('tmdb_api_key') or os.getenv('TMDB_API_KEY', '')
        self.base_url = "https://api.themoviedb.org/3"
        self.image_base_url = "https://image.tmdb.org/t/p/w500"
        if not self.api_key:
            logger.warning("TMDB API key not configured - provider disabled")

    async def search(self, title: str, year: Optional[int] = None,
                     content_type: str = 'movie') -> List[Dict]:
        """Search TMDB for content"""
        if not self.api_key:
            return []
        try:
            endpoint = '/search/movie' if content_type == 'movie' else '/search/tv'
            url = f"{self.base_url}{endpoint}"
            params = {
                'api_key': self.api_key,
                'query': title,
                'language': 'en-US'
            }
            if year:
                if content_type == 'movie':
                    params['year'] = year
                else:
                    params['first_air_date_year'] = year
            result = await self._make_request(url, params=params)
            if result:
                return self._parse_search_results(result, content_type)
            return []
        except Exception as e:
            logger.error(f"TMDB search error: {e}")
            return []

    def _parse_search_results(self, data: Dict, content_type: str) -> List[Dict]:
        """Parse TMDB search results"""
        results = []
        try:
            for item in data.get('results', [])[:5]:
                result = {
                    'tmdb_id': item['id'],
                    'source': 'tmdb',
                    'type': content_type
                }
                if content_type == 'movie':
                    result['title'] = item.get('title', '')
                    result['original_title'] = item.get('original_title', '')
                    if item.get('release_date'):
                        try:
                            result['year'] = int(item['release_date'][:4])
                        except (ValueError, IndexError):
                            pass
                else:
                    result['title'] = item.get('name', '')
                    result['original_title'] = item.get('original_name', '')
                    if item.get('first_air_date'):
                        try:
                            result['year'] = int(item['first_air_date'][:4])
                        except (ValueError, IndexError):
                            pass
                result['description'] = item.get('overview', '')
                if item.get('poster_path'):
                    result['poster_url'] = f"{self.image_base_url}{item['poster_path']}"
                results.append(result)
        except Exception as e:
            logger.error(f"Failed to parse TMDB search results: {e}")
        return results

    async def get_details(self, tmdb_id: Union[str, int]) -> Optional[Dict]:
        """Get detailed information from TMDB"""
        if not self.api_key:
            return None
        try:
            # Try the movie endpoint first, then TV; fall through if the ID
            # is not found on either.
            for endpoint in ['/movie/', '/tv/']:
                url = f"{self.base_url}{endpoint}{tmdb_id}"
                params = {
                    'api_key': self.api_key,
                    'language': 'en-US',
                    'append_to_response': 'credits,keywords,images'
                }
                result = await self._make_request(url, params=params)
                if result:
                    return self._parse_details(result, endpoint.strip('/'))
            return None
        except Exception as e:
            logger.error(f"TMDB details error: {e}")
            return None

    def _parse_details(self, data: Dict, content_type: str) -> Dict:
        """Parse TMDB detailed response"""
        details = {
            'tmdb_id': data['id'],
            'source': 'tmdb',
            'type': content_type
        }
        try:
            # Basic info
            if content_type == 'movie':
                details['title'] = data.get('title', '')
                details['original_title'] = data.get('original_title', '')
                if data.get('release_date'):
                    try:
                        details['release_year'] = int(data['release_date'][:4])
                    except (ValueError, IndexError):
                        pass
            else:
                details['title'] = data.get('name', '')
                details['original_title'] = data.get('original_name', '')
                if data.get('first_air_date'):
                    try:
                        details['release_year'] = int(data['first_air_date'][:4])
                    except (ValueError, IndexError):
                        pass
            details['description'] = data.get('overview', '')
            details['imdb_rating'] = data.get('vote_average')
            if content_type == 'movie' and data.get('runtime'):
                details['runtime_minutes'] = data['runtime']
            # Genres
            genres = []
            for genre in data.get('genres', []):
                genres.append(genre['name'])
            details['genres'] = genres
            # Images
            if data.get('poster_path'):
                details['poster_url'] = f"{self.image_base_url}{data['poster_path']}"
            if data.get('backdrop_path'):
                details['backdrop_url'] = f"https://image.tmdb.org/t/p/w1280{data['backdrop_path']}"
            # Cast and crew
            cast = []
            credits = data.get('credits', {})
            for person in credits.get('cast', [])[:10]:
                cast.append({
                    'name': person['name'],
                    'role': 'actor',
                    'character': person.get('character')
                })
            for person in credits.get('crew', []):
                if person.get('job') in ['Director', 'Producer', 'Writer']:
                    cast.append({
                        'name': person['name'],
                        'role': person['job'].lower(),
                        'character': None
                    })
            details['cast'] = cast
        except Exception as e:
            logger.error(f"Error parsing TMDB details: {e}")
        return details

class VODMetadataService:
    """Enhanced VOD Metadata Service with caching and fallback"""

    def __init__(self, db: Session, config: Dict = None):
        self.db = db
        self.config = config or {}
        # Initialize Redis cache
        self.redis_client = self._init_redis()
        self.cache_ttl = 3600  # 1 hour
        # Initialize providers
        self.imdb_provider = IMDBProvider(config)
        self.tmdb_provider = TMDBProvider(config)
        # Priority order
        self.providers = []
        if self.tmdb_provider.api_key:
            self.providers.append(self.tmdb_provider)
        self.providers.append(self.imdb_provider)
        logger.info(f"VOD Metadata Service initialized with {len(self.providers)} providers")

    def _init_redis(self) -> Optional[Redis]:
        """Initialize Redis connection for caching"""
        try:
            redis_host = os.getenv('REDIS_HOST', 'localhost')
            redis_port = int(os.getenv('REDIS_PORT', '6379'))
            redis_db = int(os.getenv('REDIS_METADATA_DB', '3'))
            client = Redis(
                host=redis_host,
                port=redis_port,
                db=redis_db,
                decode_responses=True,
                socket_connect_timeout=5,
                socket_timeout=5
            )
            client.ping()
            logger.info("Redis cache connected for metadata")
            return client
        except Exception as e:
            logger.warning(f"Redis not available for metadata caching: {e}")
            return None

    def _get_cache_key(self, title: str, year: Optional[int], content_type: str) -> str:
        """Generate cache key for metadata"""
        key_parts = [title.lower(), str(year) if year else 'none', content_type]
        key_string = ':'.join(key_parts)
        return f"vod:metadata:{hashlib.md5(key_string.encode()).hexdigest()}"

    async def _get_cached_metadata(self, cache_key: str) -> Optional[Dict]:
        """Get metadata from cache if available"""
        if not self.redis_client:
            return None
        try:
            cached_data = self.redis_client.get(cache_key)
            if cached_data:
                return json.loads(cached_data)
        except Exception as e:
            logger.debug(f"Cache retrieval error: {e}")
        return None

    async def _set_cached_metadata(self, cache_key: str, metadata: Dict):
        """Set metadata in cache"""
        if not self.redis_client:
            return
        try:
            self.redis_client.setex(cache_key, self.cache_ttl, json.dumps(metadata))
        except Exception as e:
            logger.debug(f"Cache storage error: {e}")

    async def enrich_content_metadata(self, content_id: int) -> Dict:
        """Enrich content with metadata from external sources"""
        try:
            content = self.db.query(VODContent).filter(
                VODContent.id == content_id
            ).first()
            if not content:
                raise ValueError(f"Content not found: {content_id}")
            # Skip if already has external metadata
            if content.imdb_id or content.tmdb_id:
                logger.info(f"Content {content_id} already has metadata")
                return {'status': 'already_enriched'}
            # Search for metadata
            metadata = await self.search_metadata(
                content.title,
                content.release_year,
                content.content_type
            )
            if not metadata:
                logger.warning(f"No metadata found for content: {content.title}")
                return {'status': 'no_metadata_found'}
            # Update content with metadata
            await self.apply_metadata_to_content(content, metadata)
            return {
                'status': 'enriched',
                'source': metadata.get('source'),
                'title': metadata.get('title')
            }
        except Exception as e:
            logger.error(f"Failed to enrich content {content_id}: {e}")
            return {'status': 'error', 'error': str(e)}

    async def search_metadata(self, title: str, year: Optional[int] = None,
                              content_type: str = 'movie') -> Optional[Dict]:
        """Search for metadata across providers with caching"""
        # Check cache first
        cache_key = self._get_cache_key(title, year, content_type)
        cached_metadata = await self._get_cached_metadata(cache_key)
        if cached_metadata:
            logger.info(f"Metadata found in cache for: {title}")
            return cached_metadata
        # Map content types
        search_type = content_type
        if content_type in [ContentType.TV_SERIES, ContentType.EPISODE]:
            search_type = 'tv_series'
        elif content_type in [ContentType.MOVIE, ContentType.DOCUMENTARY]:
            search_type = 'movie'
        # Search across providers
        for provider in self.providers:
            try:
                async with provider:
                    results = await provider.search(title, year, search_type)
                    if not results:
                        continue
                    # Get details for best match
                    best_match = self._find_best_match(results, title, year)
                    if best_match:
                        if hasattr(provider, 'get_details'):
                            external_id = best_match.get('imdb_id') or best_match.get('tmdb_id')
                            if external_id:
                                details = await provider.get_details(external_id)
                                if details:
                                    # Cache the result
                                    await self._set_cached_metadata(cache_key, details)
                                    return details
                        # Cache and return search result if no detailed info
                        await self._set_cached_metadata(cache_key, best_match)
                        return best_match
            except Exception as e:
                logger.error(f"Provider {provider.__class__.__name__} failed: {e}")
                continue
        # Generate basic metadata as fallback
        basic_metadata = self._generate_basic_metadata(title, year, content_type)
        await self._set_cached_metadata(cache_key, basic_metadata)
        return basic_metadata

    def _find_best_match(self, results: List[Dict], original_title: str,
                         original_year: Optional[int] = None) -> Optional[Dict]:
        """Find best matching result from search results"""
        if not results:
            return None
        best_score = 0
        best_match = None
        for result in results:
            score = 0
            # Title similarity
            result_title = result.get('title', '').lower()
            original_lower = original_title.lower()
            # Exact match
            if result_title == original_lower:
                score += 100
            # Contains match
            elif original_lower in result_title or result_title in original_lower:
                score += 50
            # Year match
            if original_year and result.get('year'):
                if result['year'] == original_year:
                    score += 30
                else:
                    year_diff = abs(result['year'] - original_year)
                    if year_diff <= 1:
                        score += 20
                    elif year_diff <= 2:
                        score += 10
            if score > best_score:
                best_score = score
                best_match = result
        return best_match if best_score > 30 else None
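
    # Worked example of the scoring above (illustrative): searching for
    # "Dune" (2021) against a result titled "Dune" from 2021 scores
    # 100 (exact title) + 30 (exact year) = 130; the 1984 "Dune" scores
    # 100 + 0 = 100, so the 2021 entry wins. Any result scoring 30 or
    # less (e.g. a year-only match) is rejected as too weak.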

    def _generate_basic_metadata(self, title: str, year: Optional[int],
                                 content_type: str) -> Dict[str, Any]:
        """Generate basic metadata when external sources are unavailable"""
        logger.info(f"Generating basic metadata for: {title}")
        return {
            'title': title,
            'original_title': title,
            'year': year,
            'content_type': content_type,
            'description': f"A {content_type} titled '{title}'" + (f" from {year}" if year else ""),
            'rating': 0.0,
            'runtime_minutes': 0,
            'genres': [],
            'cast': [],
            'poster_url': None,
            'backdrop_url': None,
            'source': 'generated',
            'metadata_complete': False,
            'fetched_at': datetime.utcnow().isoformat()
        }

    async def apply_metadata_to_content(self, content: VODContent, metadata: Dict):
        """Apply metadata to content object with transaction safety"""
        try:
            # Update basic fields
            if metadata.get('title'):
                content.title = metadata['title']
            if metadata.get('original_title'):
                content.original_title = metadata['original_title']
            if metadata.get('description'):
                content.description = metadata['description']
            if metadata.get('release_year'):
                content.release_year = metadata['release_year']
            if metadata.get('runtime_minutes'):
                content.runtime_minutes = metadata['runtime_minutes']
            if metadata.get('imdb_rating'):
                content.imdb_rating = float(metadata['imdb_rating'])
            if metadata.get('poster_url'):
                content.poster_url = metadata['poster_url']
            if metadata.get('backdrop_url'):
                content.backdrop_url = metadata['backdrop_url']
            # External IDs
            if metadata.get('imdb_id'):
                content.imdb_id = metadata['imdb_id']
            if metadata.get('tmdb_id'):
                content.tmdb_id = str(metadata['tmdb_id'])
            content.updated_at = datetime.utcnow()
            # Handle genres
            if metadata.get('genres'):
                await self._update_content_genres(content, metadata['genres'])
            # Handle cast
            if metadata.get('cast'):
                await self._update_content_cast(content, metadata['cast'])
            self.db.commit()
            logger.info(f"Updated content {content.id} with metadata from {metadata.get('source')}")
        except Exception as e:
            logger.error(f"Failed to apply metadata to content {content.id}: {e}")
            self.db.rollback()
            raise

    async def _update_content_genres(self, content: VODContent, genres: List[str]):
        """Update content genres"""
        try:
            # Remove existing genres
            existing_genres = self.db.query(VODContentGenre).filter(
                VODContentGenre.content_id == content.id
            ).all()
            for genre_link in existing_genres:
                self.db.delete(genre_link)
            # Add new genres
            for genre_name in genres:
                # Find or create genre
                genre = self.db.query(VODGenre).filter(
                    VODGenre.name == genre_name
                ).first()
                if not genre:
                    genre = VODGenre(
                        name=genre_name,
                        description=f"Auto-generated genre: {genre_name}",
                        color=self._generate_genre_color(genre_name)
                    )
                    self.db.add(genre)
                    self.db.flush()
                # Link genre to content
                content_genre = VODContentGenre(
                    content_id=content.id,
                    genre_id=genre.id
                )
                self.db.add(content_genre)
        except Exception as e:
            logger.error(f"Failed to update genres: {e}")
            raise

    async def _update_content_cast(self, content: VODContent, cast: List[Dict]):
        """Update content cast"""
        try:
            # Remove existing cast
            existing_cast = self.db.query(VODCast).filter(
                VODCast.content_id == content.id
            ).all()
            for cast_member in existing_cast:
                self.db.delete(cast_member)
            # Add new cast
            for i, person in enumerate(cast[:20]):  # Limit to 20
                cast_member = VODCast(
                    content_id=content.id,
                    person_name=person['name'],
                    role_type=person['role'],
                    character_name=person.get('character'),
                    order_index=i
                )
                self.db.add(cast_member)
        except Exception as e:
            logger.error(f"Failed to update cast: {e}")
            raise

    def _generate_genre_color(self, genre_name: str) -> str:
        """Generate a color for a genre based on its name"""
        colors = {
            'action': '#e74c3c',
            'adventure': '#f39c12',
            'comedy': '#f1c40f',
            'drama': '#3498db',
            'horror': '#8e44ad',
            'thriller': '#e67e22',
            'romance': '#e91e63',
            'sci-fi': '#1abc9c',
            'fantasy': '#9b59b6',
            'crime': '#34495e',
            'documentary': '#95a5a6',
            'family': '#27ae60',
            'animation': '#ff6b6b',
            'western': '#d35400',
            'war': '#7f8c8d'
        }
        genre_lower = genre_name.lower()
        # Find matching color
        for key, color in colors.items():
            if key in genre_lower:
                return color
        # Generate hash-based color for unknown genres
        hash_obj = hashlib.md5(genre_name.encode())
        hex_hash = hash_obj.hexdigest()[:6]
        return f"#{hex_hash}"

    async def bulk_enrich_content(self, limit: int = 10,
                                  content_type: Optional[str] = None) -> Dict:
        """Enrich multiple content items in batch"""
        try:
            query = self.db.query(VODContent).filter(
                VODContent.imdb_id.is_(None),
                VODContent.tmdb_id.is_(None),
                VODContent.status == 'draft'
            )
            if content_type:
                query = query.filter(VODContent.content_type == content_type)
            contents = query.limit(limit).all()
            results = {
                'total_processed': 0,
                'enriched': 0,
                'no_metadata': 0,
                'errors': 0,
                'details': []
            }
            for content in contents:
                try:
                    result = await self.enrich_content_metadata(content.id)
                    results['total_processed'] += 1
                    if result['status'] == 'enriched':
                        results['enriched'] += 1
                    elif result['status'] == 'no_metadata_found':
                        results['no_metadata'] += 1
                    elif result['status'] == 'error':
                        results['errors'] += 1
                    results['details'].append({
                        'content_id': content.id,
                        'title': content.title,
                        'status': result['status']
                    })
                    # Rate limiting
                    await asyncio.sleep(1)
                except Exception as e:
                    results['errors'] += 1
                    results['details'].append({
                        'content_id': content.id,
                        'title': content.title,
                        'status': 'error',
                        'error': str(e)
                    })
                    logger.error(f"Failed to enrich content {content.id}: {e}")
            return results
        except Exception as e:
            logger.error(f"Bulk enrich failed: {e}")
            return {
                'status': 'error',
                'error': str(e)
            }
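
# Usage sketch (illustrative only; `SessionLocal` is a placeholder for
# whatever SQLAlchemy session factory the deployment uses):
#
#   import asyncio
#   from db import SessionLocal  # hypothetical session factory
#
#   async def main():
#       db = SessionLocal()
#       try:
#           service = VODMetadataService(db, config={'tmdb_api_key': '...'})
#           report = await service.bulk_enrich_content(limit=5, content_type='movie')
#           print(report)
#       finally:
#           db.close()
#
#   asyncio.run(main())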