Live Platform
  • Introduction
  • Release Notes
    • Live 3
      • 3.59.0
      • 3.58.0
      • 3.57.0
      • 3.56.0
      • 3.55.0
      • 3.54.0
      • 3.53.0
      • 3.52.0
      • 3.51.0
      • 3.50.0
      • 3.49.0
      • 3.48.0
      • 3.47.0
      • 3.46.0
      • 3.45.0
      • 3.44.0
      • 3.43.0
      • 3.42.0
      • 3.41.0
      • 3.40.0
      • 3.39.0
      • 3.38.0
      • 3.37.0
      • 3.36.0
      • 3.35.0
      • 3.34.0
      • 3.33.0
      • 3.32.0
      • 3.31.0
      • 3.30.0
      • 3.29.0
      • 3.28.0
      • 3.27.0
      • 3.26.0
      • 3.25.0
      • 3.24.0
      • 3.23.0
      • 3.22.0
      • 3.21.0
      • 3.20.0
      • 3.19.0
      • 3.18.0
      • 3.17.0
      • 3.16.0
      • 3.15.0
      • 3.14.0
      • 3.13.0
      • 3.12.0
      • 3.11.0
      • 3.10.0
      • 3.9.0
      • 3.8.0
      • 3.7.0
      • 3.6.0
      • 3.5.0
      • 3.4.0
      • 3.3.0
      • 3.2.0
      • 3.1.0
      • 3.0.0
    • Live 2
  • Articles
    • Creating an aggregation
    • Creating a pipe
  • Theoretical Background
    • Fundamentals
    • Key Advantages
  • Platform Architecture
    • Introduction
    • Queries
    • Glossary
  • Featured plugins
    • Annotations
    • Groovy support
    • Messenger
    • Microsoft Teams
    • MongoDB
    • MongoDB Timeseries
    • MongoDB Kit
    • Purge plugin
    • SQL
    • TCP Input
    • TimescaleDB
  • Data visualization
    • Pipes widgets
      • Temporal
      • Cartesian
      • Multi-value snapshot
      • Single-value snapshot
      • Tables
      • Heatmap
      • JSX Widgets
      • Lollipop
      • Histogram
      • State Timeline
      • Boxplot
    • Pipes modifiers on Pipes charts
  • Alerts and notifications
    • Pipes modifiers on rules
  • Pipes Queries
    • Introduction
    • Dynamic filters
    • Meta parameters
    • Reducer
      • Uniform compress
      • PIP
    • Storage Hints
    • Execution Context
    • Event flow modifiers
  • Developers
    • Plugins
    • Packages
    • Backend API
      • Lookup Tables
      • Extensions
      • Settings
      • Storage Providers
      • Web Services
      • Web Setup
      • Entity Audit
    • Web application
      • Services
        • Point service
        • Menu service
      • Browser Compatibility
      • Runtime modules
        • Core Javascript modules
        • Library modules
        • Adding modules to runtime
      • Localization (i18n)
      • Date formatting
      • Dashboard and Widgets
        • Widget API
        • Custom widget editors
        • Live Event Types
        • Live Widget Configuration
        • Live Widget Packaging
        • Widget Request Interceptors
      • React Contexts
        • Dashboard
        • Dashboard widget
      • Registering Home Page options
    • Python application
    • Subscribing to Live Events
  • Administration
    • Configuration
      • Home Page Customization
      • live.properties
    • Infrastructure Monitoring
    • Storage Monitoring
    • Queries Monitoring
    • Logs Monitoring
    • Data Purging
  • Features
    • Access Permission
    • Datasources
    • Export Dashboard
    • Partial Indexes
    • WebApp Metrics
    • Entity Audit
Powered by GitBook
On this page

Was this helpful?

  1. Articles

Creating a pipe

This article shows how to implement the Swinging Door dynamic compression algorithm as a custom pipe for Pipes, using the INTELIE Live API.

/**
 * Pipe exported as {@code @compress.swingingDoor}.
 *
 * <p>For every incoming object it evaluates the event timestamp and a numeric
 * expression, and feeds the resulting (timestamp, value) pair to a
 * {@link SwingingDoorCompression} kept per group-by key. The compressor decides
 * which of the original objects are forwarded to the downstream {@code Sink}.
 */
@Export("@compress.swingingDoor")
@Help(usage = "@compress.swingingDoor <number k> [<maxDeviation>] [<maxOutputPeriod>] [by <object...>]",
        description = "Compresses the input using the swinging door algorithm")
public class SwingingDoorPipe implements Pipe {
    private static final long serialVersionUID = 1L;

    // Reference instant handed to TimeSpan.start/end when deriving the default
    // output period. NOTE(review): presumably epoch milliseconds -- confirm.
    private static final long BEST_DAY_TO_ESTIMATE_THINGS = 32503427999999L;

    private final GroupBy group;
    private final Double maxDeviation;
    private final Double maxOutputPeriod;
    private final Scalar<Double> expr;
    private final Metadata metadata;
    private final RowFields fields;
    private final Property<Double> timestamp;

