feat: improve template loading flow by ocervell · Pull Request #667 · freelabz/secator · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

feat: improve template loading flow #667

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions scripts/generate_tools_md_table.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
from secator.cli import ALL_TASKS
from secator.loader import discover_tasks

import re
from pathlib import Path


TABLE_START_MARKER = "<!-- START_TOOLS_TABLE -->"
TABLE_END_MARKER = "<!-- END_TOOLS_TABLE -->"
README_FILENAME = "README.md"


def get_tools_data():
data = []
hardcoded_urls = {
Expand All @@ -23,7 +25,7 @@ def get_tools_data():
'msfconsole': 'https://docs.rapid7.com/metasploit/msf-overview/',
'searchsploit': 'https://gitlab.com/exploit-database/exploitdb'
}
for task in ALL_TASKS:
for task in discover_tasks():
url = task.install_github_handle
if url:
url = f'https://github.com/{url}'
Expand All @@ -32,7 +34,7 @@ def get_tools_data():
data.append({
'name': task.__name__,
'url': url,
'description': task.__doc__,
'description': task.__doc__ or '',
'category': '/'.join(task.tags)
})
return data
Expand Down
54 changes: 27 additions & 27 deletions secator/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,28 @@

from secator.config import CONFIG, ROOT_FOLDER, Config, default_config, config_path, download_files
from secator.click import OrderedGroup
from secator.decorators import register_runner
from secator.cli_helper import register_runner
from secator.definitions import ADDONS_ENABLED, ASCII, DEV_PACKAGE, VERSION, STATE_COLORS
from secator.installer import ToolInstaller, fmt_health_table_row, get_health_table, get_version_info, get_distro_config
from secator.output_types import FINDING_TYPES, Info, Warning, Error
from secator.report import Report
from secator.rich import console
from secator.runners import Command, Runner
from secator.serializers.dataclass import loads_dataclass
from secator.template import TEMPLATES, TemplateLoader
from secator.loader import get_configs_by_type, discover_tasks
from secator.utils import (
debug, detect_host, discover_tasks, flatten, print_version, get_file_date,
debug, detect_host, flatten, print_version, get_file_date,
sort_files_by_date, get_file_timestamp, list_reports, get_info_from_report_path, human_to_timedelta
)
from contextlib import nullcontext
click.rich_click.USE_RICH_MARKUP = True

ALL_TASKS = discover_tasks()
ALL_WORKFLOWS = [t for t in TEMPLATES if t.type == 'workflow']
ALL_SCANS = [t for t in TEMPLATES if t.type == 'scan']
ALL_PROFILES = [t for t in TEMPLATES if t.type == 'profile']
FINDING_TYPES_LOWER = [c.__name__.lower() for c in FINDING_TYPES]
CONTEXT_SETTINGS = dict(help_option_names=['-h', '-help', '--help'])
TASKS = get_configs_by_type('task')
WORKFLOWS = get_configs_by_type('workflow')
SCANS = get_configs_by_type('scan')
PROFILES = get_configs_by_type('profile')


#-----#
Expand Down Expand Up @@ -78,8 +78,7 @@ def task(ctx):
ctx.get_help()


for cls in ALL_TASKS:
config = TemplateLoader(input={'name': cls.__name__, 'type': 'task', 'input_types': cls.input_types})
for config in TASKS:
register_runner(task, config)

#----------#
Expand All @@ -95,7 +94,7 @@ def workflow(ctx):
ctx.get_help()


for config in sorted(ALL_WORKFLOWS, key=lambda x: x['name']):
for config in WORKFLOWS:
register_runner(workflow, config)


Expand All @@ -111,7 +110,7 @@ def scan(ctx):
ctx.get_help()


for config in sorted(ALL_SCANS, key=lambda x: x['name']):
for config in SCANS:
register_runner(scan, config)


