File

src/models/model.service.ts

Index

Properties
Methods

Constructor

constructor(prismaService: PrismaService, projectService: ProjectService, mlflowProvider: MLflowProvider, opentelemetryProvider: OpentelemetryProvider, temporalProvider: TemporalProvider, notificationService: NotificationService)
Parameters :
Name Type Optional
prismaService PrismaService No
projectService ProjectService No
mlflowProvider MLflowProvider No
opentelemetryProvider OpentelemetryProvider No
temporalProvider TemporalProvider No
notificationService NotificationService No

Methods

Async deleteModel
deleteModel(id: number)
Parameters :
Name Type Optional
id number No
Returns : unknown
Async deleteRelatedMetrics
deleteRelatedMetrics(modelId: number)
Parameters :
Name Type Optional
modelId number No
Returns : unknown
Async fetchMetricsFromSource
fetchMetricsFromSource(model: Model)
Parameters :
Name Type Optional
model Model No
Returns : unknown
Async fetchModelMetricsForSync
fetchModelMetricsForSync(id: number, queryOptions: literal type)
Parameters :
Name Type Optional
id number No
queryOptions literal type No
Returns : unknown
Async fetchModelMetricsFromStore
fetchModelMetricsFromStore(id: number, queryOptions: literal type)
Parameters :
Name Type Optional
id number No
queryOptions literal type No
Returns : unknown
Async findAll
findAll()
Returns : Promise<Model[]>
Async findFiltered
findFiltered(provider?: ModelProvider, status?: ModelStatus)
Parameters :
Name Type Optional
provider ModelProvider Yes
status ModelStatus Yes
Returns : unknown
Async findForProject
findForProject(projectId: number)
Parameters :
Name Type Optional
projectId number No
Returns : Promise<Model[]>
Async findOne
findOne(id: number)
Parameters :
Name Type Optional
id number No
Async findOrThrow
findOrThrow(id: number)

Fetches a model by id. Throws an error if the model doesn't exist.

Parameters :
Name Type Optional Description
id number No model Id

Returns : Promise<Model>
Async findToSync
findToSync()
Returns : unknown
Async getModelInfo
getModelInfo(id: number)
Parameters :
Name Type Optional
id number No
Returns : unknown
Async pushMetrics
pushMetrics(metrics, modelId)
Parameters :
Name Optional
metrics No
modelId No
Returns : unknown
Async registerModel
registerModel(model: RegisterModelDto)
Parameters :
Name Type Optional
model RegisterModelDto No
Returns : unknown
Async requestModelRegistration
requestModelRegistration(request: RegisterModelRequestDto)
Parameters :
Name Type Optional
request RegisterModelRequestDto No
Returns : unknown
Async saveModelMetrics
saveModelMetrics(model: Model, metrics: any, turoAttributes: any)
Parameters :
Name Type Optional
model Model No
metrics any No
turoAttributes any No
Returns : unknown
Async updateLastSeenTime
updateLastSeenTime(id: number, lastSeenAt: Date)
Parameters :
Name Type Optional
id number No
lastSeenAt Date No
Returns : unknown
Async updateLastSyncTime
updateLastSyncTime(id: number, lastSyncedAt: Date)
Parameters :
Name Type Optional
id number No
lastSyncedAt Date No
Returns : unknown
Async updateModelName
updateModelName(id: number, requestBody: EditModelDto)
Parameters :
Name Type Optional
id number No
requestBody EditModelDto No
Returns : unknown
Async updateModelStatus
updateModelStatus(id: number, status: ModelStatus)
Parameters :
Name Type Optional
id number No
status ModelStatus No
Returns : unknown

Properties

Private providerServiceMap
Type : object
Default value : { [ModelProvider.MLFLOW]: this.mlflowProvider, [ModelProvider.OPENTELEMETRY]: this.opentelemetryProvider, }
import { ForbiddenException, Injectable } from "@nestjs/common";
import { PrismaService } from "../common/prisma/prisma.service";
import { Model, ModelProvider, ModelStatus } from "./entities/model.entity";
import { toKebabCase, getNamesFromEmail } from "../common/helper";
import {
  RegisterModelDto,
  RegisterModelRequestDto,
} from "./dto/register-model.dto";
import { MLflowProvider } from "../providers/mlflow/mlflow.provider";
import { OpentelemetryProvider } from "../providers/opentelemetry/opentelemetry.provider";
import { ProjectService } from "../project/project.service";
import { TemporalProvider } from "../providers/temporal/temporal.provider";
import { EditModelDto } from "./dto/edit-model.dto";
import { NotificationService } from "../notification/notification.service";
import { modelEventsConfig, ModelEvents } from "./model.events";
import { EventType } from "../common/events";
import { NotificationEntityType } from "../notification/types/notification.enums";
import { MetricSources, MetricStatus } from "../metrics/entities/metrics.entity";
/**
 * Service for registered models: persistence through Prisma, metric
 * push/fetch through the provider that matches each model, and kicking off
 * Temporal workflows that sync metrics.
 */