    /**
     * Builds the pipe from its parsed arguments.
     *
     * <p>Arguments are taken from the queue in a fixed order: the group-by clause,
     * the numeric expression to compress, the optional constant
     * {@code maxDeviation} (falls back to 1000) and the optional constant
     * {@code maxOutputPeriod} (falls back to the length of {@code @@fullspan}
     * divided by 1024).
     *
     * @param queue the argument queue for this pipe invocation
     * @throws PipeException declared by the original constructor; raised by the
     *                       argument-parsing calls
     */
    public SwingingDoorPipe(ArgQueue queue) throws PipeException {
        // The order of these reads matters: each call consumes from the queue.
        group = queue.groupBy();
        expr = queue.scalar(Type.NUMBER).getSafe();
        maxDeviation = queue.constantValue(Type.NUMBER).getSafe(unused -> 1000d);
        maxOutputPeriod = queue.constantValue(Type.NUMBER).getSafe(fallback -> {
            TimeSpan span = TimeSpan.parse(fallback.compiler().expression().compileToQueue("@@fullspan").constantValue(Type.STRING).get());
            return (span.end(BEST_DAY_TO_ESTIMATE_THINGS) - span.start(BEST_DAY_TO_ESTIMATE_THINGS)) / 1024d;
        });

        metadata = queue.context().metadata().resetWeights(expr.weight() * (group.isEmpty() ? 1 : Expression.GROUP_MEM_PENALTY));
        timestamp = queue.context().timestamp();
        fields = metadata.getRowFields();
    }

    // split(), mapper() and reducer() are only consulted when pipes run in
    // distributed mode; otherwise their return values do not matter.

    @Override
    public boolean split() {
        return false;
    }

    @Override
    public Pipe mapper() {
        return null;
    }

    @Override
    public Pipe reducer() {
        return null;
    }

    @Override
    public Metadata metadata() {
        return metadata;
    }

    @Override
    public PipeInstance newInstance(final Sink listener) {
        return new MyInstance(listener);
    }

    @Override
    public PropertyVisitor visit(Scope parent, PropertyVisitor visitor) {
        // Nothing to visit here: the visitor is handed back untouched.
        return visitor;
    }

    /**
     * Running instance of the pipe: owns the per-group compressor state and
     * routes each incoming object to the compressor of its group.
     */
    private class MyInstance implements PipeInstance, Serializable {
        private static final long serialVersionUID = 1L;

        private final GroupBy.State<SwingingDoorCompression> state;

        public MyInstance(Sink listener) {
            // One compressor is created by this factory for each group key.
            state = group.newState(ignoredKey
                    -> new SwingingDoorCompression(listener, fields, maxDeviation, maxOutputPeriod));
        }

        /**
         * Feeds one object to the compressor of its group. Objects that are
         * {@code null}, or whose timestamp or value is {@code null} or NaN, are ignored.
         */
        public void flow(@Nullable Object obj) {
            if (obj == null) {
                return;
            }

            final Double time = timestamp.eval(null, obj);
            if (time == null || Double.isNaN(time)) {
                return;
            }

            // The group state is looked up before the value is evaluated,
            // exactly as in the original statement order.
            final SwingingDoorCompression compressor = state.get(null, obj);

            final Double value = expr.eval(null, obj);
            if (value == null || Double.isNaN(value)) {
                return;
            }

            compressor.flow(time, value, obj);
        }

        @Override
        public void flowMany(Iterable iterable) {
            if (iterable == null) {
                return;
            }
            for (Object item : iterable) {
                flow(item);
            }
        }

        @Override
        public void turnOn(SchedulerContext context) {
        }

        @Override
        public void destroy() {
            destroy(true);
        }

        @Override
        public void destroy(boolean flushTimers) {
        }

        @Override
        public void advanceTo(long timestamp) {
        }
    }
}

The compression logic itself is implemented in the class below.

/**
 * Stateful swinging-door style compressor for one series of (x, y) samples.
 *
 * <p>Samples are fed one at a time through {@link #flow(Double, Double, Object)}.
 * The compressor keeps the last emitted reference point (the "archive"), the
 * most recent accepted sample (the "snapshot") and a pair of slopes that
 * describe the door opened from the archive point through the snapshot
 * shifted by {@code +/- maxDeviation}. A new sample that stays inside the door
 * replaces the snapshot and emits nothing; a sample that leaves the door (or
 * arrives {@code maxOutputPeriod} or more after the archive point) causes the
 * snapshot and the new sample to be forwarded to the {@code Sink}.
 *
 * <p>NOTE(review): the slopes are recomputed from the latest snapshot only and
 * are not narrowed against the previous door, which differs from the textbook
 * swinging-door formulation -- confirm whether that is intended.
 */
public class SwingingDoorCompression {

    @NotNull
    private final Sink listener;
    @NotNull
    private final RowFields fields;
    @NotNull
    private final Double maxDeviation;
    @NotNull
    private final Double maxOutputPeriod;

    // Most recent sample accepted inside the door: {x, y}.
    @NotNull
    private double[] snapshot = new double[2]; // most up to date value that has passed exception
    private boolean dirtySnapshot = true;

