LangChain's LCEL (LangChain Expression Language) lets us create Runnables. The Runnable abstraction is useful for many things, even outside of chains or prompts.
In my scenario, the first steps of the chain pipeline are not LLMs or prompts.
I want every step (Runnable) to receive a strongly typed input named ElementSelectionContext, while each step may output different data. Between steps, I need to update the ElementSelectionContext fields with the output of the previous step and pass the updated context to the next step.
Enough talking, here's my attempt:
import json
import os
from typing import Optional

from langchain_core.runnables import RunnableConfig, RunnablePassthrough, RunnableSerializable
from pydantic import BaseModel, Field

# Analysis and Element are my own models; their definitions are omitted here.


class ElementSelectionContext(BaseModel):
    element_name: str = Field(frozen=True)
    objective: str = Field(frozen=True)
    page: Optional[Analysis] = None


class ElementSelectionPipeline(RunnableSerializable[ElementSelectionContext, Optional[Element]]):
    def invoke(self, input: ElementSelectionContext, config: RunnableConfig | None = None) -> Optional[Element]:
        page = Page()
        exact_match = ExactMatch()
        pipeline = (
            page
            | {"page": RunnablePassthrough()}  # WHAT do I add here to update the context and pass it through the next step??
            | exact_match
        )
        return pipeline.invoke(input)


class Page(RunnableSerializable[ElementSelectionContext, Analysis]):
    def invoke(self, input: ElementSelectionContext, config: RunnableConfig | None = None) -> Analysis:
        # Load a sample analysis from a JSON file next to this module.
        current_dir = os.path.dirname(os.path.abspath(__file__))
        file_path = os.path.join(current_dir, 'sample.json')
        with open(file_path, 'r', encoding="utf-8") as file:
            data = json.load(file)
        response = Analysis(**data)
        return response


class ExactMatch(RunnableSerializable[ElementSelectionContext, Optional[Element]]):
    def invoke(self, input: ElementSelectionContext, config: RunnableConfig | None = None) -> Optional[Element]:
        # print(input)
        # ERROR in the next line since the 'input' is a dictionary, not 'ElementSelectionContext'
        first_value = next(iter(input.page.map.values()))
        return first_value


# USAGE EXAMPLE
pipeline = ElementSelectionPipeline()
response = pipeline.invoke(ElementSelectionContext(element_name="joba", objective="joba"))
- In ExactMatch, the input is a dict and not an ElementSelectionContext.
- In ElementSelectionPipeline, I do not know how to update the ElementSelectionContext instance and pass it through to ExactMatch.
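To make the intent concrete, here is a rough sketch of the data flow I am after, written in plain Python without LCEL. Everything in it is a hypothetical stand-in for illustration only: Context plays the role of ElementSelectionContext, load_page and exact_match play the roles of Page and ExactMatch, and with_field is a made-up helper, not a LangChain API.

```python
from dataclasses import dataclass, replace
from typing import Any, Callable, Optional


@dataclass(frozen=True)
class Context:
    """Hypothetical stand-in for ElementSelectionContext."""
    element_name: str
    objective: str
    page: Optional[dict] = None


def load_page(ctx: Context) -> dict:
    # Stand-in for the Page step: returns some analysis data.
    return {"map": {"first": f"element for {ctx.element_name}"}}


def exact_match(ctx: Context) -> Optional[str]:
    # Stand-in for the ExactMatch step: reads the field filled in by the previous step.
    return next(iter(ctx.page["map"].values()), None)


def with_field(step: Callable[[Context], Any], field: str) -> Callable[[Context], Context]:
    """Hypothetical helper: run a step and store its output on a copy of the context."""
    def wrapped(ctx: Context) -> Context:
        return replace(ctx, **{field: step(ctx)})
    return wrapped


ctx = Context(element_name="joba", objective="joba")
ctx = with_field(load_page, "page")(ctx)  # the context now carries the page
result = exact_match(ctx)                 # the next step still receives a typed Context
print(result)
```

What I am looking for is the LCEL equivalent of with_field: something I can put between two Runnables so that the next step still receives the typed context rather than a dict.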
Any insights?