Asynq の最小スタートのログ

概要

冪等性を無視した都合の良い架空の無限で壊れないキューを前提として作りがちですが そのような都合のいいキューは存在しません。 Asynqは、タスクをキューに入れ、ワーカーとバックグラウンドで処理するためのGoライブラリです。Redisが支援していて簡単に統合できるように組み込まれています。

Asynqの動作の概要

  • クライアントがタスクをキューに入れる
  • サーバーはタスクをキューから取ってきて、各タスクのワーカーgoroutineを開始します
  • タスクは複数のワーカーによって同時に処理されます f:id:syu-m-5151:20200615130549p:plain

2020年6月15日現在において開発中であり APIや挙動に変更の可能性があるそうです。

特徴

タスクの少なくとも1つの実行を保証
タスクのスケジューリング
タスクはRedisに書き込まれるため、耐久性
失敗したタスクの再試行
加重優先キュー
厳格な優先キュー
Redisでは書き込みが高速であるため、タスクを追加するための低レイテンシ
独自のオプションを使用したタスクの重複排除
タスクごとにタイムアウトと期限を許可する
ミドルウェアをサポートする柔軟なハンドラーインターフェイス
キューを一時停止して、キューからのタスクの処理を停止する機能
HAのRedisセンチネルのサポート
キューとタスクを検査およびリモート制御するCLI

やっていき(最小)

redis の起動 for docker compose

docker-compose.yaml に記載していく

version: '3'
services:
  redis:
    image: "redis:latest"
    ports:
      - "6379:6379"
    volumes:
      - "./data/redis:/data"

実行

sudo docker-compose up -d
$ docker exec -it [CONTAINER ID] /bin/bash
root@46f68f517bf1:/data# redis-cli
127.0.0.1:6379> keys *

Getting Started

ファイルの作成とパッケージのインストール

mkdir quickstart && cd quickstart
go mod init asynq-quickstart
mkdir client workers
touch client/client.go workers/workers.go
go get -u github.com/hibiken/asynq

client/client.go

package main

import (
        "github.com/hibiken/asynq"
        "log"
        "time"
)

var redis = asynq.RedisClientOpt{
        Addr: "localhost:6379",
        // Omit if no password is required
        Password: "mypassword",
        // Use a dedicated db number for asynq.
        // By default, Redis offers 16 databases (0..15)
        DB: 0,
}

// Task represents a task to be performed.
type Task struct {
        // Type indicates the type of a task to be performed.
        Type string
        // Payload holds data needed to perform the task.
}

func main() {
        r := asynq.RedisClientOpt{Addr: "localhost:6379"}
        client := asynq.NewClient(r)
        // Create a task with typename and payload.
        t1 := asynq.NewTask("email:welcome", map[string]interface{}{"user_id": 42})
        t2 := asynq.NewTask("email:reminder", map[string]interface{}{"user_id": 42})
        // Process the task immediately.
        err := client.Enqueue(t1)
        if err != nil {
                log.Fatal(err)
        }
        // Process the task 24 hours later.
        err = client.EnqueueIn(24*time.Hour, t2)
        if err != nil {
                log.Fatal(err)
        }
}

workers/workers.go

package main

import (
        "context"
        "fmt"
        "github.com/hibiken/asynq"
        "log"
)

var redis = asynq.RedisClientOpt{
        Addr: "localhost:6379",
        // Omit if no password is required
        Password: "mypassword",
        // Use a dedicated db number for asynq.
        // By default, Redis offers 16 databases (0..15)
        DB: 0,
}
// Task represents a task to be performed.
type Task struct {
        // Type indicates the type of a task to be performed.
        Type string
        // Payload holds data needed to perform the task.
}

func handler(ctx context.Context, t *asynq.Task) error {
        switch t.Type {
        case "email:welcome":
                id, err := t.Payload.GetInt("user_id")
                if err != nil {
                        return err
                }
                fmt.Printf("Send Welcome Email to User %d\n", id)
        case "email:reminder":
                id, err := t.Payload.GetInt("user_id")
                if err != nil {
                        return err
                }
                fmt.Printf("Send Reminder Email to User %d\n", id)
        default:
                return fmt.Errorf("unexpected task type: %s", t.Type)
        }
        return nil
}
func main() {
        r := asynq.RedisClientOpt{Addr: "localhost:6379"}
        srv := asynq.NewServer(r, asynq.Config{
                Concurrency: 10,
        })
        // Use asynq.HandlerFunc adapter for a handler function
        if err := srv.Run(asynq.HandlerFunc(handler)); err != nil {
                log.Fatal(err)
        }
}

実行

# go run client/client.go
# go run workers/workers.go                                           
asynq: pid=16466 2020/06/15 04:52:25.367499 INFO: Starting processing
asynq: pid=16466 2020/06/15 04:52:25.367528 INFO: Send signal TSTP to stop processing new tasks
asynq: pid=16466 2020/06/15 04:52:25.367536 INFO: Send signal TERM or INT to terminate the process
Send Welcome Email to User 42 <- キューを先に置いておいて処理してくれた。

最後に

気軽に信頼性の高いタスクスケジューラーが構築できるのはそれだけで尊いことです。asynq は実装も単純で最高のアプリであると思いました。