Move polling periodic data entries to client (support scopes) by roekatz · Pull Request #573 · permitio/opal · GitHub

Move polling periodic data entries to client (support scopes) #573

Merged · 1 commit merged on Apr 18, 2024
43 changes: 39 additions & 4 deletions packages/opal-client/opal_client/data/updater.py
@@ -3,6 +3,7 @@
import itertools
import json
import uuid
from functools import partial
from typing import Any, Dict, List, Optional, Tuple

import aiohttp
@@ -22,7 +23,7 @@
from opal_client.policy_store.policy_store_client_factory import (
DEFAULT_POLICY_STORE_GETTER,
)
from opal_common.async_utils import TakeANumberQueue, TasksPool
from opal_common.async_utils import TakeANumberQueue, TasksPool, repeated_call
from opal_common.config import opal_common_config
from opal_common.fetcher.events import FetcherConfig
from opal_common.http import is_http_error_response
@@ -130,6 +131,7 @@ def __init__(
)
self._updates_storing_queue = TakeANumberQueue(logger)
self._tasks = TasksPool()
self._polling_update_tasks = []

async def __aenter__(self):
await self.start()
@@ -202,12 +204,35 @@ async def get_base_policy_data(
logger.info(
"Performing data configuration, reason: {reason}", reason=data_fetch_reason
)
await self._stop_polling_update_tasks() # If this is a reconnect - should stop previously received periodic updates
sources_config = await self.get_policy_data_config(url=config_url)
# translate config to a data update
entries = sources_config.entries
update = DataUpdate(reason=data_fetch_reason, entries=entries)

init_entries, periodic_entries = [], []
for entry in sources_config.entries:
(
periodic_entries
if (entry.periodic_update_interval is not None)
else init_entries
).append(entry)

# Process one time entries now
update = DataUpdate(reason=data_fetch_reason, entries=init_entries)
await self.trigger_data_update(update)

# Schedule repeated processing of periodic polling entries
async def _trigger_update_with_entry(entry):
await self.trigger_data_update(
DataUpdate(reason="Periodic Update", entries=[entry])
)

for entry in periodic_entries:
repeat_process_entry = repeated_call(
partial(_trigger_update_with_entry, entry),
entry.periodic_update_interval,
logger=logger,
)
self._polling_update_tasks.append(asyncio.create_task(repeat_process_entry))

async def on_connect(self, client: PubSubClient, channel: RpcChannel):
"""Pub/Sub on_connect callback On connection to backend, whether its
the first connection, or reconnecting after downtime, refetch the state
@@ -262,6 +287,13 @@ async def _subscriber(self):
async with self._client:
await self._client.wait_until_done()

async def _stop_polling_update_tasks(self):
if len(self._polling_update_tasks) > 0:
for task in self._polling_update_tasks:
task.cancel()
await asyncio.gather(*self._polling_update_tasks, return_exceptions=True)
self._polling_update_tasks = []

async def stop(self):
self._stopping = True
logger.info("Stopping data updater")
@@ -275,6 +307,9 @@ async def stop(self):
"Timeout waiting for DataUpdater pubsub client to disconnect"
)

# stop periodic updates
await self._stop_polling_update_tasks()

# stop subscriber task
if self._subscriber_task is not None:
logger.debug("Cancelling DataUpdater subscriber task")
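
Taken together, the updater change splits the configured entries into one-time and periodic ones, triggers a single DataUpdate for the former, and schedules one client-side polling task per periodic entry, cancelling those tasks on reconnect (_stop_polling_update_tasks) or shutdown. Below is a minimal, self-contained sketch of that pattern; the Entry type, the trigger_data_update stand-in, and the URLs are made up for illustration, and repeated_call is re-implemented in simplified form rather than imported from opal_common:

import asyncio
from dataclasses import dataclass
from functools import partial
from typing import List, Optional


@dataclass
class Entry:
    # Simplified stand-in for an OPAL data source entry
    url: str
    periodic_update_interval: Optional[float] = None


async def trigger_data_update(entries: List[Entry]):
    # Stand-in for DataUpdater.trigger_data_update()
    print(f"fetching: {[e.url for e in entries]}")


async def repeated_call(func, seconds: float):
    # Simplified version of opal_common.async_utils.repeated_call
    while True:
        await func()
        await asyncio.sleep(seconds)


async def main():
    entries = [
        Entry("https://example.com/static-data"),
        Entry("https://example.com/live-data", periodic_update_interval=5.0),
    ]

    # Split entries the same way get_base_policy_data() does
    init_entries = [e for e in entries if e.periodic_update_interval is None]
    periodic_entries = [e for e in entries if e.periodic_update_interval is not None]

    # One-time entries are fetched immediately
    await trigger_data_update(init_entries)

    # Each periodic entry gets its own polling task
    polling_tasks = [
        asyncio.create_task(
            repeated_call(partial(trigger_data_update, [entry]), entry.periodic_update_interval)
        )
        for entry in periodic_entries
    ]

    await asyncio.sleep(12)  # let a few polling rounds run

    # On reconnect or shutdown the updater cancels and awaits these tasks
    for task in polling_tasks:
        task.cancel()
    await asyncio.gather(*polling_tasks, return_exceptions=True)


if __name__ == "__main__":
    asyncio.run(main())
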
22 changes: 21 additions & 1 deletion packages/opal-common/opal_common/async_utils.py
@@ -3,7 +3,7 @@
import asyncio
import sys
from functools import partial
from typing import Any, Callable, Coroutine, List, TypeVar
from typing import Any, Callable, Coroutine, List, Optional, Tuple, TypeVar

import loguru

@@ -100,3 +100,23 @@ def add_task(self, f):
t = asyncio.create_task(f)
self._tasks.append(t)
t.add_done_callback(self._cleanup_task)


async def repeated_call(
func: Coroutine,
seconds: float,
*args: Tuple[Any],
logger: Optional[loguru.Logger] = None,
):
while True:
try:
await func(*args)
await asyncio.sleep(seconds)
except asyncio.CancelledError:
raise
except Exception as exc:
logger.exception(
"Error during repeated call to {func}: {exc}",
func=func,
exc=exc,
)
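
As used by the client above, repeated_call is meant to be wrapped in an asyncio task and cancelled when polling should stop: cancellation is re-raised, while any other exception is logged and the loop keeps going. A short usage sketch, assuming opal-common with this change is installed; the poll coroutine is made up for illustration:

import asyncio
from functools import partial

from loguru import logger

from opal_common.async_utils import repeated_call  # helper added in this PR


async def poll(source: str):
    # Made-up callback standing in for DataUpdater.trigger_data_update()
    logger.info("polling {source}", source=source)


async def main():
    # Call poll("example") every 2 seconds until the task is cancelled
    task = asyncio.create_task(
        repeated_call(partial(poll, "example"), 2.0, logger=logger)
    )
    await asyncio.sleep(7)  # let a few iterations run

    task.cancel()
    # repeated_call re-raises the CancelledError; gather absorbs it here
    await asyncio.gather(task, return_exceptions=True)


asyncio.run(main())
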
54 changes: 0 additions & 54 deletions packages/opal-server/opal_server/data/data_update_publisher.py
@@ -111,57 +111,3 @@ async def publish_data_updates(self, update: DataUpdate):
await self._publisher.publish(
list(all_topic_combos), update.dict(by_alias=True)
)

async def _periodic_update_callback(
self, update: DataSourceEntryWithPollingInterval
):
"""Called for every periodic update based on repeat_every."""
logger.info(
"[{pid}] Sending Periodic update: {source}", pid=os.getpid(), source=update
)
# Create new publish entry

return await self.publish_data_updates(
DataUpdate(reason="Periodic Update", entries=[update])
)

def create_polling_updates(self, sources: ServerDataSourceConfig):
# For every entry with a non zero period update interval, bind an interval to it
updaters = []
if hasattr(sources, "config") and hasattr(sources.config, "entries"):
for source in sources.config.entries:
if (
hasattr(source, "periodic_update_interval")
and isinstance(source.periodic_update_interval, float)
and source.periodic_update_interval is not None
):
logger.info(
"[{pid}] Establishing Period Updates for the following source: {source}",
pid=os.getpid(),
source=source,
)

async def bind_for_repeat(bind_source=source):
await self._periodic_update_callback(bind_source)

updaters.append(
repeat_every(
seconds=source.periodic_update_interval,
wait_first=True,
logger=logger,
)(bind_for_repeat)
)
return updaters

@staticmethod
async def mount_and_start_polling_updates(
publisher: TopicPublisher, sources: ServerDataSourceConfig
):
logger.info("[{pid}] Starting Polling Updates", pid=os.getpid())
data_publisher = DataUpdatePublisher(publisher)
await asyncio.gather(
*(
polling_update()
for polling_update in data_publisher.create_polling_updates(sources)
)
)
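
For readers unfamiliar with the server-side mechanism removed here: repeat_every(...) was used above as a decorator factory that turned bind_for_repeat into a zero-argument coroutine function looping forever on the given interval, which is why mount_and_start_polling_updates simply called each returned updater and gathered them. A simplified, hypothetical stand-in for such a decorator (not the real implementation the server imported):

import asyncio
from functools import wraps


def repeat_every(*, seconds: float, wait_first: bool = False, logger=None):
    # Simplified stand-in for the decorator used by the removed server-side code
    def decorator(coro_func):
        @wraps(coro_func)
        async def looped() -> None:
            if wait_first:
                await asyncio.sleep(seconds)
            while True:
                try:
                    await coro_func()
                except Exception:
                    if logger is not None:
                        logger.exception("Periodic update failed")
                await asyncio.sleep(seconds)

        return looped

    return decorator
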
9 changes: 1 addition & 8 deletions packages/opal-server/opal_server/server.py
@@ -350,14 +350,7 @@ async def start_server_background_tasks(self):
pid=os.getpid(),
)

if not opal_server_config.SCOPES:
# bind data updater publishers to the leader worker
asyncio.create_task(
DataUpdatePublisher.mount_and_start_polling_updates(
self.publisher, opal_server_config.DATA_CONFIG_SOURCES
)
)
else:
if opal_server_config.SCOPES:
await load_scopes(self._scopes)

if self.broadcast_keepalive is not None:
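
With this change, entries that should be re-fetched on an interval are still declared in the server's data sources config, but every client now schedules its own polling task for them instead of the leader worker publishing periodic updates, which is what lets the mechanism coexist with scopes. A hypothetical sketch of such a configuration follows; the config.entries nesting matches the ServerDataSourceConfig shape referenced above, while the entry field names other than periodic_update_interval are assumptions and may differ from the real schema:

import json

# Hypothetical OPAL_DATA_CONFIG_SOURCES value (entry field names are illustrative).
data_config_sources = {
    "config": {
        "entries": [
            {
                # Fetched once per (re)connect: no interval set
                "url": "https://api.example.com/static-acl",
                "topics": ["policy_data"],
            },
            {
                # Polled by every client every 30 seconds
                "url": "https://api.example.com/live-acl",
                "topics": ["policy_data"],
                "periodic_update_interval": 30,
            },
        ]
    }
}

print("OPAL_DATA_CONFIG_SOURCES=" + json.dumps(data_config_sources))
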