This repository was archived by the owner on Nov 28, 2023. It is now read-only.
This repository was archived by the owner on Nov 28, 2023. It is now read-only.
Closed
Description
Pipeline gets stuck unexpectedly.
Error message:
DEBUG:canals.pipeline.pipeline:> Queue at step 6: {'input_switch': []}
DEBUG:canals.pipeline.pipeline:Component 'input_switch' is waiting. All mandatory inputs received, some optional are not skipped: {'input_1'}
Traceback (most recent call last):
File "/home/mp/deepset/dev/haystack/debug/loops_v2.py", line 84, in <module>
result = pipeline.run({
^^^^^^^^^^^^^^
File "/home/mp/deepset/dev/haystack/venv/lib/python3.11/site-packages/canals/pipeline/pipeline.py", line 459, in run
raise PipelineRuntimeError(
canals.errors.PipelineRuntimeError: 'input_switch' is stuck waiting for input, but there are no other components to run. This is likely a Canals bug. Open an issue at https://github.com/deepset-ai/canals.
Process finished with exit code 1
Script to reproduce:
import os
from haystack.preview import Pipeline, Document
from haystack.preview.document_stores import MemoryDocumentStore
from haystack.preview.components.retrievers import MemoryBM25Retriever
from haystack.preview.components.generators.openai.gpt35 import GPT35Generator
from haystack.preview.components.builders.answer_builder import AnswerBuilder
from haystack.preview.components.builders.prompt_builder import PromptBuilder
import random
from haystack.preview import component
from typing import Optional, List
import logging
logging.basicConfig()
logging.getLogger().setLevel(logging.DEBUG)
@component
class OutputParser():
# Dummy parser
@component.output_types(valid=List[str], invalid=Optional[List[str]])
def run(
self,
replies: List[str]):
if random.randint(0,100) > 70:
#raise Exception("Faile to parse input: the field xyz is missing")
print("Faile to parse input: the field xyz is missing")
return {"valid": None, "invalid": replies}
else:
print("Successfully parsed input")
return {"valid": replies, "invalid": None}
@component
class InputSwitch():
@component.output_types(output=str)
def run(self, input_1: Optional[List[str]]=None, input_2: Optional[List[str]]=None):
if input_1 is not None:
return {"output": input_1}
else:
return {"output": input_2}
query = ("Create a json file of the 3 biggest cities in the wolrld with the following fields: name, country, and population. "
"None of the fields must be empty.")
prompt_template = """
{{query}}
{% if replies %}
We already got the following output: {{replies}}
However, this doesn't comply with the format requirements from above. Please correct the output and try again.
{% endif %}
"""
pipeline = Pipeline()
pipeline.add_component(instance=PromptBuilder(template=prompt_template), name="prompt_builder")
pipeline.add_component(instance=GPT35Generator(api_key=os.environ.get("OPENAI_API_KEY")), name="llm")
pipeline.add_component(instance=OutputParser(), name="output_parser")
pipeline.add_component(instance=InputSwitch(), name="input_switch")
pipeline.connect("prompt_builder", "llm")
pipeline.connect("input_switch.output", "prompt_builder.replies")
pipeline.connect("llm", "output_parser")
pipeline.connect("output_parser.invalid", "input_switch.input_2")
## Run the Pipeline
result = pipeline.run({
"prompt_builder": {"query": query},
"input_switch": {"input_1": []},
})
print(result)
Metadata
Metadata
Assignees
Labels
No labels