From 63b1fca4cc896450e25fadf666d000733f86b0d5 Mon Sep 17 00:00:00 2001 From: Piyush Sardana Date: Mon, 11 Nov 2024 16:07:28 +0100 Subject: [PATCH 01/29] added initial config for prometheus integration in opal server --- packages/opal-server/opal_server/data/api.py | 3 +++ packages/opal-server/opal_server/metrics/__init__.py | 0 .../opal_server/metrics/prometheus_metrics.py | 9 +++++++++ packages/opal-server/opal_server/server.py | 8 ++++++++ requirements.txt | 2 ++ 5 files changed, 22 insertions(+) create mode 100644 packages/opal-server/opal_server/metrics/__init__.py create mode 100644 packages/opal-server/opal_server/metrics/prometheus_metrics.py diff --git a/packages/opal-server/opal_server/data/api.py b/packages/opal-server/opal_server/data/api.py index da5d043a9..4afe6ae54 100644 --- a/packages/opal-server/opal_server/data/api.py +++ b/packages/opal-server/opal_server/data/api.py @@ -20,6 +20,7 @@ from opal_common.urls import set_url_query_param from opal_server.config import opal_server_config from opal_server.data.data_update_publisher import DataUpdatePublisher +from metrics import data_update_total, data_update_errors def init_data_updates_router( @@ -125,8 +126,10 @@ async def publish_data_update_event( authenticator, claims, update ) # may throw Unauthorized except Unauthorized as e: + data_update_errors.inc() logger.error(f"Unauthorized to publish update: {repr(e)}") raise + data_update_total.inc() await data_update_publisher.publish_data_updates(update) return {"status": "ok"} diff --git a/packages/opal-server/opal_server/metrics/__init__.py b/packages/opal-server/opal_server/metrics/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/packages/opal-server/opal_server/metrics/prometheus_metrics.py b/packages/opal-server/opal_server/metrics/prometheus_metrics.py new file mode 100644 index 000000000..81c7ceb96 --- /dev/null +++ b/packages/opal-server/opal_server/metrics/prometheus_metrics.py @@ -0,0 +1,9 @@ +from prometheus_client 
import Counter, Gauge, Histogram + +data_update_total = Counter( + 'opal_data_update_total', 'Total number of data update events published' +) + +data_update_errors = Counter( + 'opal_data_update_errors', 'Total number of errors in data update publishing' +) \ No newline at end of file diff --git a/packages/opal-server/opal_server/server.py b/packages/opal-server/opal_server/server.py index 34d9905c3..2440c552c 100644 --- a/packages/opal-server/opal_server/server.py +++ b/packages/opal-server/opal_server/server.py @@ -8,6 +8,9 @@ from fastapi import Depends, FastAPI from fastapi_websocket_pubsub.event_broadcaster import EventBroadcasterContextManager +from fastapi.responses import Response +from prometheus_client import CONTENT_TYPE_LATEST, generate_latest + from opal_common.authentication.deps import JWTAuthenticator, StaticBearerAuthenticator from opal_common.authentication.signer import JWTSigner from opal_common.confi.confi import load_conf_if_none @@ -278,6 +281,11 @@ def _configure_api_routes(self, app: FastAPI): @app.get("/", include_in_schema=False) def healthcheck(): return {"status": "ok"} + + @app.get("/metrics", include_in_schema=False) + def metrics(): + """Endpoint to expose Prometheus metrics.""" + return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST) return app diff --git a/requirements.txt b/requirements.txt index 656fe7c60..1d0322f24 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,3 +9,5 @@ wheel>=0.38.0 twine setuptools>=70.0.0 # not directly required, pinned by Snyk to avoid a vulnerability zipp>=3.19.1 # not directly required, pinned by Snyk to avoid a vulnerability +prometheus_client + From d1ac117506cd15f0d910866c880a09b2afb5c501 Mon Sep 17 00:00:00 2001 From: Piyush Sardana Date: Mon, 11 Nov 2024 16:20:49 +0100 Subject: [PATCH 02/29] feat(data_update_publisher.py): add data_update_latency metric to track latency of data update events feat(prometheus_metrics.py): create data_update_latency histogram to monitor latency 
of data update events --- .../opal_server/data/data_update_publisher.py | 82 ++++++++++--------- .../opal_server/metrics/prometheus_metrics.py | 4 + 2 files changed, 47 insertions(+), 39 deletions(-) diff --git a/packages/opal-server/opal_server/data/data_update_publisher.py b/packages/opal-server/opal_server/data/data_update_publisher.py index 64bc32bbe..e2a55d344 100644 --- a/packages/opal-server/opal_server/data/data_update_publisher.py +++ b/packages/opal-server/opal_server/data/data_update_publisher.py @@ -10,6 +10,8 @@ ServerDataSourceConfig, ) from opal_common.topics.publisher import TopicPublisher +from metrics import data_update_latency + TOPIC_DELIMITER = "/" PREFIX_DELIMITER = ":" @@ -70,44 +72,46 @@ async def publish_data_updates(self, update: DataUpdate): topics (List[str]): topics (with hierarchy) to notify subscribers of update (DataUpdate): update data-source configuration for subscribers to fetch data from """ - all_topic_combos = set() - - # a nicer format of entries to the log - logged_entries = [ - dict( - url=entry.url, - method=entry.save_method, - path=entry.dst_path or "/", - inline_data=(entry.data is not None), - topics=entry.topics, - ) - for entry in update.entries - ] - - # Expand the topics for each event to include sub topic combos (e.g. 
publish 'a/b/c' as 'a' , 'a/b', and 'a/b/c') - for entry in update.entries: - topic_combos = [] - if entry.topics: - for topic in entry.topics: - topic_combos.extend(DataUpdatePublisher.get_topic_combos(topic)) - entry.topics = topic_combos # Update entry with the exhaustive list, so client won't have to expand it again - all_topic_combos.update(topic_combos) - else: - logger.warning( - "[{pid}] No topics were provided for the following entry: {entry}", - pid=os.getpid(), - entry=entry, + + with data_update_latency.time(): + all_topic_combos = set() + + # a nicer format of entries to the log + logged_entries = [ + dict( + url=entry.url, + method=entry.save_method, + path=entry.dst_path or "/", + inline_data=(entry.data is not None), + topics=entry.topics, ) + for entry in update.entries + ] + + # Expand the topics for each event to include sub topic combos (e.g. publish 'a/b/c' as 'a' , 'a/b', and 'a/b/c') + for entry in update.entries: + topic_combos = [] + if entry.topics: + for topic in entry.topics: + topic_combos.extend(DataUpdatePublisher.get_topic_combos(topic)) + entry.topics = topic_combos # Update entry with the exhaustive list, so client won't have to expand it again + all_topic_combos.update(topic_combos) + else: + logger.warning( + "[{pid}] No topics were provided for the following entry: {entry}", + pid=os.getpid(), + entry=entry, + ) + + # publish all topics with all their sub combinations + logger.info( + "[{pid}] Publishing data update to topics: {topics}, reason: {reason}, entries: {entries}", + pid=os.getpid(), + topics=all_topic_combos, + reason=update.reason, + entries=logged_entries, + ) - # publish all topics with all their sub combinations - logger.info( - "[{pid}] Publishing data update to topics: {topics}, reason: {reason}, entries: {entries}", - pid=os.getpid(), - topics=all_topic_combos, - reason=update.reason, - entries=logged_entries, - ) - - await self._publisher.publish( - list(all_topic_combos), update.dict(by_alias=True) - ) + 
await self._publisher.publish( + list(all_topic_combos), update.dict(by_alias=True) + ) diff --git a/packages/opal-server/opal_server/metrics/prometheus_metrics.py b/packages/opal-server/opal_server/metrics/prometheus_metrics.py index 81c7ceb96..251caef04 100644 --- a/packages/opal-server/opal_server/metrics/prometheus_metrics.py +++ b/packages/opal-server/opal_server/metrics/prometheus_metrics.py @@ -6,4 +6,8 @@ data_update_errors = Counter( 'opal_data_update_errors', 'Total number of errors in data update publishing' +) + +data_update_latency = Histogram( + 'opal_data_update_latency_seconds', 'Latency of data update events in seconds' ) \ No newline at end of file From faf0cd8e9dacc649a5425ccd9f3f0f4063129b10 Mon Sep 17 00:00:00 2001 From: Piyush Sardana Date: Tue, 12 Nov 2024 06:14:40 +0100 Subject: [PATCH 03/29] refactor(api.py, data_update_publisher.py): update import paths for metrics to use opal_server.metrics.prometheus_metrics for better organization chore(requirements.txt): add prometheus_client to dependencies for metrics tracking functionality --- packages/opal-server/opal_server/data/api.py | 2 +- packages/opal-server/opal_server/data/data_update_publisher.py | 2 +- packages/requires.txt | 1 + 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/packages/opal-server/opal_server/data/api.py b/packages/opal-server/opal_server/data/api.py index 4afe6ae54..f1a0fad0a 100644 --- a/packages/opal-server/opal_server/data/api.py +++ b/packages/opal-server/opal_server/data/api.py @@ -20,7 +20,7 @@ from opal_common.urls import set_url_query_param from opal_server.config import opal_server_config from opal_server.data.data_update_publisher import DataUpdatePublisher -from metrics import data_update_total, data_update_errors +from opal_server.metrics.prometheus_metrics import data_update_total, data_update_errors def init_data_updates_router( diff --git a/packages/opal-server/opal_server/data/data_update_publisher.py 
b/packages/opal-server/opal_server/data/data_update_publisher.py index e2a55d344..0ff0eaa43 100644 --- a/packages/opal-server/opal_server/data/data_update_publisher.py +++ b/packages/opal-server/opal_server/data/data_update_publisher.py @@ -10,7 +10,7 @@ ServerDataSourceConfig, ) from opal_common.topics.publisher import TopicPublisher -from metrics import data_update_latency +from opal_server.metrics.prometheus_metrics import data_update_latency TOPIC_DELIMITER = "/" diff --git a/packages/requires.txt b/packages/requires.txt index 0a6244553..96939e4a7 100644 --- a/packages/requires.txt +++ b/packages/requires.txt @@ -12,3 +12,4 @@ uvicorn[standard]>=0.17.6,<1 fastapi-utils>=0.2.1,<1 setuptools>=70.0.0 # not directly required, pinned by Snyk to avoid a vulnerability anyio>=4.4.0 # not directly required, pinned by Snyk to avoid a vulnerability +prometheus_client \ No newline at end of file From bc06d6b394c302c2e6e32c094ad77decfdb58925 Mon Sep 17 00:00:00 2001 From: Piyush Sardana Date: Tue, 12 Nov 2024 09:03:19 +0100 Subject: [PATCH 04/29] feat(data_update_publisher.py): add data_update_count_per_topic metric to track updates per topic feat(prometheus_metrics.py): introduce data_update_count_per_topic counter for monitoring data updates by topic --- .../opal-server/opal_server/data/data_update_publisher.py | 8 +++++++- .../opal-server/opal_server/metrics/prometheus_metrics.py | 6 ++++-- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/packages/opal-server/opal_server/data/data_update_publisher.py b/packages/opal-server/opal_server/data/data_update_publisher.py index 0ff0eaa43..dc419e8fa 100644 --- a/packages/opal-server/opal_server/data/data_update_publisher.py +++ b/packages/opal-server/opal_server/data/data_update_publisher.py @@ -10,7 +10,10 @@ ServerDataSourceConfig, ) from opal_common.topics.publisher import TopicPublisher -from opal_server.metrics.prometheus_metrics import data_update_latency +from opal_server.metrics.prometheus_metrics import ( + 
data_update_latency, + data_update_count_per_topic +) TOPIC_DELIMITER = "/" @@ -96,6 +99,9 @@ async def publish_data_updates(self, update: DataUpdate): topic_combos.extend(DataUpdatePublisher.get_topic_combos(topic)) entry.topics = topic_combos # Update entry with the exhaustive list, so client won't have to expand it again all_topic_combos.update(topic_combos) + + for topic in topic_combos: + data_update_count_per_topic.labels(topic).inc() else: logger.warning( "[{pid}] No topics were provided for the following entry: {entry}", diff --git a/packages/opal-server/opal_server/metrics/prometheus_metrics.py b/packages/opal-server/opal_server/metrics/prometheus_metrics.py index 251caef04..5107218ab 100644 --- a/packages/opal-server/opal_server/metrics/prometheus_metrics.py +++ b/packages/opal-server/opal_server/metrics/prometheus_metrics.py @@ -8,6 +8,8 @@ 'opal_data_update_errors', 'Total number of errors in data update publishing' ) -data_update_latency = Histogram( - 'opal_data_update_latency_seconds', 'Latency of data update events in seconds' +data_update_count_per_topic = Counter( + 'opal_data_update_count_per_topic', + 'Count of data updates published per topic', + labelnames=['topic'] ) \ No newline at end of file From 6dfb25d543472465a788567fb4f4fc110bb23a88 Mon Sep 17 00:00:00 2001 From: Piyush Sardana Date: Tue, 12 Nov 2024 10:48:27 +0100 Subject: [PATCH 05/29] feat(metrics): add new metrics for policy updates and bundle requests to enhance observability fix(api.py): increment policy bundle request count and measure latency for bundle generation fix(callbacks.py): observe size of changed directories in policy update notifications fix(task.py): track policy update count and latency when triggering policy watcher --- .../opal_server/metrics/prometheus_metrics.py | 30 ++++++++ .../opal_server/policy/bundles/api.py | 70 +++++++++++++------ .../opal_server/policy/watcher/callbacks.py | 3 + .../opal_server/policy/watcher/task.py | 9 ++- 4 files changed, 88 
insertions(+), 24 deletions(-) diff --git a/packages/opal-server/opal_server/metrics/prometheus_metrics.py b/packages/opal-server/opal_server/metrics/prometheus_metrics.py index 5107218ab..41b4a272d 100644 --- a/packages/opal-server/opal_server/metrics/prometheus_metrics.py +++ b/packages/opal-server/opal_server/metrics/prometheus_metrics.py @@ -1,5 +1,6 @@ from prometheus_client import Counter, Gauge, Histogram +# Data update metrics data_update_total = Counter( 'opal_data_update_total', 'Total number of data update events published' ) @@ -12,4 +13,33 @@ 'opal_data_update_count_per_topic', 'Count of data updates published per topic', labelnames=['topic'] +) + +# Policy update metrics +policy_update_count = Counter( + 'opal_policy_update_count', + 'Total number of policy updates triggered', + labelnames=['source'] +) + +policy_update_latency = Histogram( + 'opal_policy_update_latency_seconds', + 'Latency of policy bundle generation in seconds', + labelnames=['source'] +) + +policy_bundle_request_count = Counter( + 'opal_policy_bundle_request_count', + 'Total number of policy bundle requests' +) + +policy_bundle_latency = Histogram( + 'opal_policy_bundle_latency_seconds', + 'Latency of serving policy bundles in seconds' +) + +policy_update_size = Histogram( + 'opal_policy_update_size', + 'Size of policy updates (in number of files)', + buckets=[1, 10, 50, 100, 500, 1000] ) \ No newline at end of file diff --git a/packages/opal-server/opal_server/policy/bundles/api.py b/packages/opal-server/opal_server/policy/bundles/api.py index ae1da68ef..831f9647c 100644 --- a/packages/opal-server/opal_server/policy/bundles/api.py +++ b/packages/opal-server/opal_server/policy/bundles/api.py @@ -13,6 +13,11 @@ from opal_common.schemas.policy import PolicyBundle from opal_server.config import opal_server_config from starlette.responses import RedirectResponse +from opal_server.metrics import ( + policy_bundle_request_count, + policy_bundle_latency, + policy_update_size, +) router = 
APIRouter() @@ -99,29 +104,48 @@ async def get_policy( None, description="hash of previous bundle already downloaded, server will return a diff bundle.", ), -): - maker = BundleMaker( - repo, - in_directories=set(input_paths), - extensions=opal_server_config.FILTER_FILE_EXTENSIONS, - root_manifest_path=opal_server_config.POLICY_REPO_MANIFEST_PATH, - bundle_ignore=opal_server_config.BUNDLE_IGNORE, - ) - # check if commit exist in the repo - revision = None - if base_hash: +): + policy_bundle_request_count.inc() + + with policy_bundle_latency.time(): + maker = BundleMaker( + repo, + in_directories=set(input_paths), + extensions=opal_server_config.FILTER_FILE_EXTENSIONS, + root_manifest_path=opal_server_config.POLICY_REPO_MANIFEST_PATH, + bundle_ignore=opal_server_config.BUNDLE_IGNORE, + ) + # check if commit exist in the repo + revision = None + if base_hash: + try: + revision = repo.rev_parse(base_hash) + except ValueError: + logger.warning(f"base_hash {base_hash} not exist in the repo") + + if revision is None: + bundle = maker.make_bundle(repo.head.commit) + bundle_size = ( + (len(bundle.data_modules) if bundle.data_modules is not None else 0) + + (len(bundle.policy_modules) if bundle.policy_modules is not None else 0) + ) + if bundle.deleted_files: + bundle_size += len(bundle.deleted_files.files) + policy_update_size.observe(bundle_size) + return bundle try: - revision = repo.rev_parse(base_hash) + old_commit = repo.commit(base_hash) + diff_bundle = maker.make_diff_bundle(old_commit, repo.head.commit) + diff_bundle_size = ( + (len(diff_bundle.data_modules) if diff_bundle.data_modules is not None else 0) + + (len(diff_bundle.policy_modules) if diff_bundle.policy_modules is not None else 0) + ) + if diff_bundle.deleted_files: + diff_bundle_size += len(diff_bundle.deleted_files.files) + policy_update_size.observe(diff_bundle_size) + return diff_bundle except ValueError: - logger.warning(f"base_hash {base_hash} not exist in the repo") - - if revision is None: - 
return maker.make_bundle(repo.head.commit) - try: - old_commit = repo.commit(base_hash) - return maker.make_diff_bundle(old_commit, repo.head.commit) - except ValueError: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=f"commit with hash {base_hash} was not found in the policy repo!", + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"commit with hash {base_hash} was not found in the policy repo!", ) diff --git a/packages/opal-server/opal_server/policy/watcher/callbacks.py b/packages/opal-server/opal_server/policy/watcher/callbacks.py index 1b5f65590..11d716912 100644 --- a/packages/opal-server/opal_server/policy/watcher/callbacks.py +++ b/packages/opal-server/opal_server/policy/watcher/callbacks.py @@ -18,6 +18,7 @@ ) from opal_common.topics.publisher import TopicPublisher from opal_common.topics.utils import policy_topics +from opal_server.metrics import policy_update_size async def create_update_all_directories_in_repo( @@ -116,6 +117,8 @@ async def publish_changed_directories( ) if notification: + policy_update_size.observe(len(notification.update.changed_directories)) + async with publisher: await publisher.publish( topics=notification.topics, data=notification.update.dict() diff --git a/packages/opal-server/opal_server/policy/watcher/task.py b/packages/opal-server/opal_server/policy/watcher/task.py index a2ba57558..0b8b2b73e 100644 --- a/packages/opal-server/opal_server/policy/watcher/task.py +++ b/packages/opal-server/opal_server/policy/watcher/task.py @@ -8,6 +8,10 @@ from opal_common.logger import logger from opal_common.sources.base_policy_source import BasePolicySource from opal_server.config import opal_server_config +from opal_server.metrics import ( + policy_update_count, + policy_update_latency +) class BasePolicyWatcherTask: @@ -123,4 +127,7 @@ async def stop(self): async def trigger(self, topic: Topic, data: Any): """triggers the policy watcher from outside to check for changes (git pull)""" - await 
self._watcher.check_for_changes() + policy_update_count.labels(source="webhook").inc() + + with policy_update_latency.labels(source="webhook").time(): + await self._watcher.check_for_changes() From af89d39b1739410445009301407d65cd5a408287 Mon Sep 17 00:00:00 2001 From: Piyush Sardana Date: Thu, 14 Nov 2024 10:56:48 +0100 Subject: [PATCH 06/29] moved prometheus metrics to opal common --- .../opal_common/monitoring}/prometheus_metrics.py | 5 +++++ packages/opal-server/opal_server/data/api.py | 2 +- .../opal-server/opal_server/data/data_update_publisher.py | 2 +- packages/opal-server/opal_server/metrics/__init__.py | 0 packages/opal-server/opal_server/policy/bundles/api.py | 6 +++--- .../opal-server/opal_server/policy/watcher/callbacks.py | 2 +- packages/opal-server/opal_server/policy/watcher/task.py | 2 +- 7 files changed, 12 insertions(+), 7 deletions(-) rename packages/{opal-server/opal_server/metrics => opal-common/opal_common/monitoring}/prometheus_metrics.py (90%) delete mode 100644 packages/opal-server/opal_server/metrics/__init__.py diff --git a/packages/opal-server/opal_server/metrics/prometheus_metrics.py b/packages/opal-common/opal_common/monitoring/prometheus_metrics.py similarity index 90% rename from packages/opal-server/opal_server/metrics/prometheus_metrics.py rename to packages/opal-common/opal_common/monitoring/prometheus_metrics.py index 41b4a272d..b831154af 100644 --- a/packages/opal-server/opal_server/metrics/prometheus_metrics.py +++ b/packages/opal-common/opal_common/monitoring/prometheus_metrics.py @@ -9,6 +9,11 @@ 'opal_data_update_errors', 'Total number of errors in data update publishing' ) +data_update_latency = Histogram( + 'opal_data_update_latency_seconds', + 'Latency of data update publishing in seconds' +) + data_update_count_per_topic = Counter( 'opal_data_update_count_per_topic', 'Count of data updates published per topic', diff --git a/packages/opal-server/opal_server/data/api.py b/packages/opal-server/opal_server/data/api.py index 
f1a0fad0a..4109024c8 100644 --- a/packages/opal-server/opal_server/data/api.py +++ b/packages/opal-server/opal_server/data/api.py @@ -20,7 +20,7 @@ from opal_common.urls import set_url_query_param from opal_server.config import opal_server_config from opal_server.data.data_update_publisher import DataUpdatePublisher -from opal_server.metrics.prometheus_metrics import data_update_total, data_update_errors +from opal_common.monitoring.prometheus_metrics import data_update_total, data_update_errors def init_data_updates_router( diff --git a/packages/opal-server/opal_server/data/data_update_publisher.py b/packages/opal-server/opal_server/data/data_update_publisher.py index dc419e8fa..bfec77244 100644 --- a/packages/opal-server/opal_server/data/data_update_publisher.py +++ b/packages/opal-server/opal_server/data/data_update_publisher.py @@ -10,7 +10,7 @@ ServerDataSourceConfig, ) from opal_common.topics.publisher import TopicPublisher -from opal_server.metrics.prometheus_metrics import ( +from opal_common.monitoring.prometheus_metrics import ( data_update_latency, data_update_count_per_topic ) diff --git a/packages/opal-server/opal_server/metrics/__init__.py b/packages/opal-server/opal_server/metrics/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/packages/opal-server/opal_server/policy/bundles/api.py b/packages/opal-server/opal_server/policy/bundles/api.py index 831f9647c..bd320a034 100644 --- a/packages/opal-server/opal_server/policy/bundles/api.py +++ b/packages/opal-server/opal_server/policy/bundles/api.py @@ -13,7 +13,7 @@ from opal_common.schemas.policy import PolicyBundle from opal_server.config import opal_server_config from starlette.responses import RedirectResponse -from opal_server.metrics import ( +from opal_common.monitoring.prometheus_metrics import ( policy_bundle_request_count, policy_bundle_latency, policy_update_size, @@ -104,9 +104,9 @@ async def get_policy( None, description="hash of previous bundle already downloaded, 
server will return a diff bundle.", ), -): +): policy_bundle_request_count.inc() - + with policy_bundle_latency.time(): maker = BundleMaker( repo, diff --git a/packages/opal-server/opal_server/policy/watcher/callbacks.py b/packages/opal-server/opal_server/policy/watcher/callbacks.py index 11d716912..04ab59f9a 100644 --- a/packages/opal-server/opal_server/policy/watcher/callbacks.py +++ b/packages/opal-server/opal_server/policy/watcher/callbacks.py @@ -18,7 +18,7 @@ ) from opal_common.topics.publisher import TopicPublisher from opal_common.topics.utils import policy_topics -from opal_server.metrics import policy_update_size +from opal_common.monitoring.prometheus_metrics import policy_update_size async def create_update_all_directories_in_repo( diff --git a/packages/opal-server/opal_server/policy/watcher/task.py b/packages/opal-server/opal_server/policy/watcher/task.py index 0b8b2b73e..42f88a072 100644 --- a/packages/opal-server/opal_server/policy/watcher/task.py +++ b/packages/opal-server/opal_server/policy/watcher/task.py @@ -8,7 +8,7 @@ from opal_common.logger import logger from opal_common.sources.base_policy_source import BasePolicySource from opal_server.config import opal_server_config -from opal_server.metrics import ( +from opal_common.monitoring.prometheus_metrics import ( policy_update_count, policy_update_latency ) From 6e500554ca202af3c1ccde0ef2bd23b29f4062e2 Mon Sep 17 00:00:00 2001 From: Piyush Sardana Date: Thu, 14 Nov 2024 11:55:37 +0100 Subject: [PATCH 07/29] scopes and security prometheus metrics added --- .../monitoring/prometheus_metrics.py | 113 ++++++++-- packages/opal-server/opal_server/data/api.py | 6 +- .../opal_server/data/data_update_publisher.py | 8 +- .../opal_server/policy/bundles/api.py | 14 +- .../opal_server/policy/watcher/callbacks.py | 4 +- .../opal_server/policy/watcher/task.py | 8 +- .../opal-server/opal_server/scopes/api.py | 197 ++++++++++-------- .../opal-server/opal_server/security/api.py | 41 ++-- 8 files changed, 250 
insertions(+), 141 deletions(-) diff --git a/packages/opal-common/opal_common/monitoring/prometheus_metrics.py b/packages/opal-common/opal_common/monitoring/prometheus_metrics.py index b831154af..c3a4c484d 100644 --- a/packages/opal-common/opal_common/monitoring/prometheus_metrics.py +++ b/packages/opal-common/opal_common/monitoring/prometheus_metrics.py @@ -1,50 +1,117 @@ from prometheus_client import Counter, Gauge, Histogram -# Data update metrics -data_update_total = Counter( - 'opal_data_update_total', 'Total number of data update events published' +# Opal Server Data update metrics +opal_server_data_update_total = Counter( + 'opal_server_data_update_total', 'Total number of data update events published to opal server' ) -data_update_errors = Counter( - 'opal_data_update_errors', 'Total number of errors in data update publishing' +opal_server_data_update_errors = Counter( + 'opal_server_data_update_errors', 'Total number of errors in opal server data update publishing' ) -data_update_latency = Histogram( - 'opal_data_update_latency_seconds', - 'Latency of data update publishing in seconds' +opal_server_data_update_latency = Histogram( + 'opal_opal_server_data_update_latency_seconds', + 'Latency of data update publishing to opal server in seconds' ) -data_update_count_per_topic = Counter( - 'opal_data_update_count_per_topic', - 'Count of data updates published per topic', +opal_server_data_update_count_per_topic = Counter( + 'opal_server_data_update_count_per_topic', + 'Count of data updates published per topic to opal server', labelnames=['topic'] ) -# Policy update metrics -policy_update_count = Counter( - 'opal_policy_update_count', - 'Total number of policy updates triggered', +# Opal Server Policy update metrics +opal_server_policy_update_count = Counter( + 'pal_server_policy_update_count', + 'Total number of policy updates triggered to opal server', labelnames=['source'] ) -policy_update_latency = Histogram( - 'opal_policy_update_latency_seconds', 
+opal_server_policy_update_latency = Histogram( + 'opal_server_policy_update_latency_seconds', 'Latency of policy bundle generation in seconds', labelnames=['source'] ) -policy_bundle_request_count = Counter( - 'opal_policy_bundle_request_count', +opal_server_policy_bundle_request_count = Counter( + 'opal_server_policy_bundle_request_count', 'Total number of policy bundle requests' ) -policy_bundle_latency = Histogram( - 'opal_policy_bundle_latency_seconds', +opal_server_policy_bundle_latency = Histogram( + 'opal_server_policy_bundle_latency_seconds', 'Latency of serving policy bundles in seconds' ) -policy_update_size = Histogram( - 'opal_policy_update_size', +opal_server_policy_update_size = Histogram( + 'opal_server_policy_update_size', 'Size of policy updates (in number of files)', buckets=[1, 10, 50, 100, 500, 1000] +) + +# Scope metrics +opal_server_scope_request_count = Counter( + 'opal_server_scope_request_count', + 'Total number of requests to scope endpoints', + labelnames=['endpoint', 'method'] +) + +opal_server_scope_request_latency = Histogram( + 'opal_server_scope_request_latency', + 'Latency of scope requests in seconds', + labelnames=['endpoint', 'method'] +) + +opal_server_scope_data_update_count = Counter( + 'opal_server_scope_data_update_count', + 'Total number of data updates published per scope', + labelnames=['scope_id'] +) + +opal_server_scope_data_update_errors = Counter( + 'opal_server_scope_data_update_errors', + 'Total number of errors during data update publication per scope', + labelnames=['scope_id'] +) + +opal_server_scope_data_update_latency = Histogram( + 'opal_server_scope_data_update_latency_seconds', + 'Latency of data update publishing in seconds per scope', + labelnames=['scope_id'] +) + +opal_server_scope_policy_sync_count = Counter( + 'opal_server_scope_policy_sync_count', + 'Total number of policy syncs per scope', + labelnames=['scope_id'] +) + +opal_server_scope_policy_sync_latency = Histogram( + 
'opal_server_scope_policy_sync_latency_seconds', + 'Latency of policy sync in seconds per scope', + labelnames=['scope_id'] +) + +opal_server_scope_error_count = Counter( + 'opal_server_scope_error_count', + 'Total count of errors encountered per scope operation', + labelnames=['scope_id', 'error_type'] +) + +# Define metrics +token_request_count = Counter( + "opal_token_request_count", + "Total number of token requests", +) + +token_generation_errors = Counter( + "opal_token_generation_errors", + "Total number of errors during token generation", + labelnames=["error_type"] +) + +token_generated_count = Counter( + "opal_token_generated_count", + "Total number of tokens successfully generated", + labelnames=["peer_type"] ) \ No newline at end of file diff --git a/packages/opal-server/opal_server/data/api.py b/packages/opal-server/opal_server/data/api.py index 4109024c8..d015938a2 100644 --- a/packages/opal-server/opal_server/data/api.py +++ b/packages/opal-server/opal_server/data/api.py @@ -20,7 +20,7 @@ from opal_common.urls import set_url_query_param from opal_server.config import opal_server_config from opal_server.data.data_update_publisher import DataUpdatePublisher -from opal_common.monitoring.prometheus_metrics import data_update_total, data_update_errors +from opal_common.monitoring.prometheus_metrics import opal_server_data_update_total, opal_server_data_update_errors def init_data_updates_router( @@ -126,10 +126,10 @@ async def publish_data_update_event( authenticator, claims, update ) # may throw Unauthorized except Unauthorized as e: - data_update_errors.inc() + opal_server_data_update_errors.inc() logger.error(f"Unauthorized to publish update: {repr(e)}") raise - data_update_total.inc() + opal_server_data_update_total.inc() await data_update_publisher.publish_data_updates(update) return {"status": "ok"} diff --git a/packages/opal-server/opal_server/data/data_update_publisher.py b/packages/opal-server/opal_server/data/data_update_publisher.py index 
bfec77244..6eb1d43b9 100644 --- a/packages/opal-server/opal_server/data/data_update_publisher.py +++ b/packages/opal-server/opal_server/data/data_update_publisher.py @@ -11,8 +11,8 @@ ) from opal_common.topics.publisher import TopicPublisher from opal_common.monitoring.prometheus_metrics import ( - data_update_latency, - data_update_count_per_topic + opal_server_data_update_latency, + opal_server_data_update_count_per_topic ) @@ -76,7 +76,7 @@ async def publish_data_updates(self, update: DataUpdate): update (DataUpdate): update data-source configuration for subscribers to fetch data from """ - with data_update_latency.time(): + with opal_server_data_update_latency.time(): all_topic_combos = set() # a nicer format of entries to the log @@ -101,7 +101,7 @@ async def publish_data_updates(self, update: DataUpdate): all_topic_combos.update(topic_combos) for topic in topic_combos: - data_update_count_per_topic.labels(topic).inc() + opal_server_data_update_count_per_topic.labels(topic).inc() else: logger.warning( "[{pid}] No topics were provided for the following entry: {entry}", diff --git a/packages/opal-server/opal_server/policy/bundles/api.py b/packages/opal-server/opal_server/policy/bundles/api.py index bd320a034..564404676 100644 --- a/packages/opal-server/opal_server/policy/bundles/api.py +++ b/packages/opal-server/opal_server/policy/bundles/api.py @@ -14,9 +14,9 @@ from opal_server.config import opal_server_config from starlette.responses import RedirectResponse from opal_common.monitoring.prometheus_metrics import ( - policy_bundle_request_count, - policy_bundle_latency, - policy_update_size, + opal_server_policy_bundle_request_count, + opal_server_policy_bundle_latency, + opal_server_policy_update_size, ) router = APIRouter() @@ -105,9 +105,9 @@ async def get_policy( description="hash of previous bundle already downloaded, server will return a diff bundle.", ), ): - policy_bundle_request_count.inc() + opal_server_policy_bundle_request_count.inc() - with 
policy_bundle_latency.time(): + with opal_server_policy_bundle_latency.time(): maker = BundleMaker( repo, in_directories=set(input_paths), @@ -131,7 +131,7 @@ async def get_policy( ) if bundle.deleted_files: bundle_size += len(bundle.deleted_files.files) - policy_update_size.observe(bundle_size) + opal_server_policy_update_size.observe(bundle_size) return bundle try: old_commit = repo.commit(base_hash) @@ -142,7 +142,7 @@ async def get_policy( ) if diff_bundle.deleted_files: diff_bundle_size += len(diff_bundle.deleted_files.files) - policy_update_size.observe(diff_bundle_size) + opal_server_policy_update_size.observe(diff_bundle_size) return diff_bundle except ValueError: raise HTTPException( diff --git a/packages/opal-server/opal_server/policy/watcher/callbacks.py b/packages/opal-server/opal_server/policy/watcher/callbacks.py index 04ab59f9a..6b760f203 100644 --- a/packages/opal-server/opal_server/policy/watcher/callbacks.py +++ b/packages/opal-server/opal_server/policy/watcher/callbacks.py @@ -18,7 +18,7 @@ ) from opal_common.topics.publisher import TopicPublisher from opal_common.topics.utils import policy_topics -from opal_common.monitoring.prometheus_metrics import policy_update_size +from opal_common.monitoring.prometheus_metrics import opal_server_policy_update_size async def create_update_all_directories_in_repo( @@ -117,7 +117,7 @@ async def publish_changed_directories( ) if notification: - policy_update_size.observe(len(notification.update.changed_directories)) + opal_server_policy_update_size.observe(len(notification.update.changed_directories)) async with publisher: await publisher.publish( diff --git a/packages/opal-server/opal_server/policy/watcher/task.py b/packages/opal-server/opal_server/policy/watcher/task.py index 42f88a072..5878ff712 100644 --- a/packages/opal-server/opal_server/policy/watcher/task.py +++ b/packages/opal-server/opal_server/policy/watcher/task.py @@ -9,8 +9,8 @@ from opal_common.sources.base_policy_source import BasePolicySource 
from opal_server.config import opal_server_config from opal_common.monitoring.prometheus_metrics import ( - policy_update_count, - policy_update_latency + opal_server_policy_update_count, + opal_server_policy_update_latency ) @@ -127,7 +127,7 @@ async def stop(self): async def trigger(self, topic: Topic, data: Any): """triggers the policy watcher from outside to check for changes (git pull)""" - policy_update_count.labels(source="webhook").inc() + opal_server_policy_update_count.labels(source="webhook").inc() - with policy_update_latency.labels(source="webhook").time(): + with opal_server_policy_update_latency.labels(source="webhook").time(): await self._watcher.check_for_changes() diff --git a/packages/opal-server/opal_server/scopes/api.py b/packages/opal-server/opal_server/scopes/api.py index 95181866a..aaccdd74e 100644 --- a/packages/opal-server/opal_server/scopes/api.py +++ b/packages/opal-server/opal_server/scopes/api.py @@ -40,6 +40,11 @@ ServerSideTopicPublisher, ) from opal_common.urls import set_url_query_param +from opal_common.monitoring.prometheus_metrics import ( + opal_server_scope_request_count, + opal_server_scope_request_latency, + opal_server_scope_error_count, +) from opal_server.config import opal_server_config from opal_server.data.data_update_publisher import DataUpdatePublisher from opal_server.git_fetcher import GitPolicyFetcher @@ -104,25 +109,28 @@ async def put_scope( scope_in: Scope, claims: JWTClaims = Depends(authenticator), ): - try: - require_peer_type(authenticator, claims, PeerType.datasource) - except Unauthorized as ex: - logger.error(f"Unauthorized to PUT scope: {repr(ex)}") - raise + opal_server_scope_request_count.labels(endpoint="put_scope", method="PUT").inc() + with opal_server_scope_request_latency.labels(endpoint="put_scope", method="PUT").time(): + try: + require_peer_type(authenticator, claims, PeerType.datasource) + except Unauthorized as ex: + logger.error(f"Unauthorized to PUT scope: {repr(ex)}") + 
opal_server_scope_error_count.labels(scope_id=scope_in.scope_id, error_type="Unauthorized").inc() + raise - verify_private_key_or_throw(scope_in) - await scopes.put(scope_in) + verify_private_key_or_throw(scope_in) + await scopes.put(scope_in) - force_fetch_str = " (force fetch)" if force_fetch else "" - logger.info(f"Sync scope: {scope_in.scope_id}{force_fetch_str}") + force_fetch_str = " (force fetch)" if force_fetch else "" + logger.info(f"Sync scope: {scope_in.scope_id}{force_fetch_str}") - # All server replicas (leaders) should sync the scope. - await pubsub_endpoint.publish( - opal_server_config.POLICY_REPO_WEBHOOK_TOPIC, - {"scope_id": scope_in.scope_id, "force_fetch": force_fetch}, - ) + # All server replicas (leaders) should sync the scope. + await pubsub_endpoint.publish( + opal_server_config.POLICY_REPO_WEBHOOK_TOPIC, + {"scope_id": scope_in.scope_id, "force_fetch": force_fetch}, + ) - return Response(status_code=status.HTTP_201_CREATED) + return Response(status_code=status.HTTP_201_CREATED) @router.get( "", @@ -130,13 +138,16 @@ async def put_scope( response_model_exclude={"policy": {"auth"}}, ) async def get_all_scopes(*, claims: JWTClaims = Depends(authenticator)): - try: - require_peer_type(authenticator, claims, PeerType.datasource) - except Unauthorized as ex: - logger.error(f"Unauthorized to get scopes: {repr(ex)}") - raise + opal_server_scope_request_count.labels(endpoint="get_all_scopes", method="GET").inc() + with opal_server_scope_request_latency.labels(endpoint="get_all_scopes", method="GET").time(): + try: + require_peer_type(authenticator, claims, PeerType.datasource) + except Unauthorized as ex: + logger.error(f"Unauthorized to get scopes: {repr(ex)}") + opal_server_scope_error_count.labels(scope_id="all_scopes", error_type="Unauthorized").inc() + raise - return await scopes.all() + return await scopes.all() @router.get( "/{scope_id}", @@ -144,19 +155,22 @@ async def get_all_scopes(*, claims: JWTClaims = Depends(authenticator)): 
response_model_exclude={"policy": {"auth"}}, ) async def get_scope(*, scope_id: str, claims: JWTClaims = Depends(authenticator)): - try: - require_peer_type(authenticator, claims, PeerType.datasource) - except Unauthorized as ex: - logger.error(f"Unauthorized to get scope: {repr(ex)}") - raise + opal_server_scope_request_count.labels(endpoint="get_scope", method="GET").inc() + with opal_server_scope_request_latency.labels(endpoint="get_scope", method="GET").time(): + try: + require_peer_type(authenticator, claims, PeerType.datasource) + except Unauthorized as ex: + logger.error(f"Unauthorized to get scope: {repr(ex)}") + opal_server_scope_error_count.labels(scope_id=scope_id, error_type="Unauthorized").inc() + raise - try: - scope = await scopes.get(scope_id) - return scope - except ScopeNotFoundError: - raise HTTPException( - status.HTTP_404_NOT_FOUND, detail=f"No such scope: {scope_id}" - ) + try: + scope = await scopes.get(scope_id) + return scope + except ScopeNotFoundError: + raise HTTPException( + status.HTTP_404_NOT_FOUND, detail=f"No such scope: {scope_id}" + ) @router.delete( "/{scope_id}", @@ -165,16 +179,19 @@ async def get_scope(*, scope_id: str, claims: JWTClaims = Depends(authenticator) async def delete_scope( *, scope_id: str, claims: JWTClaims = Depends(authenticator) ): - try: - require_peer_type(authenticator, claims, PeerType.datasource) - except Unauthorized as ex: - logger.error(f"Unauthorized to delete scope: {repr(ex)}") - raise + opal_server_scope_request_count.labels(endpoint="delete_scope", method="DELETE").inc() + with opal_server_scope_request_latency.labels(endpoint="delete_scope", method="DELETE").time(): + try: + require_peer_type(authenticator, claims, PeerType.datasource) + except Unauthorized as ex: + logger.error(f"Unauthorized to delete scope: {repr(ex)}") + opal_server_scope_error_count.labels(scope_id=scope_id, error_type="Unauthorized").inc() + raise - # TODO: This should also asynchronously clean the repo from the disk (if 
it's not used by other scopes) - await scopes.delete(scope_id) + # TODO: This should also asynchronously clean the repo from the disk (if it's not used by other scopes) + await scopes.delete(scope_id) - return Response(status_code=status.HTTP_204_NO_CONTENT) + return Response(status_code=status.HTTP_204_NO_CONTENT) @router.post("/{scope_id}/refresh", status_code=status.HTTP_200_OK) async def refresh_scope( @@ -187,51 +204,58 @@ async def refresh_scope( ), claims: JWTClaims = Depends(authenticator), ): - try: - require_peer_type(authenticator, claims, PeerType.datasource) - except Unauthorized as ex: - logger.error(f"Unauthorized to delete scope: {repr(ex)}") - raise + opal_server_scope_request_count.labels(endpoint="refresh_scope", method="POST").inc() + with opal_server_scope_request_latency.labels(endpoint="refresh_scope", method="POST").time(): + try: + require_peer_type(authenticator, claims, PeerType.datasource) + except Unauthorized as ex: + logger.error(f"Unauthorized to delete scope: {repr(ex)}") + opal_server_scope_error_count.labels(scope_id=scope_id, error_type="Unauthorized").inc() + raise - try: - _ = await scopes.get(scope_id) + try: + _ = await scopes.get(scope_id) - logger.info(f"Refresh scope: {scope_id}") + logger.info(f"Refresh scope: {scope_id}") - # If the hinted hash is None, we have no way to know whether we should - # re-fetch the remote, so we force fetch, just in case. - force_fetch = hinted_hash is None + # If the hinted hash is None, we have no way to know whether we should + # re-fetch the remote, so we force fetch, just in case. + force_fetch = hinted_hash is None - # All server replicas (leaders) should sync the scope. - await pubsub_endpoint.publish( - opal_server_config.POLICY_REPO_WEBHOOK_TOPIC, - { - "scope_id": scope_id, - "force_fetch": force_fetch, - "hinted_hash": hinted_hash, - }, - ) + # All server replicas (leaders) should sync the scope. 
+ await pubsub_endpoint.publish( + opal_server_config.POLICY_REPO_WEBHOOK_TOPIC, + { + "scope_id": scope_id, + "force_fetch": force_fetch, + "hinted_hash": hinted_hash, + }, + ) - return Response(status_code=status.HTTP_200_OK) + return Response(status_code=status.HTTP_200_OK) - except ScopeNotFoundError: - raise HTTPException( - status.HTTP_404_NOT_FOUND, detail=f"No such scope: {scope_id}" - ) + except ScopeNotFoundError: + opal_server_scope_error_count.labels(scope_id=scope_id, error_type="NotFound").inc() + raise HTTPException( + status.HTTP_404_NOT_FOUND, detail=f"No such scope: {scope_id}" + ) @router.post("/refresh", status_code=status.HTTP_200_OK) async def sync_all_scopes(claims: JWTClaims = Depends(authenticator)): """sync all scopes.""" - try: - require_peer_type(authenticator, claims, PeerType.datasource) - except Unauthorized as ex: - logger.error(f"Unauthorized to refresh all scopes: {repr(ex)}") - raise + opal_server_scope_request_count.labels(endpoint="sync_all_scopes", method="POST").inc() + with opal_server_scope_request_latency.labels(endpoint="sync_all_scopes", method="POST").time(): + try: + require_peer_type(authenticator, claims, PeerType.datasource) + except Unauthorized as ex: + logger.error(f"Unauthorized to refresh all scopes: {repr(ex)}") + opal_server_scope_error_count.labels(scope_id="all_scopes", error_type="Unauthorized").inc() + raise - # All server replicas (leaders) should sync all scopes. - await pubsub_endpoint.publish(opal_server_config.POLICY_REPO_WEBHOOK_TOPIC) + # All server replicas (leaders) should sync all scopes. 
+ await pubsub_endpoint.publish(opal_server_config.POLICY_REPO_WEBHOOK_TOPIC) - return Response(status_code=status.HTTP_200_OK) + return Response(status_code=status.HTTP_200_OK) @router.get( "/{scope_id}/policy", @@ -341,19 +365,22 @@ async def publish_data_update_event( claims: JWTClaims = Depends(authenticator), scope_id: str = Path(..., description="Scope ID"), ): - try: - require_peer_type(authenticator, claims, PeerType.datasource) + opal_server_scope_request_count.labels(endpoint="publish_data_update_event", method="POST").inc() + with opal_server_scope_request_latency.labels(endpoint="publish_data_update_event", method="POST").time(): + try: + require_peer_type(authenticator, claims, PeerType.datasource) - restrict_optional_topics_to_publish(authenticator, claims, update) + restrict_optional_topics_to_publish(authenticator, claims, update) - for entry in update.entries: - entry.topics = [f"data:{topic}" for topic in entry.topics] + for entry in update.entries: + entry.topics = [f"data:{topic}" for topic in entry.topics] - await DataUpdatePublisher( - ScopedServerSideTopicPublisher(pubsub_endpoint, scope_id) - ).publish_data_updates(update) - except Unauthorized as ex: - logger.error(f"Unauthorized to publish update: {repr(ex)}") - raise + await DataUpdatePublisher( + ScopedServerSideTopicPublisher(pubsub_endpoint, scope_id) + ).publish_data_updates(update) + except Unauthorized as ex: + logger.error(f"Unauthorized to publish update: {repr(ex)}") + opal_server_scope_error_count.labels(scope_id=scope_id, error_type="Unauthorized").inc() + raise return router diff --git a/packages/opal-server/opal_server/security/api.py b/packages/opal-server/opal_server/security/api.py index a17235163..319774b2f 100644 --- a/packages/opal-server/opal_server/security/api.py +++ b/packages/opal-server/opal_server/security/api.py @@ -5,6 +5,11 @@ from opal_common.authentication.signer import JWTSigner from opal_common.logger import logger from opal_common.schemas.security import 
AccessToken, AccessTokenRequest, TokenDetails +from opal_common.monitoring.prometheus_metrics import ( + token_generation_errors, + token_generated_count, + token_request_count, +) def init_security_router(signer: JWTSigner, authenticator: StaticBearerAuthenticator): @@ -17,23 +22,33 @@ def init_security_router(signer: JWTSigner, authenticator: StaticBearerAuthentic dependencies=[Depends(authenticator)], ) async def generate_new_access_token(req: AccessTokenRequest): + token_request_count.inc() if not signer.enabled: + token_generation_errors.labels(error_type="SignerDisabled").inc() raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="opal server was not configured with security, cannot generate tokens!", ) - - claims = {"peer_type": req.type.value, **req.claims} - token = signer.sign(sub=req.id, token_lifetime=req.ttl, custom_claims=claims) - logger.info(f"Generated opal token: peer_type={req.type.value}") - return AccessToken( - token=token, - details=TokenDetails( - id=req.id, - type=req.type, - expired=datetime.utcnow() + req.ttl, - claims=claims, - ), - ) + try: + claims = {"peer_type": req.type.value, **req.claims} + token = signer.sign(sub=req.id, token_lifetime=req.ttl, custom_claims=claims) + logger.info(f"Generated opal token: peer_type={req.type.value}") + token_generated_count.labels(peer_type=req.type.value).inc() + return AccessToken( + token=token, + details=TokenDetails( + id=req.id, + type=req.type, + expired=datetime.utcnow() + req.ttl, + claims=claims, + ), + ) + except Exception as ex: + logger.error(f"Failed to generate token: {str(ex)}") + token_generation_errors.labels(error_type="TokenGenerationFailed").inc() + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Failed to generate token due to server error.", + ) return router From f44e2bb8e1b9b6e95c0544d4ad863058e35a46a5 Mon Sep 17 00:00:00 2001 From: Piyush Sardana Date: Thu, 14 Nov 2024 17:06:39 +0100 Subject: [PATCH 08/29] added 
client metrics endpoint and total active clients metric --- packages/opal-client/opal_client/client.py | 7 +++++++ .../opal_common/monitoring/prometheus_metrics.py | 6 ++++++ packages/opal-server/opal_server/pubsub.py | 9 +++++++++ 3 files changed, 22 insertions(+) diff --git a/packages/opal-client/opal_client/client.py b/packages/opal-client/opal_client/client.py index 6a9958b98..ab8a23a63 100644 --- a/packages/opal-client/opal_client/client.py +++ b/packages/opal-client/opal_client/client.py @@ -6,6 +6,8 @@ import uuid from logging import disable from typing import Awaitable, Callable, List, Literal, Optional, Union +from prometheus_client import CONTENT_TYPE_LATEST, generate_latest +from fastapi.responses import Response import aiofiles import aiofiles.os @@ -307,6 +309,11 @@ async def ready(): status_code=status.HTTP_503_SERVICE_UNAVAILABLE, content={"status": "unavailable"}, ) + + @app.get("/metrics", include_in_schema=False) + async def metrics(): + """Endpoint to expose Prometheus metrics.""" + return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST) return app diff --git a/packages/opal-common/opal_common/monitoring/prometheus_metrics.py b/packages/opal-common/opal_common/monitoring/prometheus_metrics.py index c3a4c484d..87c5d6d97 100644 --- a/packages/opal-common/opal_common/monitoring/prometheus_metrics.py +++ b/packages/opal-common/opal_common/monitoring/prometheus_metrics.py @@ -114,4 +114,10 @@ "opal_token_generated_count", "Total number of tokens successfully generated", labelnames=["peer_type"] +) + +active_clients = Gauge( + 'opal_active_clients_total', + 'Number of currently connected OPAL clients', + ['client_id', 'source'] ) \ No newline at end of file diff --git a/packages/opal-server/opal_server/pubsub.py b/packages/opal-server/opal_server/pubsub.py index 26d47c422..6c16d7c59 100644 --- a/packages/opal-server/opal_server/pubsub.py +++ b/packages/opal-server/opal_server/pubsub.py @@ -28,6 +28,10 @@ from opal_common.confi.confi import 
load_conf_if_none from opal_common.config import opal_common_config from opal_common.logger import logger +from opal_common.monitoring.prometheus_metrics import ( + active_clients, + opal_server_data_update_errors +) from opal_server.config import opal_server_config from pydantic import BaseModel from starlette.datastructures import QueryParams @@ -79,6 +83,8 @@ def new_client( connect_time=time.time(), query_params=query_params, ) + source = f"{source_host}:{source_port}" if source_host and source_port else "unknown" + active_clients.labels(client_id=client_id, source=source).inc() client_info.refcount += 1 self._clients_by_ids[client_id] = client_info yield client_info @@ -87,6 +93,9 @@ def new_client( client_info.refcount -= 1 if client_info.refcount >= 1: self._clients_by_ids[client_id] = client_info + else: + source = f"{client_info.source_host}:{client_info.source_port}" if client_info.source_host and client_info.source_port else "unknown" + active_clients.labels(client_id=client_id, source=source).dec() async def on_subscribe( self, From 00830ebd906a09b14cc307bed96c4f5916104e81 Mon Sep 17 00:00:00 2001 From: Piyush Sardana Date: Thu, 14 Nov 2024 17:10:53 +0100 Subject: [PATCH 09/29] data topic subscribed by client --- .../opal_common/monitoring/prometheus_metrics.py | 6 ++++++ packages/opal-server/opal_server/pubsub.py | 12 +++++++++++- 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/packages/opal-common/opal_common/monitoring/prometheus_metrics.py b/packages/opal-common/opal_common/monitoring/prometheus_metrics.py index 87c5d6d97..0b974e9b8 100644 --- a/packages/opal-common/opal_common/monitoring/prometheus_metrics.py +++ b/packages/opal-common/opal_common/monitoring/prometheus_metrics.py @@ -120,4 +120,10 @@ 'opal_active_clients_total', 'Number of currently connected OPAL clients', ['client_id', 'source'] +) + +client_data_subscriptions = Gauge( + 'opal_client_data_subscriptions', + 'Number of data topics a client is subscribed to', + 
['client_id', 'topic'] ) \ No newline at end of file diff --git a/packages/opal-server/opal_server/pubsub.py b/packages/opal-server/opal_server/pubsub.py index 6c16d7c59..655d0ba87 100644 --- a/packages/opal-server/opal_server/pubsub.py +++ b/packages/opal-server/opal_server/pubsub.py @@ -30,7 +30,7 @@ from opal_common.logger import logger from opal_common.monitoring.prometheus_metrics import ( active_clients, - opal_server_data_update_errors + client_data_subscriptions ) from opal_server.config import opal_server_config from pydantic import BaseModel @@ -110,6 +110,11 @@ async def on_subscribe( # on_subscribe is sometimes called for the broadcaster, when there is no "current client" if client_info is not None: client_info.subscribed_topics.update(topics) + for topic in topics: + client_data_subscriptions.labels( + client_id=client_info.client_id, + topic=topic + ).inc() async def on_unsubscribe( self, @@ -124,6 +129,11 @@ async def on_unsubscribe( # on_subscribe is sometimes called for the broadcaster, when there is no "current client" if client_info is not None: client_info.subscribed_topics.difference_update(topics) + for topic in topics: + client_data_subscriptions.labels( + client_id=client_info.client_id, + topic=topic + ).dec() class PubSub: From 192a255e3f37d12b3a62566f634a5b626d6998ae Mon Sep 17 00:00:00 2001 From: Piyush Sardana Date: Thu, 14 Nov 2024 17:57:57 +0100 Subject: [PATCH 10/29] added token type in prometheus metric --- packages/opal-client/requires.txt | 1 + .../opal-common/opal_common/monitoring/prometheus_metrics.py | 1 + packages/opal-server/opal_server/security/api.py | 2 +- packages/opal-server/requires.txt | 1 + 4 files changed, 4 insertions(+), 1 deletion(-) diff --git a/packages/opal-client/requires.txt b/packages/opal-client/requires.txt index 0fb2499eb..5b57d2593 100644 --- a/packages/opal-client/requires.txt +++ b/packages/opal-client/requires.txt @@ -4,3 +4,4 @@ psutil>=5.9.1,<6 tenacity>=8.0.1,<9 dpath>=2.1.5,<3 jsonpatch>=1.33,<2 
+prometheus_client \ No newline at end of file diff --git a/packages/opal-common/opal_common/monitoring/prometheus_metrics.py b/packages/opal-common/opal_common/monitoring/prometheus_metrics.py index 0b974e9b8..057cf4393 100644 --- a/packages/opal-common/opal_common/monitoring/prometheus_metrics.py +++ b/packages/opal-common/opal_common/monitoring/prometheus_metrics.py @@ -102,6 +102,7 @@ token_request_count = Counter( "opal_token_request_count", "Total number of token requests", + ['token_type'] ) token_generation_errors = Counter( diff --git a/packages/opal-server/opal_server/security/api.py b/packages/opal-server/opal_server/security/api.py index 319774b2f..0ab4d8135 100644 --- a/packages/opal-server/opal_server/security/api.py +++ b/packages/opal-server/opal_server/security/api.py @@ -22,7 +22,7 @@ def init_security_router(signer: JWTSigner, authenticator: StaticBearerAuthentic dependencies=[Depends(authenticator)], ) async def generate_new_access_token(req: AccessTokenRequest): - token_request_count.inc() + token_request_count.labels(token_type=req.type.value).inc() if not signer.enabled: token_generation_errors.labels(error_type="SignerDisabled").inc() raise HTTPException( diff --git a/packages/opal-server/requires.txt b/packages/opal-server/requires.txt index ff3e5cb13..9dd75773e 100644 --- a/packages/opal-server/requires.txt +++ b/packages/opal-server/requires.txt @@ -7,3 +7,4 @@ slowapi>=0.1.5,<1 pygit2>=1.14.1,<1.15 asgiref>=3.5.2,<4 redis>=4.3.4,<5 +prometheus_client \ No newline at end of file From bd32ddb6b0796eeec79edce1db4b13a7a80e1b6c Mon Sep 17 00:00:00 2001 From: Piyush Sardana Date: Fri, 15 Nov 2024 06:25:32 +0100 Subject: [PATCH 11/29] added labels to the metrics for data and policy updates --- .../monitoring/prometheus_metrics.py | 26 +++++-- packages/opal-server/opal_server/data/api.py | 29 ++++++-- .../opal_server/policy/bundles/api.py | 73 ++++++++++--------- .../opal_server/policy/watcher/callbacks.py | 4 +- 
.../opal_server/policy/watcher/task.py | 29 +++++++- 5 files changed, 108 insertions(+), 53 deletions(-) diff --git a/packages/opal-common/opal_common/monitoring/prometheus_metrics.py b/packages/opal-common/opal_common/monitoring/prometheus_metrics.py index 057cf4393..4517399b0 100644 --- a/packages/opal-common/opal_common/monitoring/prometheus_metrics.py +++ b/packages/opal-common/opal_common/monitoring/prometheus_metrics.py @@ -2,50 +2,60 @@ # Opal Server Data update metrics opal_server_data_update_total = Counter( - 'opal_server_data_update_total', 'Total number of data update events published to opal server' + 'opal_server_data_update_total', + 'Total number of data update events published to opal server', + ["status"] ) opal_server_data_update_errors = Counter( - 'opal_server_data_update_errors', 'Total number of errors in opal server data update publishing' + 'opal_server_data_update_errors', + 'Total number of errors in opal server data update publishing', + ["error_type"] ) opal_server_data_update_latency = Histogram( 'opal_opal_server_data_update_latency_seconds', - 'Latency of data update publishing to opal server in seconds' + 'Latency of data update publishing to opal server in seconds', + buckets=[.001, .005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0] ) opal_server_data_update_count_per_topic = Counter( 'opal_server_data_update_count_per_topic', 'Count of data updates published per topic to opal server', - labelnames=['topic'] + ['topic'] ) # Opal Server Policy update metrics opal_server_policy_update_count = Counter( 'pal_server_policy_update_count', 'Total number of policy updates triggered to opal server', - labelnames=['source'] + ["source", "status"] ) opal_server_policy_update_latency = Histogram( 'opal_server_policy_update_latency_seconds', 'Latency of policy bundle generation in seconds', - labelnames=['source'] + ["source", "status"], + buckets=[.01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0] ) 
opal_server_policy_bundle_request_count = Counter( 'opal_server_policy_bundle_request_count', - 'Total number of policy bundle requests' + 'Total number of policy bundle requests', + ["type"] ) opal_server_policy_bundle_latency = Histogram( 'opal_server_policy_bundle_latency_seconds', - 'Latency of serving policy bundles in seconds' + 'Latency of serving policy bundles in seconds', + ["type"], + buckets=[.01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0] ) opal_server_policy_update_size = Histogram( 'opal_server_policy_update_size', 'Size of policy updates (in number of files)', + ["type"], buckets=[1, 10, 50, 100, 500, 1000] ) diff --git a/packages/opal-server/opal_server/data/api.py b/packages/opal-server/opal_server/data/api.py index d015938a2..007b06314 100644 --- a/packages/opal-server/opal_server/data/api.py +++ b/packages/opal-server/opal_server/data/api.py @@ -20,7 +20,10 @@ from opal_common.urls import set_url_query_param from opal_server.config import opal_server_config from opal_server.data.data_update_publisher import DataUpdatePublisher -from opal_common.monitoring.prometheus_metrics import opal_server_data_update_total, opal_server_data_update_errors +from opal_common.monitoring.prometheus_metrics import ( + opal_server_data_update_total, + opal_server_data_update_errors +) def init_data_updates_router( @@ -125,13 +128,27 @@ async def publish_data_update_event( restrict_optional_topics_to_publish( authenticator, claims, update ) # may throw Unauthorized + + opal_server_data_update_total.labels( + status="success", + type="update" + ).inc() + + await data_update_publisher.publish_data_updates(update) + return {"status": "ok"} except Unauthorized as e: - opal_server_data_update_errors.inc() + opal_server_data_update_errors.labels( + error_type="unauthorized", + endpoint="update" + ).inc() logger.error(f"Unauthorized to publish update: {repr(e)}") raise - opal_server_data_update_total.inc() - - await data_update_publisher.publish_data_updates(update) 
- return {"status": "ok"} + except Exception as e: + opal_server_data_update_errors.labels( + error_type="unknown", + endpoint="update" + ).inc() + logger.error(f"Failed to publish update: {repr(e)}") + raise return router diff --git a/packages/opal-server/opal_server/policy/bundles/api.py b/packages/opal-server/opal_server/policy/bundles/api.py index 564404676..dde4583a6 100644 --- a/packages/opal-server/opal_server/policy/bundles/api.py +++ b/packages/opal-server/opal_server/policy/bundles/api.py @@ -105,25 +105,27 @@ async def get_policy( description="hash of previous bundle already downloaded, server will return a diff bundle.", ), ): - opal_server_policy_bundle_request_count.inc() - - with opal_server_policy_bundle_latency.time(): - maker = BundleMaker( - repo, - in_directories=set(input_paths), - extensions=opal_server_config.FILTER_FILE_EXTENSIONS, - root_manifest_path=opal_server_config.POLICY_REPO_MANIFEST_PATH, - bundle_ignore=opal_server_config.BUNDLE_IGNORE, - ) - # check if commit exist in the repo - revision = None - if base_hash: - try: - revision = repo.rev_parse(base_hash) - except ValueError: - logger.warning(f"base_hash {base_hash} not exist in the repo") + """Get the policy bundle from the policy repo.""" + + + maker = BundleMaker( + repo, + in_directories=set(input_paths), + extensions=opal_server_config.FILTER_FILE_EXTENSIONS, + root_manifest_path=opal_server_config.POLICY_REPO_MANIFEST_PATH, + bundle_ignore=opal_server_config.BUNDLE_IGNORE, + ) + # check if commit exist in the repo + revision = None + if base_hash: + try: + revision = repo.rev_parse(base_hash) + except ValueError: + logger.warning(f"base_hash {base_hash} not exist in the repo") - if revision is None: + if revision is None: + opal_server_policy_bundle_request_count.labels(type="full").inc() + with opal_server_policy_bundle_latency.labels(type="full").time(): bundle = maker.make_bundle(repo.head.commit) bundle_size = ( (len(bundle.data_modules) if bundle.data_modules is not 
None else 0) + @@ -131,21 +133,24 @@ async def get_policy( ) if bundle.deleted_files: bundle_size += len(bundle.deleted_files.files) - opal_server_policy_update_size.observe(bundle_size) + opal_server_policy_update_size.labels(type="full").observe(bundle_size) return bundle - try: - old_commit = repo.commit(base_hash) - diff_bundle = maker.make_diff_bundle(old_commit, repo.head.commit) - diff_bundle_size = ( - (len(diff_bundle.data_modules) if diff_bundle.data_modules is not None else 0) + - (len(diff_bundle.policy_modules) if diff_bundle.policy_modules is not None else 0) + else: + opal_server_policy_bundle_request_count.labels(type="diff").inc() + with opal_server_policy_bundle_latency.labels(type="diff").time(): + try: + old_commit = repo.commit(base_hash) + diff_bundle = maker.make_diff_bundle(old_commit, repo.head.commit) + diff_bundle_size = ( + (len(diff_bundle.data_modules) if diff_bundle.data_modules is not None else 0) + + (len(diff_bundle.policy_modules) if diff_bundle.policy_modules is not None else 0) + ) + if diff_bundle.deleted_files: + diff_bundle_size += len(diff_bundle.deleted_files.files) + opal_server_policy_update_size.labels(type="diff").observe(diff_bundle_size) + return diff_bundle + except ValueError: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"commit with hash {base_hash} was not found in the policy repo!", ) - if diff_bundle.deleted_files: - diff_bundle_size += len(diff_bundle.deleted_files.files) - opal_server_policy_update_size.observe(diff_bundle_size) - return diff_bundle - except ValueError: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=f"commit with hash {base_hash} was not found in the policy repo!", - ) diff --git a/packages/opal-server/opal_server/policy/watcher/callbacks.py b/packages/opal-server/opal_server/policy/watcher/callbacks.py index 6b760f203..269ddeadf 100644 --- a/packages/opal-server/opal_server/policy/watcher/callbacks.py +++ 
b/packages/opal-server/opal_server/policy/watcher/callbacks.py @@ -117,7 +117,9 @@ async def publish_changed_directories( ) if notification: - opal_server_policy_update_size.observe(len(notification.update.changed_directories)) + opal_server_policy_update_size.labels(type="directory").observe( + len(notification.update.changed_directories) + ) async with publisher: await publisher.publish( diff --git a/packages/opal-server/opal_server/policy/watcher/task.py b/packages/opal-server/opal_server/policy/watcher/task.py index 5878ff712..165fa7e38 100644 --- a/packages/opal-server/opal_server/policy/watcher/task.py +++ b/packages/opal-server/opal_server/policy/watcher/task.py @@ -104,6 +104,10 @@ def _init_should_stop(self): async def _fail(self, exc: Exception): """called when the watcher fails, and stops all tasks gracefully.""" + opal_server_policy_update_count.labels( + source="watcher", + status="error" + ).inc() logger.error("policy watcher failed with exception: {err}", err=repr(exc)) self.signal_stop() # trigger uvicorn graceful shutdown @@ -127,7 +131,24 @@ async def stop(self): async def trigger(self, topic: Topic, data: Any): """triggers the policy watcher from outside to check for changes (git pull)""" - opal_server_policy_update_count.labels(source="webhook").inc() - - with opal_server_policy_update_latency.labels(source="webhook").time(): - await self._watcher.check_for_changes() + try: + opal_server_policy_update_count.labels( + source="webhook", + status="started" + ).inc() + + with opal_server_policy_update_latency.labels( + source="webhook", + status="success" + ).time(): + await self._watcher.check_for_changes() + opal_server_policy_update_count.labels( + source="webhook", + status="success" + ).inc() + except Exception as e: + opal_server_policy_update_count.labels( + source="webhook", + status="error" + ).inc() + raise From a6f39b93740a0e84bb8a6ac7d248b83efed826cf Mon Sep 17 00:00:00 2001 From: Piyush Sardana Date: Fri, 15 Nov 2024 06:32:52 +0100 
Subject: [PATCH 12/29] added labels in token requests generations and errors --- .../monitoring/prometheus_metrics.py | 15 ++++--- .../opal-server/opal_server/security/api.py | 43 +++++++++++++++++-- 2 files changed, 47 insertions(+), 11 deletions(-) diff --git a/packages/opal-common/opal_common/monitoring/prometheus_metrics.py b/packages/opal-common/opal_common/monitoring/prometheus_metrics.py index 4517399b0..3d44541a4 100644 --- a/packages/opal-common/opal_common/monitoring/prometheus_metrics.py +++ b/packages/opal-common/opal_common/monitoring/prometheus_metrics.py @@ -112,20 +112,21 @@ token_request_count = Counter( "opal_token_request_count", "Total number of token requests", - ['token_type'] + ["token_type", "status"] +) + +token_generated_count = Counter( + "opal_token_generated_count", + "Number of successfully generated tokens", + ["peer_type", "ttl"] ) token_generation_errors = Counter( "opal_token_generation_errors", "Total number of errors during token generation", - labelnames=["error_type"] + ["error_type", "token_type"] ) -token_generated_count = Counter( - "opal_token_generated_count", - "Total number of tokens successfully generated", - labelnames=["peer_type"] -) active_clients = Gauge( 'opal_active_clients_total', diff --git a/packages/opal-server/opal_server/security/api.py b/packages/opal-server/opal_server/security/api.py index 0ab4d8135..1d9db4409 100644 --- a/packages/opal-server/opal_server/security/api.py +++ b/packages/opal-server/opal_server/security/api.py @@ -22,9 +22,22 @@ def init_security_router(signer: JWTSigner, authenticator: StaticBearerAuthentic dependencies=[Depends(authenticator)], ) async def generate_new_access_token(req: AccessTokenRequest): - token_request_count.labels(token_type=req.type.value).inc() + token_request_count.labels( + token_type=req.type.value, + status="received" + ).inc() + if not signer.enabled: - token_generation_errors.labels(error_type="SignerDisabled").inc() + token_generation_errors.labels( + 
error_type="SignerDisabled", + token_type=req.type.value + ).inc() + + token_request_count.labels( + token_type=req.type.value, + status="error" + ).inc() + raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="opal server was not configured with security, cannot generate tokens!", @@ -33,7 +46,17 @@ async def generate_new_access_token(req: AccessTokenRequest): claims = {"peer_type": req.type.value, **req.claims} token = signer.sign(sub=req.id, token_lifetime=req.ttl, custom_claims=claims) logger.info(f"Generated opal token: peer_type={req.type.value}") - token_generated_count.labels(peer_type=req.type.value).inc() + + token_generated_count.labels( + peer_type=req.type.value, + ttl=req.ttl + ).inc() + + token_request_count.labels( + token_type=req.type.value, + status="success" + ).inc() + return AccessToken( token=token, details=TokenDetails( @@ -45,7 +68,19 @@ async def generate_new_access_token(req: AccessTokenRequest): ) except Exception as ex: logger.error(f"Failed to generate token: {str(ex)}") - token_generation_errors.labels(error_type="TokenGenerationFailed").inc() + error_type = ( + "TokenGenerationFailed" + if "token" in str(ex).lower() + else "UnexpectedError" + ) + token_generation_errors.labels( + error_type=error_type, + token_type=req.type.value + ).inc() + token_request_count.labels( + token_type=req.type.value, + status="error" + ).inc() raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to generate token due to server error.", From 699e9da25fda9fb4ffa7219bd5547ac9088db2f3 Mon Sep 17 00:00:00 2001 From: Piyush Sardana Date: Fri, 15 Nov 2024 06:51:30 +0100 Subject: [PATCH 13/29] added more labels for prometheus metrics for scope --- .../monitoring/prometheus_metrics.py | 15 +- .../opal-server/opal_server/scopes/api.py | 141 +++++++++++++++--- 2 files changed, 128 insertions(+), 28 deletions(-) diff --git a/packages/opal-common/opal_common/monitoring/prometheus_metrics.py 
b/packages/opal-common/opal_common/monitoring/prometheus_metrics.py index 3d44541a4..6a4b7b191 100644 --- a/packages/opal-common/opal_common/monitoring/prometheus_metrics.py +++ b/packages/opal-common/opal_common/monitoring/prometheus_metrics.py @@ -63,13 +63,14 @@ opal_server_scope_request_count = Counter( 'opal_server_scope_request_count', 'Total number of requests to scope endpoints', - labelnames=['endpoint', 'method'] + ["endpoint", "method", "status"] ) opal_server_scope_request_latency = Histogram( 'opal_server_scope_request_latency', 'Latency of scope requests in seconds', - labelnames=['endpoint', 'method'] + ["endpoint", "method", "status"], + buckets=[.01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0] ) opal_server_scope_data_update_count = Counter( @@ -105,10 +106,16 @@ opal_server_scope_error_count = Counter( 'opal_server_scope_error_count', 'Total count of errors encountered per scope operation', - labelnames=['scope_id', 'error_type'] + ["scope_id", "error_type", "endpoint"] +) + +opal_server_scope_operation_count = Counter( + "opal_server_scope_operation_count", + "Number of scope operations (create/update/delete)", + ["operation", "status"] ) -# Define metrics +# Generic metrics token_request_count = Counter( "opal_token_request_count", "Total number of token requests", diff --git a/packages/opal-server/opal_server/scopes/api.py b/packages/opal-server/opal_server/scopes/api.py index aaccdd74e..b3838f150 100644 --- a/packages/opal-server/opal_server/scopes/api.py +++ b/packages/opal-server/opal_server/scopes/api.py @@ -44,6 +44,12 @@ opal_server_scope_request_count, opal_server_scope_request_latency, opal_server_scope_error_count, + opal_server_scope_operation_count, + opal_server_scope_data_update_latency, + opal_server_scope_data_update_count, + opal_server_scope_data_update_errors, + opal_server_scope_policy_sync_latency, + opal_server_scope_policy_sync_count ) from opal_server.config import opal_server_config from 
opal_server.data.data_update_publisher import DataUpdatePublisher @@ -109,17 +115,33 @@ async def put_scope( scope_in: Scope, claims: JWTClaims = Depends(authenticator), ): - opal_server_scope_request_count.labels(endpoint="put_scope", method="PUT").inc() - with opal_server_scope_request_latency.labels(endpoint="put_scope", method="PUT").time(): + opal_server_scope_request_count.labels( + endpoint="put_scope", + method="PUT", + status="started" + ).inc() + with opal_server_scope_request_latency.labels( + endpoint="put_scope", + method="PUT", + status="processing" + ).time(): try: require_peer_type(authenticator, claims, PeerType.datasource) except Unauthorized as ex: logger.error(f"Unauthorized to PUT scope: {repr(ex)}") - opal_server_scope_error_count.labels(scope_id=scope_in.scope_id, error_type="Unauthorized").inc() + opal_server_scope_error_count.labels( + scope_id=scope_in.scope_id, + error_type="Unauthorized", + endpoint="put_scope" + ).inc() raise verify_private_key_or_throw(scope_in) await scopes.put(scope_in) + opal_server_scope_operation_count.labels( + operation="create", + status="success" + ).inc() force_fetch_str = " (force fetch)" if force_fetch else "" logger.info(f"Sync scope: {scope_in.scope_id}{force_fetch_str}") @@ -138,15 +160,30 @@ async def put_scope( response_model_exclude={"policy": {"auth"}}, ) async def get_all_scopes(*, claims: JWTClaims = Depends(authenticator)): - opal_server_scope_request_count.labels(endpoint="get_all_scopes", method="GET").inc() - with opal_server_scope_request_latency.labels(endpoint="get_all_scopes", method="GET").time(): + opal_server_scope_request_count.labels( + endpoint="get_all_scopes", + method="GET", + status="started" + ).inc() + with opal_server_scope_request_latency.labels( + endpoint="get_all_scopes", + method="GET", + status="processing" + ).time(): try: require_peer_type(authenticator, claims, PeerType.datasource) except Unauthorized as ex: logger.error(f"Unauthorized to get scopes: {repr(ex)}") - 
opal_server_scope_error_count.labels(scope_id="all_scopes", error_type="Unauthorized").inc() + opal_server_scope_error_count.labels( + scope_id="all_scopes", + error_type="Unauthorized", + endpoint="get_all_scopes" + ).inc() raise - + opal_server_scope_operation_count.labels( + operation="get_all", + status="success" + ).inc() return await scopes.all() @router.get( @@ -155,19 +192,40 @@ async def get_all_scopes(*, claims: JWTClaims = Depends(authenticator)): response_model_exclude={"policy": {"auth"}}, ) async def get_scope(*, scope_id: str, claims: JWTClaims = Depends(authenticator)): - opal_server_scope_request_count.labels(endpoint="get_scope", method="GET").inc() - with opal_server_scope_request_latency.labels(endpoint="get_scope", method="GET").time(): + opal_server_scope_request_count.labels( + endpoint="get_scope", + method="GET", + status="started" + ).inc() + with opal_server_scope_request_latency.labels( + endpoint="get_scope", + method="GET", + status="processing" + ).time(): try: require_peer_type(authenticator, claims, PeerType.datasource) except Unauthorized as ex: logger.error(f"Unauthorized to get scope: {repr(ex)}") - opal_server_scope_error_count.labels(scope_id=scope_id, error_type="Unauthorized").inc() + opal_server_scope_error_count.labels( + scope_id=scope_id, + error_type="Unauthorized", + endpoint="get_scope" + ).inc() raise try: scope = await scopes.get(scope_id) + opal_server_scope_operation_count.labels( + operation="get", + status="success" + ).inc() return scope except ScopeNotFoundError: + opal_server_scope_error_count.labels( + scope_id=scope_id, + error_type="NotFound", + endpoint="get_scope" + ).inc() raise HTTPException( status.HTTP_404_NOT_FOUND, detail=f"No such scope: {scope_id}" ) @@ -179,18 +237,33 @@ async def get_scope(*, scope_id: str, claims: JWTClaims = Depends(authenticator) async def delete_scope( *, scope_id: str, claims: JWTClaims = Depends(authenticator) ): - 
opal_server_scope_request_count.labels(endpoint="delete_scope", method="DELETE").inc() - with opal_server_scope_request_latency.labels(endpoint="delete_scope", method="DELETE").time(): + opal_server_scope_request_count.labels( + endpoint="delete_scope", + method="DELETE", + status="started" + ).inc() + with opal_server_scope_request_latency.labels( + endpoint="delete_scope", + method="DELETE", + status="processing" + ).time(): try: require_peer_type(authenticator, claims, PeerType.datasource) except Unauthorized as ex: logger.error(f"Unauthorized to delete scope: {repr(ex)}") - opal_server_scope_error_count.labels(scope_id=scope_id, error_type="Unauthorized").inc() + opal_server_scope_error_count.labels( + scope_id=scope_id, + error_type="Unauthorized", + endpoint="delete_scope" + ).inc() raise # TODO: This should also asynchronously clean the repo from the disk (if it's not used by other scopes) await scopes.delete(scope_id) - + opal_server_scope_operation_count.labels( + operation="delete", + status="success" + ).inc() return Response(status_code=status.HTTP_204_NO_CONTENT) @router.post("/{scope_id}/refresh", status_code=status.HTTP_200_OK) @@ -204,8 +277,7 @@ async def refresh_scope( ), claims: JWTClaims = Depends(authenticator), ): - opal_server_scope_request_count.labels(endpoint="refresh_scope", method="POST").inc() - with opal_server_scope_request_latency.labels(endpoint="refresh_scope", method="POST").time(): + with opal_server_scope_policy_sync_latency.labels(scope_id=scope_id).time(): try: require_peer_type(authenticator, claims, PeerType.datasource) except Unauthorized as ex: @@ -231,7 +303,7 @@ async def refresh_scope( "hinted_hash": hinted_hash, }, ) - + opal_server_scope_policy_sync_count.labels(scope_id=scope_id).inc() return Response(status_code=status.HTTP_200_OK) except ScopeNotFoundError: @@ -243,18 +315,33 @@ async def refresh_scope( @router.post("/refresh", status_code=status.HTTP_200_OK) async def sync_all_scopes(claims: JWTClaims = 
Depends(authenticator)): """sync all scopes.""" - opal_server_scope_request_count.labels(endpoint="sync_all_scopes", method="POST").inc() - with opal_server_scope_request_latency.labels(endpoint="sync_all_scopes", method="POST").time(): + opal_server_scope_request_count.labels( + endpoint="sync_all_scopes", + method="POST", + status="started" + ).inc() + with opal_server_scope_request_latency.labels( + endpoint="sync_all_scopes", + method="POST", + status="processing" + ).time(): try: require_peer_type(authenticator, claims, PeerType.datasource) except Unauthorized as ex: logger.error(f"Unauthorized to refresh all scopes: {repr(ex)}") - opal_server_scope_error_count.labels(scope_id="all_scopes", error_type="Unauthorized").inc() + opal_server_scope_error_count.labels( + scope_id="all_scopes", + error_type="Unauthorized", + endpoint="sync_all_scopes" + ).inc() raise # All server replicas (leaders) should sync all scopes. await pubsub_endpoint.publish(opal_server_config.POLICY_REPO_WEBHOOK_TOPIC) - + opal_server_scope_operation_count.labels( + operation="sync_all", + status="success" + ).inc() return Response(status_code=status.HTTP_200_OK) @router.get( @@ -365,8 +452,7 @@ async def publish_data_update_event( claims: JWTClaims = Depends(authenticator), scope_id: str = Path(..., description="Scope ID"), ): - opal_server_scope_request_count.labels(endpoint="publish_data_update_event", method="POST").inc() - with opal_server_scope_request_latency.labels(endpoint="publish_data_update_event", method="POST").time(): + with opal_server_scope_data_update_latency.labels(scope_id=scope_id).time(): try: require_peer_type(authenticator, claims, PeerType.datasource) @@ -378,9 +464,16 @@ async def publish_data_update_event( await DataUpdatePublisher( ScopedServerSideTopicPublisher(pubsub_endpoint, scope_id) ).publish_data_updates(update) + + opal_server_scope_data_update_count.labels(scope_id=scope_id).inc() except Unauthorized as ex: logger.error(f"Unauthorized to publish update: 
{repr(ex)}") - opal_server_scope_error_count.labels(scope_id=scope_id, error_type="Unauthorized").inc() + opal_server_scope_data_update_errors.labels(scope_id=scope_id).inc() + opal_server_scope_error_count.labels( + scope_id=scope_id, + error_type="UpdateFailed", + endpoint="data_update" + ).inc() raise return router From 140bac35bc3354b1cac302f914c2701b0b7a9700 Mon Sep 17 00:00:00 2001 From: Piyush Sardana Date: Fri, 15 Nov 2024 07:25:59 +0100 Subject: [PATCH 14/29] added metrics for opal client --- packages/opal-client/opal_client/data/api.py | 51 +++++++++--- .../opal-client/opal_client/policy/api.py | 29 ++++++- .../opal_client/policy_store/api.py | 80 +++++++++++++------ .../monitoring/prometheus_metrics.py | 66 ++++++++++++++- 4 files changed, 188 insertions(+), 38 deletions(-) diff --git a/packages/opal-client/opal_client/data/api.py b/packages/opal-client/opal_client/data/api.py index 32432b583..3ce785e60 100644 --- a/packages/opal-client/opal_client/data/api.py +++ b/packages/opal-client/opal_client/data/api.py @@ -3,7 +3,11 @@ from fastapi import APIRouter, HTTPException, status from opal_client.data.updater import DataUpdater from opal_common.logger import logger - +from opal_common.monitoring.prometheus_metrics import ( + opal_client_data_update_trigger_count, + opal_client_data_update_latency, + opal_client_data_update_errors +) def init_data_router(data_updater: Optional[DataUpdater]): router = APIRouter() @@ -11,15 +15,42 @@ def init_data_router(data_updater: Optional[DataUpdater]): @router.post("/data-updater/trigger", status_code=status.HTTP_200_OK) async def trigger_policy_data_update(): logger.info("triggered policy data update from api") - if data_updater: - await data_updater.get_base_policy_data( - data_fetch_reason="request from sdk" - ) - return {"status": "ok"} - else: + opal_client_data_update_trigger_count.labels( + source="api", + status="started" + ).inc() + try: + if data_updater: + with 
opal_client_data_update_latency.labels(source="api").time(): + await data_updater.get_base_policy_data( + data_fetch_reason="request from sdk" + ) + opal_client_data_update_trigger_count.labels( + source="api", + status="success" + ).inc() + return {"status": "ok"} + else: + opal_client_data_update_errors.labels( + error_type="updater_disabled", + source="api" + ).inc() + opal_client_data_update_trigger_count.labels( + source="api", + status="error" + ).inc() + raise HTTPException( + status_code=status.HTTP_503_SERVICE_UNAVAILABLE, + detail="Data Updater is currently disabled. Dynamic data updates are not available.", + ) + except Exception as e: + opal_client_data_update_errors.labels( + error_type="unknown", + source="api" + ).inc() + logger.error(f"Error during data update: {str(e)}") raise HTTPException( - status_code=status.HTTP_503_SERVICE_UNAVAILABLE, - detail="Data Updater is currently disabled. Dynamic data updates are not available.", + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Failed to update data" ) - return router diff --git a/packages/opal-client/opal_client/policy/api.py b/packages/opal-client/opal_client/policy/api.py index 1d680161c..e5f70e620 100644 --- a/packages/opal-client/opal_client/policy/api.py +++ b/packages/opal-client/opal_client/policy/api.py @@ -1,7 +1,11 @@ from fastapi import APIRouter, status from opal_client.policy.updater import PolicyUpdater from opal_common.logger import logger - +from opal_common.monitoring.prometheus_metrics import ( + opal_client_policy_update_trigger_count, + opal_client_policy_update_latency, + opal_client_policy_update_errors +) def init_policy_router(policy_updater: PolicyUpdater): router = APIRouter() @@ -9,7 +13,26 @@ def init_policy_router(policy_updater: PolicyUpdater): @router.post("/policy-updater/trigger", status_code=status.HTTP_200_OK) async def trigger_policy_update(): logger.info("triggered policy update from api") - await 
policy_updater.trigger_update_policy(force_full_update=True) - return {"status": "ok"} + opal_client_policy_update_trigger_count.labels( + source="api", + status="started", + update_type="full" + ).inc() + try: + with opal_client_policy_update_latency.labels(source="api", update_type="full").time(): + await policy_updater.trigger_update_policy(force_full_update=True) + opal_client_policy_update_trigger_count.labels( + source="api", + status="success", + update_type="full" + ).inc() + return {"status": "ok"} + except Exception as e: + opal_client_policy_update_errors.labels( + error_type="unknown", + source="api" + ).inc() + logger.error(f"Error during policy update: {str(e)}") + raise return router diff --git a/packages/opal-client/opal_client/policy_store/api.py b/packages/opal-client/opal_client/policy_store/api.py index b27d83d70..b411cc863 100644 --- a/packages/opal-client/opal_client/policy_store/api.py +++ b/packages/opal-client/opal_client/policy_store/api.py @@ -7,11 +7,18 @@ from opal_common.authentication.verifier import Unauthorized from opal_common.logger import logger from opal_common.schemas.security import PeerType - +from opal_common.monitoring.prometheus_metrics import ( + opal_client_policy_store_request_count, + opal_client_policy_store_request_latency, + opal_client_policy_store_auth_errors, + opal_client_policy_store_status +) def init_policy_store_router(authenticator: JWTAuthenticator): router = APIRouter() - + opal_client_policy_store_status.labels( + auth_type=opal_client_config.POLICY_STORE_AUTH_TYPE or PolicyStoreAuth.NONE + ).set(1) @router.get( "/policy-store/config", response_model=PolicyStoreDetails, @@ -20,29 +27,54 @@ def init_policy_store_router(authenticator: JWTAuthenticator): deprecated=True, ) async def get_policy_store_details(claims: JWTClaims = Depends(authenticator)): - try: - require_peer_type( - authenticator, claims, PeerType.listener - ) # may throw Unauthorized - except Unauthorized as e: - logger.error(f"Unauthorized 
to publish update: {repr(e)}") - raise + opal_client_policy_store_request_count.labels( + endpoint="config", + status="started" + ).inc() + with opal_client_policy_store_request_latency.labels( + endpoint="config" + ).time(): + try: + require_peer_type( + authenticator, claims, PeerType.listener + ) # may throw Unauthorized + except Unauthorized as e: + opal_client_policy_store_auth_errors.labels( + error_type="unauthorized", + endpoint="config" + ).inc() + opal_client_policy_store_request_count.labels( + endpoint="config", + status="error" + ).inc() + logger.error(f"Unauthorized to publish update: {repr(e)}") + + raise + + token = None + oauth_client_secret = None + if not opal_client_config.EXCLUDE_POLICY_STORE_SECRETS: + token = opal_client_config.POLICY_STORE_AUTH_TOKEN + oauth_client_secret = ( + opal_client_config.POLICY_STORE_AUTH_OAUTH_CLIENT_SECRET + ) - token = None - oauth_client_secret = None - if not opal_client_config.EXCLUDE_POLICY_STORE_SECRETS: - token = opal_client_config.POLICY_STORE_AUTH_TOKEN - oauth_client_secret = ( - opal_client_config.POLICY_STORE_AUTH_OAUTH_CLIENT_SECRET + auth_type = opal_client_config.POLICY_STORE_AUTH_TYPE or PolicyStoreAuth.NONE + opal_client_policy_store_status.labels( + auth_type=auth_type + ).set(1) + opal_client_policy_store_request_count.labels( + endpoint="config", + status="success" + ).inc() + return PolicyStoreDetails( + url=opal_client_config.POLICY_STORE_URL, + token=token or None, + auth_type=auth_type, + oauth_client_id=opal_client_config.POLICY_STORE_AUTH_OAUTH_CLIENT_ID + or None, + oauth_client_secret=oauth_client_secret or None, + oauth_server=opal_client_config.POLICY_STORE_AUTH_OAUTH_SERVER or None, ) - return PolicyStoreDetails( - url=opal_client_config.POLICY_STORE_URL, - token=token or None, - auth_type=opal_client_config.POLICY_STORE_AUTH_TYPE or PolicyStoreAuth.NONE, - oauth_client_id=opal_client_config.POLICY_STORE_AUTH_OAUTH_CLIENT_ID - or None, - oauth_client_secret=oauth_client_secret or 
None, - oauth_server=opal_client_config.POLICY_STORE_AUTH_OAUTH_SERVER or None, - ) return router diff --git a/packages/opal-common/opal_common/monitoring/prometheus_metrics.py b/packages/opal-common/opal_common/monitoring/prometheus_metrics.py index 6a4b7b191..4f502d7c9 100644 --- a/packages/opal-common/opal_common/monitoring/prometheus_metrics.py +++ b/packages/opal-common/opal_common/monitoring/prometheus_metrics.py @@ -134,7 +134,7 @@ ["error_type", "token_type"] ) - +# Client metrics active_clients = Gauge( 'opal_active_clients_total', 'Number of currently connected OPAL clients', @@ -145,4 +145,68 @@ 'opal_client_data_subscriptions', 'Number of data topics a client is subscribed to', ['client_id', 'topic'] +) + +# Opal Client metrics +opal_client_data_update_trigger_count = Counter( + 'opal_client_data_update_trigger_count', + 'Number of data update triggers', + ['source', 'status'] +) + +opal_client_data_update_latency = Histogram( + 'opal_client_data_update_latency_seconds', + 'Latency of data update operations', + ['source'], + buckets=[.01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0] +) + +opal_client_data_update_errors = Counter( + 'opal_client_data_update_errors', + 'Number of errors during data updates', + ['error_type', 'source'] +) + +opal_client_policy_update_trigger_count = Counter( + 'opal_client_policy_update_trigger_count', + 'Number of policy update triggers', + ['source', 'status', 'update_type'] +) + +opal_client_policy_update_latency = Histogram( + 'opal_client_policy_update_latency_seconds', + 'Latency of policy update operations', + ['source', 'update_type'], + buckets=[.01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0] +) + +opal_client_policy_update_errors = Counter( + 'opal_client_policy_update_errors', + 'Number of errors during policy updates', + ['error_type', 'source'] +) + +opal_client_policy_store_request_count = Counter( + 'opal_client_policy_store_request_count', + 'Number of requests to policy store endpoints', + 
['endpoint', 'status'] +) + +opal_client_policy_store_request_latency = Histogram( + 'opal_client_policy_store_request_latency_seconds', + 'Latency of policy store requests', + ['endpoint'], + buckets=[.01, .025, .05, .075, .1, .25, .5, .75, 1.0] +) + +opal_client_policy_store_auth_errors = Counter( + 'opal_client_policy_store_auth_errors', + 'Number of authentication/authorization errors for policy store', + ['error_type', 'endpoint'] +) + +opal_client_policy_store_status = Gauge( + 'opal_client_policy_store_status', + 'Current status of policy store connection', + ['auth_type'] ) \ No newline at end of file From 1cb9ccfde0c29fe7bb88e7c271f4403a7c0bb5f2 Mon Sep 17 00:00:00 2001 From: Piyush Sardana Date: Mon, 18 Nov 2024 11:42:25 +0100 Subject: [PATCH 15/29] added docker compose example with prometheus --- ...docker-compose-with-prometheus-metrics.yml | 67 +++++++++++++++++++ docker/prometheus/prometheus.yml | 14 ++++ 2 files changed, 81 insertions(+) create mode 100644 docker/prometheus/docker-compose-with-prometheus-metrics.yml create mode 100644 docker/prometheus/prometheus.yml diff --git a/docker/prometheus/docker-compose-with-prometheus-metrics.yml b/docker/prometheus/docker-compose-with-prometheus-metrics.yml new file mode 100644 index 000000000..0796b214f --- /dev/null +++ b/docker/prometheus/docker-compose-with-prometheus-metrics.yml @@ -0,0 +1,67 @@ +services: + broadcast_channel: + image: postgres:alpine + environment: + - POSTGRES_DB=postgres + - POSTGRES_USER=postgres + - POSTGRES_PASSWORD=postgres + volumes: + - postgres_data:/var/lib/postgresql/data + + prometheus: + image: prom/prometheus:v2.45.0 + volumes: + - ./prometheus.yml:/etc/prometheus/prometheus.yml + - prometheus_data:/prometheus + ports: + - "9090:9090" + command: + - '--config.file=/etc/prometheus/prometheus.yml' + - '--storage.tsdb.path=/prometheus' + - '--web.console.libraries=/etc/prometheus/console_libraries' + - '--web.console.templates=/etc/prometheus/consoles' + - 
'--web.enable-lifecycle' + + grafana: + image: grafana/grafana:9.5.3 + ports: + - "3000:3000" + volumes: + - grafana_data:/var/lib/grafana + environment: + - GF_SECURITY_ADMIN_PASSWORD=admin + - GF_USERS_ALLOW_SIGN_UP=false + depends_on: + - prometheus + + opal_server: + image: permitio/opal-server:latest + environment: + - OPAL_BROADCAST_URI=postgres://postgres:postgres@broadcast_channel:5432/postgres + - UVICORN_NUM_WORKERS=4 + - OPAL_POLICY_REPO_URL=https://github.com/permitio/opal-example-policy-repo + - OPAL_POLICY_REPO_POLLING_INTERVAL=30 + - OPAL_DATA_CONFIG_SOURCES={"config":{"entries":[{"url":"http://opal_server:7002/policy-data","topics":["policy_data"],"dst_path":"/static"}]}} + - OPAL_LOG_FORMAT_INCLUDE_PID=true + ports: + - "7002:7002" + depends_on: + - broadcast_channel + + opal_client: + image: permitio/opal-client:latest + environment: + - OPAL_SERVER_URL=http://opal_server:7002 + - OPAL_LOG_FORMAT_INCLUDE_PID=true + - OPAL_INLINE_OPA_LOG_FORMAT=http + ports: + - "7766:7000" + - "8181:8181" + depends_on: + - opal_server + command: sh -c "exec ./wait-for.sh opal_server:7002 --timeout=20 -- ./start.sh" + +volumes: + postgres_data: + prometheus_data: + grafana_data: diff --git a/docker/prometheus/prometheus.yml b/docker/prometheus/prometheus.yml new file mode 100644 index 000000000..a781e3659 --- /dev/null +++ b/docker/prometheus/prometheus.yml @@ -0,0 +1,14 @@ +global: + scrape_interval: 15s + evaluation_interval: 15s + +scrape_configs: + - job_name: 'opal_server' + static_configs: + - targets: ['opal_server:7002'] + metrics_path: '/metrics' + + - job_name: 'opal_client' + static_configs: + - targets: ['opal_client:7000'] + metrics_path: '/metrics' From e1fec7e02a6e8f484178e3baf13bd5fbf4aaba29 Mon Sep 17 00:00:00 2001 From: Piyush Sardana Date: Mon, 18 Nov 2024 11:49:40 +0100 Subject: [PATCH 16/29] fixed metric labels --- .../opal-common/opal_common/monitoring/prometheus_metrics.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git 
a/packages/opal-common/opal_common/monitoring/prometheus_metrics.py b/packages/opal-common/opal_common/monitoring/prometheus_metrics.py index 4f502d7c9..8e6b8c3f7 100644 --- a/packages/opal-common/opal_common/monitoring/prometheus_metrics.py +++ b/packages/opal-common/opal_common/monitoring/prometheus_metrics.py @@ -4,13 +4,13 @@ opal_server_data_update_total = Counter( 'opal_server_data_update_total', 'Total number of data update events published to opal server', - ["status"] + ["status", "type"] ) opal_server_data_update_errors = Counter( 'opal_server_data_update_errors', 'Total number of errors in opal server data update publishing', - ["error_type"] + ["error_type", "endpoint"] ) opal_server_data_update_latency = Histogram( From 78ef7770b31f8dcdc9974f7cdfaf44fd2a1b1b30 Mon Sep 17 00:00:00 2001 From: Piyush Sardana Date: Mon, 18 Nov 2024 14:20:20 +0100 Subject: [PATCH 17/29] added documentation --- ...docker-compose-with-prometheus-metrics.yml | 0 .../{ => prometheus}/prometheus.yml | 0 .../docs/tutorials/monitoring_opal.mdx | 33 +++++++++++++++++++ 3 files changed, 33 insertions(+) rename docker/prometheus/{ => prometheus}/docker-compose-with-prometheus-metrics.yml (100%) rename docker/prometheus/{ => prometheus}/prometheus.yml (100%) diff --git a/docker/prometheus/docker-compose-with-prometheus-metrics.yml b/docker/prometheus/prometheus/docker-compose-with-prometheus-metrics.yml similarity index 100% rename from docker/prometheus/docker-compose-with-prometheus-metrics.yml rename to docker/prometheus/prometheus/docker-compose-with-prometheus-metrics.yml diff --git a/docker/prometheus/prometheus.yml b/docker/prometheus/prometheus/prometheus.yml similarity index 100% rename from docker/prometheus/prometheus.yml rename to docker/prometheus/prometheus/prometheus.yml diff --git a/documentation/docs/tutorials/monitoring_opal.mdx b/documentation/docs/tutorials/monitoring_opal.mdx index b7c9c4088..c9f99d268 100644 --- a/documentation/docs/tutorials/monitoring_opal.mdx 
+++ b/documentation/docs/tutorials/monitoring_opal.mdx @@ -11,6 +11,7 @@ There are multiple ways you can monitor your OPAL deployment: - **Health-checks** - OPAL exposes HTTP health check endpoints ([See below](##health-checks)) - [**Callbacks**](/tutorials/healthcheck_policy_and_update_callbacks#-data-update-callbacks) - Using the callback webhooks feature - having OPAL-clients report their updates - **Statistics** - Using the built-in statistics feature in OPAL ([See below](##opal-statistics)) +- **Prometheus Metrics** - OPAL can expose metrics in Prometheus format for monitoring ([See below](#prometheus-metrics)). ## Health checks @@ -52,3 +53,35 @@ Available through `/pubsub_client_info` api route on the server. ### Caveats: - When `UVICORN_NUM_WORKERS > 1`, retrieved information would only include clients connected to the replying server process. - This is an early access feature and is likely to change. Backward compatibility is not garaunteed. + +## Prometheus Metrics + +OPAL can expose metrics in Prometheus format, which can be scraped by Prometheus for monitoring and alerting. + +### Enabling Prometheus Metrics + +Prometheus metrics are enabled by default on both OPAL Server and OPAL Client. + +Both the server and client expose a `/metrics` endpoint that returns metrics in Prometheus format. + +### Accessing Metrics + +- **OPAL Server Metrics Endpoint**: `/metrics` +- **OPAL Client Metrics Endpoint**: `/metrics` + +### Example + +To monitor OPAL using Prometheus and Grafana, a ready-to-use Docker Compose configuration is provided in the root directory of the repository under docker/prometheus. The file is named docker-compose-with-prometheus-metrics.yml. 
+ +Navigate to the directory containing the Docker Compose file: + +``` +cd docker/prometheus +``` +Run the following command to start Prometheus and Grafana: + +``` +docker compose -f docker-compose-with-prometheus-metrics.yml up +``` + +This setup will start Prometheus to scrape metrics from OPAL server and client, and Grafana to visualize the metrics. \ No newline at end of file From 5bde81927b34cdd8ed32376bae68e478b9a010af Mon Sep 17 00:00:00 2001 From: Piyush Sardana Date: Wed, 20 Nov 2024 12:16:53 +0100 Subject: [PATCH 18/29] added open telemetry traces and metrics --- ...docker-compose-with-prometheus-metrics.yml | 30 +- .../docker_files/otel-collector-config.yaml | 25 ++ .../prometheus.yml | 0 .../docs/tutorials/monitoring_opal.mdx | 35 +- packages/opal-client/opal_client/data/api.py | 56 +-- .../opal-client/opal_client/policy/api.py | 43 +- .../opal_client/policy_store/api.py | 57 ++- packages/opal-client/requires.txt | 8 +- packages/opal-common/opal_common/config.py | 16 + .../monitoring/prometheus_metrics.py | 212 ---------- .../opal-common/opal_common/schemas/policy.py | 14 + packages/opal-server/opal_server/data/api.py | 17 - .../opal_server/data/data_update_publisher.py | 95 ++--- .../opal_server/policy/bundles/api.py | 101 +++-- .../opal_server/policy/watcher/callbacks.py | 5 - .../opal_server/policy/watcher/task.py | 30 +- packages/opal-server/opal_server/pubsub.py | 61 ++- .../opal-server/opal_server/scopes/api.py | 380 +++++++----------- .../opal-server/opal_server/security/api.py | 82 ++-- packages/opal-server/opal_server/server.py | 80 +++- packages/opal-server/requires.txt | 8 +- packages/requires.txt | 7 +- requirements.txt | 7 +- 23 files changed, 637 insertions(+), 732 deletions(-) rename docker/{prometheus/prometheus => }/docker-compose-with-prometheus-metrics.yml (70%) create mode 100644 docker/docker_files/otel-collector-config.yaml rename docker/{prometheus/prometheus => docker_files}/prometheus.yml (100%) delete mode 100644 
packages/opal-common/opal_common/monitoring/prometheus_metrics.py diff --git a/docker/prometheus/prometheus/docker-compose-with-prometheus-metrics.yml b/docker/docker-compose-with-prometheus-metrics.yml similarity index 70% rename from docker/prometheus/prometheus/docker-compose-with-prometheus-metrics.yml rename to docker/docker-compose-with-prometheus-metrics.yml index 0796b214f..58ef66194 100644 --- a/docker/prometheus/prometheus/docker-compose-with-prometheus-metrics.yml +++ b/docker/docker-compose-with-prometheus-metrics.yml @@ -7,11 +7,21 @@ services: - POSTGRES_PASSWORD=postgres volumes: - postgres_data:/var/lib/postgresql/data + otel-collector: + image: otel/opentelemetry-collector-contrib:latest + volumes: + - ./docker_files/otel-collector-config.yaml:/etc/otelcol/config.yaml + command: ["--config", "/etc/otelcol/config.yaml"] + ports: + - "4317:4317" + - "8888:8888" + networks: + - opal-network prometheus: image: prom/prometheus:v2.45.0 volumes: - - ./prometheus.yml:/etc/prometheus/prometheus.yml + - ./docker_files/prometheus.yml:/etc/prometheus/prometheus.yml - prometheus_data:/prometheus ports: - "9090:9090" @@ -21,6 +31,10 @@ services: - '--web.console.libraries=/etc/prometheus/console_libraries' - '--web.console.templates=/etc/prometheus/consoles' - '--web.enable-lifecycle' + networks: + - opal-network + depends_on: + - otel-collector grafana: image: grafana/grafana:9.5.3 @@ -33,6 +47,8 @@ services: - GF_USERS_ALLOW_SIGN_UP=false depends_on: - prometheus + networks: + - opal-network opal_server: image: permitio/opal-server:latest @@ -43,10 +59,16 @@ services: - OPAL_POLICY_REPO_POLLING_INTERVAL=30 - OPAL_DATA_CONFIG_SOURCES={"config":{"entries":[{"url":"http://opal_server:7002/policy-data","topics":["policy_data"],"dst_path":"/static"}]}} - OPAL_LOG_FORMAT_INCLUDE_PID=true + - OPAL_ENABLE_OPENTELEMETRY_TRACING=true + - OPAL_ENABLE_OPENTELEMETRY_METRICS=true + - OPAL_OPENTELEMETRY_OTLP_ENDPOINT="otel-collector:4317" ports: - "7002:7002" depends_on: - 
broadcast_channel + - otel-collector + networks: + - opal-network opal_client: image: permitio/opal-client:latest @@ -59,8 +81,14 @@ services: - "8181:8181" depends_on: - opal_server + - otel-collector command: sh -c "exec ./wait-for.sh opal_server:7002 --timeout=20 -- ./start.sh" + networks: + - opal-network +networks: + opal-network: + driver: bridge volumes: postgres_data: prometheus_data: diff --git a/docker/docker_files/otel-collector-config.yaml b/docker/docker_files/otel-collector-config.yaml new file mode 100644 index 000000000..282cfed27 --- /dev/null +++ b/docker/docker_files/otel-collector-config.yaml @@ -0,0 +1,25 @@ +receivers: + otlp: + protocols: + grpc: + endpoint: 0.0.0.0:4317 + +exporters: + prometheus: + endpoint: "0.0.0.0:8888" + logging: + logLevel: debug + +processors: + batch: + +service: + pipelines: + traces: + receivers: [otlp] + processors: [batch] + exporters: [logging] + metrics: + receivers: [otlp] + processors: [batch] + exporters: [prometheus] diff --git a/docker/prometheus/prometheus/prometheus.yml b/docker/docker_files/prometheus.yml similarity index 100% rename from docker/prometheus/prometheus/prometheus.yml rename to docker/docker_files/prometheus.yml diff --git a/documentation/docs/tutorials/monitoring_opal.mdx b/documentation/docs/tutorials/monitoring_opal.mdx index c9f99d268..6c4a8ec42 100644 --- a/documentation/docs/tutorials/monitoring_opal.mdx +++ b/documentation/docs/tutorials/monitoring_opal.mdx @@ -11,7 +11,7 @@ There are multiple ways you can monitor your OPAL deployment: - **Health-checks** - OPAL exposes HTTP health check endpoints ([See below](##health-checks)) - [**Callbacks**](/tutorials/healthcheck_policy_and_update_callbacks#-data-update-callbacks) - Using the callback webhooks feature - having OPAL-clients report their updates - **Statistics** - Using the built-in statistics feature in OPAL ([See below](##opal-statistics)) -- **Prometheus Metrics** - OPAL can expose metrics in Prometheus format for monitoring 
([See below](#prometheus-metrics)). +- **OpenTelemetry Metrics and Tracing** - OPAL can expose metrics and tracing information using OpenTelemetry for monitoring ([See below](#opentelemetry-metrics-and-tracing)). ## Health checks @@ -54,34 +54,37 @@ Available through `/pubsub_client_info` api route on the server. - When `UVICORN_NUM_WORKERS > 1`, retrieved information would only include clients connected to the replying server process. - This is an early access feature and is likely to change. Backward compatibility is not garaunteed. -## Prometheus Metrics +## OpenTelemetry Metrics and Tracing -OPAL can expose metrics in Prometheus format, which can be scraped by Prometheus for monitoring and alerting. +OPAL supports exporting metrics and tracing information using OpenTelemetry, which can be integrated with various monitoring and observability tools. -### Enabling Prometheus Metrics +### Enabling OpenTelemetry Metrics and Tracing -Prometheus metrics are enabled by default on both OPAL Server and OPAL Client. +To enable OpenTelemetry metrics and tracing, you need to set the following environment variables in both OPAL server and OPAL client: -Both the server and client expose a `/metrics` endpoint that returns metrics in Prometheus format. +``` +OPAL_ENABLE_OPENTELEMETRY_TRACING=true +OPAL_ENABLE_OPENTELEMETRY_METRICS=true +OPAL_OPENTELEMETRY_OTLP_ENDPOINT= +``` -### Accessing Metrics +- OPAL_ENABLE_OPENTELEMETRY_TRACING: Set to `true` to enable tracing. +- OPAL_ENABLE_OPENTELEMETRY_METRICS: Set to `true` to enable metrics. +- OPAL_OPENTELEMETRY_OTLP_ENDPOINT: Set the endpoint for the OpenTelemetry Collector -- **OPAL Server Metrics Endpoint**: `/metrics` -- **OPAL Client Metrics Endpoint**: `/metrics` +### Exposing Metrics and Traces -### Example +- Both the server and client will expose a `/metrics` endpoint that returns metrics in Prometheus format. +- Traces are exported to the configured OpenTelemetry Collector endpoint using OTLP over gRPC. 
-To monitor OPAL using Prometheus and Grafana, a ready-to-use Docker Compose configuration is provided in the root directory of the repository under docker/prometheus. The file is named docker-compose-with-prometheus-metrics.yml. +### Example -Navigate to the directory containing the Docker Compose file: +To monitor OPAL using Prometheus and Grafana, a ready-to-use Docker Compose configuration is provided in the root directory of the repository under docker. The file is named docker-compose-with-prometheus-metrics.yml. -``` -cd docker/prometheus -``` Run the following command to start Prometheus and Grafana: ``` -docker compose -f docker-compose-with-prometheus-metrics.yml up +docker compose -f docker/docker-compose-with-prometheus-metrics.yml up ``` This setup will start Prometheus to scrape metrics from OPAL server and client, and Grafana to visualize the metrics. \ No newline at end of file diff --git a/packages/opal-client/opal_client/data/api.py b/packages/opal-client/opal_client/data/api.py index 3ce785e60..c39ad639f 100644 --- a/packages/opal-client/opal_client/data/api.py +++ b/packages/opal-client/opal_client/data/api.py @@ -3,11 +3,13 @@ from fastapi import APIRouter, HTTPException, status from opal_client.data.updater import DataUpdater from opal_common.logger import logger -from opal_common.monitoring.prometheus_metrics import ( - opal_client_data_update_trigger_count, - opal_client_data_update_latency, - opal_client_data_update_errors -) +from opal_common.config import opal_common_config +from opentelemetry import trace + +if opal_common_config.ENABLE_OPENTELEMETRY_TRACING: + tracer = trace.get_tracer(__name__) +else: + tracer = None def init_data_router(data_updater: Optional[DataUpdater]): router = APIRouter() @@ -15,40 +17,40 @@ def init_data_router(data_updater: Optional[DataUpdater]): @router.post("/data-updater/trigger", status_code=status.HTTP_200_OK) async def trigger_policy_data_update(): logger.info("triggered policy data update from api") - 
opal_client_data_update_trigger_count.labels( - source="api", - status="started" - ).inc() + if tracer: + with tracer.start_as_current_span("opal_client_data_update_trigger") as span: + return await _handle_policy_data_update(span) + else: + return await _handle_policy_data_update() + + async def _handle_policy_data_update(span=None): try: if data_updater: - with opal_client_data_update_latency.labels(source="api").time(): + if tracer and span: + with tracer.start_as_current_span("opal_client_data_update_apply"): + await data_updater.get_base_policy_data( + data_fetch_reason="request from sdk" + ) + else: await data_updater.get_base_policy_data( data_fetch_reason="request from sdk" ) - opal_client_data_update_trigger_count.labels( - source="api", - status="success" - ).inc() - return {"status": "ok"} + return {"status": "ok"} else: - opal_client_data_update_errors.labels( - error_type="updater_disabled", - source="api" - ).inc() - opal_client_data_update_trigger_count.labels( - source="api", - status="error" - ).inc() + if span: + span.set_status(trace.StatusCode.ERROR) + span.set_attribute("error", True) + span.set_attribute("error_type", "updater_disabled") raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Data Updater is currently disabled. 
Dynamic data updates are not available.", ) except Exception as e: - opal_client_data_update_errors.labels( - error_type="unknown", - source="api" - ).inc() logger.error(f"Error during data update: {str(e)}") + if span: + span.set_status(trace.StatusCode.ERROR) + span.set_attribute("error", True) + span.record_exception(e) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to update data" diff --git a/packages/opal-client/opal_client/policy/api.py b/packages/opal-client/opal_client/policy/api.py index e5f70e620..5a39dfbf7 100644 --- a/packages/opal-client/opal_client/policy/api.py +++ b/packages/opal-client/opal_client/policy/api.py @@ -1,11 +1,14 @@ from fastapi import APIRouter, status from opal_client.policy.updater import PolicyUpdater from opal_common.logger import logger -from opal_common.monitoring.prometheus_metrics import ( - opal_client_policy_update_trigger_count, - opal_client_policy_update_latency, - opal_client_policy_update_errors -) +from opal_common.config import opal_common_config + +from opentelemetry import trace + +if opal_common_config.ENABLE_OPENTELEMETRY_TRACING: + tracer = trace.get_tracer(__name__) +else: + tracer = None def init_policy_router(policy_updater: PolicyUpdater): router = APIRouter() @@ -13,26 +16,22 @@ def init_policy_router(policy_updater: PolicyUpdater): @router.post("/policy-updater/trigger", status_code=status.HTTP_200_OK) async def trigger_policy_update(): logger.info("triggered policy update from api") - opal_client_policy_update_trigger_count.labels( - source="api", - status="started", - update_type="full" - ).inc() + if tracer: + with tracer.start_as_current_span("opal_client_policy_update_apply") as span: + return await _handle_policy_update(span) + else: + return await _handle_policy_update() + + async def _handle_policy_update(span=None): try: - with opal_client_policy_update_latency.labels(source="api", update_type="full").time(): - await 
policy_updater.trigger_update_policy(force_full_update=True) - opal_client_policy_update_trigger_count.labels( - source="api", - status="success", - update_type="full" - ).inc() - return {"status": "ok"} + await policy_updater.trigger_update_policy(force_full_update=True) + return {"status": "ok"} except Exception as e: - opal_client_policy_update_errors.labels( - error_type="unknown", - source="api" - ).inc() logger.error(f"Error during policy update: {str(e)}") + if span: + span.set_status(trace.StatusCode.ERROR) + span.set_attribute("error", True) + span.record_exception(e) raise return router diff --git a/packages/opal-client/opal_client/policy_store/api.py b/packages/opal-client/opal_client/policy_store/api.py index b411cc863..d772c905a 100644 --- a/packages/opal-client/opal_client/policy_store/api.py +++ b/packages/opal-client/opal_client/policy_store/api.py @@ -7,18 +7,33 @@ from opal_common.authentication.verifier import Unauthorized from opal_common.logger import logger from opal_common.schemas.security import PeerType -from opal_common.monitoring.prometheus_metrics import ( - opal_client_policy_store_request_count, - opal_client_policy_store_request_latency, - opal_client_policy_store_auth_errors, - opal_client_policy_store_status -) +from opal_common.config import opal_common_config + +from opentelemetry import metrics + +if opal_common_config.ENABLE_OPENTELEMETRY_METRICS: + meter = metrics.get_meter(__name__) + + def policy_store_status_callback(observable_gauge): + auth_type = opal_client_config.POLICY_STORE_AUTH_TYPE or PolicyStoreAuth.NONE + observable_gauge.observe( + 1, + attributes={"auth_type": str(auth_type)}, + ) + + policy_store_status_metric = meter.create_observable_gauge( + name="opal_client_policy_store_status", + description="Current status of the policy store authentication type", + unit="1", + callbacks=[policy_store_status_callback], + ) +else: + meter = None + policy_store_status_metric = None def 
init_policy_store_router(authenticator: JWTAuthenticator): router = APIRouter() - opal_client_policy_store_status.labels( - auth_type=opal_client_config.POLICY_STORE_AUTH_TYPE or PolicyStoreAuth.NONE - ).set(1) + @router.get( "/policy-store/config", response_model=PolicyStoreDetails, @@ -27,28 +42,12 @@ def init_policy_store_router(authenticator: JWTAuthenticator): deprecated=True, ) async def get_policy_store_details(claims: JWTClaims = Depends(authenticator)): - opal_client_policy_store_request_count.labels( - endpoint="config", - status="started" - ).inc() - with opal_client_policy_store_request_latency.labels( - endpoint="config" - ).time(): try: require_peer_type( authenticator, claims, PeerType.listener ) # may throw Unauthorized except Unauthorized as e: - opal_client_policy_store_auth_errors.labels( - error_type="unauthorized", - endpoint="config" - ).inc() - opal_client_policy_store_request_count.labels( - endpoint="config", - status="error" - ).inc() logger.error(f"Unauthorized to publish update: {repr(e)}") - raise token = None @@ -60,13 +59,7 @@ async def get_policy_store_details(claims: JWTClaims = Depends(authenticator)): ) auth_type = opal_client_config.POLICY_STORE_AUTH_TYPE or PolicyStoreAuth.NONE - opal_client_policy_store_status.labels( - auth_type=auth_type - ).set(1) - opal_client_policy_store_request_count.labels( - endpoint="config", - status="success" - ).inc() + return PolicyStoreDetails( url=opal_client_config.POLICY_STORE_URL, token=token or None, diff --git a/packages/opal-client/requires.txt b/packages/opal-client/requires.txt index 5b57d2593..84e70fbd3 100644 --- a/packages/opal-client/requires.txt +++ b/packages/opal-client/requires.txt @@ -4,4 +4,10 @@ psutil>=5.9.1,<6 tenacity>=8.0.1,<9 dpath>=2.1.5,<3 jsonpatch>=1.33,<2 -prometheus_client \ No newline at end of file +prometheus_client +opentelemetry-api>=1.28.2 +opentelemetry-sdk>=1.28.2 +opentelemetry-instrumentation +opentelemetry-instrumentation-fastapi 
+opentelemetry-exporter-otlp +opentelemetry-exporter-prometheus \ No newline at end of file diff --git a/packages/opal-common/opal_common/config.py b/packages/opal-common/opal_common/config.py index ab18dd0cb..088932c2f 100644 --- a/packages/opal-common/opal_common/config.py +++ b/packages/opal-common/opal_common/config.py @@ -162,6 +162,22 @@ class OpalCommonConfig(Confi): ENABLE_METRICS = confi.bool("ENABLE_METRICS", False) + ENABLE_OPENTELEMETRY_TRACING = confi.bool ( + "ENABLE_OPENTELEMETRY_TRACING", + False, + description="Set if OPAL server should enable tracing with OpenTelemetry", + ) + ENABLE_OPENTELEMETRY_METRICS = confi.bool ( + "ENABLE_OPENTELEMETRY_METRICS", + False, + description="Set if OPAL server should enable metrics with OpenTelemetry", + ) + OPENTELEMETRY_OTLP_ENDPOINT = confi.str( + "OPENTELEMETRY_OTLP_ENDPOINT", + "http://localhost:4317", + description="The OpenTelemetry OTLP endpoint to send traces to", + ) + # optional APM tracing with datadog ENABLE_DATADOG_APM = confi.bool( "ENABLE_DATADOG_APM", diff --git a/packages/opal-common/opal_common/monitoring/prometheus_metrics.py b/packages/opal-common/opal_common/monitoring/prometheus_metrics.py deleted file mode 100644 index 8e6b8c3f7..000000000 --- a/packages/opal-common/opal_common/monitoring/prometheus_metrics.py +++ /dev/null @@ -1,212 +0,0 @@ -from prometheus_client import Counter, Gauge, Histogram - -# Opal Server Data update metrics -opal_server_data_update_total = Counter( - 'opal_server_data_update_total', - 'Total number of data update events published to opal server', - ["status", "type"] -) - -opal_server_data_update_errors = Counter( - 'opal_server_data_update_errors', - 'Total number of errors in opal server data update publishing', - ["error_type", "endpoint"] -) - -opal_server_data_update_latency = Histogram( - 'opal_opal_server_data_update_latency_seconds', - 'Latency of data update publishing to opal server in seconds', - buckets=[.001, .005, .01, .025, .05, .075, .1, .25, .5, 
.75, 1.0, 2.5, 5.0] -) - -opal_server_data_update_count_per_topic = Counter( - 'opal_server_data_update_count_per_topic', - 'Count of data updates published per topic to opal server', - ['topic'] -) - -# Opal Server Policy update metrics -opal_server_policy_update_count = Counter( - 'pal_server_policy_update_count', - 'Total number of policy updates triggered to opal server', - ["source", "status"] -) - -opal_server_policy_update_latency = Histogram( - 'opal_server_policy_update_latency_seconds', - 'Latency of policy bundle generation in seconds', - ["source", "status"], - buckets=[.01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0] -) - -opal_server_policy_bundle_request_count = Counter( - 'opal_server_policy_bundle_request_count', - 'Total number of policy bundle requests', - ["type"] -) - -opal_server_policy_bundle_latency = Histogram( - 'opal_server_policy_bundle_latency_seconds', - 'Latency of serving policy bundles in seconds', - ["type"], - buckets=[.01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0] -) - -opal_server_policy_update_size = Histogram( - 'opal_server_policy_update_size', - 'Size of policy updates (in number of files)', - ["type"], - buckets=[1, 10, 50, 100, 500, 1000] -) - -# Scope metrics -opal_server_scope_request_count = Counter( - 'opal_server_scope_request_count', - 'Total number of requests to scope endpoints', - ["endpoint", "method", "status"] -) - -opal_server_scope_request_latency = Histogram( - 'opal_server_scope_request_latency', - 'Latency of scope requests in seconds', - ["endpoint", "method", "status"], - buckets=[.01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0] -) - -opal_server_scope_data_update_count = Counter( - 'opal_server_scope_data_update_count', - 'Total number of data updates published per scope', - labelnames=['scope_id'] -) - -opal_server_scope_data_update_errors = Counter( - 'opal_server_scope_data_update_errors', - 'Total number of errors during data update publication per scope', - 
labelnames=['scope_id'] -) - -opal_server_scope_data_update_latency = Histogram( - 'opal_server_scope_data_update_latency_seconds', - 'Latency of data update publishing in seconds per scope', - labelnames=['scope_id'] -) - -opal_server_scope_policy_sync_count = Counter( - 'opal_server_scope_policy_sync_count', - 'Total number of policy syncs per scope', - labelnames=['scope_id'] -) - -opal_server_scope_policy_sync_latency = Histogram( - 'opal_server_scope_policy_sync_latency_seconds', - 'Latency of policy sync in seconds per scope', - labelnames=['scope_id'] -) - -opal_server_scope_error_count = Counter( - 'opal_server_scope_error_count', - 'Total count of errors encountered per scope operation', - ["scope_id", "error_type", "endpoint"] -) - -opal_server_scope_operation_count = Counter( - "opal_server_scope_operation_count", - "Number of scope operations (create/update/delete)", - ["operation", "status"] -) - -# Generic metrics -token_request_count = Counter( - "opal_token_request_count", - "Total number of token requests", - ["token_type", "status"] -) - -token_generated_count = Counter( - "opal_token_generated_count", - "Number of successfully generated tokens", - ["peer_type", "ttl"] -) - -token_generation_errors = Counter( - "opal_token_generation_errors", - "Total number of errors during token generation", - ["error_type", "token_type"] -) - -# Client metrics -active_clients = Gauge( - 'opal_active_clients_total', - 'Number of currently connected OPAL clients', - ['client_id', 'source'] -) - -client_data_subscriptions = Gauge( - 'opal_client_data_subscriptions', - 'Number of data topics a client is subscribed to', - ['client_id', 'topic'] -) - -# Opal Client metrics -opal_client_data_update_trigger_count = Counter( - 'opal_client_data_update_trigger_count', - 'Number of data update triggers', - ['source', 'status'] -) - -opal_client_data_update_latency = Histogram( - 'opal_client_data_update_latency_seconds', - 'Latency of data update operations', - 
['source'], - buckets=[.01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0] -) - -opal_client_data_update_errors = Counter( - 'opal_client_data_update_errors', - 'Number of errors during data updates', - ['error_type', 'source'] -) - -opal_client_policy_update_trigger_count = Counter( - 'opal_client_policy_update_trigger_count', - 'Number of policy update triggers', - ['source', 'status', 'update_type'] -) - -opal_client_policy_update_latency = Histogram( - 'opal_client_policy_update_latency_seconds', - 'Latency of policy update operations', - ['source', 'update_type'], - buckets=[.01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0] -) - -opal_client_policy_update_errors = Counter( - 'opal_client_policy_update_errors', - 'Number of errors during policy updates', - ['error_type', 'source'] -) - -opal_client_policy_store_request_count = Counter( - 'opal_client_policy_store_request_count', - 'Number of requests to policy store endpoints', - ['endpoint', 'status'] -) - -opal_client_policy_store_request_latency = Histogram( - 'opal_client_policy_store_request_latency_seconds', - 'Latency of policy store requests', - ['endpoint'], - buckets=[.01, .025, .05, .075, .1, .25, .5, .75, 1.0] -) - -opal_client_policy_store_auth_errors = Counter( - 'opal_client_policy_store_auth_errors', - 'Number of authentication/authorization errors for policy store', - ['error_type', 'endpoint'] -) - -opal_client_policy_store_status = Gauge( - 'opal_client_policy_store_status', - 'Current status of policy store connection', - ['auth_type'] -) \ No newline at end of file diff --git a/packages/opal-common/opal_common/schemas/policy.py b/packages/opal-common/opal_common/schemas/policy.py index 7599655c0..f76307d6a 100644 --- a/packages/opal-common/opal_common/schemas/policy.py +++ b/packages/opal-common/opal_common/schemas/policy.py @@ -40,6 +40,20 @@ class PolicyBundle(BaseSchema): policy_modules: List[RegoModule] deleted_files: Optional[DeletedFiles] + def calculate_size(self) -> int: + 
"""Calculates the size of the policy bundle.""" + size = 0 + if self.data_modules: + size += len(self.data_modules) + if self.policy_modules: + size += len(self.policy_modules) + if self.deleted_files: + if self.deleted_files.data_modules: + size += len(self.deleted_files.data_modules) + if self.deleted_files.policy_modules: + size += len(self.deleted_files.policy_modules) + return size + class PolicyUpdateMessage(BaseSchema): old_policy_hash: str diff --git a/packages/opal-server/opal_server/data/api.py b/packages/opal-server/opal_server/data/api.py index 007b06314..3ef9d5732 100644 --- a/packages/opal-server/opal_server/data/api.py +++ b/packages/opal-server/opal_server/data/api.py @@ -20,10 +20,6 @@ from opal_common.urls import set_url_query_param from opal_server.config import opal_server_config from opal_server.data.data_update_publisher import DataUpdatePublisher -from opal_common.monitoring.prometheus_metrics import ( - opal_server_data_update_total, - opal_server_data_update_errors -) def init_data_updates_router( @@ -129,25 +125,12 @@ async def publish_data_update_event( authenticator, claims, update ) # may throw Unauthorized - opal_server_data_update_total.labels( - status="success", - type="update" - ).inc() - await data_update_publisher.publish_data_updates(update) return {"status": "ok"} except Unauthorized as e: - opal_server_data_update_errors.labels( - error_type="unauthorized", - endpoint="update" - ).inc() logger.error(f"Unauthorized to publish update: {repr(e)}") raise except Exception as e: - opal_server_data_update_errors.labels( - error_type="unknown", - endpoint="update" - ).inc() logger.error(f"Failed to publish update: {repr(e)}") raise diff --git a/packages/opal-server/opal_server/data/data_update_publisher.py b/packages/opal-server/opal_server/data/data_update_publisher.py index 6eb1d43b9..596c4107d 100644 --- a/packages/opal-server/opal_server/data/data_update_publisher.py +++ 
b/packages/opal-server/opal_server/data/data_update_publisher.py @@ -9,12 +9,11 @@ DataUpdate, ServerDataSourceConfig, ) +from opal_common.config import opal_common_config from opal_common.topics.publisher import TopicPublisher -from opal_common.monitoring.prometheus_metrics import ( - opal_server_data_update_latency, - opal_server_data_update_count_per_topic -) +from opentelemetry import trace +tracer = trace.get_tracer(__name__) TOPIC_DELIMITER = "/" PREFIX_DELIMITER = ":" @@ -75,49 +74,51 @@ async def publish_data_updates(self, update: DataUpdate): topics (List[str]): topics (with hierarchy) to notify subscribers of update (DataUpdate): update data-source configuration for subscribers to fetch data from """ + if opal_common_config.ENABLE_OPENTELEMETRY_TRACING: + with tracer.start_as_current_span("opal_server_data_update") as span: + await self._publish_data_updates(update) + + all_topic_combos = set() + + span.set_attribute("topics_count", len(all_topic_combos)) + span.set_attribute("entries_count", len(update.entries)) + # a nicer format of entries to the log + logged_entries = [ + dict( + url=entry.url, + method=entry.save_method, + path=entry.dst_path or "/", + inline_data=(entry.data is not None), + topics=entry.topics, + ) + for entry in update.entries + ] + + # Expand the topics for each event to include sub topic combos (e.g. 
publish 'a/b/c' as 'a' , 'a/b', and 'a/b/c') + for entry in update.entries: + topic_combos = [] + if entry.topics: + for topic in entry.topics: + topic_combos.extend(DataUpdatePublisher.get_topic_combos(topic)) + entry.topics = topic_combos # Update entry with the exhaustive list, so client won't have to expand it again + all_topic_combos.update(topic_combos) - with opal_server_data_update_latency.time(): - all_topic_combos = set() - - # a nicer format of entries to the log - logged_entries = [ - dict( - url=entry.url, - method=entry.save_method, - path=entry.dst_path or "/", - inline_data=(entry.data is not None), - topics=entry.topics, + else: + logger.warning( + "[{pid}] No topics were provided for the following entry: {entry}", + pid=os.getpid(), + entry=entry, + ) + + # publish all topics with all their sub combinations + logger.info( + "[{pid}] Publishing data update to topics: {topics}, reason: {reason}, entries: {entries}", + pid=os.getpid(), + topics=all_topic_combos, + reason=update.reason, + entries=logged_entries, ) - for entry in update.entries - ] - - # Expand the topics for each event to include sub topic combos (e.g. 
publish 'a/b/c' as 'a' , 'a/b', and 'a/b/c') - for entry in update.entries: - topic_combos = [] - if entry.topics: - for topic in entry.topics: - topic_combos.extend(DataUpdatePublisher.get_topic_combos(topic)) - entry.topics = topic_combos # Update entry with the exhaustive list, so client won't have to expand it again - all_topic_combos.update(topic_combos) - - for topic in topic_combos: - opal_server_data_update_count_per_topic.labels(topic).inc() - else: - logger.warning( - "[{pid}] No topics were provided for the following entry: {entry}", - pid=os.getpid(), - entry=entry, - ) - # publish all topics with all their sub combinations - logger.info( - "[{pid}] Publishing data update to topics: {topics}, reason: {reason}, entries: {entries}", - pid=os.getpid(), - topics=all_topic_combos, - reason=update.reason, - entries=logged_entries, - ) - - await self._publisher.publish( - list(all_topic_combos), update.dict(by_alias=True) - ) + await self._publisher.publish( + list(all_topic_combos), update.dict(by_alias=True) + ) diff --git a/packages/opal-server/opal_server/policy/bundles/api.py b/packages/opal-server/opal_server/policy/bundles/api.py index 9e8b3d0e2..d96e184dd 100644 --- a/packages/opal-server/opal_server/policy/bundles/api.py +++ b/packages/opal-server/opal_server/policy/bundles/api.py @@ -10,14 +10,29 @@ from opal_common.git_utils.commit_viewer import CommitViewer from opal_common.git_utils.repo_cloner import RepoClonePathFinder from opal_common.logger import logger +from opal_common.config import opal_common_config from opal_common.schemas.policy import PolicyBundle from opal_server.config import opal_server_config from starlette.responses import RedirectResponse -from opal_common.monitoring.prometheus_metrics import ( - opal_server_policy_bundle_request_count, - opal_server_policy_bundle_latency, - opal_server_policy_update_size, -) +from opentelemetry import trace, metrics +from opentelemetry.metrics import get_meter +from opentelemetry.trace import 
get_tracer + +if opal_common_config.ENABLE_OPENTELEMETRY_TRACING: + tracer = trace.get_tracer(__name__) +else: + tracer = None + +if opal_common_config.ENABLE_OPENTELEMETRY_METRICS: + meter = metrics.get_meter(__name__) + bundle_size_histogram = meter.create_histogram( + name="opal_server_policy_bundle_size", + description="Size of policy bundles served", + unit="files", + ) +else: + meter = None + bundle_size_histogram = None router = APIRouter() @@ -107,7 +122,16 @@ async def get_policy( ): """Get the policy bundle from the policy repo.""" - + + if tracer is not None: + with tracer.start_as_current_span("opal_server_policy_bundle_request") as span: + bundle = await process_policy_bundle(repo, input_paths, base_hash, span) + else: + bundle = await process_policy_bundle(repo, input_paths, base_hash) + + return bundle + +async def process_policy_bundle(repo, input_paths, base_hash, span=None): maker = BundleMaker( repo, in_directories=set(input_paths), @@ -115,42 +139,45 @@ async def get_policy( root_manifest_path=opal_server_config.POLICY_REPO_MANIFEST_PATH, bundle_ignore=opal_server_config.BUNDLE_IGNORE, ) - # check if commit exist in the repo + revision = None if base_hash: try: revision = repo.rev_parse(base_hash) except ValueError: - logger.warning(f"base_hash {base_hash} not exist in the repo") + logger.warning(f"base_hash {base_hash} does not exist in the repo") if revision is None: - opal_server_policy_bundle_request_count.labels(type="full").inc() - with opal_server_policy_bundle_latency.labels(type="full").time(): - bundle = maker.make_bundle(repo.head.commit) - bundle_size = ( - (len(bundle.data_modules) if bundle.data_modules is not None else 0) + - (len(bundle.policy_modules) if bundle.policy_modules is not None else 0) - ) - if bundle.deleted_files: - bundle_size += len(bundle.deleted_files.files) - opal_server_policy_update_size.labels(type="full").observe(bundle_size) - return bundle + bundle = maker.make_bundle(repo.head.commit) + bundle_size = 
bundle.calculate_size() + + if bundle_size_histogram is not None: + bundle_size_histogram.record(bundle_size, {"type": "full"}) + + if span is not None: + span.set_attribute("bundle.type", "full") + span.set_attribute("bundle.size", bundle_size) + + return bundle else: - opal_server_policy_bundle_request_count.labels(type="diff").inc() - with opal_server_policy_bundle_latency.labels(type="diff").time(): - try: - old_commit = repo.commit(base_hash) - diff_bundle = maker.make_diff_bundle(old_commit, repo.head.commit) - diff_bundle_size = ( - (len(diff_bundle.data_modules) if diff_bundle.data_modules is not None else 0) + - (len(diff_bundle.policy_modules) if diff_bundle.policy_modules is not None else 0) - ) - if diff_bundle.deleted_files: - diff_bundle_size += len(diff_bundle.deleted_files.files) - opal_server_policy_update_size.labels(type="diff").observe(diff_bundle_size) - return diff_bundle - except ValueError: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=f"commit with hash {base_hash} was not found in the policy repo!", - ) + try: + old_commit = repo.commit(base_hash) + bundle = maker.make_diff_bundle(old_commit, repo.head.commit) + bundle_size = bundle.calculate_size() + + if bundle_size_histogram is not None: + bundle_size_histogram.record(bundle_size, {"type": "diff"}) + + if span is not None: + span.set_attribute("bundle.type", "diff") + span.set_attribute("bundle.size", bundle_size) + + return bundle + except ValueError: + if span is not None: + span.set_status(trace.Status(trace.StatusCode.ERROR, "Commit not found")) + span.record_exception(ValueError(f"Commit {base_hash} not found")) + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Commit with hash {base_hash} was not found in the policy repo!", + ) \ No newline at end of file diff --git a/packages/opal-server/opal_server/policy/watcher/callbacks.py b/packages/opal-server/opal_server/policy/watcher/callbacks.py index 1e763ee1b..6af333d8b 100644 --- 
a/packages/opal-server/opal_server/policy/watcher/callbacks.py +++ b/packages/opal-server/opal_server/policy/watcher/callbacks.py @@ -18,7 +18,6 @@ ) from opal_common.topics.publisher import TopicPublisher from opal_common.topics.utils import policy_topics -from opal_common.monitoring.prometheus_metrics import opal_server_policy_update_size async def create_update_all_directories_in_repo( @@ -117,10 +116,6 @@ async def publish_changed_directories( ) if notification: - opal_server_policy_update_size.labels(type="directory").observe( - len(notification.update.changed_directories) - ) - async with publisher: await publisher.publish( topics=notification.topics, data=notification.update.dict() diff --git a/packages/opal-server/opal_server/policy/watcher/task.py b/packages/opal-server/opal_server/policy/watcher/task.py index 16a990cef..606607912 100644 --- a/packages/opal-server/opal_server/policy/watcher/task.py +++ b/packages/opal-server/opal_server/policy/watcher/task.py @@ -6,13 +6,12 @@ from fastapi_websocket_pubsub import Topic from fastapi_websocket_pubsub.pub_sub_server import PubSubEndpoint from opal_common.logger import logger +from opal_common.config import opal_common_config from opal_common.sources.base_policy_source import BasePolicySource from opal_server.config import opal_server_config -from opal_common.monitoring.prometheus_metrics import ( - opal_server_policy_update_count, - opal_server_policy_update_latency -) +from opentelemetry import trace +tracer = trace.get_tracer(__name__) class BasePolicyWatcherTask: """Manages the asyncio tasks of the policy watcher.""" @@ -104,10 +103,6 @@ def _init_should_stop(self): async def _fail(self, exc: Exception): """called when the watcher fails, and stops all tasks gracefully.""" - opal_server_policy_update_count.labels( - source="watcher", - status="error" - ).inc() logger.error("policy watcher failed with exception: {err}", err=repr(exc)) self.signal_stop() # trigger uvicorn graceful shutdown @@ -132,23 +127,8 
@@ async def trigger(self, topic: Topic, data: Any): """Triggers the policy watcher from outside to check for changes (git pull)""" try: - opal_server_policy_update_count.labels( - source="webhook", - status="started" - ).inc() - - with opal_server_policy_update_latency.labels( - source="webhook", - status="success" - ).time(): + if opal_common_config.ENABLE_OPENTELEMETRY_TRACING: + with tracer.start_as_current_span("opal_server_policy_update"): await self._watcher.check_for_changes() - opal_server_policy_update_count.labels( - source="webhook", - status="success" - ).inc() except Exception as e: - opal_server_policy_update_count.labels( - source="webhook", - status="error" - ).inc() raise diff --git a/packages/opal-server/opal_server/pubsub.py b/packages/opal-server/opal_server/pubsub.py index 5f402c3ea..222abe26e 100644 --- a/packages/opal-server/opal_server/pubsub.py +++ b/packages/opal-server/opal_server/pubsub.py @@ -28,17 +28,32 @@ from opal_common.confi.confi import load_conf_if_none from opal_common.config import opal_common_config from opal_common.logger import logger -from opal_common.monitoring.prometheus_metrics import ( - active_clients, - client_data_subscriptions -) from opal_server.config import opal_server_config from pydantic import BaseModel from starlette.datastructures import QueryParams +from opentelemetry import metrics OPAL_CLIENT_INFO_PARAM_PREFIX = "__opal_" OPAL_CLIENT_INFO_CLIENT_ID = f"{OPAL_CLIENT_INFO_PARAM_PREFIX}client_id" +if opal_common_config.ENABLE_OPENTELEMETRY_METRICS: + meter = metrics.get_meter(__name__) +else: + meter = None + +if opal_common_config.ENABLE_OPENTELEMETRY_METRICS: + active_clients_counter = meter.create_counter( + name="opal_server_active_clients", + description="Number of active clients connected to the OPAL server", + ) + + client_data_subscriptions_counter = meter.create_up_down_counter( + name="opal_client_data_subscriptions", + description="Number of data subscriptions per client", + ) +else: + 
active_clients_counter = None + client_data_subscriptions_counter = None class ClientInfo(BaseModel): client_id: str @@ -83,8 +98,9 @@ def new_client( connect_time=time.time(), query_params=query_params, ) - source = f"{source_host}:{source_port}" if source_host and source_port else "unknown" - active_clients.labels(client_id=client_id, source=source).inc() + if active_clients_counter is not None: + source = f"{source_host}:{source_port}" if source_host and source_port else "unknown" + active_clients_counter.add(1, attributes={"client_id": client_id, "source": source}) client_info.refcount += 1 self._clients_by_ids[client_id] = client_info yield client_info @@ -94,8 +110,9 @@ def new_client( if client_info.refcount >= 1: self._clients_by_ids[client_id] = client_info else: - source = f"{client_info.source_host}:{client_info.source_port}" if client_info.source_host and client_info.source_port else "unknown" - active_clients.labels(client_id=client_id, source=source).dec() + if active_clients_counter is not None: + source = f"{client_info.source_host}:{client_info.source_port}" if client_info.source_host and client_info.source_port else "unknown" + active_clients_counter.add(-1, attributes={"client_id": client_id, "source": source}) async def on_subscribe( self, @@ -110,11 +127,15 @@ async def on_subscribe( # on_subscribe is sometimes called for the broadcaster, when there is no "current client" if client_info is not None: client_info.subscribed_topics.update(topics) - for topic in topics: - client_data_subscriptions.labels( - client_id=client_info.client_id, - topic=topic - ).inc() + if client_data_subscriptions_counter is not None: + for topic in topics: + client_data_subscriptions_counter.add( + 1, + attributes={ + "client_id": client_info.client_id, + "topic": topic + } + ) async def on_unsubscribe( self, @@ -129,11 +150,15 @@ async def on_unsubscribe( # on_subscribe is sometimes called for the broadcaster, when there is no "current client" if client_info is not 
None: client_info.subscribed_topics.difference_update(topics) - for topic in topics: - client_data_subscriptions.labels( - client_id=client_info.client_id, - topic=topic - ).dec() + if client_data_subscriptions_counter is not None: + for topic in topics: + client_data_subscriptions_counter.add( + -1, + attributes={ + "client_id": client_info.client_id, + "topic": topic + } + ) class PubSub: diff --git a/packages/opal-server/opal_server/scopes/api.py b/packages/opal-server/opal_server/scopes/api.py index b3838f150..0b63bce2e 100644 --- a/packages/opal-server/opal_server/scopes/api.py +++ b/packages/opal-server/opal_server/scopes/api.py @@ -25,6 +25,7 @@ from opal_common.authentication.types import EncryptionKeyFormat, JWTClaims from opal_common.authentication.verifier import Unauthorized from opal_common.logger import logger +from opal_common.config import opal_common_config from opal_common.monitoring import metrics from opal_common.schemas.data import ( DataSourceConfig, @@ -40,22 +41,29 @@ ServerSideTopicPublisher, ) from opal_common.urls import set_url_query_param -from opal_common.monitoring.prometheus_metrics import ( - opal_server_scope_request_count, - opal_server_scope_request_latency, - opal_server_scope_error_count, - opal_server_scope_operation_count, - opal_server_scope_data_update_latency, - opal_server_scope_data_update_count, - opal_server_scope_data_update_errors, - opal_server_scope_policy_sync_latency, - opal_server_scope_policy_sync_count -) from opal_server.config import opal_server_config from opal_server.data.data_update_publisher import DataUpdatePublisher from opal_server.git_fetcher import GitPolicyFetcher from opal_server.scopes.scope_repository import ScopeNotFoundError, ScopeRepository +from opentelemetry import trace +from opentelemetry import metrics as otel_metrics + +if opal_common_config.ENABLE_OPENTELEMETRY_TRACING: + tracer = trace.get_tracer(__name__) +else: + tracer = None + +if opal_common_config.ENABLE_OPENTELEMETRY_METRICS: + 
meter = otel_metrics.get_meter(__name__) + policy_bundle_size_histogram = meter.create_histogram( + name="opal_server_scope_policy_bundle_size", + description="Size of the policy bundles served per scope", + unit="bytes", + ) +else: + meter = None + policy_bundle_size_histogram = None def verify_private_key(private_key: str, key_format: EncryptionKeyFormat) -> bool: try: @@ -115,44 +123,37 @@ async def put_scope( scope_in: Scope, claims: JWTClaims = Depends(authenticator), ): - opal_server_scope_request_count.labels( - endpoint="put_scope", - method="PUT", - status="started" - ).inc() - with opal_server_scope_request_latency.labels( - endpoint="put_scope", - method="PUT", - status="processing" - ).time(): - try: - require_peer_type(authenticator, claims, PeerType.datasource) - except Unauthorized as ex: - logger.error(f"Unauthorized to PUT scope: {repr(ex)}") - opal_server_scope_error_count.labels( - scope_id=scope_in.scope_id, - error_type="Unauthorized", - endpoint="put_scope" - ).inc() - raise - - verify_private_key_or_throw(scope_in) - await scopes.put(scope_in) - opal_server_scope_operation_count.labels( - operation="create", - status="success" - ).inc() - - force_fetch_str = " (force fetch)" if force_fetch else "" - logger.info(f"Sync scope: {scope_in.scope_id}{force_fetch_str}") + if tracer: + with tracer.start_as_current_span("opal_server_scope_policy_update") as span: + span.set_attribute("scope_id", scope_in.scope_id) + return await _handle_put_scope(force_fetch, scope_in, claims) + else: + return await _handle_put_scope(force_fetch, scope_in, claims) + + async def _handle_put_scope( + force_fetch: bool, + scope_in: Scope, + claims: JWTClaims, + ): + try: + require_peer_type(authenticator, claims, PeerType.datasource) + except Unauthorized as ex: + logger.error(f"Unauthorized to PUT scope: {repr(ex)}") + raise - # All server replicas (leaders) should sync the scope. 
- await pubsub_endpoint.publish( - opal_server_config.POLICY_REPO_WEBHOOK_TOPIC, - {"scope_id": scope_in.scope_id, "force_fetch": force_fetch}, - ) + verify_private_key_or_throw(scope_in) + await scopes.put(scope_in) + + force_fetch_str = " (force fetch)" if force_fetch else "" + logger.info(f"Sync scope: {scope_in.scope_id}{force_fetch_str}") + + # All server replicas (leaders) should sync the scope. + await pubsub_endpoint.publish( + opal_server_config.POLICY_REPO_WEBHOOK_TOPIC, + {"scope_id": scope_in.scope_id, "force_fetch": force_fetch}, + ) - return Response(status_code=status.HTTP_201_CREATED) + return Response(status_code=status.HTTP_201_CREATED) @router.get( "", @@ -160,31 +161,12 @@ async def put_scope( response_model_exclude={"policy": {"auth"}}, ) async def get_all_scopes(*, claims: JWTClaims = Depends(authenticator)): - opal_server_scope_request_count.labels( - endpoint="get_all_scopes", - method="GET", - status="started" - ).inc() - with opal_server_scope_request_latency.labels( - endpoint="get_all_scopes", - method="GET", - status="processing" - ).time(): - try: - require_peer_type(authenticator, claims, PeerType.datasource) - except Unauthorized as ex: - logger.error(f"Unauthorized to get scopes: {repr(ex)}") - opal_server_scope_error_count.labels( - scope_id="all_scopes", - error_type="Unauthorized", - endpoint="get_all_scopes" - ).inc() - raise - opal_server_scope_operation_count.labels( - operation="get_all", - status="success" - ).inc() - return await scopes.all() + try: + require_peer_type(authenticator, claims, PeerType.datasource) + except Unauthorized as ex: + logger.error(f"Unauthorized to get scopes: {repr(ex)}") + raise + return await scopes.all() @router.get( "/{scope_id}", @@ -192,43 +174,19 @@ async def get_all_scopes(*, claims: JWTClaims = Depends(authenticator)): response_model_exclude={"policy": {"auth"}}, ) async def get_scope(*, scope_id: str, claims: JWTClaims = Depends(authenticator)): - opal_server_scope_request_count.labels( - 
endpoint="get_scope", - method="GET", - status="started" - ).inc() - with opal_server_scope_request_latency.labels( - endpoint="get_scope", - method="GET", - status="processing" - ).time(): - try: - require_peer_type(authenticator, claims, PeerType.datasource) - except Unauthorized as ex: - logger.error(f"Unauthorized to get scope: {repr(ex)}") - opal_server_scope_error_count.labels( - scope_id=scope_id, - error_type="Unauthorized", - endpoint="get_scope" - ).inc() - raise + try: + require_peer_type(authenticator, claims, PeerType.datasource) + except Unauthorized as ex: + logger.error(f"Unauthorized to get scope: {repr(ex)}") + raise - try: - scope = await scopes.get(scope_id) - opal_server_scope_operation_count.labels( - operation="get", - status="success" - ).inc() - return scope - except ScopeNotFoundError: - opal_server_scope_error_count.labels( - scope_id=scope_id, - error_type="NotFound", - endpoint="get_scope" - ).inc() - raise HTTPException( - status.HTTP_404_NOT_FOUND, detail=f"No such scope: {scope_id}" - ) + try: + scope = await scopes.get(scope_id) + return scope + except ScopeNotFoundError: + raise HTTPException( + status.HTTP_404_NOT_FOUND, detail=f"No such scope: {scope_id}" + ) @router.delete( "/{scope_id}", @@ -237,34 +195,15 @@ async def get_scope(*, scope_id: str, claims: JWTClaims = Depends(authenticator) async def delete_scope( *, scope_id: str, claims: JWTClaims = Depends(authenticator) ): - opal_server_scope_request_count.labels( - endpoint="delete_scope", - method="DELETE", - status="started" - ).inc() - with opal_server_scope_request_latency.labels( - endpoint="delete_scope", - method="DELETE", - status="processing" - ).time(): - try: - require_peer_type(authenticator, claims, PeerType.datasource) - except Unauthorized as ex: - logger.error(f"Unauthorized to delete scope: {repr(ex)}") - opal_server_scope_error_count.labels( - scope_id=scope_id, - error_type="Unauthorized", - endpoint="delete_scope" - ).inc() - raise - - # TODO: This should 
also asynchronously clean the repo from the disk (if it's not used by other scopes) - await scopes.delete(scope_id) - opal_server_scope_operation_count.labels( - operation="delete", - status="success" - ).inc() - return Response(status_code=status.HTTP_204_NO_CONTENT) + try: + require_peer_type(authenticator, claims, PeerType.datasource) + except Unauthorized as ex: + logger.error(f"Unauthorized to delete scope: {repr(ex)}") + raise + + # TODO: This should also asynchronously clean the repo from the disk (if it's not used by other scopes) + await scopes.delete(scope_id) + return Response(status_code=status.HTTP_204_NO_CONTENT) @router.post("/{scope_id}/refresh", status_code=status.HTTP_200_OK) async def refresh_scope( @@ -277,72 +216,49 @@ async def refresh_scope( ), claims: JWTClaims = Depends(authenticator), ): - with opal_server_scope_policy_sync_latency.labels(scope_id=scope_id).time(): - try: - require_peer_type(authenticator, claims, PeerType.datasource) - except Unauthorized as ex: - logger.error(f"Unauthorized to delete scope: {repr(ex)}") - opal_server_scope_error_count.labels(scope_id=scope_id, error_type="Unauthorized").inc() - raise + try: + require_peer_type(authenticator, claims, PeerType.datasource) + except Unauthorized as ex: + logger.error(f"Unauthorized to delete scope: {repr(ex)}") + raise - try: - _ = await scopes.get(scope_id) - - logger.info(f"Refresh scope: {scope_id}") - - # If the hinted hash is None, we have no way to know whether we should - # re-fetch the remote, so we force fetch, just in case. - force_fetch = hinted_hash is None - - # All server replicas (leaders) should sync the scope. 
- await pubsub_endpoint.publish( - opal_server_config.POLICY_REPO_WEBHOOK_TOPIC, - { - "scope_id": scope_id, - "force_fetch": force_fetch, - "hinted_hash": hinted_hash, - }, - ) - opal_server_scope_policy_sync_count.labels(scope_id=scope_id).inc() - return Response(status_code=status.HTTP_200_OK) + try: + _ = await scopes.get(scope_id) - except ScopeNotFoundError: - opal_server_scope_error_count.labels(scope_id=scope_id, error_type="NotFound").inc() - raise HTTPException( - status.HTTP_404_NOT_FOUND, detail=f"No such scope: {scope_id}" - ) + logger.info(f"Refresh scope: {scope_id}") + + # If the hinted hash is None, we have no way to know whether we should + # re-fetch the remote, so we force fetch, just in case. + force_fetch = hinted_hash is None + + # All server replicas (leaders) should sync the scope. + await pubsub_endpoint.publish( + opal_server_config.POLICY_REPO_WEBHOOK_TOPIC, + { + "scope_id": scope_id, + "force_fetch": force_fetch, + "hinted_hash": hinted_hash, + }, + ) + return Response(status_code=status.HTTP_200_OK) + + except ScopeNotFoundError: + raise HTTPException( + status.HTTP_404_NOT_FOUND, detail=f"No such scope: {scope_id}" + ) @router.post("/refresh", status_code=status.HTTP_200_OK) async def sync_all_scopes(claims: JWTClaims = Depends(authenticator)): """sync all scopes.""" - opal_server_scope_request_count.labels( - endpoint="sync_all_scopes", - method="POST", - status="started" - ).inc() - with opal_server_scope_request_latency.labels( - endpoint="sync_all_scopes", - method="POST", - status="processing" - ).time(): - try: - require_peer_type(authenticator, claims, PeerType.datasource) - except Unauthorized as ex: - logger.error(f"Unauthorized to refresh all scopes: {repr(ex)}") - opal_server_scope_error_count.labels( - scope_id="all_scopes", - error_type="Unauthorized", - endpoint="sync_all_scopes" - ).inc() - raise - - # All server replicas (leaders) should sync all scopes. 
- await pubsub_endpoint.publish(opal_server_config.POLICY_REPO_WEBHOOK_TOPIC) - opal_server_scope_operation_count.labels( - operation="sync_all", - status="success" - ).inc() - return Response(status_code=status.HTTP_200_OK) + try: + require_peer_type(authenticator, claims, PeerType.datasource) + except Unauthorized as ex: + logger.error(f"Unauthorized to refresh all scopes: {repr(ex)}") + raise + + # All server replicas (leaders) should sync all scopes. + await pubsub_endpoint.publish(opal_server_config.POLICY_REPO_WEBHOOK_TOPIC) + return Response(status_code=status.HTTP_200_OK) @router.get( "/{scope_id}/policy", @@ -358,19 +274,33 @@ async def get_scope_policy( description="hash of previous bundle already downloaded, server will return a diff bundle.", ), ): + if tracer: + with tracer.start_as_current_span("opal_server_scope_policy_bundle_request") as span: + span.set_attribute("scope_id", scope_id) + policy_bundle = await _handle_get_scope_policy(scope_id, base_hash) + if policy_bundle_size_histogram and policy_bundle.bundle: + bundle_size = policy_bundle.calculate_size() + policy_bundle_size_histogram.record( + bundle_size, + attributes={"scope_id": scope_id}, + ) + return policy_bundle + else: + return await _handle_get_scope_policy(scope_id, base_hash) + + async def _handle_get_scope_policy(scope_id: str, base_hash: Optional[str]): try: scope = await scopes.get(scope_id) except ScopeNotFoundError: logger.warning( - "Requested scope {scope_id} not found, returning default scope", - scope_id=scope_id, + f"Requested scope {scope_id} not found, returning default scope" ) return await _generate_default_scope_bundle(scope_id) if not isinstance(scope.policy, GitPolicyScopeSource): raise HTTPException( status.HTTP_501_NOT_IMPLEMENTED, - detail=f"policy source is not yet implemented: {scope_id}", + detail=f"Policy source is not yet implemented for scope: {scope_id}", ) fetcher = GitPolicyFetcher( @@ -383,8 +313,7 @@ async def get_scope_policy( return await 
run_sync(fetcher.make_bundle, base_hash) except (InvalidGitRepositoryError, pygit2.GitError, ValueError): logger.warning( - "Requested scope {scope_id} has invalid repo, returning default scope", - scope_id=scope_id, + f"Requested scope {scope_id} has invalid repo, returning default scope" ) return await _generate_default_scope_bundle(scope_id) @@ -452,28 +381,31 @@ async def publish_data_update_event( claims: JWTClaims = Depends(authenticator), scope_id: str = Path(..., description="Scope ID"), ): - with opal_server_scope_data_update_latency.labels(scope_id=scope_id).time(): - try: - require_peer_type(authenticator, claims, PeerType.datasource) - - restrict_optional_topics_to_publish(authenticator, claims, update) - - for entry in update.entries: - entry.topics = [f"data:{topic}" for topic in entry.topics] - - await DataUpdatePublisher( - ScopedServerSideTopicPublisher(pubsub_endpoint, scope_id) - ).publish_data_updates(update) - - opal_server_scope_data_update_count.labels(scope_id=scope_id).inc() - except Unauthorized as ex: - logger.error(f"Unauthorized to publish update: {repr(ex)}") - opal_server_scope_data_update_errors.labels(scope_id=scope_id).inc() - opal_server_scope_error_count.labels( - scope_id=scope_id, - error_type="UpdateFailed", - endpoint="data_update" - ).inc() - raise + if tracer: + with tracer.start_as_current_span("opal_server_scope_data_update") as span: + span.set_attribute("scope_id", scope_id) + await _handle_publish_data_update_event(update, claims, scope_id) + else: + await _handle_publish_data_update_event(update, claims, scope_id) + + async def _handle_publish_data_update_event( + update: DataUpdate, + claims: JWTClaims, + scope_id: str, + ): + try: + require_peer_type(authenticator, claims, PeerType.datasource) + + restrict_optional_topics_to_publish(authenticator, claims, update) + + for entry in update.entries: + entry.topics = [f"data:{topic}" for topic in entry.topics] + + await DataUpdatePublisher( + 
ScopedServerSideTopicPublisher(pubsub_endpoint, scope_id) + ).publish_data_updates(update) + except Unauthorized as ex: + logger.error(f"Unauthorized to publish update: {repr(ex)}") + raise return router diff --git a/packages/opal-server/opal_server/security/api.py b/packages/opal-server/opal_server/security/api.py index 1d9db4409..8a1ccbca8 100644 --- a/packages/opal-server/opal_server/security/api.py +++ b/packages/opal-server/opal_server/security/api.py @@ -4,12 +4,28 @@ from opal_common.authentication.deps import StaticBearerAuthenticator from opal_common.authentication.signer import JWTSigner from opal_common.logger import logger +from opal_common.config import opal_common_config from opal_common.schemas.security import AccessToken, AccessTokenRequest, TokenDetails -from opal_common.monitoring.prometheus_metrics import ( - token_generation_errors, - token_generated_count, - token_request_count, -) +from opentelemetry import metrics + +if opal_common_config.ENABLE_OPENTELEMETRY_METRICS: + meter = metrics.get_meter(__name__) +else: + meter = None + +if opal_common_config.ENABLE_OPENTELEMETRY_METRICS: + token_requested_counter = meter.create_counter( + name="opal_server_token_requested", + description="Number of token requests" + ) + + token_generated_counter = meter.create_up_down_counter( + name="opal_client_token_generated", + description="Number of tokens generated" + ) +else: + token_requested_counter = None + token_generated_counter = None def init_security_router(signer: JWTSigner, authenticator: StaticBearerAuthenticator): @@ -22,21 +38,18 @@ def init_security_router(signer: JWTSigner, authenticator: StaticBearerAuthentic dependencies=[Depends(authenticator)], ) async def generate_new_access_token(req: AccessTokenRequest): - token_request_count.labels( - token_type=req.type.value, - status="received" - ).inc() + if token_requested_counter is not None: + token_requested_counter.add(1, attributes={ + 'token_type': req.type.value, + 'status': 'received' + }) 
if not signer.enabled: - token_generation_errors.labels( - error_type="SignerDisabled", - token_type=req.type.value - ).inc() - - token_request_count.labels( - token_type=req.type.value, - status="error" - ).inc() + if token_requested_counter is not None: + token_requested_counter.add(1, attributes={ + 'token_type': req.type.value, + 'status': 'error' + }) raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, @@ -47,15 +60,17 @@ async def generate_new_access_token(req: AccessTokenRequest): token = signer.sign(sub=req.id, token_lifetime=req.ttl, custom_claims=claims) logger.info(f"Generated opal token: peer_type={req.type.value}") - token_generated_count.labels( - peer_type=req.type.value, - ttl=req.ttl - ).inc() + if token_generated_counter is not None: + token_generated_counter.add(1, attributes={ + 'peer_type': req.type.value, + 'ttl': req.ttl.total_seconds() if req.ttl else None + }) - token_request_count.labels( - token_type=req.type.value, - status="success" - ).inc() + if token_requested_counter is not None: + token_requested_counter.add(1, attributes={ + 'token_type': req.type.value, + 'status': 'success' + }) return AccessToken( token=token, @@ -65,7 +80,7 @@ async def generate_new_access_token(req: AccessTokenRequest): expired=datetime.utcnow() + req.ttl, claims=claims, ), - ) + ) except Exception as ex: logger.error(f"Failed to generate token: {str(ex)}") error_type = ( @@ -73,14 +88,11 @@ async def generate_new_access_token(req: AccessTokenRequest): if "token" in str(ex).lower() else "UnexpectedError" ) - token_generation_errors.labels( - error_type=error_type, - token_type=req.type.value - ).inc() - token_request_count.labels( - token_type=req.type.value, - status="error" - ).inc() + if token_requested_counter is not None: + token_requested_counter.add(1, attributes={ + 'token_type': req.type.value, + 'status': 'error' + }) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to generate token due to 
server error.", diff --git a/packages/opal-server/opal_server/server.py b/packages/opal-server/opal_server/server.py index 848beb47f..3f7b4dcef 100644 --- a/packages/opal-server/opal_server/server.py +++ b/packages/opal-server/opal_server/server.py @@ -8,16 +8,16 @@ from fastapi import Depends, FastAPI from fastapi_websocket_pubsub.event_broadcaster import EventBroadcasterContextManager +from prometheus_client import generate_latest, CONTENT_TYPE_LATEST from fastapi.responses import Response -from prometheus_client import CONTENT_TYPE_LATEST, generate_latest - from opal_common.authentication.deps import JWTAuthenticator, StaticBearerAuthenticator from opal_common.authentication.signer import JWTSigner from opal_common.confi.confi import load_conf_if_none from opal_common.config import opal_common_config from opal_common.logger import configure_logs, logger from opal_common.middleware import configure_middleware -from opal_common.monitoring import apm, metrics +from opal_common.monitoring import apm +import opal_common.monitoring.metrics as opal_dd_metrics from opal_common.schemas.data import ServerDataSourceConfig from opal_common.synchronization.named_lock import NamedLock from opal_common.topics.publisher import ( @@ -42,6 +42,15 @@ from opal_server.security.api import init_security_router from opal_server.security.jwks import JwksStaticEndpoint from opal_server.statistics import OpalStatistics, init_statistics_router +from opentelemetry import trace +from opentelemetry import metrics +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor +from opentelemetry.exporter.prometheus import PrometheusMetricReader +from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter +from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor class OpalServer: @@ -109,7 +118,7 @@ def 
__init__( self._policy_remote_url = policy_remote_url self._configure_monitoring() - metrics.increment("startup") + opal_dd_metrics.increment("startup") self.data_sources_config: ServerDataSourceConfig = ( data_sources_config @@ -206,6 +215,16 @@ def _init_fast_api_app(self): self._configure_api_routes(app) self._configure_lifecycle_callbacks(app) + if opal_common_config.ENABLE_OPENTELEMETRY_TRACING: + FastAPIInstrumentor.instrument_app(app) + + if opal_common_config.ENABLE_OPENTELEMETRY_METRICS: + @app.get("/metrics") + async def metrics(): + data = generate_latest() + return Response(content=data, media_type=CONTENT_TYPE_LATEST) + logger.info("Mounted /metrics endpoint for Prometheus metrics.") + return app def _configure_monitoring(self): @@ -213,13 +232,59 @@ def _configure_monitoring(self): apm.configure_apm(opal_server_config.ENABLE_DATADOG_APM, "opal-server") - metrics.configure_metrics( + opal_dd_metrics.configure_metrics( enable_metrics=opal_common_config.ENABLE_METRICS, statsd_host=os.environ.get("DD_AGENT_HOST", "localhost"), statsd_port=8125, namespace="opal", ) + if opal_common_config.ENABLE_OPENTELEMETRY_TRACING: + self._initialize_opentelemetry_tracing() + + # log values of the configuration variable ENABLE_OPENTELEMETRY_METRICS + logger.info( + "OpenTelemetry metrics are enabled: {enabled}", + enabled=opal_common_config.ENABLE_OPENTELEMETRY_METRICS, + ) + if opal_common_config.ENABLE_OPENTELEMETRY_METRICS: + self._initialize_opentelemetry_metrics() + + def _initialize_opentelemetry_tracing(self): + resource = Resource.create({"service.name": "opal-server"}) + tracer_provider = TracerProvider(resource=resource) + trace.set_tracer_provider(tracer_provider) + + otlp_exporter = OTLPSpanExporter( + endpoint = opal_common_config.OPENTELEMETRY_OTLP_ENDPOINT, + insecure=True + ) + + span_processor = BatchSpanProcessor(otlp_exporter) + tracer_provider.add_span_processor(span_processor) + + self.tracer = trace.get_tracer(__name__) + logger.info("OpenTelemetry 
tracing is enabled.") + + def _initialize_opentelemetry_metrics(self): + resource = Resource.create({"service.name": "opal-server"}) + self.prometheus_metric_reader = PrometheusMetricReader() + + meter_provider = MeterProvider( + resource=resource, + metric_readers=[self.prometheus_metric_reader] + ) + metrics.set_meter_provider(meter_provider) + self.meter = metrics.get_meter(__name__) + + self.startup_counter = self.meter.create_counter( + name="startup", + description="Number of times the application has started", + ) + self.startup_counter.add(1) + + logger.info("OpenTelemetry metrics are enabled.") + def _configure_api_routes(self, app: FastAPI): """Mounts the api routes on the app object.""" authenticator = JWTAuthenticator(self.signer) @@ -281,11 +346,6 @@ def _configure_api_routes(self, app: FastAPI): @app.get("/", include_in_schema=False) def healthcheck(): return {"status": "ok"} - - @app.get("/metrics", include_in_schema=False) - def metrics(): - """Endpoint to expose Prometheus metrics.""" - return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST) return app diff --git a/packages/opal-server/requires.txt b/packages/opal-server/requires.txt index 9dd75773e..e27d5f133 100644 --- a/packages/opal-server/requires.txt +++ b/packages/opal-server/requires.txt @@ -7,4 +7,10 @@ slowapi>=0.1.5,<1 pygit2>=1.14.1,<1.15 asgiref>=3.5.2,<4 redis>=4.3.4,<5 -prometheus_client \ No newline at end of file +prometheus_client +opentelemetry-api>=1.28.2 +opentelemetry-sdk>=1.28.2 +opentelemetry-instrumentation +opentelemetry-instrumentation-fastapi +opentelemetry-exporter-otlp +opentelemetry-exporter-prometheus \ No newline at end of file diff --git a/packages/requires.txt b/packages/requires.txt index 96939e4a7..9611c9af0 100644 --- a/packages/requires.txt +++ b/packages/requires.txt @@ -12,4 +12,9 @@ uvicorn[standard]>=0.17.6,<1 fastapi-utils>=0.2.1,<1 setuptools>=70.0.0 # not directly required, pinned by Snyk to avoid a vulnerability anyio>=4.4.0 # not directly 
required, pinned by Snyk to avoid a vulnerability -prometheus_client \ No newline at end of file +prometheus_client +opentelemetry-api +opentelemetry-sdk +opentelemetry-instrumentation +opentelemetry-instrumentation-fastapi +opentelemetry-exporter-otlp \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 1d0322f24..cc34de0b9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -10,4 +10,9 @@ twine setuptools>=70.0.0 # not directly required, pinned by Snyk to avoid a vulnerability zipp>=3.19.1 # not directly required, pinned by Snyk to avoid a vulnerability prometheus_client - +opentelemetry-api>=1.28.2 +opentelemetry-sdk>=1.28.2 +opentelemetry-instrumentation +opentelemetry-instrumentation-fastapi +opentelemetry-exporter-otlp +opentelemetry-exporter-prometheus \ No newline at end of file From 81e989a79908b5e651e062e21a61b66809b295c0 Mon Sep 17 00:00:00 2001 From: Piyush Sardana Date: Wed, 20 Nov 2024 12:32:30 +0100 Subject: [PATCH 19/29] added metrics and traces in documentation --- .../docs/tutorials/monitoring_opal.mdx | 80 +++++++++++++++++++ 1 file changed, 80 insertions(+) diff --git a/documentation/docs/tutorials/monitoring_opal.mdx b/documentation/docs/tutorials/monitoring_opal.mdx index 6c4a8ec42..a2b0fab13 100644 --- a/documentation/docs/tutorials/monitoring_opal.mdx +++ b/documentation/docs/tutorials/monitoring_opal.mdx @@ -77,6 +77,86 @@ OPAL_OPENTELEMETRY_OTLP_ENDPOINT= - Both the server and client will expose a `/metrics` endpoint that returns metrics in Prometheus format. - Traces are exported to the configured OpenTelemetry Collector endpoint using OTLP over gRPC. +### Available Metrics and Traces + +Below is a list of the available metrics and traces in OPAL, along with their types, available tags (attributes), and explanations. + +#### OPAL Server Metrics and Traces + +##### 1) `opal_server_data_update` +- **Type**: Trace +- **Description**: Represents a data update operation in the OPAL server. 
This trace spans the process of publishing data updates to clients. +- **Attributes**: + - `topics_count`: Number of topics involved in the data update. + - `entries_count`: Number of data update entries. + - Additional attributes related to errors or execution time. + +##### 2) `opal_server_policy_update` +- **Type**: Trace +- **Description**: Represents a policy update operation in the OPAL server. This trace spans the process of checking for policy changes and notifying clients. +- **Attributes**: + - Information about the policy repository, such as commit hashes. + - Errors encountered during the update process. + +##### 3) `opal_server_policy_bundle_request` +- **Type**: Trace +- **Description**: Represents a request for a policy bundle from a client. This trace spans the process of generating and serving the policy bundle to the client. +- **Attributes**: + - `bundle.type`: The type of bundle (full or diff). + - `bundle.size`: The size of the bundle in number of files or bytes. + - `scope_id`: The scope identifier if scopes are used. + +##### 4) `opal_server_policy_bundle_size` +- **Type**: Metric (Histogram) +- **Unit**: Files +- **Description**: Records the size of the policy bundles served by the OPAL server. The size is measured in the number of files included in the bundle. +- **Attributes**: + - `type`: The type of bundle (full or diff). + +##### 5) `opal_server_active_clients` +- **Type**: Metric (UpDownCounter) +- **Description**: Tracks the number of active clients connected to the OPAL server. +- **Attributes**: + - `client_id`: The unique identifier of the client. + - `source`: The source host and port of the client (e.g., 192.168.1.10:34567). + +#### OPAL Client Metrics and Traces + +##### 1) `opal_client_data_subscriptions` +- **Type**: Metric (UpDownCounter) +- **Description**: Tracks the number of data subscriptions per client. +- **Attributes**: + - `client_id`: The unique identifier of the client. 
+ - `topic`: The topic to which the client is subscribed. + +##### 2) `opal_client_data_update_trigger` +- **Type**: Trace +- **Description**: Represents the operation of triggering a data update via the API in the OPAL client. +- **Attributes**: + - `source`: The source of the trigger (e.g., API). + - Errors encountered during the trigger. + +##### 3) `opal_client_data_update_apply` +- **Type**: Trace +- **Description**: Represents the application of a data update within the OPAL client. This trace spans the process of fetching and applying data updates from the server. +- **Attributes**: + - Execution time. + - Errors encountered during the update. + +##### 4) `opal_client_policy_update_apply` +- **Type**: Trace +- **Description**: Represents the application of a policy update within the OPAL client. This trace spans the process of fetching and applying policy updates from the server. +- **Attributes**: + - Execution time. + - Errors encountered during the update. + +##### 5) `opal_client_policy_store_status` +- **Type**: Metric (Observable Gauge) +- **Description**: Indicates the current status of the policy store's authentication type used by the OPAL client. +- **Attributes**: + - `auth_type`: The authentication type configured for the policy store (e.g., TOKEN, OAUTH, NONE). + - **Value**: The metric has a value of 1 when the policy store is active with the specified authentication type. + ### Example To monitor OPAL using Prometheus and Grafana, a ready-to-use Docker Compose configuration is provided in the root directory of the repository under docker. The file is named docker-compose-with-prometheus-metrics.yml. 
From d197fe878c722194f562724265f0b1edf5fa88dd Mon Sep 17 00:00:00 2001 From: Piyush Sardana Date: Fri, 22 Nov 2024 06:09:00 +0100 Subject: [PATCH 20/29] added scope id as an attribute --- .../opal_server/data/data_update_publisher.py | 2 +- packages/opal-server/opal_server/scopes/api.py | 18 +++++++++++++----- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/packages/opal-server/opal_server/data/data_update_publisher.py b/packages/opal-server/opal_server/data/data_update_publisher.py index 596c4107d..0daf6b5fc 100644 --- a/packages/opal-server/opal_server/data/data_update_publisher.py +++ b/packages/opal-server/opal_server/data/data_update_publisher.py @@ -80,7 +80,6 @@ async def publish_data_updates(self, update: DataUpdate): all_topic_combos = set() - span.set_attribute("topics_count", len(all_topic_combos)) span.set_attribute("entries_count", len(update.entries)) # a nicer format of entries to the log logged_entries = [ @@ -118,6 +117,7 @@ async def publish_data_updates(self, update: DataUpdate): reason=update.reason, entries=logged_entries, ) + span.set_attribute("topics", list(all_topic_combos)) await self._publisher.publish( list(all_topic_combos), update.dict(by_alias=True) diff --git a/packages/opal-server/opal_server/scopes/api.py b/packages/opal-server/opal_server/scopes/api.py index 0b63bce2e..8993885b3 100644 --- a/packages/opal-server/opal_server/scopes/api.py +++ b/packages/opal-server/opal_server/scopes/api.py @@ -57,7 +57,7 @@ if opal_common_config.ENABLE_OPENTELEMETRY_METRICS: meter = otel_metrics.get_meter(__name__) policy_bundle_size_histogram = meter.create_histogram( - name="opal_server_scope_policy_bundle_size", + name="opal_server_policy_bundle_size", description="Size of the policy bundles served per scope", unit="bytes", ) @@ -124,7 +124,7 @@ async def put_scope( claims: JWTClaims = Depends(authenticator), ): if tracer: - with tracer.start_as_current_span("opal_server_scope_policy_update") as span: + with 
tracer.start_as_current_span("opal_server_policy_update") as span: span.set_attribute("scope_id", scope_in.scope_id) return await _handle_put_scope(force_fetch, scope_in, claims) else: @@ -275,7 +275,7 @@ async def get_scope_policy( ), ): if tracer: - with tracer.start_as_current_span("opal_server_scope_policy_bundle_request") as span: + with tracer.start_as_current_span("opal_server_policy_bundle_request") as span: span.set_attribute("scope_id", scope_id) policy_bundle = await _handle_get_scope_policy(scope_id, base_hash) if policy_bundle_size_histogram and policy_bundle.bundle: @@ -382,9 +382,9 @@ async def publish_data_update_event( scope_id: str = Path(..., description="Scope ID"), ): if tracer: - with tracer.start_as_current_span("opal_server_scope_data_update") as span: + with tracer.start_as_current_span("opal_server_data_update") as span: span.set_attribute("scope_id", scope_id) - await _handle_publish_data_update_event(update, claims, scope_id) + await _handle_publish_data_update_event(update, claims, scope_id, span) else: await _handle_publish_data_update_event(update, claims, scope_id) @@ -392,14 +392,22 @@ async def _handle_publish_data_update_event( update: DataUpdate, claims: JWTClaims, scope_id: str, + span: trace.Span = None, ): try: + all_topics = set() require_peer_type(authenticator, claims, PeerType.datasource) restrict_optional_topics_to_publish(authenticator, claims, update) for entry in update.entries: entry.topics = [f"data:{topic}" for topic in entry.topics] + all_topics.update(entry.topics) + + if span is not None: + span.set_attribute("entries_count", len(update.entries)) + span.set_attribute("topics", list(all_topics)) + await DataUpdatePublisher( ScopedServerSideTopicPublisher(pubsub_endpoint, scope_id) From f63beb673e75e9d975b58822b5563dd4b69f05b2 Mon Sep 17 00:00:00 2001 From: Piyush Sardana Date: Fri, 22 Nov 2024 06:20:10 +0100 Subject: [PATCH 21/29] renamed docker compose --- ...etrics.yml => 
docker-compose-with-prometheus-and-otel.yml} | 3 ++- documentation/docs/tutorials/monitoring_opal.mdx | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) rename docker/{docker-compose-with-prometheus-metrics.yml => docker-compose-with-prometheus-and-otel.yml} (97%) diff --git a/docker/docker-compose-with-prometheus-metrics.yml b/docker/docker-compose-with-prometheus-and-otel.yml similarity index 97% rename from docker/docker-compose-with-prometheus-metrics.yml rename to docker/docker-compose-with-prometheus-and-otel.yml index 58ef66194..1024d8024 100644 --- a/docker/docker-compose-with-prometheus-metrics.yml +++ b/docker/docker-compose-with-prometheus-and-otel.yml @@ -7,8 +7,9 @@ services: - POSTGRES_PASSWORD=postgres volumes: - postgres_data:/var/lib/postgresql/data + otel-collector: - image: otel/opentelemetry-collector-contrib:latest + image: otel/opentelemetry-collector-contrib:0.114.0 volumes: - ./docker_files/otel-collector-config.yaml:/etc/otelcol/config.yaml command: ["--config", "/etc/otelcol/config.yaml"] diff --git a/documentation/docs/tutorials/monitoring_opal.mdx b/documentation/docs/tutorials/monitoring_opal.mdx index a2b0fab13..2ea14ee4f 100644 --- a/documentation/docs/tutorials/monitoring_opal.mdx +++ b/documentation/docs/tutorials/monitoring_opal.mdx @@ -159,12 +159,12 @@ Below is a list of the available metrics and traces in OPAL, along with their ty ### Example -To monitor OPAL using Prometheus and Grafana, a ready-to-use Docker Compose configuration is provided in the root directory of the repository under docker. The file is named docker-compose-with-prometheus-metrics.yml. +To monitor OPAL using Prometheus and Grafana, a ready-to-use Docker Compose configuration is provided in the root directory of the repository under docker. The file is named docker-compose-with-prometheus-and-otel.yml. 
Run the following command to start Prometheus and Grafana: ``` -docker compose -f docker/docker-compose-with-prometheus-metrics.yml up +docker compose -f docker/docker-compose-with-prometheus-and-otel.yml up ``` This setup will start Prometheus to scrape metrics from OPAL server and client, and Grafana to visualize the metrics. \ No newline at end of file From 52e3114ec1b531d2f03f1eaaef85e3bfe903bdc5 Mon Sep 17 00:00:00 2001 From: Piyush Sardana Date: Fri, 22 Nov 2024 06:22:09 +0100 Subject: [PATCH 22/29] fixed how span is being used --- packages/opal-server/opal_server/scopes/api.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/opal-server/opal_server/scopes/api.py b/packages/opal-server/opal_server/scopes/api.py index 8993885b3..e19de5d0c 100644 --- a/packages/opal-server/opal_server/scopes/api.py +++ b/packages/opal-server/opal_server/scopes/api.py @@ -404,7 +404,7 @@ async def _handle_publish_data_update_event( entry.topics = [f"data:{topic}" for topic in entry.topics] all_topics.update(entry.topics) - if span is not None: + if span: span.set_attribute("entries_count", len(update.entries)) span.set_attribute("topics", list(all_topics)) From 2db0782555794fad607532f18861cbd9074261a0 Mon Sep 17 00:00:00 2001 From: Piyush Sardana Date: Fri, 22 Nov 2024 06:27:25 +0100 Subject: [PATCH 23/29] added documentation --- documentation/docs/getting-started/configuration.mdx | 8 ++++++-- packages/opal-common/opal_common/config.py | 4 ++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/documentation/docs/getting-started/configuration.mdx b/documentation/docs/getting-started/configuration.mdx index 636f4d20f..b0137ab12 100644 --- a/documentation/docs/getting-started/configuration.mdx +++ b/documentation/docs/getting-started/configuration.mdx @@ -25,7 +25,7 @@ Please use this table as a reference. 
| LOG_FILE_COMPRESSION | | | | LOG_FILE_SERIALIZE | Serialize log messages in file into json format (useful for log aggregation platforms) | | | LOG_FILE_LEVEL | -| LOG_DIAGNOSE | Include diagnosis in log messages | | +| LOG_DIAGNOSE | Include diagnosis in log messages | | | STATISTICS_ENABLED | Collect statistics about OPAL clients. | | | STATISTICS_ADD_CLIENT_CHANNEL | The topic to update about the new OPAL clients connection. | | | STATISTICS_REMOVE_CLIENT_CHANNEL | The topic to update about the OPAL clients disconnection. | | @@ -40,7 +40,11 @@ Please use this table as a reference. | AUTH_PUBLIC_KEY | | | | AUTH_JWT_ALGORITHM | JWT algorithm. See possible values [here](https://pyjwt.readthedocs.io/en/stable/algorithms.html). | | | AUTH_JWT_AUDIENCE | | | -| AUTH_JWT_ISSUER | | | +| AUTH_JWT_ISSUER | | | +| ENABLE_OPENTELEMETRY_TRACING | Set if OPAL should enable tracing with OpenTelemetry | | +| ENABLE_OPENTELEMETRY_METRICS | Set if OPAL should enable metrics with OpenTelemetry | | +| OPENTELEMETRY_OTLP_ENDPOINT | The OpenTelemetry OTLP endpoint to send traces to, set only if ENABLE_OPENTELEMETRY_TRACING is enabled | | + ## OPAL Server Configuration Variables diff --git a/packages/opal-common/opal_common/config.py b/packages/opal-common/opal_common/config.py index 088932c2f..729706858 100644 --- a/packages/opal-common/opal_common/config.py +++ b/packages/opal-common/opal_common/config.py @@ -165,12 +165,12 @@ class OpalCommonConfig(Confi): ENABLE_OPENTELEMETRY_TRACING = confi.bool ( "ENABLE_OPENTELEMETRY_TRACING", False, - description="Set if OPAL server should enable tracing with OpenTelemetry", + description="Set if OPAL should enable tracing with OpenTelemetry", ) ENABLE_OPENTELEMETRY_METRICS = confi.bool ( "ENABLE_OPENTELEMETRY_METRICS", False, - description="Set if OPAL server should enable metrics with OpenTelemetry", + description="Set if OPAL should enable metrics with OpenTelemetry", ) OPENTELEMETRY_OTLP_ENDPOINT = confi.str(
"OPENTELEMETRY_OTLP_ENDPOINT", From 837780ee37bc96ad8be3cfa7f877ddd741e877db Mon Sep 17 00:00:00 2001 From: Piyush Sardana Date: Fri, 22 Nov 2024 06:33:59 +0100 Subject: [PATCH 24/29] fixed descriptions --- packages/opal-server/opal_server/policy/bundles/api.py | 2 +- packages/opal-server/opal_server/policy/watcher/task.py | 2 +- packages/opal-server/opal_server/scopes/api.py | 8 +++++--- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/packages/opal-server/opal_server/policy/bundles/api.py b/packages/opal-server/opal_server/policy/bundles/api.py index d96e184dd..9aa77d892 100644 --- a/packages/opal-server/opal_server/policy/bundles/api.py +++ b/packages/opal-server/opal_server/policy/bundles/api.py @@ -139,7 +139,7 @@ async def process_policy_bundle(repo, input_paths, base_hash, span=None): root_manifest_path=opal_server_config.POLICY_REPO_MANIFEST_PATH, bundle_ignore=opal_server_config.BUNDLE_IGNORE, ) - + # check if commit exist in the repo revision = None if base_hash: try: diff --git a/packages/opal-server/opal_server/policy/watcher/task.py b/packages/opal-server/opal_server/policy/watcher/task.py index 606607912..f71769633 100644 --- a/packages/opal-server/opal_server/policy/watcher/task.py +++ b/packages/opal-server/opal_server/policy/watcher/task.py @@ -102,7 +102,7 @@ def _init_should_stop(self): self._should_stop = asyncio.Event() async def _fail(self, exc: Exception): - """called when the watcher fails, and stops all tasks gracefully.""" + """Called when the watcher fails, and stops all tasks gracefully.""" logger.error("policy watcher failed with exception: {err}", err=repr(exc)) self.signal_stop() # trigger uvicorn graceful shutdown diff --git a/packages/opal-server/opal_server/scopes/api.py b/packages/opal-server/opal_server/scopes/api.py index e19de5d0c..867257172 100644 --- a/packages/opal-server/opal_server/scopes/api.py +++ b/packages/opal-server/opal_server/scopes/api.py @@ -249,7 +249,7 @@ async def refresh_scope( 
@router.post("/refresh", status_code=status.HTTP_200_OK) async def sync_all_scopes(claims: JWTClaims = Depends(authenticator)): - """sync all scopes.""" + """Sync all scopes.""" try: require_peer_type(authenticator, claims, PeerType.datasource) except Unauthorized as ex: @@ -293,7 +293,8 @@ async def _handle_get_scope_policy(scope_id: str, base_hash: Optional[str]): scope = await scopes.get(scope_id) except ScopeNotFoundError: logger.warning( - f"Requested scope {scope_id} not found, returning default scope" + f"Requested scope {scope_id} not found, returning default scope", + scope_id=scope_id, ) return await _generate_default_scope_bundle(scope_id) @@ -313,7 +314,8 @@ async def _handle_get_scope_policy(scope_id: str, base_hash: Optional[str]): return await run_sync(fetcher.make_bundle, base_hash) except (InvalidGitRepositoryError, pygit2.GitError, ValueError): logger.warning( - f"Requested scope {scope_id} has invalid repo, returning default scope" + f"Requested scope {scope_id} has invalid repo, returning default scope", + scope_id=scope_id, ) return await _generate_default_scope_bundle(scope_id) From 61fd24cb2b85efb1361259592f586a7c0424e4f6 Mon Sep 17 00:00:00 2001 From: Piyush Sardana Date: Tue, 3 Dec 2024 12:21:09 +0100 Subject: [PATCH 25/29] removed top level code and protected metrics end point --- packages/opal-client/opal_client/client.py | 89 +++++++++++- packages/opal-client/opal_client/data/api.py | 21 +-- .../opal-client/opal_client/policy/api.py | 17 +-- .../opal_client/policy_store/api.py | 42 +++--- .../opal_common/monitoring/otel_metrics.py | 16 +++ .../opal_common/monitoring/tracer.py | 16 +++ .../opal_common/monitoring/tracing_utils.py | 20 +++ .../opal_server/data/data_update_publisher.py | 97 ++++++------- .../opal_server/policy/bundles/api.py | 136 +++++++++--------- .../opal_server/policy/watcher/task.py | 9 +- packages/opal-server/opal_server/pubsub.py | 46 +++--- .../opal-server/opal_server/scopes/api.py | 75 +++++----- 
.../opal-server/opal_server/security/api.py | 46 +++--- packages/opal-server/opal_server/server.py | 6 +- 14 files changed, 383 insertions(+), 253 deletions(-) create mode 100644 packages/opal-common/opal_common/monitoring/otel_metrics.py create mode 100644 packages/opal-common/opal_common/monitoring/tracer.py create mode 100644 packages/opal-common/opal_common/monitoring/tracing_utils.py diff --git a/packages/opal-client/opal_client/client.py b/packages/opal-client/opal_client/client.py index 8327cb1b1..98829950c 100644 --- a/packages/opal-client/opal_client/client.py +++ b/packages/opal-client/opal_client/client.py @@ -13,7 +13,8 @@ import aiofiles.os import aiohttp import websockets -from fastapi import FastAPI, status +from fastapi import FastAPI, status, HTTPException, Security, status +from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer from fastapi.responses import JSONResponse from fastapi_websocket_pubsub.pub_sub_client import PubSubOnConnectCallback from fastapi_websocket_rpc.rpc_channel import OnDisconnectCallback @@ -33,13 +34,47 @@ from opal_client.policy_store.policy_store_client_factory import ( PolicyStoreClientFactory, ) +from opal_common.authentication.verifier import Unauthorized from opal_common.authentication.deps import JWTAuthenticator from opal_common.authentication.verifier import JWTVerifier from opal_common.config import opal_common_config from opal_common.logger import configure_logs, logger from opal_common.middleware import configure_middleware from opal_common.security.sslcontext import get_custom_ssl_context +from opal_common.monitoring.tracer import init_tracer +from opal_common.monitoring.otel_metrics import init_meter +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry import trace +from opentelemetry import metrics +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor +from 
opentelemetry.exporter.prometheus import PrometheusMetricReader +from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter + + +security_scheme = HTTPBearer() + +def _add_metrics_route(app: FastAPI): + """Add a protected metrics endpoint to the FastAPI app.""" + if opal_common_config.ENABLE_OPENTELEMETRY_METRICS: + @app.get("/metrics") + async def protected_metrics_endpoint( + auth: HTTPAuthorizationCredentials = Security(security_scheme) + ): + """Protected metrics endpoint.""" + authenticator = JWTAuthenticator() + try: + authenticator.verify_token(auth.credentials) + data = generate_latest() + return Response(content=data, media_type=CONTENT_TYPE_LATEST) + except Unauthorized: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Invalid authentication credentials" + ) + logger.info("Mounted protected /metrics endpoint for Prometheus metrics.") class OpalClient: def __init__( @@ -88,7 +123,7 @@ def __init__( ) # set logs configure_logs() - + self._configure_monitoring() self.offline_mode_enabled = ( offline_mode_enabled or opal_client_config.OFFLINE_MODE_ENABLED ) @@ -259,6 +294,49 @@ def _init_fast_api_app(self): self._configure_lifecycle_callbacks(app) return app + def _configure_monitoring(self): + if opal_common_config.ENABLE_OPENTELEMETRY_TRACING: + self._initialize_opentelemetry_tracing() + + if opal_common_config.ENABLE_OPENTELEMETRY_METRICS: + self._initialize_opentelemetry_metrics() + + def _initialize_opentelemetry_tracing(self): + resource = Resource.create({"service.name": "opal-client"}) + tracer_provider = TracerProvider(resource=resource) + trace.set_tracer_provider(tracer_provider) + + otlp_exporter = OTLPSpanExporter( + endpoint = opal_common_config.OPENTELEMETRY_OTLP_ENDPOINT, + insecure=True + ) + + span_processor = BatchSpanProcessor(otlp_exporter) + tracer_provider.add_span_processor(span_processor) + + init_tracer(tracer_provider) + logger.info("OpenTelemetry tracing is enabled.") + + def 
_initialize_opentelemetry_metrics(self): + resource = Resource.create({"service.name": "opal-client"}) + self.prometheus_metric_reader = PrometheusMetricReader() + + meter_provider = MeterProvider( + resource=resource, + metric_readers=[self.prometheus_metric_reader] + ) + metrics.set_meter_provider(meter_provider) + init_meter(meter_provider) + self.meter = metrics.get_meter(__name__) + + self.startup_counter = self.meter.create_counter( + name="startup", + description="Number of times the application has started", + ) + self.startup_counter.add(1) + + logger.info("OpenTelemetry metrics are enabled.") + async def _is_ready(self): # Data loaded from file or from server return self._backup_loaded or await self.policy_store.is_ready() @@ -320,11 +398,8 @@ async def ready(): status_code=status.HTTP_503_SERVICE_UNAVAILABLE, content={"status": "unavailable"}, ) - - @app.get("/metrics", include_in_schema=False) - async def metrics(): - """Endpoint to expose Prometheus metrics.""" - return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST) + + _add_metrics_route(app) return app diff --git a/packages/opal-client/opal_client/data/api.py b/packages/opal-client/opal_client/data/api.py index c39ad639f..09705ac61 100644 --- a/packages/opal-client/opal_client/data/api.py +++ b/packages/opal-client/opal_client/data/api.py @@ -3,13 +3,8 @@ from fastapi import APIRouter, HTTPException, status from opal_client.data.updater import DataUpdater from opal_common.logger import logger -from opal_common.config import opal_common_config from opentelemetry import trace - -if opal_common_config.ENABLE_OPENTELEMETRY_TRACING: - tracer = trace.get_tracer(__name__) -else: - tracer = None +from opal_common.monitoring.tracing_utils import start_span def init_data_router(data_updater: Optional[DataUpdater]): router = APIRouter() @@ -17,21 +12,13 @@ def init_data_router(data_updater: Optional[DataUpdater]): @router.post("/data-updater/trigger", status_code=status.HTTP_200_OK) async def 
trigger_policy_data_update(): logger.info("triggered policy data update from api") - if tracer: - with tracer.start_as_current_span("opal_client_data_update_trigger") as span: - return await _handle_policy_data_update(span) - else: - return await _handle_policy_data_update() + async with start_span("opal_client_data_update_trigger") as span: + return await _handle_policy_data_update(span) async def _handle_policy_data_update(span=None): try: if data_updater: - if tracer and span: - with tracer.start_as_current_span("opal_client_data_update_apply"): - await data_updater.get_base_policy_data( - data_fetch_reason="request from sdk" - ) - else: + async with start_span("opal_client_data_update_apply") if span else (await None): await data_updater.get_base_policy_data( data_fetch_reason="request from sdk" ) diff --git a/packages/opal-client/opal_client/policy/api.py b/packages/opal-client/opal_client/policy/api.py index 5a39dfbf7..197c049ce 100644 --- a/packages/opal-client/opal_client/policy/api.py +++ b/packages/opal-client/opal_client/policy/api.py @@ -1,14 +1,9 @@ from fastapi import APIRouter, status from opal_client.policy.updater import PolicyUpdater from opal_common.logger import logger -from opal_common.config import opal_common_config - from opentelemetry import trace +from opal_common.monitoring.tracing_utils import start_span -if opal_common_config.ENABLE_OPENTELEMETRY_TRACING: - tracer = trace.get_tracer(__name__) -else: - tracer = None def init_policy_router(policy_updater: PolicyUpdater): router = APIRouter() @@ -16,15 +11,13 @@ def init_policy_router(policy_updater: PolicyUpdater): @router.post("/policy-updater/trigger", status_code=status.HTTP_200_OK) async def trigger_policy_update(): logger.info("triggered policy update from api") - if tracer: - with tracer.start_as_current_span("opal_client_policy_update_apply") as span: - return await _handle_policy_update(span) - else: - return await _handle_policy_update() + async with 
start_span("opal_client_policy_update_trigger") as span: + return await _handle_policy_update(span) async def _handle_policy_update(span=None): try: - await policy_updater.trigger_update_policy(force_full_update=True) + async with start_span("opal_client_policy_update_apply", parent=span): + await policy_updater.trigger_update_policy(force_full_update=True) return {"status": "ok"} except Exception as e: logger.error(f"Error during policy update: {str(e)}") diff --git a/packages/opal-client/opal_client/policy_store/api.py b/packages/opal-client/opal_client/policy_store/api.py index d772c905a..cd1cfd0cd 100644 --- a/packages/opal-client/opal_client/policy_store/api.py +++ b/packages/opal-client/opal_client/policy_store/api.py @@ -8,31 +8,39 @@ from opal_common.logger import logger from opal_common.schemas.security import PeerType from opal_common.config import opal_common_config - +from opal_common.monitoring.tracer import get_tracer +from opal_common.monitoring.otel_metrics import get_meter from opentelemetry import metrics -if opal_common_config.ENABLE_OPENTELEMETRY_METRICS: - meter = metrics.get_meter(__name__) +_policy_store_status_metric = None - def policy_store_status_callback(observable_gauge): - auth_type = opal_client_config.POLICY_STORE_AUTH_TYPE or PolicyStoreAuth.NONE - observable_gauge.observe( - 1, - attributes={"auth_type": str(auth_type)}, +def get_policy_store_status_metric(): + global _policy_store_status_metric + if _policy_store_status_metric is None: + if not opal_common_config.ENABLE_OPENTELEMETRY_METRICS: + return None + meter = get_meter() + _policy_store_status_metric = meter.create_observable_gauge( + name="opal_client_policy_store_status", + description="Current status of the policy store authentication type", + unit="1", + callbacks=[_update_policy_store_status] ) + return _policy_store_status_metric + +def _update_policy_store_status(observer: metrics.ObservableGauge): + auth_type = opal_client_config.POLICY_STORE_AUTH_TYPE or 
PolicyStoreAuth.NONE + status_code = { + PolicyStoreAuth.NONE: 0, + PolicyStoreAuth.TOKEN: 1, + PolicyStoreAuth.OAUTH: 2 + }.get(auth_type, -1) + observer.observe(status_code, attributes={"auth_type": auth_type}) - policy_store_status_metric = meter.create_observable_gauge( - name="opal_client_policy_store_status", - description="Current status of the policy store authentication type", - unit="1", - callbacks=[policy_store_status_callback], - ) -else: - meter = None - policy_store_status_metric = None def init_policy_store_router(authenticator: JWTAuthenticator): router = APIRouter() + get_policy_store_status_metric() @router.get( "/policy-store/config", diff --git a/packages/opal-common/opal_common/monitoring/otel_metrics.py b/packages/opal-common/opal_common/monitoring/otel_metrics.py new file mode 100644 index 000000000..e79748dbd --- /dev/null +++ b/packages/opal-common/opal_common/monitoring/otel_metrics.py @@ -0,0 +1,16 @@ +from opentelemetry import metrics +from opentelemetry.metrics import NoOpMeter + +_meter = None + +def init_meter(meter_provider=None): + global _meter + if meter_provider is not None: + _meter = meter_provider.get_meter(__name__) + else: + _meter = metrics.get_meter(__name__) + +def get_meter(): + if _meter is None: + return NoOpMeter() + return _meter diff --git a/packages/opal-common/opal_common/monitoring/tracer.py b/packages/opal-common/opal_common/monitoring/tracer.py new file mode 100644 index 000000000..53da85bb0 --- /dev/null +++ b/packages/opal-common/opal_common/monitoring/tracer.py @@ -0,0 +1,16 @@ +from opentelemetry import trace +from opentelemetry.trace import NoOpTracer + +_tracer = None + +def init_tracer(tracer_provider=None): + global _tracer + if tracer_provider is not None: + _tracer = tracer_provider.get_tracer(__name__) + else: + _tracer = trace.get_tracer(__name__) + +def get_tracer(): + if _tracer is None: + return NoOpTracer() + return _tracer \ No newline at end of file diff --git 
a/packages/opal-common/opal_common/monitoring/tracing_utils.py b/packages/opal-common/opal_common/monitoring/tracing_utils.py new file mode 100644 index 000000000..832f82cec --- /dev/null +++ b/packages/opal-common/opal_common/monitoring/tracing_utils.py @@ -0,0 +1,20 @@ +from contextlib import asynccontextmanager +from typing import AsyncGenerator, Optional + +from opal_common.config import opal_common_config +from opal_common.monitoring.tracer import get_tracer +from opentelemetry.trace import Span + +@asynccontextmanager +async def start_span(name: str) -> AsyncGenerator[Optional[Span], None]: + """ + Reusable async context manager for starting a span. + Yields the span if tracing is enabled, else None. + """ + if not opal_common_config.ENABLE_OPENTELEMETRY_TRACING: + yield None + return + + tracer = get_tracer() + with tracer.start_as_current_span(name) as span: + yield span \ No newline at end of file diff --git a/packages/opal-server/opal_server/data/data_update_publisher.py b/packages/opal-server/opal_server/data/data_update_publisher.py index 0daf6b5fc..78c7d1ad6 100644 --- a/packages/opal-server/opal_server/data/data_update_publisher.py +++ b/packages/opal-server/opal_server/data/data_update_publisher.py @@ -9,11 +9,9 @@ DataUpdate, ServerDataSourceConfig, ) -from opal_common.config import opal_common_config from opal_common.topics.publisher import TopicPublisher -from opentelemetry import trace +from opal_common.monitoring.tracing_utils import start_span -tracer = trace.get_tracer(__name__) TOPIC_DELIMITER = "/" PREFIX_DELIMITER = ":" @@ -74,51 +72,54 @@ async def publish_data_updates(self, update: DataUpdate): topics (List[str]): topics (with hierarchy) to notify subscribers of update (DataUpdate): update data-source configuration for subscribers to fetch data from """ - if opal_common_config.ENABLE_OPENTELEMETRY_TRACING: - with tracer.start_as_current_span("opal_server_data_update") as span: - await self._publish_data_updates(update) - - 
all_topic_combos = set() - - span.set_attribute("entries_count", len(update.entries)) - # a nicer format of entries to the log - logged_entries = [ - dict( - url=entry.url, - method=entry.save_method, - path=entry.dst_path or "/", - inline_data=(entry.data is not None), - topics=entry.topics, - ) - for entry in update.entries - ] - - # Expand the topics for each event to include sub topic combos (e.g. publish 'a/b/c' as 'a' , 'a/b', and 'a/b/c') - for entry in update.entries: - topic_combos = [] - if entry.topics: - for topic in entry.topics: - topic_combos.extend(DataUpdatePublisher.get_topic_combos(topic)) - entry.topics = topic_combos # Update entry with the exhaustive list, so client won't have to expand it again - all_topic_combos.update(topic_combos) - - else: - logger.warning( - "[{pid}] No topics were provided for the following entry: {entry}", - pid=os.getpid(), - entry=entry, - ) - - # publish all topics with all their sub combinations - logger.info( - "[{pid}] Publishing data update to topics: {topics}, reason: {reason}, entries: {entries}", + async with start_span("opal_server_data_update") as span: + return await self._publish_data_updates(update, span) + + async def _publish_data_updates(self, update: DataUpdate, span=None): + """Internal method to handle data update publishing.""" + all_topic_combos = set() + + # A nicer format of entries to the log + logged_entries = [ + dict( + url=entry.url, + method=entry.save_method, + path=entry.dst_path or "/", + inline_data=(entry.data is not None), + topics=entry.topics, + ) + for entry in update.entries + ] + + # Expand the topics for each event to include subtopic combos + # (e.g., publish 'a/b/c' as 'a', 'a/b', and 'a/b/c') + for entry in update.entries: + topic_combos = [] + if entry.topics: + for topic in entry.topics: + topic_combos.extend(self.get_topic_combos(topic)) + entry.topics = topic_combos # Update entry with the exhaustive list + all_topic_combos.update(topic_combos) + else: + logger.warning( 
+ "[{pid}] No topics were provided for the following entry: {entry}", pid=os.getpid(), - topics=all_topic_combos, - reason=update.reason, - entries=logged_entries, + entry=entry, ) - span.set_attribute("topics", list(all_topic_combos)) - await self._publisher.publish( - list(all_topic_combos), update.dict(by_alias=True) - ) + # Publish all topics with all their subcombinations + logger.info( + "[{pid}] Publishing data update to topics: {topics}, reason: {reason}, entries: {entries}", + pid=os.getpid(), + topics=all_topic_combos, + reason=update.reason, + entries=logged_entries, + ) + + if span is not None: + span.set_attribute("entries_count", len(update.entries)) + span.set_attribute("topics", list(all_topic_combos)) + + await self._publisher.publish( + list(all_topic_combos), update.dict(by_alias=True) + ) \ No newline at end of file diff --git a/packages/opal-server/opal_server/policy/bundles/api.py b/packages/opal-server/opal_server/policy/bundles/api.py index 9aa77d892..8e0afe011 100644 --- a/packages/opal-server/opal_server/policy/bundles/api.py +++ b/packages/opal-server/opal_server/policy/bundles/api.py @@ -12,30 +12,28 @@ from opal_common.logger import logger from opal_common.config import opal_common_config from opal_common.schemas.policy import PolicyBundle +from opal_common.monitoring.tracing_utils import start_span +from opal_common.monitoring.otel_metrics import get_meter from opal_server.config import opal_server_config from starlette.responses import RedirectResponse -from opentelemetry import trace, metrics -from opentelemetry.metrics import get_meter -from opentelemetry.trace import get_tracer - -if opal_common_config.ENABLE_OPENTELEMETRY_TRACING: - tracer = trace.get_tracer(__name__) -else: - tracer = None - -if opal_common_config.ENABLE_OPENTELEMETRY_METRICS: - meter = metrics.get_meter(__name__) - bundle_size_histogram = meter.create_histogram( - name="opal_server_policy_bundle_size", - description="Size of policy bundles served", - 
unit="files", - ) -else: - meter = None - bundle_size_histogram = None +from opentelemetry import trace router = APIRouter() +_bundle_size_histogram = None + +def get_bundle_size_histogram(): + global _bundle_size_histogram + if _bundle_size_histogram is None: + if not opal_common_config.ENABLE_OPENTELEMETRY_METRICS: + return None + meter = get_meter() + _bundle_size_histogram = meter.create_histogram( + name="opal_server_policy_bundle_size", + description="Size of policy bundles served", + unit="files", + ) + return _bundle_size_histogram async def get_repo( base_clone_path: str = None, @@ -122,62 +120,64 @@ async def get_policy( ): """Get the policy bundle from the policy repo.""" - - if tracer is not None: - with tracer.start_as_current_span("opal_server_policy_bundle_request") as span: - bundle = await process_policy_bundle(repo, input_paths, base_hash, span) - else: - bundle = await process_policy_bundle(repo, input_paths, base_hash) + async with start_span("opal_server_policy_bundle_request") as span: + bundle = await process_policy_bundle(repo, input_paths, base_hash, span) return bundle async def process_policy_bundle(repo, input_paths, base_hash, span=None): - maker = BundleMaker( - repo, - in_directories=set(input_paths), - extensions=opal_server_config.FILTER_FILE_EXTENSIONS, - root_manifest_path=opal_server_config.POLICY_REPO_MANIFEST_PATH, - bundle_ignore=opal_server_config.BUNDLE_IGNORE, - ) - # check if commit exist in the repo - revision = None - if base_hash: - try: - revision = repo.rev_parse(base_hash) - except ValueError: - logger.warning(f"base_hash {base_hash} does not exist in the repo") - - if revision is None: - bundle = maker.make_bundle(repo.head.commit) - bundle_size = bundle.calculate_size() - - if bundle_size_histogram is not None: - bundle_size_histogram.record(bundle_size, {"type": "full"}) - - if span is not None: - span.set_attribute("bundle.type", "full") - span.set_attribute("bundle.size", bundle_size) - - return bundle - else: - 
try: - old_commit = repo.commit(base_hash) - bundle = maker.make_diff_bundle(old_commit, repo.head.commit) + async with start_span("opal_server_policy_bundle_processing", parent=span) as processing_span: + maker = BundleMaker( + repo, + in_directories=set(input_paths), + extensions=opal_server_config.FILTER_FILE_EXTENSIONS, + root_manifest_path=opal_server_config.POLICY_REPO_MANIFEST_PATH, + bundle_ignore=opal_server_config.BUNDLE_IGNORE, + ) + # check if commit exist in the repo + revision = None + if base_hash: + try: + revision = repo.rev_parse(base_hash) + except ValueError: + logger.warning(f"base_hash {base_hash} does not exist in the repo") + + bundle_size_histogram = get_bundle_size_histogram() + + if revision is None: + bundle = maker.make_bundle(repo.head.commit) bundle_size = bundle.calculate_size() if bundle_size_histogram is not None: - bundle_size_histogram.record(bundle_size, {"type": "diff"}) + bundle_size_histogram.record(bundle_size, {"type": "full"}) - if span is not None: - span.set_attribute("bundle.type", "diff") - span.set_attribute("bundle.size", bundle_size) + if processing_span is not None: + processing_span.set_attribute("bundle.type", "full") + processing_span.set_attribute("bundle.size", bundle_size) return bundle - except ValueError: - if span is not None: - span.set_status(trace.Status(trace.StatusCode.ERROR, "Commit not found")) - span.record_exception(ValueError(f"Commit {base_hash} not found")) - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=f"Commit with hash {base_hash} was not found in the policy repo!", - ) \ No newline at end of file + else: + try: + old_commit = repo.commit(base_hash) + bundle = maker.make_diff_bundle(old_commit, repo.head.commit) + bundle_size = bundle.calculate_size() + + if bundle_size_histogram is not None: + bundle_size_histogram.record(bundle_size, {"type": "diff"}) + + if span is not None: + processing_span.set_attribute("bundle.type", "diff") + 
processing_span.set_attribute("bundle.size", bundle_size) + + return bundle + except ValueError as e: + if processing_span is not None: + processing_span.set_status(trace.StatusCode.ERROR) + processing_span.set_attribute("error", True) + processing_span.record_exception(e) + processing_span.set_attribute("error_message", str(e)) + processing_span.set_attribute("base_hash", base_hash) + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Commit with hash {base_hash} was not found in the policy repo!", + ) \ No newline at end of file diff --git a/packages/opal-server/opal_server/policy/watcher/task.py b/packages/opal-server/opal_server/policy/watcher/task.py index f71769633..a8e512ebf 100644 --- a/packages/opal-server/opal_server/policy/watcher/task.py +++ b/packages/opal-server/opal_server/policy/watcher/task.py @@ -6,12 +6,11 @@ from fastapi_websocket_pubsub import Topic from fastapi_websocket_pubsub.pub_sub_server import PubSubEndpoint from opal_common.logger import logger +from opal_common.monitoring.tracing_utils import start_span from opal_common.config import opal_common_config from opal_common.sources.base_policy_source import BasePolicySource from opal_server.config import opal_server_config -from opentelemetry import trace -tracer = trace.get_tracer(__name__) class BasePolicyWatcherTask: """Manages the asyncio tasks of the policy watcher.""" @@ -127,8 +126,8 @@ async def trigger(self, topic: Topic, data: Any): """Triggers the policy watcher from outside to check for changes (git pull)""" try: - if opal_common_config.ENABLE_OPENTELEMETRY_TRACING: - with tracer.start_as_current_span("opal_server_policy_update"): - await self._watcher.check_for_changes() + async with start_span("opal_server_policy_update") as span: + span.set_attribute("topic", str(topic)) + await self._watcher.check_for_changes() except Exception as e: raise diff --git a/packages/opal-server/opal_server/pubsub.py b/packages/opal-server/opal_server/pubsub.py index 
222abe26e..4f8826aed 100644 --- a/packages/opal-server/opal_server/pubsub.py +++ b/packages/opal-server/opal_server/pubsub.py @@ -28,32 +28,40 @@ from opal_common.confi.confi import load_conf_if_none from opal_common.config import opal_common_config from opal_common.logger import logger +from opal_common.monitoring.otel_metrics import get_meter from opal_server.config import opal_server_config from pydantic import BaseModel from starlette.datastructures import QueryParams -from opentelemetry import metrics OPAL_CLIENT_INFO_PARAM_PREFIX = "__opal_" OPAL_CLIENT_INFO_CLIENT_ID = f"{OPAL_CLIENT_INFO_PARAM_PREFIX}client_id" -if opal_common_config.ENABLE_OPENTELEMETRY_METRICS: - meter = metrics.get_meter(__name__) -else: - meter = None +_active_clients_counter = None +_client_data_subscriptions_counter = None -if opal_common_config.ENABLE_OPENTELEMETRY_METRICS: - active_clients_counter = meter.create_counter( - name="opal_server_active_clients", - description="Number of active clients connected to the OPAL server", - ) +def get_active_clients_counter(): + global _active_clients_counter + if _active_clients_counter is None: + if not opal_common_config.ENABLE_OPENTELEMETRY_METRICS: + return None + meter = get_meter() + _active_clients_counter = meter.create_counter( + name="opal_server_active_clients", + description="Number of active clients connected to the OPAL server", + ) + return _active_clients_counter - client_data_subscriptions_counter = meter.create_up_down_counter( - name="opal_client_data_subscriptions", - description="Number of data subscriptions per client", - ) -else: - active_clients_counter = None - client_data_subscriptions_counter = None +def get_client_data_subscriptions_counter(): + global _client_data_subscriptions_counter + if _client_data_subscriptions_counter is None: + if not opal_common_config.ENABLE_OPENTELEMETRY_METRICS: + return None + meter = get_meter() + _client_data_subscriptions_counter = meter.create_up_down_counter( + 
name="opal_client_data_subscriptions", + description="Number of data subscriptions per client", + ) + return _client_data_subscriptions_counter class ClientInfo(BaseModel): client_id: str @@ -98,6 +106,7 @@ def new_client( connect_time=time.time(), query_params=query_params, ) + active_clients_counter = get_active_clients_counter() if active_clients_counter is not None: source = f"{source_host}:{source_port}" if source_host and source_port else "unknown" active_clients_counter.add(1, attributes={"client_id": client_id, "source": source}) @@ -110,6 +119,7 @@ def new_client( if client_info.refcount >= 1: self._clients_by_ids[client_id] = client_info else: + active_clients_counter = get_active_clients_counter() if active_clients_counter is not None: source = f"{client_info.source_host}:{client_info.source_port}" if client_info.source_host and client_info.source_port else "unknown" active_clients_counter.add(-1, attributes={"client_id": client_id, "source": source}) @@ -127,6 +137,7 @@ async def on_subscribe( # on_subscribe is sometimes called for the broadcaster, when there is no "current client" if client_info is not None: client_info.subscribed_topics.update(topics) + client_data_subscriptions_counter = get_client_data_subscriptions_counter() if client_data_subscriptions_counter is not None: for topic in topics: client_data_subscriptions_counter.add( @@ -150,6 +161,7 @@ async def on_unsubscribe( # on_subscribe is sometimes called for the broadcaster, when there is no "current client" if client_info is not None: client_info.subscribed_topics.difference_update(topics) + client_data_subscriptions_counter = get_client_data_subscriptions_counter() if client_data_subscriptions_counter is not None: for topic in topics: client_data_subscriptions_counter.add( diff --git a/packages/opal-server/opal_server/scopes/api.py b/packages/opal-server/opal_server/scopes/api.py index 867257172..e4e738271 100644 --- a/packages/opal-server/opal_server/scopes/api.py +++ 
b/packages/opal-server/opal_server/scopes/api.py @@ -45,25 +45,24 @@ from opal_server.data.data_update_publisher import DataUpdatePublisher from opal_server.git_fetcher import GitPolicyFetcher from opal_server.scopes.scope_repository import ScopeNotFoundError, ScopeRepository - +from opal_common.monitoring.tracing_utils import start_span +from opal_common.monitoring.otel_metrics import get_meter from opentelemetry import trace -from opentelemetry import metrics as otel_metrics - -if opal_common_config.ENABLE_OPENTELEMETRY_TRACING: - tracer = trace.get_tracer(__name__) -else: - tracer = None - -if opal_common_config.ENABLE_OPENTELEMETRY_METRICS: - meter = otel_metrics.get_meter(__name__) - policy_bundle_size_histogram = meter.create_histogram( - name="opal_server_policy_bundle_size", - description="Size of the policy bundles served per scope", - unit="bytes", - ) -else: - meter = None - policy_bundle_size_histogram = None + +_policy_bundle_size_histogram = None + +def get_policy_bundle_size_histogram(): + global _policy_bundle_size_histogram + if _policy_bundle_size_histogram is None: + if not opal_common_config.ENABLE_OPENTELEMETRY_METRICS: + return None + meter = get_meter() + _policy_bundle_size_histogram = meter.create_histogram( + name="opal_server_policy_bundle_size", + description="Size of the policy bundles served per scope", + unit="bytes", + ) + return _policy_bundle_size_histogram def verify_private_key(private_key: str, key_format: EncryptionKeyFormat) -> bool: try: @@ -123,11 +122,8 @@ async def put_scope( scope_in: Scope, claims: JWTClaims = Depends(authenticator), ): - if tracer: - with tracer.start_as_current_span("opal_server_policy_update") as span: - span.set_attribute("scope_id", scope_in.scope_id) - return await _handle_put_scope(force_fetch, scope_in, claims) - else: + async with start_span("opal_server_policy_update") as span: + span.set_attribute("scope_id", scope_in.scope_id) return await _handle_put_scope(force_fetch, scope_in, claims) 
async def _handle_put_scope( @@ -274,19 +270,17 @@ async def get_scope_policy( description="hash of previous bundle already downloaded, server will return a diff bundle.", ), ): - if tracer: - with tracer.start_as_current_span("opal_server_policy_bundle_request") as span: - span.set_attribute("scope_id", scope_id) - policy_bundle = await _handle_get_scope_policy(scope_id, base_hash) - if policy_bundle_size_histogram and policy_bundle.bundle: - bundle_size = policy_bundle.calculate_size() - policy_bundle_size_histogram.record( - bundle_size, - attributes={"scope_id": scope_id}, - ) - return policy_bundle - else: - return await _handle_get_scope_policy(scope_id, base_hash) + async with start_span("opal_server_policy_bundle_request") as span: + span.set_attribute("scope_id", scope_id) + policy_bundle = await _handle_get_scope_policy(scope_id, base_hash) + policy_bundle_size_histogram = get_policy_bundle_size_histogram() + if policy_bundle_size_histogram and policy_bundle.bundle: + bundle_size = policy_bundle.calculate_size() + policy_bundle_size_histogram.record( + bundle_size, + attributes={"scope_id": scope_id}, + ) + return policy_bundle async def _handle_get_scope_policy(scope_id: str, base_hash: Optional[str]): try: @@ -383,12 +377,9 @@ async def publish_data_update_event( claims: JWTClaims = Depends(authenticator), scope_id: str = Path(..., description="Scope ID"), ): - if tracer: - with tracer.start_as_current_span("opal_server_data_update") as span: - span.set_attribute("scope_id", scope_id) - await _handle_publish_data_update_event(update, claims, scope_id, span) - else: - await _handle_publish_data_update_event(update, claims, scope_id) + async with start_span("opal_server_data_update") as span: + span.set_attribute("scope_id", scope_id) + await _handle_publish_data_update_event(update, claims, scope_id, span) async def _handle_publish_data_update_event( update: DataUpdate, diff --git a/packages/opal-server/opal_server/security/api.py 
b/packages/opal-server/opal_server/security/api.py index 8a1ccbca8..077bad7fb 100644 --- a/packages/opal-server/opal_server/security/api.py +++ b/packages/opal-server/opal_server/security/api.py @@ -6,27 +6,34 @@ from opal_common.logger import logger from opal_common.config import opal_common_config from opal_common.schemas.security import AccessToken, AccessTokenRequest, TokenDetails -from opentelemetry import metrics +from opal_common.monitoring.otel_metrics import get_meter -if opal_common_config.ENABLE_OPENTELEMETRY_METRICS: - meter = metrics.get_meter(__name__) -else: - meter = None +_token_requested_counter = None +_token_generated_counter = None -if opal_common_config.ENABLE_OPENTELEMETRY_METRICS: - token_requested_counter = meter.create_counter( - name="opal_server_token_requested", - description="Number of token requests" - ) - - token_generated_counter = meter.create_up_down_counter( - name="opal_client_token_generated", - description="Number of tokens generated" - ) -else: - token_requested_counter = None - token_generated_counter = None +def get_token_requested_counter(): + global _token_requested_counter + if _token_requested_counter is None: + if not opal_common_config.ENABLE_OPENTELEMETRY_METRICS: + return None + meter = get_meter() + _token_requested_counter = meter.create_counter( + name="opal_server_token_requested", + description="Number of token requests" + ) + return _token_requested_counter +def get_token_generated_counter(): + global _token_generated_counter + if _token_generated_counter is None: + if not opal_common_config.ENABLE_OPENTELEMETRY_METRICS: + return None + meter = get_meter() + _token_generated_counter = meter.create_up_down_counter( + name="opal_client_token_generated", + description="Number of tokens generated" + ) + return _token_generated_counter def init_security_router(signer: JWTSigner, authenticator: StaticBearerAuthenticator): router = APIRouter() @@ -38,6 +45,7 @@ def init_security_router(signer: JWTSigner, 
authenticator: StaticBearerAuthentic dependencies=[Depends(authenticator)], ) async def generate_new_access_token(req: AccessTokenRequest): + token_requested_counter = get_token_requested_counter() if token_requested_counter is not None: token_requested_counter.add(1, attributes={ 'token_type': req.type.value, @@ -59,7 +67,7 @@ async def generate_new_access_token(req: AccessTokenRequest): claims = {"peer_type": req.type.value, **req.claims} token = signer.sign(sub=req.id, token_lifetime=req.ttl, custom_claims=claims) logger.info(f"Generated opal token: peer_type={req.type.value}") - + token_generated_counter = get_token_generated_counter() if token_generated_counter is not None: token_generated_counter.add(1, attributes={ 'peer_type': req.type.value, diff --git a/packages/opal-server/opal_server/server.py b/packages/opal-server/opal_server/server.py index 3f7b4dcef..ae38faeeb 100644 --- a/packages/opal-server/opal_server/server.py +++ b/packages/opal-server/opal_server/server.py @@ -8,6 +8,7 @@ from fastapi import Depends, FastAPI from fastapi_websocket_pubsub.event_broadcaster import EventBroadcasterContextManager +from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer from prometheus_client import generate_latest, CONTENT_TYPE_LATEST from fastapi.responses import Response from opal_common.authentication.deps import JWTAuthenticator, StaticBearerAuthenticator @@ -25,6 +26,8 @@ ServerSideTopicPublisher, TopicPublisher, ) +from opal_common.monitoring.tracer import init_tracer +from opal_common.monitoring.otel_metrics import init_meter from opal_server.config import opal_server_config from opal_server.data.api import init_data_updates_router from opal_server.data.data_update_publisher import DataUpdatePublisher @@ -263,7 +266,7 @@ def _initialize_opentelemetry_tracing(self): span_processor = BatchSpanProcessor(otlp_exporter) tracer_provider.add_span_processor(span_processor) - self.tracer = trace.get_tracer(__name__) + init_tracer(tracer_provider) 
logger.info("OpenTelemetry tracing is enabled.") def _initialize_opentelemetry_metrics(self): @@ -275,6 +278,7 @@ def _initialize_opentelemetry_metrics(self): metric_readers=[self.prometheus_metric_reader] ) metrics.set_meter_provider(meter_provider) + init_meter(meter_provider) self.meter = metrics.get_meter(__name__) self.startup_counter = self.meter.create_counter( From 7f2fab055e53cf4496a4b70eecdbc1d9d79cbd7c Mon Sep 17 00:00:00 2001 From: Piyush Sardana Date: Wed, 4 Dec 2024 06:17:51 +0100 Subject: [PATCH 26/29] fixes for tracing spans --- .../opal-common/opal_common/monitoring/tracing_utils.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/packages/opal-common/opal_common/monitoring/tracing_utils.py b/packages/opal-common/opal_common/monitoring/tracing_utils.py index 832f82cec..7b7b7b014 100644 --- a/packages/opal-common/opal_common/monitoring/tracing_utils.py +++ b/packages/opal-common/opal_common/monitoring/tracing_utils.py @@ -1,12 +1,12 @@ from contextlib import asynccontextmanager from typing import AsyncGenerator, Optional - +from opentelemetry.trace import Span, set_span_in_context from opal_common.config import opal_common_config from opal_common.monitoring.tracer import get_tracer from opentelemetry.trace import Span @asynccontextmanager -async def start_span(name: str) -> AsyncGenerator[Optional[Span], None]: +async def start_span(name: str, parent: Optional[Span] = None) -> AsyncGenerator[Optional[Span], None]: """ Reusable async context manager for starting a span. Yields the span if tracing is enabled, else None. 
@@ -16,5 +16,6 @@ async def start_span(name: str) -> AsyncGenerator[Optional[Span], None]: return tracer = get_tracer() - with tracer.start_as_current_span(name) as span: + parent_context = set_span_in_context(parent) if parent else None + with tracer.start_as_current_span(name, context=parent_context) as span: yield span \ No newline at end of file From 191cebf0c1319aab344d89506e15e490b9094820 Mon Sep 17 00:00:00 2001 From: Piyush Sardana Date: Wed, 4 Dec 2024 06:25:51 +0100 Subject: [PATCH 27/29] fix metrics end point --- packages/opal-client/opal_client/client.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/packages/opal-client/opal_client/client.py b/packages/opal-client/opal_client/client.py index 98829950c..ec1ad2846 100644 --- a/packages/opal-client/opal_client/client.py +++ b/packages/opal-client/opal_client/client.py @@ -55,7 +55,7 @@ security_scheme = HTTPBearer() -def _add_metrics_route(app: FastAPI): +def _add_metrics_route(app: FastAPI, authenticator: JWTAuthenticator): """Add a protected metrics endpoint to the FastAPI app.""" if opal_common_config.ENABLE_OPENTELEMETRY_METRICS: @app.get("/metrics") @@ -63,9 +63,8 @@ async def protected_metrics_endpoint( auth: HTTPAuthorizationCredentials = Security(security_scheme) ): """Protected metrics endpoint.""" - authenticator = JWTAuthenticator() try: - authenticator.verify_token(auth.credentials) + claims = authenticator(auth.credentials) data = generate_latest() return Response(content=data, media_type=CONTENT_TYPE_LATEST) except Unauthorized: @@ -399,7 +398,7 @@ async def ready(): content={"status": "unavailable"}, ) - _add_metrics_route(app) + _add_metrics_route(app, authenticator) return app From ea0c770b88b13a9426bf83802fd19408ae08d5df Mon Sep 17 00:00:00 2001 From: Piyush Sardana Date: Wed, 4 Dec 2024 15:34:04 +0100 Subject: [PATCH 28/29] fixed docker compose and removed logging exporter from otel --- docker/docker_files/otel-collector-config.yaml | 6 +++--- 1 file changed, 3 
insertions(+), 3 deletions(-) diff --git a/docker/docker_files/otel-collector-config.yaml b/docker/docker_files/otel-collector-config.yaml index 282cfed27..945a7f9a8 100644 --- a/docker/docker_files/otel-collector-config.yaml +++ b/docker/docker_files/otel-collector-config.yaml @@ -7,8 +7,8 @@ receivers: exporters: prometheus: endpoint: "0.0.0.0:8888" - logging: - logLevel: debug + debug: + verbosity: detailed processors: batch: @@ -18,7 +18,7 @@ service: traces: receivers: [otlp] processors: [batch] - exporters: [logging] + exporters: [debug] metrics: receivers: [otlp] processors: [batch] From 4b24dc08a9e04252222b066b0ee2b1d3fce4f933 Mon Sep 17 00:00:00 2001 From: Dan Yishai Date: Wed, 11 Dec 2024 15:02:12 +0200 Subject: [PATCH 29/29] Fixed pre-commit --- .../docs/tutorials/monitoring_opal.mdx | 4 +- packages/opal-client/opal_client/client.py | 37 +++++---- packages/opal-client/opal_client/data/api.py | 10 ++- .../opal-client/opal_client/policy/api.py | 2 +- .../opal_client/policy_store/api.py | 59 +++++++------- packages/opal-client/requires.txt | 2 +- packages/opal-common/opal_common/config.py | 4 +- .../opal_common/monitoring/otel_metrics.py | 2 + .../opal_common/monitoring/tracer.py | 4 +- .../opal_common/monitoring/tracing_utils.py | 15 ++-- .../opal_server/data/data_update_publisher.py | 5 +- .../opal_server/policy/bundles/api.py | 17 +++-- .../opal_server/policy/watcher/task.py | 1 - packages/opal-server/opal_server/pubsub.py | 33 +++++--- .../opal-server/opal_server/scopes/api.py | 9 ++- .../opal-server/opal_server/security/api.py | 76 ++++++++++--------- packages/opal-server/opal_server/server.py | 33 ++++---- packages/opal-server/requires.txt | 2 +- requirements.txt | 2 +- 19 files changed, 170 insertions(+), 147 deletions(-) diff --git a/documentation/docs/tutorials/monitoring_opal.mdx b/documentation/docs/tutorials/monitoring_opal.mdx index 2ea14ee4f..0aaa989fe 100644 --- a/documentation/docs/tutorials/monitoring_opal.mdx +++ 
b/documentation/docs/tutorials/monitoring_opal.mdx @@ -86,7 +86,7 @@ Below is a list of the available metrics and traces in OPAL, along with their ty ##### 1) `opal_server_data_update` - **Type**: Trace - **Description**: Represents a data update operation in the OPAL server. This trace spans the process of publishing data updates to clients. -- **Attributes**: +- **Attributes**: - `topics_count`: Number of topics involved in the data update. - `entries_count`: Number of data update entries. - Additional attributes related to errors or execution time. @@ -167,4 +167,4 @@ Run the following command to start Prometheus and Grafana: docker compose -f docker/docker-compose-with-prometheus-and-otel.yml up ``` -This setup will start Prometheus to scrape metrics from OPAL server and client, and Grafana to visualize the metrics. \ No newline at end of file +This setup will start Prometheus to scrape metrics from OPAL server and client, and Grafana to visualize the metrics. diff --git a/packages/opal-client/opal_client/client.py b/packages/opal-client/opal_client/client.py index ec1ad2846..03c7f9e25 100644 --- a/packages/opal-client/opal_client/client.py +++ b/packages/opal-client/opal_client/client.py @@ -6,16 +6,14 @@ import uuid from logging import disable from typing import Awaitable, Callable, List, Literal, Optional, Union -from prometheus_client import CONTENT_TYPE_LATEST, generate_latest -from fastapi.responses import Response import aiofiles import aiofiles.os import aiohttp import websockets -from fastapi import FastAPI, status, HTTPException, Security, status +from fastapi import FastAPI, HTTPException, Security, status +from fastapi.responses import JSONResponse, Response from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer -from fastapi.responses import JSONResponse from fastapi_websocket_pubsub.pub_sub_client import PubSubOnConnectCallback from fastapi_websocket_rpc.rpc_channel import OnDisconnectCallback from opal_client.callbacks.api import 
init_callbacks_api @@ -34,33 +32,33 @@ from opal_client.policy_store.policy_store_client_factory import ( PolicyStoreClientFactory, ) -from opal_common.authentication.verifier import Unauthorized from opal_common.authentication.deps import JWTAuthenticator -from opal_common.authentication.verifier import JWTVerifier +from opal_common.authentication.verifier import JWTVerifier, Unauthorized from opal_common.config import opal_common_config from opal_common.logger import configure_logs, logger from opal_common.middleware import configure_middleware -from opal_common.security.sslcontext import get_custom_ssl_context -from opal_common.monitoring.tracer import init_tracer from opal_common.monitoring.otel_metrics import init_meter +from opal_common.monitoring.tracer import init_tracer +from opal_common.security.sslcontext import get_custom_ssl_context +from opentelemetry import metrics, trace +from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter +from opentelemetry.exporter.prometheus import PrometheusMetricReader +from opentelemetry.sdk.metrics import MeterProvider from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.trace import TracerProvider -from opentelemetry import trace -from opentelemetry import metrics -from opentelemetry.sdk.metrics import MeterProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor -from opentelemetry.exporter.prometheus import PrometheusMetricReader -from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter - +from prometheus_client import CONTENT_TYPE_LATEST, generate_latest security_scheme = HTTPBearer() + def _add_metrics_route(app: FastAPI, authenticator: JWTAuthenticator): """Add a protected metrics endpoint to the FastAPI app.""" if opal_common_config.ENABLE_OPENTELEMETRY_METRICS: + @app.get("/metrics") async def protected_metrics_endpoint( - auth: HTTPAuthorizationCredentials = Security(security_scheme) + auth: HTTPAuthorizationCredentials = 
Security(security_scheme), ): """Protected metrics endpoint.""" try: @@ -70,11 +68,12 @@ async def protected_metrics_endpoint( except Unauthorized: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, - detail="Invalid authentication credentials" + detail="Invalid authentication credentials", ) logger.info("Mounted protected /metrics endpoint for Prometheus metrics.") + class OpalClient: def __init__( self, @@ -306,8 +305,7 @@ def _initialize_opentelemetry_tracing(self): trace.set_tracer_provider(tracer_provider) otlp_exporter = OTLPSpanExporter( - endpoint = opal_common_config.OPENTELEMETRY_OTLP_ENDPOINT, - insecure=True + endpoint=opal_common_config.OPENTELEMETRY_OTLP_ENDPOINT, insecure=True ) span_processor = BatchSpanProcessor(otlp_exporter) @@ -321,8 +319,7 @@ def _initialize_opentelemetry_metrics(self): self.prometheus_metric_reader = PrometheusMetricReader() meter_provider = MeterProvider( - resource=resource, - metric_readers=[self.prometheus_metric_reader] + resource=resource, metric_readers=[self.prometheus_metric_reader] ) metrics.set_meter_provider(meter_provider) init_meter(meter_provider) diff --git a/packages/opal-client/opal_client/data/api.py b/packages/opal-client/opal_client/data/api.py index 09705ac61..335de62f2 100644 --- a/packages/opal-client/opal_client/data/api.py +++ b/packages/opal-client/opal_client/data/api.py @@ -3,8 +3,9 @@ from fastapi import APIRouter, HTTPException, status from opal_client.data.updater import DataUpdater from opal_common.logger import logger -from opentelemetry import trace from opal_common.monitoring.tracing_utils import start_span +from opentelemetry import trace + def init_data_router(data_updater: Optional[DataUpdater]): router = APIRouter() @@ -18,7 +19,9 @@ async def trigger_policy_data_update(): async def _handle_policy_data_update(span=None): try: if data_updater: - async with start_span("opal_client_data_update_apply") if span else (await None): + async with 
start_span("opal_client_data_update_apply") if span else ( + await None + ): await data_updater.get_base_policy_data( data_fetch_reason="request from sdk" ) @@ -40,6 +43,7 @@ async def _handle_policy_data_update(span=None): span.record_exception(e) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail="Failed to update data" + detail="Failed to update data", ) + return router diff --git a/packages/opal-client/opal_client/policy/api.py b/packages/opal-client/opal_client/policy/api.py index 197c049ce..84ae8ac4b 100644 --- a/packages/opal-client/opal_client/policy/api.py +++ b/packages/opal-client/opal_client/policy/api.py @@ -1,8 +1,8 @@ from fastapi import APIRouter, status from opal_client.policy.updater import PolicyUpdater from opal_common.logger import logger -from opentelemetry import trace from opal_common.monitoring.tracing_utils import start_span +from opentelemetry import trace def init_policy_router(policy_updater: PolicyUpdater): diff --git a/packages/opal-client/opal_client/policy_store/api.py b/packages/opal-client/opal_client/policy_store/api.py index cd1cfd0cd..e37f4db38 100644 --- a/packages/opal-client/opal_client/policy_store/api.py +++ b/packages/opal-client/opal_client/policy_store/api.py @@ -5,15 +5,15 @@ from opal_common.authentication.deps import JWTAuthenticator from opal_common.authentication.types import JWTClaims from opal_common.authentication.verifier import Unauthorized -from opal_common.logger import logger -from opal_common.schemas.security import PeerType from opal_common.config import opal_common_config -from opal_common.monitoring.tracer import get_tracer +from opal_common.logger import logger from opal_common.monitoring.otel_metrics import get_meter +from opal_common.schemas.security import PeerType from opentelemetry import metrics _policy_store_status_metric = None + def get_policy_store_status_metric(): global _policy_store_status_metric if _policy_store_status_metric is None: @@ -24,16 +24,17 @@ def 
get_policy_store_status_metric(): name="opal_client_policy_store_status", description="Current status of the policy store authentication type", unit="1", - callbacks=[_update_policy_store_status] + callbacks=[_update_policy_store_status], ) return _policy_store_status_metric + def _update_policy_store_status(observer: metrics.ObservableGauge): auth_type = opal_client_config.POLICY_STORE_AUTH_TYPE or PolicyStoreAuth.NONE status_code = { PolicyStoreAuth.NONE: 0, PolicyStoreAuth.TOKEN: 1, - PolicyStoreAuth.OAUTH: 2 + PolicyStoreAuth.OAUTH: 2, }.get(auth_type, -1) observer.observe(status_code, attributes={"auth_type": auth_type}) @@ -50,32 +51,32 @@ def init_policy_store_router(authenticator: JWTAuthenticator): deprecated=True, ) async def get_policy_store_details(claims: JWTClaims = Depends(authenticator)): - try: - require_peer_type( - authenticator, claims, PeerType.listener - ) # may throw Unauthorized - except Unauthorized as e: - logger.error(f"Unauthorized to publish update: {repr(e)}") - raise + try: + require_peer_type( + authenticator, claims, PeerType.listener + ) # may throw Unauthorized + except Unauthorized as e: + logger.error(f"Unauthorized to publish update: {repr(e)}") + raise - token = None - oauth_client_secret = None - if not opal_client_config.EXCLUDE_POLICY_STORE_SECRETS: - token = opal_client_config.POLICY_STORE_AUTH_TOKEN - oauth_client_secret = ( - opal_client_config.POLICY_STORE_AUTH_OAUTH_CLIENT_SECRET - ) + token = None + oauth_client_secret = None + if not opal_client_config.EXCLUDE_POLICY_STORE_SECRETS: + token = opal_client_config.POLICY_STORE_AUTH_TOKEN + oauth_client_secret = ( + opal_client_config.POLICY_STORE_AUTH_OAUTH_CLIENT_SECRET + ) - auth_type = opal_client_config.POLICY_STORE_AUTH_TYPE or PolicyStoreAuth.NONE + auth_type = opal_client_config.POLICY_STORE_AUTH_TYPE or PolicyStoreAuth.NONE - return PolicyStoreDetails( - url=opal_client_config.POLICY_STORE_URL, - token=token or None, - auth_type=auth_type, - 
oauth_client_id=opal_client_config.POLICY_STORE_AUTH_OAUTH_CLIENT_ID - or None, - oauth_client_secret=oauth_client_secret or None, - oauth_server=opal_client_config.POLICY_STORE_AUTH_OAUTH_SERVER or None, - ) + return PolicyStoreDetails( + url=opal_client_config.POLICY_STORE_URL, + token=token or None, + auth_type=auth_type, + oauth_client_id=opal_client_config.POLICY_STORE_AUTH_OAUTH_CLIENT_ID + or None, + oauth_client_secret=oauth_client_secret or None, + oauth_server=opal_client_config.POLICY_STORE_AUTH_OAUTH_SERVER or None, + ) return router diff --git a/packages/opal-client/requires.txt b/packages/opal-client/requires.txt index 84e70fbd3..4acb85cb6 100644 --- a/packages/opal-client/requires.txt +++ b/packages/opal-client/requires.txt @@ -10,4 +10,4 @@ opentelemetry-sdk>=1.28.2 opentelemetry-instrumentation opentelemetry-instrumentation-fastapi opentelemetry-exporter-otlp -opentelemetry-exporter-prometheus \ No newline at end of file +opentelemetry-exporter-prometheus diff --git a/packages/opal-common/opal_common/config.py b/packages/opal-common/opal_common/config.py index 9276b7b57..d83dc8394 100644 --- a/packages/opal-common/opal_common/config.py +++ b/packages/opal-common/opal_common/config.py @@ -196,12 +196,12 @@ class OpalCommonConfig(Confi): "ENABLE_METRICS", False, description="Enable metrics collection" ) - ENABLE_OPENTELEMETRY_TRACING = confi.bool ( + ENABLE_OPENTELEMETRY_TRACING = confi.bool( "ENABLE_OPENTELEMETRY_TRACING", False, description="Set if OPAL should enable tracing with OpenTelemetry", ) - ENABLE_OPENTELEMETRY_METRICS = confi.bool ( + ENABLE_OPENTELEMETRY_METRICS = confi.bool( "ENABLE_OPENTELEMETRY_METRICS", False, description="Set if OPAL should enable metrics with OpenTelemetry", diff --git a/packages/opal-common/opal_common/monitoring/otel_metrics.py b/packages/opal-common/opal_common/monitoring/otel_metrics.py index e79748dbd..44452dbc5 100644 --- a/packages/opal-common/opal_common/monitoring/otel_metrics.py +++ 
b/packages/opal-common/opal_common/monitoring/otel_metrics.py @@ -3,6 +3,7 @@ _meter = None + def init_meter(meter_provider=None): global _meter if meter_provider is not None: @@ -10,6 +11,7 @@ def init_meter(meter_provider=None): else: _meter = metrics.get_meter(__name__) + def get_meter(): if _meter is None: return NoOpMeter() diff --git a/packages/opal-common/opal_common/monitoring/tracer.py b/packages/opal-common/opal_common/monitoring/tracer.py index 53da85bb0..35069204f 100644 --- a/packages/opal-common/opal_common/monitoring/tracer.py +++ b/packages/opal-common/opal_common/monitoring/tracer.py @@ -3,6 +3,7 @@ _tracer = None + def init_tracer(tracer_provider=None): global _tracer if tracer_provider is not None: @@ -10,7 +11,8 @@ def init_tracer(tracer_provider=None): else: _tracer = trace.get_tracer(__name__) + def get_tracer(): if _tracer is None: return NoOpTracer() - return _tracer \ No newline at end of file + return _tracer diff --git a/packages/opal-common/opal_common/monitoring/tracing_utils.py b/packages/opal-common/opal_common/monitoring/tracing_utils.py index 7b7b7b014..565589eea 100644 --- a/packages/opal-common/opal_common/monitoring/tracing_utils.py +++ b/packages/opal-common/opal_common/monitoring/tracing_utils.py @@ -1,14 +1,17 @@ from contextlib import asynccontextmanager from typing import AsyncGenerator, Optional -from opentelemetry.trace import Span, set_span_in_context + from opal_common.config import opal_common_config from opal_common.monitoring.tracer import get_tracer -from opentelemetry.trace import Span +from opentelemetry.trace import Span, set_span_in_context + @asynccontextmanager -async def start_span(name: str, parent: Optional[Span] = None) -> AsyncGenerator[Optional[Span], None]: - """ - Reusable async context manager for starting a span. +async def start_span( + name: str, parent: Optional[Span] = None +) -> AsyncGenerator[Optional[Span], None]: + """Reusable async context manager for starting a span. 
+ Yields the span if tracing is enabled, else None. """ if not opal_common_config.ENABLE_OPENTELEMETRY_TRACING: @@ -18,4 +21,4 @@ async def start_span(name: str, parent: Optional[Span] = None) -> AsyncGenerator tracer = get_tracer() parent_context = set_span_in_context(parent) if parent else None with tracer.start_as_current_span(name, context=parent_context) as span: - yield span \ No newline at end of file + yield span diff --git a/packages/opal-server/opal_server/data/data_update_publisher.py b/packages/opal-server/opal_server/data/data_update_publisher.py index 78c7d1ad6..f272a5d51 100644 --- a/packages/opal-server/opal_server/data/data_update_publisher.py +++ b/packages/opal-server/opal_server/data/data_update_publisher.py @@ -4,14 +4,13 @@ from fastapi_utils.tasks import repeat_every from opal_common.logger import logger +from opal_common.monitoring.tracing_utils import start_span from opal_common.schemas.data import ( DataSourceEntryWithPollingInterval, DataUpdate, ServerDataSourceConfig, ) from opal_common.topics.publisher import TopicPublisher -from opal_common.monitoring.tracing_utils import start_span - TOPIC_DELIMITER = "/" PREFIX_DELIMITER = ":" @@ -122,4 +121,4 @@ async def _publish_data_updates(self, update: DataUpdate, span=None): await self._publisher.publish( list(all_topic_combos), update.dict(by_alias=True) - ) \ No newline at end of file + ) diff --git a/packages/opal-server/opal_server/policy/bundles/api.py b/packages/opal-server/opal_server/policy/bundles/api.py index 8e0afe011..895e08da7 100644 --- a/packages/opal-server/opal_server/policy/bundles/api.py +++ b/packages/opal-server/opal_server/policy/bundles/api.py @@ -6,22 +6,23 @@ from fastapi import APIRouter, Depends, Header, HTTPException, Query, Response, status from git.repo import Repo from opal_common.confi.confi import load_conf_if_none +from opal_common.config import opal_common_config from opal_common.git_utils.bundle_maker import BundleMaker from 
opal_common.git_utils.commit_viewer import CommitViewer from opal_common.git_utils.repo_cloner import RepoClonePathFinder from opal_common.logger import logger -from opal_common.config import opal_common_config -from opal_common.schemas.policy import PolicyBundle -from opal_common.monitoring.tracing_utils import start_span from opal_common.monitoring.otel_metrics import get_meter +from opal_common.monitoring.tracing_utils import start_span +from opal_common.schemas.policy import PolicyBundle from opal_server.config import opal_server_config -from starlette.responses import RedirectResponse from opentelemetry import trace +from starlette.responses import RedirectResponse router = APIRouter() _bundle_size_histogram = None + def get_bundle_size_histogram(): global _bundle_size_histogram if _bundle_size_histogram is None: @@ -35,6 +36,7 @@ def get_bundle_size_histogram(): ) return _bundle_size_histogram + async def get_repo( base_clone_path: str = None, clone_subdirectory_prefix: str = None, @@ -125,8 +127,11 @@ async def get_policy( return bundle + async def process_policy_bundle(repo, input_paths, base_hash, span=None): - async with start_span("opal_server_policy_bundle_processing", parent=span) as processing_span: + async with start_span( + "opal_server_policy_bundle_processing", parent=span + ) as processing_span: maker = BundleMaker( repo, in_directories=set(input_paths), @@ -180,4 +185,4 @@ async def process_policy_bundle(repo, input_paths, base_hash, span=None): raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Commit with hash {base_hash} was not found in the policy repo!", - ) \ No newline at end of file + ) diff --git a/packages/opal-server/opal_server/policy/watcher/task.py b/packages/opal-server/opal_server/policy/watcher/task.py index a8e512ebf..264b81950 100644 --- a/packages/opal-server/opal_server/policy/watcher/task.py +++ b/packages/opal-server/opal_server/policy/watcher/task.py @@ -7,7 +7,6 @@ from 
fastapi_websocket_pubsub.pub_sub_server import PubSubEndpoint from opal_common.logger import logger from opal_common.monitoring.tracing_utils import start_span -from opal_common.config import opal_common_config from opal_common.sources.base_policy_source import BasePolicySource from opal_server.config import opal_server_config diff --git a/packages/opal-server/opal_server/pubsub.py b/packages/opal-server/opal_server/pubsub.py index 4f8826aed..5a51f2814 100644 --- a/packages/opal-server/opal_server/pubsub.py +++ b/packages/opal-server/opal_server/pubsub.py @@ -39,6 +39,7 @@ _active_clients_counter = None _client_data_subscriptions_counter = None + def get_active_clients_counter(): global _active_clients_counter if _active_clients_counter is None: @@ -51,6 +52,7 @@ def get_active_clients_counter(): ) return _active_clients_counter + def get_client_data_subscriptions_counter(): global _client_data_subscriptions_counter if _client_data_subscriptions_counter is None: @@ -63,6 +65,7 @@ def get_client_data_subscriptions_counter(): ) return _client_data_subscriptions_counter + class ClientInfo(BaseModel): client_id: str source_host: Optional[str] @@ -108,8 +111,14 @@ def new_client( ) active_clients_counter = get_active_clients_counter() if active_clients_counter is not None: - source = f"{source_host}:{source_port}" if source_host and source_port else "unknown" - active_clients_counter.add(1, attributes={"client_id": client_id, "source": source}) + source = ( + f"{source_host}:{source_port}" + if source_host and source_port + else "unknown" + ) + active_clients_counter.add( + 1, attributes={"client_id": client_id, "source": source} + ) client_info.refcount += 1 self._clients_by_ids[client_id] = client_info yield client_info @@ -121,8 +130,14 @@ def new_client( else: active_clients_counter = get_active_clients_counter() if active_clients_counter is not None: - source = f"{client_info.source_host}:{client_info.source_port}" if client_info.source_host and 
client_info.source_port else "unknown" - active_clients_counter.add(-1, attributes={"client_id": client_id, "source": source}) + source = ( + f"{client_info.source_host}:{client_info.source_port}" + if client_info.source_host and client_info.source_port + else "unknown" + ) + active_clients_counter.add( + -1, attributes={"client_id": client_id, "source": source} + ) async def on_subscribe( self, @@ -142,10 +157,7 @@ async def on_subscribe( for topic in topics: client_data_subscriptions_counter.add( 1, - attributes={ - "client_id": client_info.client_id, - "topic": topic - } + attributes={"client_id": client_info.client_id, "topic": topic}, ) async def on_unsubscribe( @@ -166,10 +178,7 @@ async def on_unsubscribe( for topic in topics: client_data_subscriptions_counter.add( -1, - attributes={ - "client_id": client_info.client_id, - "topic": topic - } + attributes={"client_id": client_info.client_id, "topic": topic}, ) diff --git a/packages/opal-server/opal_server/scopes/api.py b/packages/opal-server/opal_server/scopes/api.py index e4e738271..7b8ee916f 100644 --- a/packages/opal-server/opal_server/scopes/api.py +++ b/packages/opal-server/opal_server/scopes/api.py @@ -24,9 +24,11 @@ from opal_common.authentication.deps import JWTAuthenticator, get_token_from_header from opal_common.authentication.types import EncryptionKeyFormat, JWTClaims from opal_common.authentication.verifier import Unauthorized -from opal_common.logger import logger from opal_common.config import opal_common_config +from opal_common.logger import logger from opal_common.monitoring import metrics +from opal_common.monitoring.otel_metrics import get_meter +from opal_common.monitoring.tracing_utils import start_span from opal_common.schemas.data import ( DataSourceConfig, DataUpdate, @@ -45,12 +47,11 @@ from opal_server.data.data_update_publisher import DataUpdatePublisher from opal_server.git_fetcher import GitPolicyFetcher from opal_server.scopes.scope_repository import ScopeNotFoundError, 
ScopeRepository -from opal_common.monitoring.tracing_utils import start_span -from opal_common.monitoring.otel_metrics import get_meter from opentelemetry import trace _policy_bundle_size_histogram = None + def get_policy_bundle_size_histogram(): global _policy_bundle_size_histogram if _policy_bundle_size_histogram is None: @@ -64,6 +65,7 @@ def get_policy_bundle_size_histogram(): ) return _policy_bundle_size_histogram + def verify_private_key(private_key: str, key_format: EncryptionKeyFormat) -> bool: try: key = cast_private_key(private_key, key_format=key_format) @@ -401,7 +403,6 @@ async def _handle_publish_data_update_event( span.set_attribute("entries_count", len(update.entries)) span.set_attribute("topics", list(all_topics)) - await DataUpdatePublisher( ScopedServerSideTopicPublisher(pubsub_endpoint, scope_id) ).publish_data_updates(update) diff --git a/packages/opal-server/opal_server/security/api.py b/packages/opal-server/opal_server/security/api.py index 077bad7fb..bd96315de 100644 --- a/packages/opal-server/opal_server/security/api.py +++ b/packages/opal-server/opal_server/security/api.py @@ -3,14 +3,15 @@ from fastapi import APIRouter, Depends, HTTPException, status from opal_common.authentication.deps import StaticBearerAuthenticator from opal_common.authentication.signer import JWTSigner -from opal_common.logger import logger from opal_common.config import opal_common_config -from opal_common.schemas.security import AccessToken, AccessTokenRequest, TokenDetails +from opal_common.logger import logger from opal_common.monitoring.otel_metrics import get_meter +from opal_common.schemas.security import AccessToken, AccessTokenRequest, TokenDetails _token_requested_counter = None _token_generated_counter = None + def get_token_requested_counter(): global _token_requested_counter if _token_requested_counter is None: @@ -18,11 +19,11 @@ def get_token_requested_counter(): return None meter = get_meter() _token_requested_counter = meter.create_counter( - 
name="opal_server_token_requested", - description="Number of token requests" + name="opal_server_token_requested", description="Number of token requests" ) return _token_requested_counter + def get_token_generated_counter(): global _token_generated_counter if _token_generated_counter is None: @@ -30,11 +31,11 @@ def get_token_generated_counter(): return None meter = get_meter() _token_generated_counter = meter.create_up_down_counter( - name="opal_client_token_generated", - description="Number of tokens generated" + name="opal_client_token_generated", description="Number of tokens generated" ) return _token_generated_counter + def init_security_router(signer: JWTSigner, authenticator: StaticBearerAuthenticator): router = APIRouter() @@ -47,17 +48,15 @@ def init_security_router(signer: JWTSigner, authenticator: StaticBearerAuthentic async def generate_new_access_token(req: AccessTokenRequest): token_requested_counter = get_token_requested_counter() if token_requested_counter is not None: - token_requested_counter.add(1, attributes={ - 'token_type': req.type.value, - 'status': 'received' - }) + token_requested_counter.add( + 1, attributes={"token_type": req.type.value, "status": "received"} + ) if not signer.enabled: if token_requested_counter is not None: - token_requested_counter.add(1, attributes={ - 'token_type': req.type.value, - 'status': 'error' - }) + token_requested_counter.add( + 1, attributes={"token_type": req.type.value, "status": "error"} + ) raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, @@ -65,20 +64,24 @@ async def generate_new_access_token(req: AccessTokenRequest): ) try: claims = {"peer_type": req.type.value, **req.claims} - token = signer.sign(sub=req.id, token_lifetime=req.ttl, custom_claims=claims) + token = signer.sign( + sub=req.id, token_lifetime=req.ttl, custom_claims=claims + ) logger.info(f"Generated opal token: peer_type={req.type.value}") token_generated_counter = get_token_generated_counter() if 
token_generated_counter is not None: - token_generated_counter.add(1, attributes={ - 'peer_type': req.type.value, - 'ttl': req.ttl.total_seconds() if req.ttl else None - }) + token_generated_counter.add( + 1, + attributes={ + "peer_type": req.type.value, + "ttl": req.ttl.total_seconds() if req.ttl else None, + }, + ) if token_requested_counter is not None: - token_requested_counter.add(1, attributes={ - 'token_type': req.type.value, - 'status': 'success' - }) + token_requested_counter.add( + 1, attributes={"token_type": req.type.value, "status": "success"} + ) return AccessToken( token=token, @@ -90,20 +93,19 @@ async def generate_new_access_token(req: AccessTokenRequest): ), ) except Exception as ex: - logger.error(f"Failed to generate token: {str(ex)}") - error_type = ( - "TokenGenerationFailed" - if "token" in str(ex).lower() - else "UnexpectedError" - ) - if token_requested_counter is not None: - token_requested_counter.add(1, attributes={ - 'token_type': req.type.value, - 'status': 'error' - }) - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail="Failed to generate token due to server error.", + logger.error(f"Failed to generate token: {str(ex)}") + error_type = ( + "TokenGenerationFailed" + if "token" in str(ex).lower() + else "UnexpectedError" + ) + if token_requested_counter is not None: + token_requested_counter.add( + 1, attributes={"token_type": req.type.value, "status": "error"} ) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Failed to generate token due to server error.", + ) return router diff --git a/packages/opal-server/opal_server/server.py b/packages/opal-server/opal_server/server.py index ae38faeeb..7c4ae2b1a 100644 --- a/packages/opal-server/opal_server/server.py +++ b/packages/opal-server/opal_server/server.py @@ -6,11 +6,11 @@ from functools import partial from typing import List, Optional +import opal_common.monitoring.metrics as opal_dd_metrics from fastapi import 
Depends, FastAPI -from fastapi_websocket_pubsub.event_broadcaster import EventBroadcasterContextManager -from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer -from prometheus_client import generate_latest, CONTENT_TYPE_LATEST from fastapi.responses import Response +from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer +from fastapi_websocket_pubsub.event_broadcaster import EventBroadcasterContextManager from opal_common.authentication.deps import JWTAuthenticator, StaticBearerAuthenticator from opal_common.authentication.signer import JWTSigner from opal_common.confi.confi import load_conf_if_none @@ -18,7 +18,8 @@ from opal_common.logger import configure_logs, logger from opal_common.middleware import configure_middleware from opal_common.monitoring import apm -import opal_common.monitoring.metrics as opal_dd_metrics +from opal_common.monitoring.otel_metrics import init_meter +from opal_common.monitoring.tracer import init_tracer from opal_common.schemas.data import ServerDataSourceConfig from opal_common.synchronization.named_lock import NamedLock from opal_common.topics.publisher import ( @@ -26,8 +27,6 @@ ServerSideTopicPublisher, TopicPublisher, ) -from opal_common.monitoring.tracer import init_tracer -from opal_common.monitoring.otel_metrics import init_meter from opal_server.config import opal_server_config from opal_server.data.api import init_data_updates_router from opal_server.data.data_update_publisher import DataUpdatePublisher @@ -45,15 +44,15 @@ from opal_server.security.api import init_security_router from opal_server.security.jwks import JwksStaticEndpoint from opal_server.statistics import OpalStatistics, init_statistics_router -from opentelemetry import trace -from opentelemetry import metrics -from opentelemetry.sdk.trace import TracerProvider -from opentelemetry.sdk.resources import Resource -from opentelemetry.sdk.metrics import MeterProvider -from opentelemetry.sdk.trace.export import BatchSpanProcessor -from 
opentelemetry.exporter.prometheus import PrometheusMetricReader +from opentelemetry import metrics, trace from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter +from opentelemetry.exporter.prometheus import PrometheusMetricReader from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor +from prometheus_client import CONTENT_TYPE_LATEST, generate_latest class OpalServer: @@ -222,10 +221,12 @@ def _init_fast_api_app(self): FastAPIInstrumentor.instrument_app(app) if opal_common_config.ENABLE_OPENTELEMETRY_METRICS: + @app.get("/metrics") async def metrics(): data = generate_latest() return Response(content=data, media_type=CONTENT_TYPE_LATEST) + logger.info("Mounted /metrics endpoint for Prometheus metrics.") return app @@ -259,8 +260,7 @@ def _initialize_opentelemetry_tracing(self): trace.set_tracer_provider(tracer_provider) otlp_exporter = OTLPSpanExporter( - endpoint = opal_common_config.OPENTELEMETRY_OTLP_ENDPOINT, - insecure=True + endpoint=opal_common_config.OPENTELEMETRY_OTLP_ENDPOINT, insecure=True ) span_processor = BatchSpanProcessor(otlp_exporter) @@ -274,8 +274,7 @@ def _initialize_opentelemetry_metrics(self): self.prometheus_metric_reader = PrometheusMetricReader() meter_provider = MeterProvider( - resource=resource, - metric_readers=[self.prometheus_metric_reader] + resource=resource, metric_readers=[self.prometheus_metric_reader] ) metrics.set_meter_provider(meter_provider) init_meter(meter_provider) diff --git a/packages/opal-server/requires.txt b/packages/opal-server/requires.txt index e27d5f133..57305e819 100644 --- a/packages/opal-server/requires.txt +++ b/packages/opal-server/requires.txt @@ -13,4 +13,4 @@ opentelemetry-sdk>=1.28.2 opentelemetry-instrumentation 
opentelemetry-instrumentation-fastapi opentelemetry-exporter-otlp -opentelemetry-exporter-prometheus \ No newline at end of file +opentelemetry-exporter-prometheus diff --git a/requirements.txt b/requirements.txt index cc34de0b9..86e2f7efe 100644 --- a/requirements.txt +++ b/requirements.txt @@ -15,4 +15,4 @@ opentelemetry-sdk>=1.28.2 opentelemetry-instrumentation opentelemetry-instrumentation-fastapi opentelemetry-exporter-otlp -opentelemetry-exporter-prometheus \ No newline at end of file +opentelemetry-exporter-prometheus