diff --git a/scripts/manage_users.py b/scripts/manage_users.py new file mode 100755 index 0000000..5d7897f --- /dev/null +++ b/scripts/manage_users.py @@ -0,0 +1,321 @@ +#!/usr/bin/env python3 +""" +Script to manage users for the ISP Wiremap application. +Supports password reset, user deletion, and user listing. + +Usage: + python scripts/manage_users.py list + python scripts/manage_users.py reset-password + python scripts/manage_users.py delete + python scripts/manage_users.py toggle-admin +""" +import sys +import argparse +from pathlib import Path +from typing import Optional + +# Add parent directory to path to import app modules +sys.path.append(str(Path(__file__).resolve().parents[1])) + +from app.database import SessionLocal +from app.models.user import User +from app.utils.password import hash_password +import getpass +from sqlalchemy import or_ +from uuid import UUID + + +def get_user(db, identifier: str) -> Optional[User]: + """ + Find a user by username, email, or UUID. + + Args: + db: Database session + identifier: Username, email, or user UUID + + Returns: + User object if found, None otherwise + """ + # Try to parse as UUID first + try: + user_uuid = UUID(identifier) + user = db.query(User).filter(User.id == user_uuid).first() + if user: + return user + except (ValueError, AttributeError): + pass + + # Search by username or email + user = db.query(User).filter( + or_(User.username == identifier, User.email == identifier) + ).first() + + return user + + +def list_users(): + """List all users in the system.""" + print("=" * 80) + print("ISP Wiremap - User List") + print("=" * 80) + print() + + db = SessionLocal() + try: + users = db.query(User).order_by(User.created_at.desc()).all() + + if not users: + print("No users found in the system.") + return + + print(f"Total users: {len(users)}") + print() + print(f"{'Username':<20} {'Email':<30} {'Admin':<8} {'Created':<20} {'User ID'}") + print("-" * 80) + + for user in users: + created_at = user.created_at.strftime("%Y-%m-%d %H:%M:%S") if user.created_at else "N/A" + admin_status = "Yes" if user.is_admin else "No" + print(f"{user.username:<20} {user.email:<30} {admin_status:<8} {created_at:<20} {user.id}") + + print() + except Exception as e: + print(f"Error listing users: {e}") + finally: + db.close() + + +def reset_password(identifier: str): + """ + Reset a user's password. + + Args: + identifier: Username, email, or user UUID + """ + print("=" * 80) + print("ISP Wiremap - Reset User Password") + print("=" * 80) + print() + + db = SessionLocal() + try: + user = get_user(db, identifier) + + if not user: + print(f"Error: User '{identifier}' not found") + print("You can search by username, email, or user ID") + return + + # Display user info + print(f"Found user:") + print(f" Username: {user.username}") + print(f" Email: {user.email}") + print(f" User ID: {user.id}") + print(f" Admin: {'Yes' if user.is_admin else 'No'}") + print() + + # Confirm action + confirm = input("Do you want to reset this user's password? (yes/no): ").strip().lower() + if confirm not in ['yes', 'y']: + print("Password reset cancelled.") + return + + # Get new password + password = getpass.getpass("Enter new password: ") + if not password: + print("Error: Password cannot be empty") + return + + password_confirm = getpass.getpass("Confirm new password: ") + if password != password_confirm: + print("Error: Passwords do not match") + return + + # Update password + user.password_hash = hash_password(password) + db.commit() + + print() + print("=" * 80) + print("Password reset successfully!") + print(f"Username: {user.username}") + print(f"User ID: {user.id}") + print("=" * 80) + + except Exception as e: + db.rollback() + print(f"Error resetting password: {e}") + finally: + db.close() + + +def delete_user(identifier: str): + """ + Delete a user from the system. + + Args: + identifier: Username, email, or user UUID + """ + print("=" * 80) + print("ISP Wiremap - Delete User") + print("=" * 80) + print() + + db = SessionLocal() + try: + user = get_user(db, identifier) + + if not user: + print(f"Error: User '{identifier}' not found") + print("You can search by username, email, or user ID") + return + + # Display user info + print(f"Found user:") + print(f" Username: {user.username}") + print(f" Email: {user.email}") + print(f" User ID: {user.id}") + print(f" Admin: {'Yes' if user.is_admin else 'No'}") + print() + + # Confirm action + print("WARNING: This action cannot be undone!") + print("All maps, items, and shares owned by this user will also be affected.") + confirm = input("Type the username exactly to confirm deletion: ").strip() + + if confirm != user.username: + print("Username does not match. Deletion cancelled.") + return + + # Delete user + username = user.username + user_id = user.id + db.delete(user) + db.commit() + + print() + print("=" * 80) + print("User deleted successfully!") + print(f"Deleted username: {username}") + print(f"Deleted user ID: {user_id}") + print("=" * 80) + + except Exception as e: + db.rollback() + print(f"Error deleting user: {e}") + print("Note: The user may have related records that prevent deletion.") + print("You may need to manually handle foreign key constraints.") + finally: + db.close() + + +def toggle_admin(identifier: str): + """ + Toggle admin status for a user. + + Args: + identifier: Username, email, or user UUID + """ + print("=" * 80) + print("ISP Wiremap - Toggle Admin Status") + print("=" * 80) + print() + + db = SessionLocal() + try: + user = get_user(db, identifier) + + if not user: + print(f"Error: User '{identifier}' not found") + print("You can search by username, email, or user ID") + return + + # Display user info + current_status = "Yes" if user.is_admin else "No" + new_status = "No" if user.is_admin else "Yes" + + print(f"Found user:") + print(f" Username: {user.username}") + print(f" Email: {user.email}") + print(f" User ID: {user.id}") + print(f" Current admin status: {current_status}") + print(f" New admin status will be: {new_status}") + print() + + # Confirm action + confirm = input(f"Toggle admin status to '{new_status}'? (yes/no): ").strip().lower() + if confirm not in ['yes', 'y']: + print("Operation cancelled.") + return + + # Toggle admin status + user.is_admin = not user.is_admin + db.commit() + + print() + print("=" * 80) + print("Admin status updated successfully!") + print(f"Username: {user.username}") + print(f"Admin status: {'Yes' if user.is_admin else 'No'}") + print("=" * 80) + + except Exception as e: + db.rollback() + print(f"Error toggling admin status: {e}") + finally: + db.close() + + +def main(): + """Main entry point for the script.""" + parser = argparse.ArgumentParser( + description="Manage users in the ISP Wiremap application", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + %(prog)s list + %(prog)s reset-password john_doe + %(prog)s reset-password john@example.com + %(prog)s reset-password 3d2870bc-7dee-476a-a9d0-432a4c9519e9 + %(prog)s delete john_doe + %(prog)s toggle-admin john_doe + """ + ) + + subparsers = parser.add_subparsers(dest='command', help='Command to execute') + + # List command + subparsers.add_parser('list', help='List all users') + + # Reset password command + reset_parser = subparsers.add_parser('reset-password', help='Reset a user\'s password') + reset_parser.add_argument('identifier', help='Username, email, or user ID') + + # Delete command + delete_parser = subparsers.add_parser('delete', help='Delete a user') + delete_parser.add_argument('identifier', help='Username, email, or user ID') + + # Toggle admin command + admin_parser = subparsers.add_parser('toggle-admin', help='Toggle admin status for a user') + admin_parser.add_argument('identifier', help='Username, email, or user ID') + + args = parser.parse_args() + + if not args.command: + parser.print_help() + return + + if args.command == 'list': + list_users() + elif args.command == 'reset-password': + reset_password(args.identifier) + elif args.command == 'delete': + delete_user(args.identifier) + elif args.command == 'toggle-admin': + toggle_admin(args.identifier) + else: + parser.print_help() + + +if __name__ == "__main__": + main()