PEP 541 Request: `pydantic2` and `pydantic3` · Issue #6382 · pypi/support · GitHub
PEP 541 Request: pydantic2 and pydantic3 #6382

Open
1 task done
samuelcolvin opened this issue May 19, 2025 · 3 comments
Labels
PEP 541 Package name support requests

Comments

@samuelcolvin

Project to be claimed

pydantic2: https://pypi.org/project/pydantic2

Your PyPI username

markolofsen: https://pypi.org/user/markolofsen/

Reasons for the request

This package is AI-generated garbage; it's not an honest attempt to develop an open source library.

None of the code could run as expected, both because of the repo structure and because no dependencies are included.

The code looks very dangerous and potentially malicious, e.g. it tries to auto-update itself, run databases and delete them, see below.

The name is extremely confusing at best since the library has nothing to do with the widely used pydantic package, which is currently on V2.

At worst this is deliberate typosquatting.

The user has done the same with numerous other packages:

auto-update code
import requests
import semver
import time
import json
from datetime import datetime, timedelta
from pathlib import Path

from ...__pack__ import __version__, __name__
from ..logger import logger


class VersionControl:
    """Class to manage version control and caching for the library."""

    CACHE_DURATION = timedelta(days=1)

    def __init__(self):
        self.current_version = __version__
        self.package_name = __name__
        # Store cache in the same directory as the module
        module_dir = Path(__file__).parent
        self.cache_file = module_dir / "cache.json"
        self._load_initial_cache()

    def _load_initial_cache(self):
        """Load initial cache values."""
        self.cached_version, self.cache_time = self._load_cache()

    def _fetch_latest_version(self) -> str:
        """Fetch the latest version from PyPI."""
        url = f"https://pypi.org/pypi/{self.package_name}/json"
        response = requests.get(url)
        if response.status_code == 200:
            data = response.json()
            return data['info']['version']
        return "0.0.0"

    def _load_cache(self) -> tuple[str, datetime]:
        """Load the cached version and timestamp from JSON."""
        if self.cache_file.exists():
            try:
                with open(self.cache_file, 'r') as f:
                    cache_data = json.load(f)
                    # Verify cache data is valid
                    if (not isinstance(cache_data, dict) or
                        'version' not in cache_data or
                            'timestamp' not in cache_data):
                        return "0.0.0", datetime.min
                    return (
                        cache_data['version'],
                        datetime.fromtimestamp(cache_data['timestamp'])
                    )
            except (json.JSONDecodeError, KeyError, ValueError):
                # If cache file is corrupted or invalid, return default values
                return "0.0.0", datetime.min
        return "0.0.0", datetime.min

    def _save_cache(self, version: str):
        """Save the version and current timestamp to JSON cache."""
        cache_data = {
            'version': version,
            'timestamp': time.time(),
            'package': self.package_name,
            'last_checked': datetime.now().isoformat()
        }
        with open(self.cache_file, 'w') as f:
            json.dump(cache_data, f, indent=2)
        # Update instance variables after saving
        self.cached_version = version
        self.cache_time = datetime.fromtimestamp(cache_data['timestamp'])

    def check_for_update(self):
        """Check if there is a newer version available."""
        current_time = datetime.now()
        cache_age = current_time - self.cache_time

        if cache_age > self.CACHE_DURATION:
            latest_version = self._fetch_latest_version()
            self._save_cache(latest_version)
            logger.debug(f"[DEBUG] Fetched latest version: {latest_version}")
        else:
            latest_version = self.cached_version
            logger.debug(f"[DEBUG] Using cached version: {latest_version}")

        logger.debug(f"[DEBUG] Current version: {self.current_version}")

        if semver.compare(latest_version, self.current_version) > 0:
            logger.warning(
                f"🚀 A new version {latest_version} is available! "
                f"You are using {self.current_version}. Consider updating."
            )
        else:
            logger.debug(
                f"✅ You are using the latest version: {self.current_version}."
            )
code for running databases, deleting databases and killing processes
import subprocess
import os
from pathlib import Path
import click
import socket
import signal
import sys
import time
import colorlog
import logging
import psutil
import questionary
import webbrowser
from typing import Optional, List, Tuple, Dict

