src/models/model.service.ts
Properties |
|
Methods |
|
constructor(prismaService: PrismaService, projectService: ProjectService, mlflowProvider: MLflowProvider, opentelemetryProvider: OpentelemetryProvider, temporalProvider: TemporalProvider, notificationService: NotificationService)
|
|||||||||||||||||||||
|
Defined in src/models/model.service.ts:24
|
|||||||||||||||||||||
|
Parameters :
|
| Async deleteModel | ||||||
deleteModel(id: number)
|
||||||
|
Defined in src/models/model.service.ts:261
|
||||||
|
Parameters :
Returns :
unknown
|
| Async deleteRelatedMetrics | ||||||
deleteRelatedMetrics(modelId: number)
|
||||||
|
Defined in src/models/model.service.ts:318
|
||||||
|
Parameters :
Returns :
unknown
|
| Async fetchMetricsFromSource | ||||||
fetchMetricsFromSource(model: Model)
|
||||||
|
Defined in src/models/model.service.ts:96
|
||||||
|
Parameters :
Returns :
unknown
|
| Async fetchModelMetricsForSync | |||||||||
fetchModelMetricsForSync(id: number, queryOptions: literal type)
|
|||||||||
|
Defined in src/models/model.service.ts:126
|
|||||||||
|
Parameters :
Returns :
unknown
|
| Async fetchModelMetricsFromStore | |||||||||
fetchModelMetricsFromStore(id: number, queryOptions: literal type)
|
|||||||||
|
Defined in src/models/model.service.ts:112
|
|||||||||
|
Parameters :
Returns :
unknown
|
| Async findAll |
findAll()
|
|
Defined in src/models/model.service.ts:183
|
|
Returns :
Promise<Model[]>
|
| Async findFiltered | |||||||||
findFiltered(provider?: ModelProvider, status?: ModelStatus)
|
|||||||||
|
Defined in src/models/model.service.ts:205
|
|||||||||
|
Parameters :
Returns :
unknown
|
| Async findForProject | ||||||
findForProject(projectId: number)
|
||||||
|
Defined in src/models/model.service.ts:266
|
||||||
|
Parameters :
Returns :
Promise<Model[]>
|
| Async findOne | ||||||
findOne(id: number)
|
||||||
|
Defined in src/models/model.service.ts:216
|
||||||
|
Parameters :
Returns :
Promise<Model | null>
|
| Async findOrThrow | ||||||||
findOrThrow(id: number)
|
||||||||
|
Defined in src/models/model.service.ts:227
|
||||||||
|
Fetches a model; throws an error if the model doesn't exist.
Parameters :
Returns :
Promise<Model>
|
| Async findToSync |
findToSync()
|
|
Defined in src/models/model.service.ts:192
|
|
Returns :
unknown
|
| Async getModelInfo | ||||||
getModelInfo(id: number)
|
||||||
|
Defined in src/models/model.service.ts:140
|
||||||
|
Parameters :
Returns :
unknown
|
| Async pushMetrics | ||||||
pushMetrics(metrics, modelId)
|
||||||
|
Defined in src/models/model.service.ts:35
|
||||||
|
Parameters :
Returns :
unknown
|
| Async registerModel | ||||||
registerModel(model: RegisterModelDto)
|
||||||
|
Defined in src/models/model.service.ts:156
|
||||||
|
Parameters :
Returns :
unknown
|
| Async requestModelRegistration | ||||||
requestModelRegistration(request: RegisterModelRequestDto)
|
||||||
|
Defined in src/models/model.service.ts:293
|
||||||
|
Parameters :
Returns :
unknown
|
| Async saveModelMetrics |
saveModelMetrics(model: Model, metrics: any, turoAttributes: any)
|
|
Defined in src/models/model.service.ts:104
|
|
Returns :
unknown
|
| Async updateLastSeenTime |
updateLastSeenTime(id: number, lastSeenAt: Date)
|
|
Defined in src/models/model.service.ts:242
|
|
Returns :
unknown
|
| Async updateLastSyncTime |
updateLastSyncTime(id: number, lastSyncedAt: Date)
|
|
Defined in src/models/model.service.ts:234
|
|
Returns :
unknown
|
| Async updateModelName | |||||||||
updateModelName(id: number, requestBody: EditModelDto)
|
|||||||||
|
Defined in src/models/model.service.ts:276
|
|||||||||
|
Parameters :
Returns :
unknown
|
| Async updateModelStatus | |||||||||
updateModelStatus(id: number, status: ModelStatus)
|
|||||||||
|
Defined in src/models/model.service.ts:250
|
|||||||||
|
Parameters :
Returns :
unknown
|
| Private providerServiceMap |
Type : object
|
Default value : {
[ModelProvider.MLFLOW]: this.mlflowProvider,
[ModelProvider.OPENTELEMETRY]: this.opentelemetryProvider,
}
|
|
Defined in src/models/model.service.ts:21
|
import { ForbiddenException, Injectable } from "@nestjs/common";
import { PrismaService } from "../common/prisma/prisma.service";
import { Model, ModelProvider, ModelStatus } from "./entities/model.entity";
import { toKebabCase, getNamesFromEmail } from "../common/helper";
import {
RegisterModelDto,
RegisterModelRequestDto,
} from "./dto/register-model.dto";
import { MLflowProvider } from "../providers/mlflow/mlflow.provider";
import { OpentelemetryProvider } from "../providers/opentelemetry/opentelemetry.provider";
import { ProjectService } from "../project/project.service";
import { TemporalProvider } from "../providers/temporal/temporal.provider";
import { EditModelDto } from "./dto/edit-model.dto";
import { NotificationService } from "../notification/notification.service";
import { modelEventsConfig, ModelEvents } from "./model.events";
import { EventType } from "../common/events";
import { NotificationEntityType } from "../notification/types/notification.enums";
import { MetricSources, MetricStatus } from "../metrics/entities/metrics.entity";
/**
 * Service for registered models: persistence through Prisma, metric
 * push/fetch through the provider that backs each model, and kicking off the
 * Temporal workflows that sync or fetch model metrics.
 */
