8000 feat: Allow development of custom stores by gonzalezzfelipe · Pull Request #67 · txpipe/balius · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

feat: Allow development of custom stores #67

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 17 additions & 15 deletions balius-runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ pub mod wit {

mod metrics;
mod router;
mod store;

// implementations
pub mod drivers;
Expand All @@ -26,9 +25,10 @@ pub mod kv;
pub mod ledgers;
pub mod logging;
pub mod sign;
pub mod store;
pub mod submit;

pub use store::Store;
pub use store::{AtomicUpdateTrait, Store, StoreTrait};
pub use wit::Response;

pub type WorkerId = String;
Expand All @@ -39,7 +39,7 @@ pub enum Error {
Wasm(wasmtime::Error),

#[error("store error {0}")]
Store(Box<redb::Error>),
Store(String),

#[error("worker not found '{0}'")]
WorkerNotFound(WorkerId),
Expand Down Expand Up @@ -80,37 +80,37 @@ impl From<wasmtime::Error> for Error {

impl From<redb::Error> for Error {
fn from(value: redb::Error) -> Self {
Self::Store(Box::new(value))
Self::Store(value.to_string())
}
}

impl From<redb::DatabaseError> for Error {
fn from(value: redb::DatabaseError) -> Self {
Self::Store(Box::new(value.into()))
Self::Store(value.to_string())
}
}

impl From<redb::TransactionError> for Error {
fn from(value: redb::TransactionError) -> Self {
Self::Store(Box::new(value.into()))
Self::Store(value.to_string())
}
}

impl From<redb::TableError> for Error {
fn from(value: redb::TableError) -> Self {
Self::Store(Box::new(value.into()))
Self::Store(value.to_string())
}
}

impl From<redb::CommitError> for Error {
fn from(value: redb::CommitError) -> Self {
Self::Store(Box::new(value.into()))
Self::Store(value.to_string())
}
}

impl From<redb::StorageError> for Error {
fn from(value: redb::StorageError) -> Self {
Self::Store(Box::new(value.into()))
Self::Store(value.to_string())
}
}

Expand Down Expand Up @@ -485,7 +485,7 @@ impl Runtime {

if let Some(seq) = lowest_seq {
debug!(lowest_seq, "found lowest seq");
return self.store.find_chain_point(seq);
return self.store.find_chain_point(seq).await;
}

Ok(None)
Expand Down Expand Up @@ -522,7 +522,7 @@ impl Runtime {
let config = serde_json::to_vec(&config).unwrap();
instance.call_init(&mut wasm_store, &config).await?;

let cursor = self.store.get_worker_cursor(id)?;
let cursor = self.store.get_worker_cursor(id).await?;
debug!(cursor, id, "found cursor for worker");

self.loaded.lock().await.insert(
Expand Down Expand Up @@ -585,18 +585,20 @@ impl Runtime {
) -> Result<(), Error> {
info!("applying block");

let log_seq = self.store.write_ahead(undo_blocks, next_block)?;
let log_seq = self.store.write_ahead(undo_blocks, next_block).await?;

let mut workers = self.loaded.lock().await;

let mut store_update = self.store.start_atomic_update(log_seq)?;
let mut store_update = self.store.start_atomic_update(log_seq).await?;

for (_, worker) in workers.iter_mut() {
worker.apply_chain(undo_blocks, next_block).await?;
store_update.update_worker_cursor(&worker.wasm_store.data().worker_id)?;
store_update
.update_worker_cursor(&worker.wasm_store.data().worker_id)
.await?;
}

store_update.commit()?;
store_update.commit().await?;

Ok(())
}
Expand Down
102 changes: 102 additions & 0 deletions balius-runtime/src/store/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
pub mod redb;

use prost::Message;
use std::sync::Arc;
use tokio::sync::Mutex;

use crate::{Block, ChainPoint, Error};

/// Identifier of a worker, used as the key for cursor bookkeeping.
pub type WorkerId = String;
/// Monotonic sequence number of an entry in the write-ahead log.
pub type LogSeq = u64;

/// One write-ahead-log record: the block being applied together with any
/// blocks undone by the same chain event, both kept as raw serialized bytes
/// (encoded/decoded via prost).
#[derive(Message)]
pub struct LogEntry {
    /// Serialized bytes of the block applied at this log sequence.
    #[prost(bytes, tag = "1")]
    pub next_block: Vec<u8>,
    /// Serialized bytes of the blocks rolled back before `next_block`
    /// was applied (empty when no rollback occurred).
    #[prost(bytes, repeated, tag = "2")]
    pub undo_blocks: Vec<Vec<u8>>,
}

/// Operations a store backend must support to atomically persist worker
/// cursors for a single write-ahead-log sequence.
///
/// An implementation is obtained from [`StoreTrait::start_atomic_update`];
/// staged cursor writes become durable only once [`commit`] succeeds.
#[async_trait::async_trait]
pub trait AtomicUpdateTrait {
    /// Stages the cursor of worker `id` at the log sequence this update
    /// was created for.
    async fn update_worker_cursor(&mut self, id: &str) -> Result<(), super::Error>;
    /// Persists all staged cursor changes.
    async fn commit(&mut self) -> Result<(), super::Error>;
}

/// Handle to an in-flight atomic cursor update, dispatching to either the
/// bundled redb backend or a user-provided custom store implementation.
#[allow(clippy::large_enum_variant)]
pub enum AtomicUpdate {
    /// Update backed by the built-in redb transaction.
    Redb(redb::AtomicUpdate),
    /// Update backed by a user-supplied [`AtomicUpdateTrait`] object.
    // NOTE(review): `Arc<Mutex<_>>` makes the handle shareable, but the
    // trait's `&mut self` methods already imply exclusive access — confirm
    // whether `Box<dyn AtomicUpdateTrait + Send + Sync>` would suffice.
    Custom(Arc<Mutex<dyn AtomicUpdateTrait + Send + Sync>>),
}

/// Forwards each atomic-update operation to the concrete backend held by
/// the enum: the built-in redb transaction, or a custom implementation
/// behind an async mutex.
#[async_trait::async_trait]
impl AtomicUpdateTrait for AtomicUpdate {
    async fn update_worker_cursor(&mut self, id: &str) -> Result<(), super::Error> {
        match self {
            Self::Custom(inner) => {
                // Custom backends are shared behind a tokio mutex; take the
                // lock before delegating.
                let mut guard = inner.lock().await;
                guard.update_worker_cursor(id).await
            }
            Self::Redb(inner) => inner.update_worker_cursor(id).await,
        }
    }

    async fn commit(&mut self) -> Result<(), super::Error> {
        match self {
            Self::Custom(inner) => {
                let mut guard = inner.lock().await;
                guard.commit().await
            }
            Self::Redb(inner) => inner.commit().await,
        }
    }
}

/// The runtime's persistence backend: either the bundled redb store or a
/// caller-provided custom implementation of [`StoreTrait`].
///
/// Cloning is cheap for the `Custom` variant (an `Arc` bump); `Redb`
/// relies on `redb::Store` being `Clone`.
#[derive(Clone)]
pub enum Store {
    /// Built-in store backed by a redb database file.
    Redb(redb::Store),
    /// User-supplied store implementation, shared behind an async mutex.
    Custom(Arc<Mutex<dyn StoreTrait + Send + Sync>>),
}

#[async_trait::async_trait]
pub trait StoreTrait {
async fn find_chain_point(&self, seq: LogSeq) -> Result<Option<ChainPoint>, Error>;
async fn write_ahead(
&mut self,
undo_blocks: &[Block],
next_block: &Block,
) -> Result<LogSeq, Error>;
async fn get_worker_cursor(&self, id: &str) -> Result<Option<LogSeq>, super::Error>;
async fn start_atomic_update(&self, log_seq: LogSeq) -> Result<AtomicUpdate, super::Error>;
}

/// Dispatches every [`StoreTrait`] call to the active backend: the bundled
/// redb store, or a caller-provided implementation behind an async mutex.
#[async_trait::async_trait]
impl StoreTrait for Store {
    async fn find_chain_point(&self, seq: LogSeq) -> Result<Option<ChainPoint>, Error> {
        match self {
            Self::Custom(inner) => inner.lock().await.find_chain_point(seq).await,
            Self::Redb(inner) => inner.find_chain_point(seq).await,
        }
    }

    async fn write_ahead(
        &mut self,
        undo_blocks: &[Block],
        next_block: &Block,
    ) -> Result<LogSeq, Error> {
        match self {
            Self::Custom(inner) => {
                // Hold the lock for the duration of the write so the log
                // sequence is assigned under exclusive access.
                let mut guard = inner.lock().await;
                guard.write_ahead(undo_blocks, next_block).await
            }
            Self::Redb(inner) => inner.write_ahead(undo_blocks, next_block).await,
        }
    }

    async fn get_worker_cursor(&self, id: &str) -> Result<Option<LogSeq>, super::Error> {
        match self {
            Self::Custom(inner) => inner.lock().await.get_worker_cursor(id).await,
            Self::Redb(inner) => inner.get_worker_cursor(id).await,
        }
    }

    async fn start_atomic_update(&self, log_seq: LogSeq) -> Result<AtomicUpdate, super::Error> {
        match self {
            Self::Custom(inner) => inner.lock().await.start_atomic_update(log_seq).await,
            Self::Redb(inner) => inner.start_atomic_update(log_seq).await,
        }
    }
}
78 changes: 39 additions & 39 deletions balius-runtime/src/store.rs → balius-runtime/src/store/redb.rs
9E12
Original file line number Diff line number Diff line change
@@ -1,21 +1,12 @@
use itertools::Itertools;
use prost::Message;
use redb::{ReadableTable as _, TableDefinition, WriteTransaction};
use std::{path::Path, sync::Arc};
use tracing::warn;

use crate::{Block, ChainPoint, Error};

pub type WorkerId = String;
pub type LogSeq = u64;

#[derive(Message)]
pub struct LogEntry {
#[prost(bytes, tag = "1")]
pub next_block: Vec<u8>,
#[prost(bytes, repeated, tag = "2")]
pub undo_blocks: Vec<Vec<u8>>,
}
use super::StoreTrait;
pub use super::{AtomicUpdateTrait, LogEntry, LogSeq, WorkerId};

impl redb::Value for LogEntry {
type SelfType<'a>
Expand Down Expand Up @@ -58,20 +49,40 @@ const WAL: TableDefinition<LogSeq, LogEntry> = TableDefinition::new("wal");
const DEFAULT_CACHE_SIZE_MB: usize = 50;

pub struct AtomicUpdate {
wx: WriteTransaction,
wx: Option<WriteTransaction>,
log_seq: LogSeq,
}

impl AtomicUpdate {
pub fn update_worker_cursor(&mut self, id: &str) -> Result<(), super::Error> {
let mut table = self.wx.open_table(CURSORS)?;
pub fn new(wx: WriteTransaction, log_seq: LogSeq) -> Self {
Self {
wx: Some(wx),
log_seq,
}
}
}

#[async_trait::async_trait]
impl AtomicUpdateTrait for AtomicUpdate {
async fn update_worker_cursor(&mut self, id: &str) -> Result<(), super::Error> {
let Some(wx) = self.wx.as_mut() else {
return Err(super::Error::Store(
"Transaction already commited".to_string(),
));
};

let mut table = wx.open_table(CURSORS)?;
table.insert(id.to_owned(), self.log_seq)?;

Ok(())
}

pub fn commit(self) -> Result<(), super::Error> {
self.wx.commit()?;
async fn commit(&mut self) -> Result<(), super::Error> {
let Some(wx) = self.wx.take() else {
return Err(super::Error::Store(
"Transaction already commited".to_string(),
));
};
wx.commit()?;
Ok(())
}
}
Expand Down Expand Up @@ -120,15 +131,18 @@ impl Store {
let entry = table.get(seq)?;
Ok(entry.map(|x| x.value()))
}
}

pub fn find_chain_point(&self, seq: LogSeq) -> Result<Option<ChainPoint>, Error> {
#[async_trait::async_trait]
impl StoreTrait for Store {
async fn find_chain_point(&self, seq: LogSeq) -> Result<Option<ChainPoint>, Error> {
let entry = self.get_entry(seq)?;
let block = Block::from_bytes(&entry.unwrap().next_block);

Ok(Some(block.chain_point()))
}

pub fn write_ahead(
async fn write_ahead(
&mut self,
undo_blocks: &[Block],
next_block: &Block,
Expand All @@ -151,7 +165,7 @@ impl Store {
}

// TODO: see if loading in batch is worth it
pub fn get_worker_cursor(&self, id: &str) -> Result<Option<LogSeq>, super::Error> {
async fn get_worker_cursor(&self, id: &str) -> Result<Option<LogSeq>, super::Error> {
let rx = self.db.begin_read()?;

let table = match rx.open_table(CURSORS) {
Expand All @@ -164,25 +178,11 @@ impl Store {
Ok(cursor.map(|x| x.value()))
}

pub fn start_atomic_update(&self, log_seq: LogSeq) -> Result<AtomicUpdate, super::Error> {
async fn start_atomic_update(
&self,
log_seq: LogSeq,
) -> Result<super::AtomicUpdate, super::Error> {
let wx = self.db.begin_write()?;
Ok(AtomicUpdate { wx, log_seq })
}

// TODO: I don't think we need this since we're going to load each cursor as
// part of the loaded worker
pub fn lowest_cursor(&self) -> Result<Option<LogSeq>, super::Error> {
let rx = self.db.begin_read()?;

let table = rx.open_table(CURSORS)?;

let cursors: Vec<_> = table
.iter()?
.map_ok(|(_, value)| value.value())
.try_collect()?;

let lowest = cursors.iter().fold(None, |all, item| all.min(Some(*item)));

Ok(lowest)
Ok(super::AtomicUpdate::Redb(AtomicUpdate::new(wx, log_seq)))
}
}
6 changes: 3 additions & 3 deletions balius-runtime/tests/e2e.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{
process::Command,
};

use balius_runtime::{ledgers, Runtime, Store};
use balius_runtime::{ledgers, store::redb::Store as RedbStore, Runtime, Store};
use serde_json::json;
use wit_component::ComponentEncoder;

Expand Down Expand Up @@ -41,9 +41,9 @@ fn build_module(src_dir: impl AsRef<Path>, module_name: &str, target: impl AsRef
async fn faucet_claim() {
build_module("../examples/minter/offchain", "minter", "tests/faucet.wasm");

let store = Store::open("tests/balius.db", None).unwrap();
let store = Store::Redb(RedbStore::open("tests/balius.db", None).unwrap());

let mut runtime = Runtime::builder(store)
let runtime = Runtime::builder(store)
.with_ledger(ledgers::mock::Ledger.into())
.build()
.unwrap();
Expand Down
10 changes: 6 additions & 4 deletions balius/src/bin/command/test.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{collections::HashMap, path::PathBuf};

use balius_runtime::{ledgers, Runtime, Store};
use balius_runtime::{ledgers, store::redb::Store as RedbStore, Runtime, Store};
use miette::{Context as _, IntoDiagnostic as _};
use tokio_util::sync::CancellationToken;
use tracing::{info, warn, Level};
Expand Down Expand Up @@ -90,9 +90,11 @@ async fn run_project_with_config(
setup_tracing()?;

info!("Running Balius project on daemon...");
let store: Store = Store::open("baliusd.db", None)
.into_diagnostic()
.context("opening store")?;
let store = Store::Redb(
RedbStore::open("baliusd.db", None)
.into_diagnostic()
.context("opening store")?,
);

let config = ledgers::u5c::Config {
endpoint_url: utxo_url.clone(),
Expand Down
Loading
Loading
0