etl: introduce new etl init spec YAML format · NVIDIA/aistore@312af7b · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Commit 312af7b

Browse files
committed
etl: introduce new etl init spec YAML format
- command is always optional
- enforce all images to have a default command
- environment variables are specific to the chosen transformer, and should be validated by the transformer itself during initialization.

Echo Transformer:
```
name: echo
runtime:
  image: aistorage/transformer_echo:latest
  # command is optional, default to `uvicorn` for FastAPI
```

Transformer that requires environment vars:
```
name: hash-with-args
runtime:
  image: aistorage/transformer_hash_with_args:latest
  # command is also optional
  env:
    - name: seed
      value: "1234"
```

Python Runtime:
```
name: user-defined-transform
runtime:
  image: aistorage/runtime_python:3.13
  env:
    - name: SERVER
      value: PICKLED_ETL_SERVER_INSTANCE
    - name: SERVER_TYPE
      value: FastAPI
    - name: DEPENDENCIES
      value: PARSED_REQUIREMENTS.TXT
  # default command: ./bootstrap.sh
```

```
echo ${SERVER} > /server.py
echo ${DEPENDENCIES} > /requirements.txt
pip install --target='/runtime' -r /requirements.txt
export PYTHONPATH=/runtime
case $SERVER_TYPE in
  FastAPIServer)
    exec uvicorn fastapi_server:app --host 0.0.0.0 --port 8000 --workers 4 --no-access-log
    ;;
  FlaskServer)
    exec gunicorn flask_server:app --bind 0.0.0.0:8000 --workers 4
    ;;
  *)
    exec python code.py
    ;;
esac
```

TODOs:
* replace `InitCode` with a special case of this new format.
* unit/integration tests against this new `ETLSpec` format.

Signed-off-by: Tony Chen <a122774007@gmail.com>
1 parent acc8c9f commit 312af7b

File tree

12 files changed

+181
-67
lines changed

12 files changed

+181
-67
lines changed

ais/test/etl_test.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -340,6 +340,7 @@ func TestETLInline(t *testing.T) {
340340

341341
tests = []testObjConfig{
342342
{transformer: tetl.MD5, comm: etl.Hpush},
343+
{transformer: tetl.MD5ETLSpec, comm: etl.Hpush},
343344
}
344345
)
345346

@@ -431,12 +432,15 @@ func TestETLInlineObjWithArgs(t *testing.T) {
431432
transformer = tetl.HashWithArgs
432433

433434
tests = []struct {
434-
name string
435-
commType string
436-
onlyLong bool
435+
name string
436+
commType string
437+
transformer string
438+
onlyLong bool
437439
}{
438-
{name: "etl-args-hpush", commType: etl.Hpush},
439-
{name: "etl-args-hpull", commType: etl.Hpull},
440+
{name: "etl-args-hpush", commType: etl.Hpush, transformer: tetl.HashWithArgs},
441+
{name: "etl-args-hpush", commType: etl.Hpush, transformer: tetl.HashWithArgsETLSpec},
442+
{name: "etl-args-hpull", commType: etl.Hpull, transformer: tetl.HashWithArgs},
443+
{name: "etl-args-hpull", commType: etl.Hpull, transformer: tetl.HashWithArgsETLSpec},
440444
}
441445
)
442446

@@ -538,9 +542,10 @@ func TestETLAnyToAnyBucket(t *testing.T) {
538542
bcktests = []testBucketConfig{{false, false, false}}
539543

540544
tests = []testObjConfig{
541-
{transformer: tetl.Echo, comm: etl.WebSocket, onlyLong: true}, // TODO -- FIXME: re-enable this test after echo transformer updated
545+
{transformer: tetl.Echo, comm: etl.WebSocket, onlyLong: true},
542546
{transformer: tetl.Echo, comm: etl.Hpull},
543547
{transformer: tetl.Echo, comm: etl.Hpush},
548+
{transformer: tetl.EchoETLSpec, comm: etl.Hpush},
544549
}
545550
)
546551