@Injectable()
export class ModelService {
  /** Maps each supported provider to the service that handles its metrics and model info. */
  private providerServiceMap = {
    [ModelProvider.MLFLOW]: this.mlflowProvider,
    [ModelProvider.OPENTELEMETRY]: this.opentelemetryProvider,
  };

  constructor(
    private readonly prismaService: PrismaService,
    private readonly projectService: ProjectService,
    private readonly mlflowProvider: MLflowProvider,
    private readonly opentelemetryProvider: OpentelemetryProvider,
    private readonly temporalProvider: TemporalProvider,
    private readonly notificationService: NotificationService,
  ) {}

  /**
   * Pushes metrics for an OPENTELEMETRY model, records when the model was
   * last seen and requests a `SyncModelMetrics` workflow run.
   *
   * @param metrics payload handed unchanged to the provider's `saveMetrics`
   * @param modelId id of the target model (coerced with unary `+`)
   * @returns the parsed JSON body of the provider response
   * @throws Error when no model with that id exists
   * @throws ForbiddenException when the model's provider is not OPENTELEMETRY
   */
  async pushMetrics(metrics, modelId) {
    const model = await this.requireModel(+modelId);

    // Reject unsupported providers before doing any further lookups.
    if (model.provider !== ModelProvider.OPENTELEMETRY) {
      // Blocked pushing metrics via metrics endpoint for providers other than OPENTELEMETRY
      throw new ForbiddenException(
        "Failed to push model metrics - unsupported action",
      );
    }

    const project = await this.projectService.findOne(+model.projectID);
    const seenAt = new Date();

    // Attributes attached to the pushed metrics so they can be traced back
    // to the model/project and to the moment they were received.
    const turoAttributes = [
      {
        key: "turo.model",
        value: {
          stringValue: `${model.slug}`,
        },
      },
      {
        key: "turo.project",
        value: {
          stringValue: `${project.slug}`,
        },
      },
      {
        key: "turo.timestamp",
        value: {
          intValue: seenAt.getTime(),
        },
      },
    ];

    const response = await this.providerServiceMap[model.provider].saveMetrics(
      metrics,
      turoAttributes,
    );
    await this.updateLastSeenTime(model.id, seenAt);

    try {
      // NOTE(review): the trailing "10s" is presumably a start delay — confirm
      // against TemporalProvider.runAsync.
      await this.temporalProvider.runAsync(
        "SyncModelMetrics",
        [model],
        process.env.DEFAULT_QUEUE_NAME,
        `workflow-SyncModelMetrics-model-${model.id}`,
        "10s",
      );
    } catch (error) {
      if (this.isWorkflowAlreadyStarted(error)) {
        // A sync for this model is already scheduled/running; nothing to do.
        console.log("Workflow already running - ignoring workflow run request");
      } else {
        console.log("Error running workflow", error);
        throw error;
      }
    }

    return response.json();
  }

