10000 nac-collector for Catalyst Center | Pilot by rrahimm · Pull Request #82 · netascode/nac-collector · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

nac-collector for Catalyst Center | Pilot #82

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 91 additions & 10 deletions nac_collector/cisco_client_catalystcenter.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,30 @@ class CiscoClientCATALYSTCENTER(CiscoClient):
"credentials_https_write": "httpsWrite",
}

"""
The id_lookup dictionary defines alternate endpoints for fetching data, because some APIs fetched by the provider require a different endpoint for fetching data.

lookup_endpoint: The endpoint that returns the list of IDs. These IDs are required to fetch data from the new endpoint.
lookup_id: The key used to read each ID from the response of the lookup_endpoint.
new_endpoint: The new endpoint to fetch data from, where %v is replaced by the ID read via lookup_id.
uid: The key under which the ID is stored in the new endpoint data that is written to the file. Defaults to 'id'.
"""

# Keyed by the endpoint path as configured; each value describes how to
# collect the same data through a per-ID alternate endpoint instead.
#   lookup_endpoint: queried first to obtain the list of IDs.
#   lookup_id:       key read from each item of that lookup response.
#   new_endpoint:    template queried once per ID; "%v" is replaced by the ID.
#   uid:             key under which the ID is stored in each collected record
#                    (consumers fall back to "id" when this key is omitted).
id_lookup = {
# Device credentials are collected per site.
"/dna/intent/api/v2/credential-to-site": {
"lookup_endpoint": "/dna/intent/api/v1/site",
"lookup_id": "id",
"new_endpoint": "/dna/intent/api/v1/sites/%v/deviceCredentials",
"uid": "siteId",
},
# Reserved IP sub-pools are collected per site via a siteId query parameter.
"/dna/intent/api/v1/reserve-ip-subpool": {
"lookup_endpoint": "/dna/intent/api/v1/site",
"lookup_id": "id",
"new_endpoint": "/dna/intent/api/v1/reserve-ip-subpool?siteId=%v",
"uid": "siteId",
},
}

def __init__(
self,
username,
Expand Down Expand Up @@ -105,39 +129,85 @@ def process_endpoint_data(self, endpoint, endpoint_dict, data):
dict: The updated endpoint dictionary with processed data.
"""

# modify endpoint before writing to file, if alternate endpoint is defined in id_lookup
if endpoint.get("endpoint") in self.id_lookup:
new_endpoint = self.id_lookup[endpoint.get("endpoint")]["new_endpoint"]
else:
new_endpoint = endpoint["endpoint"]

if data is None:
endpoint_dict[endpoint["name"]].append(
{"data": {}, "endpoint": endpoint["endpoint"]}
)

# License API returns a list of dictionaries
elif isinstance(data, list):
endpoint_dict[endpoint["name"]].append(
{"data": data, "endpoint": endpoint["endpoint"]}
{"data": data, "endpoint": new_endpoint}
)
elif isinstance(data.get("response"), dict):
for k, v in data.get("response").items():
if self.mappings[endpoint["name"]] == k:
if (
self.mappings.get(endpoint["name"])
and self.mappings[endpoint["name"]] == k
):
for i in v:
endpoint_dict[endpoint["name"]].append(
{
"data": i,
"endpoint": endpoint["endpoint"]
+ "/"
+ self.get_id_value(i),
"endpoint": new_endpoint + "/" + self.get_id_value(i),
}
)
else:
endpoint_dict[endpoint["name"]].append(
{"data": v, "endpoint": new_endpoint}
)
elif data.get("response"):
for i in data.get("response"):
endpoint_dict[endpoint["name"]].append(
{
"data": i,
"endpoint": endpoint["endpoint"] + "/" + self.get_id_value(i),
"endpoint": new_endpoint + "/" + self.get_id_value(i),
}
)

return endpoint_dict # Return the processed endpoint dictionary

def fetch_data_alternate(self, endpoint):
    """
    Retrieve data from an alternate endpoint if defined in id_lookup.

    The lookup endpoint is queried first to obtain a list of IDs; the
    alternate ("new") endpoint is then queried once per ID, with "%v" in
    its template replaced by that ID. Each result is tagged with the ID
    under the configured "uid" key (default "id").

    Parameters:
        endpoint (dict): The endpoint configuration. Its "endpoint" value
            must be a key of self.id_lookup.

    Returns:
        dict: {"response": [...]} with one entry per ID that returned data.
            Dict payloads get the ID added in place; list payloads are
            wrapped as {<uid>: <id>, "data": <list>}.

    Raises:
        KeyError: If the endpoint is not in id_lookup, or a lookup item
            lacks the configured "lookup_id" key.
    """
    # Resolve the lookup configuration once instead of on every access.
    lookup = self.id_lookup[endpoint.get("endpoint")]
    uid_key = lookup.get("uid", "id")
    endpoint_template = lookup["new_endpoint"]

    id_lookup_data = self.fetch_data(lookup["lookup_endpoint"])
    # fetch_data can yield None (callers elsewhere check for it), and the
    # payload may lack "response"; treat both as "no IDs to look up".
    lookup_items = []
    if isinstance(id_lookup_data, dict):
        lookup_items = id_lookup_data.get("response") or []
    id_list = [item[lookup["lookup_id"]] for item in lookup_items]

    data_list = []
    for id_ in id_list:
        # str() keeps the substitution working for non-string IDs.
        data = self.fetch_data(endpoint_template.replace("%v", str(id_)))
        if data is None:
            # Nothing was returned for this ID; skip it rather than crash.
            continue
        # Unwrap the "response" envelope when present and non-empty.
        if isinstance(data, dict) and data.get("response"):
            data = data["response"]
        if isinstance(data, dict):
            data[uid_key] = id_
        elif isinstance(data, list):
            data = {uid_key: id_, "data": data}
        data_list.append(data)
    return {"response": data_list}

def get_from_endpoints(self, endpoints_yaml_file):
"""
Retrieve data from a list of endpoints specified in a YAML file and
Expand Down Expand Up @@ -165,7 +235,15 @@ def get_from_endpoints(self, endpoints_yaml_file):

endpoint_dict = CiscoClient.create_endpoint_dict(endpoint)

data = self.fetch_data(endpoint["endpoint"])
# if alternate endpoint defined in id_lookup, fetch data from alternate endpoint
if endpoint.get("endpoint") in self.id_lookup:
logger.info(
"Alternate endpoint found: %s",
self.id_lookup[endpoint.get("endpoint")]["lookup_endpoint"],
)
data = self.fetch_data_alternate(endpoint)
else:
data = self.fetch_data(endpoint["endpoint"])

# Process the endpoint data and get the updated dictionary
endpoint_dict = self.process_endpoint_data(
Expand Down Expand Up @@ -245,6 +323,9 @@ def get_id_value(i):
try:
id_value = i["name"]
except KeyError:
id_value = None
try:
id_value = i["siteId"]
except KeyError:
id_value = None

return id_value
0