import ast import os import re from typing import List, Dict, Any class SensitiveLogDetector(ast.NodeVisitor): """ Detects logger.info() statements that might log sensitive request/response data. """ def __init__(self): self.violations = [] self.current_file = None def set_file(self, file_path: str): """Set the current file being analyzed""" self.current_file = file_path def visit_Call(self, node): """Visit function calls to detect logger.info() with sensitive data""" if self._is_logger_info_call(node): # Check all arguments to the logger.info() call for arg in node.args: if self._contains_sensitive_data(arg): violation = { "file": self.current_file, "line": node.lineno, "call": self._get_call_string(node), "reason": self._get_violation_reason(arg), "arg": self._get_arg_string(arg), } self.violations.append(violation) self.generic_visit(node) def _is_logger_info_call(self, node) -> bool: """Check if this is a logger.info() call""" if not isinstance(node.func, ast.Attribute): return False # Check for various logger patterns: # logger.info(), verbose_logger.info(), verbose_proxy_logger.info(), etc. if node.func.attr == "info": if isinstance(node.func.value, ast.Name): logger_name = node.func.value.id return any( pattern in logger_name.lower() for pattern in ["logger", "log"] ) return False def _contains_sensitive_data(self, arg) -> bool: """Check if the argument might contain sensitive data""" # Convert argument to string for analysis arg_str = self._get_arg_string(arg).lower() # Skip obvious non-sensitive patterns non_sensitive_patterns = [ r'^["\'][\w\s\-_:.,!?]*["\']$', # Simple static strings r'^["\'][^{%]*["\']$', # Strings without format placeholders ] # Skip common safe phrases that contain sensitive keywords safe_phrases = [ r"request\s+(completed|finished|started|processing)", r"response\s+(sent|received|processed)", r"data\s+(inserted|updated|deleted|saved)\s+into", r"(successfully|failed)\s+(request|response)", r"(starting|ending|completed)\s+(request|response)", r"no\s+(usage\s+)?data\s+found", r"found\s+\d+.*records", r"exported\s+\d+.*records", ] for pattern in non_sensitive_patterns: if re.search(pattern, arg_str): # Check if it's a safe phrase first for safe_pattern in safe_phrases: if re.search(safe_pattern, arg_str, re.IGNORECASE): return False # Then check if the static string mentions sensitive keywords if not any( keyword in arg_str for keyword in [ "request", "response", "data", "body", "payload", "token", "auth", "credential", ] ): return False # Direct variable/attribute patterns that are likely sensitive sensitive_patterns = [ r"\brequest\b(?!\s*(id|status|method))", # request but not request_id, request_status, request_method r"\bresponse\b(?!\s*(status|code|time))", # response but not response_status, response_code r"\bdata\b(?=[\.\[\s]|$)", # data followed by . [ space or end r"\bbody\b(?=[\.\[\s]|$)", r"\bpayload\b(?=[\.\[\s]|$)", r"\bmessages?\b(?=[\.\[\s]|$)", r"\bcontent\b(?=[\.\[\s]|$)", r"\binput\b(?=[\.\[\s]|$)", r"\boutput\b(?=[\.\[\s]|$)", r"\bargs\b(?=[\.\[\s]|$)", r"\bkwargs\b(?=[\.\[\s]|$)", r"\bparams\b(?=[\.\[\s]|$)", r"\bheaders\b(?=[\.\[\s]|$)", r"\bapi_key\b", r"\btoken\b(?!\s*(name|id))", # token but not token_name, token_id r"\bauth\b(?=[\.\[\s]|$)", r"\bcredentials?\b", ] # Check for direct variable references with context for pattern in sensitive_patterns: if re.search(pattern, arg_str): return True # Check for format strings that might interpolate sensitive data if self._is_format_string_with_sensitive_data(arg): return True # Check for JSON dumps or string formatting of objects if self._is_object_serialization(arg): return True return False def _is_format_string_with_sensitive_data(self, arg) -> bool: """Check if this is a format string that might contain sensitive data""" # Check for f-strings if isinstance(arg, ast.JoinedStr): for value in arg.values: if isinstance(value, ast.FormattedValue): value_str = self._get_arg_string(value.value).lower() # Check for any sensitive data patterns in f-string interpolations sensitive_f_string_patterns = [ "request", "response", "data", "body", "content", "messages", "token", "jwt", "auth", "api_key", "apikey", "credential", "secret", "password", "passwd", ] if any( pattern in value_str for pattern in sensitive_f_string_patterns ): return True # Check for .format() calls if isinstance(arg, ast.Call) and isinstance(arg.func, ast.Attribute): if arg.func.attr == "format": # Check the base string for suspicious patterns base_str = self._get_arg_string(arg.func.value).lower() if "{}" in base_str or "{" in base_str: # Check format arguments for sensitive data sensitive_format_patterns = [ "request", "response", "data", "body", "content", "token", "jwt", "auth", "api_key", "apikey", "credential", "secret", "password", "passwd", ] for format_arg in arg.args: format_str = self._get_arg_string(format_arg).lower() if any( pattern in format_str for pattern in sensitive_format_patterns ): return True return False def _is_object_serialization(self, arg) -> bool: """Check if this is serializing an object that might contain sensitive data""" arg_str = self._get_arg_string(arg) # Check for json.dumps() calls if isinstance(arg, ast.Call): if ( isinstance(arg.func, ast.Attribute) and arg.func.attr == "dumps" and isinstance(arg.func.value, ast.Name) and arg.func.value.id == "json" ): return True # Check for str() calls on potentially sensitive objects if ( isinstance(arg.func, ast.Name) and arg.func.id == "str" and len(arg.args) > 0 ): obj_str = self._get_arg_string(arg.args[0]).lower() if any( pattern in obj_str for pattern in ["request", "response", "data", "body"] ): return True return False def _get_violation_reason(self, arg) -> str: """Get a human-readable reason for the violation""" arg_str = self._get_arg_string(arg).lower() if any( pattern in arg_str for pattern in [ "jwt", "token", "api_key", "apikey", "auth", "credential", "secret", "password", "passwd", ] ): return "Potentially logging authentication/secret data (JWT, token, API key, etc.)" elif "request" in arg_str: return "Potentially logging request data" elif "response" in arg_str: return "Potentially logging response data" elif any( pattern in arg_str for pattern in ["data", "body", "payload", "content"] ): return "Potentially logging sensitive data/body/content" elif any(pattern in arg_str for pattern in ["messages", "input", "output"]): return "Potentially logging message/input/output data" else: return "Potentially logging sensitive data" def _get_call_string(self, node) -> str: """Get string representation of the function call""" try: if hasattr(ast, "unparse"): return ast.unparse(node) else: # Fallback for older Python versions return f"{self._get_arg_string(node.func)}(...)" except: return "logger.info(...)" def _get_arg_string(self, arg) -> str: """Get string representation of an argument""" try: if hasattr(ast, "unparse"): return ast.unparse(arg) else: # Fallback for older Python versions if isinstance(arg, ast.Name): return arg.id elif isinstance(arg, ast.Attribute): return f"{self._get_arg_string(arg.value)}.{arg.attr}" elif isinstance(arg, ast.Str): return repr(arg.s) elif isinstance(arg, ast.Constant): return repr(arg.value) else: return str(type(arg).__name__) except: return "unknown" def check_sensitive_logging(base_dir: str) -> List[Dict[str, Any]]: """ Check for logger.info() statements that might log sensitive data. Args: base_dir: Base directory to scan (typically the litellm root) Returns: List of violations found """ detector = SensitiveLogDetector() all_violations = [] # Directories to scan - only main litellm codebase scan_dirs = ["litellm", "enterprise"] # Include enterprise directory if it exists # Directories to exclude (third-party code, venvs, etc.) exclude_dirs = { "venv", "venv313", ".venv", "env", ".env", "node_modules", "__pycache__", ".git", "build", "dist", ".tox", "clean_env", "litellm_env", "myenv", "py313_env", "venv_sip_bypass", "mypyc_env", } for scan_dir in scan_dirs: dir_path = os.path.join(base_dir, scan_dir) if not os.path.exists(dir_path): print(f"Warning: Directory {dir_path} does not exist, skipping.") continue print(f"Scanning directory: {dir_path}") for root, dirs, files in os.walk(dir_path): # Skip excluded directories dirs[:] = [d for d in dirs if d not in exclude_dirs] # Skip if we're in a virtual environment or third-party directory relative_root = os.path.relpath(root, base_dir) if any( excluded in relative_root.split(os.sep) for excluded in exclude_dirs ): continue for file in files: if file.endswith(".py"): file_path = os.path.join(root, file) relative_path = os.path.relpath(file_path, base_dir) # Skip files that are clearly third-party or generated if any(excluded in relative_path for excluded in exclude_dirs): continue try: with open(file_path, "r", encoding="utf-8") as f: content = f.read() tree = ast.parse(content) detector.set_file(relative_path) detector.visit(tree) except SyntaxError as e: print(f"Warning: Syntax error in file {relative_path}: {e}") continue except UnicodeDecodeError as e: print( f"Warning: Unicode decode error in file {relative_path}: {e}" ) continue except Exception as e: print(f"Warning: Error processing file {relative_path}: {e}") continue return detector.violations def main(): """Main function to run the sensitive logging check""" # Get the base directory (assume we're running from tests/code_coverage_tests/) ################### # Running locally ################### # current_dir = os.path.dirname(os.path.abspath(__file__)) # base_dir = os.path.join(current_dir, "..", "..") # base_dir = os.path.abspath(base_dir) ################### # Running in CI/CD ################### base_dir = "./litellm" # Adjust this path as needed print(f"Checking for sensitive logging in: {base_dir}") violations = check_sensitive_logging(base_dir) if violations: print(f"\n❌ Found {len(violations)} potential violations:") print("=" * 80) for i, violation in enumerate(violations, 1): print(f"\n{i}. {violation['file']}:{violation['line']}") print(f" Reason: {violation['reason']}") print(f" Call: {violation['call']}") print(f" Argument: {violation['arg']}") print("\n" + "=" * 80) print("⚠️ SECURITY WARNING:") print("These logger.info() statements may log sensitive request/response data.") print("Consider changing them to logger.debug() or removing sensitive data.") print("This is critical for PII compliance and security.") print( "Please contact @ishaan-jaff for more details about this check. DO NOT VIOLATE THIS CHECK." ) return 1 # Exit with error code else: print("\n✅ No sensitive logging violations found!") return 0 if __name__ == "__main__": exit(main())