Add storage-head feature by brly · Pull Request #216 · frugalos/frugalos · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Add storage-head feature #216

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Oct 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
10000
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ jemallocator = "0.1.8"
jemalloc-ctl = "0.2"
hostname = "0.1"
httpcodec = "0.2"
libfrugalos = "0.4.0"
libfrugalos = "0.5.0"
num_cpus = "1"
prometrics = "0.1"
raftlog = "0.5"
Expand Down
2 changes: 1 addition & 1 deletion frugalos_config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ fibers_rpc = "0.2"
fibers_tasque = "0.1"
frugalos_raft = { version = "0.8", path = "../frugalos_raft/" }
futures = "0.1"
libfrugalos = "0.4.0"
libfrugalos = "0.5.0"
prometrics = "0.1"
protobuf_codec = "0.2"
raftlog = "0.5"
Expand Down
2 changes: 1 addition & 1 deletion frugalos_mds/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ fibers_tasque = "0.1"
frugalos_core = { version = "0.1", path = "../frugalos_core" }
frugalos_raft = { version = "0.8", path = "../frugalos_raft/" }
futures = "0.1"
libfrugalos = "0.4.0"
libfrugalos = "0.5.0"
patricia_tree = { version = "0.1.8", features = ["binary-format"] }
prometrics = "0.1"
protobuf_codec = "0.2"
Expand Down
2 changes: 1 addition & 1 deletion frugalos_segment/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ frugalos_core = { version = "0.1", path = "../frugalos_core/" }
frugalos_mds = { version = "0.10", path = "../frugalos_mds/" }
frugalos_raft = { version = "0.8", path = "../frugalos_raft/" }
futures = "0.1"
libfrugalos = "0.4.0"
libfrugalos = "0.5.0"
prometrics = "0.1"
rand = "0.5"
raftlog = "0.5"
Expand Down
124 changes: 123 additions & 1 deletion frugalos_segment/src/client/dispersed_storage.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#![allow(clippy::needless_pass_by_value)]
use cannyls::deadline::Deadline;
use cannyls::lump::LumpData;
use cannyls::lump::{LumpData, LumpHeader};
use cannyls_rpc::Client as CannyLsClient;
use cannyls_rpc::DeviceId;
use ecpool::liberasurecode::LibErasureCoderBuilder;
Expand Down Expand Up @@ -146,6 +146,36 @@ impl DispersedClient {
span,
})
}
/// Checks on the storage side whether the fragments of the object `version` exist.
///
/// A `DispersedHead` future is built over every cluster member returned by
/// `self.cluster.candidates(version)` (in reversed order) and is bounded by
/// `client_config.head_timeout`. The whole operation is traced under a
/// `head_content` child span of `parent`.
pub fn head(
    self,
    version: ObjectVersion,
    deadline: Deadline,
    parent: SpanHandle,
) -> BoxFuture<()> {
    let span = parent.child("head_content", |span| {
        span.tag(StdTag::component(module_path!()))
            .tag(Tag::new("object.version", version.0 as i64))
            .tag(Tag::new("storage.type", "dispersed"))
            .start()
    });

    // Collect the candidate members and flip their order before querying them.
    let mut members: Vec<_> = self.cluster.candidates(version).cloned().collect();
    members.reverse();

    let timeout = Some(timer::timeout(self.client_config.head_timeout));
    let head = DispersedHead::new(
        self.logger,
        self.data_fragments,
        members,
        version,
        deadline,
        self.rpc_service,
        &self.client_config,
        span.handle(),
        timeout,
    );
    Box::new(head)
}
pub fn put(
self,
version: ObjectVersion,
Expand Down Expand Up @@ -533,3 +563,95 @@ impl Future for ReconstructDispersedFragment {
Ok(Async::NotReady)
}
}