Expand Down Expand Up @@ -142,10 +141,11 @@ def worker(hostname, concurrency, reload, queue, pool, quiet, loglevel, check, d
# Check broker / backend addon is installed
broker_protocol = CONFIG.celery.broker_url.split('://')[0]
backend_protocol = CONFIG.celery.result_backend.split('://')[0]
if CONFIG.celery.broker_url:
if (broker_protocol == 'redis' or backend_protocol == 'redis') and not ADDONS_ENABLED['redis']:
console.print(Error(message='Missing redis addon: please run "secator install addons redis".'))
sys.exit(1)
if CONFIG.celery.broker_url and \
(broker_protocol == 'redis' or backend_protocol == 'redis') and \
not ADDONS_ENABLED['redis']:
console.print(Error(message='Missing redis addon: please run "secator install addons redis".'))
sys.exit(1)

# Debug Celery config
from secator.celery import app, is_celery_worker_alive
Expand Down Expand Up @@ -587,9 +587,9 @@ def profile_list():
table.add_column("Profile name", style="bold gold3")
table.add_column("Description", overflow='fold')
table.add_column("Options", overflow='fold')
for profile in ALL_PROFILES:
for profile in PROFILES:
opts_str = ','.join(f'{k}={v}' for k, v in profile.opts.items())
table.add_row(profile.name, profile.description, opts_str)
table.add_row(profile.name, profile.description or '', opts_str)
console.print(table)


Expand Down Expand Up @@ -653,20 +653,20 @@ def list_aliases(silent):
"""List aliases"""
aliases = []
aliases.extend([
f'alias {task.__name__}="secator x {task.__name__}"'
for task in ALL_TASKS
f'alias {task.name}="secator x {task.name}"'
for task in TASKS
])
aliases.extend([
f'alias {workflow.alias}="secator w {workflow.name}"'
for workflow in ALL_WORKFLOWS
for workflow in WORKFLOWS
])
aliases.extend([
f'alias {workflow.name}="secator w {workflow.name}"'
for workflow in ALL_WORKFLOWS
for workflow in WORKFLOWS
])
aliases.extend([
f'alias scan_{scan.name}="secator s {scan.name}"'
for scan in ALL_SCANS
for scan in SCANS
])
aliases.append('alias listx="secator x"')
aliases.append('alias listw="secator w"')
Expand Down Expand Up @@ -984,7 +984,7 @@ def report_export(json_path, output_folder, output):
@click.option('--bleeding', '-bleeding', is_flag=True, default=False, help='Check bleeding edge version of tools')
def health(json_, debug, strict, bleeding):
"""Get health status."""
tools = ALL_TASKS
tools = discover_tasks()
upgrade_cmd = ''
results = []
messages = []
Expand Down Expand Up @@ -1320,7 +1320,7 @@ def install_tools(cmds, cleanup, fail_fast):
cmd, version = tuple(cmd.split('=='))
else:
cmd, version = cmd, None
cls = next((cls for cls in ALL_TASKS if cls.__name__ == cmd), None)
cls = next((cls for cls in discover_tasks() if cls.__name__ == cmd), None)
if cls:
if version:
if cls.install_version and cls.install_version.startswith('v') and not version.startswith('v'):
Expand All @@ -1330,7 +1330,7 @@ def install_tools(cmds, cleanup, fail_fast):
else:
console.print(Warning(message=f'Tool {cmd} is not supported or inexistent.'))
else:
tools = ALL_TASKS
tools = discover_tasks()
tools.sort(key=lambda x: x.__name__)
return_code = 0
if not tools:
Expand Down Expand Up @@ -1400,7 +1400,7 @@ def update(all):
# Update tools
if all:
return_code = 0
for cls in ALL_TASKS:
for cls in discover_tasks():
cmd = cls.cmd.split(' ')[0]
version_flag = cls.get_version_flag()
info = get_version_info(cmd, version_flag, cls.install_github_handle)
Expand Down Expand Up @@ -1584,7 +1584,7 @@ def performance(tasks, workflows, scans, test):
def task(name, verbose, check):
"""Test a single task for semantics errors, and run unit + integration tests."""
console.print(f'[bold gold3]:wrench: Testing task {name} ...[/]')
task = [task for task in ALL_TASKS if task.__name__ == name]
task = [task for task in discover_tasks() if task.__name__ == name]
warnings = []
errors = []
exit_code = 0
Expand Down
Loading
Loading
0