How to create strongly typed langchain Runnables and pass data through several steps?

The latest LangChain release, with LCEL, lets us create Runnables. The Runnable abstraction can be used for many things, even outside of chains or prompts.

In my scenario, the first steps of the chain pipeline are not LLMs or prompts.

I want every step (Runnable) to receive a strongly typed input named ElementSelectionContext, while each step may output different data. Between steps, I need to update the ElementSelectionContext fields with the output of the previous step and pass the updated context to the next step.

Enough talking, here's my attempt:

class ElementSelectionContext(BaseModel):
    element_name: str = Field(frozen=True)
    objective: str = Field(frozen=True)

    page: Optional[Analysis] = None

class ElementSelectionPipeline(RunnableSerializable[ElementSelectionContext, Optional[Element]]):
    def invoke(self, input: ElementSelectionContext, config: RunnableConfig | None = None) -> Optional[Element]:
        page = Page()
        exact_match = ExactMatch()
        pipeline = (
            page
            | {"page": RunnablePassthrough()} # WHAT do I add here to update the context and pass it through the next step??
            | exact_match
        )
        return pipeline.invoke(input)

class Page(RunnableSerializable[ElementSelectionContext,Analysis]):
    def invoke(self, input: ElementSelectionContext, config: RunnableConfig | None = None) -> Analysis:
        current_dir = os.path.dirname(os.path.abspath(__file__))
        file_path = os.path.join(current_dir, 'sample.json')

        with open(file_path, 'r', encoding="utf-8") as file:
            data = json.load(file)
            response = Analysis(**data)
            return response

class ExactMatch(RunnableSerializable[ElementSelectionContext, Optional[Element]]):
    def invoke(self, input: ElementSelectionContext, config: RunnableConfig | None = None) -> Optional[Element]:
        #print(input)
        # ERROR in the next line since the 'input' is a dictionary, not 'ElementSelectionContext'
        first_value = next(iter(input.page.map.values()))
        return first_value

# USAGE EXAMPLE
pipeline = ElementSelectionPipeline()
response = pipeline.invoke(ElementSelectionContext(element_name="joba", objective="joba"))

The two problems:

  1. In ExactMatch, the input is a dict, not an ElementSelectionContext.
  2. In ElementSelectionPipeline, I do not know how to update the ElementSelectionContext instance and pass it on to ExactMatch.
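
The intended data flow can be sketched in plain Python, without LangChain. This is only an illustration under assumptions: `Context` and `Analysis` are simplified dataclass stand-ins for the models above, and `with_update`, `load_page`, `exact_match` and `run` are hypothetical names that appear nowhere in the original code. The idea is that a wrapper runs a step, copies the immutable context with one field replaced by the step's output, and hands the new context to the next step.

```python
from dataclasses import dataclass, replace
from typing import Callable, Optional


@dataclass(frozen=True)
class Analysis:
    # Simplified stand-in for the Analysis model in the question.
    map: dict[str, str]


@dataclass(frozen=True)
class Context:
    # Simplified stand-in for ElementSelectionContext.
    element_name: str
    objective: str
    page: Optional[Analysis] = None


def with_update(
    step: Callable[[Context], object], field_name: str
) -> Callable[[Context], Context]:
    """Wrap a step so its output is stored on a copy of the context."""

    def wrapped(ctx: Context) -> Context:
        # replace() builds a new instance; the original context is untouched.
        return replace(ctx, **{field_name: step(ctx)})

    return wrapped


def load_page(ctx: Context) -> Analysis:
    # Placeholder data instead of reading sample.json.
    return Analysis(map={"first": "element-a", "second": "element-b"})


def exact_match(ctx: Context) -> Optional[str]:
    # Receives a typed Context, so attribute access works.
    if ctx.page is None or not ctx.page.map:
        return None
    return next(iter(ctx.page.map.values()))


def run(ctx: Context) -> Optional[str]:
    # Step 1 updates the context; step 2 consumes the updated context.
    ctx = with_update(load_page, "page")(ctx)
    return exact_match(ctx)


result = run(Context(element_name="joba", objective="joba"))
```

For context on problem 1: a dict literal inside an LCEL chain is coerced to a RunnableParallel, whose output is a dict, which is why ExactMatch receives a dict. In LCEL, the wrapper role could presumably be played by a RunnableLambda that calls the step and returns something like `input.model_copy(update={"page": ...})`, assuming Pydantic v2 models; that mapping is an assumption and has not been verified against the code above.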

Any insights?
