Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/src/main/java/io/ddf/content/ViewHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ public String toString() {
@Override
public String toSql() {
if (name == null) {
throw new IllegalArgumentException("Missing Operator name from Adatao client for operands[] "
throw new IllegalArgumentException("Missing Operator name from DDF client for operands[] "
+ Arrays.toString(operands));
}
switch (name) {
Expand Down
127 changes: 127 additions & 0 deletions core/src/main/java/io/ddf/etl/ATimeSeriesHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package io.ddf.etl;


import io.ddf.DDF;
import io.ddf.analytics.ABinningHandler.BinningType;
import io.ddf.exception.DDFException;
import io.ddf.misc.ADDFFunctionalGroupHandler;
import java.util.List;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.Lists;

/**
 * Base implementation of {@link IHandleTimeSeries}: time-series ETL operations (downsampling,
 * diff columns, moving averages, CSV export) over a DDF. Engine-specific subclasses supply the
 * operations this base class leaves unimplemented.
 */
public abstract class ATimeSeriesHandler extends ADDFFunctionalGroupHandler implements IHandleTimeSeries {

// Name of the column holding timestamps; kept as instance state so that later operations
// (e.g. a save/export implementation) can reuse it — see the downsample methods, which set it.
protected String mTimestampColumn;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to keep mTimestampColumn and mTsIDColumn as instance variables? They are already passed in by the downsample function, right?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I planned to use them in the save_ts method; you can modify that when working on the save_ts implementation.

// Optional column identifying individual time series within one DDF (the "series ID");
// null/empty means the DDF is treated as a single series.
protected String mTsIDColumn = null;


/**
 * @param theDDF the DDF this handler operates on; forwarded to the functional-group base class
 */
public ATimeSeriesHandler(DDF theDDF) {
super(theDDF);

}

/** Records which column of the DDF holds the timestamps. */
public void setTimeStampColumn(String colName) {
  this.mTimestampColumn = colName;
}

/** @return the name of the timestamp column, or {@code null} if none has been set yet */
public String getTimeStampColumn() {
  return this.mTimestampColumn;
}


/** @return the name of the time-series ID column, or {@code null} if none has been set */
public String getTsIDColumn() {
  return this.mTsIDColumn;
}

/** Records which column identifies individual time series within the DDF. */
public void setTsIDColumn(String colName) {
  mTsIDColumn = colName;
}

@Override
public DDF downsample(String timestampColumn, List<String> aggregateFunctions, int interval, TimeUnit timeUnit)
throws DDFException {

this.mTimestampColumn = timestampColumn;
List<String> groupByCols = Lists.newArrayList(timestampColumn);
if (mTsIDColumn != null && !mTsIDColumn.isEmpty()) {
groupByCols.add(mTsIDColumn);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mTsIDColumn shouldn't be added here, this method should only downsample based on timestampColumn.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

downsampling should be for each mTsIDColumn

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is mTsIDColumn?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ID of a time series

}

long intervalInSeconds = timeUnit.toSeconds(interval);

int numBins = getNumBins(intervalInSeconds);
DDF binnedDDF = this.getDDF().binning(timestampColumn, BinningType.EQUALINTERVAL.toString(), numBins, null, false,
true, true);
DDF newDDF = binnedDDF.groupBy(groupByCols, aggregateFunctions);

return newDDF;
}

@Override
public DDF downsample(String timestampColumn, String tsIDColumn, List<String> aggregateFunctions, int interval,
TimeUnit timeUnit) throws DDFException {

this.mTsIDColumn = tsIDColumn;
List<String> rs = getDistinctValues(tsIDColumn);

DDF ddf0 = filterByValue(tsIDColumn, rs.get(0));

ddf0.getTimeSeriesHandler().setTsIDColumn(tsIDColumn);
DDF newDDF = ddf0.getTimeSeriesHandler().downsample(timestampColumn, aggregateFunctions, interval, timeUnit);
if (rs.size() > 1) {
for (int i = 1; i < rs.size(); i++) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There 1 key for most usecases for now. I wanted to quickly reuse DDF binning method here. Other KPIs like col diff or moving average used SparkDF window functions.

DDF filteredDDF = filterByValue(tsIDColumn, rs.get(i));
filteredDDF.getTimeSeriesHandler().setTsIDColumn(tsIDColumn);
DDF nextDDF = filteredDDF.getTimeSeriesHandler().downsample(timestampColumn, aggregateFunctions, interval,
timeUnit);
newDDF = newDDF.getJoinsHandler().merge(nextDDF);
}
}
return newDDF;
}

/**
 * Convenience overload of {@link #addDiffColumn(String, String, String, String)} with no
 * time-series ID column: the whole DDF is treated as a single series.
 */
@Override
public DDF addDiffColumn(String timestampColumn, String colToGetDiff, String diffColumn) throws DDFException{
return addDiffColumn(timestampColumn, null, colToGetDiff, diffColumn);
}

/**
 * Adds a column holding the difference between consecutive values of {@code colToGetDiff},
 * ordered by {@code timestampColumn} (per series when {@code tsIDColumn} is given).
 * Not implemented in this abstract base class; engine-specific handlers must override it.
 *
 * @throws UnsupportedOperationException always — the original stub silently returned
 *         {@code null}, which would only surface later as an obscure NPE at the call site
 */
@Override
public DDF addDiffColumn(String timestampColumn, String tsIDColumn, String colToGetDiff, String diffColumn)
    throws DDFException {
  throw new UnsupportedOperationException("addDiffColumn is not implemented by " + this.getClass().getName());
}

/**
 * Adds a column holding the moving average of {@code colToComputeMovingAverage} over a window
 * of {@code windowSize} rows, ordered by {@code timestampColumn} (per series when
 * {@code tsIDColumn} is given). Not implemented in this abstract base class; engine-specific
 * handlers must override it.
 *
 * @throws UnsupportedOperationException always — failing fast is safer than the original
 *         stub's silent {@code null} return
 */
@Override
public DDF computeMovingAverage(String timestampColumn, String tsIDColumn, String colToComputeMovingAverage,
    String movingAverageColName, int windowSize) throws DDFException {
  throw new UnsupportedOperationException("computeMovingAverage is not implemented by " + this.getClass().getName());
}

/**
 * Persists this time series as CSV under {@code pathToStorage}. Not implemented in this
 * abstract base class; engine-specific handlers must override it.
 *
 * @throws UnsupportedOperationException always — the original empty body was a silent no-op,
 *         which would make callers believe their data had been saved
 */
@Override
public void saveTimeSeriesToCSV(String pathToStorage) {
  throw new UnsupportedOperationException("saveTimeSeriesToCSV is not implemented by " + this.getClass().getName());
}

/**
 * Computes how many equal-width bins of {@code intervalInSeconds} cover the timestamp range of
 * this DDF. Uses ceiling division so a trailing partial interval still gets its own bin, and
 * returns at least 1 — the original truncating division could return 0 (an invalid bin count,
 * e.g. when the whole range fits inside one interval) and divided without guarding against a
 * non-positive interval.
 *
 * @param intervalInSeconds bin width in seconds; must be positive
 * @return number of bins, always {@code >= 1}
 * @throws DDFException if {@code intervalInSeconds <= 0} or the min/max lookup fails
 */
private int getNumBins(long intervalInSeconds) throws DDFException {
  if (intervalInSeconds <= 0) {
    throw new DDFException("intervalInSeconds must be positive, got " + intervalInSeconds);
  }
  long minTimeStamp = this.getDDF().getVectorMin(mTimestampColumn).longValue();
  long maxTimeStamp = this.getDDF().getVectorMax(mTimestampColumn).longValue();
  long span = maxTimeStamp - minTimeStamp;
  // Ceiling division, clamped to at least one bin.
  return (int) Math.max(1L, (span + intervalInSeconds - 1) / intervalInSeconds);
}

/** Runs a SELECT DISTINCT over {@code colName} and returns the resulting rows as strings. */
private List<String> getDistinctValues(String colName) throws DDFException {
  String tableName = this.getDDF().getTableName();
  String sqlCmd = String.format("SELECT distinct(%s) FROM %s", colName, tableName);
  return this.getManager().sql(sqlCmd, this.getEngine()).getRows();
}

/**
 * Returns a new DDF containing only the rows where {@code colName} equals {@code value}.
 *
 * @param colName column to filter on (assumed to be a trusted, internal column name)
 * @param value   value to match; embedded single quotes are escaped before interpolation
 * @throws DDFException if the filtering query fails
 */
private DDF filterByValue(String colName, String value) throws DDFException {
  // Double any single quotes so a value like "O'Brien" cannot break out of the SQL string
  // literal (the original unescaped interpolation produced invalid SQL and was an injection
  // risk for attacker-controlled values).
  String escapedValue = value.replace("'", "''");
  String sqlCmd = String.format("SELECT * FROM %s WHERE %s = '%s'", this.getDDF().getTableName(), colName,
      escapedValue);
  return this.getDDF().getSqlHandler().sql2ddf(sqlCmd);
}
}
29 changes: 29 additions & 0 deletions core/src/main/java/io/ddf/etl/IHandleTimeSeries.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package io.ddf.etl;

import java.util.List;
import io.ddf.DDF;
import io.ddf.exception.DDFException;
import io.ddf.misc.IHandleDDFFunctionalGroup;
import java.util.concurrent.TimeUnit;

/**
 * Handler contract for time-series ETL operations on a DDF: downsampling, consecutive-value
 * diff columns, moving averages, and CSV export.
 */
public interface IHandleTimeSeries extends IHandleDDFFunctionalGroup {

/** Records which column of the DDF holds the timestamps. */
void setTimeStampColumn(String colName);

/** Records which column identifies individual time series within the DDF. */
void setTsIDColumn(String colName);

/** @return the name of the timestamp column */
String getTimeStampColumn() throws DDFException;

/** Downsamples the DDF as a single series: bins rows on the timestamp column into intervals
 * of the given length and applies the aggregate functions within each bin. */
DDF downsample(String timestampColumn, List<String> aggregateFunctions, int interval, TimeUnit timeUnit) throws DDFException;

/** Downsamples each time series (distinguished by {@code tsIDColumn}) independently and
 * merges the per-series results. */
DDF downsample(String timestampColumn, String tsIDColumn, List<String> aggregateFunctions, int interval, TimeUnit timeUnit) throws DDFException;

/** Adds {@code diffColName}: the difference between consecutive values of
 * {@code colToGetDiff}, ordered by the timestamp column. */
DDF addDiffColumn(String timestampColumn, String colToGetDiff, String diffColName) throws DDFException;

/** Per-series variant of {@link #addDiffColumn(String, String, String)}. */
DDF addDiffColumn(String timestampColumn, String tsIDColumn, String colToGetDiff, String diffColName) throws DDFException;

/** Adds {@code movingAverageColName}: the moving average of
 * {@code colToComputeMovingAverage} over windows of {@code windowSize} rows. */
DDF computeMovingAverage(String timestampColumn, String tsIDColumn, String colToComputeMovingAverage, String movingAverageColName,
int windowSize) throws DDFException;

/** Persists the time series as CSV under {@code path}. */
void saveTimeSeriesToCSV(String path);
}
6 changes: 0 additions & 6 deletions core/src/main/java/io/ddf/misc/IHandleTimeSeries.java

This file was deleted.

3 changes: 3 additions & 0 deletions ddf-conf/ddf.ini
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ ISupportMLMetrics = io.ddf.spark.ml.MLMetricsSupporter
IHandleTransformations = io.ddf.etl.TransformationHandler
IHandleMutability = io.ddf.content.MutabilityHandler
IHandleMissingData = io.ddf.etl.MissingDataHandler
;IHandleTimeSeries = io.ddf.etl.TimeSeriesHandler
;IHandleSql = io.ddf.etl.SqlHandler
;IRunAlgorithms = io.ddf.analytics.AlgorithmRunner
MAX_LEVELS_COUNT = 10000
Expand All @@ -45,6 +46,8 @@ ISupportMLMetrics = io.ddf.spark.ml.MLMetricsSupporter
IHandleBinning = io.ddf.spark.analytics.BinningHandler
IHandleMutability = io.ddf.content.MutabilityHandler
IHandleMissingData = io.ddf.etl.MissingDataHandler
IHandleTimeSeries = io.ddf.spark.etl.TimeSeriesHandler
IHandlePersistence = io.ddf.spark.content.PersistenceHandler
kmeans = org.apache.spark.mllib.clustering.KMeans
linearRegressionLasso = org.apache.spark.mllib.regression.LassoWithSGD
linearRegressionWithSGD = org.apache.spark.mllib.regression.LinearRegressionWithSGD
Expand Down
Loading