  /**
   * Fetches metrics for the model from its provider's source.
   *
   * @param model model whose provider is queried
   * @returns whatever the provider's `fetchMetricsFromSource` resolves to
   */
  async fetchMetricsFromSource(model: Model) {
    return this.providerServiceMap[model.provider].fetchMetricsFromSource(
      model,
    );
  }

  /**
   * Saves metrics through the model's provider.
   *
   * @param model model whose provider receives the metrics
   * @param metrics payload handed unchanged to the provider
   * @param turoAttributes attributes handed unchanged to the provider
   * @returns the provider's `saveMetrics` result
   */
  async saveModelMetrics(model: Model, metrics: any, turoAttributes: any) {
    return this.providerServiceMap[model.provider].saveMetrics(
      metrics,
      turoAttributes,
    );
  }

  /**
   * Reads a model's metrics from the provider's store for a time window.
   *
   * @param id model id
   * @param queryOptions window bounds forwarded to the provider
   * @throws Error when no model with that id exists
   */
  async fetchModelMetricsFromStore(
    id: number,
    queryOptions: {
      startTime: number;
      endTime: number;
    },
  ) {
    const model = await this.requireModel(+id);
    return this.providerServiceMap[model.provider].fetchModelMetricsFromStore(
      model,
      queryOptions,
    );
  }

  /**
   * Reads a model's metrics from the provider for a sync run.
   *
   * NOTE(review): `endTIme` (capital "I") looks like a typo of `endTime`. It
   * is part of the public parameter type, so it is kept as-is — confirm with
   * callers and providers before renaming.
   *
   * @param id model id
   * @param queryOptions window bounds forwarded to the provider
   * @throws Error when no model with that id exists
   */
  async fetchModelMetricsForSync(
    id: number,
    queryOptions: {
      startTime: number;
      endTIme: number;
    },
  ) {
    const model = await this.requireModel(+id);
    return this.providerServiceMap[model.provider].fetchModelMetricsForSync(
      model,
      queryOptions,
    );
  }

  /**
   * Returns provider-side information about a model. For MLFLOW models a
   * `FetchMlflowModelMetrics` workflow is requested in the background first.
   *
   * @param id model id
   * @throws Error when no model with that id exists
   */
  async getModelInfo(id: number) {
    const model = await this.requireModel(+id);
    // run fetch metrics workflow
    if (model.provider === ModelProvider.MLFLOW) {
      this.startMlflowMetricsFetch(model);
    }
    return this.providerServiceMap[model.provider].getModelInfo(model);
  }

