Subscribing to Live Events

This guide shows the backend code that executes a Pipes query and the frontend code that consumes both historical and real-time data.

Data flows through Live as events. Plugins can detect or generate events to represent some real-world condition. The developer can mark an event to skip the real-time engine or the storage (as described in event flow modifiers). In general, however, data flows first to the real-time engine and is then stored by the Intelie Storage Providers as historical data.

From the query's point of view, a Pipes expression can express interest in both real-time and historical data. The historical part of the query is fetched first; the real-time part then monitors incoming data and delivers it to the developer's code.

Pipes query execution and data delivery

In short, the plugin backend code can take responsibility for performing the Pipes query. This approach is typically preferred, since it allows the developer to encapsulate the query expression behind a REST endpoint. For example:

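import net.intelie.live.*;
// The imports below are assumed from the annotations and types used in this class
// (JAX-RS under javax.ws.rs, JetBrains @NotNull); adjust them to your Live SDK version.
import org.jetbrains.annotations.NotNull;

import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import java.util.List;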

@UseProxy
@Path("/")
public class MyEventService {
    @NotNull
    private final Live live;

    public MyEventService(@NotNull Live live) {
        this.live = live;
    }

    @POST
    @Path("/metrics")
    @NeedsNoAuthority
    @Produces(MediaType.APPLICATION_JSON)
    public List<QueryResponse> followLastDayMetrics() throws Exception {
        Query query = new Query("__metrics")
                .span("last 1 day")
                .follow(true)
                .description("Retrieves the last day of history and then monitors system metrics in real time");
        BayeuxQuery bayeuxQuery = new BayeuxQuery();
        bayeuxQuery.addQuery(query);
        return this.live.queries().registerBayeuxQuery(bayeuxQuery);
    }
}

The Query object is wrapped in a BayeuxQuery before being registered with the Live runtime. Following Bayeux channel semantics, registration produces an identifier for the data channel through which the data flows to the frontend. Each query yields one QueryResponse object that carries its channel identifier.

As shown in the Web Services section, the plugin backend registers the REST endpoint as follows:

live.web().addService("/events", new MyEventService(live));

In summary, this section prepared a query that runs once the frontend code connects to the data channel to listen for events. The query delivers both historical and real-time data for the __metrics event type: span("last 1 day") selects the last day of data, and follow(true) tells the runtime to keep delivering real-time data as soon as the historical part of the dataset (if any) has been read from persistent storage.

Event subscription for web applications

Live provides a frontend API called the Live Subscription Service. It allows the developer to request the data channel (typically built on WebSockets, if the browser supports them). The plugin frontend code can then receive the events produced by Pipes queries encapsulated behind REST endpoints, as demonstrated in the previous section. The channel can also be requested directly from the endpoint:

const response = await fetch("/services/plugin-tutorial/events/metrics",
                             { method: "POST", body: "" })
const channels = await response.json()
console.log(channels[0])
// {channel: '/data/7af9ff5eeefd46a2bbfec4768d11c2b3'}

The identifier /data/7af9ff5eeefd46a2bbfec4768d11c2b3 names a CometD channel. The data is typically transported over WebSockets if the HTTP server and proxies are properly configured.
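
The response shape logged above can be given a type and checked before the channel is used. The following is a minimal sketch, assuming the endpoint returns a JSON array of objects with a channel string, as in the logged output. ChannelResponse, parseChannels, and the "/data/" prefix check are hypothetical names and assumptions made for illustration; they are not part of the Live API.

```typescript
// Hypothetical type mirroring the logged response: {channel: '/data/<id>'}
type ChannelResponse = { channel: string }

// Validates an untyped JSON payload and returns the channel paths.
// Assumption: every data channel path starts with "/data/", as in the example above.
function parseChannels(payload: unknown): string[] {
    if (!Array.isArray(payload)) {
        throw new Error("expected an array of channel responses")
    }
    return payload.map((item, i) => {
        const channel = (item as Partial<ChannelResponse> | null)?.channel
        if (typeof channel !== "string" || !channel.startsWith("/data/")) {
            throw new Error(`entry ${i} has no valid data channel`)
        }
        return channel
    })
}

// Usage with the payload from the example above
const parsedChannels = parseChannels([{ channel: "/data/7af9ff5eeefd46a2bbfec4768d11c2b3" }])
console.log(parsedChannels[0])
```

In plugin code, the argument would be the result of response.json() rather than a literal, so a malformed response fails early instead of producing an undefined channel.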

Instead of handling the channel directly, the plugin developer can use the Live Subscription Service API, a higher-level subscription abstraction that reduces the code the developer needs to write.

import Subscriber from 'live/services/subscription'
const subscription = new Subscriber({
    url: `services/plugin-tutorial/events/metrics`
})
subscription.flush()

The subscription object exposes the following API:

| Method | Parameters | Description |
| --- | --- | --- |
| delegate(fn) | fn (function): callback to handle the events | Handles all types of events sent by the Live platform |
| on(type, fn) | type (string): the event type to filter on; fn (function): callback to handle the event itself | Handles a single type of event, see Live Event Types |
| request(params, rIndex) | params (array): the request parameters; rIndex (number): the request identifier | Creates a new request for the backend querying code |
| flush(index) | index (string): the request identifier | Marks the user code as ready to listen for events |
| close(index) | index (string): the request identifier | Closes the underlying CometD channel |
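
The sketch below shows one way on, flush, and close from the API above might be combined. It is written against a hypothetical SubscriptionLike interface that covers only those three methods, so it runs without the Live runtime. collectEvents and FakeSubscription are illustrative names, the index arguments are assumed to be optional (flush is called without one elsewhere in this guide), and the real Subscriber may behave differently.

```typescript
// Hypothetical subset of the subscription API described above.
// Assumption: the index arguments are optional.
interface SubscriptionLike<E> {
    on(type: string, fn: (events: E[]) => void): void
    flush(index?: string): void
    close(index?: string): void
}

// Registers a handler for 'event' batches, signals readiness, and returns
// a function that closes the channel and yields everything received so far.
function collectEvents<E>(subscription: SubscriptionLike<E>): () => E[] {
    const received: E[] = []
    subscription.on("event", (events) => {
        received.push(...events)
    })
    subscription.flush()
    return () => {
        subscription.close()
        return received
    }
}

// In-memory stand-in used only to exercise the sketch; not part of Live.
class FakeSubscription<E> implements SubscriptionLike<E> {
    private handlers = new Map<string, (events: E[]) => void>()
    flushed = false
    closed = false
    on(type: string, fn: (events: E[]) => void): void {
        this.handlers.set(type, fn)
    }
    flush(): void {
        this.flushed = true
    }
    close(): void {
        this.closed = true
    }
    // Simulates the platform delivering a batch of the given type.
    emit(type: string, events: E[]): void {
        if (this.flushed && !this.closed) this.handlers.get(type)?.(events)
    }
}

const fake = new FakeSubscription<{ metric: string }>()
const stop = collectEvents(fake)
fake.emit("event", [{ metric: "system" }, { metric: "disk" }])
fake.emit("other", [{ metric: "ignored" }]) // no handler registered for this type
const collected = stop()
console.log(collected.length) // 2
```

Returning a stop function keeps the close call next to the code that opened the subscription, which makes it harder to leave a channel open by accident.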

A very simple web application that handles only data events (as described in Live Event Types) looks like the following:

import Subscriber from 'live/services/subscription'

const subscription = new Subscriber({
    url: `services/plugin-tutorial/events/metrics`
})

export type EventMetrics = {
    stimestamp: number
    gtimestamp: number
    timestamp: number
    gperiod: number
    metric: string
    value: Record<string, string | {}>
    __src: 'core'
    __type: '__metrics'
}

// Handle only data events; the callback receives a batch (array) of events
subscription.on('event', (events: EventMetrics[]) => {
    if (events[0].metric === "system") {
        console.log(events[0].timestamp)
    }

    if (events[0].metric === "disk") {
        // do something else
    }
})

subscription.flush()

In subscription.on('event', (events: EventMetrics[]) => {...}), we provide a callback to handle the data itself (using event as the event type). The callback receives a batch of data as its first parameter. Here, the example only prints some information with console.log. Finally, we call the flush method, which tells the service that the plugin code is ready to receive events.
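
Because the callback receives a batch, a handler that inspects only events[0] does not look at the remaining events in that batch. The following is a minimal sketch of processing every event, assuming the metric and timestamp fields shown in EventMetrics. MetricEvent (a reduced stand-in for EventMetrics, so the block is self-contained) and latestTimestampByMetric are hypothetical names.

```typescript
// Reduced stand-in for EventMetrics with only the fields used here.
type MetricEvent = { metric: string; timestamp: number }

// Returns the most recent timestamp seen for each metric in a batch.
function latestTimestampByMetric(events: MetricEvent[]): Map<string, number> {
    const latest = new Map<string, number>()
    for (const event of events) {
        const previous = latest.get(event.metric)
        if (previous === undefined || event.timestamp > previous) {
            latest.set(event.metric, event.timestamp)
        }
    }
    return latest
}

// Sample batch with made-up values, for illustration only
const batch: MetricEvent[] = [
    { metric: "system", timestamp: 1000 },
    { metric: "disk", timestamp: 1500 },
    { metric: "system", timestamp: 2000 },
]
const latestByMetric = latestTimestampByMetric(batch)
console.log(latestByMetric.get("system"), latestByMetric.get("disk"))
```

A function like this could be called from inside the 'event' callback with the received batch.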

In summary, this section created a subscriber using the Live Subscription Service API and requested the data channel from the backend REST endpoint /services/plugin-tutorial/events/metrics. The Live runtime starts the query once the frontend code flushes the data channel.
