File
Methods
|
Private
Static
client
|
client()
|
|
|
|
|
|
Async
fetchCostMetricsForSync
|
fetchCostMetricsForSync(costEndpoint: any, startTime: Date, endTime: Date)
|
|
|
Parameters :
| Name |
Type |
Optional |
| costEndpoint |
any
|
No
|
| startTime |
Date
|
No
|
| endTime |
Date
|
No
|
Returns : unknown
|
|
Async
fetchModelMetricsForSync
|
fetchModelMetricsForSync(model: Model, startTime: Date, endTime: Date)
|
|
|
Parameters :
| Name |
Type |
Optional |
| model |
Model
|
No
|
| startTime |
Date
|
No
|
| endTime |
Date
|
No
|
Returns : unknown
|
|
Async
fetchModelMetricsFromStore
|
fetchModelMetricsFromStore(model: Model, startTime: Date, endTime: Date)
|
|
|
Parameters :
| Name |
Type |
Optional |
| model |
Model
|
No
|
| startTime |
Date
|
No
|
| endTime |
Date
|
No
|
Returns : unknown
|
|
Async
query
|
query(fluxQuery)
|
|
|
Parameters :
| Name |
Optional |
| fluxQuery |
No
|
Returns : unknown
|
|
Async
queryCostRow
|
queryCostRow(fluxQuery)
|
|
|
Function to query influxdb along with all row properties
Parameters :
| Name |
Optional |
| fluxQuery |
No
|
Returns : unknown
|
|
Async
runQuery
|
runQuery(fluxQuery)
|
|
|
Parameters :
| Name |
Optional |
| fluxQuery |
No
|
Returns : unknown
|
|
Async
saveMetrics
|
saveMetrics(metrics, turoAttributes: any)
|
|
|
Parameters :
| Name |
Type |
Optional |
| metrics |
|
No
|
| turoAttributes |
any
|
No
|
Returns : unknown
|
|
Private
Static
_client
|
Type : null
|
Default value : null
|
|
|
|
Private
Static
bucket
|
Default value : process.env.INFLUX_BUCKET
|
|
|
|
Private
Static
influxdbUrl
|
Default value : process.env.INFLUX_URL
|
|
|
|
Private
Static
org
|
Default value : process.env.INFLUX_ORG
|
|
|
|
Private
Static
token
|
Default value : process.env.INFLUX_TOKEN
|
|
|
import { Injectable } from "@nestjs/common";
import { Model } from "../../models/entities/model.entity";
import { InfluxDB, Point } from "@influxdata/influxdb-client";
@Injectable()
export class InfluxDbProvider {
private static influxdbUrl = process.env.INFLUX_URL;
private static token = process.env.INFLUX_TOKEN;
private static org = process.env.INFLUX_ORG;
private static bucket = process.env.INFLUX_BUCKET;
private static _client = null;
private static client() {
if (!this._client) {
if (!this.influxdbUrl) {
throw new Error(
`Can't intialise InfluxDBProvider - influxdbUrl missing`,
);
}
this._client = new InfluxDB({
url: InfluxDbProvider.influxdbUrl,
token: InfluxDbProvider.token,
});
}
return this._client;
}
async runQuery(fluxQuery) {
const queryApi = InfluxDbProvider.client().getQueryApi(
InfluxDbProvider.org,
);
const response = [];
const _query = async () => {
for await (const { values, tableMeta } of queryApi.iterateRows(
fluxQuery,
)) {
const obj = tableMeta.toObject(values);
response.push(obj);
}
};
await _query();
return response;
}
async query(fluxQuery) {
const data = {};
const result = await this.runQuery(fluxQuery);
result.forEach((row) => {
const measurement = row["_measurement"];
const field = row["_field"];
const value = row["_value"];
const time = new Date(row["_time"]);
const obj = { value: value, time: time.getTime() };
if (!(measurement in data)) {
data[measurement] = {};
}
if (!(field in data[measurement])) {
data[measurement][field] = [];
}
data[measurement][field].push(obj);
});
return data;
}
/**
* Function to query influxdb along with all row properties
* @param fluxQuery
* @returns
*/
async queryCostRow(fluxQuery) {
const data = {};
const result = await this.runQuery(fluxQuery);
result.forEach((row) => {
const measurement = row["_measurement"];
const field = row["_field"];
const value = row["_value"];
const time = new Date(row["_time"]);
const obj = { ...row, value: value, time: time.getTime() };
if (!(measurement in data)) {
data[measurement] = {};
}
if (!(field in data[measurement])) {
data[measurement][field] = [];
}
data[measurement][field].push(obj);
});
return data;
}
async fetchModelMetricsFromStore(
model: Model,
startTime: Date,
endTime: Date,
) {
const fluxQuery = `from(bucket: "${InfluxDbProvider.bucket}")
|> range(start: ${Math.round(startTime.getTime() / 1000)}, stop: ${Math.round(endTime.getTime() / 1000)})
|> filter(fn: (r) => r["turo.model"] == "${model.slug}")
|> filter(fn: (r) => r["_field"] != "flags")
`;
return this.query(fluxQuery);
}
async saveMetrics(metrics, turoAttributes: any) {
const writeApi = InfluxDbProvider.client().getWriteApi(
InfluxDbProvider.org,
InfluxDbProvider.bucket,
);
writeApi.useDefaultTags(turoAttributes);
const points = [];
metrics.forEach((metric) => {
const time = new Date(metric["timestamp"]);
const point = new Point(metric["key"])
.timestamp(time)
.floatField("value", metric["value"]);
points.push(point);
});
await writeApi.writePoints(points);
writeApi.close();
return {
status: true,
message: "WRITE FINISHED",
points: points,
turoAttributes: turoAttributes,
};
}
async fetchModelMetricsForSync(model: Model, startTime: Date, endTime: Date) {
const fluxQuery = `from(bucket: "${InfluxDbProvider.bucket}")
|> range(start: ${Math.round(new Date(null).getTime() / 1000)}, stop: ${Math.round(new Date().getTime() / 1000)})
|> filter(fn: (r) => r["turo.model"] == "${model.slug}")
|> map(fn: (r) => ({ r with turo_timestamp: int(v: r["turo.timestamp"]) }))
|> filter(fn: (r) => r.turo_timestamp >= ${startTime.getTime()} and r.turo_timestamp <= ${endTime.getTime()})
|> filter(fn: (r) => r["_field"] != "flags")
`;
return this.query(fluxQuery);
}
async fetchCostMetricsForSync(
costEndpoint: any,
startTime: Date,
endTime: Date,
) {
const fluxQuery = `from(bucket: "${InfluxDbProvider.bucket}")
|> range(start: ${Math.round(new Date(null).getTime() / 1000)}, stop: ${Math.round(new Date().getTime() / 1000)})
|> filter(fn: (r) => r["turo.costEndpointId"] == "${costEndpoint.id}")
|> map(fn: (r) => ({ r with turo_timestamp: int(v: r["turo.timestamp"]) }))
|> filter(fn: (r) => r.turo_timestamp >= ${startTime.getTime()} and r.turo_timestamp <= ${endTime.getTime()})
|> filter(fn: (r) => r["_field"] != "flags")
`;
return this.queryCostRow(fluxQuery);
}
}