  /**
   * Creates a model in ACTIVE status under an existing project. The slug is
   * built from the project slug, the model name and the current timestamp.
   * For MLFLOW models a `FetchMlflowModelMetrics` workflow is requested in
   * the background.
   *
   * @param model registration payload; its fields are spread last, so they
   *   override the generated `status`/`slug` if the DTO carries them
   * @returns the created model
   */
  async registerModel(model: RegisterModelDto) {
    const project = await this.projectService.findOrThrow(model.projectID);
    const currentTimeStamp = new Date().getTime();
    const createdModel = new Model(
      await this.prismaService.model.create({
        data: {
          status: ModelStatus.ACTIVE,
          slug: toKebabCase(
            `${project.slug}-${model.name}-${currentTimeStamp}`,
          ),
          ...model,
        },
      }),
    );
    if (createdModel.provider === ModelProvider.MLFLOW) {
      this.startMlflowMetricsFetch(createdModel);
    }
    return createdModel;
  }

  /** Lists every model whose status is not DELETED. */
  async findAll(): Promise<Model[]> {
    const models = await this.prismaService.model.findMany({
      where: {
        NOT: { status: { equals: ModelStatus.DELETED } },
      },
    });
    return models.map((model) => new Model(model));
  }

  /**
   * Lists ACTIVE models that need a metrics sync: seen after their last sync,
   * or never synced.
   */
  async findToSync() {
    const models = await this.prismaService.model.findMany({
      where: {
        status: ModelStatus.ACTIVE,
        OR: [
          { lastSeenAt: { gt: this.prismaService.model.fields.lastSyncedAt } }, // where last seen at is greater than last synced at
          { lastSyncedAt: null }, // where last synced at is null
        ],
      },
    });
    return models.map((model) => new Model(model));
  }

  /**
   * Lists models matching the given provider and status.
   *
   * @param provider optional provider; when omitted it reaches Prisma as `equals: undefined`
   * @param status optional status; when omitted it reaches Prisma as `equals: undefined`
   */
  async findFiltered(provider?: ModelProvider, status?: ModelStatus) {
    const filter = {
      where: {
        provider: { equals: provider },
        status: { equals: status },
      },
    };
    const models = await this.prismaService.model.findMany(filter);
    return models.map((model) => new Model(model));
  }

  /**
   * Looks a model up by id.
   *
   * @param id model id
   * @returns the model, or `null` when no row has that id
   */
  async findOne(id: number): Promise<Model | null> {
    const model = await this.prismaService.model.findUnique({
      where: { id: id },
    });
    // Only wrap real rows: wrapping `null` would hide "not found" from callers.
    return model ? new Model(model) : null;
  }

  /**
   * Fetches model - Throws an error if model doesn't exist
   * @param {number} id model Id
   */
  async findOrThrow(id: number): Promise<Model> {
    const model = await this.prismaService.model.findUniqueOrThrow({
      where: { id: id },
    });
    return new Model(model);
  }

  /**
   * Stores the time of the model's last metrics sync.
   *
   * @returns the updated model
   */
  async updateLastSyncTime(id: number, lastSyncedAt: Date) {
    const model = await this.prismaService.model.update({
      where: { id: id },
      data: { lastSyncedAt: lastSyncedAt },
    });
    return new Model(model);
  }

  /**
   * Stores the time metrics were last received for the model.
   *
   * @returns the updated model
   */
  async updateLastSeenTime(id: number, lastSeenAt: Date) {
    const model = await this.prismaService.model.update({
      where: { id: id },
      data: { lastSeenAt: lastSeenAt },
    });
    return new Model(model);
  }

  /**
   * Sets the model's status.
   *
   * @returns the updated model
   */
  async updateModelStatus(id: number, status: ModelStatus) {
    return new Model(
      await this.prismaService.model.update({
        where: {
          id: id,
        },
        data: { status: status },
      }),
    );
  }

  /**
   * Soft-deletes a model: first marks its metrics as DELETED, then sets the
   * model's own status to DELETED.
   *
   * @throws Error when no model with that id exists
   */
  async deleteModel(id: number) {
    await this.deleteRelatedMetrics(id);
    return this.updateModelStatus(id, ModelStatus.DELETED);
  }

