Log entire stdout and stderr for terminated backend worker process by namannandan · Pull Request #3036 · pytorch/serve · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Log entire stdout and stderr for terminated backend worker process #3036

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Apr 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -306,22 +306,10 @@ private void attachRunner(
argl.add(String.valueOf(1));
}

/**
 * Signals the reader threads held in errReader and outReader (presumably the worker's stderr and
 * stdout readers) to stop, skipping any that are null.
 *
 * <p>Each reader's thread name is logged at WARN before terminate() is called on it.
 *
 * <p>NOTE(review): this text comes from a diff view with the +/- markers stripped; the hunk header
 * (@@ -306,22 +306,10 @@) shows net-removed lines in this region, so this method may be deleted
 * by the change. Confirm against the actual file.
 */
public synchronized void terminateIOStreams() {
if (errReader != null) {
logger.warn("terminateIOStreams() threadName={}", errReader.getName());
errReader.terminate();
}
if (outReader != null) {
logger.warn("terminateIOStreams() threadName={}", outReader.getName());
outReader.terminate();
}
}

/**
 * Forcibly destroys the backend worker process, if one was started, and cleans up the connector.
 *
 * <p>NOTE(review): the terminateIOStreams() call below sits in a diff region whose +/- markers
 * were stripped and may be a removed line. Confirm whether exit() still stops the reader threads
 * after this change.
 */
public synchronized void exit() {
if (process != null) {
process.destroyForcibly();
connector.clean();
terminateIOStreams();
}
}

Expand Down Expand Up @@ -373,19 +361,11 @@ public ReaderThread(String name, InputStream is, boolean error, WorkerLifeCycle
this.metricCache = MetricCache.getInstance();
}

/**
 * Asks this reader thread to stop by clearing its isRunning flag.
 *
 * <p>This does not interrupt the thread or close the stream. It only flips the flag that the
 * run() loop condition reads, so a read that is already blocked is not affected by this call.
 *
 * <p>NOTE(review): diff view with stripped +/- markers; this method may be removed by the change.
 * Confirm against the actual file.
 */
public void terminate() {
isRunning.set(false);
}

