File

src/providers/temporal/temporal.provider.ts

Index

Properties
Methods

Constructor

constructor()

Methods

Async cancelWorkflowRun
cancelWorkflowRun(workflowId: string, runId: string)
Parameters :
Name Type Optional
workflowId string No
runId string No
Returns : Promise<any>
Async createSchedule
createSchedule(workflowType: string, taskQueue: string, scheduleId: string, spec: any)
Parameters :
Name Type Optional
workflowType string No
taskQueue string No
scheduleId string No
spec any No
Returns : unknown
Async getApiClient
getApiClient()
Returns : Promise<Client>
Async getSchedule
getSchedule(scheduleId: string)
Parameters :
Name Type Optional
scheduleId string No
Returns : unknown
getTemporalClientConnection
getTemporalClientConnection()
Returns : any
Async run
run(workflowName: string, args: any[], taskQueueName: string, workflowId: string)
Parameters :
Name Type Optional
workflowName string No
args any[] No
taskQueueName string No
workflowId string No
Returns : Promise<string>
Async runAsync
runAsync(workflowName: string, args: any[], taskQueueName: string, workflowId: string, startDelay: Duration)
Parameters :
Name Type Optional Default value
workflowName string No
args any[] No
taskQueueName string No
workflowId string No
startDelay Duration No "0s"
Returns : Promise<string>
Async sendSignal
sendSignal(workflowId: string, runId: string, signalName: string, args: object)
Parameters :
Name Type Optional
workflowId string No
runId string No
signalName string No
args object No
Returns : Promise<void>
Async updateSchedule
updateSchedule(scheduleId: string, spec: any)
Parameters :
Name Type Optional
scheduleId string No
spec any No
Returns : unknown

Properties

Private apiURL
Static Readonly WORKFLOW_EXECUTION_ALREADY_STARTED_ERROR
Type : string
Default value : "WorkflowExecutionAlreadyStartedError"
import { Injectable, InternalServerErrorException } from "@nestjs/common";
import { Connection, Client, ScheduleNotFoundError } from "@temporalio/client";
import { Duration } from "@temporalio/common";

/**
 * NestJS provider that wraps the Temporal client: starting workflows,
 * signalling / cancelling running workflows and managing schedules.
 *
 * The server address is read from the `WORKFLOW_SERVER_URL` environment
 * variable when the provider is constructed.
 */
@Injectable()
export class TemporalProvider {
  /**
   * Error-name constant exposed for callers.
   * NOTE(review): presumably compared against the `name` of the error thrown
   * when a workflow id is already in use — confirm against callers.
   */
  public static readonly WORKFLOW_EXECUTION_ALREADY_STARTED_ERROR = "WorkflowExecutionAlreadyStartedError";

  /** Temporal server address, taken from `process.env.WORKFLOW_SERVER_URL`. */
  private readonly apiURL: string | undefined;

  /**
   * Lazily created client shared by every method of this provider.
   * Stored as a promise so that concurrent first callers share one connection
   * attempt instead of each opening their own.
   */
  private clientPromise: Promise<Client> | undefined;

  constructor() {
    this.apiURL = process.env.WORKFLOW_SERVER_URL;
  }

  /**
   * Opens a new connection to the configured Temporal server address.
   *
   * Every call opens a fresh connection; prefer {@link getApiClient}, which
   * reuses a single one.
   */
  getTemporalClientConnection() {
    return Connection.connect({ address: this.apiURL });
  }

  /**
   * Returns the shared Temporal client, connecting on first use.
   *
   * Previously every call opened a new connection that was never closed, so
   * each workflow/schedule operation leaked one. The client is now created
   * once and reused; a failed attempt is not cached, so the next call retries.
   *
   * @returns The connected client.
   * @throws InternalServerErrorException when the connection cannot be established.
   */
  async getApiClient(): Promise<Client> {
    if (this.clientPromise === undefined) {
      const pending = this.connectClient();
      this.clientPromise = pending;
      // Drop the cached promise on failure so a later call can try again.
      pending.catch(() => {
        if (this.clientPromise === pending) {
          this.clientPromise = undefined;
        }
      });
    }
    return this.clientPromise;
  }

  /** Connects and builds a client, mapping any failure to an HTTP 500 exception. */
  private async connectClient(): Promise<Client> {
    try {
      const connection = await this.getTemporalClientConnection();
      return new Client({
        connection,
        // namespace: 'foo.bar', // connects to 'default' namespace if not specified
      });
    } catch (error) {
      throw new InternalServerErrorException(error);
    }
  }

