""" VOD Metadata Service - Enhanced with proper error handling and caching """ import asyncio import aiohttp import json import re import logging import hashlib import os from typing import Dict, List, Optional, Union from datetime import datetime, timedelta from urllib.parse import quote_plus, urljoin from bs4 import BeautifulSoup import requests from sqlalchemy.orm import Session from redis import Redis from vod_models import VODContent, VODGenre, VODContentGenre, VODCast, ContentType logger = logging.getLogger(__name__) class MetadataProvider: """Base class for metadata providers with enhanced error handling""" def __init__(self, config: Dict = None): self.config = config or {} self.session = None self.timeout = aiohttp.ClientTimeout(total=10) self.retry_count = 3 async def __aenter__(self): self.session = aiohttp.ClientSession(timeout=self.timeout) return self async def __aexit__(self, exc_type, exc_val, exc_tb): if self.session: await self.session.close() async def search(self, title: str, year: Optional[int] = None, content_type: str = 'movie') -> List[Dict]: """Search for content by title""" raise NotImplementedError async def get_details(self, external_id: str) -> Optional[Dict]: """Get detailed information by external ID""" raise NotImplementedError async def _make_request(self, url: str, params: Dict = None, headers: Dict = None) -> Optional[Dict]: """Make HTTP request with retry logic""" for attempt in range(self.retry_count): try: async with self.session.get(url, params=params, headers=headers) as response: if response.status == 200: if 'application/json' in response.headers.get('Content-Type', ''): return await response.json() else: return {'text': await response.text()} elif response.status == 429: # Rate limit logger.warning(f"Rate limit hit, waiting {2 ** attempt} seconds...") await asyncio.sleep(2 ** attempt) elif response.status == 401: logger.error("Authentication failed - check API key") return None else: logger.warning(f"Request failed with status {response.status}") return None except asyncio.TimeoutError: logger.warning(f"Request timeout on attempt {attempt + 1}") if attempt < self.retry_count - 1: await asyncio.sleep(1) except Exception as e: logger.error(f"Request error: {e}") if attempt < self.retry_count - 1: await asyncio.sleep(1) return None class IMDBProvider(MetadataProvider): """IMDB metadata provider with enhanced scraping""" def __init__(self, config: Dict = None): super().__init__(config) self.base_url = "https://www.imdb.com" self.search_url = "https://www.imdb.com/find" self.headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8', 'Accept-Language': 'en-US,en;q=0.5', 'Accept-Encoding': 'gzip, deflate', 'Connection': 'keep-alive', } async def search(self, title: str, year: Optional[int] = None, content_type: str = 'movie') -> List[Dict]: """Search IMDB for content""" try: query = f"{title} {year}" if year else title params = {'q': query, 'ref_': 'nv_sr_sm'} result = await self._make_request(self.search_url, params=params, headers=self.headers) if result and 'text' in result: return self._parse_search_results(result['text'], content_type) return [] except Exception as e: logger.error(f"IMDB search error: {e}") return [] def _parse_search_results(self, html: str, content_type: str) -> List[Dict]: """Parse IMDB search results safely""" results = [] try: soup = BeautifulSoup(html, 'html.parser') result_sections = soup.find_all('section', 
class IMDBProvider(MetadataProvider):
    """IMDB metadata provider with enhanced scraping"""

    def __init__(self, config: Dict = None):
        super().__init__(config)
        self.base_url = "https://www.imdb.com"
        self.search_url = "https://www.imdb.com/find"
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive',
        }

    async def search(self, title: str, year: Optional[int] = None,
                     content_type: str = 'movie') -> List[Dict]:
        """Search IMDB for content"""
        try:
            query = f"{title} {year}" if year else title
            params = {'q': query, 'ref_': 'nv_sr_sm'}
            result = await self._make_request(self.search_url, params=params,
                                              headers=self.headers)
            if result and 'text' in result:
                return self._parse_search_results(result['text'], content_type)
            return []
        except Exception as e:
            logger.error(f"IMDB search error: {e}")
            return []

    def _parse_search_results(self, html: str, content_type: str) -> List[Dict]:
        """Parse IMDB search results safely"""
        results = []
        try:
            soup = BeautifulSoup(html, 'html.parser')
            result_sections = soup.find_all('section',
                                            {'data-testid': 'find-results-section-title'})
            for section in result_sections:
                items = section.find_all('li', class_='find-result-item')
                for item in items[:5]:
                    try:
                        result = self._parse_search_item(item, content_type)
                        if result:
                            results.append(result)
                    except Exception as e:
                        logger.debug(f"Failed to parse search item: {e}")
                        continue
        except Exception as e:
            logger.error(f"Failed to parse IMDB search results: {e}")
        return results

    def _parse_search_item(self, item, content_type: str) -> Optional[Dict]:
        """Parse individual search result item"""
        try:
            link_elem = item.find('a')
            if not link_elem:
                return None
            href = link_elem.get('href', '')
            imdb_id = self._extract_imdb_id(href)
            if not imdb_id:
                return None
            title_elem = link_elem.find('img')
            title = title_elem.get('alt', '') if title_elem else link_elem.get_text(strip=True)
            year = None
            result_text = item.get_text()
            year_match = re.search(r'\((\d{4})\)', result_text)
            if year_match:
                year = int(year_match.group(1))
            type_info = self._determine_content_type(result_text, href)
            return {
                'imdb_id': imdb_id,
                'title': title,
                'year': year,
                'type': type_info,
                'url': urljoin(self.base_url, href)
            }
        except Exception as e:
            logger.debug(f"Failed to parse search item: {e}")
            return None

    def _extract_imdb_id(self, href: str) -> Optional[str]:
        """Extract IMDB ID from href"""
        match = re.search(r'/title/(tt\d+)/', href)
        return match.group(1) if match else None

    def _determine_content_type(self, text: str, href: str) -> str:
        """Determine content type from search result"""
        text_lower = text.lower()
        if 'tv series' in text_lower or 'tv mini' in text_lower:
            return 'tv_series'
        elif 'episode' in text_lower:
            return 'episode'
        elif 'documentary' in text_lower:
            return 'documentary'
        else:
            return 'movie'

    async def get_details(self, imdb_id: str) -> Optional[Dict]:
        """Get detailed information from IMDB"""
        try:
            url = f"{self.base_url}/title/{imdb_id}/"
            result = await self._make_request(url, headers=self.headers)
            if result and 'text' in result:
                return self._parse_details(result['text'], imdb_id)
            return None
        except Exception as e:
            logger.error(f"IMDB details error: {e}")
            return None

    def _parse_details(self, html: str, imdb_id: str) -> Optional[Dict]:
        """Parse IMDB title page for detailed information"""
        try:
            soup = BeautifulSoup(html, 'html.parser')
            details = {
                'imdb_id': imdb_id,
                'source': 'imdb'
            }

            # Title
            title_elem = soup.find('h1', {'data-testid': 'hero-title-block__title'})
            if title_elem:
                details['title'] = title_elem.get_text(strip=True)

            # Year
            year_elem = soup.find('a', href=re.compile(r'releaseinfo'))
            if year_elem:
                year_text = year_elem.get_text(strip=True)
                year_match = re.search(r'(\d{4})', year_text)
                if year_match:
                    details['release_year'] = int(year_match.group(1))

            # Rating
            rating_elem = soup.find('span', class_='sc-7ab21ed2-1')
            if rating_elem:
                try:
                    details['imdb_rating'] = float(rating_elem.get_text(strip=True))
                except ValueError:
                    pass

            # Runtime
            runtime_elem = soup.find('li', {'data-testid': 'title-techspec_runtime'})
            if runtime_elem:
                runtime_text = runtime_elem.get_text(strip=True)
                runtime_match = re.search(r'(\d+)\s*min', runtime_text)
                if runtime_match:
                    details['runtime_minutes'] = int(runtime_match.group(1))

            # Genres
            genres = []
            genre_elems = soup.find_all('a', href=re.compile(r'/search/title.*genre'))
            for elem in genre_elems:
                genre = elem.get_text(strip=True)
                if genre and genre not in genres:
                    genres.append(genre)
            details['genres'] = genres[:5]

            # Plot
            plot_elem = soup.find('span', {'data-testid': 'plot-xl'})
            if plot_elem:
                details['description'] = plot_elem.get_text(strip=True)

            # Cast
            cast = []
            cast_section = soup.find('section', {'data-testid': 'title-cast'})
            if cast_section:
                cast_items = cast_section.find_all('div', {'data-testid': 'title-cast-item'})
                for item in cast_items[:10]:
                    name_elem = item.find('a', {'data-testid': 'title-cast-item__actor'})
                    if name_elem:
                        name = name_elem.get_text(strip=True)
                        char_elem = item.find('a', {'data-testid': 'cast-item-characters-link'})
                        character = char_elem.get_text(strip=True) if char_elem else None
                        cast.append({
                            'name': name,
                            'role': 'actor',
                            'character': character
                        })
            details['cast'] = cast

            # Poster
            poster_elem = soup.find('img', class_='ipc-image')
            if poster_elem and poster_elem.get('src'):
                poster_url = poster_elem['src']
                poster_url = re.sub(r'_V1_.*?\.jpg', '_V1_.jpg', poster_url)
                details['poster_url'] = poster_url

            return details
        except Exception as e:
            logger.error(f"Failed to parse IMDB details: {e}")
            return None
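
# Example of the ID-extraction behaviour (illustrative hrefs):
#
#     IMDBProvider()._extract_imdb_id('/title/tt1375666/?ref_=fn_al_tt_1')
#     # -> 'tt1375666'
#     IMDBProvider()._extract_imdb_id('/name/nm0000138/')
#     # -> None (only /title/ttNNNNNNN/ hrefs match)
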
class TMDBProvider(MetadataProvider):
    """The Movie Database (TMDB) provider with proper API handling"""

    def __init__(self, config: Dict = None):
        super().__init__(config)
        # Prefer an explicitly configured key, falling back to the environment
        self.api_key = (config or {}).get('tmdb_api_key') or os.getenv('TMDB_API_KEY', '')
        self.base_url = "https://api.themoviedb.org/3"
        self.image_base_url = "https://image.tmdb.org/t/p/w500"
        if not self.api_key:
            logger.warning("TMDB API key not configured - provider disabled")

    async def search(self, title: str, year: Optional[int] = None,
                     content_type: str = 'movie') -> List[Dict]:
        """Search TMDB for content"""
        if not self.api_key:
            return []
        try:
            endpoint = '/search/movie' if content_type == 'movie' else '/search/tv'
            url = f"{self.base_url}{endpoint}"
            params = {
                'api_key': self.api_key,
                'query': title,
                'language': 'en-US'
            }
            if year:
                if content_type == 'movie':
                    params['year'] = year
                else:
                    params['first_air_date_year'] = year
            result = await self._make_request(url, params=params)
            if result:
                return self._parse_search_results(result, content_type)
            return []
        except Exception as e:
            logger.error(f"TMDB search error: {e}")
            return []

    def _parse_search_results(self, data: Dict, content_type: str) -> List[Dict]:
        """Parse TMDB search results"""
        results = []
        try:
            for item in data.get('results', [])[:5]:
                result = {
                    'tmdb_id': item['id'],
                    'source': 'tmdb',
                    'type': content_type
                }
                if content_type == 'movie':
                    result['title'] = item.get('title', '')
                    result['original_title'] = item.get('original_title', '')
                    if item.get('release_date'):
                        try:
                            result['year'] = int(item['release_date'][:4])
                        except (ValueError, IndexError):
                            pass
                else:
                    result['title'] = item.get('name', '')
                    result['original_title'] = item.get('original_name', '')
                    if item.get('first_air_date'):
                        try:
                            result['year'] = int(item['first_air_date'][:4])
                        except (ValueError, IndexError):
                            pass
                result['description'] = item.get('overview', '')
                if item.get('poster_path'):
                    result['poster_url'] = f"{self.image_base_url}{item['poster_path']}"
                results.append(result)
        except Exception as e:
            logger.error(f"Failed to parse TMDB search results: {e}")
        return results

    async def get_details(self, tmdb_id: Union[str, int]) -> Optional[Dict]:
        """Get detailed information from TMDB, trying the movie then TV endpoint"""
        if not self.api_key:
            return None
        try:
            for endpoint in ['/movie/', '/tv/']:
                url = f"{self.base_url}{endpoint}{tmdb_id}"
                params = {
                    'api_key': self.api_key,
                    'language': 'en-US',
                    'append_to_response': 'credits,keywords,images'
                }
                result = await self._make_request(url, params=params)
                if result:
                    return self._parse_details(result, endpoint.strip('/'))
            return None
        except Exception as e:
            logger.error(f"TMDB details error: {e}")
            return None

    def _parse_details(self, data: Dict, content_type: str) -> Dict:
        """Parse TMDB detailed response"""
        details = {
            'tmdb_id': data['id'],
            'source': 'tmdb',
            'type': content_type
        }
        try:
            # Basic info
            if content_type == 'movie':
                details['title'] = data.get('title', '')
                details['original_title'] = data.get('original_title', '')
                if data.get('release_date'):
                    try:
                        details['release_year'] = int(data['release_date'][:4])
                    except (ValueError, IndexError):
                        pass
            else:
                details['title'] = data.get('name', '')
                details['original_title'] = data.get('original_name', '')
                if data.get('first_air_date'):
                    try:
                        details['release_year'] = int(data['first_air_date'][:4])
                    except (ValueError, IndexError):
                        pass

            details['description'] = data.get('overview', '')
            # TMDB's community vote_average is stored in the imdb_rating field
            details['imdb_rating'] = data.get('vote_average')
            if content_type == 'movie' and data.get('runtime'):
                details['runtime_minutes'] = data['runtime']

            # Genres
            genres = []
            for genre in data.get('genres', []):
                genres.append(genre['name'])
            details['genres'] = genres

            # Images
            if data.get('poster_path'):
                details['poster_url'] = f"{self.image_base_url}{data['poster_path']}"
            if data.get('backdrop_path'):
                details['backdrop_url'] = f"https://image.tmdb.org/t/p/w1280{data['backdrop_path']}"

            # Cast and crew
            cast = []
            credits = data.get('credits', {})
            for person in credits.get('cast', [])[:10]:
                cast.append({
                    'name': person['name'],
                    'role': 'actor',
                    'character': person.get('character')
                })
            for person in credits.get('crew', []):
                if person.get('job') in ['Director', 'Producer', 'Writer']:
                    cast.append({
                        'name': person['name'],
                        'role': person['job'].lower(),
                        'character': None
                    })
            details['cast'] = cast
        except Exception as e:
            logger.error(f"Error parsing TMDB details: {e}")
        return details
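
# Usage sketch (illustrative; requires a real key in TMDB_API_KEY, and the
# title is arbitrary):
#
#     async def tmdb_lookup():
#         async with TMDBProvider() as provider:
#             results = await provider.search("Breaking Bad", content_type='tv')
#             if results:
#                 return await provider.get_details(results[0]['tmdb_id'])
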
class VODMetadataService:
    """Enhanced VOD Metadata Service with caching and fallback"""

    def __init__(self, db: Session, config: Dict = None):
        self.db = db
        self.config = config or {}

        # Initialize Redis cache
        self.redis_client = self._init_redis()
        self.cache_ttl = 3600  # 1 hour

        # Initialize providers
        self.imdb_provider = IMDBProvider(config)
        self.tmdb_provider = TMDBProvider(config)

        # Priority order: TMDB first when an API key is available
        self.providers = []
        if self.tmdb_provider.api_key:
            self.providers.append(self.tmdb_provider)
        self.providers.append(self.imdb_provider)

        logger.info(f"VOD Metadata Service initialized with {len(self.providers)} providers")

    def _init_redis(self) -> Optional[Redis]:
        """Initialize Redis connection for caching"""
        try:
            redis_host = os.getenv('REDIS_HOST', 'localhost')
            redis_port = int(os.getenv('REDIS_PORT', '6379'))
            redis_db = int(os.getenv('REDIS_METADATA_DB', '3'))
            client = Redis(
                host=redis_host,
                port=redis_port,
                db=redis_db,
                decode_responses=True,
                socket_connect_timeout=5,
                socket_timeout=5
            )
            client.ping()
            logger.info("Redis cache connected for metadata")
            return client
        except Exception as e:
            logger.warning(f"Redis not available for metadata caching: {e}")
            return None

    def _get_cache_key(self, title: str, year: Optional[int], content_type: str) -> str:
        """Generate cache key for metadata"""
        # content_type may arrive as a ContentType enum; normalize to str for the key
        key_parts = [title.lower(), str(year) if year else 'none', str(content_type)]
        key_string = ':'.join(key_parts)
        return f"vod:metadata:{hashlib.md5(key_string.encode()).hexdigest()}"

    async def _get_cached_metadata(self, cache_key: str) -> Optional[Dict]:
        """Get metadata from cache if available"""
        if not self.redis_client:
            return None
        try:
            cached_data = self.redis_client.get(cache_key)
            if cached_data:
                return json.loads(cached_data)
        except Exception as e:
            logger.debug(f"Cache retrieval error: {e}")
        return None

    async def _set_cached_metadata(self, cache_key: str, metadata: Dict):
        """Set metadata in cache"""
        if not self.redis_client:
            return
        try:
            self.redis_client.setex(cache_key, self.cache_ttl, json.dumps(metadata))
        except Exception as e:
            logger.debug(f"Cache storage error: {e}")
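
    # Cache-key derivation example (digest illustrative, not precomputed):
    #
    #     service._get_cache_key("Inception", 2010, "movie")
    #     # md5("inception:2010:movie") -> "vod:metadata:<32-char hex digest>"
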
    async def enrich_content_metadata(self, content_id: int) -> Dict:
        """Enrich content with metadata from external sources"""
        try:
            content = self.db.query(VODContent).filter(
                VODContent.id == content_id
            ).first()
            if not content:
                raise ValueError(f"Content not found: {content_id}")

            # Skip if already has external metadata
            if content.imdb_id or content.tmdb_id:
                logger.info(f"Content {content_id} already has metadata")
                return {'status': 'already_enriched'}

            # Search for metadata
            metadata = await self.search_metadata(
                content.title,
                content.release_year,
                content.content_type
            )
            if not metadata:
                logger.warning(f"No metadata found for content: {content.title}")
                return {'status': 'no_metadata_found'}

            # Update content with metadata
            await self.apply_metadata_to_content(content, metadata)
            return {
                'status': 'enriched',
                'source': metadata.get('source'),
                'title': metadata.get('title')
            }
        except Exception as e:
            logger.error(f"Failed to enrich content {content_id}: {e}")
            return {'status': 'error', 'error': str(e)}

    async def search_metadata(self, title: str, year: Optional[int] = None,
                              content_type: str = 'movie') -> Optional[Dict]:
        """Search for metadata across providers with caching"""
        # Check cache first
        cache_key = self._get_cache_key(title, year, content_type)
        cached_metadata = await self._get_cached_metadata(cache_key)
        if cached_metadata:
            logger.info(f"Metadata found in cache for: {title}")
            return cached_metadata

        # Map content types to provider search types
        search_type = content_type
        if content_type in [ContentType.TV_SERIES, ContentType.EPISODE]:
            search_type = 'tv_series'
        elif content_type in [ContentType.MOVIE, ContentType.DOCUMENTARY]:
            search_type = 'movie'

        # Search across providers in priority order
        for provider in self.providers:
            try:
                async with provider:
                    results = await provider.search(title, year, search_type)
                    if not results:
                        continue

                    # Get details for best match
                    best_match = self._find_best_match(results, title, year)
                    if best_match:
                        if hasattr(provider, 'get_details'):
                            external_id = best_match.get('imdb_id') or best_match.get('tmdb_id')
                            if external_id:
                                details = await provider.get_details(external_id)
                                if details:
                                    # Cache the result
                                    await self._set_cached_metadata(cache_key, details)
                                    return details
                        # Cache and return search result if no detailed info
                        await self._set_cached_metadata(cache_key, best_match)
                        return best_match
            except Exception as e:
                logger.error(f"Provider {provider.__class__.__name__} failed: {e}")
                continue

        # Generate basic metadata as fallback
        basic_metadata = self._generate_basic_metadata(title, year, content_type)
        await self._set_cached_metadata(cache_key, basic_metadata)
        return basic_metadata

    def _find_best_match(self, results: List[Dict], original_title: str,
                         original_year: Optional[int] = None) -> Optional[Dict]:
        """Find best matching result from search results"""
        if not results:
            return None
        best_score = 0
        best_match = None
        for result in results:
            score = 0

            # Title similarity
            result_title = result.get('title', '').lower()
            original_lower = original_title.lower()
            if result_title == original_lower:
                # Exact match
                score += 100
            elif original_lower in result_title or result_title in original_lower:
                # Contains match
                score += 50

            # Year match
            if original_year and result.get('year'):
                if result['year'] == original_year:
                    score += 30
                else:
                    year_diff = abs(result['year'] - original_year)
                    if year_diff <= 1:
                        score += 20
                    elif year_diff <= 2:
                        score += 10

            if score > best_score:
                best_score = score
                best_match = result
        return best_match if best_score > 30 else None
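
    # Worked scoring example for _find_best_match (titles illustrative):
    # searching for "Dune" (2021), a result titled "Dune" from 2021 scores
    # 100 (exact title) + 30 (exact year) = 130; "Dune: Part Two" from 2024
    # scores 50 (contains match) + 0 (year off by 3) = 50. Anything scoring
    # 30 or below is rejected, so a bare year match alone never wins.
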
    def _generate_basic_metadata(self, title: str, year: Optional[int],
                                 content_type: str) -> Dict[str, Any]:
        """Generate basic metadata when external sources are unavailable"""
        logger.info(f"Generating basic metadata for: {title}")
        return {
            'title': title,
            'original_title': title,
            'year': year,
            'content_type': content_type,
            'description': f"A {content_type} titled '{title}'" + (f" from {year}" if year else ""),
            'rating': 0.0,
            'runtime_minutes': 0,
            'genres': [],
            'cast': [],
            'poster_url': None,
            'backdrop_url': None,
            'source': 'generated',
            'metadata_complete': False,
            'fetched_at': datetime.utcnow().isoformat()
        }

    async def apply_metadata_to_content(self, content: VODContent, metadata: Dict):
        """Apply metadata to content object with transaction safety"""
        try:
            # Update basic fields
            if metadata.get('title'):
                content.title = metadata['title']
            if metadata.get('original_title'):
                content.original_title = metadata['original_title']
            if metadata.get('description'):
                content.description = metadata['description']
            if metadata.get('release_year'):
                content.release_year = metadata['release_year']
            if metadata.get('runtime_minutes'):
                content.runtime_minutes = metadata['runtime_minutes']
            if metadata.get('imdb_rating'):
                content.imdb_rating = float(metadata['imdb_rating'])
            if metadata.get('poster_url'):
                content.poster_url = metadata['poster_url']
            if metadata.get('backdrop_url'):
                content.backdrop_url = metadata['backdrop_url']

            # External IDs
            if metadata.get('imdb_id'):
                content.imdb_id = metadata['imdb_id']
            if metadata.get('tmdb_id'):
                content.tmdb_id = str(metadata['tmdb_id'])

            content.updated_at = datetime.utcnow()

            # Handle genres
            if metadata.get('genres'):
                await self._update_content_genres(content, metadata['genres'])

            # Handle cast
            if metadata.get('cast'):
                await self._update_content_cast(content, metadata['cast'])

            self.db.commit()
            logger.info(f"Updated content {content.id} with metadata from {metadata.get('source')}")
        except Exception as e:
            logger.error(f"Failed to apply metadata to content {content.id}: {e}")
            self.db.rollback()
            raise

    async def _update_content_genres(self, content: VODContent, genres: List[str]):
        """Update content genres"""
        try:
            # Remove existing genres
            existing_genres = self.db.query(VODContentGenre).filter(
                VODContentGenre.content_id == content.id
            ).all()
            for genre_link in existing_genres:
                self.db.delete(genre_link)

            # Add new genres
            for genre_name in genres:
                # Find or create genre
                genre = self.db.query(VODGenre).filter(
                    VODGenre.name == genre_name
                ).first()
                if not genre:
                    genre = VODGenre(
                        name=genre_name,
                        description=f"Auto-generated genre: {genre_name}",
                        color=self._generate_genre_color(genre_name)
                    )
                    self.db.add(genre)
                    self.db.flush()

                # Link genre to content
                content_genre = VODContentGenre(
                    content_id=content.id,
                    genre_id=genre.id
                )
                self.db.add(content_genre)
        except Exception as e:
            logger.error(f"Failed to update genres: {e}")
            raise

    async def _update_content_cast(self, content: VODContent, cast: List[Dict]):
        """Update content cast"""
        try:
            # Remove existing cast
            existing_cast = self.db.query(VODCast).filter(
                VODCast.content_id == content.id
            ).all()
            for cast_member in existing_cast:
                self.db.delete(cast_member)

            # Add new cast, limited to the first 20 entries
            for i, person in enumerate(cast[:20]):
                cast_member = VODCast(
                    content_id=content.id,
                    person_name=person['name'],
                    role_type=person['role'],
                    character_name=person.get('character'),
                    order_index=i
                )
                self.db.add(cast_member)
        except Exception as e:
            logger.error(f"Failed to update cast: {e}")
            raise
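
    # Expected shape of the cast payload consumed above (entries illustrative);
    # this is what the providers' _parse_details methods emit:
    #
    #     [{'name': 'Jane Doe', 'role': 'actor', 'character': 'The Pilot'},
    #      {'name': 'John Roe', 'role': 'director', 'character': None}]
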
    def _generate_genre_color(self, genre_name: str) -> str:
        """Generate a color for a genre based on its name"""
        colors = {
            'action': '#e74c3c',
            'adventure': '#f39c12',
            'comedy': '#f1c40f',
            'drama': '#3498db',
            'horror': '#8e44ad',
            'thriller': '#e67e22',
            'romance': '#e91e63',
            'sci-fi': '#1abc9c',
            'fantasy': '#9b59b6',
            'crime': '#34495e',
            'documentary': '#95a5a6',
            'family': '#27ae60',
            'animation': '#ff6b6b',
            'western': '#d35400',
            'war': '#7f8c8d'
        }
        genre_lower = genre_name.lower()

        # Find matching color
        for key, color in colors.items():
            if key in genre_lower:
                return color

        # Generate hash-based color for unknown genres
        hash_obj = hashlib.md5(genre_name.encode())
        hex_hash = hash_obj.hexdigest()[:6]
        return f"#{hex_hash}"

    async def bulk_enrich_content(self, limit: int = 10,
                                  content_type: Optional[str] = None) -> Dict:
        """Enrich multiple content items in batch"""
        try:
            query = self.db.query(VODContent).filter(
                VODContent.imdb_id.is_(None),
                VODContent.tmdb_id.is_(None),
                VODContent.status == 'draft'
            )
            if content_type:
                query = query.filter(VODContent.content_type == content_type)
            contents = query.limit(limit).all()

            results = {
                'total_processed': 0,
                'enriched': 0,
                'no_metadata': 0,
                'errors': 0,
                'details': []
            }
            for content in contents:
                try:
                    result = await self.enrich_content_metadata(content.id)
                    results['total_processed'] += 1
                    if result['status'] == 'enriched':
                        results['enriched'] += 1
                    elif result['status'] == 'no_metadata_found':
                        results['no_metadata'] += 1
                    elif result['status'] == 'error':
                        results['errors'] += 1
                    results['details'].append({
                        'content_id': content.id,
                        'title': content.title,
                        'status': result['status']
                    })

                    # Rate limiting between items
                    await asyncio.sleep(1)
                except Exception as e:
                    results['errors'] += 1
                    results['details'].append({
                        'content_id': content.id,
                        'title': content.title,
                        'status': 'error',
                        'error': str(e)
                    })
                    logger.error(f"Failed to enrich content {content.id}: {e}")
            return results
        except Exception as e:
            logger.error(f"Bulk enrich failed: {e}")
            return {
                'status': 'error',
                'error': str(e)
            }
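
# End-to-end usage sketch (illustrative; assumes a SQLAlchemy session factory
# named SessionLocal, which this module does not provide):
#
#     async def main():
#         db = SessionLocal()
#         try:
#             service = VODMetadataService(db)
#             report = await service.bulk_enrich_content(limit=5)
#             print(json.dumps(report, indent=2, default=str))
#         finally:
#             db.close()
#
#     if __name__ == '__main__':
#         asyncio.run(main())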