Implemented Fork by vigoo · Pull Request #1578 · golemcloud/golem · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Implemented Fork #1578

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions golem-test-framework/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ tonic = { workspace = true }
tracing = { workspace = true }
url = { workspace = true }
uuid = { workspace = true }
wasm-metadata = { version = "0.227.1" }

[dev-dependencies]
test-r = { workspace = true }
Expand Down
19 changes: 15 additions & 4 deletions golem-test-framework/src/config/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use tempfile::TempDir;
use tracing::{Instrument, Level};

use crate::components::component_compilation_service::docker::DockerComponentCompilationService;
Expand Down Expand Up @@ -87,6 +88,7 @@ pub struct CliTestDependencies {
initial_component_files_service: Arc<InitialComponentFilesService>,
plugin_wasm_files_service: Arc<PluginWasmFilesService>,
component_directory: PathBuf,
component_temp_directory: Arc<TempDir>,
}

#[derive(Parser, Debug, Clone)]
Expand Down Expand Up @@ -455,6 +457,7 @@ impl CliTestDependencies {
initial_component_files_service,
plugin_wasm_files_service,
component_directory: params.component_directory.clone().into(),
component_temp_directory: Arc::new(TempDir::new().unwrap()),
}
}

Expand Down Expand Up @@ -633,6 +636,7 @@ impl CliTestDependencies {
blob_storage,
plugin_wasm_files_service,
initial_component_files_service,
component_temp_directory: Arc::new(TempDir::new().unwrap()),
}
}

Expand Down Expand Up @@ -783,6 +787,7 @@ impl CliTestDependencies {
initial_component_files_service,
plugin_wasm_files_service,
component_directory: Path::new(&params.component_directory).to_path_buf(),
component_temp_directory: Arc::new(TempDir::new().unwrap()),
}
}

Expand Down Expand Up @@ -946,6 +951,7 @@ impl CliTestDependencies {
blob_storage,
plugin_wasm_files_service,
initial_component_files_service,
component_temp_directory: Arc::new(TempDir::new().unwrap()),
}
}

Expand Down Expand Up @@ -1052,6 +1058,7 @@ impl CliTestDependencies {
blob_storage,
plugin_wasm_files_service,
initial_component_files_service,
component_temp_directory: Arc::new(TempDir::new().unwrap()),
}
}
TestMode::Docker {
Expand Down Expand Up @@ -1150,6 +1157,10 @@ impl TestDependencies for CliTestDependencies {
self.redis.clone()
}

/// Returns a cloned `Arc` handle to the blob storage held by these test dependencies.
fn blob_storage(&self) -> Arc<dyn BlobStorage + Send + Sync + 'static> {
self.blob_storage.clone()
}

/// Returns a cloned `Arc` handle to the Redis monitor held by these test dependencies.
fn redis_monitor(&self) -> Arc<dyn RedisMonitor + Send + Sync + 'static> {
self.redis_monitor.clone()
}
Expand All @@ -1162,6 +1173,10 @@ impl TestDependencies for CliTestDependencies {
&self.component_directory
}

/// Returns the path of the temporary directory owned by these test dependencies.
///
/// The path is borrowed from the `TempDir` stored in `self.component_temp_directory`.
fn component_temp_directory(&self) -> &Path {
self.component_temp_directory.path()
}

/// Returns a cloned `Arc` handle to the component service held by these test dependencies.
fn component_service(&self) -> Arc<dyn ComponentService> {
self.component_service.clone()
}
Expand All @@ -1180,10 +1195,6 @@ impl TestDependencies for CliTestDependencies {
self.worker_executor_cluster.clone()
}

/// Returns a cloned `Arc` handle to the blob storage held by these test dependencies.
fn blob_storage(&self) -> Arc<dyn BlobStorage + Send + Sync + 'static> {
self.blob_storage.clone()
}

/// Returns a cloned `Arc` handle to the initial component files service.
fn initial_component_files_service(&self) -> Arc<InitialComponentFilesService> {
self.initial_component_files_service.clone()
}
Expand Down
17 changes: 13 additions & 4 deletions golem-test-framework/src/config/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use golem_service_base::storage::blob::BlobStorage;
use std::fmt::{Debug, Formatter};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tempfile::TempDir;
use tracing::{Instrument, Level};