ais/tgtetl.go

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -188,20 +188,13 @@ func (t *target) inlineETL(w http.ResponseWriter, r *http.Request, dpq *dpq, lom
188188
// NOTE:
189189
// - poll for a while here for a possible abort error (e.g., pod runtime error)
190190
// - and notice hardcoded timeout
191-
if abortErr := xetl.AbortedAfter(etl.DefaultObjTimeout); abortErr != nil {
191+
if abortErr := xetl.AbortedAfter(etl.DefaultAbortTimeout); abortErr != nil {
192192
t.writeErr(w, r, abortErr, ecode)
193193
return
194194
}
195195

196-
ectx := &cmn.ETLErrCtx{
197-
ETLName: dpq.etl.name,
198-
ETLTransformArgs: dpq.etl.targs,
199-
PodName: comm.PodName(),
200-
SvcName: comm.SvcName(),
201-
}
202-
errV := cmn.NewErrETL(ectx, err.Error(), ecode)
203-
xetl.AddErr(errV)
204-
t.writeErr(w, r, errV, ecode)
196+
xetl.AddErr(err)
197+
t.writeErr(w, r, err, ecode)
205198
}
206199

207200
func (t *target) logsETL(w http.ResponseWriter, r *http.Request, etlName string) {

cmd/cli/cli/const.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1102,9 +1102,8 @@ var (
11021102
// ETL
11031103
etlExtFlag = cli.StringFlag{Name: "ext", Usage: "Mapping from old to new extensions of transformed objects' names"}
11041104
etlNameFlag = cli.StringFlag{
1105-
Name: "name",
1106-
Usage: "unique ETL name (leaving this field empty will have unique ID auto-generated)",
1107-
Required: true,
1105+
Name: "name",
1106+
Usage: "unique ETL name (leaving this field empty will have unique ID auto-generated)",
11081107
}
11091108
etlObjectRequestTimeout = DurationFlag{
11101109
Name: "object-timeout",

cmn/k8s/util.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ const (
2020
func ValidateEtlName(name string) error {
2121
const prefix = "ETL name %q "
2222
l := len(name)
23+
if l == 0 {
24+
return fmt.Errorf(prefix+"is empty", name)
25+
}
2326
if l < shortNameETL {
2427
return fmt.Errorf(prefix+"is too short", name)
2528
}

ext/etl/api.go

Lines changed: 103 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"strings"
1313
"time"
1414

15+
"github.com/NVIDIA/aistore/api/apc"
1516
"github.com/NVIDIA/aistore/cmn"
1617
"github.com/NVIDIA/aistore/cmn/cos"
1718
"github.com/NVIDIA/aistore/cmn/feat"
@@ -20,6 +21,8 @@ import (
2021

2122
jsoniter "github.com/json-iterator/go"
2223
corev1 "k8s.io/api/core/v1"
24+
"k8s.io/apimachinery/pkg/util/intstr"
25+
"k8s.io/apimachinery/pkg/util/yaml"
2326
"k8s.io/client-go/kubernetes/scheme"
2427
)
2528

@@ -44,8 +47,10 @@ const (
4447
)
4548

4649
const (
47-
DefaultInitTimeout = 45 * time.Second
48-
DefaultObjTimeout = 10 * time.Second
50+
DefaultInitTimeout = 45 * time.Second
51+
DefaultObjTimeout = 10 * time.Second
52+
DefaultAbortTimeout = 2 * time.Second
53+
DefaultContainerPort = 8000
4954
)
5055

5156
// enum ETL lifecycle status (see docs/etl.md#etl-pod-lifecycle for details)
@@ -94,18 +99,30 @@ type (
9499

95100
// and implementations
96101
InitMsgBase struct {
97-
EtlName string `json:"id"`
98-
CommTypeX string `json:"communication"` // enum commTypes
99-
ArgTypeX string `json:"argument"` // enum argTypes
100-
InitTimeout cos.Duration `json:"init_timeout,omitempty"`
101-
ObjTimeout cos.Duration `json:"obj_timeout,omitempty"`
102-
SupportDirectPut bool `json:"support_direct_put,omitempty"`
102+
EtlName string `json:"name" yaml:"name"`
103+
CommTypeX string `json:"communication" yaml:"communication"`
104+
ArgTypeX string `json:"argument" yaml:"argument"`
105+
InitTimeout cos.Duration `json:"init_timeout,omitempty" yaml:"init_timeout,omitempty"`
106+
ObjTimeout cos.Duration `json:"obj_timeout,omitempty" yaml:"obj_timeout,omitempty"`
107+
SupportDirectPut bool `json:"support_direct_put,omitempty" yaml:"support_direct_put,omitempty"`
103108
}
104109
InitSpecMsg struct {
105110
Spec []byte `json:"spec"`
106111
InitMsgBase
107112
}
108113

114+
// ETLSpec is a YAML representation of the ETL pod spec.
115+
ETLSpec struct {
116+
InitMsgBase // included all optional fields from InitMsgBase
117+
Runtime RuntimeSpec `json:"runtime" yaml:"runtime"`
118+
}
119+
120+
RuntimeSpec struct {
121+
Image string `json:"image" yaml:"image"`
122+
Command []string `json:"command,omitempty" yaml:"command,omitempty"`
123+
Env []corev1.EnvVar `json:"env,omitempty" yaml:"env,omitempty"`
124+
}
125+
109126
// ========================================================================================
110127
// InitCodeMsg carries the name of the transforming function;
111128
// the `Transform` function is mandatory and cannot be "" (empty) - it _will_ be called
@@ -280,6 +297,11 @@ func (m *InitMsgBase) validate(detail string) error {
280297
"and that your ETL server properly implements the direct put mechanism")
281298
return cmn.NewErrUnsuppErr(err)
282299
}
300+
301+
if !strings.HasSuffix(m.CommTypeX, CommTypeSeparator) {
302+
m.CommTypeX += CommTypeSeparator
303+
}
304+
283305
// NOTE: default timeout
284306
if m.InitTimeout == 0 {
285307
m.InitTimeout = cos.Duration(DefaultInitTimeout)
@@ -319,9 +341,9 @@ func (m *InitSpecMsg) Validate() error {
319341
errCtx := &cmn.ETLErrCtx{ETLName: m.Name()}
320342

321343
// Check pod specification constraints.
322-
pod, err := ParsePodSpec(errCtx, m.Spec)
344+
pod, err := m.ParsePodSpec()
323345
if err != nil {
324-
return err
346+
return cmn.NewErrETLf(errCtx, "failed to parse pod spec: %v\n%q", err, string(m.Spec))
325347
}
326348
if len(pod.Spec.Containers) != 1 {
327349
return cmn.NewErrETLf(errCtx, "unsupported number of containers (%d), expected: 1", len(pod.Spec.Containers))
@@ -353,37 +375,93 @@ func (m *InitSpecMsg) Validate() error {
353375
return cmn.NewErrETLf(errCtx, "readinessProbe port must be the %q port", k8s.Default)
354376
}
355377

356-
if m.CommTypeX == "" {
357-
comm, found := pod.ObjectMeta.Annotations[CommTypeAnnotation]
358-
if !found {
359-
return cmn.NewErrETLf(errCtx, "annotations.communication_type must be provided, or specified in the init message")
360-
}
361-
m.CommTypeX = comm
362-
}
363-
364378
if dp, found := pod.ObjectMeta.Annotations[SupportDirectPutAnnotation]; found {
365379
m.SupportDirectPut, err = cos.ParseBool(dp)
366380
if err != nil {
367381
return err
368382
}
369383
}
370384

371-
if !strings.HasSuffix(m.CommTypeX, CommTypeSeparator) {
372-
m.CommTypeX += CommTypeSeparator
385+
return m.InitMsgBase.validate(m.String())
386+
}
387+
388+
func (s *ETLSpec) Validate() error {
389+
errCtx := &cmn.ETLErrCtx{ETLName: s.Name()}
390+
if s.Runtime.Image == "" {
391+
return cmn.NewErrETLf(errCtx, "runtime.image must be specified")
373392
}
393+
return nil
394+
}
374395

375-
return m.InitMsgBase.validate(m.String())
396+
// ParsePodSpec parses `m.Spec` into a Kubernetes Pod object.
397+
// First, it tries to parse as an ETLSpec. If that fails, it falls back to a full Pod spec.
398+
func (m *InitSpecMsg) ParsePodSpec() (*corev1.Pod, error) {
399+
if pod, err := m.parseAsETLSpec(); err == nil {
400+
return pod, nil
401+
}
402+
return m.parseAsFullPodSpec()
403+
}
404+
405+
func (m *InitSpecMsg) parseAsETLSpec() (*corev1.Pod, error) {
406+
var etlSpec ETLSpec
407+
if err := yaml.Unmarshal(m.Spec, &etlSpec); err != nil {
408+
return nil, err
409+
}
410+
if err := etlSpec.Validate(); err != nil {
411+
return nil, err
412+
}
413+
pod := &corev1.Pod{
414+
Spec: corev1.PodSpec{
415+
Containers: []corev1.Container{{
416+
Name: etlSpec.EtlName,
417+
Image: etlSpec.Runtime.Image,
418+
ImagePullPolicy: corev1.PullAlways,
419+
Ports: []corev1.ContainerPort{{Name: k8s.Default, ContainerPort: DefaultContainerPort}},
420+
ReadinessProbe: &corev1.Probe{
421+
ProbeHandler: corev1.ProbeHandler{
422+
HTTPGet: &corev1.HTTPGetAction{
423+
Path: "/" + apc.ETLHealth,
424+
Port: intstr.FromString(k8s.Default),
425+
},
426+
},
427+
},
428+
Command: etlSpec.Runtime.Command,
429+
Env: etlSpec.Runtime.Env,
430+
}},
431+
},
432+
}
433+
434+
// Override optional InitMsg fields if present in etlSpec
435+
if etlSpec.EtlName != "" {
436+
m.EtlName = etlSpec.EtlName
437+
}
438+
if commType := etlSpec.CommType(); commType != "" {
439+
m.CommTypeX = commType
440+
}
441+
if argType := etlSpec.ArgType(); argType != "" {
442+
m.ArgTypeX = argType
443+
}
444+
if etlSpec.IsDirectPut() {
445+
m.SupportDirectPut = true
446+
}
447+
if etlSpec.InitTimeout != 0 {
448+
m.InitTimeout = etlSpec.InitTimeout
449+
}
450+
if etlSpec.ObjTimeout != 0 {
451+
m.ObjTimeout = etlSpec.ObjTimeout
452+
}
453+
return pod, nil
376454
}
377455

378-
func ParsePodSpec(errCtx *cmn.ETLErrCtx, spec []byte) (*corev1.Pod, error) {
379-
obj, _, err := scheme.Codecs.UniversalDeserializer().Decode(spec, nil, nil)
456+
func (m *InitSpecMsg) parseAsFullPodSpec() (*corev1.Pod, error) {
457+
obj, _, err := scheme.Codecs.UniversalDeserializer().Decode(m.Spec, nil, nil)
380458
if err != nil {
381-
return nil, cmn.NewErrETLf(errCtx, "failed to parse pod spec: %v\n%q", err, string(spec))
459+
return nil, err
382460
}
383461
pod, ok := obj.(*corev1.Pod)
384462
if !ok {
385463
kind := obj.GetObjectKind().GroupVersionKind().Kind
386-
return nil, cmn.NewErrETL(errCtx, "expected pod spec, got: "+kind)
464+
return nil, errors.New("expected pod spec, got: " + kind)
387465
}
388466
return pod, nil
389467
}

ext/etl/boot.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ type etlBootstrapper struct {
4646
}
4747

4848
func (b *etlBootstrapper) createPodSpec() (err error) {
49-
if b.pod, err = ParsePodSpec(b.errCtx, b.msg.Spec); err != nil {
50-
return
49+
if b.pod, err = b.msg.ParsePodSpec(); err != nil {
50+
return cmn.NewErrETLf(b.errCtx, "failed to parse pod spec: %v\n%q", err, string(b.msg.Spec))
5151
}
5252
b.originalPodName = b.pod.GetName()
5353
b.errCtx.ETLName = b.originalPodName
@@ -60,6 +60,7 @@ func (b *etlBootstrapper) _prepSpec() (err error) {
6060
b.pod.SetName(k8s.CleanName(b.msg.Name() + "-" + core.T.SID()))
6161
b.errCtx.PodName = b.pod.GetName()
6262
b.pod.APIVersion = "v1"
63+
b.pod.Kind = "Pod"
6364

6465
// The following combination of Affinity and Anti-Affinity provides for:
6566
// 1. The ETL container is always scheduled on the target invoking it.

ext/etl/communicator.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,6 @@ func doWithTimeout(reqArgs *cmn.HreqArgs, getBody getBodyFunc, timeout time.Dura
200200
ecode, err = cmn.NetworkCallWithRetry(&cmn.RetryArgs{
201201
Call: rtyr.call,
202202
SoftErr: 10,
203-
HardErr: 2,
204203
Verbosity: cmn.RetryLogVerbose,
205204
Sleep: max(cmn.Rom.MaxKeepalive(), time.Second*5),
206205
})

ext/etl/transform.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,9 @@ func start(msg *InitSpecMsg, xid, secret string, opts StartOpts, config *cmn.Con
277277
pw = newPodWatcher(podName, boot)
278278
comm = newCommunicator(newAborter(msg.Name()), boot, pw)
279279

280+
if comm == nil {
281+
return podName, svcName, nil, err
282+
}
280283
if err := mgr.add(msg.Name(), comm); err != nil {
281284
return podName, svcName, nil, err
282285
}

python/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ We structure this changelog in accordance with [Keep a Changelog](https://keepac
1313
- Improve `ObjectFileReader` logging to include the full exception details and traceback when retrying and resuming.
1414
- Update `ObjectFileReader` resume logic to accommodate a limitation with `Streaming-Cold-GET` and read range, so as not to cause a timeout.
1515
- If the remote object is not cached, resuming via stream is not possible and reading must restart from the beginning.
16+
- Replace `id` fields with `name` in `InitSpecETLArgs` and `InitCodeETLArgs` types to align with updated ETL init API spec.
1617

1718
## [1.13.8] - 2025-05-15
1819

python/aistore/sdk/types.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -316,7 +316,7 @@ class ETLDetails(BaseModel):
316316
Represents the API response of queries on single ETL details
317317
"""
318318

319-
id: str
319+
name: str
320320
communication: str
321321
init_timeout: Optional[str]
322322
obj_timeout: Optional[str]
@@ -361,7 +361,7 @@ class InitSpecETLArgs(InitETLArgs):
361361

362362
def as_dict(self):
363363
return {
364-
"id": self.etl_name,
364+
"name": self.etl_name,
365365
"init_timeout": self.init_timeout,
366366
"obj_timeout": self.obj_timeout,
367367
"communication": f"{self.communication_type}://",
@@ -383,7 +383,7 @@ class InitCodeETLArgs(InitETLArgs):
383383

384384
def as_dict(self):
385385
dict_rep = {
386-
"id": self.etl_name,
386+
"name": self.etl_name,
387387
"runtime": self.runtime,
388388
"communication": f"{self.communication_type}://",
389389
"init_timeout": self.init_timeout,

python/tests/unit/sdk/test_etl.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ def init_spec_exec_assert(self, expected_action, **kwargs):
6767
expected_action["spec"] = base64.b64encode(
6868
template.encode(UTF_ENCODING)
6969
).decode(UTF_ENCODING)
70-
expected_action["id"] = self.etl_name
70+
expected_action["name"] = self.etl_name
7171

7272
expected_response_text = self.etl_name
7373
mock_response = Mock()
@@ -181,7 +181,7 @@ def encode_fn(preimported_modules, func, comm_type):
181181
return base64.b64encode(template).decode(UTF_ENCODING)
182182

183183
def init_code_exec_assert(self, expected_action, **kwargs):
184-
expected_action["id"] = self.etl_name
184+
expected_action["name"] = self.etl_name
185185

186186
expected_response_text = "response text"
187187
mock_response = Mock()

0 commit comments

Comments
 (0)
0