@Injectable()
export class ModelService {
  // Dispatch table from a model's provider to the provider implementation.
  // NOTE(review): this initializer reads constructor parameter properties; it
  // relies on the compiler assigning those before field initializers run
  // (the behaviour when `useDefineForClassFields` is off) - confirm tsconfig.
  private providerServiceMap = {
    [ModelProvider.MLFLOW]: this.mlflowProvider,
    [ModelProvider.OPENTELEMETRY]: this.opentelemetryProvider,
  };

  constructor(
    private readonly prismaService: PrismaService,
    private readonly projectService: ProjectService,
    private readonly mlflowProvider: MLflowProvider,
    private readonly opentelemetryProvider: OpentelemetryProvider,
    private readonly temporalProvider: TemporalProvider,
    private readonly notificationService: NotificationService,
  ) {}

  /**
   * Pushes metrics for a model through its provider, records the time the
   * model was last seen and requests a `SyncModelMetrics` workflow run.
   *
   * @param metrics metrics payload forwarded unchanged to the provider
   * @param modelId id of the target model (coerced to a number)
   * @returns the parsed JSON body of the provider response
   * @throws ForbiddenException when the model's provider is not OPENTELEMETRY
   * @throws when the model does not exist, or when the workflow fails to start
   *   for a reason other than "already started"
   */
  async pushMetrics(metrics, modelId) {
    // findOrThrow so a missing model fails here instead of on a later
    // property access.
    const model = await this.findOrThrow(+modelId);

    // Reject unsupported providers before doing any further lookups.
    if (model.provider !== ModelProvider.OPENTELEMETRY) {
      // Blocked pushing metrics via metrics endpoint for providers other than OPENTELEMETRY
      throw new ForbiddenException(
        "Failed to push model metrics - unsupported action",
      );
    }

    const project = await this.projectService.findOne(+model.projectID);
    const seenAt = new Date();

    // Attributes attached to the pushed metrics: model slug, project slug and
    // the push timestamp in epoch milliseconds.
    const turoAttributes = [
      {
        key: "turo.model",
        value: {
          stringValue: `${model.slug}`,
        },
      },
      {
        key: "turo.project",
        value: {
          stringValue: `${project.slug}`,
        },
      },
      {
        key: "turo.timestamp",
        value: {
          intValue: seenAt.getTime(),
        },
      },
    ];

    const response = await this.providerServiceMap[model.provider].saveMetrics(
      metrics,
      turoAttributes,
    );

    await this.updateLastSeenTime(model.id, seenAt);

    try {
      await this.temporalProvider.runAsync(
        "SyncModelMetrics",
        [model],
        process.env.DEFAULT_QUEUE_NAME,
        `workflow-SyncModelMetrics-model-${model.id}`,
        "10s",
      );
    } catch (error) {
      // The workflow id is fixed per model, so a concurrent push can hit an
      // "already started" error; that case is expected and ignored.
      if (error.name === TemporalProvider.WORKFLOW_EXECUTION_ALREADY_STARTED_ERROR) {
        console.log("Workflow already running - ignoring workflow run request");
      } else {
        console.log("Error running workflow", error);
        throw error;
      }
    }

    return response.json();
  }

  /**
   * Fetches metrics for the model from its provider's source.
   *
   * @param model model whose provider is queried
   * @returns whatever the provider's `fetchMetricsFromSource` resolves to
   */
  async fetchMetricsFromSource(model: Model) {
    return await this.providerServiceMap[model.provider].fetchMetricsFromSource(
      model,
    );
  }

  /**
   * Saves metrics through the model's provider.
   *
   * @param model model whose provider receives the metrics
   * @param metrics metrics payload forwarded unchanged
   * @param turoAttributes attributes forwarded alongside the metrics
   * @returns the provider's `saveMetrics` result
   */
  async saveModelMetrics(model: Model, metrics: any, turoAttributes: any) {
    return await this.providerServiceMap[model.provider].saveMetrics(
      metrics,
      turoAttributes,
    );
  }

  /**
   * Loads the model and asks its provider for stored metrics.
   *
   * @param id model id
   * @param queryOptions time window forwarded to the provider
   * @throws when the model does not exist
   */
  async fetchModelMetricsFromStore(
    id: number,
    queryOptions: {
      startTime: number;
      endTime: number;
    },
  ) {
    const model = await this.findOrThrow(+id);
    return this.providerServiceMap[model.provider].fetchModelMetricsFromStore(
      model,
      queryOptions,
    );
  }

