Migrating from RxJS
A comprehensive guide to migrating from RxJS to Kairo's pipeline composition patterns.
Why Migrate?
While RxJS is powerful for reactive programming, Kairo provides:
- Simpler mental model - No complex operators or subscription management
- Result pattern integration - Consistent error handling across all operations
- Configuration objects - Clear, declarative syntax instead of operator chains
- TypeScript-first - Better type safety and inference
- Lighter weight - Smaller bundle size without complex observables
Key Differences
RxJS vs Kairo Philosophy
typescript
// RxJS - Reactive streams with operators
import { of, map, filter, catchError } from 'rxjs'
const stream$ = of(data).pipe(
map(item => transform(item)),
filter(item => item.active),
catchError(err => of(fallback))
)
// Kairo - Functional pipelines with Result pattern
import { pipeline, Result } from '@sanzoku-labs/kairo'
const process = pipeline.compose([
data => pipeline.map(data, transform),
data => pipeline.filter(data, item => item.active),
data => Result.isErr(data) ? Result.Ok(fallback) : data
])Common Pattern Migrations
Basic Transformations
typescript
// RxJS
import { of, map, filter } from 'rxjs'
const processed$ = of(users).pipe(
map(users => users.filter(user => user.active)),
map(users => users.map(user => user.name))
)
// Kairo
import { pipeline } from '@sanzoku-labs/kairo'
const processed = pipeline.compose([
users => pipeline.filter(users, user => user.active),
users => pipeline.map(users, user => user.name)
])(users)Async Operations
typescript
// RxJS
import { from, mergeMap, map } from 'rxjs'
const fetchUsers$ = from(userIds).pipe(
mergeMap(id => from(fetch(`/api/users/${id}`))),
map(response => response.json())
)
// Kairo
import { pipeline, service } from '@sanzoku-labs/kairo'
const fetchUsers = pipeline.compose([
userIds => pipeline.asyncMap(userIds, id =>
service.get(`/api/users/${id}`)
),
results => pipeline.map(results, result =>
Result.isOk(result) ? result.value : null
)
])Error Handling
typescript
// RxJS
import { of, map, catchError } from 'rxjs'
const safe$ = of(data).pipe(
map(data => riskyOperation(data)),
catchError(error => of(defaultValue))
)
// Kairo
import { pipeline, Result } from '@sanzoku-labs/kairo'
const safe = pipeline.compose([
data => pipeline.tryCatch(
() => riskyOperation(data),
error => new ProcessingError('OPERATION_FAILED', error.message)
),
result => Result.isErr(result) ? Result.Ok(defaultValue) : result
])Operator Migrations
Map
typescript
// RxJS
import { of, map } from 'rxjs'
const doubled$ = of([1, 2, 3]).pipe(
map(arr => arr.map(x => x * 2))
)
// Kairo
import { pipeline } from '@sanzoku-labs/kairo'
const doubled = pipeline.map([1, 2, 3], x => x * 2)Filter
typescript
// RxJS
import { of, map } from 'rxjs'
const evens$ = of([1, 2, 3, 4]).pipe(
map(arr => arr.filter(x => x % 2 === 0))
)
// Kairo
import { pipeline } from '@sanzoku-labs/kairo'
const evens = pipeline.filter([1, 2, 3, 4], x => x % 2 === 0)Reduce
typescript
// RxJS
import { of, reduce } from 'rxjs'
const sum$ = of(1, 2, 3, 4).pipe(
reduce((acc, val) => acc + val, 0)
)
// Kairo
import { pipeline } from '@sanzoku-labs/kairo'
const sum = pipeline.reduce([1, 2, 3, 4], (acc, val) => acc + val, 0)FlatMap/MergeMap
typescript
// RxJS
import { of, mergeMap } from 'rxjs'
const flattened$ = of([1, 2, 3]).pipe(
mergeMap(arr => of(...arr))
)
// Kairo
import { pipeline } from '@sanzoku-labs/kairo'
const flattened = pipeline.flatMap([1, 2, 3], x => [x])CombineLatest
typescript
// RxJS
import { combineLatest, map } from 'rxjs'
const combined$ = combineLatest([stream1$, stream2$]).pipe(
map(([a, b]) => ({ a, b }))
)
// Kairo
import { pipeline } from '@sanzoku-labs/kairo'
const combined = pipeline.parallel([
() => getStream1Data(),
() => getStream2Data()
]).then(([a, b]) => ({ a, b }))Debounce/Throttle
typescript
// RxJS
import { fromEvent, debounceTime, throttleTime } from 'rxjs'
const debounced$ = fromEvent(input, 'keyup').pipe(
debounceTime(300)
)
const throttled$ = fromEvent(button, 'click').pipe(
throttleTime(1000)
)
// Kairo
import { pipeline } from '@sanzoku-labs/kairo'
const debouncedHandler = pipeline.debounce(handleInput, 300)
const throttledHandler = pipeline.throttle(handleClick, 1000)Complex Pattern Migrations
HTTP Requests with Retry
typescript
// RxJS
import { ajax } from 'rxjs/ajax'
import { retry, catchError, map } from 'rxjs'
const apiCall$ = ajax.getJSON('/api/data').pipe(
retry(3),
map(response => response.data),
catchError(error => of({ error: true, message: error.message }))
)
// Kairo
import { service, Result } from '@sanzoku-labs/kairo'
const apiCall = async () => {
const result = await service.get('/api/data', {
retry: { attempts: 3 }
})
return Result.match(result, {
Ok: response => response.data,
Err: error => ({ error: true, message: error.message })
})
}Data Transformation Pipeline
typescript
// RxJS
import { of, map, filter, scan } from 'rxjs'
const pipeline$ = of(rawData).pipe(
map(data => validateData(data)),
filter(data => data.valid),
map(data => normalizeData(data)),
scan((acc, item) => [...acc, item], [])
)
// Kairo
import { pipeline, data } from '@sanzoku-labs/kairo'
const processPipeline = pipeline.compose([
data => data.validate(data, DataSchema),
result => Result.isOk(result) ? result.value : [],
data => pipeline.map(data, item => normalizeData(item)),
data => pipeline.reduce(data, (acc, item) => [...acc, item], [])
])Conditional Processing
typescript
// RxJS
import { of, map, switchMap } from 'rxjs'
const conditional$ = of(user).pipe(
switchMap(user =>
user.isAdmin
? of(user).pipe(map(u => processAdmin(u)))
: of(user).pipe(map(u => processUser(u)))
)
)
// Kairo
import { pipeline } from '@sanzoku-labs/kairo'
const conditional = pipeline.branch(user, [
{
condition: user => user.isAdmin,
pipeline: user => processAdmin(user)
},
{
condition: user => !user.isAdmin,
pipeline: user => processUser(user)
}
])Subject Alternatives
BehaviorSubject → State Management
typescript
// RxJS
import { BehaviorSubject } from 'rxjs'
class UserStore {
private userSubject = new BehaviorSubject<User | null>(null)
public user$ = this.userSubject.asObservable()
setUser(user: User) {
this.userSubject.next(user)
}
getUser() {
return this.userSubject.value
}
}
// Kairo (simple state management)
import { Result } from '@sanzoku-labs/kairo'
class UserStore {
private user: User | null = null
private listeners: ((user: User | null) => void)[] = []
setUser(user: User) {
this.user = user
this.listeners.forEach(listener => listener(user))
}
getUser(): User | null {
return this.user
}
subscribe(listener: (user: User | null) => void) {
this.listeners.push(listener)
return () => {
this.listeners = this.listeners.filter(l => l !== listener)
}
}
}ReplaySubject → Event History
typescript
// RxJS
import { ReplaySubject } from 'rxjs'
const events$ = new ReplaySubject<Event>(10)
// Kairo (simple event history)
class EventHistory {
private events: Event[] = []
private maxSize = 10
add(event: Event) {
this.events.push(event)
if (this.events.length > this.maxSize) {
this.events.shift()
}
}
getEvents(): Event[] {
return [...this.events]
}
getLast(count: number): Event[] {
return this.events.slice(-count)
}
}Real-World Migration Example
Before: RxJS-based Data Service
typescript
import { Observable, of, from, combineLatest } from 'rxjs'
import { map, mergeMap, catchError, retry, debounceTime } from 'rxjs/operators'
class DataService {
private searchSubject = new Subject<string>()
search$ = this.searchSubject.pipe(
debounceTime(300),
mergeMap(query =>
from(fetch(`/api/search?q=${query}`)).pipe(
mergeMap(response => from(response.json())),
retry(3),
catchError(error => of({ error: true, results: [] }))
)
)
)
getUserData(userId: string): Observable<UserData> {
return combineLatest([
this.getUser(userId),
this.getUserPosts(userId),
this.getUserSettings(userId)
]).pipe(
map(([user, posts, settings]) => ({
user,
posts,
settings,
lastUpdated: new Date()
})),
catchError(error => of({
error: true,
message: error.message
}))
)
}
private getUser(id: string): Observable<User> {
return from(fetch(`/api/users/${id}`)).pipe(
mergeMap(response => from(response.json())),
retry(2)
)
}
private getUserPosts(userId: string): Observable<Post[]> {
return from(fetch(`/api/users/${userId}/posts`)).pipe(
mergeMap(response => from(response.json())),
retry(2)
)
}
private getUserSettings(userId: string): Observable<Settings> {
return from(fetch(`/api/users/${userId}/settings`)).pipe(
mergeMap(response => from(response.json())),
retry(2)
)
}
search(query: string) {
this.searchSubject.next(query)
}
}After: Kairo-based Data Service
typescript
import { service, pipeline, Result } from '@sanzoku-labs/kairo'
class DataService {
private searchHandler = pipeline.debounce(
(query: string) => this.performSearch(query),
300
)
async search(query: string): Promise<Result<ServiceError, SearchResults>> {
return this.searchHandler(query)
}
private async performSearch(query: string): Promise<Result<ServiceError, SearchResults>> {
return service.get(`/api/search`, {
params: { q: query },
retry: { attempts: 3 },
timeout: 5000
})
}
async getUserData(userId: string): Promise<Result<ServiceError, UserData>> {
const fetchPipeline = pipeline.compose([
// Fetch all data in parallel
(id: string) => pipeline.parallel([
() => this.getUser(id),
() => this.getUserPosts(id),
() => this.getUserSettings(id)
]),
// Combine results
(results: [Result<ServiceError, User>, Result<ServiceError, Post[]>, Result<ServiceError, Settings>]) => {
const [userResult, postsResult, settingsResult] = results
// Check if all requests succeeded
if (Result.isOk(userResult) && Result.isOk(postsResult) && Result.isOk(settingsResult)) {
return Result.Ok({
user: userResult.value,
posts: postsResult.value,
settings: settingsResult.value,
lastUpdated: new Date()
})
}
// Return first error found
const error = Result.isErr(userResult) ? userResult.error :
Result.isErr(postsResult) ? postsResult.error :
settingsResult.error
return Result.Err(error)
}
])
return fetchPipeline(userId)
}
private async getUser(id: string): Promise<Result<ServiceError, User>> {
return service.get(`/api/users/${id}`, {
retry: { attempts: 2 },
timeout: 3000
})
}
private async getUserPosts(userId: string): Promise<Result<ServiceError, Post[]>> {
return service.get(`/api/users/${userId}/posts`, {
retry: { attempts: 2 },
timeout: 3000
})
}
private async getUserSettings(userId: string): Promise<Result<ServiceError, Settings>> {
return service.get(`/api/users/${userId}/settings`, {
retry: { attempts: 2 },
timeout: 3000
})
}
}
// Usage
const dataService = new DataService()
// Search
const searchResult = await dataService.search('user query')
Result.match(searchResult, {
Ok: results => console.log('Search results:', results),
Err: error => console.error('Search failed:', error.message)
})
// Get user data
const userDataResult = await dataService.getUserData('user-123')
Result.match(userDataResult, {
Ok: userData => console.log('User data:', userData),
Err: error => console.error('Failed to get user data:', error.message)
})Migration Benefits
Bundle Size Reduction
typescript
// RxJS bundle impact
import { Observable, of, from, combineLatest } from 'rxjs'
import { map, mergeMap, catchError, retry, debounceTime } from 'rxjs/operators'
// ~30kb+ when tree-shaken
// Kairo bundle impact
import { service, pipeline, Result } from '@sanzoku-labs/kairo'
// ~5kb when tree-shakenSimpler Error Handling
typescript
// RxJS - Multiple error handling patterns
observable$.pipe(
catchError(error => of(fallback)),
finalize(() => cleanup())
).subscribe({
next: value => handle(value),
error: error => handleError(error)
})
// Kairo - Consistent Result pattern
const result = await operation()
Result.match(result, {
Ok: value => handle(value),
Err: error => handleError(error)
})Better TypeScript Support
typescript
// RxJS - Complex type inference
const stream$: Observable<ProcessedData> = source$.pipe(
map((data: RawData) => process(data)),
filter((data: ProcessedData) => data.valid)
)
// Kairo - Clear type inference
const process = pipeline.compose([
(data: RawData) => pipeline.map(data, item => processItem(item)),
(data: ProcessedData[]) => pipeline.filter(data, item => item.valid)
])Migration Checklist
- [ ] Identify reactive patterns in your RxJS code
- [ ] Replace observables with async functions and Result pattern
- [ ] Convert operators to pipeline functions
- [ ] Replace subjects with simple state management
- [ ] Update error handling to use Result pattern
- [ ] Replace subscription management with async/await
- [ ] Test for performance improvements
- [ ] Remove RxJS dependency
Migration Strategy
- Start with simple operations - Map, filter, reduce
- Replace HTTP calls - Convert to service calls with Result pattern
- Simplify error handling - Use Result pattern throughout
- Remove complex operators - Replace with pipeline composition
- Update state management - Use simple patterns instead of subjects
- Test thoroughly - Ensure all async operations work correctly