pub struct EnvBasedTestDependenciesConfig {
Expand Down Expand Up @@ -172,6 +173,7 @@ pub struct EnvBasedTestDependencies {
blob_storage: Arc<dyn BlobStorage + Send + Sync + 'static>,
initial_component_files_service: Arc<InitialComponentFilesService>,
plugin_wasm_files_service: Arc<PluginWasmFilesService>,
component_temp_directory: Arc<TempDir>,
}

impl Debug for EnvBasedTestDependencies {
Expand Down Expand Up @@ -491,6 +493,9 @@ impl EnvBasedTestDependencies {
blob_storage,
initial_component_files_service,
plugin_wasm_files_service,
component_temp_directory: Arc::new(
TempDir::new().expect("Failed to create temporary directory"),
),
}
}
}
Expand All @@ -505,6 +510,10 @@ impl TestDependencies for EnvBasedTestDependencies {
self.redis.clone()
}

/// Returns a cloned `Arc` handle to the blob storage held by these test dependencies.
fn blob_storage(&self) -> Arc<dyn BlobStorage + Send + Sync + 'static> {
self.blob_storage.clone()
}

/// Returns a cloned `Arc` handle to the Redis monitor held by these test dependencies.
fn redis_monitor(&self) -> Arc<dyn RedisMonitor + Send + Sync + 'static> {
self.redis_monitor.clone()
}
Expand All @@ -517,6 +526,10 @@ impl TestDependencies for EnvBasedTestDependencies {
&self.config.golem_test_components
}

/// Returns the path of the temporary directory owned by these test dependencies.
///
/// The path is borrowed from the `TempDir` stored in `self.component_temp_directory`.
fn component_temp_directory(&self) -> &Path {
self.component_temp_directory.path()
}

/// Returns a cloned `Arc` handle to the component service held by these test dependencies.
fn component_service(&self) -> Arc<dyn ComponentService> {
self.component_service.clone()
}
Expand All @@ -535,10 +548,6 @@ impl TestDependencies for EnvBasedTestDependencies {
self.worker_executor_cluster.clone()
}

/// Returns a cloned `Arc` handle to the blob storage held by these test dependencies.
fn blob_storage(&self) -> Arc<dyn BlobStorage + Send + Sync + 'static> {
self.blob_storage.clone()
}