  /**
   * Loads the model and asks its provider for the metrics to sync.
   *
   * @param id model id
   * @param queryOptions time window forwarded to the provider
   * @throws when the model does not exist
   */
  async fetchModelMetricsForSync(
    id: number,
    queryOptions: {
      startTime: number;
      // NOTE(review): `endTIme` looks like a typo for `endTime`; the key is
      // left unchanged because callers and providers may depend on it.
      endTIme: number;
    },
  ) {
    const model = await this.findOrThrow(+id);
    return this.providerServiceMap[model.provider].fetchModelMetricsForSync(
      model,
      queryOptions,
    );
  }

  /**
   * Returns provider-side information about a model. For MLFLOW models a
   * `FetchMlflowModelMetrics` workflow is also requested in the background.
   *
   * @param id model id
   * @throws when the model does not exist
   */
  async getModelInfo(id: number) {
    const model = await this.findOrThrow(+id);

    // run fetch metrics workflow
    if (model.provider === ModelProvider.MLFLOW) {
      void this.startMlflowMetricsFetch(model);
    }

    return this.providerServiceMap[model.provider].getModelInfo(model);
  }

  /**
   * Creates a model with ACTIVE status and a generated slug. For MLFLOW
   * models a `FetchMlflowModelMetrics` workflow is requested in the background.
   *
   * @param model registration payload; its fields are spread last, so they
   *   override the generated `status` and `slug` if present
   * @returns the created model
   * @throws when the referenced project does not exist
   */
  async registerModel(model: RegisterModelDto) {
    const project = await this.projectService.findOrThrow(model.projectID);
    const createdModel = new Model(
      await this.prismaService.model.create({
        data: {
          status: ModelStatus.ACTIVE,
          slug: this.buildSlug(project.slug, model.name),
          ...model,
        },
      }),
    );

    if (createdModel.provider === ModelProvider.MLFLOW) {
      void this.startMlflowMetricsFetch(createdModel);
    }

    return createdModel;
  }

  /**
   * Lists every model whose status is not DELETED.
   */
  async findAll(): Promise<Model[]> {
    const models = await this.prismaService.model.findMany({
      where: {
        NOT: { status: { equals: ModelStatus.DELETED } },
      },
    });
    return models.map((model) => new Model(model));
  }

  /**
   * Lists ACTIVE models that need a metrics sync: never synced, or seen more
   * recently than they were last synced.
   */
  async findToSync() {
    const models = await this.prismaService.model.findMany({
      where: {
        status: ModelStatus.ACTIVE,
        OR: [
          { lastSeenAt: { gt: this.prismaService.model.fields.lastSyncedAt } }, // where last seen at is greater than last synced at
          { lastSyncedAt: null }, // where last synced at is null
        ],
      },
    });
    return models.map((model) => new Model(model));
  }

  /**
   * Lists models filtered by provider and/or status.
   *
   * @param provider optional provider filter
   * @param status optional status filter
   */
  async findFiltered(provider?: ModelProvider, status?: ModelStatus) {
    const filter = {
      where: {
        provider: { equals: provider },
        status: { equals: status },
      },
    };
    const models = await this.prismaService.model.findMany(filter);
    return models.map((model) => new Model(model));
  }

  /**
   * Fetches a model by id.
   *
   * @param id model id
   * @returns the model, or `null` when no model has that id
   */
  async findOne(id: number): Promise<Model | null> {
    const model = await this.prismaService.model.findUnique({
      where: { id: id },
    });
    // Do not wrap a missing row in a Model: callers rely on null to detect
    // "not found".
    return model ? new Model(model) : null;
  }

  /**
   * Fetches model - Throws an error if model doesn't exist
   * @param {number} id model Id
   */
  async findOrThrow(id: number): Promise<Model> {
    const model = await this.prismaService.model.findUniqueOrThrow({
      where: { id: id },
    });
    return new Model(model);
  }

  /**
   * Sets the model's `lastSyncedAt` timestamp.
   *
   * @param id model id
   * @param lastSyncedAt new timestamp
   * @returns the updated model
   */
  async updateLastSyncTime(id: number, lastSyncedAt: Date) {
    const model = await this.prismaService.model.update({
      where: { id: id },
      data: { lastSyncedAt: lastSyncedAt },
    });
    return new Model(model);
  }

  /**
   * Sets the model's `lastSeenAt` timestamp.
   *
   * @param id model id
   * @param lastSeenAt new timestamp
   * @returns the updated model
   */
  async updateLastSeenTime(id: number, lastSeenAt: Date) {
    const model = await this.prismaService.model.update({
      where: { id: id },
      data: { lastSeenAt: lastSeenAt },
    });
    return new Model(model);
  }

