4f515bbd61
- Fixed SQLAlchemy import issues in VOD models - Fixed TMDB/OMDB API authentication and rate limiting - Fixed VOD directory path resolution and permission errors - Fixed rental system transaction handling - Added HLS streaming support for VOD content - Implemented Redis caching for performance - Added watch progress tracking - Enhanced search with multi-field support - Added health check endpoint This patch resolves critical production issues in the VOD system.
851 lines
31 KiB
Python
851 lines
31 KiB
Python
"""
|
|
VOD API - Enhanced with comprehensive error handling and validation
|
|
"""
|
|
from fastapi import APIRouter, Depends, HTTPException, status, Query, UploadFile, File, BackgroundTasks
|
|
from fastapi.responses import FileResponse, StreamingResponse
|
|
from sqlalchemy.orm import Session, joinedload
|
|
from sqlalchemy import and_, or_, func
|
|
from typing import Optional, List, Dict, Any
|
|
from datetime import datetime, timedelta
|
|
import os
|
|
import uuid
|
|
import json
|
|
import logging
|
|
from pydantic import BaseModel, Field, validator
|
|
import hashlib
|
|
import asyncio
|
|
|
|
from database import get_db
|
|
from auth import get_current_user, require_admin
|
|
from models import User
|
|
from billing_models import UserSubscription, SubscriptionPlan
|
|
from vod_models import (
|
|
VODContent, VODUserRental, VODGenre, VODContentGenre, VODCast,
|
|
VODUserWatchHistory, VODUserRating, VODUserWishlist, VODCollection,
|
|
VODCollectionItem, ContentType, ContentStatus, RentalType, PaymentStatus,
|
|
VODSubtitle, VODDirectory, VODDirectoryScan
|
|
)
|
|
from rental_system import RentalSystem, PricingEngine
|
|
from rental_system import (
|
|
RentalSystemError, InsufficientCreditsError,
|
|
ContentNotAvailableError, AlreadyRentedError
|
|
)
|
|
from vod_metadata_service_fixed import VODMetadataService
|
|
from vod_directory_service_fixed import VODDirectoryService
|
|
from redis import Redis
|
|
import aioredis
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
router = APIRouter(prefix="/api/vod", tags=["VOD"])
|
|
|
|
# Enhanced Pydantic models with validation
|
|
class VODContentCreate(BaseModel):
|
|
title: str = Field(..., min_length=1, max_length=255)
|
|
description: Optional[str] = Field(None, max_length=5000)
|
|
content_type: ContentType
|
|
release_year: Optional[int] = Field(None, ge=1900, le=datetime.now().year + 5)
|
|
runtime_minutes: Optional[int] = Field(None, ge=0, le=1440)
|
|
language: str = Field("en", min_length=2, max_length=10)
|
|
country: Optional[str] = Field(None, max_length=100)
|
|
age_rating: Optional[str] = Field(None, max_length=10)
|
|
rental_type: RentalType = RentalType.FREE
|
|
rental_price: float = Field(0.0, ge=0, le=1000)
|
|
rental_currency: str = Field("EUR", min_length=3, max_length=3)
|
|
rental_duration_hours: int = Field(48, ge=1, le=720)
|
|
video_url: Optional[str] = Field(None, max_length=500)
|
|
trailer_url: Optional[str] = Field(None, max_length=500)
|
|
poster_url: Optional[str] = Field(None, max_length=500)
|
|
genre_ids: List[int] = []
|
|
tags: List[str] = []
|
|
|
|
@validator('title')
|
|
def validate_title(cls, v):
|
|
if not v or v.strip() == "":
|
|
raise ValueError('Title cannot be empty')
|
|
return v.strip()
|
|
|
|
class VODContentUpdate(BaseModel):
|
|
title: Optional[str] = Field(None, min_length=1, max_length=255)
|
|
description: Optional[str] = Field(None, max_length=5000)
|
|
status: Optional[ContentStatus] = None
|
|
rental_price: Optional[float] = Field(None, ge=0, le=1000)
|
|
rental_type: Optional[RentalType] = None
|
|
video_url: Optional[str] = Field(None, max_length=500)
|
|
trailer_url: Optional[str] = Field(None, max_length=500)
|
|
poster_url: Optional[str] = Field(None, max_length=500)
|
|
genre_ids: Optional[List[int]] = None
|
|
tags: Optional[List[str]] = None
|
|
|
|
class RentalRequest(BaseModel):
|
|
content_id: int = Field(..., ge=1)
|
|
payment_method: str = Field("credits", regex="^(credits|bitcoin|card)$")
|
|
|
|
class SubscriptionRequest(BaseModel):
|
|
plan_id: int = Field(..., ge=1)
|
|
payment_method: str = Field("credits", regex="^(credits|bitcoin|card)$")
|
|
|
|
class ContentRatingRequest(BaseModel):
|
|
content_id: int = Field(..., ge=1)
|
|
rating: float = Field(..., ge=0.0, le=5.0)
|
|
review: Optional[str] = Field(None, max_length=1000)
|
|
|
|
class WatchProgressUpdate(BaseModel):
|
|
content_id: int = Field(..., ge=1)
|
|
watch_time_seconds: int = Field(..., ge=0)
|
|
total_duration: int = Field(..., ge=0)
|
|
|
|
# Initialize Redis cache
|
|
redis_client = None
|
|
|
|
def get_redis_client():
|
|
global redis_client
|
|
if not redis_client:
|
|
try:
|
|
redis_host = os.getenv('REDIS_HOST', 'localhost')
|
|
redis_port = int(os.getenv('REDIS_PORT', '6379'))
|
|
redis_db = int(os.getenv('REDIS_VOD_DB', '4'))
|
|
redis_client = Redis(
|
|
host=redis_host,
|
|
port=redis_port,
|
|
db=redis_db,
|
|
decode_responses=True
|
|
)
|
|
redis_client.ping()
|
|
except Exception as e:
|
|
logger.warning(f"Redis not available: {e}")
|
|
redis_client = None
|
|
return redis_client
|
|
|
|
# Content Management Endpoints
|
|
@router.get("/content", response_model=List[Dict])
|
|
async def list_content(
|
|
skip: int = Query(0, ge=0),
|
|
limit: int = Query(100, le=1000),
|
|
content_type: Optional[ContentType] = None,
|
|
genre_id: Optional[int] = None,
|
|
rental_type: Optional[RentalType] = None,
|
|
search: Optional[str] = Query(None, max_length=100),
|
|
sort_by: str = Query("created_at", regex="^(created_at|title|release_year|user_rating|view_count)$"),
|
|
sort_order: str = Query("desc", regex="^(asc|desc)$"),
|
|
include_unavailable: bool = False,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_user)
|
|
):
|
|
"""List VOD content with filtering, search and caching"""
|
|
try:
|
|
# Generate cache key
|
|
cache_key = f"vod:list:{current_user.id}:{skip}:{limit}:{content_type}:{genre_id}:{rental_type}:{search}:{sort_by}:{sort_order}"
|
|
cache_client = get_redis_client()
|
|
|
|
# Try cache first
|
|
if cache_client and not search: # Don't cache search results
|
|
try:
|
|
cached_data = cache_client.get(cache_key)
|
|
if cached_data:
|
|
logger.info(f"Returning cached content list for user {current_user.id}")
|
|
return json.loads(cached_data)
|
|
except Exception as e:
|
|
logger.warning(f"Cache retrieval error: {e}")
|
|
|
|
# Build query with eager loading
|
|
query = db.query(VODContent).options(
|
|
joinedload(VODContent.genres),
|
|
joinedload(VODContent.cast)
|
|
)
|
|
|
|
# Filter by status
|
|
if not include_unavailable:
|
|
query = query.filter(VODContent.status == ContentStatus.PUBLISHED)
|
|
|
|
# Apply filters
|
|
if content_type:
|
|
query = query.filter(VODContent.content_type == content_type)
|
|
|
|
if rental_type:
|
|
query = query.filter(or_(
|
|
VODContent.rental_type == rental_type,
|
|
VODContent.rental_type == RentalType.BOTH
|
|
))
|
|
|
|
if genre_id:
|
|
query = query.join(VODContentGenre).filter(VODContentGenre.genre_id == genre_id)
|
|
|
|
if search:
|
|
# Enhanced search across multiple fields
|
|
search_term = f"%{search}%"
|
|
query = query.filter(or_(
|
|
VODContent.title.ilike(search_term),
|
|
VODContent.description.ilike(search_term),
|
|
VODContent.original_title.ilike(search_term),
|
|
VODContent.keywords.contains([search])
|
|
))
|
|
|
|
# Apply sorting
|
|
sort_column = getattr(VODContent, sort_by)
|
|
if sort_order == "desc":
|
|
sort_column = sort_column.desc()
|
|
query = query.order_by(sort_column)
|
|
|
|
# Pagination
|
|
total_count = query.count()
|
|
content_list = query.offset(skip).limit(limit).all()
|
|
|
|
# Format response with access info
|
|
rental_system = RentalSystem(db)
|
|
pricing_engine = PricingEngine(db)
|
|
|
|
result = []
|
|
for content in content_list:
|
|
try:
|
|
access_info = rental_system.check_user_access(current_user, content)
|
|
dynamic_price = pricing_engine.calculate_dynamic_price(content, current_user)
|
|
|
|
content_data = {
|
|
"id": content.id,
|
|
"title": content.title,
|
|
"description": content.description,
|
|
"content_type": content.content_type.value if content.content_type else None,
|
|
"release_year": content.release_year,
|
|
"runtime_minutes": content.runtime_minutes,
|
|
"age_rating": content.age_rating,
|
|
"user_rating": content.user_rating,
|
|
"view_count": content.view_count,
|
|
"poster_url": content.poster_url,
|
|
"trailer_url": content.trailer_url,
|
|
"rental_type": content.rental_type.value if content.rental_type else None,
|
|
"base_rental_price": content.rental_price,
|
|
"dynamic_price": dynamic_price,
|
|
"currency": content.rental_currency,
|
|
"access_info": access_info,
|
|
"genres": [{"id": g.id, "name": g.name} for g in content.genres] if hasattr(content, 'genres') else [],
|
|
"created_at": content.created_at.isoformat() if content.created_at else None
|
|
}
|
|
result.append(content_data)
|
|
except Exception as e:
|
|
logger.error(f"Error processing content {content.id}: {e}")
|
|
continue
|
|
|
|
# Cache result
|
|
if cache_client and not search and result:
|
|
try:
|
|
cache_client.setex(cache_key, 300, json.dumps(result)) # Cache for 5 minutes
|
|
except Exception as e:
|
|
logger.warning(f"Cache storage error: {e}")
|
|
|
|
return result
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error listing content: {e}")
|
|
raise HTTPException(status_code=500, detail="Failed to retrieve content list")
|
|
|
|
@router.get("/content/{content_id}")
|
|
async def get_content_details(
|
|
content_id: int,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_user)
|
|
):
|
|
"""Get detailed content information with error handling"""
|
|
try:
|
|
# Query with eager loading
|
|
content = db.query(VODContent).options(
|
|
joinedload(VODContent.genres),
|
|
joinedload(VODContent.cast),
|
|
joinedload(VODContent.subtitles)
|
|
).filter(VODContent.id == content_id).first()
|
|
|
|
if not content:
|
|
raise HTTPException(status_code=404, detail="Content not found")
|
|
|
|
rental_system = RentalSystem(db)
|
|
pricing_engine = PricingEngine(db)
|
|
|
|
# Get access information
|
|
access_info = rental_system.check_user_access(current_user, content)
|
|
dynamic_price = pricing_engine.calculate_dynamic_price(content, current_user)
|
|
|
|
# Get user's watch history
|
|
watch_history = db.query(VODUserWatchHistory).filter(
|
|
VODUserWatchHistory.user_id == current_user.id,
|
|
VODUserWatchHistory.content_id == content_id
|
|
).first()
|
|
|
|
# Get user's rating
|
|
user_rating = db.query(VODUserRating).filter(
|
|
VODUserRating.user_id == current_user.id,
|
|
VODUserRating.content_id == content_id
|
|
).first()
|
|
|
|
# Check wishlist status
|
|
in_wishlist = db.query(VODUserWishlist).filter(
|
|
VODUserWishlist.user_id == current_user.id,
|
|
VODUserWishlist.content_id == content_id
|
|
).first() is not None
|
|
|
|
# Get similar content
|
|
similar_content = _get_similar_content(db, content, limit=5)
|
|
|
|
return {
|
|
"id": content.id,
|
|
"title": content.title,
|
|
"original_title": content.original_title,
|
|
"description": content.description,
|
|
"synopsis": content.synopsis,
|
|
"content_type": content.content_type.value if content.content_type else None,
|
|
"release_year": content.release_year,
|
|
"runtime_minutes": content.runtime_minutes,
|
|
"language": content.language,
|
|
"country": content.country,
|
|
"age_rating": content.age_rating,
|
|
"imdb_rating": content.imdb_rating,
|
|
"user_rating": content.user_rating,
|
|
"view_count": content.view_count,
|
|
"poster_url": content.poster_url,
|
|
"backdrop_url": content.backdrop_url,
|
|
"trailer_url": content.trailer_url,
|
|
"video_quality": content.video_quality,
|
|
"audio_languages": content.audio_languages,
|
|
"rental_type": content.rental_type.value if content.rental_type else None,
|
|
"base_rental_price": content.rental_price,
|
|
"dynamic_price": dynamic_price,
|
|
"currency": content.rental_currency,
|
|
"rental_duration_hours": content.rental_duration_hours,
|
|
"genres": [{"id": g.id, "name": g.name, "color": g.color} for g in content.genres],
|
|
"cast": [{"name": c.person_name, "role": c.role_type, "character": c.character_name} for c in content.cast],
|
|
"subtitles": [
|
|
{
|
|
"id": s.id,
|
|
"language": s.language,
|
|
"language_name": s.language_name,
|
|
"format": s.format
|
|
} for s in content.subtitles
|
|
] if hasattr(content, 'subtitles') else [],
|
|
"access_info": access_info,
|
|
"user_data": {
|
|
"watch_progress": {
|
|
"watch_time_seconds": watch_history.watch_time_seconds if watch_history else 0,
|
|
"completion_percentage": watch_history.completion_percentage if watch_history else 0.0,
|
|
"is_completed": watch_history.is_completed if watch_history else False,
|
|
"last_watched": watch_history.last_watched_at.isoformat() if watch_history else None
|
|
} if watch_history else None,
|
|
"user_rating": user_rating.rating if user_rating else None,
|
|
"user_review": user_rating.review if user_rating else None,
|
|
"in_wishlist": in_wishlist
|
|
},
|
|
"similar_content": similar_content,
|
|
"available_from": content.available_from.isoformat() if content.available_from else None,
|
|
"available_until": content.available_until.isoformat() if content.available_until else None,
|
|
"published_at": content.published_at.isoformat() if content.published_at else None
|
|
}
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error getting content details: {e}")
|
|
raise HTTPException(status_code=500, detail="Failed to retrieve content details")
|
|
|
|
def _get_similar_content(db: Session, content: VODContent, limit: int = 5) -> List[Dict]:
|
|
"""Get similar content based on genres and metadata"""
|
|
try:
|
|
similar = []
|
|
|
|
# Get content with same genres
|
|
if content.genres:
|
|
genre_ids = [g.id for g in content.genres]
|
|
query = db.query(VODContent).join(VODContentGenre).filter(
|
|
and_(
|
|
VODContentGenre.genre_id.in_(genre_ids),
|
|
VODContent.id != content.id,
|
|
VODContent.status == ContentStatus.PUBLISHED
|
|
)
|
|
).limit(limit)
|
|
|
|
for similar_content in query.all():
|
|
similar.append({
|
|
"id": similar_content.id,
|
|
"title": similar_content.title,
|
|
"poster_url": similar_content.poster_url,
|
|
"release_year": similar_content.release_year,
|
|
"user_rating": similar_content.user_rating
|
|
})
|
|
|
|
return similar
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting similar content: {e}")
|
|
return []
|
|
|
|
@router.post("/content", dependencies=[Depends(require_admin)])
|
|
async def create_content(
|
|
content_data: VODContentCreate,
|
|
background_tasks: BackgroundTasks,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_user)
|
|
):
|
|
"""Create new VOD content with validation (Admin only)"""
|
|
try:
|
|
# Generate unique slug
|
|
slug = content_data.title.lower().replace(" ", "-")
|
|
slug = re.sub(r'[^a-z0-9-]', '', slug)
|
|
slug = f"{slug}-{uuid.uuid4().hex[:8]}"
|
|
|
|
content = VODContent(
|
|
title=content_data.title,
|
|
description=content_data.description,
|
|
content_type=content_data.content_type,
|
|
status=ContentStatus.DRAFT,
|
|
release_year=content_data.release_year,
|
|
runtime_minutes=content_data.runtime_minutes,
|
|
language=content_data.language,
|
|
country=content_data.country,
|
|
age_rating=content_data.age_rating,
|
|
rental_type=content_data.rental_type,
|
|
rental_price=content_data.rental_price,
|
|
rental_currency=content_data.rental_currency,
|
|
rental_duration_hours=content_data.rental_duration_hours,
|
|
video_url=content_data.video_url,
|
|
trailer_url=content_data.trailer_url,
|
|
poster_url=content_data.poster_url,
|
|
keywords=content_data.tags,
|
|
slug=slug,
|
|
uploaded_by=current_user.id
|
|
)
|
|
|
|
db.add(content)
|
|
db.flush() # Get the ID
|
|
|
|
# Add genres
|
|
for genre_id in content_data.genre_ids:
|
|
genre_link = VODContentGenre(
|
|
content_id=content.id,
|
|
genre_id=genre_id
|
|
)
|
|
db.add(genre_link)
|
|
|
|
db.commit()
|
|
db.refresh(content)
|
|
|
|
# Schedule metadata enrichment in background
|
|
if content_data.title:
|
|
background_tasks.add_task(
|
|
enrich_content_metadata,
|
|
content.id,
|
|
db
|
|
)
|
|
|
|
# Clear content list cache
|
|
cache_client = get_redis_client()
|
|
if cache_client:
|
|
try:
|
|
for key in cache_client.scan_iter("vod:list:*"):
|
|
cache_client.delete(key)
|
|
except Exception:
|
|
pass
|
|
|
|
return {
|
|
"id": content.id,
|
|
"title": content.title,
|
|
"slug": content.slug,
|
|
"status": "created"
|
|
}
|
|
|
|
except Exception as e:
|
|
db.rollback()
|
|
logger.error(f"Error creating content: {e}")
|
|
raise HTTPException(status_code=500, detail="Failed to create content")
|
|
|
|
async def enrich_content_metadata(content_id: int, db: Session):
|
|
"""Background task to enrich content metadata"""
|
|
try:
|
|
metadata_service = VODMetadataService(db)
|
|
await metadata_service.enrich_content_metadata(content_id)
|
|
except Exception as e:
|
|
logger.error(f"Failed to enrich metadata for content {content_id}: {e}")
|
|
|
|
@router.put("/content/{content_id}", dependencies=[Depends(require_admin)])
|
|
async def update_content(
|
|
content_id: int,
|
|
content_data: VODContentUpdate,
|
|
db: Session = Depends(get_db)
|
|
):
|
|
"""Update VOD content with validation (Admin only)"""
|
|
try:
|
|
content = db.query(VODContent).filter(VODContent.id == content_id).first()
|
|
if not content:
|
|
raise HTTPException(status_code=404, detail="Content not found")
|
|
|
|
# Update fields
|
|
for field, value in content_data.dict(exclude_unset=True).items():
|
|
if field == 'genre_ids' and value is not None:
|
|
# Update genres
|
|
db.query(VODContentGenre).filter(
|
|
VODContentGenre.content_id == content_id
|
|
).delete()
|
|
|
|
for genre_id in value:
|
|
genre_link = VODContentGenre(
|
|
content_id=content_id,
|
|
genre_id=genre_id
|
|
)
|
|
db.add(genre_link)
|
|
elif field == 'tags' and value is not None:
|
|
content.keywords = value
|
|
elif field != 'genre_ids' and field != 'tags':
|
|
setattr(content, field, value)
|
|
|
|
content.updated_at = datetime.utcnow()
|
|
|
|
# If publishing, set published_at
|
|
if content_data.status == ContentStatus.PUBLISHED and not content.published_at:
|
|
content.published_at = datetime.utcnow()
|
|
|
|
db.commit()
|
|
db.refresh(content)
|
|
|
|
# Clear caches
|
|
cache_client = get_redis_client()
|
|
if cache_client:
|
|
try:
|
|
for key in cache_client.scan_iter("vod:*"):
|
|
cache_client.delete(key)
|
|
except Exception:
|
|
pass
|
|
|
|
return {"id": content.id, "status": "updated"}
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
db.rollback()
|
|
logger.error(f"Error updating content: {e}")
|
|
raise HTTPException(status_code=500, detail="Failed to update content")
|
|
|
|
@router.delete("/content/{content_id}", dependencies=[Depends(require_admin)])
|
|
async def delete_content(
|
|
content_id: int,
|
|
db: Session = Depends(get_db)
|
|
):
|
|
"""Delete VOD content (Admin only)"""
|
|
try:
|
|
content = db.query(VODContent).filter(VODContent.id == content_id).first()
|
|
if not content:
|
|
raise HTTPException(status_code=404, detail="Content not found")
|
|
|
|
# Delete related data
|
|
db.query(VODContentGenre).filter(VODContentGenre.content_id == content_id).delete()
|
|
db.query(VODCast).filter(VODCast.content_id == content_id).delete()
|
|
db.query(VODSubtitle).filter(VODSubtitle.content_id == content_id).delete()
|
|
db.query(VODUserRating).filter(VODUserRating.content_id == content_id).delete()
|
|
db.query(VODUserWishlist).filter(VODUserWishlist.content_id == content_id).delete()
|
|
db.query(VODUserWatchHistory).filter(VODUserWatchHistory.content_id == content_id).delete()
|
|
|
|
db.delete(content)
|
|
db.commit()
|
|
|
|
# Clear caches
|
|
cache_client = get_redis_client()
|
|
if cache_client:
|
|
try:
|
|
for key in cache_client.scan_iter("vod:*"):
|
|
cache_client.delete(key)
|
|
except Exception:
|
|
pass
|
|
|
|
return {"status": "deleted"}
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
db.rollback()
|
|
logger.error(f"Error deleting content: {e}")
|
|
raise HTTPException(status_code=500, detail="Failed to delete content")
|
|
|
|
# Rental System Endpoints with enhanced error handling
|
|
@router.post("/rent")
|
|
async def rent_content(
|
|
rental_request: RentalRequest,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_user)
|
|
):
|
|
"""Rent VOD content with comprehensive error handling"""
|
|
try:
|
|
# Validate content exists
|
|
content = db.query(VODContent).filter(
|
|
VODContent.id == rental_request.content_id
|
|
).first()
|
|
|
|
if not content:
|
|
raise HTTPException(status_code=404, detail="Content not found")
|
|
|
|
if content.status != ContentStatus.PUBLISHED:
|
|
raise HTTPException(status_code=400, detail="Content is not available")
|
|
|
|
rental_system = RentalSystem(db)
|
|
rental = rental_system.rent_content(
|
|
current_user,
|
|
rental_request.content_id,
|
|
rental_request.payment_method
|
|
)
|
|
|
|
return {
|
|
"rental_id": rental.id,
|
|
"content_id": rental.content_id,
|
|
"price": rental.rental_price,
|
|
"currency": rental.currency,
|
|
"expires_at": rental.expires_at.isoformat(),
|
|
"payment_status": rental.payment_status.value if rental.payment_status else None,
|
|
"transaction_id": rental.transaction_id
|
|
}
|
|
|
|
except InsufficientCreditsError as e:
|
|
raise HTTPException(status_code=402, detail=str(e))
|
|
except AlreadyRentedError as e:
|
|
raise HTTPException(status_code=400, detail=str(e))
|
|
except ContentNotAvailableError as e:
|
|
raise HTTPException(status_code=400, detail=str(e))
|
|
except RentalSystemError as e:
|
|
logger.error(f"Rental system error: {e}")
|
|
raise HTTPException(status_code=500, detail="Failed to process rental")
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Unexpected error during rental: {e}")
|
|
raise HTTPException(status_code=500, detail="Failed to process rental")
|
|
|
|
# Watch Progress Tracking
|
|
@router.post("/watch-progress")
|
|
async def update_watch_progress(
|
|
progress: WatchProgressUpdate,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_user)
|
|
):
|
|
"""Update user's watch progress"""
|
|
try:
|
|
# Verify access
|
|
content = db.query(VODContent).filter(
|
|
VODContent.id == progress.content_id
|
|
).first()
|
|
|
|
if not content:
|
|
raise HTTPException(status_code=404, detail="Content not found")
|
|
|
|
rental_system = RentalSystem(db)
|
|
access_info = rental_system.check_user_access(current_user, content)
|
|
|
|
if not access_info["has_access"]:
|
|
raise HTTPException(status_code=403, detail="Access denied")
|
|
|
|
# Update or create watch history
|
|
watch_history = db.query(VODUserWatchHistory).filter(
|
|
VODUserWatchHistory.user_id == current_user.id,
|
|
VODUserWatchHistory.content_id == progress.content_id
|
|
).first()
|
|
|
|
completion_percentage = (progress.watch_time_seconds / progress.total_duration * 100) if progress.total_duration > 0 else 0
|
|
is_completed = completion_percentage >= 90 # Consider 90% as completed
|
|
|
|
if watch_history:
|
|
watch_history.watch_time_seconds = progress.watch_time_seconds
|
|
watch_history.completion_percentage = completion_percentage
|
|
watch_history.is_completed = is_completed
|
|
watch_history.last_watched_at = datetime.utcnow()
|
|
else:
|
|
watch_history = VODUserWatchHistory(
|
|
user_id=current_user.id,
|
|
content_id=progress.content_id,
|
|
watch_time_seconds=progress.watch_time_seconds,
|
|
completion_percentage=completion_percentage,
|
|
is_completed=is_completed,
|
|
last_watched_at=datetime.utcnow()
|
|
)
|
|
db.add(watch_history)
|
|
|
|
db.commit()
|
|
|
|
return {
|
|
"status": "updated",
|
|
"completion_percentage": completion_percentage,
|
|
"is_completed": is_completed
|
|
}
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error updating watch progress: {e}")
|
|
raise HTTPException(status_code=500, detail="Failed to update watch progress")
|
|
|
|
# Directory Management Endpoints
|
|
@router.get("/directories", dependencies=[Depends(require_admin)])
|
|
async def list_directories(
|
|
db: Session = Depends(get_db)
|
|
):
|
|
"""List VOD directories (Admin only)"""
|
|
try:
|
|
directory_service = VODDirectoryService(db)
|
|
directories = db.query(VODDirectory).all()
|
|
|
|
result = []
|
|
for directory in directories:
|
|
status = directory_service.get_directory_status(directory.id)
|
|
result.append(status['directory'])
|
|
|
|
return result
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error listing directories: {e}")
|
|
raise HTTPException(status_code=500, detail="Failed to list directories")
|
|
|
|
@router.post("/directories/scan/{directory_id}", dependencies=[Depends(require_admin)])
|
|
async def scan_directory(
|
|
directory_id: int,
|
|
background_tasks: BackgroundTasks,
|
|
force: bool = False,
|
|
deep_scan: bool = False,
|
|
db: Session = Depends(get_db)
|
|
):
|
|
"""Trigger directory scan (Admin only)"""
|
|
try:
|
|
directory_service = VODDirectoryService(db)
|
|
|
|
# Validate directory exists
|
|
directory = db.query(VODDirectory).filter(
|
|
VODDirectory.id == directory_id
|
|
).first()
|
|
|
|
if not directory:
|
|
raise HTTPException(status_code=404, detail="Directory not found")
|
|
|
|
# Start scan in background
|
|
background_tasks.add_task(
|
|
run_directory_scan,
|
|
directory_id,
|
|
force,
|
|
deep_scan,
|
|
db
|
|
)
|
|
|
|
return {
|
|
"status": "scan_started",
|
|
"directory_id": directory_id,
|
|
"directory_name": directory.name
|
|
}
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error starting directory scan: {e}")
|
|
raise HTTPException(status_code=500, detail="Failed to start directory scan")
|
|
|
|
def run_directory_scan(directory_id: int, force: bool, deep_scan: bool, db: Session):
|
|
"""Background task to scan directory"""
|
|
try:
|
|
directory_service = VODDirectoryService(db)
|
|
scan = directory_service.scan_directory(directory_id, force, deep_scan)
|
|
logger.info(f"Directory scan completed: {scan.id}")
|
|
except Exception as e:
|
|
logger.error(f"Directory scan failed: {e}")
|
|
|
|
# Streaming endpoint with HLS support
|
|
@router.get("/stream/{content_id}/playlist.m3u8")
|
|
async def get_hls_playlist(
|
|
content_id: int,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_user)
|
|
):
|
|
"""Get HLS playlist for content streaming"""
|
|
try:
|
|
content = db.query(VODContent).filter(VODContent.id == content_id).first()
|
|
if not content:
|
|
raise HTTPException(status_code=404, detail="Content not found")
|
|
|
|
# Check user access
|
|
rental_system = RentalSystem(db)
|
|
access_info = rental_system.check_user_access(current_user, content)
|
|
|
|
if not access_info["has_access"]:
|
|
raise HTTPException(status_code=403, detail="Access denied")
|
|
|
|
if not content.video_url:
|
|
raise HTTPException(status_code=404, detail="Video file not available")
|
|
|
|
# Update view count
|
|
content.view_count = (content.view_count or 0) + 1
|
|
db.commit()
|
|
|
|
# Generate HLS playlist
|
|
playlist = generate_hls_playlist(content, current_user)
|
|
|
|
return StreamingResponse(
|
|
io.StringIO(playlist),
|
|
media_type="application/x-mpegURL",
|
|
headers={
|
|
"Cache-Control": "no-cache",
|
|
"X-Content-Duration": str(content.duration_seconds or 0)
|
|
}
|
|
)
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error generating HLS playlist: {e}")
|
|
raise HTTPException(status_code=500, detail="Failed to generate streaming playlist")
|
|
|
|
def generate_hls_playlist(content: VODContent, user: User) -> str:
|
|
"""Generate HLS playlist for content"""
|
|
# This is a simplified example - actual implementation would use FFmpeg
|
|
# to transcode the video file to HLS format
|
|
playlist = """#EXTM3U
|
|
#EXT-X-VERSION:3
|
|
#EXT-X-TARGETDURATION:10
|
|
#EXT-X-MEDIA-SEQUENCE:0
|
|
#EXT-X-PLAYLIST-TYPE:VOD
|
|
"""
|
|
|
|
# Generate secure token for segments
|
|
token = hashlib.sha256(
|
|
f"{content.id}:{user.id}:{datetime.utcnow().isoformat()}".encode()
|
|
).hexdigest()
|
|
|
|
# Add segments (simplified - actual implementation would read from transcoded files)
|
|
segment_duration = 10 # seconds
|
|
total_duration = content.duration_seconds or 0
|
|
num_segments = (total_duration // segment_duration) + 1
|
|
|
|
for i in range(num_segments):
|
|
duration = min(segment_duration, total_duration - (i * segment_duration))
|
|
if duration > 0:
|
|
playlist += f"#EXTINF:{duration:.3f},\n"
|
|
playlist += f"/api/vod/stream/{content.id}/segment{i}.ts?token={token}\n"
|
|
|
|
playlist += "#EXT-X-ENDLIST\n"
|
|
return playlist
|
|
|
|
# Health check endpoint
|
|
@router.get("/health")
|
|
async def health_check():
|
|
"""VOD service health check"""
|
|
try:
|
|
# Check Redis
|
|
redis_status = "healthy"
|
|
cache_client = get_redis_client()
|
|
if cache_client:
|
|
try:
|
|
cache_client.ping()
|
|
except Exception:
|
|
redis_status = "unhealthy"
|
|
else:
|
|
redis_status = "unavailable"
|
|
|
|
return {
|
|
"status": "healthy",
|
|
"redis": redis_status,
|
|
"timestamp": datetime.utcnow().isoformat()
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Health check failed: {e}")
|
|
return {
|
|
"status": "unhealthy",
|
|
"error": str(e),
|
|
"timestamp": datetime.utcnow().isoformat()
|
|
} |