8000 [BugFix] Fix InternalService_RecoverableStub race condition (backport #59933) by mergify[bot] · Pull Request #59950 · StarRocks/starrocks · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

[BugFix] Fix InternalService_RecoverableStub race condition (backport #59933) #59950

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 38 additions & 22 deletions be/src/util/internal_service_recoverable_stub.cpp
10000
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,15 @@ class RecoverableClosure : public ::google::protobuf::Closure {
public:
RecoverableClosure(std::shared_ptr<starrocks::PInternalService_RecoverableStub> stub,
::google::protobuf::RpcController* controller, ::google::protobuf::Closure* done)
: _stub(std::move(std::move(stub))), _controller(controller), _done(done) {}
: _stub(std::move(stub)),
_controller(controller),
_done(done),
_next_connection_group(_stub->connection_group() + 1) {}

void Run() override {
auto* cntl = static_cast<brpc::Controller*>(_controller);
if (cntl->Failed() && cntl->ErrorCode() == EHOSTDOWN) {
auto st = _stub->reset_channel();
auto st = _stub->reset_channel("", _next_connection_group);
if (!st.ok()) {
LOG(WARNING) << "Fail to reset channel: " << st.to_string();
}
Expand All @@ -42,31 +45,44 @@ class RecoverableClosure : public ::google::protobuf::Closure {
std::shared_ptr<starrocks::PInternalService_RecoverableStub> _stub;
::google::protobuf::RpcController* _controller;
::google::protobuf::Closure* _done;
int64_t _next_connection_group;
};

PInternalService_RecoverableStub::PInternalService_RecoverableStub(const butil::EndPoint& endpoint)
: _endpoint(endpoint) {}

PInternalService_RecoverableStub::~PInternalService_RecoverableStub() = default;

Status PInternalService_RecoverableStub::reset_channel(const std::string& protocol) {
std::lock_guard<std::mutex> l(_mutex);
Status PInternalService_RecoverableStub::reset_channel(const std::string& protocol, int64_t next_connection_group) {
if (next_connection_group == 0) {
next_connection_group = _connection_group.load() + 1;
}
if (next_connection_group != _connection_group + 1) {
// need to take int64_t overflow into consideration
return Status::OK();
}
brpc::ChannelOptions options;
options.connect_timeout_ms = config::rpc_connect_timeout_ms;
if (protocol == "http") {
options.protocol = protocol;
} else {
// http does not support these.
options.connection_type = config::brpc_connection_type;
options.connection_group = std::to_string(_connection_group++);
options.connection_group = std::to_string(next_connection_group);
}
options.max_retry = 3;
std::unique_ptr<brpc::Channel> channel(new brpc::Channel());
if (channel->Init(_endpoint, &options)) {
LOG(WARNING) << "Fail to init channel " << _endpoint;
return Status::InternalError("Fail to init channel");
}
_stub = std::make_shared<PInternalService_Stub>(channel.release(), google::protobuf::Service::STUB_OWNS_CHANNEL);
auto ptr = std::make_unique<PInternalService_Stub>(channel.release(), google::protobuf::Service::STUB_OWNS_CHANNEL);
std::unique_lock l(_mutex);
if (next_connection_group == _connection_group.load() + 1) {
// prevent the underlying _stub from being reset again by calls from the same epoch
++_connection_group;
_stub.reset(ptr.release());
}
return Status::OK();
}

Expand All @@ -75,120 +91,120 @@ void PInternalService_RecoverableStub::tablet_writer_open(::google::protobuf::Rp
::starrocks::PTabletWriterOpenResult* response,
::google::protobuf::Closure* done) {
auto closure = new RecoverableClosure(shared_from_this(), controller, done);
_stub->tablet_writer_open(controller, request, response, closure);
stub()->tablet_writer_open(controller, request, response, closure);
}

void PInternalService_RecoverableStub::tablet_writer_cancel(::google::protobuf::RpcController* controller,
const ::starrocks::PTabletWriterCancelRequest* request,
::starrocks::PTabletWriterCancelResult* response,
::google::protobuf::Closure* done) {
auto closure = new RecoverableClosure(shared_from_this(), controller, done);
_stub->tablet_writer_cancel(controller, request, response, closure);
stub()->tablet_writer_cancel(controller, request, response, closure);
}

void PInternalService_RecoverableStub::transmit_chunk(::google::protobuf::RpcController* controller,
const ::starrocks::PTransmitChunkParams* request,
::starrocks::PTransmitChunkResult* response,
::google::protobuf::Closure* done) {
auto closure = new RecoverableClosure(shared_from_this(), controller, done);
_stub->transmit_chunk(controller, request, response, closure);
stub()->transmit_chunk(controller, request, response, closure);
}

void PInternalService_RecoverableStub::transmit_chunk_via_http(::google::protobuf::RpcController* controller,
const ::starrocks::PHttpRequest* request,
::starrocks::PTransmitChunkResult* response,
::google::protobuf::Closure* done) {
auto closure = new RecoverableClosure(shared_from_this(), controller, done);
_stub->transmit_chunk_via_http(controller, request, response, closure);
stub()->transmit_chunk_via_http(controller, request, response, closure);
}

void PInternalService_RecoverableStub::tablet_writer_add_chunk(::google::protobuf::RpcController* controller,
const ::starrocks::PTabletWriterAddChunkRequest* request,
::starrocks::PTabletWriterAddBatchResult* response,
::google::protobuf::Closure* done) {
auto closure = new RecoverableClosure(shared_from_this(), controller, done);
_stub->tablet_writer_add_chunk(controller, request, response, closure);
stub()->tablet_writer_add_chunk(controller, request, response, closure);
}

void PInternalService_RecoverableStub::tablet_writer_add_chunks(
::google::protobuf::RpcController* controller, const ::starrocks::PTabletWriterAddChunksRequest* request,
::starrocks::PTabletWriterAddBatchResult* response, ::google::protobuf::Closure* done) {
auto closure = new RecoverableClosure(shared_from_this(), controller, done);
_stub->tablet_writer_add_chunks(controller, request, response, closure);
stub()->tablet_writer_add_chunks(controller, request, response, closure);
}

void PInternalService_RecoverableStub::tablet_writer_add_chunk_via_http(
::google::protobuf::RpcController* controller, const ::starrocks::PHttpRequest* request,
::starrocks::PTabletWriterAddBatchResult* response, ::google::protobuf::Closure* done) {
auto closure = new RecoverableClosure(shared_from_this(), controller, done);
_stub->tablet_writer_add_chunk_via_http(controller, request, response, closure);
stub()->tablet_writer_add_chunk_via_http(controller, request, response, closure);
}

void PInternalService_RecoverableStub::tablet_writer_add_chunks_via_http(
::google::protobuf::RpcController* controller, const ::starrocks::PHttpRequest* request,
::starrocks::PTabletWriterAddBatchResult* response, ::google::protobuf::Closure* done) {
auto closure = new RecoverableClosure(shared_from_this(), controller, done);
_stub->tablet_writer_add_chunks_via_http(controller, request, response, closure);
stub()->tablet_writer_add_chunks_via_http(controller, request, response, closure);
}

void PInternalService_RecoverableStub::tablet_writer_add_segment(
::google::protobuf::RpcController* controller, const ::starrocks::PTabletWriterAddSegmentRequest* request,
::starrocks::PTabletWriterAddSegmentResult* response, ::google::protobuf::Closure* done) {
auto closure = new RecoverableClosure(shared_from_this(), controller, done);
_stub->tablet_writer_add_segment(controller, request, response, closure);
stub()->tablet_writer_add_segment(controller, request, response, closure);
}

void PInternalService_RecoverableStub::get_load_replica_status(google::protobuf::RpcController* controller,
const PLoadReplicaStatusRequest* request,
PLoadReplicaStatusResult* response,
google::protobuf::Closure* done) {
auto closure = new RecoverableClosure(shared_from_this(), controller, done);
_stub->get_load_replica_status(controller, request, response, closure);
stub()->get_load_replica_status(controller, request, response, closure);
}

void PInternalService_RecoverableStub::load_diagnose(::google::protobuf::RpcController* controller,
const ::starrocks::PLoadDiagnoseRequest* request,
::starrocks::PLoadDiagnoseResult* response,
::google::protobuf::Closure* done) {
auto closure = new RecoverableClosure(shared_from_this(), controller, done);
_stub->load_diagnose(controller, request, response, closure);
stub()->load_diagnose(controller, request, response, closure);
}

void PInternalService_RecoverableStub::transmit_runtime_filter(::google::protobuf::RpcController* controller,
const ::starrocks::PTransmitRuntimeFilterParams* request,
::starrocks::PTransmitRuntimeFilterResult* response,
::google::protobuf::Closure* done) {
auto closure = new RecoverableClosure(shared_from_this(), controller, done);
_stub->transmit_runtime_filter(controller, request, response, closure);
stub()->transmit_runtime_filter(controller, request, response, closure);
}

void PInternalService_RecoverableStub::local_tablet_reader_multi_get(
::google::protobuf::RpcController* controller, const ::starrocks::PTabletReaderMultiGetRequest* request,
::starrocks::PTabletReaderMultiGetResult* response, ::google::protobuf::Closure* done) {
auto closure = new RecoverableClosure(shared_from_this(), controller, done);
_stub->local_tablet_reader_multi_get(controller, request, response, closure);
stub()->local_tablet_reader_multi_get(controller, request, response, closure);
}

void PInternalService_RecoverableStub::execute_command(::google::protobuf::RpcController* controller,
const ::starrocks::ExecuteCommandRequestPB* request,
::starrocks::ExecuteCommandResultPB* response,
::google::protobuf::Closure* done) {
auto closure = new RecoverableClosure(shared_from_this(), controller, done);
_stub->execute_command(controller, request, response, closure);
stub()->execute_command(controller, request, response, closure);
}

void PInternalService_RecoverableStub::process_dictionary_cache(
::google::protobuf::RpcController* controller, const ::starrocks::PProcessDictionaryCacheRequest* request,
::starrocks::PProcessDictionaryCacheResult* response, ::google::protobuf::Closure* done) {
auto closure = new RecoverableClosure(shared_from_this(), controller, done);
_stub->process_dictionary_cache(controller, request, response, closure);
stub()->process_dictionary_cache(controller, request, response, closure);
}

void PInternalService_RecoverableStub::fetch_datacache(::google::protobuf::RpcController* controller,
const ::starrocks::PFetchDataCacheRequest* request,
::starrocks::PFetchDataCacheResponse* response,
::google::protobuf::Closure* done) {
_stub->fetch_datacache(controller, request, response, nullptr);
stub()->fetch_datacache(controller, request, response, nullptr);
}

} // namespace starrocks
18 changes: 11 additions & 7 deletions be/src/util/internal_service_recoverable_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
#pragma once

#include <memory>
#include <mutex>
#include <shared_mutex>

#include "common/status.h"
#include "gen_cpp/internal_service.pb.h"
Expand All @@ -29,11 +29,14 @@ class PInternalService_RecoverableStub : public PInternalService,
PInternalService_RecoverableStub(const butil::EndPoint& endpoint);
~PInternalService_RecoverableStub();

Status reset_channel(const std::string& protocol = "");
Status reset_channel(const std::string& protocol = "", int64_t next_connection_group = 0);

#ifdef BE_TEST
PInternalService_Stub* stub() { return _stub.get(); }
#endif
std::shared_ptr<starrocks::PInternalService_Stub> stub() const {
std::shared_lock l(_mutex);
return _stub;
}

int64_t connection_group() const { return _connection_group.load(); }

// implements PInternalService ------------------------------------------

Expand Down Expand Up @@ -93,8 +96,9 @@ class PInternalService_RecoverableStub : public PInternalService,
private:
std::shared_ptr<starrocks::PInternalService_Stub> _stub;
const butil::EndPoint _endpoint;
int64_t _connection_group = 0;
std::mutex _mutex;
std::atomic<int64_t> _connection_group = 0;
mutable std::shared_mutex _mutex;

GOOGLE_DISALLOW_EVIL_CONSTRUCTORS(PInternalService_RecoverableStub);
};

Expand Down
1 change: 1 addition & 0 deletions be/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,7 @@ set(EXEC_FILES
./util/utf8_check_test.cpp
./util/int96_test.cpp
./util/internal_service_recoverable_stub_test.cpp
./util/internal_service_recoverable_stub_parallel_test.cpp
./util/bit_packing_test.cpp
./util/bit_packing_simd_test.cpp
./util/gc_helper_test.cpp
Expand Down
131 changes: 131 additions & 0 deletions be/test/util/internal_service_recoverable_stub_parallel_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <gtest/gtest.h>

#include <atomic>
#include <chrono>
#include <cstdint>
#include <future>
#include <thread>
#include <vector>

#include "util/internal_service_recoverable_stub.h"
#include "util/ref_count_closure.h"

namespace starrocks {

// Fixture for the concurrency tests of PInternalService_RecoverableStub.
// It carries no state and performs no set-up or tear-down, so the
// compiler-generated constructor and destructor are sufficient.
class PInternalService_RecoverableStub_ParallelTest : public testing::Test {};

// Stress test: for two seconds one thread keeps calling reset_channel() while
// twenty sender threads issue tablet_writer_add_chunk() through the same
// recoverable stub. The test has no explicit assertion on the RPC outcome; it
// passes when the concurrent reset/send traffic completes and all threads join.
TEST_F(PInternalService_RecoverableStub_ParallelTest, test_parallel_reset_execute) {
    butil::EndPoint endpoint;
    auto res = butil::str2endpoint("127.0.0.1", 53343, &endpoint);
    EXPECT_EQ(res, 0);

    auto stub = std::make_shared<starrocks::PInternalService_RecoverableStub>(endpoint);
    EXPECT_TRUE(stub->reset_channel().ok());

    // Shared stop flag, flipped by the main thread once the run time elapsed.
    std::atomic<bool> running{true};
    const auto pause = std::chrono::microseconds(1);

    // Resetter: swaps the underlying channel over and over.
    std::thread resetter([&] {
        while (running.load()) {
            stub->reset_channel();
            std::this_thread::sleep_for(pause);
        }
    });

    // Sender body: every iteration allocates a fresh closure, takes one
    // reference on it and hands it to the stub as the RPC's done callback.
    auto send_loop = [&] {
        while (running.load()) {
            PTabletWriterAddChunkRequest request;
            auto* closure = new starrocks::RefCountClosure<PTabletWriterAddBatchResult>();
            closure->ref();
            stub->tablet_writer_add_chunk(&closure->cntl, &request, &closure->result, closure);
            std::this_thread::sleep_for(pause);
        }
    };

    const int sender_count = 20;
    std::vector<std::thread> senders;
    for (int idx = 0; idx < sender_count; ++idx) {
        senders.emplace_back(send_loop);
    }

    // Let resets and sends overlap for a while, then stop everybody.
    std::this_thread::sleep_for(std::chrono::seconds(2));
    running = false;

    resetter.join();
    for (auto& sender : senders) {
        sender.join();
    }
}

// Single-threaded walk through reset_channel()'s connection-group bookkeeping:
// each step requests a reset with a given next_connection_group and then checks
// the value reported by connection_group().
TEST_F(PInternalService_RecoverableStub_ParallelTest, test_reset_channel_with_connection_group) {
    butil::EndPoint endpoint;
    auto res = butil::str2endpoint("127.0.0.1", 53344, &endpoint);
    EXPECT_EQ(res, 0);
    auto stub = std::make_shared<starrocks::PInternalService_RecoverableStub>(endpoint);

    struct Step {
        int64_t requested; // next_connection_group passed in; 0 is the declared default
        int64_t expected;  // connection_group() expected after the call
    };
    const Step steps[] = {
            // reset channel without a next_connection_group, always allowed
            {0, 1},
            {0, 2},
            // reset channel with the correct next_connection_group
            {3, 3},
            // reset with a wrong next_connection_group, connection_group() won't increase
            {5, 3},
            {3, 3},
            // reset channel with the correct next_connection_group again
            {4, 4},
    };

    EXPECT_EQ(0, stub->connection_group());
    for (const auto& step : steps) {
        // Every call is expected to return OK, including the ones that are ignored.
        EXPECT_TRUE(stub->reset_channel("", step.requested).ok());
        EXPECT_EQ(step.expected, stub->connection_group());
    }
}

// Releases 100 threads at once, all asking reset_channel() for the same
// next_connection_group, and checks that connection_group() advanced exactly
// once, i.e. only one of the competing calls actually replaced the channel.
TEST_F(PInternalService_RecoverableStub_ParallelTest, test_parallel_reset_channel_exclusive) {
    butil::EndPoint endpoint;
    auto res = butil::str2endpoint("127.0.0.1", 53345, &endpoint);
    // Nothing below is meaningful with an unparsed endpoint, so stop right here.
    ASSERT_EQ(res, 0);
    std::shared_ptr<starrocks::PInternalService_RecoverableStub> stub =
            std::make_shared<starrocks::PInternalService_RecoverableStub>(endpoint);
    // The initial reset is a precondition of the test: abort if it fails.
    ASSERT_TRUE(stub->reset_channel().ok());
    // connection_group() returns int64_t; keep the full width instead of narrowing to int.
    const int64_t next_connection_group = stub->connection_group() + 1;

    const int num_threads = 100;
    std::vector<std::thread> reset_threads;
    reset_threads.reserve(num_threads);
    std::promise<void> ready_promise;
    std::shared_future<void> ready_future(ready_promise.get_future());
    for (int i = 0; i < num_threads; ++i) {
        // Each thread gets its own copy of the shared_future: concurrent use is
        // only guaranteed safe when every thread goes through a separate copy.
        reset_threads.emplace_back([stub, ready_future, next_connection_group] {
            ready_future.wait();
            stub->reset_channel("", next_connection_group);
        });
    }

    // allow all the threads to reset_channel() immediately
    ready_promise.set_value();
    for (auto& t : reset_threads) {
        t.join();
    }
    // only one of the threads can reset the channel successfully.
    EXPECT_EQ(next_connection_group, stub->connection_group());
}

} // namespace starrocks
Loading
0