Live Platform
  • Introduction
  • Release Notes
    • Live 3
      • 3.59.0
      • 3.58.0
      • 3.57.0
      • 3.56.0
      • 3.55.0
      • 3.54.0
      • 3.53.0
      • 3.52.0
      • 3.51.0
      • 3.50.0
      • 3.49.0
      • 3.48.0
      • 3.47.0
      • 3.46.0
      • 3.45.0
      • 3.44.0
      • 3.43.0
      • 3.42.0
      • 3.41.0
      • 3.40.0
      • 3.39.0
      • 3.38.0
      • 3.37.0
      • 3.36.0
      • 3.35.0
      • 3.34.0
      • 3.33.0
      • 3.32.0
      • 3.31.0
      • 3.30.0
      • 3.29.0
      • 3.28.0
      • 3.27.0
      • 3.26.0
      • 3.25.0
      • 3.24.0
      • 3.23.0
      • 3.22.0
      • 3.21.0
      • 3.20.0
      • 3.19.0
      • 3.18.0
      • 3.17.0
      • 3.16.0
      • 3.15.0
      • 3.14.0
      • 3.13.0
      • 3.12.0
      • 3.11.0
      • 3.10.0
      • 3.9.0
      • 3.8.0
      • 3.7.0
      • 3.6.0
      • 3.5.0
      • 3.4.0
      • 3.3.0
      • 3.2.0
      • 3.1.0
      • 3.0.0
    • Live 2
  • Articles
    • Creating an aggregation
    • Creating a pipe
  • Theoretical Background
    • Fundamentals
    • Key Advantages
  • Platform Architecture
    • Introduction
    • Queries
    • Glossary
  • Featured plugins
    • Annotations
    • Groovy support
    • Messenger
    • Microsoft Teams
    • MongoDB
    • MongoDB Timeseries
    • MongoDB Kit
    • Purge plugin
    • SQL
    • TCP Input
    • TimescaleDB
  • Data visualization
    • Pipes widgets
      • Temporal
      • Cartesian
      • Multi-value snapshot
      • Single-value snapshot
      • Tables
      • Heatmap
      • JSX Widgets
      • Lollipop
      • Histogram
      • State Timeline
      • Boxplot
    • Pipes modifiers on Pipes charts
  • Alerts and notifications
    • Pipes modifiers on rules
  • Pipes Queries
    • Introduction
    • Dynamic filters
    • Meta parameters
    • Reducer
      • Uniform compress
      • PIP
    • Storage Hints
    • Execution Context
    • Event flow modifiers
  • Developers
    • Plugins
    • Packages
    • Backend API
      • Lookup Tables
      • Extensions
      • Settings
      • Storage Providers
      • Web Services
      • Web Setup
      • Entity Audit
    • Web application
      • Services
        • Point service
        • Menu service
      • Browser Compatibility
      • Runtime modules
        • Core Javascript modules
        • Library modules
        • Adding modules to runtime
      • Localization (i18n)
      • Date formatting
      • Dashboard and Widgets
        • Widget API
        • Custom widget editors
        • Live Event Types
        • Live Widget Configuration
        • Live Widget Packaging
        • Widget Request Interceptors
      • React Contexts
        • Dashboard
        • Dashboard widget
      • Registering Home Page options
    • Python application
    • Subscribing to Live Events
  • Administration
    • Configuration
      • Home Page Customization
      • live.properties
    • Infrastructure Monitoring
    • Storage Monitoring
    • Queries Monitoring
    • Logs Monitoring
    • Data Purging
  • Features
    • Access Permission
    • Datasources
    • Export Dashboard
    • Partial Indexes
    • WebApp Metrics
    • Entity Audit
Powered by GitBook
On this page
  • The aggregation
  • Testing

Was this helpful?

  1. Articles

Creating an aggregation

This article shows an example of creating a function to calculate the harmonic mean of a sequence in INTELIE Pipes.

The aggregation

To create the aggregation, the interface SimpleAggregation should be implemented.

The implementation contains the state and the intermediate representation of the aggregation.

The method marked with @Export is exposed as a Pipes function. Its body updates the state of the aggregation.

The eval() method provides the result of the calculation from its state.

The merge() method allows combining many states into one. This is used mainly by aggregation windows, to combine partial results produced inside it, but it can also be used to run Pipes' aggregations in a distributed system. Note that this means the aggregation should ideally be an associative operation.

import net.intelie.pipes.Export;
import net.intelie.pipes.Help;
import net.intelie.pipes.SimpleAggregation;

public class HarmonicMean implements SimpleAggregation.Full<HarmonicMean, Double> {
    private long count;
    private double inverseSum;
    
    @Help(key = "aggregation-harmonicMean") // key is used for standard library functions
    @Export("harmonicMean")
    public void harmonicMean(double x) {
        count++;
        inverseSum += 1 / x;
    }
    
    @Override
    public Double eval() {
        if (count == 0) return 0.0;
        return count / inverseSum;
    }
    
    @Override
    public void clear() {
        count = 0;
        inverseSum = 0;
    }
    
    @Override
    public void merge(HarmonicMean tree) {
        count += tree.count;
        inverseSum += tree.inverseSum;
    }
    
    @Override
    public void unmerge(HarmonicMean tree) {
        count -= tree.count;
        inverseSum -= tree.inverseSum;
    }
}

The unmerge method, defined by the SimpleAggregation.Full class, can be implemented as an optimization for sliding windows intermediate representation calculation, if the aggregation function is invertible. Otherwise, Pipes uses a unoptimized 2-stack-based algorithm to calculate sliding windows, which may use more resources.

Testing

Pipes provides some tools that help unit testing. An easy way is to validate the results after some state flips.

import net.intelie.pipes.Aggregation;
import net.intelie.pipes.State;
import net.intelie.pipes.Tree;
import net.intelie.pipes.support.Assertions;
import net.intelie.pipes.support.EqualsWithDelta;
import net.intelie.pipes.support.TestArgs;
import org.junit.Test;
import java.util.Collections;

public class HarmonicMeanTest {
    @Test
    public void testAggregateTwoValues() throws Exception {
        Aggregation<Double> aggregation = TestArgs.makeSimple("harmonicMean", HarmonicMean.class, "x#");
        
        State state = aggregation.newState(0);
        state.yield(null, Collections.singletonMap("x", 1));
        state.yield(null, Collections.singletonMap("x", 3));
        state.yield(null, Collections.singletonMap("x", 8));
        
        Tree tree1 = state.flip();
        state.yield(null, Collections.singletonMap("x", 2));
        state.yield(null, Collections.singletonMap("x", 5));
        state.yield(null, Collections.singletonMap("x", 13));
        
        Tree tree2 = state.flip();
        
        Assertions.assertMerge(aggregation, tree1, tree2,
                new EqualsWithDelta(0.0, 1e-6),
                new EqualsWithDelta(3 / (1 + 1 / 3.0 + 1 / 8.0), 1e-6),
                new EqualsWithDelta(3 / (1 / 2.0 + 1 / 5.0 + 1 / 13.0), 1e-6),
                new EqualsWithDelta(2.6842558072842, 1e-6)
        );
    }
}

PreviousLive 2NextCreating a pipe

Last updated 2 years ago

Was this helpful?