Pipelines

import param
import panel as pn

pn.extension('katex')

The Param user guide described how to set up classes that declare parameters and link them to some computation or visualization. In this section we will discover how to connect multiple such panels into a Pipeline to express complex multi-page workflows where the output of one stage feeds into the next stage.

To start using a Pipeline, let us declare an empty one by instantiating the class:

pipeline = pn.pipeline.Pipeline()

Having set up a Pipeline it is now possible to start populating it with one or more stages. We have previously seen how a Parameterized class can have parameters controlling some visualization or computation on a method, with that linkage declared using the param.depends decorator. To use such classes as a pipeline stage, the Parameterized class will also need to designate at least one of the methods as an “output”, and it should also provide a visual representation for that pipeline stage.

To declare the output for a stage, decorate one of its methods with param.output. A Pipeline will use this information to determine what outputs are available to be fed into the next stage of the workflow. In the example below, the Stage1 class has two input parameters (a and b) and two outputs (c and d). The signature of the decorator allows a number of different ways of declaring the outputs:

  • param.output(): Declaring an output without arguments will declare that the method returns an output that will inherit the name of the method and does not make any specific type declarations.

  • param.output(param.Number): Declaring an output with a specific Parameter or Python type also declares an output with the name of the method but declares that the output will be of a specific type.

  • param.output(c=param.Number): Declaring an output using a keyword argument allows overriding the method name as the name of the output and declares the type.

It is also possible to declare multiple outputs, either as keywords or tuples:

  • param.output(c=param.Number, d=param.String) or

  • param.output(('c', param.Number), ('d', param.String))

The example below takes two inputs (a and b) and produces two outputs (c, computed by multiplying the inputs, and d, computed by raising a to the power b). To use the class as a pipeline stage, we also have to implement a panel method, which returns a Panel object providing a visual representation of the stage. Here the panel method relies on an additional view method that returns a LaTeX pane rendering both equations.

In addition to passing along the outputs, the Pipeline will also pass along the values of any input parameters whose names match input parameters on the next stage (unless inherit_params is set to False).

Let’s start by displaying this stage on its own:

class Stage1(param.Parameterized):

    a = param.Number(default=5, bounds=(0, 10))

    b = param.Number(default=5, bounds=(0, 10))

    ready = param.Boolean(default=False, precedence=-1)

    @param.output(('c', param.Number), ('d', param.Number))
    def output(self):
        return self.a * self.b, self.a ** self.b

    @param.depends('a', 'b')
    def view(self):
        c, d = self.output()
        return pn.pane.LaTeX('${a} * {b} = {c}$\n${a}^{{{b}}} = {d}$'.format(
            a=self.a, b=self.b, c=c, d=d), style={'font-size': '2em'})

    def panel(self):
        return pn.Row(self.param, self.view)

stage1 = Stage1()
stage1.panel()

To summarize, we have followed several conventions when setting up this stage of our Pipeline:

  1. Declare a Parameterized class with some input parameters.

  2. Declare one or more methods decorated with the param.output decorator.

  3. Declare a panel method that returns a view of the object that the Pipeline can render.

Now that the object has been instantiated we can also query it for its outputs:

stage1.param.outputs()
{'c': (<param.Number object at 0x7f09dbb71640>, <bound method Stage1.output of Stage1(a=5, b=5, name='Stage104996', ready=False)>, 0), 'd': (<param.Number object at 0x7f09dbb71700>, <bound method Stage1.output of Stage1(a=5, b=5, name='Stage104996', ready=False)>, 1)}

We can see that Stage1 declares outputs named c and d of type param.Number that can be accessed by calling the output method on the object. Now let us add this stage to our Pipeline using the add_stage method:

pipeline.add_stage('Stage 1', stage1)

The add_stage method takes the name of the stage as its first argument, the stage class or instance as its second argument, and any additional keyword arguments if you want to override default behavior.

A Pipeline with only a single stage is not much of a Pipeline, of course! So let’s set up a second stage, consuming the output of the first. Recall that Stage1 declares an output named c. If output c from Stage1 should flow to Stage2, the latter should declare a Parameter named c to consume it. Stage2 does not have to consume all outputs, and here we will ignore output d.

In the second stage we will define parameters c and exp. To make sure that we don’t get a widget for c (as it will be set by the previous stage, not the user), we’ll set its precedence to be negative (which tells Panel to skip creating a widget for it). In other respects this class is very similar to the first one; it declares both a view method that depends on the parameters of the class, and a panel method that returns a view of the object.

class Stage2(param.Parameterized):

    c = param.Number(default=5, bounds=(0, None), precedence=-1)

    exp = param.Number(default=0.1, bounds=(0, 3))

    @param.depends('c', 'exp')
    def view(self):
        return pn.pane.LaTeX('${%s}^{%s}={%.3f}$' % (self.c, self.exp, self.c**self.exp),
                             style={'font-size': '2em'})

    def panel(self):
        return pn.Row(self.param, self.view)

stage2 = Stage2(c=stage1.output()[0])
stage2.panel()

Now that we have declared the second stage of the pipeline, let us add it to the Pipeline object:

pipeline.add_stage('Stage 2', stage2)

And that’s it; we have now declared a two-stage pipeline, where the output c flows from the first stage into the second stage. To begin with we can print the pipeline to see the stages:

print(pipeline)
Pipeline:
    [0] Stage 1: Stage1()
    [1] Stage 2: Stage2(c=25)

To display the pipeline we simply let it render itself:

pipeline = pn.pipeline.Pipeline(debug=True)
pipeline.add_stage('Stage 1', Stage1())
pipeline.add_stage('Stage 2', Stage2)

pipeline

As you can see, the Pipeline renders a little diagram displaying the available stages in the workflow along with previous and next buttons to move between stages. This allows setting up complex workflows with multiple stages, where each component is a self-contained unit, with minimal declarations about stage outputs (using the param.output decorator) and how to render the stage (by declaring a panel method). Note also that when progressing to Stage 2, the c parameter widget is not rendered because its value has been provided by the previous stage.

Above we created the Pipeline as we went along, which makes some sense in a notebook to allow debugging and development of each stage. When deploying the Pipeline as a server app, or when there’s no reason to instantiate each stage separately, we can instead declare the stages as part of the constructor:

pipeline = pn.pipeline.Pipeline([('Stage 1', Stage1), ('Stage 2', Stage2)])
pipeline

Pipeline stages may be either Parameterized instances or Parameterized classes. With classes, the class will not be instantiated until that stage is reached, which lets you postpone allocating memory, reading files, querying databases, and other expensive actions that might be in the constructor until the parameters for that stage have been collected from previous stages. You can also use an already instantiated Parameterized object, e.g. if you want to set some parameters to non-default values before putting the stage into the pipeline, but in that case you will need to ensure that updating the parameters of the instantiated object is sufficient to update the full current state of that stage.

Non-linear pipelines

Pipelines are not limited to simple linear UI workflows like the ones listed above. They support arbitrary branching structures, i.e., any directed acyclic graph. A simple example might be a workflow with two alternative stages that rejoin at a later point. In the very simple example below we declare four stages: Input, Multiply, Add, and Result.

class Input(param.Parameterized):

    value1 = param.Integer(default=2)

    value2 = param.Integer(default=3)

    def panel(self):
        return pn.Row(self.param.value1, self.param.value2)

class Multiply(Input):

    def panel(self):
        return '%.3f * %.3f' % (self.value1, self.value2)

    @param.output('result')
    def output(self):
        return self.value1 * self.value2

class Add(Input):

    def panel(self):
        return '%d + %d' % (self.value1, self.value2)

    @param.output('result')
    def output(self):
        return self.value1 + self.value2

class Result(Input):

    result = param.Number(default=0)

    def panel(self):
        return self.result

dag = pn.pipeline.Pipeline()

dag.add_stage('Input', Input)
dag.add_stage('Multiply', Multiply)
dag.add_stage('Add', Add)
dag.add_stage('Result', Result)

After adding all the stages we have to express the relationship between these stages. To declare the graph we can use the define_graph method and provide an adjacency map, which declares which stage feeds into which other stages. In this case the Input feeds into both Multiply and Add and both those stages feed into the Result:

dag.define_graph({'Input': ('Multiply', 'Add'), 'Multiply': 'Result', 'Add': 'Result'})

This is of course a very simple example but it demonstrates the ability to express arbitrary workflows with branching and converging steps:

dag
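To make the adjacency map passed to define_graph more concrete, the plain-Python sketch below lists every route a user could take through it. The successors and routes helpers are hypothetical and not part of Panel. They only show that a value may be a single stage name or a tuple of names.

```python
graph = {'Input': ('Multiply', 'Add'), 'Multiply': 'Result', 'Add': 'Result'}


def successors(graph, stage):
    """Return the stages that `stage` feeds into, always as a tuple."""
    targets = graph.get(stage, ())
    return (targets,) if isinstance(targets, str) else tuple(targets)


def routes(graph, start):
    """Enumerate every path from `start` to a stage with no successors."""
    following = successors(graph, start)
    if not following:
        return [[start]]
    return [[start] + rest
            for stage in following
            for rest in routes(graph, stage)]


for route in routes(graph, 'Input'):
    print(' -> '.join(route))
# Input -> Multiply -> Result
# Input -> Add -> Result
```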

Custom layout

For a Pipeline object p, p.layout is a Panel layout with the following hierarchically arranged components:

  • layout: The overall layout of the header and stage.

    • header: The navigation components and network diagram.

      • title: The name of the current stage.

      • network: A network diagram representing the pipeline.

      • buttons: All navigation buttons and selectors.

        • prev_button: The button to go to the previous stage.

        • prev_selector: The selector widget to select between previous branching stages.

        • next_button: The button to go to the next stage.

        • next_selector: The selector widget to select between next branching stages.

    • stage: The contents of the current pipeline stage.

You can pick and choose any combination of these components to display in any configuration, e.g. just pn.Column(p.title, p.network, p.stage) if you don’t want to show any buttons for a pipeline p.

For instance, let’s rearrange our dag pipeline to fit into a smaller horizontal space:

pn.Column(
    pn.Row(dag.title, pn.layout.HSpacer(), dag.buttons),
    dag.network,
    dag.stage
)

Programmatic flow control

By default, controlling the flow between different stages is done using the “Previous” and “Next” buttons. However, we often want to control the UI flow programmatically from within a stage. A Pipeline allows programmatic control by declaring a ready_parameter either per stage or globally on the Pipeline, which can block or unblock the buttons depending on the information obtained so far, and which advances automatically when combined with the auto_advance parameter. In this way we can control the workflow programmatically from inside the stages.

In the example below we create a version of the previous workflow that can be used without the buttons by declaring ready parameters for each of the stages, which we can toggle with a custom button or simply set to True by default to automatically skip the stage.

Lastly, we can also control which branching stage to switch to from within a stage. To do so we declare a parameter which will hold the name of the next stage to switch to, in this case selecting between ‘Add’ and ‘Multiply’. Later we will point the pipeline to this parameter using the next_parameter argument.

class AutoInput(Input):

    operator = param.Selector(default='Add', objects=['Multiply', 'Add'])

    ready = param.Boolean(default=False)

    def panel(self):
        button = pn.widgets.Button(name='Go', button_type='success')
        button.on_click(lambda event: setattr(self, 'ready', True))
        widgets = pn.Row(self.param.value1, self.param.operator, self.param.value2)
        for w in widgets:
            w.width = 85
        return pn.Column(widgets, button)

class AutoMultiply(Multiply):

    ready = param.Boolean(default=True)

class AutoAdd(Add):

    ready = param.Boolean(default=True)

Now that we have declared these stages let us set up the pipeline, ensuring that we declare the ready_parameter, next_parameter, and auto_advance settings appropriately:

dag = pn.pipeline.Pipeline() # could instead set ready_parameter='ready' and auto_advance=True globally here

dag.add_stage('Input',    AutoInput,    ready_parameter='ready', auto_advance=True, next_parameter='operator')
dag.add_stage('Multiply', AutoMultiply, ready_parameter='ready', auto_advance=True)
dag.add_stage('Add',      AutoAdd,      ready_parameter='ready', auto_advance=True)
dag.add_stage('Result',   Result)

dag.define_graph({'Input': ('Multiply', 'Add'), 'Multiply': 'Result', 'Add': 'Result'})

Finally we display the pipeline without the buttons, which is appropriate because all the flow control is now handled from within the stages:

pn.Column(
    dag.title,
    dag.network,
    dag.stage
)

As you can see, a Panel Pipeline can be used to set up complex workflows when needed, with each stage controlled either manually or from within the stage, without having to define complex callbacks or other GUI logic.