/// Future that issues a `head_lump` request to every candidate cluster member
/// and resolves once enough of them report that their fragment exists.
pub struct DispersedHead {
// Logger used to report failed HEAD requests at debug level.
logger: Logger,
// Pending per-member `head_lump` requests, raced against each other.
future: futures::future::SelectAll<BoxFuture<Option<LumpHeader>>>,
// Number of existing fragments required for the future to succeed.
data_fragments: usize,
// Number of requests so far that returned a lump header (`Some`).
exists: usize,
// Optional timer; when it fires, the future fails with `ErrorKind::Busy`.
timeout: Option<timer::Timeout>,
}
impl DispersedHead {
    /// Builds a `DispersedHead` that sends one `head_lump` request per candidate.
    ///
    /// * `logger` - logger stored for reporting failed requests.
    /// * `data_fragments` - number of existing fragments required for success.
    /// * `candidates` - cluster members to query; one request is created for each.
    /// * `version` - object version from which each member's lump id is derived.
    /// * `deadline` - deadline attached to every request.
    /// * `rpc_service` - handle used to create the per-member CannyLS RPC client.
    /// * `client_config` - supplies the RPC options applied to each request.
    /// * `parent` - parent span; each request gets its own `dispersed_head` child span.
    /// * `timeout` - optional timer bounding the whole operation.
    ///
    /// NOTE(review): futures 0.1 `select_all` panics on an empty iterator, so
    /// `candidates` is presumably never empty here — confirm against callers.
    #[allow(clippy::too_many_arguments)]
    fn new(
        logger: Logger,
        data_fragments: usize,
        candidates: Vec<ClusterMember>,
        version: ObjectVersion,
        deadline: Deadline,
        rpc_service: RpcServiceHandle,
        client_config: &DispersedClientConfig,
        parent: SpanHandle,
        timeout: Option<timer::Timeout>,
    ) -> Self {
        let futures = candidates.iter().map(move |cluster_member| {
            let client = CannyLsClient::new(cluster_member.node.addr, rpc_service.clone());
            let lump_id = cluster_member.make_lump_id(version);
            let mut span = parent.child("dispersed_head", |span| {
                span.tag(StdTag::component(module_path!()))
                    .tag(StdTag::span_kind("client"))
                    .tag(StdTag::peer_ip(cluster_member.node.addr.ip()))
                    .tag(StdTag::peer_port(cluster_member.node.addr.port()))
                    .tag(Tag::new("device", cluster_member.device.clone()))
                    .tag(Tag::new("lump", format!("{:?}", lump_id)))
                    .start()
            });
            let mut request = client.request();
            request.rpc_options(client_config.cannyls.rpc_options());
            let device = cluster_member.device.clone();
            let future = request
                .deadline(deadline)
                .head_lump(DeviceId::new(device), lump_id)
                .then(move |result| {
                    // Record a failure on the request's span, then pass the result through.
                    if let Err(ref e) = result {
                        span.log_error(e);
                    }
                    result
                });
            let future: BoxFuture<_> = Box::new(future.map_err(|e| track!(Error::from(e))));
            future
        });
        DispersedHead {
            // `logger` is owned and not used again, so it is moved rather than cloned.
            logger,
            future: futures::future::select_all(futures),
            data_fragments,
            exists: 0,
            timeout,
        }
    }
}
/// Resolves to `()` once at least `data_fragments` requests reported an existing lump.
///
/// Fails with `ErrorKind::Corrupted` as soon as the requests still pending can no
/// longer reach that count, and with `ErrorKind::Busy` when the timeout fires
/// while requests are still pending.
impl Future for DispersedHead {
    type Item = ();
    type Error = Error;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        // Keep draining completed requests until the remaining ones are all pending.
        // Returning `NotReady` right after rebuilding `select_all` would leave the
        // rebuilt future unpolled, so requests that were never polled would have no
        // wake-up registered for the current task.
        loop {
            let remainings = match self.future.poll() {
                Ok(Async::NotReady) => {
                    // Poll the timer here so that it can fire (and registers its own
                    // wake-up) even when no request has completed yet.
                    if let Ok(Async::Ready(Some(()))) = self.timeout.poll() {
                        let cause = "DispersedHead: timeout expired";
                        return Err(track!(Error::from(ErrorKind::Busy.cause(cause))));
                    }
                    return Ok(Async::NotReady);
                }
                Err((e, _, remainings)) => {
                    // A failed request does not count as an existing fragment.
                    debug!(self.logger, "DispersedHead:{}", e);
                    remainings
                }
                Ok(Async::Ready((lump_header, _, remainings))) => {
                    // `Some(header)` means the lump exists on that member.
                    self.exists += lump_header.map_or(0, |_| 1);
                    if self.exists >= self.data_fragments {
                        return Ok(Async::Ready(()));
                    }
                    remainings
                }
            };
            // Even if every pending request succeeded, the required count is out of reach.
            if remainings.len() + self.exists < self.data_fragments {
                let cause = format!("DispersedHead: There are no enough fragments (Detail: futures.len({}) + fragments.len({}) < data_fragments({}))",
                    remainings.len(),
                    self.exists,
                    self.data_fragments
                );
                return Err(track!(Error::from(ErrorKind::Corrupted.cause(cause))));
            }
            self.future = futures::future::select_all(remainings);
        }
    }
}
100 changes: 100 additions & 0 deletions frugalos_segment/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,29 @@ impl Client {
self.mds.head(id, consistency, parent)
}

/// Asks the storage side whether the object exists.
///
/// The object's version is first looked up through the MDS. When no version is
/// found the result is `None`; otherwise the storage client's `head` is invoked
/// for that version and `Some(version)` is returned once it succeeds.
pub fn head_storage(
    &self,
    id: ObjectId,
    deadline: Deadline,
    consistency: ReadConsistency,
    parent: SpanHandle,
) -> impl Future<Item = Option<ObjectVersion>, Error = Error> {
    let storage = self.storage.clone();
    let lookup = self.mds.head(id, consistency, parent.clone());
    lookup.and_then(move |found| match found {
        Some(version) => {
            let checked = storage
                .head(version, deadline, parent)
                .map(move |()| Some(version));
            Either::A(checked)
        }
        None => Either::B(futures::future::ok(None)),
    })
}

