1
1
import os
2
- from urllib .parse import unquote , quote
2
+ from urllib .parse import unquote , quote , urlparse , urlunparse
3
3
4
4
import requests
5
5
from flask import Flask , request , Response , jsonify
6
6
7
7
from aistore .sdk .etl .webserver .base_etl_server import ETLServer
8
+ from aistore .sdk .const import HEADER_NODE_URL
8
9
9
10
10
11
class FlaskServer (ETLServer ):
@@ -31,9 +32,19 @@ def _health(self):
31
32
32
33
def _handle_request (self , path ):
33
34
try :
35
+ transformed = None
34
36
if request .method == "GET" :
35
- return self ._handle_get (path )
36
- return self ._handle_put (path )
37
+ transformed = self ._handle_get (path )
38
+ else :
39
+ transformed = self ._handle_put (path )
40
+
41
+ direct_put_url = request .headers .get (HEADER_NODE_URL )
42
+ if direct_put_url :
43
+ resp = self ._direct_put (direct_put_url , transformed )
44
+ if resp is not None :
45
+ return resp
46
+
47
+ return Response (response = transformed , content_type = self .get_mime_type ())
37
48
except FileNotFoundError :
38
49
self .logger .error ("File not found: %s" , path )
39
50
return (
@@ -64,17 +75,15 @@ def _handle_get(self, path):
64
75
resp .raise_for_status ()
65
76
content = resp .content
66
77
67
- transformed = self .transform (content , path )
68
- return Response (response = transformed , content_type = self .get_mime_type ())
78
+ return self .transform (content , path )
69
79
70
80
def _handle_put(self, path):
    """
    Transform the object supplied with a PUT-style request.

    The input bytes are read from disk via `_get_fqn_content` when the
    server's `arg_type` is "fqn"; otherwise they are taken from the body of
    the current Flask request.

    Args:
        path: Object path from the request URL (the FQN in "fqn" mode).

    Returns:
        The raw result of `self.transform(content, path)`. Wrapping it in an
        HTTP response (or delivering it elsewhere) is left to the caller.
    """
    if self.arg_type == "fqn":
        content = self._get_fqn_content(path)
    else:
        content = request.get_data()

    return self.transform(content, path)
78
87
79
88
def _get_fqn_content (self , path : str ) -> bytes :
80
89
decoded_path = unquote (path )
@@ -83,6 +92,40 @@ def _get_fqn_content(self, path: str) -> bytes:
83
92
with open (safe_path , "rb" ) as f :
84
93
return f .read ()
85
94
95
def _direct_put(self, direct_put_url: str, data: bytes):
    """
    Send the transformed object directly to the AIS node at `direct_put_url`,
    eliminating the additional network hop through the original target.
    Used only in bucket-to-bucket offline transforms.

    Delivery is best-effort: any failure is logged as a warning and reported
    to the caller as `None` instead of being raised.

    Args:
        direct_put_url: URL of the destination node; its network location and
            path are combined with `self.host_target` to build the PUT URL.
        data: Transformed object payload to upload.

    Returns:
        An empty `Response` with status 204 when the node answers HTTP 200,
        otherwise `None`.
    """
    try:
        parsed_target = urlparse(direct_put_url)
        parsed_host = urlparse(self.host_target)
        # Keep the host target's scheme (and query, if any), but address the
        # destination node and append its object path to the host path.
        url = urlunparse(
            parsed_host._replace(
                netloc=parsed_target.netloc,
                path=parsed_host.path + parsed_target.path,
            )
        )

        resp = requests.put(url, data, timeout=None)
        if resp.status_code == 200:
            return Response(status=204)

        # `Response.text` is a property; calling it would raise TypeError and
        # hide the actual HTTP error behind the generic handler below.
        error = resp.text
        self.logger.warning(
            "Failed to deliver object to %s: HTTP %s, %s",
            direct_put_url,
            resp.status_code,
            error,
        )
    except Exception as e:
        # Deliberately broad: a failed direct delivery must never fail the
        # transform request itself.
        self.logger.warning(
            "Exception during delivery to %s: %s", direct_put_url, e
        )

    return None
128
+
86
129
# Example Gunicorn command to run this server:
87
130
# command: ["gunicorn", "your_module:flask_app", "--bind", "0.0.0.0:8000", "--workers", "4"]
88
131
def start (self ):
0 commit comments