CRDT Streams
In Aqua, an ordinary value is a name that points to a single result:
    value <- foo()
A stream, on the other hand, is a name that points to zero or more results:
    values: *string

    -- Write to a stream several times
    values <- foo()
    values <- foo()

    -- A value can be pushed to a stream
    -- without an explicit function call, using the <<- operator:
    values <<- "foo"
    x <- foo()
    values <<- x
A stream is a kind of collection and can be used in place of other collections:
    func foo(peer: string, relay: ?string):
        on peer via relay:
            Op.noop()

    func bar(peer: string, relay: string):
        relayMaybe: *string
        if peer != %init_peer_id%:
            -- Write into a stream
            relayMaybe <<- relay
        -- Pass a stream as an optional value
        foo(peer, relayMaybe)
However, streams are most powerful when combined with parallel execution, which introduces non-determinism.

Streams: Lifecycle And Guarantees

A stream's lifecycle can be separated into three stages:
    Source: (Parallel) Writes to a stream
    Map: Handles the stream values
    Sink: Converts the resulting stream into a scalar
Consider the following example:
    alias PeerId: string

    func foo(peers: []PeerId) -> string:
        -- Store a list of peer IDs collected from somewhere
        -- This is a stream (denoted by *, which means "0 or more values")
        resp: *PeerId

        -- Will go to all peers in parallel
        for p <- peers par:
            -- Move execution flow to the peer p
            on p:
                -- Get a peer ID from a service call (called on p)
                resp <- Srv.call()

        -- You can think of resp2 as a locally consistent lazy list
        resp2: *PeerId

        -- What is the value of resp at this point?
        -- Keep an eye on the `par` there: actually, it's FORKing execution
        -- to several branches on different peers.
        for r <- resp par:
            -- Move execution to peer r
            on r:
                -- Call Srv locally
                resp2 <- Srv.call()

        -- Wait for 6 responses on resp2 (index 5 is the sixth element): it's JOIN
        Op.identity(resp2!5)
        -- Once we have 6 responses, merge them
        -- The function treats resp2 as an array of strings and concatenates all
        -- of them into a single string.
        -- This function call "fixes" the content of resp2, making a single observation.
        -- This is a "stream canonicalization" event: values, order, and length
        -- are fixed at the moment of the first function call; the function will not
        -- be called again with different data.
        r <- Srv.concat(resp2)
        -- You can keep writing to a stream after its value is used

        <- r
In this case, for each peer p in peers, a new PeerId is obtained from Srv.call and written into the resp stream.
No iteration knows anything about how the other iterations proceed.
Once a PeerId is written to the resp stream, the second for is triggered. This is the mapping stage.
The results are then sent to the initial peer, where Op.identity is called. This Op.identity waits until the element at index 5, that is, the sixth value, is defined on the resp2 stream.
When the join is complete, the stream is consumed by the concatenation service to produce a scalar value, which is returned.
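The three stages can be modelled outside Aqua to make the fork, join, and canonicalization steps concrete. The following Python sketch is only an illustration of the behaviour described above, not how the Aqua runtime works: the Stream class, its push, join, and canonicalize methods, and the run function are all hypothetical names, threads stand in for peers, and the string operations stand in for Srv.call.

```python
import threading


class Stream:
    """Toy append-only stream; a stand-in for an Aqua stream, not its real implementation."""

    def __init__(self):
        self._items = []
        self._cond = threading.Condition()

    def push(self, value):
        # Values can only be appended, never changed or removed.
        with self._cond:
            self._items.append(value)
            self._cond.notify_all()

    def join(self, index, timeout=5.0):
        # Block until the element at `index` is defined, like resp2!5 in the example.
        with self._cond:
            if not self._cond.wait_for(lambda: len(self._items) > index, timeout):
                raise TimeoutError(f"element {index} was never written")
            return self._items[index]

    def canonicalize(self):
        # Take one immutable snapshot: values, order, and length are fixed here.
        with self._cond:
            return tuple(self._items)


def run(peers):
    """Source -> Map -> Sink over two streams; needs at least six peers."""
    resp, resp2 = Stream(), Stream()

    def source(p):
        resp.push(f"id-of-{p}")   # hypothetical stand-in for Srv.call() on peer p

    def mapper(i):
        r = resp.join(i)          # triggered once the i-th upstream value exists
        resp2.push(r.upper())     # hypothetical stand-in for Srv.call() on peer r

    threads = [threading.Thread(target=source, args=(p,)) for p in peers]
    threads += [threading.Thread(target=mapper, args=(i,)) for i in range(len(peers))]
    for t in threads:
        t.start()

    resp2.join(5)                     # JOIN: index 5 defined means six values exist
    snapshot = resp2.canonicalize()   # Sink input; later writes do not alter it
    for t in threads:
        t.join()
    return snapshot
```

Calling run with eight peer names returns a tuple of at least six values whose order depends on thread scheduling, which mirrors the non-determinism of the parallel branches. Joining the snapshot into one string would play the role of Srv.concat.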
During execution, the involved peers have different views of the execution state: each parallel branch of a for has no view of, or access to, the other branches' data. Eventually, execution flows back to the initial peer, which merges the writes to the resp stream and to the resp2 stream, respectively. These writes are merged in a conflict-free fashion. Furthermore, the heads of the resp and resp2 streams do not change from any peer's point of view: existing values are immutable, and new values can only be appended. However, different peers may see the stream values in a different order, depending on the order in which they received them.
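The merge properties described above (stable heads, append-only growth, peer-dependent order) can be illustrated with a small Python model. This is a sketch under simplifying assumptions, not the actual merge performed by the Aqua runtime: merge is a hypothetical helper, and values are assumed to be unique (here tagged with their origin), so a membership test is enough to detect unseen values.

```python
def merge(local, incoming):
    """Append the values from `incoming` that `local` has not seen yet.

    The existing prefix of `local` is never reordered or modified.
    """
    seen = set(local)
    merged = list(local)
    for value in incoming:
        if value not in seen:
            seen.add(value)
            merged.append(value)
    return merged


# Two peers observed overlapping writes in different orders.
view_a = ["x@p1", "y@p2"]
view_b = ["z@p3", "y@p2"]

# After exchanging views, each peer keeps its own head and appends the rest.
final_a = merge(view_a, view_b)
final_b = merge(view_b, view_a)
```

In this model both peers end up with the same set of values, each peer's earlier view remains a prefix of its final view, and the two final orders differ. Merging the same data again changes nothing, which is what makes the writes conflict-free.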