src/providers/temporal/temporal.provider.ts
Properties |
|
Methods |
|
constructor()
|
| Async cancelWorkflowRun |
cancelWorkflowRun(workflowId: string, runId: string)
|
|
Returns :
Promise<any>
|
| Async createSchedule |
createSchedule(workflowType: string, taskQueue: string, scheduleId: string, spec: any)
|
|
Parameters :
- workflowType : string
- taskQueue : string
- scheduleId : string
- spec : any
Returns :
unknown
|
| Async getApiClient |
getApiClient()
|
|
Returns :
Promise<Client>
|
| Async getSchedule |
getSchedule(scheduleId: string)
|
|
Parameters :
- scheduleId : string
Returns :
unknown
|
| getTemporalClientConnection |
getTemporalClientConnection()
|
|
Returns :
any
|
| Async run |
run(workflowName: string, args: any[], taskQueueName: string, workflowId: string)
|
|
Parameters :
- workflowName : string
- args : any[]
- taskQueueName : string
- workflowId : string
Returns :
Promise<string>
|
| Async runAsync |
runAsync(workflowName: string, args: any[], taskQueueName: string, workflowId: string, startDelay: Duration)
|
|
Parameters :
- workflowName : string
- args : any[]
- taskQueueName : string
- workflowId : string
- startDelay : Duration (default value: "0s")
Returns :
Promise<string>
|
| Async sendSignal |
sendSignal(workflowId: string, runId: string, signalName: string, args: object)
|
|
Parameters :
- workflowId : string
- runId : string
- signalName : string
- args : object
Returns :
Promise<void>
|
| Async updateSchedule |
updateSchedule(scheduleId: string, spec: any)
|
|
Returns :
unknown
|
| Private apiURL |
| Static Readonly WORKFLOW_EXECUTION_ALREADY_STARTED_ERROR |
Type : string
|
Default value : "WorkflowExecutionAlreadyStartedError"
|
import { Injectable, InternalServerErrorException } from "@nestjs/common";
import { Connection, Client, ScheduleNotFoundError } from "@temporalio/client";
import { Duration } from "@temporalio/common";
/**
 * NestJS provider that wraps the Temporal client: starting workflows,
 * signalling / cancelling workflow runs and managing schedules.
 *
 * The server address is read once, at construction time, from the
 * `WORKFLOW_SERVER_URL` environment variable.
 */
@Injectable()
export class TemporalProvider {
  /** Error name callers can compare against when a workflow ID is already in use. */
  public static readonly WORKFLOW_EXECUTION_ALREADY_STARTED_ERROR = "WorkflowExecutionAlreadyStartedError";

  /** Address passed to `Connection.connect`; `undefined` when the env var is unset. */
  private readonly apiURL: string | undefined;

  /**
   * Memoized client (or in-flight connection attempt) shared by every method
   * of this provider, so that a new connection is not opened on each call.
   */
  private clientPromise: Promise<Client> | undefined;

  constructor() {
    this.apiURL = process.env.WORKFLOW_SERVER_URL;
  }

  /**
   * Opens a brand-new connection to the configured address.
   *
   * Every call creates a separate connection; the caller owns it.
   * Prefer {@link getApiClient}, which reuses a single connection.
   */
  getTemporalClientConnection(): Promise<Connection> {
    return Connection.connect({ address: this.apiURL });
  }

  /**
   * Returns the shared Temporal client, connecting on first use.
   *
   * Concurrent callers share the same in-flight connection attempt. When the
   * attempt fails the cached promise is discarded, so a later call retries.
   * Callers must not close the returned client's connection, since it is
   * reused by subsequent calls.
   *
   * @throws InternalServerErrorException wrapping the underlying connection error.
   */
  async getApiClient(): Promise<Client> {
    let pending = this.clientPromise;
    try {
      if (pending === undefined) {
        pending = this.connectClient();
        this.clientPromise = pending;
      }
      return await pending;
    } catch (error) {
      // Only clear the cache if it still holds the attempt that failed; a
      // concurrent caller may already have replaced it with a fresh attempt.
      if (this.clientPromise === pending) {
        this.clientPromise = undefined;
      }
      throw new InternalServerErrorException(error);
    }
  }

  /** Connects and builds a client bound to the default namespace. */
  private async connectClient(): Promise<Client> {
    const connection = await this.getTemporalClientConnection();
    return new Client({
      connection,
      // namespace: 'foo.bar', // connects to 'default' namespace if not specified
    });
  }