/// Returns a cloned `Arc` handle to the initial component files service.
fn initial_component_files_service(&self) -> Arc<InitialComponentFilesService> {
self.initial_component_files_service.clone()
}
Expand Down
1 change: 1 addition & 0 deletions golem-test-framework/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ pub trait TestDependencies {
fn redis_monitor(&self) -> Arc<dyn RedisMonitor + Send + Sync + 'static>;
fn shard_manager(&self) -> Arc<dyn ShardManager + Send + Sync + 'static>;
fn component_directory(&self) -> &Path;
fn component_temp_directory(&self) -> &Path;
fn component_service(&self) -> Arc<dyn ComponentService>;
fn component_compilation_service(
&self,
Expand Down
45 changes: 45 additions & 0 deletions golem-test-framework/src/dsl/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,13 @@ use golem_wasm_rpc::{Value, ValueAndType};
use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};
use std::time::{Duration, Instant};
use tempfile::Builder;
use tokio::select;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::oneshot::Sender;
use tracing::{debug, info, Instrument};
use uuid::Uuid;
use wasm_metadata::AddMetadata;

pub struct StoreComponentBuilder<'a, DSL: TestDsl + ?Sized> {
dsl: &'a DSL,
Expand Down Expand Up @@ -500,6 +502,17 @@ impl<T: TestDependencies + Send + Sync> TestDsl for T {
.map(|(k, v)| (k.to_string(), v.clone())),
);

let source_path = if !unverified {
rename_component_if_needed(
self.component_temp_directory(),
&source_path,
&component_name,
)
.expect("Failed to verify and change component metadata")
} else {
source_path
};

let component = {
if unique {
self.component_service()
Expand Down Expand Up @@ -2526,3 +2539,35 @@ impl<T: TestDsl + Sync> TestDslUnsafe for T {
<T as TestDsl>::cancel_invocation(self, worker_id, idempotency_key).await
}
}

/// Returns the path of a component WASM whose embedded root package name is compatible
/// with `name`, writing a renamed copy into `temp_dir` when it is not.
///
/// * `temp_dir` - directory in which a rewritten copy is created, if one is needed.
/// * `path` - path of the original component WASM file.
/// * `name` - component name the WASM is going to be stored under.
///
/// If the component has no root package name, or it already equals `name`, the original
/// `path` is returned unchanged. Otherwise the metadata is rewritten with `name` (keeping
/// the original root package version, if any) and the path of the new file is returned.
///
/// # Errors
///
/// Fails if the source file cannot be read, the component metadata cannot be analysed,
/// the metadata cannot be rewritten, or the new file cannot be created or written.
fn rename_component_if_needed(temp_dir: &Path, path: &Path, name: &str) -> anyhow::Result<PathBuf> {
    let source = std::fs::read(path)?;
    let metadata = ComponentMetadata::analyse_component(&source)?;

    // Compare by reference; no String needs to be allocated just for the equality check.
    let current_name = metadata.root_package_name.as_deref();
    if current_name.is_none() || current_name == Some(name) {
        info!(
            "Name in metadata is {:?}, used component name is {}, using the original WASM: {:?}",
            current_name, name, path
        );
        return Ok(path.to_path_buf());
    }

    let add_metadata = AddMetadata {
        name: Some(name.to_string()),
        version: metadata
            .root_package_version
            .map(|v| wasm_metadata::Version::new(v.to_string())),
        ..Default::default()
    };

    // Rewrite the binary before touching the file system, so a failing transformation
    // does not leave an empty kept file behind in `temp_dir`.
    let updated_wasm = add_metadata.to_wasm(&source)?;

    // `keep(true)`: the file has to outlive the `NamedTempFile` handle, because only its
    // path is handed back to the caller.
    let new_file = Builder::new().keep(true).tempfile_in(temp_dir)?;
    let new_path = new_file.path().to_path_buf();

    // Log the plain path (as the other branch does) rather than the whole file handle.
    info!(
        "Name in metadata is {:?}, used component name is {}, using an updated WASM: {:?}",
        current_name, name, new_path
    );

    std::fs::write(&new_path, updated_wasm)?;
    Ok(new_path)
}
61 changes: 60 additions & 1 deletion golem-worker-executor-base/src/durable_host/golem/v1x.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ use crate::model::public_oplog::{
};
use crate::model::InterruptKind;
use crate::preview2::golem_api_1_x;
use crate::preview2::golem_api_1_x::host::{GetWorkers, Host, HostGetWorkers, WorkerAnyFilter};
use crate::preview2::golem_api_1_x::host::{
ForkResult, GetWorkers, Host, HostGetWorkers, WorkerAnyFilter,
};
use crate::preview2::golem_api_1_x::oplog::{
Host as OplogHost, HostGetOplog, HostSearchOplog, SearchOplog,
};
Expand All @@ -30,6 +32,10 @@ use crate::services::{HasOplogService, HasPlugins, HasWorker};
use crate::workerctx::{InvocationManagement, StatusManagement, WorkerCtx};
use anyhow::anyhow;
use async_trait::async_trait;
use bincode::de::Decoder;
use bincode::enc::Encoder;
use bincode::error::{DecodeError, EncodeError};
use bincode::Decode;
use golem_common::model::oplog::{DurableFunctionType, OplogEntry};
use golem_common::model::regions::OplogRegion;
use golem_common::model::{ComponentId, ComponentVersion, OwnedWorkerId, ScanCursor, WorkerId};
Expand Down Expand Up @@ -729,6 +735,39 @@ impl<Ctx: WorkerCtx> Host for DurableWorkerCtx<Ctx> {

Ok(result.map(|w| w.into()))
}

/// Forks the current worker into a worker named `new_name` under the same component.
///
/// The call is wrapped in a `Durability` scope for `golem::api` / `fork`. When the scope
/// is live, the fork is requested from the worker fork service with a cut-off taken as
/// `previous()` of the current oplog index, and the outcome is passed to
/// `durability.persist` together with `new_name`. When the scope is not live, the result
/// comes from `durability.replay` and no fork is requested.
///
/// A successful live call always yields `ForkResult::Original`.
async fn fork(&mut self, new_name: String) -> anyhow::Result<ForkResult> {
    let durability = Durability::<ForkResult, SerializableError>::new(
        self,
        "golem::api",
        "fork",
        DurableFunctionType::WriteRemote,
    )
    .await?;

    // Replay path: hand back whatever `replay` produces instead of forking again.
    if !durability.is_live() {
        return durability.replay(self).await;
    }

    // The fork target shares the component of the current worker; only the name differs.
    let fork_target = WorkerId {
        component_id: self.owned_worker_id.component_id(),
        worker_name: new_name.clone(),
    };
    let cut_off = self.state.current_oplog_index().await.previous();

    let fork_service = &self.state.worker_fork;
    let outcome = fork_service
        .fork_and_write_fork_result(&self.owned_worker_id, &fork_target, cut_off)
        .await
        .map(|_| ForkResult::Original);

    let persisted = durability.persist(self, new_name, outcome).await?;
    Ok(persisted)
}
}

#[async_trait]
Expand Down Expand Up @@ -1200,3 +1239,23 @@ impl GetWorkersEntry {
self.next_cursor = cursor;
}
}

