-
Notifications
You must be signed in to change notification settings - Fork 17
WIP: Refactor to add ability to use it as a lib #10
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,7 +9,7 @@ | |
import argparse | ||
import requests | ||
from prettytable import PrettyTable | ||
from dataclasses import dataclass | ||
from dataclasses import dataclass, asdict | ||
from ipaddress import IPv4Address, IPv6Address | ||
from typing import List, Dict | ||
|
||
|
@@ -172,10 +172,13 @@ def fetch_common_facilities(facilities: List[Facility]) -> List[str]: | |
common_fac = common_fac.intersection(set([i.name for i in fac.present_in])) | ||
return common_fac | ||
|
||
|
||
def main(): | ||
|
||
args = getArgs() | ||
def get_peers(args: Dict) -> Dict: | ||
""" Return a dict with peers | ||
Arguments: | ||
args: A dict containing all peers and extra arguments | ||
Returns: | ||
A list of peers and facilities | ||
""" | ||
pdata = dict() | ||
peers = list() | ||
[pdata.update({i: getPeeringDB(i)}) for i in args.asn] | ||
|
@@ -185,60 +188,131 @@ def main(): | |
ix_set = [pdb_to_ixp(ix) for _, ix in ix_dedup.items()] | ||
netfac_set = [pdb_to_fac(ix) for ix in pdb["data"][0]["netfac_set"]] | ||
peers.append(pdb_to_peer(pdb["data"][0], ix_set, netfac_set)) | ||
return(peers) | ||
|
||
if args.ix_only: | ||
common_ix_list = fetch_common_ixps(peers) | ||
if len(common_ix_list) < 1: | ||
print("Didnt find any common IX, exiting...") | ||
exit(1) | ||
header = [] | ||
header.append("IX") | ||
def find_common_points(common_ix_list: List, common_fac_list: List, peers: List) -> Dict: | ||
""" Return a dict with all common IXs and facilities | ||
Arguments: | ||
common_ix_list: A list of all common ixs | ||
common_fac_list: A list of all common facilities | ||
peers: A list of the involved peers | ||
Returns: | ||
A dict with the merged IXs and facilites | ||
""" | ||
common_points = {} | ||
common_ixs = {} | ||
common_facs = {} | ||
for ix in common_ix_list: | ||
for peer in peers: | ||
header.append(peer.name) | ||
|
||
ix_tab = PrettyTable(header) | ||
ix_tab.print_empty = False | ||
|
||
for ix in common_ix_list: | ||
row = [ix] | ||
for peer in peers: | ||
curr_ix = fetch_ix_from_ixps(ix, peer.peering_on) | ||
v4 = "v4: " + "\n".join([str(i) for i in curr_ix.subnet4]) | ||
v6 = "v6: " + "\n".join( | ||
[str(i) for i in curr_ix.subnet6 if str(i) != "0100::"] | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We use the IPv6 prefix 0100:: to denote that this peer does not have a v6 address, so please ensure you capture this logic here. |
||
curr_ix = asdict(fetch_ix_from_ixps(ix, peer.peering_on)) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Please avoid rendering the dataclass to a dict; rather, keep track of common IXPs by appending the IXP dataclass to a list and returning that. Move the logic for rendering the IPv4/IPv6 addresses in human-readable format to human_readable_print, and use subnet4/subnet6 to capture the list of IP addresses in the IXP dataclass. |
||
if curr_ix['name'] not in common_ixs: | ||
common_ixs.update({curr_ix['name']: {}}) | ||
|
||
common_ixs[curr_ix['name']].update( | ||
{ | ||
asdict(peer)['ASN']: | ||
{ | ||
"name": asdict(peer)['name'], | ||
"IPv4": [IPv4Address(i) for i in curr_ix["subnet4"]], | ||
"IPv6": [IPv6Address(i) for i in curr_ix["subnet6"]] | ||
} | ||
} | ||
|
||
) | ||
v6 = v6 if v6 != "v6: " else "" | ||
line = f"{v4}\n{v6}" | ||
row.append(line) | ||
ix_tab.add_row(row) | ||
for fac in common_fac_list: | ||
for peer in peers: | ||
curr_fac = asdict(fetch_fac_from_facilities(fac, peer.present_in)) | ||
if curr_fac['name'] not in common_facs: | ||
common_facs.update({curr_fac['name']: {}}) | ||
common_facs[curr_fac['name']].update({ | ||
asdict(peer)['name']: asdict(peer)['ASN'] | ||
}) | ||
common_points.update({ | ||
"IXs": common_ixs, | ||
"FACs": common_facs | ||
}) | ||
return(common_points) | ||
|
||
def human_readable_print(common_points: Dict, peers: List): | ||
""" Return always 0 | ||
Arguments: | ||
common_points: A dict containing all common facilites and IXs | ||
Returns: | ||
Always zero | ||
""" | ||
|
||
header = [] | ||
header.append("IX") | ||
|
||
for peer in peers: | ||
header.append(peer.name) | ||
|
||
ix_tab = PrettyTable(header) | ||
ix_tab.print_empty = False | ||
|
||
for ix in common_points['IXs']: | ||
row = [ix] | ||
for peer in peers: | ||
|
||
v4 = "v4: " + "\n".join([str(i) for i in common_points['IXs'][ix][asdict(peer)['ASN']]['IPv4']]) | ||
v6 = "v6: " + "\n".join( | ||
[str(i) for i in common_points['IXs'][ix][asdict(peer)['ASN']]['IPv6'] if str(i) != "0100::"] | ||
) | ||
v6 = v6 if v6 != "v6: " else "" | ||
line = f"{v4}\n{v6}" | ||
row.append(line) | ||
ix_tab.add_row(row) | ||
|
||
ix_tab.hrules = 1 | ||
print(ix_tab.get_string(sortby="IX")) | ||
print(ix_tab.get_string(sortby="IX")) | ||
|
||
if args.fac_only: | ||
common_fac_list = fetch_common_facilities(peers) | ||
if len(common_fac_list) < 1: | ||
print("Didnt find any common Facility, exiting...") | ||
exit(1) | ||
header = [] | ||
header.append("Facility") | ||
header = [] | ||
header.append("Facility") | ||
for peer in peers: | ||
header.append(peer.name) | ||
|
||
ix_tab = PrettyTable(header) | ||
ix_tab.print_empty = False | ||
|
||
for fac in common_points['FACs']: | ||
row = [fac] | ||
|
||
for peer in peers: | ||
header.append(peer.name) | ||
curr_fac = common_points['FACs'][fac][asdict(peer)['name']] | ||
line = f"ASN: {curr_fac}" | ||
row.append(line) | ||
ix_tab.add_row(row) | ||
|
||
ix_tab = PrettyTable(header) | ||
ix_tab.print_empty = False | ||
ix_tab.hrules = 1 | ||
|
||
print(ix_tab.get_string(sortby="Facility")) | ||
|
||
for fac in common_fac_list: | ||
row = [fac] | ||
for peer in peers: | ||
curr_fac = fetch_fac_from_facilities(fac, peer.present_in) | ||
line = f"ASN: {curr_fac.ASN}" | ||
row.append(line) | ||
ix_tab.add_row(row) | ||
return 0 | ||
|
||
ix_tab.hrules = 1 | ||
print(ix_tab.get_string(sortby="Facility")) | ||
|
||
def main(): | ||
|
||
args = getArgs() | ||
peers = get_peers(args) | ||
|
||
if args.ix_only and args.fac_only: | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. `and args.fac_only` is not needed here, as we will fall through to the next elif and print the required output. |
||
common_ix_list = fetch_common_ixps(peers) | ||
common_fac_list = fetch_common_facilities(peers) | ||
human_readable_print(find_common_points(common_ix_list,common_fac_list, peers), peers) | ||
elif args.ix_only: | ||
common_ix_list = fetch_common_ixps(peers) | ||
if len(common_ix_list) < 1: | ||
print("Didnt find any common IX, exiting...") | ||
exit(1) | ||
human_readable_print(find_common_points(common_ix_list,[], peers), peers) | ||
|
||
elif args.fac_only: | ||
common_fac_list = fetch_common_facilities(peers) | ||
if len(common_fac_list) < 1: | ||
print("Didnt find any common Facility, exiting...") | ||
exit(1) | ||
human_readable_print(find_common_points([],common_fac_list, peers), peers) | ||
|
||
exit(0) | ||
|
||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider a return type of List[IXP] for this method
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will refactor this and split it into two functions which can be called independently.