  /**
   * Starts a workflow and returns as soon as it has been started, without
   * waiting for its result.
   *
   * @param workflowName Workflow type name to start.
   * @param args Arguments passed to the workflow.
   * @param taskQueueName Task queue the workflow is scheduled on.
   * @param workflowId ID given to the workflow execution.
   * @param startDelay Delay forwarded as the `startDelay` workflow option.
   * @returns The workflow ID of the started execution.
   */
  async runAsync(
    workflowName: string,
    args: any[],
    taskQueueName: string,
    workflowId: string,
    startDelay: Duration = "0s",
  ): Promise<string> {
    const client = await this.getApiClient();
    const handle = await client.workflow.start(workflowName, {
      taskQueue: taskQueueName,
      args: args,
      workflowId: workflowId,
      startDelay: startDelay,
    });
    return handle.workflowId;
  }

  /**
   * Starts a workflow and waits for it to finish.
   *
   * @param workflowName Workflow type name to start.
   * @param args Arguments passed to the workflow.
   * @param taskQueueName Task queue the workflow is scheduled on.
   * @param workflowId ID given to the workflow execution.
   * @returns The value produced by `handle.result()`.
   *   NOTE(review): declared as `string`; the actual value is whatever the
   *   workflow returns - confirm all workflows started here return strings.
   */
  async run(
    workflowName: string,
    args: any[],
    taskQueueName: string,
    workflowId: string,
  ): Promise<string> {
    const client = await this.getApiClient();
    const handle = await client.workflow.start(workflowName, {
      taskQueue: taskQueueName,
      args: args,
      workflowId: workflowId,
    });
    return await handle.result();
  }

  /**
   * Sends a signal to a specific workflow run.
   *
   * @param workflowId ID of the target workflow.
   * @param runId Run ID of the target execution.
   * @param signalName Name of the signal to deliver.
   * @param args Payload forwarded to the signal as its single argument.
   */
  async sendSignal(
    workflowId: string,
    runId: string,
    signalName: string,
    args: object,
  ): Promise<void> {
    const client = await this.getApiClient();
    return await client.workflow
      .getHandle(workflowId, runId)
      .signal(signalName, args);
  }

  /**
   * Requests cancellation of a specific workflow run.
   *
   * @param workflowId ID of the target workflow.
   * @param runId Run ID of the target execution.
   * @returns Whatever the handle's `cancel()` call resolves to.
   */
  async cancelWorkflowRun(workflowId: string, runId: string): Promise<any> {
    const client = await this.getApiClient();
    return await client.workflow.getHandle(workflowId, runId).cancel();
  }

  /**
   * Creates a schedule whose action starts a workflow.
   * No workflow arguments are configured on the action.
   *
   * @param workflowType Workflow type name the schedule starts.
   * @param taskQueue Task queue used for the started workflows.
   * @param scheduleId ID of the schedule to create.
   * @param spec Schedule spec, passed through unchanged.
   * @returns The result of `client.schedule.create`.
   */
  async createSchedule(
    workflowType: string,
    taskQueue: string,
    scheduleId: string,
    spec: any,
  ) {
    const client = await this.getApiClient();
    return await client.schedule.create({
      action: {
        type: "startWorkflow",
        workflowType: workflowType,
        taskQueue: taskQueue,
      },
      scheduleId: scheduleId,
      spec: spec,
    });
  }

  /**
   * Replaces the spec of an existing schedule, keeping its other fields.
   *
   * @param scheduleId ID of the schedule to update.
   * @param spec New schedule spec, passed through unchanged.
   */
  async updateSchedule(scheduleId: string, spec: any) {
    const client = await this.getApiClient();
    const handle = await client.schedule.getHandle(scheduleId);
    // Spread the current schedule first so only `spec` is overridden.
    return await handle.update((schedule) => ({ ...schedule, spec: spec }));
  }

  /**
   * Fetches the description of a schedule.
   *
   * @param scheduleId ID of the schedule to describe.
   * @returns The schedule description, or `null` when the lookup fails with
   *   `ScheduleNotFoundError`. Any other error is rethrown.
   */
  async getSchedule(scheduleId: string) {
    try {
      const client = await this.getApiClient();
      const handle = await client.schedule.getHandle(scheduleId);
      const schedule = await handle.describe();
      return schedule;
    } catch (error) {
      if (error instanceof ScheduleNotFoundError) {
        return null;
      }
      throw error;
    }
  }
}