@Override
public void run() {
try (Scanner scanner = new Scanner(is, StandardCharsets.UTF_8.name())) {
while (isRunning.get() && scanner.hasNext()) {
while (scanner.hasNextLine()) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume that hasNextLine() will keep returning true for the input stream as long as the process is alive, even when the stream does not yet contain new input?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe hasNextLine() will block until the next line is available and return true once a new line of input is available. It will return false once EOF is encountered or the underlying stream has been closed.

String result = scanner.nextLine();
if (result == null) {
break;
}

Matcher matcher = METRIC_PATTERN.matcher(result);
if (matcher.matches()) {
logger.info("result={}, pattern={}", result, matcher.group(2));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,6 @@ public void shutdown() {
}
}
backendChannel.clear();
lifeCycle.terminateIOStreams();
Thread thread = currentThread.getAndSet(null);
if (thread != null) {
thread.interrupt();
Expand Down
1 change: 1 addition & 0 deletions requirements/developer.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ pytest==7.3.1
pylint==3.0.3
pytest-mock==3.14.0
pytest-cov==4.1.0
pytest-timeout==2.3.1
grpcio==1.62.1
protobuf==4.25.1
grpcio-tools==1.60.0
Expand Down
193 changes: 193 additions & 0 deletions test/pytest/test_handler_traceback_logging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
import shutil
from pathlib import Path
from unittest.mock import patch

import pytest
import test_utils
from model_archiver import ModelArchiverConfig

# Directory holding this test module, and the repository root two levels above
# it (test/pytest/ -> test/ -> repo root).
CURR_FILE_PATH = Path(__file__).parent
REPO_ROOT_DIR = CURR_FILE_PATH.parents[1]

# Minimal torch model written verbatim to model.py by create_mar_file. The
# embedded source must be valid Python, indentation included; without the
# method indentation the generated file fails with IndentationError.
MODEL_PY = """
import torch
import torch.nn as nn

class Foo(nn.Module):
    def __init__(self):
        super().__init__()

    def forward(self, x):
        return x
"""

# Handler written verbatim to handler.py by create_mar_file. Its initialize()
# fails through a chain of nested calls so the logged traceback spans several
# frames. test_handler_traceback_logging asserts on the exact line numbers
# inside this string (12, 18, 21, 24, 27, 30), so adding or removing lines
# here requires updating that test. Indentation is significant: the source
# must parse as Python.
HANDLER_PY = """
from typing import List, Dict, Any
from ts.context import Context


class FailingModel(object):
    def __init__(self) -> None:
        pass

    def initialize(self, context: Context) -> None:
        # Deliberate bug in handler with nested calls to test traceback logging
        self.call1()

    def handle(self, data: List[Dict[str, Any]], context: Context):
        return None

    def call1(self):
        self.call2()

    def call2(self):
        self.call3()

    def call3(self):
        self.call4()

    def call4(self):
        self.call5()

    def call5(self):
        assert False
"""

# model_config.yaml bundled into the archive. maxRetryTimeoutInSec presumably
# bounds how long TorchServe keeps retrying the failing worker; the test
# expects at least four start attempts within it.
MODEL_CONFIG_YAML = "\nmaxRetryTimeoutInSec: 300\n"

# config.properties handed to TorchServe as its snapshot file at startup.
CONFIG_PROPERTIES = "\ndefault_response_timeout=120\n"


@pytest.fixture(scope="module")
def model_name():
    """Provide the base model name string, ``"test_model"``.

    NOTE: ``register_model`` is also registered under the fixture name
    ``"model_name"``; which definition a given consumer receives depends on
    pytest's fixture-override resolution.
    """
    return "test_model"


@pytest.fixture(scope="module")
def work_dir(tmp_path_factory, model_name):
    """Create a module-scoped temporary directory named after the model."""
    temp_dir = tmp_path_factory.mktemp(model_name)
    return Path(temp_dir)


@pytest.fixture(scope="module")
def torchserve(model_store, work_dir):
    """Start TorchServe for this module and clean it up afterwards.

    Writes ``CONFIG_PROPERTIES`` to ``config.properties`` in ``work_dir`` and
    passes that file as the snapshot file. Yields the object returned by
    ``test_utils.start_torchserve``; the test reads log lines from it via
    ``.get()``.
    """
    # Presumably stops any TorchServe instance left over from an earlier test.
    test_utils.torchserve_cleanup()

    config_properties_file = work_dir / "config.properties"
    config_properties_file.write_text(CONFIG_PROPERTIES)

    pipe = test_utils.start_torchserve(
        model_store=model_store,
        no_config_snapshots=True,
        gen_mar=False,
        snapshot_file=config_properties_file.as_posix(),
    )

    yield pipe

    test_utils.torchserve_cleanup()


@pytest.fixture(scope="module", name="mar_file_path")
def create_mar_file(work_dir, model_archiver, model_name):
    """Build the test model archive in ``work_dir`` and yield its path.

    Writes the model, model config and failing handler sources into
    ``work_dir``, runs the model archiver with a patched argument parser, and
    deletes the generated ``.mar`` file at module teardown.
    """
    archive_path = work_dir.joinpath(model_name + ".mar")

    # Write each source file into work_dir (model, config, handler, in that
    # order) and remember its POSIX path for the archiver config.
    written_paths = {}
    for file_name, content in (
        ("model.py", MODEL_PY),
        ("model_config.yaml", MODEL_CONFIG_YAML),
        ("handler.py", HANDLER_PY),
    ):
        destination = work_dir / file_name
        destination.write_text(content)
        written_paths[file_name] = destination.as_posix()

    archiver_config = ModelArchiverConfig(
        model_name=model_name,
        version="1.0",
        serialized_file=None,
        model_file=written_paths["model.py"],
        handler=written_paths["handler.py"],
        extra_files=None,
        export_path=work_dir,
        requirements_file=None,
        runtime="python",
        force=False,
        archive_format="default",
        config_file=written_paths["model_config.yaml"],
    )

    # Substitute our config for the archiver's own argument parsing.
    with patch(
        "archiver.ArgParser.export_model_args_parser", return_value=archiver_config
    ):
        model_archiver.generate_model_archive()

    assert archive_path.exists()

    yield archive_path.as_posix()

    # Teardown: remove the archive, tolerating it already being gone.
    archive_path.unlink(missing_ok=True)


@pytest.fixture(scope="module", name="model_name")
def register_model(mar_file_path, model_store, torchserve):
    """Copy the archive into the model store and register it with TorchServe.

    Registration requests one initial worker, batch size 1 and
    ``synchronous=false``. Yields ``(model_name, torchserve)`` and unregisters
    the model on teardown.
    """
    shutil.copy(mar_file_path, model_store)

    archive_file_name = Path(mar_file_path).name
    registered_name = Path(archive_file_name).stem

    registration_params = (
        ("model_name", registered_name),
        ("url", archive_file_name),
        ("initial_workers", "1"),
        ("synchronous", "false"),
        ("batch_size", "1"),
    )

    # The response is stashed on test_utils rather than checked here.
    test_utils.reg_resp = test_utils.register_model_with_params(registration_params)

    yield registered_name, torchserve

    test_utils.unregister_model(registered_name)


@pytest.mark.timeout(60)
def test_handler_traceback_logging(model_name):
    """
    Full circle test with torchserve: every failed worker start must log the
    handler's complete traceback, not only its final line.
    """
    # The fixture registered as "model_name" yields (model_name, torchserve).
    model_name, pipe = model_name

    # Fragments expected in the worker log. Line numbers refer to HANDLER_PY.
    # Captured log lines also carry timestamps and log levels, so each fragment
    # is matched as a substring of some captured line rather than by equality.
    expected_traceback = [
        "Traceback (most recent call last):",
        "line 12, in initialize",
        "self.call1()",
        "line 18, in call1",
        "self.call2()",
        "line 21, in call2",
        "self.call3()",
        "line 24, in call3",
        "self.call4()",
        "line 27, in call4",
        "self.call5()",
        "line 30, in call5",
        "assert False",
        "AssertionError",
    ]

    # Test traceback logging for first attempt and three retries to start worker
    for attempt in range(4):
        logs = []
        # Collect log lines up to this attempt's final exception line; the
        # pytest timeout above guards against waiting forever.
        while True:
            logs.append(pipe.get())
            if "AssertionError" in logs[-1]:
                break

        # Report every missing fragment at once instead of stopping at the
        # first one, so a failure shows exactly what was lost.
        missing = [
            fragment
            for fragment in expected_traceback
            if not any(fragment in log for log in logs)
        ]
        assert not missing, (
            f"attempt {attempt}: traceback lines missing from worker log: {missing}"
        )
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be:

assert all(line in logs for line in traceback)

`any` stops as soon as the first match is found

Copy link
Collaborator Author
@namannandan namannandan Apr 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

`line in logs` would not work because each log entry includes other details, such as the timestamp and log level, alongside the traceback line, and these cannot be hardcoded in the test. So I believe the check has to be `line in log for log in logs`.

Agree that any will stop after the first line is found, which is why I loop over each of the traceback lines I expect to find in the captured logs:

        for line in traceback:
            assert any(line in log for log in logs)

Loading
0