  /** Lists the non-DELETED models that belong to the given project. */
  async findForProject(projectId: number): Promise<Model[]> {
    const models = await this.prismaService.model.findMany({
      where: {
        NOT: { status: { equals: ModelStatus.DELETED } },
        AND: { projectID: { equals: projectId } },
      },
    });
    return models.map((model) => new Model(model));
  }

  /**
   * Renames a model and regenerates its slug from the project slug, the new
   * name and the current timestamp.
   *
   * @param id model id
   * @param requestBody carries the new `name` and the owning `projectId`
   * @returns the updated model
   */
  async updateModelName(id: number, requestBody: EditModelDto) {
    const { name, projectId } = requestBody;
    const project = await this.projectService.findOrThrow(+projectId);
    const currentTimeStamp = new Date().getTime();
    return new Model(
      await this.prismaService.model.update({
        where: {
          id: id,
        },
        data: {
          name: name,
          slug: toKebabCase(`${project.slug}-${name}-${currentTimeStamp}`),
        },
      }),
    );
  }

  /**
   * Triggers a "model connection request" notification to `requestTo`,
   * including a link to the project's metrics tab with the register-model
   * flag set.
   *
   * @param request who asks, who is asked, for which project, and why
   * @returns always `true`
   */
  async requestModelRegistration(request: RegisterModelRequestDto) {
    const { projectID, requestTo, requestedBy, description } = request;
    const project = await this.projectService.findOne(projectID);
    // `&registerModel=true` is a second query parameter; it must be joined with `&`.
    const requestURL = `${process.env.TURO_BASE_URL}${process.env.TURO_PROJECT_BASE_URL}${project.id}?activeTab=metrics&registerModel=true`;
    this.notificationService.triggerNotification(
      modelEventsConfig,
      projectID,
      0,
      EventType.MODEL_CONNECTION_REQUEST,
      NotificationEntityType.MODEL_CONNECTION_REQUEST,
      ModelEvents.MODEL_CONNECTION_REQUEST,
      {
        projectName: project.name,
        requestedByName: getNamesFromEmail(requestedBy),
        description,
        requestURL,
      },
      [requestTo],
    );
    return true;
  }

  /**
   * Marks every metric whose source is the given model as DELETED.
   *
   * @param modelId model id
   * @returns the result of the `updateMany` call
   * @throws Error when no model with that id exists
   */
  async deleteRelatedMetrics(modelId: number) {
    // Resolve the model first so an unknown id can never reach the bulk
    // update below with an undefined `sourceId`.
    const model = await this.requireModel(+modelId);
    // Delete metrics related to the model
    const response = await this.prismaService.metrics.updateMany({
      where: { sourceId: model.id, sourceType: MetricSources.MODEL },
      data: { status: MetricStatus.DELETED },
    });
    return response;
  }

  /** Loads a model by id and throws when it does not exist. */
  private async requireModel(id: number): Promise<Model> {
    const model = await this.findOne(id);
    if (!model) {
      throw new Error(`Model with id ${id} not found`);
    }
    return model;
  }

  /**
   * Requests a `FetchMlflowModelMetrics` workflow without waiting for it.
   * Failures are logged instead of being left as unhandled rejections.
   */
  private startMlflowMetricsFetch(model: Model): void {
    void this.temporalProvider
      .runAsync(
        "FetchMlflowModelMetrics",
        [{ model: model }],
        process.env.DEFAULT_QUEUE_NAME,
        `workflow-FetchMlflowModelMetrics-${model.id}`,
      )
      .catch((error: unknown) => {
        if (this.isWorkflowAlreadyStarted(error)) {
          console.log(
            "Workflow already running - ignoring workflow run request",
          );
        } else {
          console.log("Error running workflow", error);
        }
      });
  }

  /** Tells whether an error is the "workflow execution already started" error. */
  private isWorkflowAlreadyStarted(error: unknown): boolean {
    return (
      typeof error === "object" &&
      error !== null &&
      (error as { name?: unknown }).name ===
        TemporalProvider.WORKFLOW_EXECUTION_ALREADY_STARTED_ERROR
    );
  }
}