# Configure colored logging
handler = colorlog.StreamHandler()
handler.setFormatter(colorlog.ColoredFormatter(
    '%(log_color)s%(asctime)s %(reset)s %(message)s',
    datefmt='%H:%M:%S',
    reset=True,
    log_colors={
        'DEBUG':    'cyan',
        'INFO':     'green',
        'WARNING':  'yellow',
        'ERROR':    'red',
        'CRITICAL': 'red,bg_white',
    }
))

logger = logging.getLogger('datasette_viewer')
logger.addHandler(handler)
logger.setLevel(logging.INFO)

DB_DIR = Path(__file__).parent.parent / "db"
MODELS_DB = DB_DIR / "models.db"
USAGE_DB = DB_DIR / "usage.db"
SESSIONS_DB = DB_DIR / "sessions.db"

# Global list to track running processes
running_processes: List[subprocess.Popen] = []

# Database configurations
DB_CONFIGS: Dict[str, Dict] = {
    'Models Database': {'path': MODELS_DB, 'port': 8881},
    'Usage Database': {'path': USAGE_DB, 'port': 8882},
    'Sessions Database': {'path': SESSIONS_DB, 'port': 8883},
    'Delete Database': {'action': 'delete_db'},
    'Kill Process on Port': {'action': 'kill_port'},
    'Exit': {'action': 'exit'}
}


def delete_database(db_path: Path) -> bool:
    """
    Safely delete a database file.

    Args:
        db_path: Path to the database file

    Returns:
        bool: True if database was deleted, False otherwise
    """
    try:
        if not db_path.exists():
            logger.warning(f"Database {db_path} does not exist")
            return False

        # Delete the database file
        os.remove(db_path)
        logger.info(f"Successfully deleted database: {db_path}")
        return True

    except Exception as e:
        logger.error(f"Error deleting database {db_path}: {e}")
        return False


def kill_port(port: int) -> bool:
    """
    Kill process running on specified port.

    Args:
        port: Port number to kill

    Returns:
        bool: True if process was killed, False otherwise
    """
    try:
        # Use lsof to find process using the port
        cmd = f"lsof -ti tcp:{port}"
        process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        output, error = process.communicate()

        if output:
            pid = int(output.decode().strip())
            try:
                proc = psutil.Process(pid)
                proc.terminate()
                proc.wait(timeout=3)
                logger.info(f"Successfully killed process {pid} on port {port}")
                return True
            except psutil.NoSuchProcess:
                logger.warning(f"Process {pid} on port {port} already terminated")
                return True
            except psutil.TimeoutExpired:
                proc.kill()
                logger.info(f"Force killed process {pid} on port {port}")
                return True
        else:
            logger.warning(f"No process found on port {port}")
            return False

    except Exception as e:
        logger.error(f"Error killing port {port}: {e}")
        return False


def cleanup_processes():
    """Terminate all running processes."""
    for process in running_processes:
        try:
            process.terminate()
            process.wait(timeout=5)
        except subprocess.TimeoutExpired:
            process.kill()
        except Exception as e:
            logger.error(f"Error while terminating process: {e}")
    running_processes.clear()


def signal_handler(signum, frame):
    """Handle termination signals."""
    print("\n🛑 Shutting down database viewers...")
    cleanup_processes()
    sys.exit(0)


# Register signal handlers
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)


def format_url(port: int) -> str:
    """Format URL with ANSI colors for terminal."""
    return f"\033[1;94mhttp://localhost:{port}\033[0m"


def start_datasette(
    db_path: str,
    start_port: int,
    max_attempts: int = 10
) -> Tuple[Optional[subprocess.Popen], Optional[int]]:
    """Try to start datasette on an available port with retries."""
    for port in range(start_port, start_port + max_attempts):
        # Try to kill any existing process on this port
        kill_port(port)
        time.sleep(1)  # Give the system time to free the port

        try:
            process = subprocess.Popen(
                [
                    "datasette", "serve", db_path,
                    "--port", str(port),
                    "--cors",
                    "--setting", "truncate_cells_html", "0"
                ],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE
            )
            # Give the process a moment to start and bind to the port
            time.sleep(1)

            # Check if process is still running
            if process.poll() is None:
                # Try to connect to verify the server is up
                with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                    try:
                        s.connect(('localhost', port))
                        return process, port
                    except socket.error:
                        process.terminate()
                        continue
            else:
                # Process failed to start, clean up and try next port
                _, stderr = process.communicate()
                if b"address already in use" not in stderr:
                    print(f"Failed to start datasette: {stderr.decode()}")
                process.terminate()
        except Exception as e:
            print(f"Error starting datasette on port {port}: {e}")

    return None, None


