The Streamatorium is an experiment in applying functional programming and Unix principles to the web.
Stream processes look like this:

    #! pipe
    # Print out 10 numbers to the console
    [0..9].map STDOUT
Get popular repos from a JSON data source and display them one at a time to the console.

    popular = (repo) -> repo.watchers > 100

    "".tap getJSON each limit(10) pluck("url") getJSON filter(popular) pluck("name") STDOUT
Atoms are any object. Atoms form streams by flowing through pipes. Atoms originate in sources and end up in sinks.
Example atoms:
0, 1, "", true, false, "heyyy", 954, {}, {name: "flambo"}, [{...}, ...]
The STDOUT sink logs any atom to the console.

    STDOUT = (atom) ->
      console.log atom
The NULL sink eats any atom passed to it and does nothing.

    NULL = (atom) ->
A source is a function that takes a sink as an argument.

    source = (sink) -> ...
We have added a utility method, tap, to turn any object into a source.

    #! pipe
    "Hello World".tap STDOUT
This is simply a convenience to keep the source on the left rather than:

    (STDOUT) "Hello World"

With just a sink it doesn't seem to save much, but compare to a larger pipeline:

    (T invoke("split", "") STDOUT) "Hello World"
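The tap method itself is not defined in this file. As a rough idea of how such a helper could work, the sketch below attaches a non-enumerable tap to Object.prototype that hands the receiver to a sink. This is an assumption made for illustration, not the project's actual implementation, and the seen list is a made-up recording sink.

```coffeescript
# Assumption: tap passes the receiver to the given sink.
# valueOf() unwraps boxed primitives such as strings.
Object.defineProperty Object.prototype, "tap", {
  configurable: true
  value: (sink) -> sink @valueOf()
}

# Hypothetical sink that records what it receives.
seen = []
record = (atom) -> seen.push atom

"Hello World".tap record
[1, 2].tap record
```

Defining the property as non-enumerable keeps it out of for...in loops, which is one reason to prefer Object.defineProperty over plain assignment when extending a built-in prototype.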
Any array can become a source using the existing map or forEach methods.

    #! pipe
    [1, 2, 3].map STDOUT
    [4, 5, 6].forEach STDOUT
A pipe is a function that takes a sink and returns a sink. A pipe is both a source and a sink.
A pipeline connects sources to sinks through pipes.

    source pipe0 pipe1 pipe2 sink
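This works due to function composition: CoffeeScript's implicit function calls nest to the right, so the pipeline above is equivalent to source(pipe0(pipe1(pipe2(sink)))).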
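As a rough illustration of that nesting, the sketch below wires two throwaway pipes between a source and a recording sink, once with implicit calls and once with explicit parentheses. The names double, increment, and collectInto are made up for this example and are not part of the Streamatorium.

```coffeescript
# Hypothetical pipes: each takes a sink and returns a sink.
double = (output) ->
  (atom) -> output atom * 2

increment = (output) ->
  (atom) -> output atom + 1

# Hypothetical sink factory that records atoms into a list.
collectInto = (list) ->
  (atom) -> list.push atom

# A source is a function that takes a sink.
source = (sink) ->
  [1, 2, 3].forEach (n) -> sink n

implicit = []
explicit = []

# These two lines build the same pipeline.
source double increment collectInto implicit
source(double(increment(collectInto(explicit))))

console.log implicit, explicit
```

Each atom is doubled first and incremented second, because the pipe nearest the source receives the atom first.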
The identity pipe passes items through to the output unchanged. It is more useful as a demonstration than as a practical pipe.

    identity = (output) ->
      (atom) ->
        output atom
The defer pipe outputs atoms asynchronously instead of immediately.

    defer = (output) ->
      (atom) ->
        setTimeout output, 0, atom
The each pipe splats arrays into individual items. Non-arrays are passed through as is.

    each = (output) ->
      (arrayOrItem) ->
        [].concat(arrayOrItem).forEach (item) ->
          output item
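A small sketch of that splatting behavior, with the definition repeated so it runs on its own. The flat list and the flatten name are made up for the example.

```coffeescript
each = (output) ->
  (arrayOrItem) ->
    [].concat(arrayOrItem).forEach (item) ->
      output item

# Hypothetical recording sink.
flat = []
flatten = each (atom) -> flat.push atom

flatten [1, 2]   # an array is split into its items
flatten 3        # a non-array passes through as is

console.log flat
```

Because [].concat only flattens one level, a nested array such as [[1, 2]] would be emitted as the single atom [1, 2].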
The prettyPrint pipe formats an atom as indented JSON.

    prettyPrint = (output) ->
      (atom) ->
        output JSON.stringify(atom, null, 2)
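
The getJSON pipe fetches each URL it receives with jQuery's $.getJSON and outputs the parsed response.

    getJSON = (output) ->
      (url) ->
        $.getJSON(url).then output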
The split function is a generalized T. When constructed with a list of sinks it returns a sink that outputs to all of the sinks it was constructed with.

    split = (outputs...) ->
      (atom) ->
        outputs.forEach (output) ->
          output atom
The tee generator, similar to Unix tee, splits a stream so that each atom flows to two sinks: the sink it was constructed with and the next stage of the pipeline.

    tee = (sink) ->
      (output) ->
        split sink, output
T is a pipe that mirrors its atoms to the console. It is useful for inspecting the flow at any point in the pipeline.

    T = tee(STDOUT)
A pipe generator is a function that returns a pipe. The splitters above are one kind of pipe generator.
Example of tee implemented without split and annotated to show each part.

    tee = (sink) -> # Generator
      (output) -> # Pipe
        (atom) -> # Sink
          sink atom
          output atom
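To see the two branches in action, here is a minimal sketch that swaps STDOUT for recording sinks. The split and tee definitions are repeated from above so the block runs on its own; record, side, and main are made-up names.

```coffeescript
split = (outputs...) ->
  (atom) ->
    outputs.forEach (output) ->
      output atom

tee = (sink) ->
  (output) ->
    split sink, output

# Hypothetical recording sinks standing in for STDOUT.
side = []
main = []
record = (list) ->
  (atom) -> list.push atom

# Each atom reaches the side sink first, then continues down the pipeline.
["a", "b"].forEach tee(record side) record main

console.log side, main
```

Both lists end up with the same atoms in the same order, which is what makes a tee safe to drop into the middle of a pipeline for inspection.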
Generate a pipe that transforms atoms by applying the given transformation function to each atom as it passes through.

    map = (fn) ->
      (output) ->
        (atom) ->
          output fn(atom)
The pluck generator selects an attribute from an atom and passes that attribute on.

    pluck = (name) ->
      map (atom) -> atom[name]
The invoke generator creates a pipe that invokes the named function with the given arguments on each item passing through, then passes the result on to the sink it is connected to.

    invoke = (name, args...) ->
      map (atom) -> atom[name](args...)
Generate a pipe that only allows certain atoms to pass through. filter applies the given indicator function to each atom and only passes through atoms for which it returns a truthy value.

    filter = (fn) ->
      (output) ->
        (atom) ->
          output atom if fn(atom)
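The generators above compose like any other pipes. The sketch below repeats their definitions and runs filter and pluck over a small in-memory list; the repos data and the names list are invented for the example and stand in for a real JSON response.

```coffeescript
map = (fn) ->
  (output) ->
    (atom) ->
      output fn(atom)

pluck = (name) ->
  map (atom) -> atom[name]

filter = (fn) ->
  (output) ->
    (atom) ->
      output atom if fn(atom)

# Made-up sample data for illustration.
repos = [
  {name: "alpha", watchers: 250}
  {name: "beta", watchers: 3}
  {name: "gamma", watchers: 101}
]

popular = (repo) -> repo.watchers > 100

# Hypothetical recording sink.
names = []
sink = (atom) -> names.push atom

repos.forEach filter(popular) pluck("name") sink

console.log names
```

The order matters: filtering has to happen before pluck, since pluck discards the watchers attribute that the indicator function reads.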
The soak pipe filters out null and undefined.

    soak = filter (atom) -> atom?
The toggle pipe is a switch. Whenever it receives an input it outputs either true or false, then flips its state so that it outputs the opposite value on the next input. It doesn't matter what atom it receives.

    toggle = (output) ->
      value = true
      (atom) ->
        output value
        value = !value
A counter acts like a pedometer, counting each atom that flows through. It outputs the total count any time an atom is received. It can count anything.

    counter = (output) ->
      value = 0
      (atom) ->
        output value += 1
An accumulator sums the atoms that flow through and outputs the current total each time an atom is received. For example, feeding it the odd numbers 1, 3, 5, ... produces the squares 1, 4, 9, ...

    accumulator = (output) ->
      value = 0
      (atom) ->
        output value += atom
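A quick sketch of the odd-number sum described above, with the accumulator repeated so the block is self-contained. The squares list is a made-up recording sink.

```coffeescript
accumulator = (output) ->
  value = 0
  (atom) ->
    output value += atom

# Hypothetical recording sink for the running totals.
squares = []
[1, 3, 5, 7, 9].forEach accumulator (total) -> squares.push total

console.log squares
```

Each running total is a perfect square because the sum of the first n odd numbers is n squared.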
Aggregate a stream of individual characters into a stream of word strings, splitting on whitespace. A word is only emitted once the whitespace that follows it arrives.

    tokenizer = (output) ->
      word = ""
      (character) ->
        if character.match /\s/
          if word
            output word
            word = ""
        else
          word += character
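As a usage sketch, the block below feeds a string through a tokenizer one character at a time. The definition used here keeps whitespace out of the accumulated word with an else branch, which is an assumption about the intended behavior, and the words list is a made-up recording sink.

```coffeescript
# Assumption: whitespace ends a word and is never appended to it.
tokenizer = (output) ->
  word = ""
  (character) ->
    if character.match /\s/
      if word
        output word
        word = ""
    else
      word += character

# Hypothetical recording sink.
words = []

# The trailing space flushes the final word.
"pipes and  sinks ".split("").forEach tokenizer (token) -> words.push token

console.log words
```

Without the trailing space the last word would stay buffered, since nothing tells the tokenizer that the stream has ended.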

The limit generator creates a pipe that passes through only the first n atoms and drops the rest.

    limit = (n) ->
      (output) ->
        count = 0
        (atom) ->
          output(atom) if count < n
          count += 1
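A short sketch of limit in use, with the definition repeated so it runs on its own. The firstTwo list is a made-up recording sink.

```coffeescript
limit = (n) ->
  (output) ->
    count = 0
    (atom) ->
      output(atom) if count < n
      count += 1

# Hypothetical recording sink.
firstTwo = []
[1..5].forEach limit(2) (atom) -> firstTwo.push atom

console.log firstTwo
```

Note that limit only stops forwarding atoms; the source keeps producing them, so it does not cancel any upstream work.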
Connect the "end" of one pipeline to the beginning of a new one.

TODO: Allow connectors to be created in any order.

TODO: Allow many-to-many connectors.
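
    connector = ->
      atoms = []
      output = null

      # Deliver any buffered atoms once a sink is attached.
      flush = ->
        if output
          while atoms.length
            output atoms.shift()

      # Sink side: buffer the atom, then try to deliver it.
      collector = (atom) ->
        atoms.push atom
        flush()

      # Source side: attach the sink and deliver anything already buffered.
      collector.source = (sink) ->
        output = sink
        flush()

      return collector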
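
    connectors = {}

    TO = (id) ->
      connectors[id] = connector()

    FROM = (id) ->
      # Assumed body: hand back the source side of the connector registered by TO(id).
      connectors[id].source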
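The buffering idea behind a connector can be sketched as follows. This version calls flush whenever an atom arrives or a sink is attached, which is an assumption about the intended behavior; link and received are made-up names.

```coffeescript
# Assumption: flush runs on every push and whenever a sink is attached.
connector = ->
  atoms = []
  output = null

  flush = ->
    if output
      while atoms.length
        output atoms.shift()

  collector = (atom) ->
    atoms.push atom
    flush()

  collector.source = (sink) ->
    output = sink
    flush()

  return collector

link = connector()
received = []

# Atoms sent before a sink is attached are buffered...
[1, 2].forEach (n) -> link n

# ...and delivered once the source side is connected.
link.source (atom) -> received.push atom
link 3

console.log received
```

Buffering on the sink side is what lets the first pipeline start emitting before the second one has been wired up.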
Emit an atom periodically. The clock constructor returns a source.

    clock = (t) ->
      (output) ->
        setInterval ->
          output 1
        , t * 1000
An attempt at a buffer that collects atoms and releases them one at a time whenever the control/signal input fires. ctrl is a source.

    gate = (ctrl) ->
      (output) ->
        buffer = []
        ctrl ->
          output buffer.shift()
        (atom) ->
          buffer.push atom
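
The latch generator creates a pipe that remembers the most recent atom and outputs it each time the control source ctrl emits a signal.

    latch = (ctrl) ->
      (output) ->
        value = undefined
        ctrl ->
          output value
        (atom) ->
          value = atom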
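To show how a control source drives these pipes without waiting on a timer, the sketch below uses a hand-fired source in place of clock. The manual source, the fire handle, and the released list are made up for the example; gate is repeated from above.

```coffeescript
gate = (ctrl) ->
  (output) ->
    buffer = []
    ctrl ->
      output buffer.shift()
    (atom) ->
      buffer.push atom

# Hypothetical control source: keeps its sink so signals can be fired by hand.
fire = null
manual = (sink) -> fire = sink

# Hypothetical recording sink.
released = []
input = gate(manual) (atom) -> released.push atom

input "a"
input "b"
fire()   # releases the oldest buffered atom
fire()   # releases the next one

console.log released
```

Firing the control source while the buffer is empty would output undefined, so a real pipeline might follow the gate with soak.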

    module.exports = Streamatorium =
      accumulator: accumulator
      clock: clock
      counter: counter
      each: each
      filter: filter
      gate: gate
      getJSON: getJSON
      identity: identity
      invoke: invoke
      limit: limit
      map: map
      pluck: pluck
      pollute: ->
        Object.keys(Streamatorium).forEach (name) ->
          unless name is "pollute"
            global[name] = Streamatorium[name]
      prettyPrint: prettyPrint
      soak: soak
      tee: tee
      toggle: toggle

    #! setup
    require("/interactive_runtime")