@fluencelabs/aqua-dht
Aqua implementation of DHT and PubSub
aqua-dht provides a PubSub API that can be used for service discovery and event delivery.

Releases

You can find the latest releases of aqua-dht on NPM; changelogs are on GitHub.

API

For the API reference, take a look at pubsub.aqua in the AquaDHT repo.

Terminology

    @fluencelabs/aqua-dht - Aqua library on NPM. Provides high-level and low-level APIs to develop custom Aqua scripts. Suits more advanced use cases.
Both aqua-dht-ts and aqua-dht can be used with TypeScript and JavaScript.
    PubSub - short for Publish-Subscribe. A pattern for messaging and discovery. Subscribers create subscriptions on a topic so that publishers can send messages to them.
    Kademlia - algorithm for organizing a peer-to-peer network in such a way that peers can find each other in no more than O(log N) hops, where N is the size of the network.
    topic - string with associated peer_id and a list of subscribers. Associated peer_id can be thought of as a topic creator.
    topic creator - peer id that created the topic. Other users can't create a topic with the same name.
    subscriber - a peer that has called subscribe on a topic, optionally with an associated relay_id and service_id. Any peer can be a subscriber, including nodes and clients.
    subscriber's relay_id - subscriber is available on the network through this relay.
When a subscriber doesn't have a publicly available IP address, e.g. the client peer is a browser, it connects to the network through a relay node. That means that other peers can only connect to that subscriber through the relay.
In that case subscribe must be called with the relay_id, so other peers can reach the subscriber.
    subscriber's service_id - id of the service provided by that subscriber. Sometimes a subscriber may want publishers of a specific topic to call functions on this service_id in order to be able to distinguish among service calls.
    subscription - a DHT record associated with a topic. Holds information about a subscriber.
    subscription lifetime - subscriptions have two expiry conditions: they are evicted (removed) 24 hours after their creation or after 1 hour of being unused.
    subscriptions limit - each topic can have at most 20 subscribers. Each new subscriber will remove an old one, following a LIFO principle.
    publisher - any peer that has retrieved the subscribers of a topic and calls functions on these subscribers.
    AquaDHT - a service that provides low-level DHT API. pubsub.aqua is built on top of it.
    DHT key - a low-level representation of a topic
    DHT record - a low-level representation of a subscriber, see Record in Aqua
    host value - a DHT record with peer_id being that of a node. When a node is subscribed to a topic via subscribeNode or initTopicAndSubscribeNode, the subscription is a host value. Host values live much longer (10 days) than other DHT records. See Subscribe to a topic for details.
    script caller - peer that executes a script by sending it to the network. In Aqua it's %init_peer_id%
    node - usually a Fluence node hosted in the cloud. Nodes are long-lived, can host WebAssembly services and they participate in the Kademlia network.
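To make the subscriber-related terms concrete, here is a minimal TypeScript sketch of a subscription record and of how a publisher might work out the route to a subscriber. The field names mirror those used in the Aqua examples in this document (peer_id, relay_id, service_id, value). The SubscriberRecord type, the modeling of optional fields as `string | null`, and the routeTo helper are assumptions made for illustration and are not part of aqua-dht.

```typescript
// Hypothetical model of a subscription; not the library's actual Record type.
interface SubscriberRecord {
  value: string;             // arbitrary payload stored with the subscription
  peer_id: string;           // the subscriber itself
  relay_id: string | null;   // set when the subscriber is reachable only via a relay
  service_id: string | null; // set when publishers should call a specific service
}

// Returns the hops a call must traverse to reach the subscriber.
function routeTo(sub: SubscriberRecord): string[] {
  return sub.relay_id === null ? [sub.peer_id] : [sub.relay_id, sub.peer_id];
}

// A browser client sits behind a relay; a cloud node is reachable directly.
const browserClient: SubscriberRecord = {
  value: "chat-user", peer_id: "peer-browser", relay_id: "relay-node", service_id: "chat",
};
const cloudNode: SubscriberRecord = {
  value: "indexer", peer_id: "peer-node", relay_id: null, service_id: null,
};

console.log(routeTo(browserClient)); // relay first, then the subscriber
console.log(routeTo(cloudNode));     // the subscriber only
```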

How To Use Aqua-DHT

There are several simple examples in the fluencelabs/aqua-dht repo; give them a look.

Create A Topic

A topic must be created before anyone can subscribe to it. That's exactly what initTopic does.
Here's a rather simplistic example in Aqua:
import "@fluencelabs/aqua-dht/pubsub.aqua"

alias PeerId: string

func my_function(relay: PeerId, topic: string):
    -- create the topic via the given relay node
    initTopic(relay, topic)

Subscribe To A Topic

There are 4 functions that create subscriptions. Let's review them.
These you would use for most of your needs:
    subscribe - subscribes %init_peer_id% to an existing topic.
    initTopicAndSubscribe - creates a topic first and then subscribes %init_peer_id% to it.
And these are needed to subscribe a service to a topic:
    subscribeNode - subscribes a node to an existing topic.
    initTopicAndSubscribeNode - creates a topic first and then subscribes a node to it.
Now, let's review them in more detail.

initTopicAndSubscribe & subscribe

These two subscribe the caller of a script to a topic. initTopicAndSubscribe will create a topic before subscribing, so be careful not to call it on someone else's topic. subscribe simply adds a subscription on an existing topic.
Here's how you could use them in TypeScript. You first need an export.aqua file compiled to TypeScript (see here):
import { FluencePeer } from "@fluencelabs/fluence";
import { krasnodar } from "@fluencelabs/fluence-network-environment";
import { initTopicAndSubscribeBlocking, findSubscribers, subscribe } from "./generated/export";

const relayA = krasnodar[1];
// create the first peer and connect it to the network
const peerA = new FluencePeer();
await peerA.start({ connectTo: relayA });

let topic = "myTopic";
let value = "put anything useful here";
let serviceId = "Foo";
// create topic and subscribe to it
await initTopicAndSubscribeBlocking(
    peerA,
    topic, value, relayA, serviceId,
    (s) => console.log(`node ${s} saved the record`)
);
// this will contain peerA's subscription
let subscribers = await findSubscribers(peerA, topic);

const relayB = krasnodar[2];
// now, let's create the second peer and connect to the network
const peerB = new FluencePeer();
await peerB.start({ connectTo: relayB });

// and subscribe it to the same topic
await subscribe(peerB, topic);
// this will contain both peerA and peerB subscriptions
subscribers = await findSubscribers(peerB, topic);
There are two variants: initTopicAndSubscribeBlocking and initTopicAndSubscribe.
The blocking version waits until at least one write has completed. The latter is "fire and forget": it returns immediately, but doesn't guarantee that a write has happened.

initTopicAndSubscribeNode & subscribeNode

These two functions work almost the same as their non-Node counterparts, except that they subscribe a node instead of the caller. This is useful when you want to subscribe a service hosted on the node to a topic.
Such subscriptions live 10 times longer than those created by subscribe, because services can't renew subscriptions on their own.

Renew Subscription Periodically

After a normal (non-host) subscription is created, it must be used at least once an hour to keep it from being marked stale and deleted. Also, peers must resubscribe themselves at least once per 24 hours to prevent subscription expiration and deletion.
While this collection schedule may seem aggressive, it keeps PubSub up-to-date and performant, as short-lived client peers, such as browsers, can go offline at any time or periodically change their relay nodes.
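The two rules above can be expressed as a small deadline calculation. The sketch below is only a model of those rules, not code from aqua-dht: the SubscriptionTimes type and both function names are hypothetical, timestamps are plain seconds, and treating the exact boundary as already evicted is an assumption.

```typescript
const HOUR = 60 * 60;
const STALE_AFTER = 1 * HOUR;    // unused for 1 hour => stale
const EXPIRES_AFTER = 24 * HOUR; // 24 hours after creation => expired

// Hypothetical bookkeeping for one non-host subscription (seconds).
interface SubscriptionTimes {
  createdAt: number;
  lastUsedAt: number;
}

// True if the subscription would already have been evicted at time `now`.
function isEvicted(s: SubscriptionTimes, now: number): boolean {
  return now - s.lastUsedAt >= STALE_AFTER || now - s.createdAt >= EXPIRES_AFTER;
}

// The latest moment by which the peer must act, and what it must do.
function nextDeadline(s: SubscriptionTimes): { at: number; action: "use" | "resubscribe" } {
  const staleAt = s.lastUsedAt + STALE_AFTER;
  const expiresAt = s.createdAt + EXPIRES_AFTER;
  return expiresAt <= staleAt
    ? { at: expiresAt, action: "resubscribe" }
    : { at: staleAt, action: "use" };
}

const freshSub: SubscriptionTimes = { createdAt: 0, lastUsedAt: 0 };
console.log(nextDeadline(freshSub));        // must be used within the first hour
console.log(isEvicted(freshSub, 2 * HOUR)); // unused for two hours, so evicted
```

A client could call nextDeadline after every use or resubscription and schedule its next renewal slightly before the returned time.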

Call A Function On Subscribers

executeOnSubscribers

aqua-dht provides a function to call a callback on every Record associated with a topic:
func executeOnSubscribers(node_id: PeerId, topic: string, call: Record -> ())
It reduces boilerplate when writing a custom Aqua script to call a function on each subscriber. Here's an example:
import "@fluencelabs/aqua-dht/pubsub.aqua"

-- API that every subscriber must adhere to
-- You can think of it as an application protocol
service SubscriberAPI:
    something_happened(value: string)

func call_subscriber(sub: Record):
    -- topological move to subscriber via relay
    on sub.peer_id via sub.relay_id:
        -- resolve service on a subscriber
        SubscriberAPI sub.service_id
        -- call function
        SubscriberAPI.something_happened(sub.value)

-- call SubscriberAPI.something_happened() on every subscriber
func call_everyone(relay: PeerId, topic: string):
    executeOnSubscribers(relay, topic, call_subscriber)

Passing data to subscribers

Because of how callbacks work, executeOnSubscribers currently can't send dynamic data to subscribers. Overcoming this is as easy as writing a for loop. Consider this Aqua code:
import "@fluencelabs/aqua-dht/pubsub.aqua"

-- Application event
data Event:
    value: string

-- API that every subscriber must adhere to
-- You can think of it as an application protocol
service SubscriberAPI:
    receive_event(event: Event)

func call_subscriber(sub: Record, event: Event):
    -- topological move to subscriber via relay
    on sub.peer_id via sub.relay_id:
        -- resolve service on a subscriber
        SubscriberAPI sub.service_id
        -- call function
        SubscriberAPI.receive_event(event)

-- send event to every subscriber
func send_everyone(relay: PeerId, topic: string, event: Event):
    -- retrieve all subscribers of a topic
    subscribers <- findSubscribers(relay, topic)
    -- iterate through them
    for sub <- subscribers par:
        call_subscriber(sub, event)

Handle Function Calls

The subscribeToEvent function from the Fluence JS SDK allows JS/TS peers to define their API through services and functions.
Let's take the SubscriberAPI from the previous example and extend it a little:
data Event:
    value: string

service SubscriberAPI:
    -- receive an event
    receive_event(event: Event)
    -- do something and return data
    do_something(value: string) -> u64
Let's save this file to subscriber_api.aqua and compile it:
npx aqua -i . -o src/generated
The generated registerSubscriberAPI function can then be used to implement the service in TypeScript:
import { Fluence } from "@fluencelabs/fluence";
import { krasnodar } from "@fluencelabs/fluence-network-environment";
import { registerSubscriberAPI, SubscriberAPIDef } from "./generated/subscriber_api"

await Fluence.start({ connectTo: krasnodar[2] });

let service_id = 'api';
let counter = 0;

await registerSubscriberAPI(service_id, {
    // called when a publisher sends an event
    receive_event: (event: any) => {
        console.log("event received!", event);
    },
    // called when a publisher expects a result back
    do_something: (value: any) => {
        counter += 1;
        console.log("doing logging!", value, counter);
        return counter;
    }
});

Overcome The Subscriber Limit

If your app needs more than 20 subscribers on a single topic, consider a custom WebAssembly service that stores all these subscriptions in memory or on disk: essentially a simple subscriptions "directory".
With such a service implemented and deployed, you can use pubsub.aqua to subscribe that "subscriptions directory" service to a topic. Depending on your app's architecture, you might want to have several instances of "subscriptions directory" service.
The code to get all subscriptions from "directory" services might look something like this in Aqua:
import "@fluencelabs/aqua-dht/pubsub.aqua"

service SubDir:
    get_subscribers(topic: string) -> []Record

func dir_subscribers(relay: PeerId, topic: string) -> [][]Record:
    -- this stream will hold all subscribers
    allSubs: *[]Record
    -- retrieve SubDir subscribers from AquaDHT
    subscribers <- findSubscribers(relay, topic)
    -- iterate through all SubDir services
    for dir <- subscribers:
        on dir.peer_id:
            SubDir dir.service_id
            -- get all subscribers from SubDir and write to allSubs
            allSubs <- SubDir.get_subscribers(topic)
    <- allSubs
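The "subscriptions directory" idea itself can be sketched as an in-memory map from topic to subscriber entries. This is a TypeScript model of the logic only; a real directory would be a WebAssembly service deployed on a node. The SubscriptionDirectory class, its method names, and the DirEntry shape are hypothetical. The flatten helper shows how a client might merge the nested result returned by several directories, keeping one entry per peer.

```typescript
// Hypothetical subscriber entry kept by a directory service.
interface DirEntry {
  peer_id: string;
  value: string;
}

// In-memory model of a "subscriptions directory"; it has no subscriber limit.
class SubscriptionDirectory {
  private topics = new Map<string, Map<string, DirEntry>>();

  // Adds or refreshes a subscriber; keyed by peer_id so resubscribing does not duplicate.
  subscribe(topic: string, entry: DirEntry): void {
    let subs = this.topics.get(topic);
    if (subs === undefined) {
      subs = new Map<string, DirEntry>();
      this.topics.set(topic, subs);
    }
    subs.set(entry.peer_id, entry);
  }

  getSubscribers(topic: string): DirEntry[] {
    const subs = this.topics.get(topic);
    return subs === undefined ? [] : Array.from(subs.values());
  }
}

// Merges results from several directories into one list, one entry per peer_id.
function flatten(nested: DirEntry[][]): DirEntry[] {
  const byPeer = new Map<string, DirEntry>();
  for (const group of nested) {
    for (const entry of group) {
      byPeer.set(entry.peer_id, entry);
    }
  }
  return Array.from(byPeer.values());
}

const dirA = new SubscriptionDirectory();
const dirB = new SubscriptionDirectory();
for (let i = 0; i < 30; i++) {
  dirA.subscribe("news", { peer_id: `peer-${i}`, value: "a" });
}
dirB.subscribe("news", { peer_id: "peer-0", value: "b" });  // also known to dirA
dirB.subscribe("news", { peer_id: "peer-99", value: "b" });

const allSubscribers = flatten([dirA.getSubscribers("news"), dirB.getSubscribers("news")]);
console.log(allSubscribers.length); // 31 distinct subscribers
```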

Concepts

Kademlia Neighborhood

Fluence nodes participate in the Kademlia network. Kademlia organizes peers in such a way that given any key, you can find a set of peers that are "responsible" for that key. That set contains up to 20 nodes.
That set is called "neighborhood" or "K-closest nodes" (K=20). In Aqua, it is accessible in aqua-lib via the Kad.neighbourhood function.
The two most important properties of the Kademlia neighborhood are: 1) it exists for any key, and 2) it is more or less stable.
A lot of DHTs rely on these properties to implement data replication. So does AquaDHT.
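As an illustration of how a neighborhood is determined, the sketch below picks the K closest peers to a key by XOR distance, which is the metric Kademlia uses. It is a toy model: real implementations first hash keys and peer ids into a fixed-size keyspace, whereas here ids are short byte arrays given directly, and all function names are hypothetical.

```typescript
// XOR distance between two equal-length ids, as a big-endian byte array.
function xorDistance(a: number[], b: number[]): number[] {
  return a.map((byte, i) => byte ^ b[i]);
}

// Compares two distances byte by byte; negative if x is closer than y.
function compareDistance(x: number[], y: number[]): number {
  for (let i = 0; i < x.length; i++) {
    if (x[i] !== y[i]) return x[i] - y[i];
  }
  return 0;
}

// Returns the k peers whose ids are closest to `target`.
function kClosest(target: number[], candidates: number[][], k: number): number[][] {
  return candidates
    .slice() // copy so the caller's array is not reordered
    .sort((p, q) => compareDistance(xorDistance(p, target), xorDistance(q, target)))
    .slice(0, k);
}

const topicKey = [0xa0];
const peerIds = [[0xa1], [0x20], [0xb0], [0xa8]];
// Distances to 0xa0 are 0x01, 0x80, 0x10 and 0x08 respectively.
const neighborhood = kClosest(topicKey, peerIds, 2);
console.log(neighborhood); // the peers 0xa1 and 0xa8
```

Because the result depends only on the key and the set of known peers, any peer that asks gets the same answer as long as the network does not change, which is the stability property mentioned above.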

Data replication

On write

When a subscription to a topic is created, it is written to the Kademlia neighborhood of that topic. Here's a subscribe implementation in Aqua:
func subscribe(node_id: PeerId, topic: string, value: string, relay_id: ?PeerId, service_id: ?string):
    -- get neighbourhood for a topic
    nodes <- getNeighbours(node_id, topic)
    -- iterate through each node in the neighbourhood
    for n <- nodes par:
        on n:
            try:
                t <- Peer.timestamp_sec()
                -- store subscription on each node in the neighbourhood
                AquaDHT.put_value(topic, value, t, relay_id, service_id, 0)
This ensures that data is replicated across several peers.

At rest

Subscriptions are also replicated "at rest". That is, once per hour all stale values are removed, and non-stale values are replicated to all nodes in the neighborhood.
This ensures that even if the neighborhood for a topic has changed because some peers went offline and others joined the network, data will be replicated to all nodes in the current neighborhood.
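A single "at rest" pass can be modeled as follows. This is a simplified in-memory sketch, not the AquaDHT implementation: the DhtNode and StoredRecord types and the replicationPass function are hypothetical, only the 1-hour staleness rule is modeled, and the neighborhood is passed in rather than looked up via Kademlia.

```typescript
const STALE_AFTER_SEC = 60 * 60;

// Hypothetical stored subscription; times are in seconds.
interface StoredRecord {
  peer_id: string;
  lastUsedAt: number;
}

// A node's local store: topic -> (subscriber peer_id -> record).
interface DhtNode {
  id: string;
  store: Map<string, Map<string, StoredRecord>>;
}

// Drops stale records on `source`, then copies the remaining ones to every neighbor.
function replicationPass(source: DhtNode, topic: string, neighbors: DhtNode[], now: number): void {
  const local = source.store.get(topic);
  if (local === undefined) return;
  local.forEach((rec, peerId) => {
    if (now - rec.lastUsedAt >= STALE_AFTER_SEC) local.delete(peerId);
  });
  for (const neighbor of neighbors) {
    if (neighbor.id === source.id) continue;
    let remote = neighbor.store.get(topic);
    if (remote === undefined) {
      remote = new Map<string, StoredRecord>();
      neighbor.store.set(topic, remote);
    }
    const target = remote;
    // Simplification: the source's copy overwrites whatever the neighbor had.
    local.forEach((rec, peerId) => target.set(peerId, rec));
  }
}

const nodeA: DhtNode = { id: "A", store: new Map<string, Map<string, StoredRecord>>() };
const nodeB: DhtNode = { id: "B", store: new Map<string, Map<string, StoredRecord>>() }; // just joined
nodeA.store.set("news", new Map<string, StoredRecord>([
  ["fresh", { peer_id: "fresh", lastUsedAt: 3500 }],
  ["stale", { peer_id: "stale", lastUsedAt: 0 }],
]));

replicationPass(nodeA, "news", [nodeA, nodeB], 4000);
console.log(nodeB.store.get("news")); // holds only the "fresh" record
```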
For advanced users accustomed to AIR scripts: there's an implementation of "at rest" replication for AquaDHT on GitHub.