Release v2.7.4 - Critical VOD System Fixes
- Fixed SQLAlchemy import issues in VOD models - Fixed TMDB/OMDB API authentication and rate limiting - Fixed VOD directory path resolution and permission errors - Fixed rental system transaction handling - Added HLS streaming support for VOD content - Implemented Redis caching for performance - Added watch progress tracking - Enhanced search with multi-field support - Added health check endpoint This patch resolves critical production issues in the VOD system.
This commit is contained in:
@@ -0,0 +1,851 @@
|
||||
"""
|
||||
VOD API - Enhanced with comprehensive error handling and validation
|
||||
"""
|
||||
from fastapi import APIRouter, Depends, HTTPException, status, Query, UploadFile, File, BackgroundTasks
|
||||
from fastapi.responses import FileResponse, StreamingResponse
|
||||
from sqlalchemy.orm import Session, joinedload
|
||||
from sqlalchemy import and_, or_, func
|
||||
from typing import Optional, List, Dict, Any
|
||||
from datetime import datetime, timedelta
|
||||
import os
|
||||
import uuid
|
||||
import json
|
||||
import logging
|
||||
from pydantic import BaseModel, Field, validator
|
||||
import hashlib
|
||||
import asyncio
|
||||
|
||||
from database import get_db
|
||||
from auth import get_current_user, require_admin
|
||||
from models import User
|
||||
from billing_models import UserSubscription, SubscriptionPlan
|
||||
from vod_models import (
|
||||
VODContent, VODUserRental, VODGenre, VODContentGenre, VODCast,
|
||||
VODUserWatchHistory, VODUserRating, VODUserWishlist, VODCollection,
|
||||
VODCollectionItem, ContentType, ContentStatus, RentalType, PaymentStatus,
|
||||
VODSubtitle, VODDirectory, VODDirectoryScan
|
||||
)
|
||||
from rental_system import RentalSystem, PricingEngine
|
||||
from rental_system import (
|
||||
RentalSystemError, InsufficientCreditsError,
|
||||
ContentNotAvailableError, AlreadyRentedError
|
||||
)
|
||||
from vod_metadata_service_fixed import VODMetadataService
|
||||
from vod_directory_service_fixed import VODDirectoryService
|
||||
from redis import Redis
|
||||
import aioredis
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
router = APIRouter(prefix="/api/vod", tags=["VOD"])
|
||||
|
||||
# Enhanced Pydantic models with validation
|
||||
class VODContentCreate(BaseModel):
|
||||
title: str = Field(..., min_length=1, max_length=255)
|
||||
description: Optional[str] = Field(None, max_length=5000)
|
||||
content_type: ContentType
|
||||
release_year: Optional[int] = Field(None, ge=1900, le=datetime.now().year + 5)
|
||||
runtime_minutes: Optional[int] = Field(None, ge=0, le=1440)
|
||||
language: str = Field("en", min_length=2, max_length=10)
|
||||
country: Optional[str] = Field(None, max_length=100)
|
||||
age_rating: Optional[str] = Field(None, max_length=10)
|
||||
rental_type: RentalType = RentalType.FREE
|
||||
rental_price: float = Field(0.0, ge=0, le=1000)
|
||||
rental_currency: str = Field("EUR", min_length=3, max_length=3)
|
||||
rental_duration_hours: int = Field(48, ge=1, le=720)
|
||||
video_url: Optional[str] = Field(None, max_length=500)
|
||||
trailer_url: Optional[str] = Field(None, max_length=500)
|
||||
poster_url: Optional[str] = Field(None, max_length=500)
|
||||
genre_ids: List[int] = []
|
||||
tags: List[str] = []
|
||||
|
||||
@validator('title')
|
||||
def validate_title(cls, v):
|
||||
if not v or v.strip() == "":
|
||||
raise ValueError('Title cannot be empty')
|
||||
return v.strip()
|
||||
|
||||
class VODContentUpdate(BaseModel):
|
||||
title: Optional[str] = Field(None, min_length=1, max_length=255)
|
||||
description: Optional[str] = Field(None, max_length=5000)
|
||||
status: Optional[ContentStatus] = None
|
||||
rental_price: Optional[float] = Field(None, ge=0, le=1000)
|
||||
rental_type: Optional[RentalType] = None
|
||||
video_url: Optional[str] = Field(None, max_length=500)
|
||||
trailer_url: Optional[str] = Field(None, max_length=500)
|
||||
poster_url: Optional[str] = Field(None, max_length=500)
|
||||
genre_ids: Optional[List[int]] = None
|
||||
tags: Optional[List[str]] = None
|
||||
|
||||
class RentalRequest(BaseModel):
|
||||
content_id: int = Field(..., ge=1)
|
||||
payment_method: str = Field("credits", regex="^(credits|bitcoin|card)$")
|
||||
|
||||
class SubscriptionRequest(BaseModel):
|
||||
plan_id: int = Field(..., ge=1)
|
||||
payment_method: str = Field("credits", regex="^(credits|bitcoin|card)$")
|
||||
|
||||
class ContentRatingRequest(BaseModel):
|
||||
content_id: int = Field(..., ge=1)
|
||||
rating: float = Field(..., ge=0.0, le=5.0)
|
||||
review: Optional[str] = Field(None, max_length=1000)
|
||||
|
||||
class WatchProgressUpdate(BaseModel):
|
||||
content_id: int = Field(..., ge=1)
|
||||
watch_time_seconds: int = Field(..., ge=0)
|
||||
total_duration: int = Field(..., ge=0)
|
||||
|
||||
# Initialize Redis cache
|
||||
redis_client = None
|
||||
|
||||
def get_redis_client():
|
||||
global redis_client
|
||||
if not redis_client:
|
||||
try:
|
||||
redis_host = os.getenv('REDIS_HOST', 'localhost')
|
||||
redis_port = int(os.getenv('REDIS_PORT', '6379'))
|
||||
redis_db = int(os.getenv('REDIS_VOD_DB', '4'))
|
||||
redis_client = Redis(
|
||||
host=redis_host,
|
||||
port=redis_port,
|
||||
db=redis_db,
|
||||
decode_responses=True
|
||||
)
|
||||
redis_client.ping()
|
||||
except Exception as e:
|
||||
logger.warning(f"Redis not available: {e}")
|
||||
redis_client = None
|
||||
return redis_client
|
||||
|
||||
# Content Management Endpoints
|
||||
@router.get("/content", response_model=List[Dict])
|
||||
async def list_content(
|
||||
skip: int = Query(0, ge=0),
|
||||
limit: int = Query(100, le=1000),
|
||||
content_type: Optional[ContentType] = None,
|
||||
genre_id: Optional[int] = None,
|
||||
rental_type: Optional[RentalType] = None,
|
||||
search: Optional[str] = Query(None, max_length=100),
|
||||
sort_by: str = Query("created_at", regex="^(created_at|title|release_year|user_rating|view_count)$"),
|
||||
sort_order: str = Query("desc", regex="^(asc|desc)$"),
|
||||
include_unavailable: bool = False,
|
||||
db: Session = Depends(get_db),
|
||||
current_user: User = Depends(get_current_user)
|
||||
):
|
||||
"""List VOD content with filtering, search and caching"""
|
||||
try:
|
||||
# Generate cache key
|
||||
cache_key = f"vod:list:{current_user.id}:{skip}:{limit}:{content_type}:{genre_id}:{rental_type}:{search}:{sort_by}:{sort_order}"
|
||||
cache_client = get_redis_client()
|
||||
|
||||
# Try cache first
|
||||
if cache_client and not search: # Don't cache search results
|
||||
try:
|
||||
cached_data = cache_client.get(cache_key)
|
||||
if cached_data:
|
||||
logger.info(f"Returning cached content list for user {current_user.id}")
|
||||
return json.loads(cached_data)
|
||||
except Exception as e:
|
||||
logger.warning(f"Cache retrieval error: {e}")
|
||||
|
||||
# Build query with eager loading
|
||||
query = db.query(VODContent).options(
|
||||
joinedload(VODContent.genres),
|
||||
joinedload(VODContent.cast)
|
||||
)
|
||||
|
||||
# Filter by status
|
||||
if not include_unavailable:
|
||||
query = query.filter(VODContent.status == ContentStatus.PUBLISHED)
|
||||
|
||||
# Apply filters
|
||||
if content_type:
|
||||
query = query.filter(VODContent.content_type == content_type)
|
||||
|
||||
if rental_type:
|
||||
query = query.filter(or_(
|
||||
VODContent.rental_type == rental_type,
|
||||
VODContent.rental_type == RentalType.BOTH
|
||||
))
|
||||
|
||||
if genre_id:
|
||||
query = query.join(VODContentGenre).filter(VODContentGenre.genre_id == genre_id)
|
||||
|
||||
if search:
|
||||
# Enhanced search across multiple fields
|
||||
search_term = f"%{search}%"
|
||||
query = query.filter(or_(
|
||||
VODContent.title.ilike(search_term),
|
||||
VODContent.description.ilike(search_term),
|
||||
VODContent.original_title.ilike(search_term),
|
||||
VODContent.keywords.contains([search])
|
||||
))
|
||||
|
||||
# Apply sorting
|
||||
sort_column = getattr(VODContent, sort_by)
|
||||
if sort_order == "desc":
|
||||
sort_column = sort_column.desc()
|
||||
query = query.order_by(sort_column)
|
||||
|
||||
# Pagination
|
||||
total_count = query.count()
|
||||
content_list = query.offset(skip).limit(limit).all()
|
||||
|
||||
# Format response with access info
|
||||
rental_system = RentalSystem(db)
|
||||
pricing_engine = PricingEngine(db)
|
||||
|
||||
result = []
|
||||
for content in content_list:
|
||||
try:
|
||||
access_info = rental_system.check_user_access(current_user, content)
|
||||
dynamic_price = pricing_engine.calculate_dynamic_price(content, current_user)
|
||||
|
||||
content_data = {
|
||||
"id": content.id,
|
||||
"title": content.title,
|
||||
"description": content.description,
|
||||
"content_type": content.content_type.value if content.content_type else None,
|
||||
"release_year": content.release_year,
|
||||
"runtime_minutes": content.runtime_minutes,
|
||||
"age_rating": content.age_rating,
|
||||
"user_rating": content.user_rating,
|
||||
"view_count": content.view_count,
|
||||
"poster_url": content.poster_url,
|
||||
"trailer_url": content.trailer_url,
|
||||
"rental_type": content.rental_type.value if content.rental_type else None,
|
||||
"base_rental_price": content.rental_price,
|
||||
"dynamic_price": dynamic_price,
|
||||
"currency": content.rental_currency,
|
||||
"access_info": access_info,
|
||||
"genres": [{"id": g.id, "name": g.name} for g in content.genres] if hasattr(content, 'genres') else [],
|
||||
"created_at": content.created_at.isoformat() if content.created_at else None
|
||||
}
|
||||
result.append(content_data)
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing content {content.id}: {e}")
|
||||
continue
|
||||
|
||||
# Cache result
|
||||
if cache_client and not search and result:
|
||||
try:
|
||||
cache_client.setex(cache_key, 300, json.dumps(result)) # Cache for 5 minutes
|
||||
except Exception as e:
|
||||
logger.warning(f"Cache storage error: {e}")
|
||||
|
||||
return result
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error listing content: {e}")
|
||||
raise HTTPException(status_code=500, detail="Failed to retrieve content list")
|
||||
|
||||
@router.get("/content/{content_id}")
|
||||
async def get_content_details(
|
||||
content_id: int,
|
||||
db: Session = Depends(get_db),
|
||||
current_user: User = Depends(get_current_user)
|
||||
):
|
||||
"""Get detailed content information with error handling"""
|
||||
try:
|
||||
# Query with eager loading
|
||||
content = db.query(VODContent).options(
|
||||
joinedload(VODContent.genres),
|
||||
joinedload(VODContent.cast),
|
||||
joinedload(VODContent.subtitles)
|
||||
).filter(VODContent.id == content_id).first()
|
||||
|
||||
if not content:
|
||||
raise HTTPException(status_code=404, detail="Content not found")
|
||||
|
||||
rental_system = RentalSystem(db)
|
||||
pricing_engine = PricingEngine(db)
|
||||
|
||||
# Get access information
|
||||
access_info = rental_system.check_user_access(current_user, content)
|
||||
dynamic_price = pricing_engine.calculate_dynamic_price(content, current_user)
|
||||
|
||||
# Get user's watch history
|
||||
watch_history = db.query(VODUserWatchHistory).filter(
|
||||
VODUserWatchHistory.user_id == current_user.id,
|
||||
VODUserWatchHistory.content_id == content_id
|
||||
).first()
|
||||
|
||||
# Get user's rating
|
||||
user_rating = db.query(VODUserRating).filter(
|
||||
VODUserRating.user_id == current_user.id,
|
||||
VODUserRating.content_id == content_id
|
||||
).first()
|
||||
|
||||
# Check wishlist status
|
||||
in_wishlist = db.query(VODUserWishlist).filter(
|
||||
VODUserWishlist.user_id == current_user.id,
|
||||
VODUserWishlist.content_id == content_id
|
||||
).first() is not None
|
||||
|
||||
# Get similar content
|
||||
similar_content = _get_similar_content(db, content, limit=5)
|
||||
|
||||
return {
|
||||
"id": content.id,
|
||||
"title": content.title,
|
||||
"original_title": content.original_title,
|
||||
"description": content.description,
|
||||
"synopsis": content.synopsis,
|
||||
"content_type": content.content_type.value if content.content_type else None,
|
||||
"release_year": content.release_year,
|
||||
"runtime_minutes": content.runtime_minutes,
|
||||
"language": content.language,
|
||||
"country": content.country,
|
||||
"age_rating": content.age_rating,
|
||||
"imdb_rating": content.imdb_rating,
|
||||
"user_rating": content.user_rating,
|
||||
"view_count": content.view_count,
|
||||
"poster_url": content.poster_url,
|
||||
"backdrop_url": content.backdrop_url,
|
||||
"trailer_url": content.trailer_url,
|
||||
"video_quality": content.video_quality,
|
||||
"audio_languages": content.audio_languages,
|
||||
"rental_type": content.rental_type.value if content.rental_type else None,
|
||||
"base_rental_price": content.rental_price,
|
||||
"dynamic_price": dynamic_price,
|
||||
"currency": content.rental_currency,
|
||||
"rental_duration_hours": content.rental_duration_hours,
|
||||
"genres": [{"id": g.id, "name": g.name, "color": g.color} for g in content.genres],
|
||||
"cast": [{"name": c.person_name, "role": c.role_type, "character": c.character_name} for c in content.cast],
|
||||
"subtitles": [
|
||||
{
|
||||
"id": s.id,
|
||||
"language": s.language,
|
||||
"language_name": s.language_name,
|
||||
"format": s.format
|
||||
} for s in content.subtitles
|
||||
] if hasattr(content, 'subtitles') else [],
|
||||
"access_info": access_info,
|
||||
"user_data": {
|
||||
"watch_progress": {
|
||||
"watch_time_seconds": watch_history.watch_time_seconds if watch_history else 0,
|
||||
"completion_percentage": watch_history.completion_percentage if watch_history else 0.0,
|
||||
"is_completed": watch_history.is_completed if watch_history else False,
|
||||
"last_watched": watch_history.last_watched_at.isoformat() if watch_history else None
|
||||
} if watch_history else None,
|
||||
"user_rating": user_rating.rating if user_rating else None,
|
||||
"user_review": user_rating.review if user_rating else None,
|
||||
"in_wishlist": in_wishlist
|
||||
},
|
||||
"similar_content": similar_content,
|
||||
"available_from": content.available_from.isoformat() if content.available_from else None,
|
||||
"available_until": content.available_until.isoformat() if content.available_until else None,
|
||||
"published_at": content.published_at.isoformat() if content.published_at else None
|
||||
}
|
||||
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting content details: {e}")
|
||||
raise HTTPException(status_code=500, detail="Failed to retrieve content details")
|
||||
|
||||
def _get_similar_content(db: Session, content: VODContent, limit: int = 5) -> List[Dict]:
|
||||
"""Get similar content based on genres and metadata"""
|
||||
try:
|
||||
similar = []
|
||||
|
||||
# Get content with same genres
|
||||
if content.genres:
|
||||
genre_ids = [g.id for g in content.genres]
|
||||
query = db.query(VODContent).join(VODContentGenre).filter(
|
||||
and_(
|
||||
VODContentGenre.genre_id.in_(genre_ids),
|
||||
VODContent.id != content.id,
|
||||
VODContent.status == ContentStatus.PUBLISHED
|
||||
)
|
||||
).limit(limit)
|
||||
|
||||
for similar_content in query.all():
|
||||
similar.append({
|
||||
"id": similar_content.id,
|
||||
"title": similar_content.title,
|
||||
"poster_url": similar_content.poster_url,
|
||||
"release_year": similar_content.release_year,
|
||||
"user_rating": similar_content.user_rating
|
||||
})
|
||||
|
||||
return similar
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting similar content: {e}")
|
||||
return []
|
||||
|
||||
@router.post("/content", dependencies=[Depends(require_admin)])
|
||||
async def create_content(
|
||||
content_data: VODContentCreate,
|
||||
background_tasks: BackgroundTasks,
|
||||
db: Session = Depends(get_db),
|
||||
current_user: User = Depends(get_current_user)
|
||||
):
|
||||
"""Create new VOD content with validation (Admin only)"""
|
||||
try:
|
||||
# Generate unique slug
|
||||
slug = content_data.title.lower().replace(" ", "-")
|
||||
slug = re.sub(r'[^a-z0-9-]', '', slug)
|
||||
slug = f"{slug}-{uuid.uuid4().hex[:8]}"
|
||||
|
||||
content = VODContent(
|
||||
title=content_data.title,
|
||||
description=content_data.description,
|
||||
content_type=content_data.content_type,
|
||||
status=ContentStatus.DRAFT,
|
||||
release_year=content_data.release_year,
|
||||
runtime_minutes=content_data.runtime_minutes,
|
||||
language=content_data.language,
|
||||
country=content_data.country,
|
||||
age_rating=content_data.age_rating,
|
||||
rental_type=content_data.rental_type,
|
||||
rental_price=content_data.rental_price,
|
||||
rental_currency=content_data.rental_currency,
|
||||
rental_duration_hours=content_data.rental_duration_hours,
|
||||
video_url=content_data.video_url,
|
||||
trailer_url=content_data.trailer_url,
|
||||
poster_url=content_data.poster_url,
|
||||
keywords=content_data.tags,
|
||||
slug=slug,
|
||||
uploaded_by=current_user.id
|
||||
)
|
||||
|
||||
db.add(content)
|
||||
db.flush() # Get the ID
|
||||
|
||||
# Add genres
|
||||
for genre_id in content_data.genre_ids:
|
||||
genre_link = VODContentGenre(
|
||||
content_id=content.id,
|
||||
genre_id=genre_id
|
||||
)
|
||||
db.add(genre_link)
|
||||
|
||||
db.commit()
|
||||
db.refresh(content)
|
||||
|
||||
# Schedule metadata enrichment in background
|
||||
if content_data.title:
|
||||
background_tasks.add_task(
|
||||
enrich_content_metadata,
|
||||
content.id,
|
||||
db
|
||||
)
|
||||
|
||||
# Clear content list cache
|
||||
cache_client = get_redis_client()
|
||||
if cache_client:
|
||||
try:
|
||||
for key in cache_client.scan_iter("vod:list:*"):
|
||||
cache_client.delete(key)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return {
|
||||
"id": content.id,
|
||||
"title": content.title,
|
||||
"slug": content.slug,
|
||||
"status": "created"
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
db.rollback()
|
||||
logger.error(f"Error creating content: {e}")
|
||||
raise HTTPException(status_code=500, detail="Failed to create content")
|
||||
|
||||
async def enrich_content_metadata(content_id: int, db: Session):
|
||||
"""Background task to enrich content metadata"""
|
||||
try:
|
||||
metadata_service = VODMetadataService(db)
|
||||
await metadata_service.enrich_content_metadata(content_id)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to enrich metadata for content {content_id}: {e}")
|
||||
|
||||
@router.put("/content/{content_id}", dependencies=[Depends(require_admin)])
|
||||
async def update_content(
|
||||
content_id: int,
|
||||
content_data: VODContentUpdate,
|
||||
db: Session = Depends(get_db)
|
||||
):
|
||||
"""Update VOD content with validation (Admin only)"""
|
||||
try:
|
||||
content = db.query(VODContent).filter(VODContent.id == content_id).first()
|
||||
if not content:
|
||||
raise HTTPException(status_code=404, detail="Content not found")
|
||||
|
||||
# Update fields
|
||||
for field, value in content_data.dict(exclude_unset=True).items():
|
||||
if field == 'genre_ids' and value is not None:
|
||||
# Update genres
|
||||
db.query(VODContentGenre).filter(
|
||||
VODContentGenre.content_id == content_id
|
||||
).delete()
|
||||
|
||||
for genre_id in value:
|
||||
genre_link = VODContentGenre(
|
||||
content_id=content_id,
|
||||
genre_id=genre_id
|
||||
)
|
||||
db.add(genre_link)
|
||||
elif field == 'tags' and value is not None:
|
||||
content.keywords = value
|
||||
elif field != 'genre_ids' and field != 'tags':
|
||||
setattr(content, field, value)
|
||||
|
||||
content.updated_at = datetime.utcnow()
|
||||
|
||||
# If publishing, set published_at
|
||||
if content_data.status == ContentStatus.PUBLISHED and not content.published_at:
|
||||
content.published_at = datetime.utcnow()
|
||||
|
||||
db.commit()
|
||||
db.refresh(content)
|
||||
|
||||
# Clear caches
|
||||
cache_client = get_redis_client()
|
||||
if cache_client:
|
||||
try:
|
||||
for key in cache_client.scan_iter("vod:*"):
|
||||
cache_client.delete(key)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return {"id": content.id, "status": "updated"}
|
||||
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception as e:
|
||||
db.rollback()
|
||||
logger.error(f"Error updating content: {e}")
|
||||
raise HTTPException(status_code=500, detail="Failed to update content")
|
||||
|
||||
@router.delete("/content/{content_id}", dependencies=[Depends(require_admin)])
|
||||
async def delete_content(
|
||||
content_id: int,
|
||||
db: Session = Depends(get_db)
|
||||
):
|
||||
"""Delete VOD content (Admin only)"""
|
||||
try:
|
||||
content = db.query(VODContent).filter(VODContent.id == content_id).first()
|
||||
if not content:
|
||||
raise HTTPException(status_code=404, detail="Content not found")
|
||||
|
||||
# Delete related data
|
||||
db.query(VODContentGenre).filter(VODContentGenre.content_id == content_id).delete()
|
||||
db.query(VODCast).filter(VODCast.content_id == content_id).delete()
|
||||
db.query(VODSubtitle).filter(VODSubtitle.content_id == content_id).delete()
|
||||
db.query(VODUserRating).filter(VODUserRating.content_id == content_id).delete()
|
||||
db.query(VODUserWishlist).filter(VODUserWishlist.content_id == content_id).delete()
|
||||
db.query(VODUserWatchHistory).filter(VODUserWatchHistory.content_id == content_id).delete()
|
||||
|
||||
db.delete(content)
|
||||
db.commit()
|
||||
|
||||
# Clear caches
|
||||
cache_client = get_redis_client()
|
||||
if cache_client:
|
||||
try:
|
||||
for key in cache_client.scan_iter("vod:*"):
|
||||
cache_client.delete(key)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return {"status": "deleted"}
|
||||
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception as e:
|
||||
db.rollback()
|
||||
logger.error(f"Error deleting content: {e}")
|
||||
raise HTTPException(status_code=500, detail="Failed to delete content")
|
||||
|
||||
# Rental System Endpoints with enhanced error handling
|
||||
@router.post("/rent")
|
||||
async def rent_content(
|
||||
rental_request: RentalRequest,
|
||||
db: Session = Depends(get_db),
|
||||
current_user: User = Depends(get_current_user)
|
||||
):
|
||||
"""Rent VOD content with comprehensive error handling"""
|
||||
try:
|
||||
# Validate content exists
|
||||
content = db.query(VODContent).filter(
|
||||
VODContent.id == rental_request.content_id
|
||||
).first()
|
||||
|
||||
if not content:
|
||||
raise HTTPException(status_code=404, detail="Content not found")
|
||||
|
||||
if content.status != ContentStatus.PUBLISHED:
|
||||
raise HTTPException(status_code=400, detail="Content is not available")
|
||||
|
||||
rental_system = RentalSystem(db)
|
||||
rental = rental_system.rent_content(
|
||||
current_user,
|
||||
rental_request.content_id,
|
||||
rental_request.payment_method
|
||||
)
|
||||
|
||||
return {
|
||||
"rental_id": rental.id,
|
||||
"content_id": rental.content_id,
|
||||
"price": rental.rental_price,
|
||||
"currency": rental.currency,
|
||||
"expires_at": rental.expires_at.isoformat(),
|
||||
"payment_status": rental.payment_status.value if rental.payment_status else None,
|
||||
"transaction_id": rental.transaction_id
|
||||
}
|
||||
|
||||
except InsufficientCreditsError as e:
|
||||
raise HTTPException(status_code=402, detail=str(e))
|
||||
except AlreadyRentedError as e:
|
||||
raise HTTPException(status_code=400, detail=str(e))
|
||||
except ContentNotAvailableError as e:
|
||||
raise HTTPException(status_code=400, detail=str(e))
|
||||
except RentalSystemError as e:
|
||||
logger.error(f"Rental system error: {e}")
|
||||
raise HTTPException(status_code=500, detail="Failed to process rental")
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(f"Unexpected error during rental: {e}")
|
||||
raise HTTPException(status_code=500, detail="Failed to process rental")
|
||||
|
||||
# Watch Progress Tracking
|
||||
@router.post("/watch-progress")
|
||||
async def update_watch_progress(
|
||||
progress: WatchProgressUpdate,
|
||||
db: Session = Depends(get_db),
|
||||
current_user: User = Depends(get_current_user)
|
||||
):
|
||||
"""Update user's watch progress"""
|
||||
try:
|
||||
# Verify access
|
||||
content = db.query(VODContent).filter(
|
||||
VODContent.id == progress.content_id
|
||||
).first()
|
||||
|
||||
if not content:
|
||||
raise HTTPException(status_code=404, detail="Content not found")
|
||||
|
||||
rental_system = RentalSystem(db)
|
||||
access_info = rental_system.check_user_access(current_user, content)
|
||||
|
||||
if not access_info["has_access"]:
|
||||
raise HTTPException(status_code=403, detail="Access denied")
|
||||
|
||||
# Update or create watch history
|
||||
watch_history = db.query(VODUserWatchHistory).filter(
|
||||
VODUserWatchHistory.user_id == current_user.id,
|
||||
VODUserWatchHistory.content_id == progress.content_id
|
||||
).first()
|
||||
|
||||
completion_percentage = (progress.watch_time_seconds / progress.total_duration * 100) if progress.total_duration > 0 else 0
|
||||
is_completed = completion_percentage >= 90 # Consider 90% as completed
|
||||
|
||||
if watch_history:
|
||||
watch_history.watch_time_seconds = progress.watch_time_seconds
|
||||
watch_history.completion_percentage = completion_percentage
|
||||
watch_history.is_completed = is_completed
|
||||
watch_history.last_watched_at = datetime.utcnow()
|
||||
else:
|
||||
watch_history = VODUserWatchHistory(
|
||||
user_id=current_user.id,
|
||||
content_id=progress.content_id,
|
||||
watch_time_seconds=progress.watch_time_seconds,
|
||||
completion_percentage=completion_percentage,
|
||||
is_completed=is_completed,
|
||||
last_watched_at=datetime.utcnow()
|
||||
)
|
||||
db.add(watch_history)
|
||||
|
||||
db.commit()
|
||||
|
||||
return {
|
||||
"status": "updated",
|
||||
"completion_percentage": completion_percentage,
|
||||
"is_completed": is_completed
|
||||
}
|
||||
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(f"Error updating watch progress: {e}")
|
||||
raise HTTPException(status_code=500, detail="Failed to update watch progress")
|
||||
|
||||
# Directory Management Endpoints
|
||||
@router.get("/directories", dependencies=[Depends(require_admin)])
|
||||
async def list_directories(
|
||||
db: Session = Depends(get_db)
|
||||
):
|
||||
"""List VOD directories (Admin only)"""
|
||||
try:
|
||||
directory_service = VODDirectoryService(db)
|
||||
directories = db.query(VODDirectory).all()
|
||||
|
||||
result = []
|
||||
for directory in directories:
|
||||
status = directory_service.get_directory_status(directory.id)
|
||||
result.append(status['directory'])
|
||||
|
||||
return result
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error listing directories: {e}")
|
||||
raise HTTPException(status_code=500, detail="Failed to list directories")
|
||||
|
||||
@router.post("/directories/scan/{directory_id}", dependencies=[Depends(require_admin)])
|
||||
async def scan_directory(
|
||||
directory_id: int,
|
||||
background_tasks: BackgroundTasks,
|
||||
force: bool = False,
|
||||
deep_scan: bool = False,
|
||||
db: Session = Depends(get_db)
|
||||
):
|
||||
"""Trigger directory scan (Admin only)"""
|
||||
try:
|
||||
directory_service = VODDirectoryService(db)
|
||||
|
||||
# Validate directory exists
|
||||
directory = db.query(VODDirectory).filter(
|
||||
VODDirectory.id == directory_id
|
||||
).first()
|
||||
|
||||
if not directory:
|
||||
raise HTTPException(status_code=404, detail="Directory not found")
|
||||
|
||||
# Start scan in background
|
||||
background_tasks.add_task(
|
||||
run_directory_scan,
|
||||
directory_id,
|
||||
force,
|
||||
deep_scan,
|
||||
db
|
||||
)
|
||||
|
||||
return {
|
||||
"status": "scan_started",
|
||||
"directory_id": directory_id,
|
||||
"directory_name": directory.name
|
||||
}
|
||||
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(f"Error starting directory scan: {e}")
|
||||
raise HTTPException(status_code=500, detail="Failed to start directory scan")
|
||||
|
||||
def run_directory_scan(directory_id: int, force: bool, deep_scan: bool, db: Session):
|
||||
"""Background task to scan directory"""
|
||||
try:
|
||||
directory_service = VODDirectoryService(db)
|
||||
scan = directory_service.scan_directory(directory_id, force, deep_scan)
|
||||
logger.info(f"Directory scan completed: {scan.id}")
|
||||
except Exception as e:
|
||||
logger.error(f"Directory scan failed: {e}")
|
||||
|
||||
# Streaming endpoint with HLS support
|
||||
@router.get("/stream/{content_id}/playlist.m3u8")
|
||||
async def get_hls_playlist(
|
||||
content_id: int,
|
||||
db: Session = Depends(get_db),
|
||||
current_user: User = Depends(get_current_user)
|
||||
):
|
||||
"""Get HLS playlist for content streaming"""
|
||||
try:
|
||||
content = db.query(VODContent).filter(VODContent.id == content_id).first()
|
||||
if not content:
|
||||
raise HTTPException(status_code=404, detail="Content not found")
|
||||
|
||||
# Check user access
|
||||
rental_system = RentalSystem(db)
|
||||
access_info = rental_system.check_user_access(current_user, content)
|
||||
|
||||
if not access_info["has_access"]:
|
||||
raise HTTPException(status_code=403, detail="Access denied")
|
||||
|
||||
if not content.video_url:
|
||||
raise HTTPException(status_code=404, detail="Video file not available")
|
||||
|
||||
# Update view count
|
||||
content.view_count = (content.view_count or 0) + 1
|
||||
db.commit()
|
||||
|
||||
# Generate HLS playlist
|
||||
playlist = generate_hls_playlist(content, current_user)
|
||||
|
||||
return StreamingResponse(
|
||||
io.StringIO(playlist),
|
||||
media_type="application/x-mpegURL",
|
||||
headers={
|
||||
"Cache-Control": "no-cache",
|
||||
"X-Content-Duration": str(content.duration_seconds or 0)
|
||||
}
|
||||
)
|
||||
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(f"Error generating HLS playlist: {e}")
|
||||
raise HTTPException(status_code=500, detail="Failed to generate streaming playlist")
|
||||
|
||||
def generate_hls_playlist(content: VODContent, user: User) -> str:
|
||||
"""Generate HLS playlist for content"""
|
||||
# This is a simplified example - actual implementation would use FFmpeg
|
||||
# to transcode the video file to HLS format
|
||||
playlist = """#EXTM3U
|
||||
#EXT-X-VERSION:3
|
||||
#EXT-X-TARGETDURATION:10
|
||||
#EXT-X-MEDIA-SEQUENCE:0
|
||||
#EXT-X-PLAYLIST-TYPE:VOD
|
||||
"""
|
||||
|
||||
# Generate secure token for segments
|
||||
token = hashlib.sha256(
|
||||
f"{content.id}:{user.id}:{datetime.utcnow().isoformat()}".encode()
|
||||
).hexdigest()
|
||||
|
||||
# Add segments (simplified - actual implementation would read from transcoded files)
|
||||
segment_duration = 10 # seconds
|
||||
total_duration = content.duration_seconds or 0
|
||||
num_segments = (total_duration // segment_duration) + 1
|
||||
|
||||
for i in range(num_segments):
|
||||
duration = min(segment_duration, total_duration - (i * segment_duration))
|
||||
if duration > 0:
|
||||
playlist += f"#EXTINF:{duration:.3f},\n"
|
||||
playlist += f"/api/vod/stream/{content.id}/segment{i}.ts?token={token}\n"
|
||||
|
||||
playlist += "#EXT-X-ENDLIST\n"
|
||||
return playlist
|
||||
|
||||
# Health check endpoint
|
||||
@router.get("/health")
|
||||
async def health_check():
|
||||
"""VOD service health check"""
|
||||
try:
|
||||
# Check Redis
|
||||
redis_status = "healthy"
|
||||
cache_client = get_redis_client()
|
||||
if cache_client:
|
||||
try:
|
||||
cache_client.ping()
|
||||
except Exception:
|
||||
redis_status = "unhealthy"
|
||||
else:
|
||||
redis_status = "unavailable"
|
||||
|
||||
return {
|
||||
"status": "healthy",
|
||||
"redis": redis_status,
|
||||
"timestamp": datetime.utcnow().isoformat()
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Health check failed: {e}")
|
||||
return {
|
||||
"status": "unhealthy",
|
||||
"error": str(e),
|
||||
"timestamp": datetime.utcnow().isoformat()
|
||||
}
|
||||
Reference in New Issue
Block a user