Anabranch is a TypeScript library for async stream processing where errors don't kill the pipeline. Instead of a single thrown exception stopping everything, errors flow alongside successes as tagged results: { type: "success", value } or { type: "error", error }. The pipeline runs to completion and you decide what to do with the errors at the end.
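That tagged shape is just a discriminated union. A minimal sketch of the idea (illustrative only, not the library's exact type definitions):

```typescript
// Sketch of the tagged result shape described above; not the library's exact types.
type Result<T, E> =
  | { type: "success"; value: T }
  | { type: "error"; error: E };

const ok = <T>(value: T): Result<T, never> => ({ type: "success", value });
const err = <E>(error: E): Result<never, E> => ({ type: "error", error });

// Narrowing on `type` recovers the payload on either side.
function describe(r: Result<number, Error>): string {
  return r.type === "success" ? `got ${r.value}` : `failed: ${r.error.message}`;
}
```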

The name comes from hydrology. An anabranch is a branch of a river that diverges from the main channel and may rejoin it later. That's how I envisioned the package working: errors branch off from the success path without blocking it, travel alongside the successes, and rejoin wherever you choose to handle them.

The Problem

When processing a collection of items asynchronously, a single failure kills the loop. The typical fix is a per-item try/catch, which quickly turns the logic inside out:

const results = [];
const failures = [];

for (const id of ids) {
  try {
    const user = await fetchUser(id);
    results.push(transform(user));
  } catch (err) {
    failures.push({ id, err });
  }
}

Now imagine two more transformations, each of which can fail:

for (const id of ids) {
  try {
    const user = await fetchUser(id);
    let enriched;
    try {
      enriched = await enrich(user);
    } catch (err) {
      failures.push({ id, err });
      continue;
    }
    try {
      results.push(await transform(enriched));
    } catch (err) {
      failures.push({ id, err });
    }
  } catch (err) {
    failures.push({ id, err });
  }
}

What I wanted instead was a composable pipeline where errors are just another value, one that doesn't derail the whole thing:

import { Source } from "@anabranch/anabranch";

const { successes, errors } = await Source.from(async function* () {
  for (const id of ids) yield id;
})
  .map((id) => fetchUser(id))
  .map((user) => transform(user))
  .partition();

console.log(`${successes.length} processed, ${errors.length} failed`);

Both pipelines do the same thing (collect successes and accumulate failures), but the second one stays flat and composable no matter how many steps you add.

Source and Stream

Source is the entry point. It wraps an async generator: each yield becomes a success result, and any thrown error becomes a single error result. From a Source you get a Stream, which is where the pipeline API lives.
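The yield-to-success, throw-to-error conversion can be sketched as a small adapter over an async generator. This is a toy version to show the mechanism, not the library's implementation:

```typescript
// Sketch: turn an async generator into a stream of tagged results.
// Assumes the { type: "success" | "error" } shape from the intro.
type Result<T, E> =
  | { type: "success"; value: T }
  | { type: "error"; error: E };

async function* fromGenerator<T, E>(
  gen: () => AsyncGenerator<T>,
): AsyncGenerator<Result<T, E>> {
  try {
    for await (const value of gen()) {
      yield { type: "success", value }; // each yield becomes a success
    }
  } catch (error) {
    yield { type: "error", error: error as E }; // a throw becomes one error result
  }
}
```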

A Stream is an AsyncIterable<Result<T, E>> with about 25 chainable methods: map, flatMap, filter, fold, scan, take, chunks, and more. Terminal operations like collect(), toArray(), successes(), and partition() consume the stream and return a promise. Everything is lazy: nothing runs until you call a terminal operation.
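A terminal operation like partition is conceptually a loop that drains the stream and sorts results into two buckets. A self-contained sketch of that idea (not the library's code):

```typescript
// Sketch of a partition-style terminal operation over an async stream of
// tagged results: consume everything, split by the `type` tag.
type Result<T, E> =
  | { type: "success"; value: T }
  | { type: "error"; error: E };

async function partition<T, E>(
  stream: AsyncIterable<Result<T, E>>,
): Promise<{ successes: T[]; errors: E[] }> {
  const successes: T[] = [];
  const errors: E[] = [];
  for await (const r of stream) {
    if (r.type === "success") successes.push(r.value);
    else errors.push(r.error);
  }
  return { successes, errors };
}
```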

Dual-Channel Operations

Every success-side operation has an error-side counterpart. map has mapErr. filter has filterErr. tap has tapErr. On top of those, there are recovery operations: recover converts any error into a success value, and recoverWhen lets you target a specific error type while leaving others untouched. The type system tracks this: after a recover(), the error type collapses to never:

import { Source } from "@anabranch/anabranch";
import { NetworkError, ParseError } from "./errors.ts";

const results = await Source.from<string, NetworkError | ParseError>(fetchItems)
  .map((item) => parse(item))
  .mapErr((err) => ({ ...err, context: "item pipeline" }))
  .recoverWhen(
    (err): err is ParseError => err instanceof ParseError,
    (_err) => null,           // replace parse errors with null
  )
  .filter((v) => v !== null)  // drop nulls, TypeScript knows v is not null
  .collect();                  // error channel is now NetworkError only -- ParseError is gone

The throwOn method is the escape hatch going the other direction: it re-throws on a specific error type, which is useful when you want to bail out of a pipeline at a known fatal condition.
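The semantics of a throwOn-style operation are easy to sketch: successes and non-matching errors pass through, while an error matching the guard is re-thrown. Illustrative only, not the library's implementation:

```typescript
// Sketch of a throwOn-style pass-through: errors matching the guard are
// re-thrown (bailing out of the pipeline); everything else flows on.
type Result<T, E> =
  | { type: "success"; value: T }
  | { type: "error"; error: E };

async function* throwOn<T, E, F extends E>(
  stream: AsyncIterable<Result<T, E>>,
  isFatal: (e: E) => e is F,
): AsyncGenerator<Result<T, Exclude<E, F>>> {
  for await (const r of stream) {
    if (r.type === "error" && isFatal(r.error)) throw r.error; // known fatal condition
    yield r as Result<T, Exclude<E, F>>; // the fatal type is gone from the error channel
  }
}
```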

Task

Task<T, E> is for single async operations: an HTTP call, a DB query, reading a file. It's lazy (nothing runs until you call .run() or .result()) and carries the same success/error vocabulary as Stream. The same map, flatMap, mapErr, recover, and recoverWhen methods are available. What Task adds is retry, timeout, and withSignal:

import { Task } from "@anabranch/anabranch";
import { NetworkError } from "./errors.ts";

const user = await Task.of((signal) => fetch("/api/user", { signal }).then((r) => r.json()))
  .retry({
    attempts: 3,
    delay: (attempt, _err) => 200 * 2 ** attempt, // 200ms, 400ms, 800ms
    when: (err) => err instanceof NetworkError,
  })
  .timeout(5_000)
  .run();

Tasks also compose for concurrency using Task.allSettled and Task.race, letting you manage multiple independent operations while keeping their individual error-handling logic intact.
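The allSettled side of that composition can be approximated with plain promises, under the assumption that each task boils down to a lazy promise-returning thunk. This is a hypothetical helper sketching the idea, not the library's API:

```typescript
// Sketch of allSettled-style composition: every operation runs to completion,
// and each failure is captured as a tagged result rather than thrown.
type Result<T, E> =
  | { type: "success"; value: T }
  | { type: "error"; error: E };

async function allSettledTasks<T, E>(
  tasks: Array<() => Promise<T>>,
): Promise<Array<Result<T, E>>> {
  return Promise.all(
    tasks.map((run) =>
      run().then(
        (value): Result<T, E> => ({ type: "success", value }),
        (error): Result<T, E> => ({ type: "error", error: error as E }),
      ),
    ),
  );
}
```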

One design choice worth being upfront about: error types in this library are unchecked. The source comment on Task.of says it directly: "the error type E represents the expected error shape rather than a runtime guarantee." The same applies to Source. You're declaring what you expect, but TypeScript can't verify it at runtime. The library deliberately stays flexible here rather than adding the machinery that would make errors fully checked. Packages like @anabranch/db and @anabranch/fs build on this by wrapping raw driver errors into structured classes, earning their types at the boundary, but the core keeps it lightweight.

Task.acquireRelease is a first-class bracket pattern: it acquires a resource, uses it, then releases it whether the use phase succeeded or failed. This is how the database package manages connection lifetimes, and you can use it directly for any resource that needs deterministic cleanup.
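The bracket pattern itself is small enough to sketch in a few lines: release runs in a finally block, so it fires whether the use phase resolves or rejects. A minimal standalone version, not the library's implementation:

```typescript
// Sketch of the bracket pattern behind an acquire/use/release helper.
async function bracket<R, T>(
  acquire: () => Promise<R>,
  use: (resource: R) => Promise<T>,
  release: (resource: R) => Promise<void>,
): Promise<T> {
  const resource = await acquire();
  try {
    return await use(resource);
  } finally {
    await release(resource); // runs on success and on failure alike
  }
}
```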

Channel

Sometimes you don't control when values arrive: a WebSocket pushes messages, an event emitter fires callbacks, a queue delivers items. That's what Channel is for: a push-based source that you write to from the outside and consume as a Stream from the inside.

import { Channel } from "@anabranch/anabranch";

const channel = Channel.create<string, Error>({ bufferSize: 256 });

ws.on("message", (data) => channel.send(data));
ws.on("error", (err) => channel.fail(err));
ws.on("close", () => channel.close());

for await (const result of channel) {
  if (result.type === "success") process(result.value);
}

If the consumer falls behind, Channel drops values once the buffer is full, with an optional onDrop callback for monitoring. Because Channel extends Stream, you can chain map, filter, or any other stream operation directly on it before consuming.
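The drop-when-full behavior boils down to a bounded buffer with a hook. A toy sketch of that policy (the real Channel is more involved, with async consumption and close/fail states):

```typescript
// Sketch of drop-when-full buffering with an onDrop hook, as described above.
class DropBuffer<T> {
  private items: T[] = [];
  constructor(
    private readonly capacity: number,
    private readonly onDrop?: (dropped: T) => void,
  ) {}

  push(item: T): boolean {
    if (this.items.length >= this.capacity) {
      this.onDrop?.(item); // consumer fell behind: drop and report
      return false;
    }
    this.items.push(item);
    return true;
  }

  shift(): T | undefined {
    return this.items.shift();
  }
}
```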

Concurrency

By default, map and flatMap execute sequentially. Calling .withConcurrency(n) on the source opts into parallel execution: up to n items are in flight simultaneously and results are collected as they complete, potentially out of order. .withBufferSize(n) adds backpressure so the source pauses if more than n results are queued downstream.

const processed = await Source.from(async function* () {
  for (const id of userIds) yield id;
})
  .withConcurrency(10)   // up to 10 fetches in flight at once
  .withBufferSize(50)    // pause if more than 50 results are queued
  .map((id) => fetchUser(id))
  .filter((user) => user.active)
  .collect();

The sequential path is just an async generator loop with no extra overhead. The concurrent path only activates when concurrency is a finite number greater than one, so you don't pay for it when you don't need it.
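The concurrent path is essentially a bounded worker pool. A self-contained sketch of the mechanism, not the library's code:

```typescript
// Sketch of a bounded worker pool: up to `limit` items in flight at once,
// results collected as they complete (possibly out of input order).
async function mapConcurrent<T, U>(
  items: T[],
  limit: number,
  fn: (item: T) => Promise<U>,
): Promise<U[]> {
  const results: U[] = [];
  let next = 0;
  async function worker(): Promise<void> {
    while (next < items.length) {
      const i = next++; // claim the next index (single-threaded, so no race)
      results.push(await fn(items[i])); // completion order, not input order
    }
  }
  await Promise.all(
    Array.from({ length: Math.min(limit, items.length) }, worker),
  );
  return results;
}
```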

Ecosystem: Broken Link Checker

All the ecosystem packages are built on the same primitives and follow the same conventions. The broken link checker is a good example of how they compose. It takes a list of seed URLs, crawls each one to extract links, and checks them concurrently, returning results as a Stream so you can apply any stream operation to the output:

import { BrokenLinkChecker } from "@anabranch/broken-link-checker";

const { successes: broken, errors } = await BrokenLinkChecker.create()
  .withConcurrency(20)
  .withTimeout(10_000)
  .check(["https://example.com", "https://example.com/sitemap.xml"])
  .tap((r) => console.log(`[${r.statusCode}] ${r.url}`))
  .tapErr((err) => console.error("Crawler error:", err.message))
  .filter((r) => !r.ok)  // keep only broken links
  .partition();           // successes = broken, errors = crawler failures

Internally, it uses a Channel as a work queue. Seed URLs go in, the crawler processes them, extracts new links, and feeds those back into the channel. The consumer side is the stream the caller gets back from .check(). HTTP requests go through @anabranch/web-client, which itself returns Task instances with built-in retry and Retry-After-aware exponential backoff.
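The feed-back idea is worth making concrete. Here is a deliberately sequential sketch with hypothetical names: processing an item may enqueue new items, and the loop ends when the queue drains. The real checker does this through a Channel with concurrent workers:

```typescript
// Sketch of a feed-back work queue: discovered links re-enter the queue.
async function crawl(
  seeds: string[],
  extractLinks: (url: string) => Promise<string[]>,
): Promise<string[]> {
  const queue = [...seeds];
  const seen = new Set(seeds);
  const visited: string[] = [];
  while (queue.length > 0) {
    const url = queue.shift()!;
    visited.push(url);
    for (const link of await extractLinks(url)) {
      if (!seen.has(link)) {
        seen.add(link);
        queue.push(link); // new work feeds back into the queue
      }
    }
  }
  return visited;
}
```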

The builder pattern is worth highlighting. Every package uses a private constructor, a static create() factory, and immutable with*() chaining (each call returns a new instance). It's a bit more ceremony than a plain options bag, but the immutability pays off when you're building variations of a checker by branching off a shared base config.
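The convention looks roughly like this. A hypothetical class, trimmed down to show the shape rather than any real package's API:

```typescript
// Sketch of the private-constructor / static create() / immutable with*()
// convention: each with*() call returns a new instance.
class CheckerConfig {
  private constructor(
    readonly concurrency: number,
    readonly timeoutMs: number,
  ) {}

  static create(): CheckerConfig {
    return new CheckerConfig(1, 30_000); // defaults
  }

  withConcurrency(n: number): CheckerConfig {
    return new CheckerConfig(n, this.timeoutMs); // new instance, old untouched
  }

  withTimeout(ms: number): CheckerConfig {
    return new CheckerConfig(this.concurrency, ms);
  }
}

// Branching off a shared base leaves the base unchanged.
const base = CheckerConfig.create().withTimeout(10_000);
const fast = base.withConcurrency(20);
```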

Ecosystem: Message Queue

The @anabranch/queue package provides a message queue abstraction with Task/Stream semantics. It handles delayed delivery, dead letter queues, and visibility timeouts, with separate adapters for Redis (@anabranch/queue-redis) and RabbitMQ (@anabranch/queue-rabbitmq).

import { Queue } from "@anabranch/queue";
import { RedisConnector } from "@anabranch/queue-redis";

const connector = new RedisConnector({ redisUrl: REDIS_URL });
const queue = await Queue.connect(connector).run();

// Stream messages with concurrent processing
const { successes, errors } = await queue
  .stream("notifications", { concurrency: 5 })
  .map((msg) => sendNotification(msg.data))
  .partition();

Ecosystem: Object Storage

The @anabranch/storage package provides a uniform interface over object storage backends. It supports AWS S3 (@anabranch/storage-s3), Google Cloud Storage (@anabranch/storage-gcs), and even IndexedDB (@anabranch/storage-browser) for local browser storage.

import { Storage } from "@anabranch/storage";
import { S3Connector } from "@anabranch/storage-s3";

const storage = await Storage.connect(new S3Connector({ bucket: "uploads" })).run();

// Upload a file as a Task
await storage.put("hello.txt", "Hello world").run();

// List objects concurrently using Stream
const results = await storage.list("files/")
  .withConcurrency(10)
  .map((entry) => processFile(entry))
  .collect();

Ecosystem: File System

The @anabranch/fs package provides streaming file-system utilities. Multi-value operations like glob return a Source for streaming, and single-value operations like readFile return a Task for composable error handling.

import { glob, readLines } from "@anabranch/fs";

const { successes, errors } = await glob("./src", "**/*.ts")
  .flatMap(async (entry) => {
    const lines = await readLines(entry.path)
      .filter((line) => line.includes("TODO"))
      .map((line) => ({ path: entry.path, line }))
      .collect();
    return lines;
  })
  .partition();

Ecosystem: Database

The database package wraps any DB adapter in the same Task/Source type universe. db.query() returns a Task, and db.stream() returns a cursor-backed Source for large result sets. DB.withConnection uses Task.acquireRelease under the hood, so the connection is guaranteed to release back to the pool whether the operation succeeds or throws.

A withTransaction method runs a callback inside a transaction, committing automatically on success and rolling back on error, so transactional code composes without manual commit/rollback bookkeeping.

import { DB } from "@anabranch/db";
import { PostgresConnector } from "@anabranch/db-postgres";

const connector = new PostgresConnector({ connectionString: DATABASE_URL });

const totalRevenue = await DB.withConnection(connector, (db) =>
  db.withTransaction(async (tx) =>
    tx.stream<Order>("SELECT * FROM orders WHERE status = 'paid'")
      .tapErr((err) => console.error("Stream error:", err.message))
      .filter((order) => order.currency === "USD")
      .map((order) => order.total)
      .fold((sum, total) => sum + total, 0),
  )
).run();

There are separate packages for PostgreSQL (@anabranch/db-postgres), MySQL (@anabranch/db-mysql), and SQLite (@anabranch/db-sqlite), each implementing the same DBConnector/DBAdapter interface. Structured error types (ConnectionFailed, QueryFailed, ConstraintViolation, TransactionFailed) make error handling specific enough to be useful without catching raw driver exceptions.

Why Deno?

I've been watching Ryan Dahl's talks for a while. What draws me to Deno is the attitude behind it: no bullshit, batteries included, just the right things in the right place. It reminds me of Go in that way. You get a formatter, a test runner, a linter, and first-class TypeScript support without installing anything extra or maintaining a tsconfig.json. The standard library is well-designed and the module system actually makes sense. For a library project in particular, that baseline removes a lot of friction.

The npm compatibility story is good enough now that I don't feel like I'm choosing a niche. Source files use .ts imports and JSR packages, and the npm distributions are generated from those via @deno/dnt. The published packages land on both JSR and npm under the @anabranch scope.

What's Next

Honestly, I'm just enjoying building it. Working with the anabranch primitives to add new packages keeps being fun, and I think the project will keep growing because of that rather than from any particular roadmap. The building blocks feel right, so I'll keep adding more of them.

The source is on GitHub and the API docs are at frodi-karlsson.github.io/anabranch.