8000 Refactored Maven Artifact Download Script by jharper-sec · Pull Request #8 · jharper-sec/c6t · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Refactored Maven Artifact Download Script #8

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "c6t"
version = "0.0.8"
version = "0.0.9"
description = "Unofficial Administrative Command Line Interface for Contrast Security"
authors = [
{ name = "Jonathan Harper", email = "39912347+jharper-sec@users.noreply.github.com" },
Expand Down
2 changes: 1 addition & 1 deletion src/c6t/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
__app_name__ = "c6t"
__version__ = "0.0.8"
__version__ = "0.0.9"
283 changes: 153 additions & 130 deletions src/c6t/api/maven_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,136 +10,159 @@
from rich import print as rprint
from lxml import etree

session = requests.Session()


def verify_file_checksum(
target_filename: Path, checksum_filename: Path, algorithm: str
) -> bool:
"""
Verify the checksum of a file against a checksum file.
"""
rprint(f"[cyan]Verifying checksum for {target_filename.name}...")
with open(target_filename, "rb") as f:
target_checksum = hashlib.new(
algorithm, f.read(), usedforsecurity=True
).hexdigest()
with open(checksum_filename, "r") as f:
checksum = f.read().strip()
if target_checksum != checksum:
raise ValueError(f"Checksum does not match for {target_filename.name}.")
else:
rprint(f"[green]Checksum verified for {target_filename.name}.")
return True


def download_file(url: str, filename: Path) -> None:
rprint(f"[cyan]Starting download from: {url}")
with Progress() as progress:
task = progress.add_task(f"[cyan]Downloading...", total=100)
response = session.get(url, stream=True)
response.raise_for_status()
total_length = int(response.headers.get("content-length", 0))
chunk_size = 4096

with open(filename, "wb") as f:
for data in response.iter_content(chunk_size=chunk_size):
f.write(data)
progress.update(task, advance=len(data) / total_length * 100)
rprint(f"[green]Download completed: {filename.name}")


def download_maven_metadata_from_repo(repository_url: str, temp_dir: Path) -> Path:
url = f"{repository_url}/com/contrastsecurity/contrast-agent/maven-metadata.xml"
filename = temp_dir / "maven-metadata.xml"
download_file(url, filename)
return filename


def download_maven_metadata_checksum_from_repo(
repository_url: str, algorithm: str, temp_dir: Path
) -> Path:
filename = temp_dir / f"maven-metadata.xml.{algorithm}"
url = f"{repository_url}/com/contrastsecurity/contrast-agent/{filename.name}"
download_file(url, filename)
return filename


def parse_maven_metadata(metadata_file: Path) -> Any:
with open(metadata_file, "r") as f:
metadata = f.read()
tree = etree.parse(BytesIO(metadata.encode("utf-8")))
return tree.getroot()


def get_latest_version_from_maven_metadata(root: Any) -> str:
versioning = root.find("versioning")
if versioning is not None:
latest = versioning.find("latest")
if latest is not None:
if latest.text is not None:
return latest.text.strip()
raise ValueError("Could not find latest version in Maven metadata.")


def is_version_in_maven_metadata(root: Any, version: str) -> bool:
versioning = root.find("versioning")
if versioning is not None:
versions = versioning.find("versions")
if versions is not None:
for v in versions:
if v.text is not None and v.text.strip() == version:
return True
raise ValueError(f"Version {version} is not in Maven metadata.")


def download_java_agent_from_repo(
repository_url: str, version: str, target_dir: Path
) -> Path:
with tempfile.TemporaryDirectory() as temp_dir:
temp_dir_path = Path(temp_dir)
maven_metadata_file = download_maven_metadata_from_repo(
repository_url, temp_dir_path
)
maven_metadata_checksum_file = download_maven_metadata_checksum_from_repo(
repository_url, "sha1", temp_dir_path
)
verify_file_checksum(maven_metadata_file, maven_metadata_checksum_file, "sha1")

root = parse_maven_metadata(maven_metadata_file)

if version == "latest":
version = get_latest_version_from_maven_metadata(root)
class ChecksumVerifier:
@staticmethod
def verify_file_checksum(
target_filename: Path, checksum_filename: Path, algorithm: str
) -> bool:
"""
Verify the checksum of a file against a checksum file.
"""
rprint(f"[cyan]Verifying checksum for {target_filename.name}...")
with open(target_filename, "rb") as f:
target_checksum = hashlib.new(
algorithm, f.read(), usedforsecurity=True
).hexdigest()
with open(checksum_filename, "r") as f:
checksum = f.read().strip()
if target_checksum != checksum:
raise ValueError(f"Checksum does not match for {target_filename.name}.")
else:
if not is_version_in_maven_metadata(root, version):
raise ValueError(f"Version {version} is not in Maven metadata.")

jar_filename = temp_dir_path / "contrast.jar"
url = f"{repository_url}/com/contrastsecurity/contrast-agent/{version}/contrast-agent-{version}.jar"
download_file(url, jar_filename)
java_agent_checksum_filename = download_java_agent_checksum_from_repo(
repository_url, version, temp_dir_path
)
verify_file_checksum(jar_filename, java_agent_checksum_filename, "sha1")

target_jar_path = target_dir / "contrast.jar"
shutil.copy(jar_filename, target_jar_path)
rprint(f"[green]File copied to {target_jar_path}")

return target_jar_path


def download_java_agent_checksum_from_repo(
repository_url: str, version: str, temp_dir: Path, algorithm: str = "sha1"
) -> Path:
if version == "latest":
version = get_latest_version_from_maven_metadata(
parse_maven_metadata(temp_dir / "maven-metadata.xml")
rprint(f"[green]Checksum verified for {target_filename.name}.")
return True


class FileDownloader:
def __init__(self):
self.session = requests.Session()

def download_file(self, url: str, filename: Path) -> None:
rprint(f"[cyan]Starting download from: {url}")
with Progress() as progress:
task = progress.add_task(f"[cyan]Downloading...", total=100)
response = self.session.get(url, stream=True)
response.raise_for_status()
total_length = int(response.headers.get("content-length", 0))
chunk_size = 4096

with open(filename, "wb") as f:
for data in response.iter_content(chunk_size=chunk_size):
f.write(data)
progress.update(task, advance=len(data) / total_length * 100)
rprint(f"[green]Download completed: {filename.name}")


class MavenMetadataHandler:
def __init__(self, downloader: FileDownloader, namespace: str):
self.downloader = downloader
self.namespace = namespace

def download_maven_metadata(self, repository_url: str, temp_dir: Path) -> Path:
url = f"{repository_url}/{self.namespace}/maven-metadata.xml"
filename = temp_dir / "maven-metadata.xml"
self.downloader.download_file(url, filename)
return filename

def download_maven_metadata_checksum(
self, repository_url: str, algorithm: str, temp_dir: Path
) -> Path:
filename = temp_dir / f"maven-metadata.xml.{algorithm}"
url = f"{repository_url}/{self.namespace}/{filename.name}"
self.downloader.download_file(url, filename)
return filename

def parse_maven_metadata(self, metadata_file: Path) -> Any:
with open(metadata_file, "r") as f:
metadata = f.read()
tree = etree.parse(BytesIO(metadata.encode("utf-8")))
return tree.getroot()

def get_latest_version(self, root: Any) -> str:
versioning = root.find("versioning")
if versioning is not None:
latest = versioning.find("latest")
if latest is not None:
if latest.text is not None:
return latest.text.strip()
raise ValueError("Could not find latest version in Maven metadata.")

def is_version_in_metadata(self, root: Any, version: str) -> bool:
versioning = root.find("versioning")
if versioning is not None:
versions = versioning.find("versions")
if versions is not None:
for v in versions:
if v.text is not None and v.text.strip() == version:
return True
raise ValueError(f"Version {version} is not in Maven metadata.")


class JavaAgentDownloader:
def __init__(
self,
downloader: FileDownloader,
verifier: ChecksumVerifier,
metadata_handler: MavenMetadataHandler,
):
self.downloader = downloader
self.verifier = verifier
self.metadata_handler = metadata_handler

def download_agent(
self, repository_url: str, version: str, target_dir: Path
) -> Path:
with tempfile.TemporaryDirectory() as temp_dir:
temp_dir_path = Path(temp_dir)
maven_metadata_file = self.metadata_handler.download_maven_metadata(
repository_url, temp_dir_path
)
maven_metadata_checksum_file = (
self.metadata_handler.download_maven_metadata_checksum(
repository_url, "sha1", temp_dir_path
)
)
self.verifier.verify_file_checksum(
maven_metadata_file, maven_metadata_checksum_file, "sha1"
)

root = self.metadata_handler.parse_maven_metadata(maven_metadata_file)

if version == "latest":
version = self.metadata_handler.get_latest_version(root)
else:
if not self.metadata_handler.is_version_in_metadata(root, version):
raise ValueError(f"Version {version} is not in Maven metadata.")

jar_filename = temp_dir_path / "contrast.jar"
url = f"{repository_url}/{self.metadata_handler.namespace}/{version}/contrast-agent-{version}.jar"
self.downloader.download_file(url, jar_filename)
java_agent_checksum_filename = self.download_checksum(
repository_url, version, temp_dir_path
)
self.verifier.verify_file_checksum(
jar_filename, java_agent_checksum_filename, "sha1"
)

target_jar_path = target_dir / "contrast.jar"
shutil.copy(jar_filename, target_jar_path)
rprint(f"[green]File copied to {target_jar_path}")

return target_jar_path

def download_checksum(
self, repository_url: str, version: str, temp_dir: Path, algorithm: str = "sha1"
) -> Path:
if version == "latest":
version = self.metadata_handler.get_latest_version(
self.metadata_handler.parse_maven_metadata(
temp_dir / "maven-metadata.xml"
)
)

java_agent_checksum_filename = (
temp_dir / f"contrast-agent-{version}.jar.{algorithm}"
)

java_agent_checksum_filename = (
temp_dir / f"contrast-agent-{version}.jar.{algorithm}"
)
url = f"{repository_url}/com/contrastsecurity/contrast-agent/{version}/{java_agent_checksum_filename.name}"
download_file(url, java_agent_checksum_filename)
return java_agent_checksum_filename
url = f"{repository_url}/{self.metadata_handler.namespace}/{version}/{java_agent_checksum_filename.name}"
self.downloader.download_file(url, java_agent_checksum_filename)
return java_agent_checksum_filename
17 changes: 14 additions & 3 deletions src/c6t/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from c6t.configure.credentials import ContrastAPICredentials
from c6t.ui.auth import ContrastUIAuthManager
from c6t.api.agent_config import AgentConfig
from c6t.api.maven_repo import download_java_agent_from_repo
from c6t.api import maven_repo
from c6t.external.integrations.scw.contrast_scw import scw_create, scw_delete

app = typer.Typer()
Expand Down Expand Up @@ -174,11 +174,22 @@ def download_agent(
"""
Download the Contrast agent for the specified language.
"""

MAVEN_PACKAGE_NAMESPACE = "com/contrastsecurity/contrast-agent"

if language == "JAVA":
rprint(f"Downloading Contrast agent ({version})...")
download_java_agent_from_repo(
repository_url=repository_url, version=version, target_dir=target_dir
downloader = maven_repo.FileDownloader()
verifier = maven_repo.ChecksumVerifier()
metadata_handler = maven_repo.MavenMetadataHandler(
downloader=downloader, namespace=MAVEN_PACKAGE_NAMESPACE
)
agent_downloader = maven_repo.JavaAgentDownloader(
downloader=downloader, verifier=verifier, metadata_handler=metadata_handler
)

agent_downloader.download_agent(repository_url, version, target_dir)

else:
rprint("Unsupported language")

Expand Down
0