  /**
   * Starts a workflow without waiting for it to finish.
   *
   * @param workflowName Workflow type name to start.
   * @param args Arguments passed to the workflow.
   * @param taskQueueName Task queue the workflow is scheduled on.
   * @param workflowId Workflow id to assign; use a meaningful business id.
   * @param startDelay Delay forwarded to the start options (defaults to `"0s"`).
   * @returns The id of the started workflow.
   */
  async runAsync(
    workflowName: string,
    args: any[],
    taskQueueName: string,
    workflowId: string,
    startDelay: Duration = "0s",
  ): Promise<string> {
    const client = await this.getApiClient();

    const handle = await client.workflow.start(workflowName, {
      taskQueue: taskQueueName,
      args,
      workflowId,
      startDelay,
    });
    // The result is deliberately not awaited here; see `run` for that.
    return handle.workflowId;
  }

  /**
   * Starts a workflow and waits for its result.
   *
   * @param workflowName Workflow type name to start.
   * @param args Arguments passed to the workflow.
   * @param taskQueueName Task queue the workflow is scheduled on.
   * @param workflowId Workflow id to assign; use a meaningful business id.
   * @returns The value the workflow resolves with.
   */
  async run(
    workflowName: string,
    args: any[],
    taskQueueName: string,
    workflowId: string,
  ): Promise<string> {
    const client = await this.getApiClient();

    const handle = await client.workflow.start(workflowName, {
      taskQueue: taskQueueName,
      args,
      workflowId,
    });
    return await handle.result();
  }

  /**
   * Sends a signal to a specific workflow run.
   *
   * @param workflowId Id of the target workflow.
   * @param runId Run id of the target execution.
   * @param signalName Name of the signal to deliver.
   * @param args Payload forwarded as the single argument of the signal.
   */
  async sendSignal(
    workflowId: string,
    runId: string,
    signalName: string,
    args: object,
  ): Promise<void> {
    const client = await this.getApiClient();
    const handle = client.workflow.getHandle(workflowId, runId);
    return await handle.signal(signalName, args);
  }

  /**
   * Requests cancellation of a specific workflow run.
   *
   * @param workflowId Id of the target workflow.
   * @param runId Run id of the target execution.
   * @returns Whatever the handle's `cancel()` call resolves with.
   */
  async cancelWorkflowRun(workflowId: string, runId: string): Promise<any> {
    const client = await this.getApiClient();
    return await client.workflow.getHandle(workflowId, runId).cancel();
  }

  /**
   * Creates a schedule whose action starts a workflow.
   *
   * @param workflowType Workflow type started by the schedule.
   * @param taskQueue Task queue used for the started workflows.
   * @param scheduleId Id of the new schedule.
   * @param spec Schedule spec, passed through unchanged.
   * @returns Whatever `client.schedule.create` resolves with.
   */
  async createSchedule(
    workflowType: string,
    taskQueue: string,
    scheduleId: string,
    spec: any,
  ) {
    const client = await this.getApiClient();
    return await client.schedule.create({
      action: {
        type: "startWorkflow",
        workflowType,
        taskQueue,
      },
      scheduleId,
      spec,
    });
  }

  /**
   * Replaces the spec of an existing schedule, keeping its other fields.
   *
   * @param scheduleId Id of the schedule to update.
   * @param spec New schedule spec.
   */
  async updateSchedule(scheduleId: string, spec: any) {
    const client = await this.getApiClient();
    const handle = await client.schedule.getHandle(scheduleId);
    // Copy the current schedule and override only `spec`.
    return await handle.update((schedule) => ({ ...schedule, spec }));
  }

  /**
   * Fetches the description of a schedule.
   *
   * @param scheduleId Id of the schedule to look up.
   * @returns The schedule description, or `null` when the lookup fails with
   *   `ScheduleNotFoundError`. Any other error is rethrown.
   */
  async getSchedule(scheduleId: string) {
    try {
      const client = await this.getApiClient();
      const handle = await client.schedule.getHandle(scheduleId);
      return await handle.describe();
    } catch (error) {
      if (error instanceof ScheduleNotFoundError) {
        return null;
      }
      throw error;
    }
  }
}

results matching ""

    No results matching ""