def launch_viewer(db_path: str, db_name: str, start_port: int) -> bool:
    """Launch a database viewer and return success status."""
    print(f"🚀 Starting {db_name} database viewer...")
    process, port = start_datasette(db_path, start_port)
    if not process or not port:
        print(f"Failed to start {db_name} database viewer")
        return False

    running_processes.append(process)
    url = f"http://localhost:{port}"
    print(f"✓ {db_name} DB viewer: {format_url(port)}")

    # Open browser after a short delay to ensure server is ready
    time.sleep(1)
    try:
        webbrowser.open(url)
    except Exception as e:
        print(f"Note: Could not open browser automatically: {e}")
        print(f"Please open {url} manually in your browser")

    return True


def show_interactive_menu():
    """Show interactive menu for database selection."""
    while True:
        choice = questionary.select(
            "Select an action:",
            choices=list(DB_CONFIGS.keys())
        ).ask()

        if not choice:
            break

        config = DB_CONFIGS[choice]

        if 'action' in config:
            if config['action'] == 'kill_port':
                port = questionary.text("Enter port number to kill:").ask()
                if port:
                    try:
                        port = int(port)
                        if kill_port(port):
                            print(f"✓ Successfully killed process on port {port}")
                        else:
                            print(f"✗ No process found on port {port}")
                    except ValueError:
                        print("✗ Invalid port number")
            elif config['action'] == 'delete_db':
                db_choice = questionary.select(
                    "Select database to delete:",
                    choices=[
                        'Models Database',
                        'Usage Database',
                        'Sessions Database',
                        'Cancel'
                    ]
                ).ask()

                if db_choice == 'Cancel':
                    continue

                db_path = DB_CONFIGS[db_choice]['path']
                confirm = questionary.confirm(
                    f"Are you sure you want to delete {db_choice}? This action cannot be undone.",
                    default=False
                ).ask()

                if confirm:
                    if delete_database(db_path):
                        print(f"✓ Successfully deleted {db_choice}")
                    else:
                        print(f"✗ Failed to delete {db_choice}")
            elif config['action'] == 'exit':
                break
        else:
            if launch_viewer(config['path'], choice, config['port']):
                print("\n🔍 Press Ctrl+C to stop the server")
                running_processes[0].wait()
                break


@click.command()
@click.option('--db', is_flag=True, help='Launch interactive database viewer')
def cli(db):
    """Pydantic2 CLI tool for database viewing"""
    try:
        if db:
            show_interactive_menu()
        else:
            print("Use --db to launch the interactive database viewer")

    except Exception as e:
        print(f"Error: {str(e)}")
        cleanup_processes()
        raise click.Abort()


if __name__ == '__main__':
    try:
        cli()
    finally:
        cleanup_processes()

All of this code comes from the whl downloaded from https://pypi.org/project/pydantic2/1.1.15/#files

Maintenance or replacement?

Replacement

Source code repositories URLs

Repo links to https://github.com/markolofsen/pydantic2 (goes to 404)

The documentation is here: https://pydantic.reforms.ai/getting-started/installation/

The documentation breaches the Pydantic trademark by using the Pydantic logo, and the PSF trademark by using the Python logo:

Image Image

Contact and additional research

We repeatedly asked the creator on GitHub to take down the package; he deleted our issues, then deleted the repo or made it private. Screenshot of an issue before it was deleted:

Image

Code of Conduct

  • I agree to follow the PSF Code of Conduct
samuelcolvin added the PEP 541 Package name support requests label May 19, 2025
@samuelcolvin
Author

related to pypi/warehouse#17774 and pydantic/logfire#929.

@samuelcolvin
Author

@markolofsen tagging you so you see this. To get this sorted quickly, please could you delete pydantic2 and pydantic3, or transfer them to me: https://pypi.org/user/samuelcolvin/

@samuelcolvin
Author

The lack of action here, or even of a reply, is extremely frustrating and frankly worrying.

Leaving this package up is indefensible. It's a serious security risk.

Please can someone act here.
