""" VOD API - Enhanced with comprehensive error handling and validation """ from fastapi import APIRouter, Depends, HTTPException, status, Query, UploadFile, File, BackgroundTasks from fastapi.responses import FileResponse, StreamingResponse from sqlalchemy.orm import Session, joinedload from sqlalchemy import and_, or_, func from typing import Optional, List, Dict, Any from datetime import datetime, timedelta import os import uuid import json import logging from pydantic import BaseModel, Field, validator import hashlib import asyncio from database import get_db from auth import get_current_user, require_admin from models import User from billing_models import UserSubscription, SubscriptionPlan from vod_models import ( VODContent, VODUserRental, VODGenre, VODContentGenre, VODCast, VODUserWatchHistory, VODUserRating, VODUserWishlist, VODCollection, VODCollectionItem, ContentType, ContentStatus, RentalType, PaymentStatus, VODSubtitle, VODDirectory, VODDirectoryScan ) from rental_system import RentalSystem, PricingEngine from rental_system import ( RentalSystemError, InsufficientCreditsError, ContentNotAvailableError, AlreadyRentedError ) from vod_metadata_service_fixed import VODMetadataService from vod_directory_service_fixed import VODDirectoryService from redis import Redis import aioredis logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/vod", tags=["VOD"]) # Enhanced Pydantic models with validation class VODContentCreate(BaseModel): title: str = Field(..., min_length=1, max_length=255) description: Optional[str] = Field(None, max_length=5000) content_type: ContentType release_year: Optional[int] = Field(None, ge=1900, le=datetime.now().year + 5) runtime_minutes: Optional[int] = Field(None, ge=0, le=1440) language: str = Field("en", min_length=2, max_length=10) country: Optional[str] = Field(None, max_length=100) age_rating: Optional[str] = Field(None, max_length=10) rental_type: RentalType = RentalType.FREE rental_price: float = Field(0.0, ge=0, le=1000) rental_currency: str = Field("EUR", min_length=3, max_length=3) rental_duration_hours: int = Field(48, ge=1, le=720) video_url: Optional[str] = Field(None, max_length=500) trailer_url: Optional[str] = Field(None, max_length=500) poster_url: Optional[str] = Field(None, max_length=500) genre_ids: List[int] = [] tags: List[str] = [] @validator('title') def validate_title(cls, v): if not v or v.strip() == "": raise ValueError('Title cannot be empty') return v.strip() class VODContentUpdate(BaseModel): title: Optional[str] = Field(None, min_length=1, max_length=255) description: Optional[str] = Field(None, max_length=5000) status: Optional[ContentStatus] = None rental_price: Optional[float] = Field(None, ge=0, le=1000) rental_type: Optional[RentalType] = None video_url: Optional[str] = Field(None, max_length=500) trailer_url: Optional[str] = Field(None, max_length=500) poster_url: Optional[str] = Field(None, max_length=500) genre_ids: Optional[List[int]] = None tags: Optional[List[str]] = None class RentalRequest(BaseModel): content_id: int = Field(..., ge=1) payment_method: str = Field("credits", regex="^(credits|bitcoin|card)$") class SubscriptionRequest(BaseModel): plan_id: int = Field(..., ge=1) payment_method: str = Field("credits", regex="^(credits|bitcoin|card)$") class ContentRatingRequest(BaseModel): content_id: int = Field(..., ge=1) rating: float = Field(..., ge=0.0, le=5.0) review: Optional[str] = Field(None, max_length=1000) class WatchProgressUpdate(BaseModel): content_id: int = Field(..., ge=1) watch_time_seconds: int = Field(..., ge=0) total_duration: int = Field(..., ge=0) # Initialize Redis cache redis_client = None def get_redis_client(): global redis_client if not redis_client: try: redis_host = os.getenv('REDIS_HOST', 'localhost') redis_port = int(os.getenv('REDIS_PORT', '6379')) redis_db = int(os.getenv('REDIS_VOD_DB', '4')) redis_client = Redis( host=redis_host, port=redis_port, db=redis_db, decode_responses=True ) redis_client.ping() except Exception as e: logger.warning(f"Redis not available: {e}") redis_client = None return redis_client # Content Management Endpoints @router.get("/content", response_model=List[Dict]) async def list_content( skip: int = Query(0, ge=0), limit: int = Query(100, le=1000), content_type: Optional[ContentType] = None, genre_id: Optional[int] = None, rental_type: Optional[RentalType] = None, search: Optional[str] = Query(None, max_length=100), sort_by: str = Query("created_at", regex="^(created_at|title|release_year|user_rating|view_count)$"), sort_order: str = Query("desc", regex="^(asc|desc)$"), include_unavailable: bool = False, db: Session = Depends(get_db), current_user: User = Depends(get_current_user) ): """List VOD content with filtering, search and caching""" try: # Generate cache key cache_key = f"vod:list:{current_user.id}:{skip}:{limit}:{content_type}:{genre_id}:{rental_type}:{search}:{sort_by}:{sort_order}" cache_client = get_redis_client() # Try cache first if cache_client and not search: # Don't cache search results try: cached_data = cache_client.get(cache_key) if cached_data: logger.info(f"Returning cached content list for user {current_user.id}") return json.loads(cached_data) except Exception as e: logger.warning(f"Cache retrieval error: {e}") # Build query with eager loading query = db.query(VODContent).options( joinedload(VODContent.genres), joinedload(VODContent.cast) ) # Filter by status if not include_unavailable: query = query.filter(VODContent.status == ContentStatus.PUBLISHED) # Apply filters if content_type: query = query.filter(VODContent.content_type == content_type) if rental_type: query = query.filter(or_( VODContent.rental_type == rental_type, VODContent.rental_type == RentalType.BOTH )) if genre_id: query = query.join(VODContentGenre).filter(VODContentGenre.genre_id == genre_id) if search: # Enhanced search across multiple fields search_term = f"%{search}%" query = query.filter(or_( VODContent.title.ilike(search_term), VODContent.description.ilike(search_term), VODContent.original_title.ilike(search_term), VODContent.keywords.contains([search]) )) # Apply sorting sort_column = getattr(VODContent, sort_by) if sort_order == "desc": sort_column = sort_column.desc() query = query.order_by(sort_column) # Pagination total_count = query.count() content_list = query.offset(skip).limit(limit).all() # Format response with access info rental_system = RentalSystem(db) pricing_engine = PricingEngine(db) result = [] for content in content_list: try: access_info = rental_system.check_user_access(current_user, content) dynamic_price = pricing_engine.calculate_dynamic_price(content, current_user) content_data = { "id": content.id, "title": content.title, "description": content.description, "content_type": content.content_type.value if content.content_type else None, "release_year": content.release_year, "runtime_minutes": content.runtime_minutes, "age_rating": content.age_rating, "user_rating": content.user_rating, "view_count": content.view_count, "poster_url": content.poster_url, "trailer_url": content.trailer_url, "rental_type": content.rental_type.value if content.rental_type else None, "base_rental_price": content.rental_price, "dynamic_price": dynamic_price, "currency": content.rental_currency, "access_info": access_info, "genres": [{"id": g.id, "name": g.name} for g in content.genres] if hasattr(content, 'genres') else [], "created_at": content.created_at.isoformat() if content.created_at else None } result.append(content_data) except Exception as e: logger.error(f"Error processing content {content.id}: {e}") continue # Cache result if cache_client and not search and result: try: cache_client.setex(cache_key, 300, json.dumps(result)) # Cache for 5 minutes except Exception as e: logger.warning(f"Cache storage error: {e}") return result except Exception as e: logger.error(f"Error listing content: {e}") raise HTTPException(status_code=500, detail="Failed to retrieve content list") @router.get("/content/{content_id}") async def get_content_details( content_id: int, db: Session = Depends(get_db), current_user: User = Depends(get_current_user) ): """Get detailed content information with error handling""" try: # Query with eager loading content = db.query(VODContent).options( joinedload(VODContent.genres), joinedload(VODContent.cast), joinedload(VODContent.subtitles) ).filter(VODContent.id == content_id).first() if not content: raise HTTPException(status_code=404, detail="Content not found") rental_system = RentalSystem(db) pricing_engine = PricingEngine(db) # Get access information access_info = rental_system.check_user_access(current_user, content) dynamic_price = pricing_engine.calculate_dynamic_price(content, current_user) # Get user's watch history watch_history = db.query(VODUserWatchHistory).filter( VODUserWatchHistory.user_id == current_user.id, VODUserWatchHistory.content_id == content_id ).first() # Get user's rating user_rating = db.query(VODUserRating).filter( VODUserRating.user_id == current_user.id, VODUserRating.content_id == content_id ).first() # Check wishlist status in_wishlist = db.query(VODUserWishlist).filter( VODUserWishlist.user_id == current_user.id, VODUserWishlist.content_id == content_id ).first() is not None # Get similar content similar_content = _get_similar_content(db, content, limit=5) return { "id": content.id, "title": content.title, "original_title": content.original_title, "description": content.description, "synopsis": content.synopsis, "content_type": content.content_type.value if content.content_type else None, "release_year": content.release_year, "runtime_minutes": content.runtime_minutes, "language": content.language, "country": content.country, "age_rating": content.age_rating, "imdb_rating": content.imdb_rating, "user_rating": content.user_rating, "view_count": content.view_count, "poster_url": content.poster_url, "backdrop_url": content.backdrop_url, "trailer_url": content.trailer_url, "video_quality": content.video_quality, "audio_languages": content.audio_languages, "rental_type": content.rental_type.value if content.rental_type else None, "base_rental_price": content.rental_price, "dynamic_price": dynamic_price, "currency": content.rental_currency, "rental_duration_hours": content.rental_duration_hours, "genres": [{"id": g.id, "name": g.name, "color": g.color} for g in content.genres], "cast": [{"name": c.person_name, "role": c.role_type, "character": c.character_name} for c in content.cast], "subtitles": [ { "id": s.id, "language": s.language, "language_name": s.language_name, "format": s.format } for s in content.subtitles ] if hasattr(content, 'subtitles') else [], "access_info": access_info, "user_data": { "watch_progress": { "watch_time_seconds": watch_history.watch_time_seconds if watch_history else 0, "completion_percentage": watch_history.completion_percentage if watch_history else 0.0, "is_completed": watch_history.is_completed if watch_history else False, "last_watched": watch_history.last_watched_at.isoformat() if watch_history else None } if watch_history else None, "user_rating": user_rating.rating if user_rating else None, "user_review": user_rating.review if user_rating else None, "in_wishlist": in_wishlist }, "similar_content": similar_content, "available_from": content.available_from.isoformat() if content.available_from else None, "available_until": content.available_until.isoformat() if content.available_until else None, "published_at": content.published_at.isoformat() if content.published_at else None } except HTTPException: raise except Exception as e: logger.error(f"Error getting content details: {e}") raise HTTPException(status_code=500, detail="Failed to retrieve content details") def _get_similar_content(db: Session, content: VODContent, limit: int = 5) -> List[Dict]: """Get similar content based on genres and metadata""" try: similar = [] # Get content with same genres if content.genres: genre_ids = [g.id for g in content.genres] query = db.query(VODContent).join(VODContentGenre).filter( and_( VODContentGenre.genre_id.in_(genre_ids), VODContent.id != content.id, VODContent.status == ContentStatus.PUBLISHED ) ).limit(limit) for similar_content in query.all(): similar.append({ "id": similar_content.id, "title": similar_content.title, "poster_url": similar_content.poster_url, "release_year": similar_content.release_year, "user_rating": similar_content.user_rating }) return similar except Exception as e: logger.error(f"Error getting similar content: {e}") return [] @router.post("/content", dependencies=[Depends(require_admin)]) async def create_content( content_data: VODContentCreate, background_tasks: BackgroundTasks, db: Session = Depends(get_db), current_user: User = Depends(get_current_user) ): """Create new VOD content with validation (Admin only)""" try: # Generate unique slug slug = content_data.title.lower().replace(" ", "-") slug = re.sub(r'[^a-z0-9-]', '', slug) slug = f"{slug}-{uuid.uuid4().hex[:8]}" content = VODContent( title=content_data.title, description=content_data.description, content_type=content_data.content_type, status=ContentStatus.DRAFT, release_year=content_data.release_year, runtime_minutes=content_data.runtime_minutes, language=content_data.language, country=content_data.country, age_rating=content_data.age_rating, rental_type=content_data.rental_type, rental_price=content_data.rental_price, rental_currency=content_data.rental_currency, rental_duration_hours=content_data.rental_duration_hours, video_url=content_data.video_url, trailer_url=content_data.trailer_url, poster_url=content_data.poster_url, keywords=content_data.tags, slug=slug, uploaded_by=current_user.id ) db.add(content) db.flush() # Get the ID # Add genres for genre_id in content_data.genre_ids: genre_link = VODContentGenre( content_id=content.id, genre_id=genre_id ) db.add(genre_link) db.commit() db.refresh(content) # Schedule metadata enrichment in background if content_data.title: background_tasks.add_task( enrich_content_metadata, content.id, db ) # Clear content list cache cache_client = get_redis_client() if cache_client: try: for key in cache_client.scan_iter("vod:list:*"): cache_client.delete(key) except Exception: pass return { "id": content.id, "title": content.title, "slug": content.slug, "status": "created" } except Exception as e: db.rollback() logger.error(f"Error creating content: {e}") raise HTTPException(status_code=500, detail="Failed to create content") async def enrich_content_metadata(content_id: int, db: Session): """Background task to enrich content metadata""" try: metadata_service = VODMetadataService(db) await metadata_service.enrich_content_metadata(content_id) except Exception as e: logger.error(f"Failed to enrich metadata for content {content_id}: {e}") @router.put("/content/{content_id}", dependencies=[Depends(require_admin)]) async def update_content( content_id: int, content_data: VODContentUpdate, db: Session = Depends(get_db) ): """Update VOD content with validation (Admin only)""" try: content = db.query(VODContent).filter(VODContent.id == content_id).first() if not content: raise HTTPException(status_code=404, detail="Content not found") # Update fields for field, value in content_data.dict(exclude_unset=True).items(): if field == 'genre_ids' and value is not None: # Update genres db.query(VODContentGenre).filter( VODContentGenre.content_id == content_id ).delete() for genre_id in value: genre_link = VODContentGenre( content_id=content_id, genre_id=genre_id ) db.add(genre_link) elif field == 'tags' and value is not None: content.keywords = value elif field != 'genre_ids' and field != 'tags': setattr(content, field, value) content.updated_at = datetime.utcnow() # If publishing, set published_at if content_data.status == ContentStatus.PUBLISHED and not content.published_at: content.published_at = datetime.utcnow() db.commit() db.refresh(content) # Clear caches cache_client = get_redis_client() if cache_client: try: for key in cache_client.scan_iter("vod:*"): cache_client.delete(key) except Exception: pass return {"id": content.id, "status": "updated"} except HTTPException: raise except Exception as e: db.rollback() logger.error(f"Error updating content: {e}") raise HTTPException(status_code=500, detail="Failed to update content") @router.delete("/content/{content_id}", dependencies=[Depends(require_admin)]) async def delete_content( content_id: int, db: Session = Depends(get_db) ): """Delete VOD content (Admin only)""" try: content = db.query(VODContent).filter(VODContent.id == content_id).first() if not content: raise HTTPException(status_code=404, detail="Content not found") # Delete related data db.query(VODContentGenre).filter(VODContentGenre.content_id == content_id).delete() db.query(VODCast).filter(VODCast.content_id == content_id).delete() db.query(VODSubtitle).filter(VODSubtitle.content_id == content_id).delete() db.query(VODUserRating).filter(VODUserRating.content_id == content_id).delete() db.query(VODUserWishlist).filter(VODUserWishlist.content_id == content_id).delete() db.query(VODUserWatchHistory).filter(VODUserWatchHistory.content_id == content_id).delete() db.delete(content) db.commit() # Clear caches cache_client = get_redis_client() if cache_client: try: for key in cache_client.scan_iter("vod:*"): cache_client.delete(key) except Exception: pass return {"status": "deleted"} except HTTPException: raise except Exception as e: db.rollback() logger.error(f"Error deleting content: {e}") raise HTTPException(status_code=500, detail="Failed to delete content") # Rental System Endpoints with enhanced error handling @router.post("/rent") async def rent_content( rental_request: RentalRequest, db: Session = Depends(get_db), current_user: User = Depends(get_current_user) ): """Rent VOD content with comprehensive error handling""" try: # Validate content exists content = db.query(VODContent).filter( VODContent.id == rental_request.content_id ).first() if not content: raise HTTPException(status_code=404, detail="Content not found") if content.status != ContentStatus.PUBLISHED: raise HTTPException(status_code=400, detail="Content is not available") rental_system = RentalSystem(db) rental = rental_system.rent_content( current_user, rental_request.content_id, rental_request.payment_method ) return { "rental_id": rental.id, "content_id": rental.content_id, "price": rental.rental_price, "currency": rental.currency, "expires_at": rental.expires_at.isoformat(), "payment_status": rental.payment_status.value if rental.payment_status else None, "transaction_id": rental.transaction_id } except InsufficientCreditsError as e: raise HTTPException(status_code=402, detail=str(e)) except AlreadyRentedError as e: raise HTTPException(status_code=400, detail=str(e)) except ContentNotAvailableError as e: raise HTTPException(status_code=400, detail=str(e)) except RentalSystemError as e: logger.error(f"Rental system error: {e}") raise HTTPException(status_code=500, detail="Failed to process rental") except HTTPException: raise except Exception as e: logger.error(f"Unexpected error during rental: {e}") raise HTTPException(status_code=500, detail="Failed to process rental") # Watch Progress Tracking @router.post("/watch-progress") async def update_watch_progress( progress: WatchProgressUpdate, db: Session = Depends(get_db), current_user: User = Depends(get_current_user) ): """Update user's watch progress""" try: # Verify access content = db.query(VODContent).filter( VODContent.id == progress.content_id ).first() if not content: raise HTTPException(status_code=404, detail="Content not found") rental_system = RentalSystem(db) access_info = rental_system.check_user_access(current_user, content) if not access_info["has_access"]: raise HTTPException(status_code=403, detail="Access denied") # Update or create watch history watch_history = db.query(VODUserWatchHistory).filter( VODUserWatchHistory.user_id == current_user.id, VODUserWatchHistory.content_id == progress.content_id ).first() completion_percentage = (progress.watch_time_seconds / progress.total_duration * 100) if progress.total_duration > 0 else 0 is_completed = completion_percentage >= 90 # Consider 90% as completed if watch_history: watch_history.watch_time_seconds = progress.watch_time_seconds watch_history.completion_percentage = completion_percentage watch_history.is_completed = is_completed watch_history.last_watched_at = datetime.utcnow() else: watch_history = VODUserWatchHistory( user_id=current_user.id, content_id=progress.content_id, watch_time_seconds=progress.watch_time_seconds, completion_percentage=completion_percentage, is_completed=is_completed, last_watched_at=datetime.utcnow() ) db.add(watch_history) db.commit() return { "status": "updated", "completion_percentage": completion_percentage, "is_completed": is_completed } except HTTPException: raise except Exception as e: logger.error(f"Error updating watch progress: {e}") raise HTTPException(status_code=500, detail="Failed to update watch progress") # Directory Management Endpoints @router.get("/directories", dependencies=[Depends(require_admin)]) async def list_directories( db: Session = Depends(get_db) ): """List VOD directories (Admin only)""" try: directory_service = VODDirectoryService(db) directories = db.query(VODDirectory).all() result = [] for directory in directories: status = directory_service.get_directory_status(directory.id) result.append(status['directory']) return result except Exception as e: logger.error(f"Error listing directories: {e}") raise HTTPException(status_code=500, detail="Failed to list directories") @router.post("/directories/scan/{directory_id}", dependencies=[Depends(require_admin)]) async def scan_directory( directory_id: int, background_tasks: BackgroundTasks, force: bool = False, deep_scan: bool = False, db: Session = Depends(get_db) ): """Trigger directory scan (Admin only)""" try: directory_service = VODDirectoryService(db) # Validate directory exists directory = db.query(VODDirectory).filter( VODDirectory.id == directory_id ).first() if not directory: raise HTTPException(status_code=404, detail="Directory not found") # Start scan in background background_tasks.add_task( run_directory_scan, directory_id, force, deep_scan, db ) return { "status": "scan_started", "directory_id": directory_id, "directory_name": directory.name } except HTTPException: raise except Exception as e: logger.error(f"Error starting directory scan: {e}") raise HTTPException(status_code=500, detail="Failed to start directory scan") def run_directory_scan(directory_id: int, force: bool, deep_scan: bool, db: Session): """Background task to scan directory""" try: directory_service = VODDirectoryService(db) scan = directory_service.scan_directory(directory_id, force, deep_scan) logger.info(f"Directory scan completed: {scan.id}") except Exception as e: logger.error(f"Directory scan failed: {e}") # Streaming endpoint with HLS support @router.get("/stream/{content_id}/playlist.m3u8") async def get_hls_playlist( content_id: int, db: Session = Depends(get_db), current_user: User = Depends(get_current_user) ): """Get HLS playlist for content streaming""" try: content = db.query(VODContent).filter(VODContent.id == content_id).first() if not content: raise HTTPException(status_code=404, detail="Content not found") # Check user access rental_system = RentalSystem(db) access_info = rental_system.check_user_access(current_user, content) if not access_info["has_access"]: raise HTTPException(status_code=403, detail="Access denied") if not content.video_url: raise HTTPException(status_code=404, detail="Video file not available") # Update view count content.view_count = (content.view_count or 0) + 1 db.commit() # Generate HLS playlist playlist = generate_hls_playlist(content, current_user) return StreamingResponse( io.StringIO(playlist), media_type="application/x-mpegURL", headers={ "Cache-Control": "no-cache", "X-Content-Duration": str(content.duration_seconds or 0) } ) except HTTPException: raise except Exception as e: logger.error(f"Error generating HLS playlist: {e}") raise HTTPException(status_code=500, detail="Failed to generate streaming playlist") def generate_hls_playlist(content: VODContent, user: User) -> str: """Generate HLS playlist for content""" # This is a simplified example - actual implementation would use FFmpeg # to transcode the video file to HLS format playlist = """#EXTM3U #EXT-X-VERSION:3 #EXT-X-TARGETDURATION:10 #EXT-X-MEDIA-SEQUENCE:0 #EXT-X-PLAYLIST-TYPE:VOD """ # Generate secure token for segments token = hashlib.sha256( f"{content.id}:{user.id}:{datetime.utcnow().isoformat()}".encode() ).hexdigest() # Add segments (simplified - actual implementation would read from transcoded files) segment_duration = 10 # seconds total_duration = content.duration_seconds or 0 num_segments = (total_duration // segment_duration) + 1 for i in range(num_segments): duration = min(segment_duration, total_duration - (i * segment_duration)) if duration > 0: playlist += f"#EXTINF:{duration:.3f},\n" playlist += f"/api/vod/stream/{content.id}/segment{i}.ts?token={token}\n" playlist += "#EXT-X-ENDLIST\n" return playlist # Health check endpoint @router.get("/health") async def health_check(): """VOD service health check""" try: # Check Redis redis_status = "healthy" cache_client = get_redis_client() if cache_client: try: cache_client.ping() except Exception: redis_status = "unhealthy" else: redis_status = "unavailable" return { "status": "healthy", "redis": redis_status, "timestamp": datetime.utcnow().isoformat() } except Exception as e: logger.error(f"Health check failed: {e}") return { "status": "unhealthy", "error": str(e), "timestamp": datetime.utcnow().isoformat() }