plumbum Tutorial
This notebook walks through the core ideas behind plumbum pipelines. It covers synchronous operators, iterable helpers, their async variants, and jq-style query operators.
Getting Started
Operators are regular Python callables decorated with @pb. The > operator threads data into an operator: the value on its left becomes the first argument of the callable on its right.
from pdum.plumbum import pb
from pdum.plumbum.iterops import select, where, tee, batched
from pdum.plumbum.aiterops import aselect, awhere, alist
from pdum.plumbum.jq import aexplode, agroup_by
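The > threading can be explained with ordinary Python comparison rules. This assumes plumbum hooks the reflected comparison, which is an assumption about its internals. For `value > op`, Python first calls `type(value).__gt__`. Built-in types return NotImplemented for an unfamiliar right operand, so Python falls back to `op.__lt__(value)`. Below is a minimal sketch using a hypothetical `Threader` class:

```python
class Threader:
    """Hypothetical sketch of `>` threading; not plumbum's implementation."""

    def __init__(self, func, *args, **kwargs):
        self.func = func
        self.args = args
        self.kwargs = kwargs

    def __lt__(self, value):
        # Reached via `value > self` after type(value).__gt__ returns NotImplemented.
        # The threaded value becomes the first argument; bound arguments follow it.
        return self.func(value, *self.args, **self.kwargs)


def add(value, amount):
    return value + amount


result = 5 > Threader(add, 3)
print(result)  # 8
```

The same fallback applies to strings, lists, and most other built-in types. That is why the left-hand value needs no special wrapping.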
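Building Pipelines
Chain operators with | to build a pipeline. The pipeline applies its steps from left to right, so 5 first becomes 8 (add(3)) and then 16 (multiply(2)).
@pb
def add(value: int, amount: int) -> int:
    return value + amount
@pb
def multiply(value: int, factor: int) -> int:
    return value * factor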
pipeline = add(3) | multiply(2)
5 > pipeline
16
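In Python, | binds more tightly than comparison operators. So `5 > add(3) | multiply(2)` parses as `5 > (add(3) | multiply(2))`: the pipeline is built first, and only then is the value threaded through it. The sketch below shows one way to get this behavior with a hypothetical `Op` class whose `__or__` composes left to right. It illustrates the idea and is not plumbum's code.

```python
class Op:
    """Hypothetical composable operator: `a | b` runs a, then b."""

    def __init__(self, func):
        self.func = func

    def __or__(self, other):
        # Compose left to right: feed this op's result into `other`.
        return Op(lambda value: other.func(self.func(value)))

    def __lt__(self, value):
        # `value > op` falls back to this reflected comparison.
        return self.func(value)


def add(amount):
    return Op(lambda value: value + amount)


def multiply(factor):
    return Op(lambda value: value * factor)


pipeline = add(3) | multiply(2)
print(5 > pipeline)              # 16
print(5 > add(3) | multiply(2))  # 16, because | is applied before >
print(5 > multiply(2) | add(3))  # 13, order matters
```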
Wrapping Existing Callables
Use pb() directly when you need a one-off callable inside a pipeline. It gives an existing callable the same operator behavior as a decorated function, without forcing you to decorate everything in advance. Wrapping is only needed at the start of a pipeline; after that, plain callables chained with | are converted automatically.
5 > pb(print) # emits 5
"hello" > pb(str.upper) | (lambda s: s + "!") | print
5
HELLO!
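A wrapper is needed at the start because a bare function does not take part in the reflected comparison. That makes `5 > print` an ordinary comparison, and it raises TypeError. Once the left side of | is an operator, its `__or__` can accept any callable on the right. The hypothetical `Wrapped` class below sketches that auto-conversion. It is not plumbum's pb().

```python
def bare_comparison_error():
    """Return the TypeError message from threading into an unwrapped function."""
    try:
        5 > print  # a plain function has no reflected __lt__ hook for this
    except TypeError as exc:
        return str(exc)
    return None


class Wrapped:
    """Hypothetical wrapper, not plumbum's pb()."""

    def __init__(self, func):
        self.func = func

    def __or__(self, other):
        # Auto-conversion: accept another Wrapped or any plain callable.
        next_func = other.func if isinstance(other, Wrapped) else other
        return Wrapped(lambda value: next_func(self.func(value)))

    def __lt__(self, value):
        return self.func(value)


print(bare_comparison_error())
shout = Wrapped(str.upper) | (lambda s: s + "!")
print("hello" > shout)  # HELLO!
```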
Keyword Arguments and Incremental Binding
Operators accept keyword arguments just like the underlying function, and you can partially apply additional arguments in later calls. Positional arguments given when building an operator fill the parameters after the threaded first one, so greet("Greetings") binds greeting.
@pb
def greet(name: str, greeting: str = "Hello", punctuation: str = "!") -> str:
    return f"{greeting}, {name}{punctuation}"
"Alice" > greet(greeting="Hi") | print
op = greet("Greetings")
"Diana" > op(punctuation="...") | print
Hi, Alice!
Greetings, Diana...
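The binding rule used above can be modeled in a few lines:

- positionals supplied while building an operator fill the parameters after the threaded value;
- each later call returns a new operator with more positionals appended and keywords merged.

The hypothetical `Bound` class sketches that rule. Whether plumbum lets a later keyword override an earlier one is an assumption made here.

```python
class Bound:
    """Hypothetical incremental-binding sketch, not plumbum's implementation."""

    def __init__(self, func, args=(), kwargs=None):
        self.func = func
        self.args = tuple(args)
        self.kwargs = dict(kwargs or {})

    def __call__(self, *args, **kwargs):
        # Each call returns a new Bound; earlier bindings are never mutated.
        return Bound(self.func, self.args + args, {**self.kwargs, **kwargs})

    def __lt__(self, value):
        # Thread the value into the first parameter, then the bound arguments.
        return self.func(value, *self.args, **self.kwargs)


def greet(name, greeting="Hello", punctuation="!"):
    return f"{greeting}, {name}{punctuation}"


op = Bound(greet)("Greetings")
print("Diana" > op(punctuation="..."))        # Greetings, Diana...
print("Alice" > Bound(greet)(greeting="Hi"))  # Hi, Alice!
print("Eve" > op)                              # Greetings, Eve! (op is unchanged)
```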
Debugging Pipelines
Write side-effecting operators when you need to peek at intermediate values without tearing the pipeline apart. The log operator below prints its input and returns it unchanged.
@pb
def log(value):
    print(value)
    return value
10 > add(5) | log | multiply(2) | log
15
30
30
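The log pattern generalizes to a hypothetical `tap` helper that takes the side effect as a parameter, so the same step can print, append to a list, or call a logger. It is shown as a plain function so the sketch runs on its own. Because extra positional arguments bind after the threaded value (as with greet("Greetings")), decorating it with @pb should presumably allow `tap(print)` inside a pipeline; that is untested here.

```python
from typing import Callable, TypeVar

T = TypeVar("T")


def tap(value: T, side_effect: Callable[[T], object]) -> T:
    """Hypothetical helper: run side_effect on value, then return value unchanged."""
    side_effect(value)
    return value


# Plain-call equivalent of: 10 > add(5) | tap(seen.append) | multiply(2) | tap(seen.append)
seen = []
result = tap(tap(10 + 5, seen.append) * 2, seen.append)
print(result, seen)  # 30 [15, 30]
```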
Working with Arbitrary Types
Because plumbum only manages call signatures, any Python object can flow through a pipeline.
class Point:
    def __init__(self, x: int, y: int) -> None:
        self.x = x
        self.y = y

    def __repr__(self) -> str:
        return f"Point({self.x}, {self.y})"
@pb
def translate(point: Point, dx: int, dy: int) -> Point:
    return Point(point.x + dx, point.y + dy)
Point(1, 2) > translate(5, 3)  # Point(6, 5); only a cell's last expression is displayed
{"a": 1} > pb(lambda mapping: {**mapping, "b": 2})
{'a': 1, 'b': 2}
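There is one caveat, assuming > threading is built on Python's reflected comparison (an assumption about plumbum's internals). Python calls the left operand's `__gt__` first. It falls back to the right operand's `__lt__` only when that call returns NotImplemented. An object whose `__gt__` accepts anything would therefore never reach the operator. The hypothetical classes below show the rule in plain Python.

```python
class Sink:
    """Hypothetical stand-in for an operator: receives the left operand via reflection."""

    def __lt__(self, value):
        return ("received", type(value).__name__)


class Polite:
    """Defines no ordering, so Python falls back to Sink.__lt__."""


class Greedy:
    """Claims every '>' comparison, so Sink.__lt__ never runs."""

    def __gt__(self, other):
        return True


print(3 > Sink())         # ('received', 'int')
print(Polite() > Sink())  # ('received', 'Polite')
print(Greedy() > Sink())  # True
```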
Iterable Helpers
pdum.plumbum.iterops exposes ready-made operators for working with iterables. They compose like normal operators.
range(5) > select(lambda value: value * 2) | where(lambda value: value % 3 != 0) | list
[2, 4, 8]
range(5) > batched(2) | list
[(0, 1), (2, 3), (4,)]
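These helpers behave like lazy generator functions. The sketch below gives hypothetical plain-Python equivalents that take the iterable as their first parameter, matching how plumbum threads data into the first argument. They are not plumbum's source.

```python
from itertools import islice
from typing import Callable, Iterable, Iterator, Tuple, TypeVar

T = TypeVar("T")
U = TypeVar("U")


def select(items: Iterable[T], func: Callable[[T], U]) -> Iterator[U]:
    # Lazily apply func to each item, like the built-in map.
    for item in items:
        yield func(item)


def where(items: Iterable[T], predicate: Callable[[T], bool]) -> Iterator[T]:
    # Lazily keep items for which predicate is truthy, like filter.
    for item in items:
        if predicate(item):
            yield item


def batched(items: Iterable[T], size: int) -> Iterator[Tuple[T, ...]]:
    # Emit tuples of up to `size` items; the last batch may be shorter.
    iterator = iter(items)
    while batch := tuple(islice(iterator, size)):
        yield batch


print(list(where(select(range(5), lambda v: v * 2), lambda v: v % 3 != 0)))  # [2, 4, 8]
print(list(batched(range(5), 2)))  # [(0, 1), (2, 3), (4,)]
```

On Python 3.12 and later, itertools.batched provides the same batching in the standard library.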
Inspecting Pipelines
Use tee to observe intermediate values without breaking the flow.
range(5) > select(lambda value: value + 1) | tee | where(lambda value: value % 2 == 0) | list
1
2
3
4
5
[2, 4]
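A tee-style step can be written as a generator that reports each item and yields it on unchanged. Because generators are lazy, the reporting happens only as downstream steps pull items, interleaved with their work. The hypothetical `tee` below adds an `emit` callback (defaulting to print) so its behavior is easy to observe. That parameter exists only in this sketch.

```python
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")


def tee(items: Iterable[T], emit: Callable[[T], object] = print) -> Iterator[T]:
    """Hypothetical tee: pass each item to emit, then yield it unchanged."""
    for item in items:
        emit(item)
        yield item


# Mirror the tutorial's pipeline, recording items instead of printing them.
seen = []
result = [v for v in tee((n + 1 for n in range(5)), emit=seen.append) if v % 2 == 0]
print(seen, result)  # [1, 2, 3, 4, 5] [2, 4]

# Laziness: emit runs only as the consumer pulls each item.
events = []
for value in tee(range(1, 4), emit=lambda v: events.append(f"tee {v}")):
    events.append(f"consumer {value}")
print(events)
```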
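Reusable Pipelines
A composed pipeline is an ordinary object, so you can store it under a name and apply it to several inputs.
@pb
def filter_positive(numbers):
    for number in numbers:
        if number > 0:
            yield number
@pb
def square_all(numbers):
    for number in numbers:
        yield number**2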
process = filter_positive | square_all | sum
[-2, 3, -1, 4, 5] > process, [-10, 2, -5, 6] > process
(50, 40)
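Reuse works because a composed pipeline is a recipe, not a running computation. Each application calls the first step afresh and gets a new generator. A generator object, by contrast, can be consumed only once. The plain-Python sketch below uses a hypothetical `compose` helper to show both points.

```python
from functools import reduce
from typing import Callable, Iterable, Iterator


def filter_positive(numbers: Iterable[int]) -> Iterator[int]:
    return (n for n in numbers if n > 0)


def square_all(numbers: Iterable[int]) -> Iterator[int]:
    return (n ** 2 for n in numbers)


def compose(*steps: Callable) -> Callable:
    """Hypothetical left-to-right composition helper, not part of plumbum."""
    return lambda value: reduce(lambda acc, step: step(acc), steps, value)


process = compose(filter_positive, square_all, sum)
print(process([-2, 3, -1, 4, 5]), process([-10, 2, -5, 6]))  # 50 40

squares = square_all(filter_positive([-2, 3, -1, 4, 5]))
print(sum(squares), sum(squares))  # 50 0 (the generator is exhausted after one pass)
```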
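String Processing
Plain functions can be wrapped with pb() at the start of a pipeline. Calling the wrapped function, as in pb(replace)(" ", "_"), binds its remaining arguments just like a decorated operator.
def strip(text: str) -> str:
    return text.strip()
def uppercase(text: str) -> str:
    return text.upper()
def replace(text: str, old: str, new: str) -> str:
    return text.replace(old, new)
clean_text = " hello world " > pb(strip) | pb(replace)(" ", "_") | uppercase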
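Binding arguments after the first one is something functools.partial cannot do positionally, because partial fixes the leading parameters. In plain Python the same step needs keyword binding. The sketch below performs the equivalent transformation without plumbum and shows the positional pitfall.

```python
from functools import partial


def strip(text: str) -> str:
    return text.strip()


def uppercase(text: str) -> str:
    return text.upper()


def replace(text: str, old: str, new: str) -> str:
    return text.replace(old, new)


# Pitfall: positional partial binds `text` and `old`, not `old` and `new`.
wrong = partial(replace, " ", "_")
print(repr(wrong("hello world")))  # ' '

# Keyword binding leaves the first parameter free for the piped value.
underscore = partial(replace, old=" ", new="_")
clean_text = uppercase(underscore(strip(" hello world ")))
print(clean_text)  # HELLO_WORLD
```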
Async Pipelines
Async operators use the @apb decorator. You can await the threaded result directly from a notebook cell, and synchronous operators such as add can appear in the same pipeline.
from pdum.plumbum import apb
@apb
async def async_double(value: int) -> int:
    return value * 2
await (5 > async_double | add(3))
13
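Top-level await works in Jupyter and IPython but not in a plain Python script, where the usual entry point is asyncio.run. One way to evaluate a mixed chain is to await any step whose result is awaitable. The hypothetical `run_chain` helper below shows that idea in plain asyncio. It is an assumption-based sketch, not plumbum's implementation.

```python
import asyncio
import inspect
from typing import Any, Callable


async def run_chain(value: Any, *steps: Callable[[Any], Any]) -> Any:
    """Hypothetical helper: apply steps left to right, awaiting awaitable results."""
    for step in steps:
        value = step(value)
        if inspect.isawaitable(value):
            value = await value
    return value


async def async_double(value: int) -> int:
    return value * 2


def add(value: int, amount: int) -> int:
    return value + amount


result = asyncio.run(run_chain(5, async_double, lambda v: add(v, 3)))
print(result)  # 13
```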
Async Iterable Helpers
The pdum.plumbum.aiterops module mirrors the iterable helpers with async variants prefixed with a (aselect, awhere, alist, and so on).
Like the synchronous API, the async helpers accept both synchronous and asynchronous callables. Below, aiter feeds a regular list into the async helpers.
from pdum.plumbum.aiterops import aiter
def even(value: int) -> bool:
    return value % 2 == 0
await ([1, 2, 3, 4] > aiter | aselect(lambda value: value + 1) | awhere(even) | alist)
[2, 4]
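A sketch of how async helpers can accept both plain and async callables: call the function, then await the result only if it is awaitable. The names `to_async`, `select_async`, `where_async`, and `collect_async` are hypothetical stand-ins and do not reproduce plumbum's code.

```python
import asyncio
import inspect
from typing import Any, AsyncIterator, Callable, Iterable, List


async def to_async(items: Iterable[Any]) -> AsyncIterator[Any]:
    """Hypothetical stand-in for plumbum's aiter: re-yield a sync iterable."""
    for item in items:
        yield item


async def _apply(func: Callable[[Any], Any], item: Any) -> Any:
    # Accept plain and async callables alike by awaiting awaitable results.
    result = func(item)
    if inspect.isawaitable(result):
        result = await result
    return result


async def select_async(items: AsyncIterator[Any], func: Callable[[Any], Any]) -> AsyncIterator[Any]:
    async for item in items:
        yield await _apply(func, item)


async def where_async(items: AsyncIterator[Any], predicate: Callable[[Any], Any]) -> AsyncIterator[Any]:
    async for item in items:
        if await _apply(predicate, item):
            yield item


async def collect_async(items: AsyncIterator[Any]) -> List[Any]:
    return [item async for item in items]


async def is_even(value: int) -> bool:
    await asyncio.sleep(0)  # placeholder for real async work
    return value % 2 == 0


result = asyncio.run(
    collect_async(where_async(select_async(to_async([1, 2, 3, 4]), lambda v: v + 1), is_even))
)
print(result)  # [2, 4]
```

Python 3.10 added a built-in aiter() that accepts only async iterables, so importing plumbum's aiter shadows the built-in within the notebook.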
jq-like Operators
The pdum.plumbum.jq module adds jq-inspired path querying and transformation helpers on top of regular pipelines.
from pdum.plumbum import pb
from pdum.plumbum.jq import field, transform, group_by
from pdum.plumbum.iterops import select, where, tee
records = [
{"user": {"id": 1, "name": "Ada"}, "scores": [10, 15]},
{"user": {"id": 2, "name": "Linus"}, "scores": [20]},
]
curved = records > select(transform("scores[]", lambda score: score * 1.1)) | tee | list
names = records > select(field("user.name")) | tee | list
high_scorers = records > where(lambda row: max(row["scores"]) >= 15) | group_by("user.id") | tee | list
{'user': {'id': 1, 'name': 'Ada'}, 'scores': [11.0, 16.5]}
{'user': {'id': 2, 'name': 'Linus'}, 'scores': [22.0]}
'Ada'
'Linus'
(1, [{'user': {'id': 1, 'name': 'Ada'}, 'scores': [10, 15]}])
(2, [{'user': {'id': 2, 'name': 'Linus'}, 'scores': [20]}])
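The dotted-path lookup and grouping can be sketched in plain Python. `get_path` and `group_pairs` below are hypothetical helpers, not plumbum.jq. The sketch does not cover the "scores[]" iteration syntax. It also keeps groups in first-seen key order, which is an assumption: the tutorial does not say whether plumbum's group_by preserves or sorts key order.

```python
from typing import Any, Dict, Hashable, Iterable, List, Tuple


def get_path(record: Any, path: str) -> Any:
    """Hypothetical dotted-path lookup: get_path(r, "user.name") == r["user"]["name"]."""
    value = record
    for key in path.split("."):
        value = value[key]
    return value


def group_pairs(records: Iterable[Any], path: str) -> List[Tuple[Hashable, List[Any]]]:
    """Hypothetical grouping by the value at path, keeping first-seen key order."""
    groups: Dict[Hashable, List[Any]] = {}
    for record in records:
        groups.setdefault(get_path(record, path), []).append(record)
    return list(groups.items())


records = [
    {"user": {"id": 1, "name": "Ada"}, "scores": [10, 15]},
    {"user": {"id": 2, "name": "Linus"}, "scores": [20]},
]
print([get_path(r, "user.name") for r in records])          # ['Ada', 'Linus']
print([key for key, _ in group_pairs(records, "user.id")])  # [1, 2]
```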
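The async helpers aexplode and agroup_by work on async iterables. Below, each record's users list is exploded into individual user dicts, which are then grouped by id.
from pdum.plumbum.aiterops import aiter, alist
from pdum.plumbum.jq import aexplode, agroup_by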
records = [
{"users": [{"id": 1}, {"id": 2}]},
{"users": [{"id": 2}, {"id": 3}]},
]
await (records > aiter | aexplode("users") | agroup_by("id") | alist)
[(1, [{'id': 1}]), (2, [{'id': 2}, {'id': 2}]), (3, [{'id': 3}])]
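A synchronous sketch of the same explode-then-group result with the standard library. `explode` is a hypothetical helper built on itertools.chain, not plumbum's aexplode. itertools.groupby only merges adjacent items, so the exploded items are sorted by key first.

```python
from itertools import chain, groupby
from operator import itemgetter


def explode(records, key):
    """Hypothetical helper: yield every element of each record's list under key."""
    return chain.from_iterable(record[key] for record in records)


records = [
    {"users": [{"id": 1}, {"id": 2}]},
    {"users": [{"id": 2}, {"id": 3}]},
]

# Sort first: groupby starts a new group whenever the key changes.
users = sorted(explode(records, "users"), key=itemgetter("id"))
grouped = [(key, list(group)) for key, group in groupby(users, key=itemgetter("id"))]
print(grouped)  # [(1, [{'id': 1}]), (2, [{'id': 2}, {'id': 2}]), (3, [{'id': 3}])]

# Without sorting, non-adjacent equal keys form separate groups.
print([key for key, _ in groupby([1, 2, 1])])  # [1, 2, 1]
```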