/// オブジェクトを保存する。
pub fn put(
&self,
Expand Down Expand Up @@ -238,6 +261,8 @@ impl Drop for PutFailureTracking {
#[cfg(test)]
mod tests {
use super::*;
use cannyls_rpc::DeviceId;
use config::ClusterMember;
use fibers::executor::Executor;
use rustracing_jaeger::span::Span;
use std::{thread, time};
Expand Down Expand Up @@ -376,4 +401,79 @@ mod tests {

Ok(())
}

/// `head_storage` succeeds while all fragments exist and fails once
/// `data_fragments` lumps have been deleted, even though the metadata-level
/// `head` still reports the object.
#[test]
fn head_storage_work() -> TestResult {
    let data_fragments = 2;
    let parity_fragments = 1;
    let cluster_size = 3;
    let mut system = System::new(data_fragments, parity_fragments)?;
    let (members, client) = setup_system(&mut system, cluster_size)?;
    let rpc_service_handle = system.rpc_service_handle();

    // Drive the executor on a background thread for the rest of the test.
    thread::spawn(move || loop {
        system.executor.run_once().unwrap();
        thread::sleep(time::Duration::from_micros(100));
    });

    let object_id = "test_data".to_owned();
    let content = vec![0x03];

    // Wait until the segment becomes stable (e.g. a raft leader has been elected).
    // The 5-second value has no particular justification.
    thread::sleep(time::Duration::from_secs(5));

    let (object_version, _) = wait(client.put(
        object_id.clone(),
        content.clone(),
        Deadline::Infinity,
        Expect::Any,
        Span::inactive().handle(),
    ))?;

    // With every fragment in place, the storage-level head finds the object.
    let found = wait(client.head_storage(
        object_id.clone(),
        Deadline::Infinity,
        ReadConsistency::Consistent,
        Span::inactive().handle(),
    ))?;
    assert_eq!(found, Some(object_version));

    // Delete `data_fragments` lumps directly through the CannyLS RPC.
    let mut deleted_lumps = 0;
    for (node_id, device_id, _) in members {
        let lump_client = cannyls_rpc::Client::new(node_id.addr, rpc_service_handle.clone());
        let cluster_member = ClusterMember {
            node: node_id,
            device: device_id.clone(),
        };
        let lump_id = cluster_member.make_lump_id(object_version);
        let future = lump_client
            .request()
            .delete_lump(DeviceId::new(device_id.clone()), lump_id)
            .map_err(|e| e.into());
        let deleted = wait(future)?;
        assert!(deleted);
        deleted_lumps += 1;
        if deleted_lumps >= data_fragments {
            break;
        }
    }

    // The metadata still knows the object...
    let found = wait(client.head(
        object_id.clone(),
        ReadConsistency::Consistent,
        Span::inactive().handle(),
    ))?;
    assert_eq!(found, Some(object_version));

    // ...but the storage-level head must now fail.
    let outcome = wait(client.head_storage(
        object_id.clone(),
        Deadline::Infinity,
        ReadConsistency::Consistent,
        Span::inactive().handle(),
    ));
    assert!(outcome.is_err());

    Ok(())
}
}
4 changes: 4 additions & 0 deletions frugalos_segment/src/client/replicated_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ impl ReplicatedClient {
};
Box::new(future)
}
/// TODO: Implement this (currently it always succeeds without checking the storage).
pub fn head(self, _version: ObjectVersion, _deadline: Deadline) -> BoxFuture<()> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO コメントとかをつけていただけると助かります

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

旨を記述しておきます

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

未実装の場合 unreachable!() を実装にしている既存コードがあります。が、これが普通なのかまでは分かっていません…。

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

なるほど。
今回は未実装なんですけど、コードとしては到達してしまうのでマクロは置かないでおきますかね。

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ここは普通に到達する (Replicated バケツのオブジェクトに HEAD) ので、unreachable! にはしない方がいいのではという気がします。warn! とかはしてもいいかもという気がします。

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unreachable! を使ってるコードは本当に unreachable だったかもしれません。

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

現状で LGTM です

Box::new(futures::future::ok(()))
}
pub fn put(
self,
version: ObjectVersion,
Expand Down
12 changes: 12 additions & 0 deletions frugalos_segment/src/client/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,18 @@ impl StorageClient {
StorageClient::Dispersed(c) => c.get(object.version, deadline, parent),
}
}
/// Dispatches a storage-level existence check to the underlying client.
///
/// The `Metadata` variant has no storage backend to ask and resolves
/// immediately; the other variants forward to their own `head`.
pub fn head(
    self,
    version: ObjectVersion,
    deadline: Deadline,
    parent: SpanHandle,
) -> BoxFuture<()> {
    match self {
        StorageClient::Metadata => Box::new(future::ok(())),
        StorageClient::Replicated(replicated) => replicated.head(version, deadline),
        StorageClient::Dispersed(dispersed) => dispersed.head(version, deadline, parent),
    }
}
pub fn put(
self,
version: ObjectVersion,
Expand Down
Loading
0