Temporal Workflow Engine Nedir, Demo ve Değerlendirme

Temporal Workflow Engine Nedir, Demo ve Değerlendirme

Merhabalar, bu yazımda sizlere açık kaynak kodlu, Java, Go, Python, Typescript ve daha nice dilleri destekleyen, Temporal isimli Workflow Engine (İş Akış Motoru) ve kabiliyetlerinden bahsetmek istiyorum. Olası kullanım senaryolarından, güçlü ve zayıf yönlerinden, alternatiflerinden ve hangi araçlara alternatif olabileceğinden de kısaca bahsecedeğim.

temporal.io logo

Temporal bir iş akış motorudur. Peki ya iş akış motoru nedir? Basitçe dijital iş süreçlerini yöneten, çalıştıran, bu süreçleri veritabanı bağlantısıyla depolayan ve raporlayan sistemlere iş akış motoru denilebilir.

StackStorm, Zeebe, Airflow, Windmill gibi alternatifleri bulunan Temporal, basitlik, dayanıklılık ve çalışma garantisi sunmasıyla diğer motorlardan ayrışıyor. (Farklı alternatifler şu linkten incelenebilir: https://github.com/meirwah/awesome-workflow-engines) Aynı zamanda dokümanları çok anlaşılır ve iyi kurgulanmış vaziyette. (https://docs.temporal.io/)

Bir Temporal uygulaması basit olarak 3 bileşenden oluşuyor.

Temporal Service (Server): İş akışlarınızın çalıştırılmasından ve yönetilmesinden sorumlu program, ana orkestratör olarak düşünebilirsiniz. Hangi worker’ın hangi işi alacağına karar veren, arayüz sağlayan, DB bağlantısını gerçekleştiren programdır. Self-host, cloud servisi seçenekleri mevcuttur.

Temporal Worker: İş akışlarınızı gerçekten işleyen servis. Süreç ve kuralların yer aldığı programlardır.

Temporal SDK: Seçtiğiniz dili destekleyen ve geliştirmeleri yapacağınız kütüphane

Biz uygulamamızda basitlik açısından lokalimizde bir Temporal Service çalıştıracağız ama farklı ortamlara deployment seçenekleri incelenebilir. (https://docs.temporal.io/production-deployment)

Şimdi Temporal SDK ile örnek bir Temporal Worker neyi çalıştırır onu inceleyelim.

Bir Temporal Worker’ı aslında Workflow denilen iş akışlarını çalıştırmaktan sorumludur. Bu iş akışları ise Activity adı verilen alt parçalardan oluşabilir.

Workflow (İş Akışı) seçtiğiniz programlama dilinde yazılmıştır ve genel olarak uygulama akışını belirleyen parçadır.

Activity hata oluşturması muhtemel uygulama mantığını içeren, dış bir servisi çağırmak gibi tekrar tekrar denenebilecek parçadır.

Yani Worker oluşturduğumuz Workflow ve Activity’lere ait kodları tutar ve bunlara ait alt görevleri işlemekle yükümlüdür.

Bir de genelde oluşturduğumuz Workflow’ları çalıştırmak için bir Client uygulamasına ihtiyaç duyarız. Bu kısım istenilirse unit testler ile de gerçekleştirilebilir. Basitçe bir trigger mekanizması olarak düşünebiliriz.

Şimdi bir e-ticaret sistemi için sipariş Temporal Workflow’u oluşturup bu parçaların birbiriyle nasıl çalıştığını görelim.

Kurguladığım workflow şu parçalardan oluşacak:

  • Backend Uygulaması — workflow client (starting point) müşteri satın alımıyla operasyon başlıyor
  • Orkestratör Servisi — backend uygulamasının oluşturduğu siparişi yöneten worker
  • Stok Servisi — stok işlemlerinden sorumlu worker
  • Ödeme Servisi — ödeme işlemlerinden sorumlu worker

Bir e-ticaret sisteminde, müşterinin bir sipariş oluşturduğunu varsayalım. Bu sipariş önce backend servisimize gelecek, backend servisimiz Sipariş akışını başlatacak. Stok servisi stok kontrol ve eksiltme işlemlerini gerçekleştirecek, ödeme servisi ödemenin alınmasını sağlayacak ve operasyonumuz tamamlanacak. Gerçek hayatta nakliye vesair gibi ek süreçler de var ama gösterimin sadeliği açısından bunları dahil etmiyorum. Ama isteyenler projeyi diledikleri gibi genişletebilirler.

Bahsettiğimiz gibi ana bir workflow için alt workflowlar ile süreci tamamlayacağız. Tabii ki burada gösterimi kolaylaştırmak adına bazı adımları kısaltacağız. Ayrıca bu işlemler ana bir workflow altında alt aktiviteler olarak kurgulanabilirdi fakat bunun çok tavsiye edilmediğini gördüm diğer projeleri incelediğimde. Dolayısıyla ana sipariş workflow’umuz ilgili operasyon adımları için child workflow oluşturacak. Kodların hepsi GitHub hesabımda yer alıyor, ilgilenen arkadaşlar göz atabilir.

Kodlama kısmına geçmeden önce kısaca Temporal UI kabiliyetlerinden de bahsetmek istiyorum.

temporal ui

Temporal servisini lokalimizde yahut cloud ortamında çalıştırdıktan sonra, bize akışları görebileceğimiz, farklı namespace’lere bağlanabileceğimiz, authentication/authorization konularını yönetebileceğimiz bir arayüz sunuyor. Burada devam eden yahut tamamlanmış iş akışlarını görüp, filtreleyebiliyoruz. Akışların detaylarını, hangi kuyruğa hangi worker’ların bağlandığını görüntüleyebiliyoruz.

Şimdi uygulama kısmına geçelim fakat öncelikle demo maksatlı kullanacağımız temporal server’ını ayağa kaldıralım.

1
temporal server start-dev

komutuyla geliştirme server’ını ayağa kaldırıyoruz. Server ayağa kalktıktan sonra “ http://localhost:8233” üzerinde arayüzü de kullanıma sürüyor.

  1. Backend Servisi

Backend servisi e-ticaret operasyonunda siparişin Frontend servisinden geldiği ilk noktayı simüle etmek için Java diliyle oluşturulmuştur.

Bu adımı kısa tutmak adına herhangi bir Spring Boot projesine Temporal bağımlılığını ekleyeceğiz. Ve order endpointine gelen POST isteği, bizim iş akışımızı başlatacak.

1
2
3
4
5
<dependency>
 <groupId>io.temporal</groupId>
 <artifactId>temporal-sdk</artifactId>
 <version>1.30.1</version>
</dependency>
1
2
3
4
5
6
7
// connects to local temporal server, remote can be configured
@Bean
@Qualifier("temporal")
public WorkflowClient temporalClient() {
    WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs();
    return WorkflowClient.newInstance(service);
}

Backend uygulamamız lokal Temporal servisine bağlanacak şekilde konfigüre edildi, dilersek Temporal Cloud servisine de bağlanabilirdik. Demo kolaylığı açısından lokalimizdeki servise bağlanmayı seçtik.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
@RestController
@RequestMapping("/api/order")
public class OrderController {
    private final TemporalService temporalService;
    public OrderController(TemporalService temporalService) {
        this.temporalService = temporalService;
    }
    @PostMapping
    public ResponseEntity<Void> createOrder(@RequestBody Order order) {
        temporalService.startOrder(order);
        return ResponseEntity.status(202).build(); // 202 Accepted
    }
}

Görüldüğü üzere gayet kısa bir Controller, tek vasfı Temporal servisi objesine gelen siparişi startOrder fonksiyonu ile iletmek ve asenkron sipariş işlemini kabul ettiğini gösteren 202 kodunu dönmek.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
private final String TASK_QUEQUE = "HLC_TRADING_ORDERS";
    public void startOrder(Order order) {
        // start the workflow
        OrderWorkflow workflow =
                temporalClient.newWorkflowStub(
                        OrderWorkflow.class,
                        WorkflowOptions.newBuilder()
                                .setWorkflowId(UUID.randomUUID().toString())
                                .setTaskQueue(TASK_QUEQUE)
                                .build());
        
        // start async
        WorkflowClient.start(workflow::fulfill, order);
        // this would do the same thing but sync.
//        workflow.submit(order);
    }

Temporal servisi içerisindeki startOrder fonksiyonu ise basitçe HLC_TRADING_ORDERS kuyruğuna yeni bir OrderWorkflow gönderiyor. Bunu yaparken rastgele bir ID oluşturuyor, ID kısmını istediğimiz gibi doldurabiliriz yahut istersek Temporal bizim yerimize rastgele bir ID oluşturabilir.

İş akışının giriş noktası ise “fulfill” adlı fonksiyon. Burada dikkat edilmesi gereken nokta OrderWorkflow’un imzasının (signature) bu servisimiz tarafından bilinmesi. Peki servisimiz imzasını bilmediği bir workflow’u çağıramaz mıydı? Çağırabilirdi. Bu ayrım “typed” ve “untyped” “WorkflowStub’lar” tarafından sağlanıyor. Untyped WorkflowStub’lar daha esnek bir yapı sunarken Typed WorkflowStub’lar @WorkflowMethod, @QueryMethod, ve @SignalMethodgibi fonksiyonları direkt olarak çağırabilmesi açısından daha kullanışlı. Her iki yapıda da akışlar senkron yahut asenkron çağırabiliyor.

Buradan sonrasını HLC_TRADING_ORDERS kuyruğunu dinleyen Orkestratör worker’ı halledecek.

2. Orkestratör Servisi

Orkestratör servisi e-ticaret operasyonunda “iş akışının” yönetilmesi adımı simüle etmek için Java diliyle oluşturulmuştur.

Orkestratör servisi

Orkestratör servisimiz “fulfill” workflowu içerisinde öncelikle siparişin stok durumunu teyit etmekle mükellef stok servisinin “doStock” fonksiyonunu, işlem başarıyla tamamlandığı takdirde ödeme servisinin “doPayment” fonksiyonunu çağıracaktır. Bu işlemlerin her biri “childWorkflow” ismi verilen workflow içerisinde yeni bir workflow oluşturmayı sağlayan mekanizma ile gerçekleştirilecektir. İşlemlerin hata vermesi halinde bu süreçleri de yönetebilme kabiliyeti var fakat demonun kısalığı açısından o kısımları eklemedim. Alt akışlar için de ID verilebilir, yahut senkron, asenkron çalıştırma seçenekleri uygulanabilir. Stok servisi HLC_TRADING_ORDERS_STOCK kuyruğunu, ödeme servisi ise HLC_TRADING_ORDERS_PAYMENT kuyruğunu dinliyor.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class OrderWorkflowImpl implements OrderWorkflow {
    private StockWorkflow stockWorkflow = Workflow.newChildWorkflowStub(StockWorkflow.class,
            ChildWorkflowOptions.newBuilder()
                    .setWorkflowId("stockChildWorkflow")
                    .setTaskQueue("HLC_TRADING_ORDERS_STOCK").build()
    );
    private PaymentWorkflow paymentWorkflow = Workflow.newChildWorkflowStub(PaymentWorkflow.class,
            ChildWorkflowOptions.newBuilder()
                    .setWorkflowId("paymentChildWorkflow")
                    .setTaskQueue("HLC_TRADING_ORDERS_PAYMENT").build());
    @Override
    public void fulfill(Order payload) {
        boolean stockRes = stockWorkflow.doStock(payload);
        if (stockRes) {
            boolean paymentRes = paymentWorkflow.doPayment(payload);
            if (paymentRes) {
                System.out.printf("order workflow completed for orderId= %d", payload.getId());
            } else {
                System.out.printf("stock service failed with result = %b\n", paymentRes);
            }
        } else {
            System.out.printf("stock service failed with result = %b\n", stockRes);
        }
    }
}

3. Stok Servisi

Stok servisi e-ticaret operasyonunda stoğun kontrolü ve düşürülmesi adımlarını simüle etmek için Java diliyle oluşturulmuştur.

Stok servisi
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
public class StockWorkflowImpl implements StockWorkflow {
    @Override
    public boolean doStock(Order order) {
        for (Product product : order.getProducts()) {
            boolean stockAvailable = accountActivityStub.checkStock(product.getId(), product.getQuantity());
            if (stockAvailable) {
                accountActivityStub.decreaseStock(product.getId(), product.getQuantity());
            } else {
                return false;
            }
        }
        return true;
    }
}

Bu servis içerisinde 1 ana workflow altında 2 activity oluşturdum. Basit olarak Sipariş modelini alıp ilgili ürünün stokta var ise eksiltilmesini taklit ediyor. Gerçek bir DB bağlantısı yok.

1
2
3
4
5
private final RetryOptions defaultRetryOptions = RetryOptions.newBuilder()
        .setMaximumAttempts(4)
        .setMaximumInterval(Duration.ofSeconds(10L))
        .setDoNotRetry("temporalworkers.exception.CheckStockException")
        .build();

Daha önceden belirttiğimiz gibi Temporal aktivite ve iş akışlarımız hata verse dahi bunları çalıştırma garantisi sunar. Bu hataların nasıl yönetileceğini ise RetryOptions seçeneği ile yönetebiliyoruz. Bir aktivite maksimum kaç kere denenebilir, maksimum çalışma süresi ne olmalıdır yahut hangi hata alındığında aktivite tekrar denenmemelidir gibi soruların cevaplarını bu modelde belirleyebiliyoruz. Özellikle “setMaximumInterval” değerinin muhakkak belirlenmesini tavsiye ediyorum. Aksi hâlde, fark edilmeyen en ufak bir hata ya da koşul, bilinçli bir müdahaleyle sonlandırılmadığı takdirde sürecin sonsuza dek çalışmasına neden olur. Bu da aslında bizim için ek maliyet ve müşteri memnuniyetsizliğiyle sonuçlanacaktır.

Stok servisi içerisinde yer alan 2 aktivitenin birinin fırlattığı “CheckStockException” objesini NonRetryable exception olarak belirledim. Retrayble objeler için ise maksimum deneme sayısını 4 olarak belirledim. Bu kısaca şu demek: CheckStockException hatası alırsam aktivite başarısız olur. Herhangi bir aktivite maksimum 10 saniye çalışabilir, hata veren aktivite en fazla 4 kere denenebilir (CheckStockException değilse).

Bu özelliklerin doğru çalıştığını test etmek için 2 test siparişi oluşturup birinin CheckStockException fırlatmasını ve dolayısıyla başarısız olmasını, diğerinin ise DecreaseStockException fırlatmasını ve tekrar denenmesini sağladım. DecreaseStockException fırlatacak fonksiyon kendi içerisinde 2 kere fail edip 3. sefer başarılı olacak şekilde kurgulandı:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
@Override
public void decreaseStock(long productId, int quantity) throws DecreaseStockException {
    ActivityExecutionContext context = Activity.getExecutionContext();
    int currentAttempt = context.getInfo().getAttempt();
    System.out.printf("decreaseStock called for productId=%d, quantity=%d, attempt=%d\n", productId, quantity, currentAttempt);
    // Fail for the first two attempts (attempt 1 and 2)
    if (currentAttempt <= 2) {
        System.out.println("Simulating failure for attempt " + currentAttempt);
        throw new DecreaseStockException("Simulated failure on attempt " + currentAttempt);
    }
    // Succeed on the third attempt (attempt 3) and subsequent attempts
    System.out.printf("Stock for productId=%d decreased by %d. (Successful on attempt %d)\n", productId, quantity, currentAttempt);
}

Eğer “currentAttempt <= 2” kısmını “currentAttempt <= 3” yapsaydık bu sefer de “setMaximumAttempts(4)” değerine takılacağımızı hatırlatmak isterim.

iş akışları sonuçları

Burada Temporal servisi akış özetleri ekranında 2 OrderWorkflow ve 2 child StockWorkflow görüyoruz. Fail olan childWorkflow OrderWorkflow’un da fail olmasına neden oldu, çünkü CheckStockException fırlattı.

StockWorkflow child OrderWorkflow parent

Başarılı olan childWorkflow ise 2 kere DecreaseStockException fırlattı. Bu hata tekrar denenebilir ve 4 kereden az olduğu için 3.sefer de denenebildi ve başarılı olarak tamamlandı.

DecreseStockException in child StockWorkflow with success

Esas OrderWorkflow’u ise WebUI üzerinden kasti olarak “terminate” ettim. Çünkü Ödeme Worker aşamasına geçmişti ve o servisin implementasyonu henüz tamamlanmamıştı. Şimdi diğer servisimiz olan Ödeme Worker servisine geçelim.

4. Ödeme Servisi

Ödeme servisi e-ticaret operasyonunda ödeme işlemini simüle etmek için Go diliyle oluşturulmuştur.

Ödeme servisi

Ödeme servisi içerisindeki akışın altında 3 aktivite mevcut. Dolandırıcılık risk kontrolü, ödeme işlemi ve bildirim işlemi.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
func FraudRiskCheck(ctx context.Context, order Order) (bool, error) {
    logger := activity.GetLogger(ctx)
    logger.Info("fraud and risk activity simulation", "orderId", order.id)
    time.Sleep(time.Duration(rand.IntN(10)))
    return true, nil
}
func Pay(ctx context.Context, order Order) (bool, error) {
    logger := activity.GetLogger(ctx)
    logger.Info("payment completion activity simulation", "orderId", order.id)
    time.Sleep(time.Duration(rand.IntN(10)))
    return true, nil
}
func PaymentNotification(ctx context.Context, order Order) (bool, error) {
    logger := activity.GetLogger(ctx)
    logger.Info("payment notification activity simulation", "orderId", order.id)
    time.Sleep(time.Duration(rand.IntN(10)))
    return true, nil
}

İş akışımız içerisinde bu işlemleri asenkron bir şekilde çağıracağız. Böylelikle birbirini beklemesi gerekmeyen aktiviteleri aynı anda çağırarak kaynak tasarrufu sağlayacağız. Bu yapıyı aktivite çalıştırırken “Future” yapısı bize sağlayacak. Daha önceki aktivite çağırmalarımızda senkron yapıyı kullanmıştık ve sıradaki aktivite adımlarına geçmek için önceden çağırılmış aktivitenin tamamlanmasını beklemiştik. Bu yapı ile direkt istediğimiz aktiviteyi çağırıp sonuçlarını sonradan değerlendirebiliyoruz.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
func PaymentWorkflow(ctx workflow.Context, order Order) (bool, error) {
    ao := workflow.ActivityOptions{
       StartToCloseTimeout: 10 * time.Second,
    }
    ctx = workflow.WithActivityOptions(ctx, ao)
    logger := workflow.GetLogger(ctx)
    logger.Info("payment workflow started", "orderId ", order.id)
    fraudFuture := workflow.ExecuteActivity(ctx, FraudRiskCheck, order)
    payFuture := workflow.ExecuteActivity(ctx, Pay, order)
    notificationFuture := workflow.ExecuteActivity(ctx, PaymentNotification, order)
    var fraudResult bool
    var payResult bool
    var notificationResult bool
    
    err1 := fraudFuture.Get(ctx, &fraudResult)
    if err1 != nil {
       logger.Error("FraudRiskCheck activity failed.", "Error", err1)
       return false, err1
    }
    logger.Info("FraudRiskCheck completed", "result", fraudResult)
    err2 := payFuture.Get(ctx, &payResult)
    if err2 != nil {
       logger.Error("Pay activity failed.", "Error", err2)
       return false, err2
    }
    logger.Info("Pay completed", "result", payResult)
    err3 := notificationFuture.Get(ctx, &notificationResult)
    if err3 != nil {
       logger.Error("PaymentNotification activity failed.", "Error", err3)
       return false, err3
    }
    
    logger.Info("PaymentNotification completed", "result", notificationResult)
    overallSuccess := fraudResult && payResult && notificationResult
    logger.Info("payment workflow completed.", "overallSuccess", overallSuccess)
    return overallSuccess, nil
}

Bu kurguladığımız yapıyı test etmek için 2 sipariş oluşturdum ve sonuçları şu şekilde:

ödeme servisi asenkron aktiviteler sonuç 1 ödeme servisi asenkron aktiviteler sonuç 1

Aktiviteler içerisindeki rastgele bekleme süresi farkları sonuçlardan görülebilir.

Böylelikle demomuzun sonuna geldik. Backend uygulamamıza gelen sipariş isteği, orkestratör worker’ına iletildi. Orkestratör öncelikle stok servisini daha sonrasında ise ödeme servisini çağırdı. Stok servisi içerisinde tekrar edilebilen ve edilemeyen hatalar, maksimum çalışma ve tekrar süresi gösterildi. Ödeme servisinde ise asenkron çalışabilen aktiviteler gösterildi. Şimdi değerlendirmeye geçebiliriz.

Sonuç

Temporal ne zaman kullanılmalıdır?

  • Uzun süren, stateful akışlarda
  • Karmaşık retry, timeout mekanizmaları gerektiğinde
  • Mikroservisler arası dayanıklı orkestrasyon gerektiğinde.

Basit event stream, kısa ömürlü işler veya yüksek çıktı gerektiren mesaj kuyruğu durumlarında ise kullanılması tavsiye edilmiyor.

Avantajları

  • Dayanaklı, tekrar edilebilen akışlar
  • Retry/timeout/cron/saga native
  • Kod odaklı güçlü SDK’ler, Java, Go, Python vs.
  • Operasyonel hatalara dayanıklı

Dezavantajları

  • Öğrenme eğrisi
  • Altyapı gerekliliği (temporal cluster)
  • Yüksek çıktı (throughput stream) işleri için uygun değil

Diğer araçlar ile farkları

  • Zeebe (Camunda) BPMN-first diagramlar, Temporal code-first
  • Airflow batch/data pipeline, uzun çalışan, event-driven için uygun değil
  • Windmill hızlı prototip ve UI ağırlıklı, Temporal production-grade güvenlik sağlıyor.

Böylelikle yazımın sonuna geldik. Sizin Temporal ile deneyiminiz yahut alternatif senaryolarınız nedir? Geri dönüşlerinizi bekliyorum. Projeye ait kodlara şu repository üzerinden erişebilirsiniz.

Kaynaklar:

docs.temporal.io

Bu gönderi CC BY 4.0 lisansı altındadır.