/// Encodes a `ForkResult` as a single `u8` tag: `0` for `Original`, `1` for `Forked`.
impl bincode::Encode for ForkResult {
    fn encode<E: Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
        // Pick the tag first, then encode it in one place.
        let tag: u8 = match self {
            ForkResult::Original => 0,
            ForkResult::Forked => 1,
        };
        bincode::Encode::encode(&tag, encoder)
    }
}

/// Decodes a `ForkResult` from a single `u8` tag: `0` is `Original`, `1` is `Forked`.
///
/// Any other tag is rejected with `DecodeError::Other("Invalid ForkResult")`.
impl Decode for ForkResult {
    fn decode<D: Decoder>(decoder: &mut D) -> Result<Self, DecodeError> {
        match <u8 as Decode>::decode(decoder)? {
            0 => Ok(Self::Original),
            1 => Ok(Self::Forked),
            _ => Err(DecodeError::Other("Invalid ForkResult")),
        }
    }
}
11 changes: 11 additions & 0 deletions golem-worker-executor-base/src/durable_host/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use crate::services::rpc::Rpc;
use crate::services::scheduler::SchedulerService;
use crate::services::worker::WorkerService;
use crate::services::worker_event::WorkerEventService;
use crate::services::worker_fork::WorkerForkService;
use crate::services::worker_proxy::WorkerProxy;
use crate::services::{worker_enumeration, HasAll, HasConfig, HasOplog, HasWorker};
use crate::services::{HasOplogService, HasPlugins};
Expand Down Expand Up @@ -165,6 +166,7 @@ impl<Ctx: WorkerCtx> DurableWorkerCtx<Ctx> {
execution_status: Arc<RwLock<ExecutionStatus>>,
file_loader: Arc<FileLoader>,
plugins: Arc<dyn Plugins<Ctx::Types>>,
worker_fork: Arc<dyn WorkerForkService + Send + Sync>,
) -> Result<Self, GolemError> {
let temp_dir = Arc::new(tempfile::Builder::new().prefix("golem").tempdir().map_err(
|e| GolemError::runtime(format!("Failed to create temporary directory: {e}")),
Expand Down Expand Up @@ -236,6 +238,7 @@ impl<Ctx: WorkerCtx> DurableWorkerCtx<Ctx> {
last_oplog_index,
component_metadata,
worker_config.total_linear_memory_size,
worker_fork,
)
.await,
_temp_dir: temp_dir,
Expand Down Expand Up @@ -357,6 +360,10 @@ impl<Ctx: WorkerCtx> DurableWorkerCtx<Ctx> {
self.state.component_service.clone()
}

/// Returns a cloned `Arc` handle to the worker fork service stored in the worker state.
pub fn worker_fork(&self) -> Arc<dyn WorkerForkService + Send + Sync> {
self.state.worker_fork.clone()
}

/// Returns a cloned `Arc` handle to the scheduler service stored in the worker state.
pub fn scheduler_service(&self) -> Arc<dyn SchedulerService + Send + Sync> {
self.state.scheduler_service.clone()
}
Expand Down Expand Up @@ -2071,6 +2078,8 @@ struct PrivateDurableWorkerState<Ctx: WorkerCtx> {
invocation_context: InvocationContext,
current_span_id: SpanId,
forward_trace_context_headers: bool,

worker_fork: Arc<dyn WorkerForkService + Send + Sync>,
}

impl<Ctx: WorkerCtx> PrivateDurableWorkerState<Ctx> {
Expand All @@ -2096,6 +2105,7 @@ impl<Ctx: WorkerCtx> PrivateDurableWorkerState<Ctx> {
last_oplog_index: OplogIndex,
component_metadata: ComponentMetadata<Ctx::Types>,
total_linear_memory_size: u64,
worker_fork: Arc<dyn WorkerForkService + Send + Sync>,
) -> Self {
let replay_state = ReplayState::new(
owned_worker_id.clone(),
Expand Down Expand Up @@ -2139,6 +2149,7 @@ impl<Ctx: WorkerCtx> PrivateDurableWorkerState<Ctx> {
invocation_context,
current_span_id,
forward_trace_context_headers: true,
worker_fork,
}
}

Expand Down
Loading
0