File

src/providers/influxDB/influxdb.provider.ts

Index

Properties
Methods

Methods

Private Static client
client()
Returns : any
Async fetchCostMetricsForSync
fetchCostMetricsForSync(costEndpoint: any, startTime: Date, endTime: Date)
Parameters :
Name Type Optional
costEndpoint any No
startTime Date No
endTime Date No
Returns : unknown
Async fetchModelMetricsForSync
fetchModelMetricsForSync(model: Model, startTime: Date, endTime: Date)
Parameters :
Name Type Optional
model Model No
startTime Date No
endTime Date No
Returns : unknown
Async fetchModelMetricsFromStore
fetchModelMetricsFromStore(model: Model, startTime: Date, endTime: Date)
Parameters :
Name Type Optional
model Model No
startTime Date No
endTime Date No
Returns : unknown
Async query
query(fluxQuery)
Parameters :
Name Optional
fluxQuery No
Returns : unknown
Async queryCostRow
queryCostRow(fluxQuery)

Function to query influxdb along with all row properties

Parameters :
Name Optional
fluxQuery No
Returns : unknown
Async runQuery
runQuery(fluxQuery)
Parameters :
Name Optional
fluxQuery No
Returns : unknown
Async saveMetrics
saveMetrics(metrics, turoAttributes: any)
Parameters :
Name Type Optional
metrics No
turoAttributes any No
Returns : unknown

Properties

Private Static _client
Type : null
Default value : null
Private Static bucket
Default value : process.env.INFLUX_BUCKET
Private Static influxdbUrl
Default value : process.env.INFLUX_URL
Private Static org
Default value : process.env.INFLUX_ORG
Private Static token
Default value : process.env.INFLUX_TOKEN
import { Injectable } from "@nestjs/common";
import { Model } from "../../models/entities/model.entity";
import { InfluxDB, Point } from "@influxdata/influxdb-client";

@Injectable()
export class InfluxDbProvider {
  private static influxdbUrl = process.env.INFLUX_URL;
  private static token = process.env.INFLUX_TOKEN;
  private static org = process.env.INFLUX_ORG;
  private static bucket = process.env.INFLUX_BUCKET;

  private static _client = null;

  private static client() {
    if (!this._client) {
      if (!this.influxdbUrl) {
        throw new Error(
          `Can't intialise InfluxDBProvider - influxdbUrl missing`,
        );
      }
      this._client = new InfluxDB({
        url: InfluxDbProvider.influxdbUrl,
        token: InfluxDbProvider.token,
      });
    }
    return this._client;
  }

  async runQuery(fluxQuery) {
    const queryApi = InfluxDbProvider.client().getQueryApi(
      InfluxDbProvider.org,
    );
    const response = [];
    const _query = async () => {
      for await (const { values, tableMeta } of queryApi.iterateRows(
        fluxQuery,
      )) {
        const obj = tableMeta.toObject(values);
        response.push(obj);
      }
    };
    await _query();
    return response;
  }

  async query(fluxQuery) {
    const data = {};
    const result = await this.runQuery(fluxQuery);
    result.forEach((row) => {
      const measurement = row["_measurement"];

      const field = row["_field"];
      const value = row["_value"];
      const time = new Date(row["_time"]);
      const obj = { value: value, time: time.getTime() };

      if (!(measurement in data)) {
        data[measurement] = {};
      }
      if (!(field in data[measurement])) {
        data[measurement][field] = [];
      }

      data[measurement][field].push(obj);
    });
    return data;
  }

  /**
   * Function to query influxdb along with all row properties
   * @param fluxQuery
   * @returns
   */
  async queryCostRow(fluxQuery) {
    const data = {};
    const result = await this.runQuery(fluxQuery);
    result.forEach((row) => {
      const measurement = row["_measurement"];

      const field = row["_field"];
      const value = row["_value"];
      const time = new Date(row["_time"]);
      const obj = { ...row, value: value, time: time.getTime() };

      if (!(measurement in data)) {
        data[measurement] = {};
      }
      if (!(field in data[measurement])) {
        data[measurement][field] = [];
      }

      data[measurement][field].push(obj);
    });
    return data;
  }

  async fetchModelMetricsFromStore(
    model: Model,
    startTime: Date,
    endTime: Date,
  ) {
    const fluxQuery = `from(bucket: "${InfluxDbProvider.bucket}")
      |> range(start: ${Math.round(startTime.getTime() / 1000)}, stop: ${Math.round(endTime.getTime() / 1000)})
      |> filter(fn: (r) => r["turo.model"] == "${model.slug}")
      |> filter(fn: (r) => r["_field"] != "flags")
    `;
    return this.query(fluxQuery);
  }

  async saveMetrics(metrics, turoAttributes: any) {
    const writeApi = InfluxDbProvider.client().getWriteApi(
      InfluxDbProvider.org,
      InfluxDbProvider.bucket,
    );
    writeApi.useDefaultTags(turoAttributes);
    const points = [];
    metrics.forEach((metric) => {
      const time = new Date(metric["timestamp"]);
      const point = new Point(metric["key"])
        .timestamp(time)
        .floatField("value", metric["value"]);
      points.push(point);
    });
    await writeApi.writePoints(points);
    writeApi.close();
    return {
      status: true,
      message: "WRITE FINISHED",
      points: points,
      turoAttributes: turoAttributes,
    };
  }

  async fetchModelMetricsForSync(model: Model, startTime: Date, endTime: Date) {
    const fluxQuery = `from(bucket: "${InfluxDbProvider.bucket}")
      |> range(start: ${Math.round(new Date(null).getTime() / 1000)}, stop: ${Math.round(new Date().getTime() / 1000)})
      |> filter(fn: (r) => r["turo.model"] == "${model.slug}")
      |> map(fn: (r) => ({ r with turo_timestamp: int(v: r["turo.timestamp"]) }))
      |> filter(fn: (r) => r.turo_timestamp >= ${startTime.getTime()} and r.turo_timestamp <= ${endTime.getTime()})
      |> filter(fn: (r) => r["_field"] != "flags")
    `;
    return this.query(fluxQuery);
  }

  async fetchCostMetricsForSync(
    costEndpoint: any,
    startTime: Date,
    endTime: Date,
  ) {
    const fluxQuery = `from(bucket: "${InfluxDbProvider.bucket}")
      |> range(start: ${Math.round(new Date(null).getTime() / 1000)}, stop: ${Math.round(new Date().getTime() / 1000)})
      |> filter(fn: (r) => r["turo.costEndpointId"] == "${costEndpoint.id}")
      |> map(fn: (r) => ({ r with turo_timestamp: int(v: r["turo.timestamp"]) }))
      |> filter(fn: (r) => r.turo_timestamp >= ${startTime.getTime()} and r.turo_timestamp <= ${endTime.getTime()})
      |> filter(fn: (r) => r["_field"] != "flags")
    `;
    return this.queryCostRow(fluxQuery);
  }
}

results matching ""

    No results matching ""