Haystack docs home page

Custom Nodes

In Haystack, you can create your own nodes and use them stand-alone or in Pipelines.

Creating a Custom Node

Here's how you do it:

  1. Create a new class that inherits from BaseComponent.
  2. If your node's output will be routed to a fixed number of nodes, set outgoing_edges as a class attribute. Most nodes have one outgoing edge. Decision nodes have more than one outgoing edge. If your node has a variable number of outgoing edges, define CustomNode._calculate_outgoing_edges() to return that number. See FileClassifier._calculate_outgoing_edges() for an example.
  3. Define a run() method that is executed when the Pipeline calls your node. The input arguments should consist of all configuration parameters that you want to allow and the data arguments that you expect as input from a previous node. For example, data parameters can be documents, query, file_paths, and so on.
  4. Set run() to return a tuple. The first element of this tuple is an output dictionary of the data you want to pass to the next node. The second element in the tuple is the name of the outgoing edge (usually output_1).
  5. Define a run_batch() method that makes it possible for query pipelines to accept more than one query as input. You can define the same input arguments for it as you did for the run() method.
  6. Set run_batch() to return a tuple. The first element of the tuple is a dictionary with keys such as Answers or Documents (depending on the node). The second element of the tuple is the name of the outgoing edge (usually output_)
  7. Optional: Add any custom debug information to output["_debug"]. You can then access this information in the pipeline output if you enable the debug mode.

Template:

Here's a template for creating a custom node:

from haystack.nodes.base import BaseComponent
class NodeTemplate(BaseComponent):
# If it's not a decision node, there is only one outgoing edge
outgoing_edges = 1
def run(self, query: str, my_arg: Optional[int] = 10):
# Insert code here to manipulate the input and produce an output dictionary
...
output={
"documents": ...,
"_debug": {"anything": "you want"}
}
return output, "output_1"
def run_batch(self, queries: List[str], my_arg: Optional[int] = 10):
# Insert code here to manipulate the input and produce an output dictionary
...
output={
"documents": ...,
}
return output, "output_1"

A regular node usually has one outgoing edge and one return value. If you're creating a decision node, it typically has more than one outgoing edge. The run() and run_batch() methods of a decision node consist of a decision function that determines the path in the graph for sending the node's input. Such a function has more than one possible return value and each return value is named accordingly, for example: output_1 and output_2.

Example

Let's say that we wanted to add a custom translation module to our pipeline. Instead of just translating into one predefined language, our node should be able to return a summary in any language we want (that is, any language for which we have a trained model). To that end, we define a CustomTranslator class. As there's no decision function involved, we set outgoing_edges = 1:

class CustomTranslator(BaseComponent):
outgoing_edges = 1
def __init__(some_param):
# Store all init params in node's config so that we can easily save and load it via YAML
self.translator = TransformersTranslator(model_name_or_path=f'Helsinki-NLP/opus-mt-en-{language}')

Within a pipeline node, the run() function is where all the action happens. Our run function receives a language argument that tells the translator which translation model to initialize:

def run(self, language='fr', **kwargs):
return self.translator.run(kwargs['documents'])

We initialize this node directly when adding it to the pipeline. As usual, we specify a name and the input for this node:

pipeline.add_node(component=CustomTranslator(), name='CustomTranslator', inputs=['Summarizer'])

We can now call the pipeline with any Helsinki-NLP translation model from Hugging Face with English as a source language. Pipeline arguments are simply propagated through the pipeline. This means that if we want to pass a language value to our custom node, we can specify it in our call to the pipeline. Let's look at the French summary of a popular wizard sport:

query = "What's the history of Quidditch?"
result = pipeline.run(query=query, params={"retriever": {"top_k": 30}, "ranker": {"top_k": 20}, "language": "fr"})
result['documents'][0].text
>>> "''Quidditch'' a obtenu son nom du marais queerditch, l'emplacement du premier jeu enregistré. le jeu a été basé sur un jeu joué par une sorcière au 11ème siècle. un snitch d'or a été introduit à la suite d'un jeu 1269 joué en kent. on pense qu'une version balai du jeu peut avoir inspiré le mouvement du jeu moderne 'harlem shuffle'"

Now, how about Ukrainian?

result = pipeline.run(query=query, params={"retriever": {"top_k": 30}, "ranker": {"top_k": 30}, "language": "uk"})
result['documents'][0].text
>>> '" Quuiditch " отримала свою назву від дивного болота, місця першої в історії записаної гри. Гру було засновано на грі, яку грала відьма у XI столітті. Золотий стукач було введено у гру 1269 гри в кенті. Вважається, що версія мітла у грі, можливо, надихнула сучасну гру на " заплутування " move " гри'