    // Reference point the door is anchored at: {x, y}.
    @NotNull
    private double[] archive = new double[2];
    private boolean dirtyArchive = true;

    // Door slopes measured from the archive point: {upper, lower}.
    @NotNull
    private double[] slope = new double[2];
    private boolean dirtySlope = true;

    @Nullable
    private Object archiveObj = null;
    @Nullable
    private Object snapshotObj = null;

    // Last object handed to the listener; used to avoid emitting the archive twice.
    @Nullable
    private Object lastFlowed = null;

    /**
     * @param listener        sink that receives the objects surviving compression
     * @param fields          row fields passed along with row events; when {@code null}
     *                        objects are forwarded as raw events instead
     * @param maxDeviation    vertical half-width of the door around the snapshot
     * @param maxOutputPeriod largest allowed distance in x between the archive point
     *                        and a sample that is still compressed away
     */
    public SwingingDoorCompression(Sink listener, RowFields fields, @NotNull Double maxDeviation, @NotNull Double maxOutputPeriod) {
        this.listener = listener;
        this.fields = fields;
        this.maxDeviation = maxDeviation;
        this.maxOutputPeriod = maxOutputPeriod;
    }

    /**
     * Feeds one sample to the compressor.
     *
     * @param x   position of the sample on the time axis
     * @param y   value of the sample
     * @param obj original object the sample was extracted from; this is what is
     *            forwarded to the listener when the sample is kept
     */
    public void flow(@NotNull Double x, @NotNull Double y, Object obj) {
        // Very first sample: it becomes the archive point and is always emitted.
        if (dirtyArchive) {
            setArchive(x, y, obj);
            flow(obj);
            return;
        }

        // No snapshot (or no door) yet: open the door through this sample.
        if (dirtySnapshot || dirtySlope) {
            setSnapshot(x, y, obj);
            setSlopes(x, y);
            return;
        }

        // Still inside the door and within the output period: swallow the sample.
        if (inSlope(x, y) && (x.longValue() - maxOutputPeriod) < archive[0]) {
            setSnapshot(x, y, obj);
            setSlopes(x, y);
            return;
        }

        // Door violated: the previous snapshot becomes the new archive point.
        setArchive(snapshot[0], snapshot[1], snapshotObj);

        dirtySnapshot = true;

        if (archiveObj != null && archiveObj != lastFlowed)
            flow(archiveObj);

        flow(obj);
    }

    /**
     * Forwards an object to the listener. With row fields present only
     * {@code Row} objects are forwarded (others are dropped); without row
     * fields the object is forwarded as a raw event.
     */
    private void flow(@NotNull Object obj) {
        lastFlowed = obj;
        if (fields != null) {
            if (obj instanceof Row) {
                listener.onEvent(fields, new ArrayRowList((Row) obj));
            }
        } else {
            listener.onRaw(ArrayRawEvent.fromArray(obj));
        }
    }

    /**
     * @return the live two-element array {upper slope, lower slope}
     */
    @Nullable
    public double[] getSlope() {
        return slope;
    }

    /** Re-opens the door from the archive point through (x, y +/- maxDeviation). */
    private void setSlopes(@NotNull Double x, @NotNull Double y) {
        dirtySlope = false;
        slope[0] = calculateSlope(x, y + maxDeviation);
        slope[1] = calculateSlope(x, y - maxDeviation);
    }

    private void setArchive(@NotNull Double x, @NotNull Double y, Object obj) {
        dirtyArchive = false;
        archive[0] = x;
        archive[1] = y;
        archiveObj = obj;
    }

    private void setSnapshot(@NotNull Double x, @NotNull Double y, Object obj) {
        dirtySnapshot = false;
        snapshot[0] = x;
        snapshot[1] = y;
        snapshotObj = obj;
    }

    /**
     * Tells whether (x, y) lies between the two door lines.
     *
     * <p>The slopes are measured from the archive point, so each door line must be
     * evaluated relative to that point: {@code archive.y + slope * (x - archive.x)}.
     * Evaluating {@code slope * x} directly would only be correct for an archive
     * point located at the origin.
     */
    private boolean inSlope(@NotNull Double x, @NotNull Double y) {
        double dx = x - archive[0];
        double upper = archive[1] + slope[0] * dx;
        double lower = archive[1] + slope[1] * dx;
        return upper >= y && lower <= y;
    }

    /**
     * Slope of the line from the archive point to (x, y). When {@code x} equals
     * the archive x the division yields an infinite or NaN slope.
     */
    @NotNull
    private Double calculateSlope(@NotNull Double x, @NotNull Double y) {
        return (y - archive[1]) / (x - archive[0]);
    }
}

The pipe can be exported using the Live API, as follows.

// Registers SwingingDoorPipe with Pipes so the exported function becomes available.
// NOTE(review): presumably `live` is the Live API handle available to the plugin -- confirm.
live.pipes().addConstructor(SwingingDoorPipe.class);

The function can then be used in Pipes queries with the following syntax.

@compress.swingingDoor <number k> [<maxDeviation>] [<maxOutputPeriod>] [by <object...>]
Previous: Creating an aggregation | Next: Fundamentals

Last updated 2 years ago

Was this helpful?