""" Base Service Layer This module provides base service classes for implementing business logic. All services should inherit from these base classes. """ from typing import Optional, Dict, Any, List, TypeVar, Generic from abc import ABC, abstractmethod import logging from datetime import datetime from contextlib import contextmanager from ..config.settings import Settings, get_settings from ..repositories.base import BaseRepository, RepositoryException logger = logging.getLogger(__name__) T = TypeVar('T') class ServiceException(Exception): """Base exception for service layer errors""" pass class ValidationError(ServiceException): """Raised when validation fails""" def __init__(self, message: str, errors: Optional[Dict[str, Any]] = None): super().__init__(message) self.errors = errors or {} class BusinessRuleViolation(ServiceException): """Raised when a business rule is violated""" pass class ResourceNotFoundError(ServiceException): """Raised when a requested resource is not found""" pass class ResourceConflictError(ServiceException): """Raised when there's a conflict with existing resources""" pass class BaseService(ABC): """ Base service class providing common functionality for all services. Services encapsulate business logic and coordinate between repositories, external services, and other components. """ def __init__(self, settings: Optional[Settings] = None): """ Initialize service. Args: settings: Application settings """ self.settings = settings or get_settings() self._logger = logging.getLogger(self.__class__.__name__) @property def logger(self) -> logging.Logger: """Get logger for this service""" return self._logger def log_info(self, message: str, **kwargs): """Log info message with context""" self._logger.info(message, extra=kwargs) def log_error(self, message: str, error: Optional[Exception] = None, **kwargs): """Log error message with context""" if error: self._logger.error(f"{message}: {str(error)}", exc_info=True, extra=kwargs) else: self._logger.error(message, extra=kwargs) def log_warning(self, message: str, **kwargs): """Log warning message with context""" self._logger.warning(message, extra=kwargs) def log_debug(self, message: str, **kwargs): """Log debug message with context""" self._logger.debug(message, extra=kwargs) @contextmanager def handle_errors(self, operation: str = "operation"): """ Context manager for handling service errors. Args: operation: Description of the operation being performed Usage: with self.handle_errors("creating user"): # Service logic here pass """ try: yield except ValidationError: raise except BusinessRuleViolation: raise except ResourceNotFoundError: raise except ResourceConflictError: raise except RepositoryException as e: self.log_error(f"Repository error during {operation}", e) raise ServiceException(f"Database error during {operation}: {str(e)}") except Exception as e: self.log_error(f"Unexpected error during {operation}", e) raise ServiceException(f"Unexpected error during {operation}: {str(e)}") def validate_required_fields(self, data: Dict[str, Any], required_fields: List[str]): """ Validate that required fields are present and not empty. Args: data: Data dictionary to validate required_fields: List of required field names Raises: ValidationError: If validation fails """ errors = {} for field in required_fields: if field not in data or data[field] is None: errors[field] = f"{field} is required" elif isinstance(data[field], str) and not data[field].strip(): errors[field] = f"{field} cannot be empty" if errors: raise ValidationError("Validation failed", errors) def validate_field_types(self, data: Dict[str, Any], field_types: Dict[str, type]): """ Validate field types. Args: data: Data dictionary to validate field_types: Dictionary mapping field names to expected types Raises: ValidationError: If validation fails """ errors = {} for field, expected_type in field_types.items(): if field in data and data[field] is not None: if not isinstance(data[field], expected_type): errors[field] = f"{field} must be of type {expected_type.__name__}" if errors: raise ValidationError("Type validation failed", errors) def validate_field_values(self, data: Dict[str, Any], field_validators: Dict[str, callable]): """ Validate field values using custom validators. Args: data: Data dictionary to validate field_validators: Dictionary mapping field names to validator functions Raises: ValidationError: If validation fails """ errors = {} for field, validator in field_validators.items(): if field in data and data[field] is not None: try: if not validator(data[field]): errors[field] = f"{field} validation failed" except Exception as e: errors[field] = str(e) if errors: raise ValidationError("Value validation failed", errors) def sanitize_input(self, data: Dict[str, Any]) -> Dict[str, Any]: """ Sanitize input data. Args: data: Input data to sanitize Returns: Sanitized data """ sanitized = {} for key, value in data.items(): if isinstance(value, str): # Strip whitespace value = value.strip() # Remove null bytes value = value.replace('\x00', '') sanitized[key] = value return sanitized def audit_log( self, action: str, entity_type: str, entity_id: Optional[Any] = None, user: Optional[str] = None, details: Optional[Dict[str, Any]] = None ): """ Create an audit log entry. Args: action: Action performed (e.g., "create", "update", "delete") entity_type: Type of entity affected entity_id: ID of the affected entity user: User who performed the action details: Additional details about the action """ log_entry = { "timestamp": datetime.utcnow().isoformat(), "action": action, "entity_type": entity_type, "entity_id": entity_id, "user": user, "details": details } self.log_info(f"Audit: {action} {entity_type}", **log_entry) class CRUDService(BaseService, Generic[T]): """ Base service class for CRUD operations. This class provides standard CRUD operations with business logic validation. """ def __init__( self, repository: BaseRepository[T], settings: Optional[Settings] = None ): """ Initialize CRUD service. Args: repository: Repository for data access settings: Application settings """ super().__init__(settings) self.repository = repository def create( self, data: Dict[str, Any], user: Optional[str] = None, validate: bool = True ) -> T: """ Create a new entity. Args: data: Entity data user: User performing the action validate: Whether to validate data Returns: Created entity Raises: ValidationError: If validation fails ServiceException: If creation fails """ with self.handle_errors("creating entity"): if validate: self.validate_create(data) # Sanitize input data = self.sanitize_input(data) # Create entity entity = self._create_entity_from_data(data) entity = self.repository.create(entity) # Audit log self.audit_log( action="create", entity_type=entity.__class__.__name__, entity_id=entity.id, user=user, details={"data": data} ) return entity def update( self, id: Any, data: Dict[str, Any], user: Optional[str] = None, validate: bool = True, partial: bool = True ) -> T: """ Update an existing entity. Args: id: Entity ID data: Update data user: User performing the action validate: Whether to validate data partial: Whether this is a partial update Returns: Updated entity Raises: ResourceNotFoundError: If entity not found ValidationError: If validation fails ServiceException: If update fails """ with self.handle_errors("updating entity"): # Get existing entity entity = self.repository.get(id) if not entity: raise ResourceNotFoundError(f"Entity with id {id} not found") if validate: self.validate_update(data, entity, partial) # Sanitize input data = self.sanitize_input(data) # Update entity self._update_entity_from_data(entity, data) entity = self.repository.update(entity) # Audit log self.audit_log( action="update", entity_type=entity.__class__.__name__, entity_id=entity.id, user=user, details={"data": data} ) return entity def delete( self, id: Any, user: Optional[str] = None, soft: bool = True ) -> bool: """ Delete an entity. Args: id: Entity ID user: User performing the action soft: Whether to perform soft delete Returns: True if deleted successfully Raises: ResourceNotFoundError: If entity not found ServiceException: If deletion fails """ with self.handle_errors("deleting entity"): # Get existing entity entity = self.repository.get(id) if not entity: raise ResourceNotFoundError(f"Entity with id {id} not found") # Validate deletion self.validate_delete(entity) # Delete entity if soft and hasattr(entity, 'soft_delete'): entity.soft_delete() self.repository.update(entity) action = "soft_delete" else: self.repository.delete(entity) action = "delete" # Audit log self.audit_log( action=action, entity_type=entity.__class__.__name__, entity_id=id, user=user ) return True def get(self, id: Any) -> Optional[T]: """ Get an entity by ID. Args: id: Entity ID Returns: Entity or None if not found """ with self.handle_errors("getting entity"): return self.repository.get(id) def get_or_404(self, id: Any) -> T: """ Get an entity by ID or raise error. Args: id: Entity ID Returns: Entity Raises: ResourceNotFoundError: If entity not found """ entity = self.get(id) if not entity: raise ResourceNotFoundError(f"Entity with id {id} not found") return entity def list( self, filters: Optional[Dict[str, Any]] = None, page: int = 1, page_size: int = 20, order_by: Optional[str] = None, order_desc: bool = False ) -> Dict[str, Any]: """ List entities with pagination. Args: filters: Filter criteria page: Page number (1-based) page_size: Number of items per page order_by: Field to order by order_desc: Whether to order descending Returns: Dictionary with items and pagination info """ with self.handle_errors("listing entities"): # Calculate offset offset = (page - 1) * page_size # Get total count total = self.repository.count(**(filters or {})) # Get items items = self.repository.find_all( filters=filters, limit=page_size, offset=offset, order_by=order_by, order_desc=order_desc ) # Calculate pagination info total_pages = (total + page_size - 1) // page_size return { "items": items, "pagination": { "page": page, "page_size": page_size, "total": total, "total_pages": total_pages, "has_next": page < total_pages, "has_prev": page > 1 } } def search( self, query: str, fields: List[str], filters: Optional[Dict[str, Any]] = None, page: int = 1, page_size: int = 20 ) -> Dict[str, Any]: """ Search entities. Args: query: Search query fields: Fields to search in filters: Additional filters page: Page number page_size: Number of items per page Returns: Search results with pagination """ # This is a basic implementation - override in specific services # for more sophisticated search functionality combined_filters = filters or {} # Add search to filters (basic implementation) if query and fields: # This would need to be implemented based on your database # For now, we'll just use the first field combined_filters[fields[0]] = {"ilike": query} return self.list( filters=combined_filters, page=page, page_size=page_size ) # Abstract methods to be implemented by subclasses @abstractmethod def validate_create(self, data: Dict[str, Any]): """Validate data for entity creation""" pass @abstractmethod def validate_update(self, data: Dict[str, Any], entity: T, partial: bool = True): """Validate data for entity update""" pass def validate_delete(self, entity: T): """Validate entity deletion - override if needed""" pass @abstractmethod def _create_entity_from_data(self, data: Dict[str, Any]) -> T: """Create entity instance from data""" pass @abstractmethod def _update_entity_from_data(self, entity: T, data: Dict[str, Any]): """Update entity instance from data""" pass