8000 CI: Can preset leader via env var BLAZINGMQ_LEADER_NAME for integration tests by bbpetukhov · Pull Request #693 · bloomberg/blazingmq · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

CI: Can preset leader via env var BLAZINGMQ_LEADER_NAME for integration tests #693

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 30 additions & 6 deletions src/python/blazingmq/dev/it/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@

CORE_PATTERN_PATH = "/proc/sys/kernel/core_pattern"

QUORUM_DEFAULT = 0
QUORUM_TO_ENSURE_LEADER = 1
QUORUM_TO_ENSURE_NOT_LEADER = 100


def _match_broker(broker, **kw):
datacenter = kw.get("datacenter", None)
Expand Down Expand Up @@ -134,7 +138,9 @@ def admin_endpoint(self) -> Union[Tuple[str, int], Tuple[None, None]]:
self.last_known_leader.config.port
)

def start(self, wait_leader=True, wait_ready=False):
def start(
self, wait_leader=True, wait_ready=False, leader_name: Union[str, None] = None
):
"""
Start all the nodes and proxies in the cluster.

Expand All @@ -144,14 +150,23 @@ def start(self, wait_leader=True, wait_ready=False):
See also: 'wait_status' for more information on the 'ready' flags.
"""

need_preset_leader = leader_name is not None

with internal_use(self):
for broker in self.config.configurator.brokers.values():
if len(broker.clusters.my_virtual_clusters) > 0:
self.start_virtual_node(broker)
elif len(broker.clusters.my_clusters) == 0:
self.start_proxy(broker)
else:
self.start_node(broker)
brkrproc = self.start_node(broker)

# Select leader based on the given leader_name
if need_preset_leader:
if broker.name == leader_name:
brkrproc.set_quorum(QUORUM_TO_ENSURE_LEADER)
else:
brkrproc.set_quorum(QUORUM_TO_ENSURE_NOT_LEADER)

if self.is_single_node:
self._proxies = self._nodes
Expand All @@ -160,6 +175,11 @@ def start(self, wait_leader=True, wait_ready=False):
self.wait_status(wait_leader, wait_ready)
self.drain()

# Reset quorum to default
if need_preset_leader:
for brkrproc in self._nodes:
brkrproc.set_quorum(QUORUM_DEFAULT)

return (self._nodes, self._proxies)

@property
Expand Down Expand Up @@ -296,7 +316,7 @@ def restart_nodes(self, wait_leader=True, wait_ready=False):
self._logger.info("restarting all nodes")
for node in self.nodes():
if node is not self.last_known_leader:
node.set_quorum(100)
node.set_quorum(QUORUM_TO_ENSURE_NOT_LEADER)

self.last_known_leader = None
with internal_use(self):
Expand Down Expand Up @@ -731,9 +751,13 @@ def _find_cores_dir(self) -> Optional[Path]:

return None

def _start_broker(self, broker: cfg.Broker, array, cluster_name):
def _start_broker(
self, broker: cfg.Broker, brokers: List[Broker], cluster_name: Union[str, None]
):
if broker.name in self._processes:
raise RuntimeError(f'node "{broker.name}" is already running')
raise RuntimeError(
f'node "{broker.name}" is already running in cluster "{cluster_name}"'
)

process = Broker(
broker,
Expand All @@ -746,7 +770,7 @@ def _start_broker(self, broker: cfg.Broker, array, cluster_name):
process.start()

self._processes[broker.name] = process
array.append(process)
brokers.append(process)

process.wait_until_started()

Expand Down
2 changes: 2 additions & 0 deletions src/python/blazingmq/dev/it/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,13 +397,15 @@ def apply_tweaks(stage: int):

if get_cluster_param(request, "_start_cluster", True):
with internal_use(cluster):
leader_name = os.environ.get("BLAZINGMQ_LEADER_NAME")
cluster.start(
wait_leader=get_cluster_param(
request, "_wait_leader", True
),
wait_ready=get_cluster_param(
request, "_wait_ready", False
),
leader_name=leader_name,
)

if request.instance is not None and hasattr(
Expand Down
Loading
0