概要
冪等性を無視した都合の良い架空の無限で壊れないキューを前提として作りがちですが そのような都合のいいキューは存在しません。 Asynqは、タスクをキューに入れ、ワーカーとバックグラウンドで処理するためのGoライブラリです。Redisが支援していて簡単に統合できるように組み込まれています。
Asynqの動作の概要
- クライアントがタスクをキューに入れる
- サーバーはタスクをキューから取ってきて、各タスクのワーカーgoroutineを開始します
- タスクは複数のワーカーによって同時に処理されます
2020年6月15日現在において開発中であり APIや挙動に変更の可能性があるそうです。
特徴
タスクの少なくとも1つの実行を保証 タスクのスケジューリング タスクはRedisに書き込まれるため、耐久性 失敗したタスクの再試行 加重優先キュー 厳格な優先キュー Redisでは書き込みが高速であるため、タスクを追加するための低レイテンシ 独自のオプションを使用したタスクの重複排除 タスクごとにタイムアウトと期限を許可する ミドルウェアをサポートする柔軟なハンドラーインターフェイス キューを一時停止して、キューからのタスクの処理を停止する機能 HAのRedisセンチネルのサポート キューとタスクを検査およびリモート制御するCLI
やっていき(最小)
redis の起動 for docker compose
docker-compose.yaml
に記載していく
version: '3' services: redis: image: "redis:latest" ports: - "6379:6379" volumes: - "./data/redis:/data"
実行
sudo docker-compose up -d
$ docker exec -it [CONTAINER ID] /bin/bash root@46f68f517bf1:/data# redis-cli 127.0.0.1:6379> keys *
Getting Started
ファイルの作成とパッケージのインストール
mkdir quickstart && cd quickstart go mod init asynq-quickstart mkdir client workers touch client/client.go workers/workers.go go get -u github.com/hibiken/asynq
client/client.go
package main import ( "github.com/hibiken/asynq" "log" "time" ) var redis = asynq.RedisClientOpt{ Addr: "localhost:6379", // Omit if no password is required Password: "mypassword", // Use a dedicated db number for asynq. // By default, Redis offers 16 databases (0..15) DB: 0, } // Task represents a task to be performed. type Task struct { // Type indicates the type of a task to be performed. Type string // Payload holds data needed to perform the task. } func main() { r := asynq.RedisClientOpt{Addr: "localhost:6379"} client := asynq.NewClient(r) // Create a task with typename and payload. t1 := asynq.NewTask("email:welcome", map[string]interface{}{"user_id": 42}) t2 := asynq.NewTask("email:reminder", map[string]interface{}{"user_id": 42}) // Process the task immediately. err := client.Enqueue(t1) if err != nil { log.Fatal(err) } // Process the task 24 hours later. err = client.EnqueueIn(24*time.Hour, t2) if err != nil { log.Fatal(err) } }
workers/workers.go
package main import ( "context" "fmt" "github.com/hibiken/asynq" "log" ) var redis = asynq.RedisClientOpt{ Addr: "localhost:6379", // Omit if no password is required Password: "mypassword", // Use a dedicated db number for asynq. // By default, Redis offers 16 databases (0..15) DB: 0, } // Task represents a task to be performed. type Task struct { // Type indicates the type of a task to be performed. Type string // Payload holds data needed to perform the task. } func handler(ctx context.Context, t *asynq.Task) error { switch t.Type { case "email:welcome": id, err := t.Payload.GetInt("user_id") if err != nil { return err } fmt.Printf("Send Welcome Email to User %d\n", id) case "email:reminder": id, err := t.Payload.GetInt("user_id") if err != nil { return err } fmt.Printf("Send Reminder Email to User %d\n", id) default: return fmt.Errorf("unexpected task type: %s", t.Type) } return nil } func main() { r := asynq.RedisClientOpt{Addr: "localhost:6379"} srv := asynq.NewServer(r, asynq.Config{ Concurrency: 10, }) // Use asynq.HandlerFunc adapter for a handler function if err := srv.Run(asynq.HandlerFunc(handler)); err != nil { log.Fatal(err) } }
実行
# go run client/client.go # go run workers/workers.go asynq: pid=16466 2020/06/15 04:52:25.367499 INFO: Starting processing asynq: pid=16466 2020/06/15 04:52:25.367528 INFO: Send signal TSTP to stop processing new tasks asynq: pid=16466 2020/06/15 04:52:25.367536 INFO: Send signal TERM or INT to terminate the process Send Welcome Email to User 42 <- キューを先に置いておいて処理してくれた。
最後に
気軽に信頼性の高いタスクスケジューラーが構築できるのはそれだけで尊いことです。asynq は実装も単純で最高のアプリであると思いました。