Async and Concurrency

import panel as pn
import asyncio

pn.extension()

User applications frequently have to do multiple tasks at the same time such as processing user input, making I/O requests or running long running calcultions. At the same time the application should continue to responsive to user input. This problem can be solved in a number of different ways and here we will look primarily at two approaches, asynchronous callbacks and concurrency using multiple threads.

Asynchronous functions

Python has natively supported asynchronous functions since version 3.5, for a quick overview of some of the concepts involved see the Python documentation. For full asyncio support in Panel you will have to use python>=3.8.

One of the major benefits of leveraging async functions is that it is simple to write callbacks which will perform some longer running IO tasks in the background. Below we simulate this by creating a Button which will update some text when it starts and finishes running a long-running background task (here simulated using asyncio.sleep. If you are running this in the notebook you will note that you can start multiple tasks and it will update the text immediately but continue in the background:

button = pn.widgets.Button(name='Click me!')
text = pn.widgets.StaticText()

async def run_async(event):
    text.value = f'Running {event.new}'
    await asyncio.sleep(2)
    text.value = f'Finished {event.new}'

button.on_click(run_async)

pn.Row(button, text)

Note that on_click is simple one way of registering an asynchronous callback, using .param.watch is also supported and so is scheduling asynchronous periodic callbacks with pn.state.add_periodic_callback.

It is important to note that asynchronous callbacks operate without locking the underlying bokeh Document, which means Bokeh models cannot be safely modified by default. Usually this is not an issue because modifying Panel components appropriately schedules updates to underlying Bokeh models, however in cases where we want to modify a Bokeh model directly, e.g. when embedding and updating a Bokeh plot in a Panel application we explicitly have to decorate the asynchronous callback with pn.io.with_lock.

import numpy as np
from bokeh.plotting import figure
from bokeh.models import ColumnDataSource

button = pn.widgets.Button(name='Click me!')

p = figure(width=500, height=300)
cds = ColumnDataSource(data={'x': [0], 'y': [0]})
p.line(x='x', y='y', source=cds)
pane = pn.pane.Bokeh(p)

@pn.io.with_lock
async def stream(event):
    await asyncio.sleep(1)
    x, y = cds.data['x'][-1], cds.data['y'][-1]
    cds.stream({'x': list(range(x+1, x+6)), 'y': y+np.random.randn(5).cumsum()})
    pane.param.trigger('object')
    
# Equivalent to `.on_click` but shown
button.param.watch(stream, 'clicks')

pn.Row(button, pane)

Concurrency

Asynchronous processing can be very helpful for IO bound tasks, however if you have to perform actual computations it won’t help you at all since those tasks will continue to block the running thread. It is also not always easy or possible to peform all IO bound tasks asynchronously. Therefore threading can be a very valuable tool in your toolbox.

Below we will demonstrate an example of a Thread which we start in the background to process items we put in a queue for processing. We simulate the processing with a time.sleep but it could be any long-running computation. The threading.Condition allows us to manipulate the global shared queue.

import time
import threading

c = threading.Condition()

button = pn.widgets.Button(name='Click to launch')
text = pn.widgets.StaticText()

queue = []

def callback():
    global queue
    while True:
        c.acquire()
        for i, q in enumerate(queue):
            text.value = f'Processing item {i+1} of {len(queue)} items in queue.'
            time.sleep(2)
        queue.clear()
        text.value = "Queue empty"
        c.release()
        time.sleep(1)
        
thread = threading.Thread(target=callback)
thread.start()

Now we will create a callback that puts new items for processing on the queue when a button is clicked:

def on_click(event):
    queue.append(event)

button.on_click(on_click)

pn.Row(button, text).servable()

Since the processing happens on a separate thread the application itself can still remain responsive to further user input (such as putting new items on the queue).

This web page was generated from a Jupyter notebook and not all interactivity will work on this website. Right click to download and run locally for full Python-backed interactivity.

Right click to download this notebook from GitHub.