  /**
   * Sets the model's status.
   *
   * @param id model id
   * @param status new status
   * @returns the updated model
   */
  async updateModelStatus(id: number, status: ModelStatus) {
    return new Model(
      await this.prismaService.model.update({
        where: {
          id: id,
        },
        data: { status: status },
      }),
    );
  }

  /**
   * Soft-deletes a model: marks its metrics as DELETED, then sets the model's
   * own status to DELETED.
   *
   * @param id model id
   * @returns the updated model
   * @throws Error when the model does not exist
   */
  async deleteModel(id: number) {
    await this.deleteRelatedMetrics(id);
    return this.updateModelStatus(id, ModelStatus.DELETED);
  }

  /**
   * Lists the non-DELETED models that belong to a project.
   *
   * @param projectId project id
   */
  async findForProject(projectId: number): Promise<Model[]> {
    const models = await this.prismaService.model.findMany({
      where: {
        NOT: { status: { equals: ModelStatus.DELETED } },
        AND: { projectID: { equals: projectId } },
      },
    });
    return models.map((model) => new Model(model));
  }

  /**
   * Renames a model and regenerates its slug from the project slug, the new
   * name and the current timestamp.
   *
   * @param id model id
   * @param requestBody new name and the id of the owning project
   * @returns the updated model
   * @throws when the referenced project does not exist
   */
  async updateModelName(id: number, requestBody: EditModelDto) {
    const { name, projectId } = requestBody;
    const project = await this.projectService.findOrThrow(+projectId);
    return new Model(
      await this.prismaService.model.update({
        where: {
          id: id,
        },
        data: {
          name: name,
          slug: this.buildSlug(project.slug, name),
        },
      }),
    );
  }

  /**
   * Triggers a notification asking `requestTo` to register a model for the
   * given project.
   *
   * @param request project id, requester, recipient and free-text description
   * @returns `true` once the notification has been triggered
   * @throws when the referenced project does not exist
   */
  async requestModelRegistration(request: RegisterModelRequestDto) {
    const { projectID, requestTo, requestedBy, description } = request;
    // findOrThrow so an unknown project id fails here rather than on the
    // property reads below.
    const project = await this.projectService.findOrThrow(projectID);

    const requestURL = `${process.env.TURO_BASE_URL}${process.env.TURO_PROJECT_BASE_URL}${project.id}?activeTab=metrics&registerModel=true`;

    this.notificationService.triggerNotification(
      modelEventsConfig,
      projectID,
      0,
      EventType.MODEL_CONNECTION_REQUEST,
      NotificationEntityType.MODEL_CONNECTION_REQUEST,
      ModelEvents.MODEL_CONNECTION_REQUEST,
      {
        projectName: project.name,
        requestedByName: getNamesFromEmail(requestedBy),
        description,
        requestURL,
      },
      [requestTo],
    );

    return true;
  }

  /**
   * Marks every metric whose source is the given model as DELETED.
   *
   * @param modelId model id
   * @returns the result of the bulk update
   * @throws Error when the model does not exist
   */
  async deleteRelatedMetrics(modelId: number) {
    const model = await this.findOne(+modelId);
    if (!model) {
      throw new Error(`Model with id ${modelId} not found`);
    }

    // Delete metrics related to the model
    const response = await this.prismaService.metrics.updateMany({
      where: { sourceId: model.id, sourceType: MetricSources.MODEL },
      data: { status: MetricStatus.DELETED },
    });

    return response;
  }

  /** Builds a kebab-case slug from the project slug, model name and current time. */
  private buildSlug(projectSlug: string, name: string): string {
    const currentTimeStamp = Date.now();
    return toKebabCase(`${projectSlug}-${name}-${currentTimeStamp}`);
  }

  /**
   * Requests a `FetchMlflowModelMetrics` workflow run for the model. Meant to
   * be called without awaiting: failures are logged, never rethrown, so the
   * background call cannot surface as an unhandled promise rejection.
   */
  private async startMlflowMetricsFetch(model: Model): Promise<void> {
    try {
      await this.temporalProvider.runAsync(
        "FetchMlflowModelMetrics",
        [{ model: model }],
        process.env.DEFAULT_QUEUE_NAME,
        `workflow-FetchMlflowModelMetrics-${model.id}`,
      );
    } catch (error) {
      // The workflow id is fixed per model, so a repeated request while a run
      // is in progress is expected.
      if (error.name === TemporalProvider.WORKFLOW_EXECUTION_ALREADY_STARTED_ERROR) {
        console.log("Workflow already running - ignoring workflow run request");
      } else {
        console.log("Error running workflow", error);
      }
    }
  }
}

results matching ""

    No results matching ""