├── .gitignore ├── go.mod ├── Makefile ├── signing_key_test.go ├── events_test.go ├── .github └── workflows │ └── build.yaml ├── go.sum ├── signing_key.go ├── LICENSE ├── dlq_test.go ├── receiver_test.go ├── README.md ├── queue_test.go ├── reciever.go ├── schedule_test.go ├── events.go ├── queue.go ├── url_group_test.go ├── dlq.go ├── schedule.go ├── url_group.go ├── client.go ├── messages.go ├── messages_test.go └── options.go /.gitignore: -------------------------------------------------------------------------------- 1 | # Test binary, built with `go test -c` 2 | *.test 3 | 4 | # Output of the go coverage tool, specifically when used with LiteIDE 5 | *.out 6 | 7 | # Go workspace file 8 | go.work 9 | 10 | .idea/* -------------------------------------------------------------------------------- /go.mod: -------------------------------------------------------------------------------- 1 | module github.com/upstash/qstash-go 2 | 3 | go 1.22.2 4 | 5 | require ( 6 | github.com/golang-jwt/jwt/v5 v5.2.1 7 | github.com/stretchr/testify v1.9.0 8 | ) 9 | 10 | require ( 11 | github.com/davecgh/go-spew v1.1.1 // indirect 12 | github.com/pmezard/go-difflib v1.0.0 // indirect 13 | gopkg.in/yaml.v3 v3.0.1 // indirect 14 | ) 15 | -------------------------------------------------------------------------------- /Makefile: -------------------------------------------------------------------------------- 1 | .PHONY: build test 2 | 3 | build: 4 | go mod tidy 5 | go fmt ./... 6 | go vet ./... 7 | ifneq (, $(shell which staticcheck)) 8 | staticcheck ./... 9 | else 10 | @echo "Skipping 'staticcheck'... Install using (go install honnef.co/go/tools/cmd/staticcheck@latest)" 11 | endif 12 | go build ./... 13 | 14 | test: 15 | ifeq (, $(shell which gotestsum)) 16 | go test ./... 
17 | else 18 | gotestsum 19 | endif -------------------------------------------------------------------------------- /signing_key_test.go: -------------------------------------------------------------------------------- 1 | package qstash 2 | 3 | import ( 4 | "github.com/stretchr/testify/assert" 5 | "os" 6 | "testing" 7 | ) 8 | 9 | func TestKeysGet(t *testing.T) { 10 | client := NewClientWithEnv() 11 | 12 | keys, err := client.Keys().Get() 13 | assert.NoError(t, err) 14 | 15 | assert.Equal(t, keys.Current, os.Getenv("QSTASH_CURRENT_SIGNING_KEY")) 16 | assert.Equal(t, keys.Next, os.Getenv("QSTASH_NEXT_SIGNING_KEY")) 17 | } 18 | -------------------------------------------------------------------------------- /events_test.go: -------------------------------------------------------------------------------- 1 | package qstash 2 | 3 | import ( 4 | "github.com/stretchr/testify/assert" 5 | "testing" 6 | "time" 7 | ) 8 | 9 | func TestEvents(t *testing.T) { 10 | client := NewClientWithEnv() 11 | 12 | now := time.Now() 13 | for i := 0; i < 100; i++ { 14 | res, err := client.PublishJSON(PublishJSONOptions{ 15 | Url: "http://httpstat.us/404", 16 | Retries: RetryCount(0), 17 | }) 18 | assert.NoError(t, err) 19 | assert.NotEmpty(t, res.MessageId) 20 | } 21 | 22 | assert.Eventually(t, func() bool { 23 | subT := &testing.T{} 24 | 25 | events, _, err := client.Events().List(ListEventsOptions{ 26 | Filter: EventFilter{ 27 | FromDate: now, 28 | }, 29 | }) 30 | 31 | assert.NoError(subT, err) 32 | assert.Len(subT, events, 400) 33 | return !subT.Failed() 34 | }, time.Second*10, time.Millisecond*100) 35 | } 36 | -------------------------------------------------------------------------------- /.github/workflows/build.yaml: -------------------------------------------------------------------------------- 1 | name: Build 2 | 3 | on: 4 | push: 5 | branches: 6 | - master 7 | - main 8 | pull_request: 9 | workflow_dispatch: 10 | 11 | env: 12 | QSTASH_TOKEN: ${{ secrets.QSTASH_TOKEN }} 13 | 
QSTASH_CURRENT_SIGNING_KEY: ${{ secrets.QSTASH_CURRENT_SIGNING_KEY }} 14 | QSTASH_NEXT_SIGNING_KEY: ${{ secrets.QSTASH_NEXT_SIGNING_KEY }} 15 | 16 | jobs: 17 | 18 | build: 19 | name: Build 20 | runs-on: ubuntu-latest 21 | steps: 22 | - name: Check out code 23 | uses: actions/checkout@v4 24 | 25 | - name: Setup Go 26 | uses: actions/setup-go@v4 27 | with: 28 | go-version-file: 'go.mod' 29 | cache-dependency-path: 'go.sum' 30 | 31 | - name: Install Go tools 32 | run: go install honnef.co/go/tools/cmd/staticcheck@latest 33 | 34 | - name: Build 35 | run: make 36 | 37 | - name: Test 38 | run: make test -------------------------------------------------------------------------------- /go.sum: -------------------------------------------------------------------------------- 1 | github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= 2 | github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= 3 | github.com/golang-jwt/jwt/v5 v5.2.1 h1:OuVbFODueb089Lh128TAcimifWaLhJwVflnrgM17wHk= 4 | github.com/golang-jwt/jwt/v5 v5.2.1/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= 5 | github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= 6 | github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= 7 | github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= 8 | github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= 9 | gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= 10 | gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= 11 | gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= 12 | gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= 13 | -------------------------------------------------------------------------------- /signing_key.go: 
-------------------------------------------------------------------------------- 1 | package qstash 2 | 3 | import ( 4 | "net/http" 5 | ) 6 | 7 | type Keys struct { 8 | client *Client 9 | } 10 | 11 | type SigningKeys struct { 12 | Current string `json:"current"` 13 | Next string `json:"next"` 14 | } 15 | 16 | // Get retrieves the current and next signing keys. 17 | func (k *Keys) Get() (keys SigningKeys, err error) { 18 | opts := requestOptions{ 19 | method: http.MethodGet, 20 | path: "/v2/keys", 21 | } 22 | response, _, err := k.client.fetchWith(opts) 23 | if err != nil { 24 | return 25 | } 26 | keys, err = parse[SigningKeys](response) 27 | return 28 | } 29 | 30 | // Rotate rotates the current signing key and gets the new signing key. 31 | // The next signing key becomes the current signing key, and a new signing key is assigned to the next signing key. 32 | func (k *Keys) Rotate() (keys SigningKeys, err error) { 33 | opts := requestOptions{ 34 | method: http.MethodPost, 35 | path: "/v2/rotate", 36 | } 37 | response, _, err := k.client.fetchWith(opts) 38 | if err != nil { 39 | return 40 | } 41 | keys, err = parse[SigningKeys](response) 42 | return 43 | } 44 | -------------------------------------------------------------------------------- /LICENSE: -------------------------------------------------------------------------------- 1 | The MIT License (MIT) 2 | 3 | Copyright (c) 2024 Upstash, Inc. 
4 | 5 | Permission is hereby granted, free of charge, to any person obtaining a copy 6 | of this software and associated documentation files (the "Software"), to deal 7 | in the Software without restriction, including without limitation the rights 8 | to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 9 | copies of the Software, and to permit persons to whom the Software is 10 | furnished to do so, subject to the following conditions: 11 | 12 | The above copyright notice and this permission notice shall be included in all 13 | copies or substantial portions of the Software. 14 | 15 | THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 16 | IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 17 | FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 18 | AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 19 | LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 20 | OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 21 | SOFTWARE. 
-------------------------------------------------------------------------------- /dlq_test.go: -------------------------------------------------------------------------------- 1 | package qstash 2 | 3 | import ( 4 | "github.com/stretchr/testify/assert" 5 | "testing" 6 | "time" 7 | ) 8 | 9 | func TestDlqGetAndDelete(t *testing.T) { 10 | client := NewClientWithEnv() 11 | 12 | res, err := client.PublishJSON(PublishJSONOptions{ 13 | Url: "http://httpstat.us/404", 14 | Retries: RetryCount(1), 15 | }) 16 | 17 | assert.NoError(t, err) 18 | assert.NotEmpty(t, res.MessageId) 19 | 20 | dlqIds := AssertFailedEventually(t, client, res.MessageId) 21 | 22 | err = client.Dlq().Delete(dlqIds[0]) 23 | assert.NoError(t, err) 24 | } 25 | 26 | func TestDlqGetAndDeleteMany(t *testing.T) { 27 | client := NewClientWithEnv() 28 | 29 | messageIds := make([]string, 3) 30 | 31 | for i := 0; i < 3; i++ { 32 | res, err := client.PublishJSON(PublishJSONOptions{ 33 | Url: "http://httpstat.us/404", 34 | Retries: RetryCount(1), 35 | }) 36 | assert.NoError(t, err) 37 | assert.NotEmpty(t, res.MessageId) 38 | messageIds[i] = res.MessageId 39 | } 40 | 41 | dlqIds := AssertFailedEventually(t, client, messageIds...) 
42 | 43 | deleted, err := client.Dlq().DeleteMany(dlqIds) 44 | assert.NoError(t, err) 45 | assert.Equal(t, len(messageIds), deleted) 46 | } 47 | 48 | func AssertFailedEventually(t *testing.T, client *Client, messageIds ...string) (dlqIds []string) { 49 | dlqIds = make([]string, len(messageIds)) 50 | assert.Eventually(t, func() bool { 51 | 52 | subT := &testing.T{} 53 | 54 | for idx, messageId := range messageIds { 55 | 56 | dlqMessages, _, err := client.Dlq().List(ListDlqOptions{ 57 | Filter: DlqFilter{ 58 | MessageId: messageId, 59 | }, 60 | }) 61 | assert.NoError(subT, err) 62 | assert.Len(subT, dlqMessages, 1) 63 | 64 | if !subT.Failed() { 65 | match := dlqMessages[0] 66 | res, err := client.Dlq().Get(match.DlqId) 67 | assert.NoError(subT, err) 68 | assert.Equal(subT, "404 Not Found", res.ResponseBody) 69 | assert.Equal(subT, "404 Not Found", match.ResponseBody) 70 | dlqIds[idx] = match.DlqId 71 | } 72 | } 73 | return !subT.Failed() 74 | }, time.Second*30, time.Millisecond*100) 75 | return 76 | } 77 | -------------------------------------------------------------------------------- /receiver_test.go: -------------------------------------------------------------------------------- 1 | package qstash 2 | 3 | import ( 4 | "crypto/sha256" 5 | "encoding/base64" 6 | "encoding/json" 7 | "fmt" 8 | "github.com/golang-jwt/jwt/v5" 9 | "github.com/stretchr/testify/assert" 10 | "os" 11 | "strings" 12 | "testing" 13 | "time" 14 | ) 15 | 16 | func sign(body string, key string) (string, error) { 17 | // Compute SHA-256 hash 18 | hash := sha256.New() 19 | hash.Write([]byte(body)) 20 | bodyHash := hash.Sum(nil) 21 | 22 | bodyHashBase64 := base64.URLEncoding.EncodeToString(bodyHash) 23 | bodyHashBase64 = strings.Trim(bodyHashBase64, "=") 24 | 25 | // Create JWT payload 26 | now := time.Now().Unix() 27 | payload := jwt.MapClaims{ 28 | "aud": "", 29 | "body": bodyHashBase64, 30 | "exp": now + 300, 31 | "iat": now, 32 | "iss": "Upstash", 33 | "jti": fmt.Sprintf("%f", float64(now)), // 
Converting time to a string to mimic Python's time.time() 34 | "nbf": now, 35 | "sub": "https://example.com", 36 | } 37 | 38 | // Create JWT token 39 | token := jwt.NewWithClaims(jwt.SigningMethodHS256, payload) 40 | token.Header["alg"] = "HS256" 41 | token.Header["typ"] = "JWT" 42 | 43 | // Sign the token 44 | signature, err := token.SignedString([]byte(key)) 45 | if err != nil { 46 | return "", err 47 | } 48 | 49 | return signature, nil 50 | } 51 | 52 | func TestVerify(t *testing.T) { 53 | 54 | receiver := NewReceiverWithEnv() 55 | 56 | body, err := json.Marshal(map[string]string{"hello": "world"}) 57 | assert.NoError(t, err) 58 | 59 | signature, err := sign(string(body), os.Getenv(currentSigningKeyEnvProperty)) 60 | assert.NoError(t, err) 61 | 62 | err = receiver.Verify(VerifyOptions{ 63 | Signature: signature, 64 | Url: "https://example.com", 65 | Body: string(body), 66 | }) 67 | assert.NoError(t, err) 68 | } 69 | 70 | func TestFailedVerify(t *testing.T) { 71 | receiver := NewReceiverWithEnv() 72 | 73 | body, err := json.Marshal(map[string]string{"hello": "world"}) 74 | assert.NoError(t, err) 75 | 76 | signature, err := sign(string(body), os.Getenv(currentSigningKeyEnvProperty)) 77 | assert.NoError(t, err) 78 | 79 | err = receiver.Verify(VerifyOptions{ 80 | Signature: signature, 81 | Url: "https://example.com/fail", 82 | Body: string(body), 83 | }) 84 | assert.Error(t, err) 85 | } 86 | -------------------------------------------------------------------------------- /README.md: -------------------------------------------------------------------------------- 1 | # Upstash QStash Go SDK 2 | 3 | > [!NOTE] 4 | > **This project is in the Experimental Stage.** 5 | > 6 | > We declare this project experimental to set clear expectations for your usage. There could be known or unknown bugs, the API could evolve, or the project could be discontinued if it does not find community adoption. 
While we cannot provide professional support for experimental projects, we’d be happy to hear from you if you see value in this project! 7 | 8 | 9 | QStash is an HTTP-based messaging and scheduling solution for serverless and edge runtimes. 10 | 11 | [QStash Documentation](https://upstash.com/docs/qstash) 12 | 13 | ## Install 14 | 15 | Use go get to install the Upstash QStash package: 16 | 17 | ``` 18 | go get github.com/upstash/qstash-go 19 | ``` 20 | 21 | Import the Upstash QStash package in your project: 22 | 23 | ``` 24 | import "github.com/upstash/qstash-go" 25 | ``` 26 | 27 | ## Usage 28 | 29 | ### Publish a JSON message 30 | 31 | ``` 32 | client := qstash.NewClient("QSTASH_TOKEN") 33 | 34 | // Error checking is omitted for brevity 35 | res, _ := client.PublishJSON(qstash.PublishJSONOptions{ 36 | Url: "https://example.com", 37 | Body: map[string]any{ 38 | "hello": "world", 39 | }, 40 | Headers: map[string]string{ 41 | "test-header": "test-value", 42 | }, 43 | }) 44 | 45 | fmt.Println(res.MessageId) 46 | ``` 47 | 48 | ### Create a scheduled message 49 | 50 | ``` 51 | client := qstash.NewClient("QSTASH_TOKEN") 52 | 53 | scheduleId, err := client.Schedules().Create(qstash.ScheduleOptions{ 54 | Destination: "https://example.com", 55 | Cron: "*/5 * * * *", 56 | }) 57 | // handle err 58 | 59 | fmt.Print(scheduleId) 60 | ``` 61 | 62 | ### Receiving messages 63 | 64 | ``` 65 | receiver := qstash.NewReceiver("CURRENT_SIGNING_KEY", "NEXT_SIGNING_KEY") 66 | 67 | // ... in your request handler 68 | 69 | signature := req.Header.Get("Upstash-Signature") 70 | body, err := io.ReadAll(req.Body) 71 | // handle err 72 | 73 | err = receiver.Verify(qstash.VerifyOptions{ 74 | Signature: signature, 75 | Body: string(body), 76 | Url: "https://example.com", // optional 77 | }) 78 | // handle err 79 | ``` 80 | 81 | Additional methods are available for managing URL groups, schedules, and messages. 
82 | -------------------------------------------------------------------------------- /queue_test.go: -------------------------------------------------------------------------------- 1 | package qstash 2 | 3 | import ( 4 | "github.com/stretchr/testify/assert" 5 | "testing" 6 | ) 7 | 8 | func TestQueue(t *testing.T) { 9 | client := NewClientWithEnv() 10 | 11 | name := "test-queue" 12 | err := client.Queues().Upsert(Queue{ 13 | Name: name, 14 | Parallelism: 1, 15 | }) 16 | assert.NoError(t, err) 17 | 18 | queue, err := client.Queues().Get(name) 19 | assert.NoError(t, err) 20 | assert.Equal(t, queue.Name, name) 21 | assert.Equal(t, queue.Parallelism, 1) 22 | 23 | // Reconfigure queue parallelism 24 | err = client.Queues().Upsert(Queue{ 25 | Name: name, 26 | Parallelism: 2, 27 | }) 28 | assert.NoError(t, err) 29 | 30 | queues, err := client.Queues().List() 31 | assert.NoError(t, err) 32 | assert.Len(t, queues, 1) 33 | assert.Equal(t, queues[0].Name, name) 34 | assert.Equal(t, queues[0].Parallelism, 2) 35 | 36 | // Delete queue 37 | err = client.Queues().Delete(name) 38 | assert.NoError(t, err) 39 | 40 | queues, err = client.Queues().List() 41 | assert.NoError(t, err) 42 | assert.Empty(t, queues) 43 | } 44 | 45 | func TestQueuePauseAndResume(t *testing.T) { 46 | client := NewClientWithEnv() 47 | 48 | name := "test-queue" 49 | err := client.Queues().Upsert(Queue{ 50 | Name: name, 51 | Parallelism: 1, 52 | }) 53 | assert.NoError(t, err) 54 | 55 | queue, err := client.Queues().Get(name) 56 | assert.NoError(t, err) 57 | assert.False(t, queue.IsPaused) 58 | 59 | // Pause the queue 60 | err = client.Queues().Pause(name) 61 | assert.NoError(t, err) 62 | 63 | queue, err = client.Queues().Get(name) 64 | assert.NoError(t, err) 65 | assert.True(t, queue.IsPaused) 66 | 67 | // Resume the queue 68 | err = client.Queues().Resume(name) 69 | assert.NoError(t, err) 70 | 71 | queue, err = client.Queues().Get(name) 72 | assert.NoError(t, err) 73 | assert.False(t, queue.IsPaused) 74 | 75 | 
var (
	// ErrInvalidSignature is the sentinel error reported when a request
	// signature cannot be validated.
	ErrInvalidSignature = fmt.Errorf("failed to validate signature")
)
19 | type Receiver struct { 20 | CurrentSigningKey string 21 | NextSigningKey string 22 | } 23 | 24 | func NewReceiverWithEnv() *Receiver { 25 | return &Receiver{ 26 | CurrentSigningKey: os.Getenv(currentSigningKeyEnvProperty), 27 | NextSigningKey: os.Getenv(nextSigningKeyEnvProperty), 28 | } 29 | } 30 | 31 | func NewReceiver(currentSigningKey, nextSigningKey string) *Receiver { 32 | return &Receiver{ 33 | CurrentSigningKey: currentSigningKey, 34 | NextSigningKey: nextSigningKey, 35 | } 36 | } 37 | 38 | type claims struct { 39 | Body string `json:"body"` 40 | jwt.RegisteredClaims 41 | } 42 | 43 | func verify(key string, opts VerifyOptions) (err error) { 44 | token, err := jwt.ParseWithClaims(opts.Signature, &claims{}, func(token *jwt.Token) (interface{}, error) { 45 | if _, ok := token.Method.(*jwt.SigningMethodHMAC); !ok { 46 | return nil, ErrInvalidSignature 47 | } 48 | return []byte(key), nil 49 | }, jwt.WithLeeway(opts.Tolerance), jwt.WithIssuer("Upstash")) 50 | if err != nil { 51 | return ErrInvalidSignature 52 | } 53 | c, ok := token.Claims.(*claims) 54 | if !ok { 55 | return ErrInvalidSignature 56 | } 57 | if opts.Url != "" && c.Subject != opts.Url { 58 | return ErrInvalidSignature 59 | } 60 | h := sha256.New() 61 | h.Write([]byte(opts.Body)) 62 | bHash := h.Sum(nil) 63 | b64hash := strings.Trim(base64.URLEncoding.EncodeToString(bHash), "=") 64 | if strings.Trim(c.Body, "=") != b64hash { 65 | return ErrInvalidSignature 66 | } 67 | return nil 68 | } 69 | 70 | type VerifyOptions struct { 71 | // Signature is the signature from the `Upstash-Signature` header. 72 | Signature string 73 | // Url is the address of the endpoint where the request was sent to. When set to `None`, url is not check. 74 | Url string 75 | // Body is the raw request body. 76 | Body string 77 | // Tolerance is the duration to tolerate when checking `nbf` and `exp` claims, to deal with small clock differences among different servers. 
78 | Tolerance time.Duration 79 | } 80 | 81 | // Verify verifies the signature of a request. 82 | // It tries to verify the signature with the current signing key. 83 | // If that fails, maybe because you have rotated the keys recently, it will try to verify the signature with the next signing key. 84 | func (r *Receiver) Verify(opts VerifyOptions) (err error) { 85 | err = verify(r.CurrentSigningKey, opts) 86 | if errors.Is(err, ErrInvalidSignature) { 87 | err = verify(r.NextSigningKey, opts) 88 | } 89 | return 90 | } 91 | -------------------------------------------------------------------------------- /schedule_test.go: -------------------------------------------------------------------------------- 1 | package qstash 2 | 3 | import ( 4 | "github.com/stretchr/testify/assert" 5 | "testing" 6 | ) 7 | 8 | func TestSchedule(t *testing.T) { 9 | client := NewClientWithEnv() 10 | 11 | // Create a schedule 12 | scheduleId, err := client.Schedules().CreateJSON(ScheduleJSONOptions{ 13 | Cron: "1 1 1 1 1", 14 | Destination: "https://example.com", 15 | Body: map[string]any{ 16 | "ex_key": "ex_value", 17 | }, 18 | }) 19 | assert.NoError(t, err) 20 | assert.NotEmpty(t, scheduleId) 21 | 22 | // Get a schedule 23 | schedule, err := client.Schedules().Get(scheduleId) 24 | assert.NoError(t, err) 25 | assert.Equal(t, schedule.Id, scheduleId) 26 | assert.Equal(t, schedule.Cron, "1 1 1 1 1") 27 | assert.Equal(t, schedule.Destination, "https://example.com") 28 | 29 | // List all schedules 30 | schedules, err := client.Schedules().List() 31 | assert.NoError(t, err) 32 | assert.GreaterOrEqual(t, len(schedules), 1) 33 | 34 | scheduleIds := make([]string, len(schedules)) 35 | for idx, s := range schedules { 36 | scheduleIds[idx] = s.Id 37 | } 38 | assert.Contains(t, scheduleIds, scheduleId) 39 | 40 | // Delete the schedule 41 | err = client.Schedules().Delete(scheduleId) 42 | assert.NoError(t, err) 43 | 44 | schedules, err = client.Schedules().List() 45 | assert.NoError(t, err) 46 | 
scheduleIds = make([]string, len(schedules)) 47 | for idx, s := range schedules { 48 | scheduleIds[idx] = s.Id 49 | } 50 | assert.NotContains(t, schedules, scheduleId) 51 | } 52 | 53 | func TestSchedulePauseAndResume(t *testing.T) { 54 | client := NewClientWithEnv() 55 | 56 | // Create a schedule 57 | scheduleId, err := client.Schedules().CreateJSON(ScheduleJSONOptions{ 58 | Cron: "1 1 1 1 1", 59 | Destination: "https://example.com", 60 | Body: map[string]any{ 61 | "ex_key": "ex_value", 62 | }, 63 | }) 64 | assert.NoError(t, err) 65 | assert.NotEmpty(t, scheduleId) 66 | 67 | // Get a schedule 68 | schedule, err := client.Schedules().Get(scheduleId) 69 | assert.NoError(t, err) 70 | assert.Equal(t, schedule.Id, scheduleId) 71 | assert.Equal(t, schedule.Cron, "1 1 1 1 1") 72 | assert.Equal(t, schedule.Destination, "https://example.com") 73 | assert.False(t, schedule.IsPaused) 74 | 75 | // Pause the schedule 76 | err = client.Schedules().Pause(scheduleId) 77 | assert.NoError(t, err) 78 | 79 | schedule, err = client.Schedules().Get(scheduleId) 80 | assert.NoError(t, err) 81 | assert.Equal(t, schedule.Id, scheduleId) 82 | assert.True(t, schedule.IsPaused) 83 | 84 | // Resume the schedule 85 | err = client.Schedules().Resume(scheduleId) 86 | assert.NoError(t, err) 87 | 88 | schedule, err = client.Schedules().Get(scheduleId) 89 | assert.NoError(t, err) 90 | assert.Equal(t, schedule.Id, scheduleId) 91 | assert.False(t, schedule.IsPaused) 92 | 93 | err = client.Schedules().Delete(scheduleId) 94 | assert.NoError(t, err) 95 | } 96 | -------------------------------------------------------------------------------- /events.go: -------------------------------------------------------------------------------- 1 | package qstash 2 | 3 | import ( 4 | "net/http" 5 | "time" 6 | ) 7 | 8 | type Events struct { 9 | client *Client 10 | } 11 | 12 | type EventState string 13 | 14 | var ( 15 | Created EventState = "CREATED" 16 | Active EventState = "ACTIVE" 17 | Retry EventState = "RETRY" 18 | 
// Event is a single entry of the list returned by the events endpoint.
type Event struct {
	// Time is the timestamp of this event in Unix time (milliseconds).
	Time int64 `json:"time"`
	// MessageId is the ID of the associated message.
	MessageId string `json:"messageId"`
	// State is the current state of the message.
	State EventState `json:"state"`
	// Error is set only if the status of the message is an error.
	Error string `json:"error,omitempty"`
	// NextDeliveryTime is the next scheduled time of the message in milliseconds.
	NextDeliveryTime int64 `json:"nextDeliveryTime,omitempty"`
	// Url is the destination URL.
	Url string `json:"url"`
	// UrlGroup is the name of the URL group if this message was sent through a URL group, empty otherwise.
	// It is decoded from the `topicName` JSON field.
	UrlGroup string `json:"topicName,omitempty"`
	// EndpointName is the name of the endpoint if this message was sent through a URL group, empty otherwise.
	EndpointName string `json:"endpointName,omitempty"`
	// Api is the name of the API if this message was sent to an API.
	Api string `json:"api,omitempty"`
	// QueueName is the name of the queue if this message is enqueued on a queue, empty otherwise.
	QueueName string `json:"queueName,omitempty"`
	// ScheduleId is the ID of the responsible schedule if the message is triggered by a schedule.
	ScheduleId string `json:"scheduleId,omitempty"`
}
60 | Api string 61 | // Queue filters events by queue name. 62 | Queue string 63 | // ScheduleId filters events by schedule ID. 64 | ScheduleId string 65 | // FromDate filters events by starting time in milliseconds. 66 | FromDate time.Time 67 | // ToDate filters events by ending time in milliseconds. 68 | ToDate time.Time 69 | } 70 | 71 | type listEventsResponse struct { 72 | Cursor string `json:"cursor,omitempty"` 73 | Events []Event `json:"events"` 74 | } 75 | 76 | // List retrieves all events that occurred, such as message creation or delivery. 77 | func (e *Events) List(options ListEventsOptions) ([]Event, string, error) { 78 | opts := requestOptions{ 79 | method: http.MethodGet, 80 | path: "/v2/events", 81 | params: options.Params(), 82 | } 83 | response, _, err := e.client.fetchWith(opts) 84 | if err != nil { 85 | return nil, "", err 86 | } 87 | events, err := parse[listEventsResponse](response) 88 | if err != nil { 89 | return nil, "", err 90 | } 91 | return events.Events, events.Cursor, nil 92 | } 93 | -------------------------------------------------------------------------------- /queue.go: -------------------------------------------------------------------------------- 1 | package qstash 2 | 3 | import ( 4 | "encoding/json" 5 | "fmt" 6 | "net/http" 7 | ) 8 | 9 | // Queues in QStash are mechanisms that ensure ordered delivery (FIFO) and allow controlled parallelism in processing messages. 10 | // Messages are queued and delivered one by one in a first-in-first-out order, ensuring each message is processed before the next one is activated. 11 | // If a message fails due to an endpoint returning a non-2xx code, retries are attempted before moving to the next message. 12 | // To avoid overwhelming an endpoint and improve throughput, parallelism can be configured, allowing multiple messages to be processed concurrently. 13 | type Queues struct { 14 | client *Client 15 | } 16 | 17 | type QueueWithLag struct { 18 | // Name is the name of the queue. 
19 | Name string `json:"name"` 20 | // Parallelism is the number of parallel consumers consuming from the queue. 21 | Parallelism int `json:"parallelism"` 22 | // CreatedAt is the creation time of the queue, in unix milliseconds. 23 | CreatedAt int64 `json:"createdAt"` 24 | // UpdatedAt is the last update time of the queue, in unix milliseconds 25 | UpdatedAt int64 `json:"updatedAt"` 26 | // Lag is the number of unprocessed messages that exist in the queue. 27 | Lag int64 `json:"lag"` 28 | // IsPaused is whether the queue is paused or not. 29 | IsPaused bool `json:"paused"` 30 | } 31 | 32 | type Queue struct { 33 | // Name is the name of the queue 34 | Name string `json:"queueName" validate:"required"` 35 | // Parallelism is the number of parallel consumers consuming from the queue. 36 | Parallelism int `json:"parallelism"` 37 | // IsPaused is whether the queue is paused or not. 38 | IsPaused bool `json:"paused"` 39 | } 40 | 41 | // Upsert updates or creates a queue. 42 | func (c *Queues) Upsert(queue Queue) (err error) { 43 | payload, err := json.Marshal(queue) 44 | if err != nil { 45 | return 46 | } 47 | opts := requestOptions{ 48 | method: http.MethodPost, 49 | path: "/v2/queues", 50 | body: string(payload), 51 | header: contentTypeJson, 52 | } 53 | _, _, err = c.client.fetchWith(opts) 54 | return 55 | } 56 | 57 | // Get retrieves a queue by its name. 58 | func (c *Queues) Get(name string) (schedule QueueWithLag, err error) { 59 | opts := requestOptions{ 60 | method: http.MethodGet, 61 | path: fmt.Sprintf("/v2/queues/%s", name), 62 | } 63 | response, _, err := c.client.fetchWith(opts) 64 | if err != nil { 65 | return 66 | } 67 | schedule, err = parse[QueueWithLag](response) 68 | return 69 | } 70 | 71 | // List retrieves all queues. 
72 | func (c *Queues) List() (schedules []QueueWithLag, err error) { 73 | opts := requestOptions{ 74 | method: http.MethodGet, 75 | path: "/v2/queues", 76 | } 77 | response, _, err := c.client.fetchWith(opts) 78 | if err != nil { 79 | return 80 | } 81 | schedules, err = parse[[]QueueWithLag](response) 82 | return 83 | } 84 | 85 | // Delete deletes a queue by its name. 86 | func (c *Queues) Delete(queue string) (err error) { 87 | opts := requestOptions{ 88 | method: http.MethodDelete, 89 | path: fmt.Sprintf("/v2/queues/%s", queue), 90 | } 91 | _, _, err = c.client.fetchWith(opts) 92 | return 93 | } 94 | 95 | // Pause pauses the queue. 96 | // A paused queue will not deliver messages until it is resumed. 97 | func (c *Queues) Pause(queue string) (err error) { 98 | opts := requestOptions{ 99 | method: http.MethodPost, 100 | path: fmt.Sprintf("/v2/queues/%s/pause", queue), 101 | } 102 | _, _, err = c.client.fetchWith(opts) 103 | return 104 | } 105 | 106 | // Resume resumes the queue. 107 | func (c *Queues) Resume(queue string) (err error) { 108 | opts := requestOptions{ 109 | method: http.MethodPost, 110 | path: fmt.Sprintf("/v2/queues/%s/resume", queue), 111 | } 112 | _, _, err = c.client.fetchWith(opts) 113 | return 114 | } 115 | -------------------------------------------------------------------------------- /url_group_test.go: -------------------------------------------------------------------------------- 1 | package qstash 2 | 3 | import ( 4 | "github.com/stretchr/testify/assert" 5 | "testing" 6 | "time" 7 | ) 8 | 9 | func TestUrlGroup(t *testing.T) { 10 | now := time.Now() 11 | client := NewClientWithEnv() 12 | 13 | name := "go_url_group" 14 | 15 | err := client.UrlGroups().Delete(name) 16 | assert.NoError(t, err) 17 | 18 | err = client.UrlGroups().UpsertEndpoints(name, []Endpoint{ 19 | {Url: "https://example.com", Name: "First endpoint"}, 20 | {Url: "https://example.net", Name: "Second endpoint"}, 21 | }) 22 | assert.NoError(t, err) 23 | 24 | urlGroup, err := 
client.UrlGroups().Get(name) 25 | assert.NoError(t, err) 26 | assert.Equal(t, urlGroup.Name, name) 27 | assert.Len(t, urlGroup.Endpoints, 2) 28 | assert.Greater(t, urlGroup.CreatedAt, now.UnixMilli()) 29 | assert.Greater(t, urlGroup.UpdatedAt, now.UnixMilli()) 30 | assert.Equal(t, urlGroup.Endpoints[0], Endpoint{Url: "https://example.com", Name: "First endpoint"}) 31 | assert.Equal(t, urlGroup.Endpoints[1], Endpoint{Url: "https://example.net", Name: "Second endpoint"}) 32 | 33 | urlGroups, err := client.UrlGroups().List() 34 | assert.NoError(t, err) 35 | assert.Len(t, urlGroups, 1) 36 | assert.Equal(t, urlGroups[0].Name, name) 37 | 38 | err = client.UrlGroups().RemoveEndpoints(name, []Endpoint{ 39 | {Url: "https://example.net"}, 40 | }) 41 | assert.NoError(t, err) 42 | 43 | urlGroup, err = client.UrlGroups().Get(name) 44 | assert.NoError(t, err) 45 | assert.Equal(t, urlGroup.Name, name) 46 | assert.Len(t, urlGroup.Endpoints, 1) 47 | assert.Equal(t, urlGroup.Endpoints[0], Endpoint{Url: "https://example.com", Name: "First endpoint"}) 48 | 49 | err = client.UrlGroups().Delete(name) 50 | assert.NoError(t, err) 51 | } 52 | 53 | func TestPublishToUrlGroup(t *testing.T) { 54 | client := NewClientWithEnv() 55 | 56 | name := "go_url_group" 57 | err := client.UrlGroups().Delete(name) 58 | assert.NoError(t, err) 59 | 60 | err = client.UrlGroups().UpsertEndpoints(name, []Endpoint{ 61 | {Url: "https://example.com"}, 62 | {Url: "https://example.net"}, 63 | }) 64 | assert.NoError(t, err) 65 | 66 | res, err := client.UrlGroups().Publish(PublishUrlGroupOptions{ 67 | UrlGroup: name, 68 | Body: "test-body", 69 | }) 70 | assert.NoError(t, err) 71 | assert.Len(t, res, 2) 72 | assert.NotEmpty(t, res[0].MessageId) 73 | assert.NotEmpty(t, res[1].MessageId) 74 | } 75 | 76 | func TestUrlGroup_Error(t *testing.T) { 77 | client := NewClientWithEnv() 78 | 79 | name := "non_existing_url_group" 80 | 81 | _, err := client.UrlGroups().Publish(PublishUrlGroupOptions{ 82 | UrlGroup: name, 83 | Body: 
"test-body", 84 | }) 85 | assert.ErrorContains(t, err, "topic non_existing_url_group not found") 86 | } 87 | 88 | func TestUrlGroup_Empty(t *testing.T) { 89 | client := NewClientWithEnv() 90 | 91 | _, err := client.UrlGroups().Publish(PublishUrlGroupOptions{ 92 | Body: "test-body", 93 | }) 94 | assert.ErrorContains(t, err, "a non-empty destination must be provided") 95 | } 96 | 97 | func TestEnqueueToUrlGroup(t *testing.T) { 98 | client := NewClientWithEnv() 99 | 100 | name := "go_url_group" 101 | err := client.UrlGroups().Delete(name) 102 | assert.NoError(t, err) 103 | 104 | err = client.UrlGroups().UpsertEndpoints(name, []Endpoint{ 105 | {Url: "https://example.com"}, 106 | {Url: "https://example.net"}, 107 | }) 108 | assert.NoError(t, err) 109 | 110 | res, err := client.UrlGroups().EnqueueJSON(EnqueueUrlGroupJSONOptions{ 111 | Queue: "test-queue", 112 | UrlGroup: name, 113 | Body: map[string]any{"test": "body"}, 114 | Headers: map[string]string{ 115 | "test-header": "test-value", 116 | }, 117 | }) 118 | assert.NoError(t, err) 119 | 120 | assert.Len(t, res, 2) 121 | assert.NoError(t, err) 122 | assert.Len(t, res, 2) 123 | assert.NotEmpty(t, res[0].MessageId) 124 | assert.NotEmpty(t, res[1].MessageId) 125 | } 126 | -------------------------------------------------------------------------------- /dlq.go: -------------------------------------------------------------------------------- 1 | package qstash 2 | 3 | import ( 4 | "encoding/json" 5 | "fmt" 6 | "net/http" 7 | "time" 8 | ) 9 | 10 | // Dlq (Dead Letter Queue) is a specialized queue used to store messages that cannot be processed successfully by the API. 11 | // When the API fails to process a request due to reasons like bugs in the code, temporary issues with third-party services, or network problems, QStash will retry processing the message a few times. 12 | // If the retries are unsuccessful, the message is then moved to the Dlq. 
13 | // This allows for these problematic messages to be handled manually, ensuring they don't get lost or cause further issues in the system. 14 | type Dlq struct { 15 | client *Client 16 | } 17 | 18 | type DlqMessage struct { 19 | Message 20 | // DlqId is the unique id within the Dlq. 21 | DlqId string `json:"dlqId"` 22 | // ResponseStatus is the HTTP status code of the last failed delivery attempt. 23 | ResponseStatus int `json:"responseStatus,omitempty"` 24 | // ResponseHeaders is the response headers of the last failed delivery attempt. 25 | ResponseHeaders http.Header `json:"responseHeader,omitempty"` 26 | // ResponseBody is the response body of the last failed delivery attempt if it is composed of UTF-8 characters only, empty otherwise. 27 | ResponseBody string `json:"responseBody,omitempty"` 28 | // ResponseBodyBase64 is the base64 encoded response body of the last failed delivery attempt if the response body contains non-UTF-8 characters, empty otherwise. 29 | ResponseBodyBase64 string `json:"responseBodyBase64,omitempty"` 30 | } 31 | 32 | type DlqFilter struct { 33 | // MessageId filters Dlq entries by the ID of the message. 34 | MessageId string 35 | // Url filters Dlq entries by the URL of the message. 36 | Url string 37 | // UrlGroup filters Dlq entries by URL group of the message. 38 | UrlGroup string 39 | // ScheduleId filters Dlq entries by schedule ID. 40 | ScheduleId string 41 | // Queue filters Dlq entries by queue name. 42 | Queue string 43 | // Api filters Dlq entries by the API name of the message. 44 | Api string 45 | // FromDate filters Dlq entries by starting time in milliseconds. 46 | FromDate time.Time 47 | // ToDate filters Dlq entries by ending time in milliseconds. 48 | ToDate time.Time 49 | // ResponseStatus filters Dlq entries by HTTP response status code of the message. 50 | ResponseStatus int 51 | // CallerIP filters Dlq entries by IP address of the publisher of the message. 
52 | CallerIP string 53 | } 54 | 55 | type listDlqResponse struct { 56 | Cursor string `json:"cursor,omitempty"` 57 | Messages []DlqMessage `json:"messages"` 58 | } 59 | 60 | // Get retrieves a message from the DLQ by its unique ID. 61 | func (d *Dlq) Get(dlqId string) (dlqMessage DlqMessage, err error) { 62 | opts := requestOptions{ 63 | method: http.MethodGet, 64 | path: fmt.Sprintf("/v2/dlq/%s", dlqId), 65 | } 66 | response, _, err := d.client.fetchWith(opts) 67 | if err != nil { 68 | return 69 | } 70 | dlqMessage, err = parse[DlqMessage](response) 71 | return 72 | } 73 | 74 | // List retrieves all messages currently in the Dlq. 75 | func (d *Dlq) List(options ListDlqOptions) (messages []DlqMessage, cursor string, err error) { 76 | opts := requestOptions{ 77 | method: http.MethodGet, 78 | path: "/v2/dlq", 79 | params: options.params(), 80 | } 81 | response, _, err := d.client.fetchWith(opts) 82 | if err != nil { 83 | return 84 | } 85 | result, err := parse[listDlqResponse](response) 86 | if err != nil { 87 | return 88 | } 89 | return result.Messages, result.Cursor, nil 90 | } 91 | 92 | // Delete deletes a message from the Dlq by its unique ID. 93 | func (d *Dlq) Delete(dlqId string) error { 94 | opts := requestOptions{ 95 | method: http.MethodDelete, 96 | path: fmt.Sprintf("/v2/dlq/%s", dlqId), 97 | } 98 | _, _, err := d.client.fetchWith(opts) 99 | return err 100 | } 101 | 102 | // DeleteMany deletes multiple messages from the Dlq and returns the number of deleted messages. 
103 | func (d *Dlq) DeleteMany(dlqIds []string) (int, error) { 104 | payload, err := json.Marshal(map[string][]string{"dlqIds": dlqIds}) 105 | if err != nil { 106 | return 0, err 107 | } 108 | opts := requestOptions{ 109 | method: http.MethodDelete, 110 | path: "/v2/dlq", 111 | body: string(payload), 112 | header: map[string][]string{"Content-Type": {"application/json"}}, 113 | } 114 | response, _, err := d.client.fetchWith(opts) 115 | if err != nil { 116 | return 0, err 117 | } 118 | deleted, err := parse[map[string]int](response) 119 | if err != nil { 120 | return 0, err 121 | } 122 | return deleted["deleted"], nil 123 | } 124 | -------------------------------------------------------------------------------- /schedule.go: -------------------------------------------------------------------------------- 1 | package qstash 2 | 3 | import ( 4 | "encoding/json" 5 | "fmt" 6 | "net/http" 7 | ) 8 | 9 | // Schedules in QStash allow you to publish messages at specified intervals instead of just once. 10 | // You can create schedules using cron expressions. 11 | // These expressions define the timing of message delivery, evaluated in the UTC timezone. 12 | type Schedules struct { 13 | client *Client 14 | } 15 | 16 | type Schedule struct { 17 | // Id is the unique id of the schedule. 18 | Id string `json:"scheduleId"` 19 | // CreatedAt is the creation time of the schedule, in unix milliseconds. 20 | CreatedAt int64 `json:"createdAt"` 21 | // Cron is the cron expression used to schedule the messages. 22 | Cron string `json:"cron"` 23 | // Destination is the destination url or url group. 24 | Destination string `json:"destination"` 25 | // Method is the HTTP method to use for the message. 26 | Method string `json:"method"` 27 | // Header is the headers of the message. 28 | Header map[string][]string `json:"header,omitempty"` 29 | // Body is the body of the scheduled message if it is composed of UTF-8 characters only, empty otherwise. 
30 | Body string `json:"body,omitempty"` 31 | // BodyBase64 is he base64 encoded body if the scheduled message body contains non-UTF-8 characters, empty otherwise. 32 | BodyBase64 string `json:"bodyBase64,omitempty"` 33 | // Retries is the number of retries that should be attempted in case of delivery failure. 34 | Retries int32 `json:"retries"` 35 | // Delay is the delay in seconds before the message is delivered. 36 | Delay int32 `json:"delay,omitempty"` 37 | // Callback is the url which is called each time the message is attempted to be delivered. 38 | Callback string `json:"callback,omitempty"` 39 | // FailureCallback is the url which is called after the message is failed 40 | FailureCallback string `json:"failureCallback,omitempty"` 41 | // LastScheduleTime is the timestamp in unix milli of last scheduled message 42 | LastScheduleTime int64 `json:"lastScheduleTime,omitempty"` 43 | // LastScheduleTime is the timestamp in unix milli of the next scheduled message 44 | NextScheduleTime int64 `json:"nextScheduleTime,omitempty"` 45 | // LastScheduleStates is the message id state pair for last schedule. 46 | LastScheduleStates map[string]string `json:"lastScheduleStates,omitempty"` 47 | // CallerIP is IP address of the creator of this schedule. 48 | CallerIP string `json:"callerIP,omitempty"` 49 | // IsPaused indicates whether the schedule is paused or not. 50 | IsPaused bool `json:"isPaused,omitempty"` 51 | } 52 | 53 | type scheduleResponse struct { 54 | ScheduleId string `json:"scheduleId"` 55 | } 56 | 57 | // Create creates a schedule to send messages periodically and returns the ID of created schedule. 
58 | func (s *Schedules) Create(schedule ScheduleOptions) (string, error) { 59 | opts := requestOptions{ 60 | method: http.MethodPost, 61 | path: fmt.Sprintf("/v2/Schedules/%s", schedule.Destination), 62 | header: schedule.headers(), 63 | body: schedule.Body, 64 | } 65 | response, _, err := s.client.fetchWith(opts) 66 | if err != nil { 67 | return "", err 68 | } 69 | result, err := parse[scheduleResponse](response) 70 | if err != nil { 71 | return "", err 72 | } 73 | return result.ScheduleId, err 74 | } 75 | 76 | // CreateJSON creates a schedule to send messages periodically, 77 | // automatically serializing the body as JSON string, and setting content type to `application/json`. 78 | func (s *Schedules) CreateJSON(schedule ScheduleJSONOptions) (scheduleId string, err error) { 79 | payload, err := json.Marshal(schedule.Body) 80 | if err != nil { 81 | return 82 | } 83 | opts := requestOptions{ 84 | method: http.MethodPost, 85 | path: fmt.Sprintf("/v2/schedules/%s", schedule.Destination), 86 | header: schedule.headers(), 87 | body: string(payload), 88 | } 89 | response, _, err := s.client.fetchWith(opts) 90 | if err != nil { 91 | return 92 | } 93 | result, err := parse[scheduleResponse](response) 94 | if err != nil { 95 | return 96 | } 97 | return result.ScheduleId, err 98 | } 99 | 100 | // Get retrieves the schedule by its id. 101 | func (s *Schedules) Get(scheduleId string) (schedule Schedule, err error) { 102 | opts := requestOptions{ 103 | method: http.MethodGet, 104 | path: fmt.Sprintf("/v2/Schedules/%s", scheduleId), 105 | } 106 | response, _, err := s.client.fetchWith(opts) 107 | if err != nil { 108 | return 109 | } 110 | schedule, err = parse[Schedule](response) 111 | return 112 | } 113 | 114 | // List retrieves all the schedules. 
115 | func (s *Schedules) List() (schedules []Schedule, err error) { 116 | opts := requestOptions{ 117 | method: http.MethodGet, 118 | path: "/v2/schedules", 119 | } 120 | response, _, err := s.client.fetchWith(opts) 121 | if err != nil { 122 | return 123 | } 124 | schedules, err = parse[[]Schedule](response) 125 | return 126 | } 127 | 128 | // Pause pauses the schedule. 129 | // A paused schedule will not produce new messages until it is resumed. 130 | func (s *Schedules) Pause(scheduleId string) (err error) { 131 | opts := requestOptions{ 132 | method: http.MethodPatch, 133 | path: fmt.Sprintf("/v2/schedules/%s/pause", scheduleId), 134 | } 135 | _, _, err = s.client.fetchWith(opts) 136 | return 137 | } 138 | 139 | // Resume resumes the schedule. 140 | func (s *Schedules) Resume(scheduleId string) (err error) { 141 | opts := requestOptions{ 142 | method: http.MethodPatch, 143 | path: fmt.Sprintf("/v2/schedules/%s/resume", scheduleId), 144 | } 145 | _, _, err = s.client.fetchWith(opts) 146 | return 147 | } 148 | 149 | // Delete deletes the schedule. 150 | func (s *Schedules) Delete(scheduleId string) (err error) { 151 | opts := requestOptions{ 152 | method: http.MethodDelete, 153 | path: fmt.Sprintf("/v2/schedules/%s", scheduleId), 154 | } 155 | _, _, err = s.client.fetchWith(opts) 156 | return 157 | } 158 | -------------------------------------------------------------------------------- /url_group.go: -------------------------------------------------------------------------------- 1 | package qstash 2 | 3 | import ( 4 | "encoding/json" 5 | "fmt" 6 | "net/http" 7 | ) 8 | 9 | // UrlGroups in QStash are namespaces where you can publish messages that are then sent to multiple endpoints. 10 | // After creating an url group, you can define multiple endpoints, each represented by a publicly available URL. 11 | // When a message is published to an url group, it is distributed to all subscribed endpoints. 
12 | type UrlGroups struct { 13 | client *Client 14 | } 15 | 16 | type Endpoint struct { 17 | // Url is the target address of the endpoint. 18 | Url string `json:"url"` 19 | // Name is the optional name of the endpoint. 20 | Name string `json:"name,omitempty"` 21 | } 22 | 23 | type UrlGroup struct { 24 | // Name is the name of the url group. 25 | Name string `json:"name"` 26 | // CreatedAt is the creation time of the url group, in unix milliseconds. 27 | CreatedAt int64 `json:"createdAt"` 28 | // UpdatedAt is the last update time of the url group, in unix milliseconds. 29 | UpdatedAt int64 `json:"updatedAt"` 30 | // Endpoints is the list of endpoints belong to url group. 31 | Endpoints []Endpoint `json:"endpoints"` 32 | } 33 | 34 | // Publish publishes a message to QStash. 35 | func (u *UrlGroups) Publish(po PublishUrlGroupOptions) (result []PublishOrEnqueueResponse, err error) { 36 | opts := requestOptions{ 37 | method: http.MethodPost, 38 | path: fmt.Sprintf("/v2/publish/%s", po.UrlGroup), 39 | header: po.headers(), 40 | body: po.Body, 41 | } 42 | response, _, err := u.client.fetchWith(opts) 43 | if err != nil { 44 | return 45 | } 46 | result, err = parse[[]PublishOrEnqueueResponse](response) 47 | return 48 | } 49 | 50 | // PublishJSON publishes a message to QStash, automatically serializing the body as JSON string, 51 | // and setting content type to `application/json`. 
52 | func (u *UrlGroups) PublishJSON(message PublishUrlGroupJSONOptions) (result []PublishOrEnqueueResponse, err error) { 53 | payload, err := json.Marshal(message.Body) 54 | if err != nil { 55 | return 56 | } 57 | opts := requestOptions{ 58 | method: http.MethodPost, 59 | path: fmt.Sprintf("/v2/publish/%s", message.UrlGroup), 60 | header: message.headers(), 61 | body: string(payload), 62 | } 63 | response, _, err := u.client.fetchWith(opts) 64 | if err != nil { 65 | return 66 | } 67 | result, err = parse[[]PublishOrEnqueueResponse](response) 68 | return 69 | } 70 | 71 | // Enqueue enqueues a message, after creating the queue if it does not exist. 72 | func (u *UrlGroups) Enqueue(options EnqueueUrlGroupOptions) (result []PublishOrEnqueueResponse, err error) { 73 | opts := requestOptions{ 74 | method: http.MethodPost, 75 | header: options.headers(), 76 | path: fmt.Sprintf("/v2/enqueue/%s/%s", options.Queue, options.UrlGroup), 77 | body: options.Body, 78 | } 79 | response, _, err := u.client.fetchWith(opts) 80 | if err != nil { 81 | return 82 | } 83 | result, err = parse[[]PublishOrEnqueueResponse](response) 84 | return 85 | } 86 | 87 | // EnqueueJSON enqueues a message, after creating the queue if it does not exist. 88 | // It automatically serializes the body as JSON string, and setting content type to `application/json`. 
89 | func (u *UrlGroups) EnqueueJSON(message EnqueueUrlGroupJSONOptions) (result []PublishOrEnqueueResponse, err error) { 90 | payload, err := json.Marshal(message.Body) 91 | if err != nil { 92 | return 93 | } 94 | opts := requestOptions{ 95 | method: http.MethodPost, 96 | path: fmt.Sprintf("/v2/enqueue/%s/%s", message.Queue, message.UrlGroup), 97 | body: string(payload), 98 | header: message.headers(), 99 | } 100 | response, _, err := u.client.fetchWith(opts) 101 | if err != nil { 102 | return 103 | } 104 | result, err = parse[[]PublishOrEnqueueResponse](response) 105 | return 106 | } 107 | 108 | // UpsertEndpoints adds or updates one or more endpoints to an url group. 109 | // If the url group or the endpoint does not exist, it will be created. 110 | // If the endpoint exists, it will be updated. 111 | func (u *UrlGroups) UpsertEndpoints(urlGroup string, endpoints []Endpoint) (err error) { 112 | for _, endpoint := range endpoints { 113 | if endpoint.Url == "" { 114 | err = fmt.Errorf("`url` of the endpoint must be provided") 115 | return 116 | } 117 | } 118 | payload, err := json.Marshal(map[string][]Endpoint{ 119 | "endpoints": endpoints, 120 | }) 121 | if err != nil { 122 | return 123 | } 124 | opts := requestOptions{ 125 | method: http.MethodPost, 126 | path: fmt.Sprintf("/v2/topics/%s/endpoints", urlGroup), 127 | body: string(payload), 128 | header: contentTypeJson, 129 | } 130 | _, _, err = u.client.fetchWith(opts) 131 | return 132 | } 133 | 134 | // RemoveEndpoints removes one or more endpoints from an url group. 135 | // If all endpoints have been removed, the url group will be deleted. 
136 | func (u *UrlGroups) RemoveEndpoints(urlGroup string, endpoints []Endpoint) (err error) { 137 | for _, endpoint := range endpoints { 138 | if endpoint.Url == "" && endpoint.Name == "" { 139 | err = fmt.Errorf("one of `url` or `name` of the endpoint must be provided") 140 | return 141 | } 142 | } 143 | payload, err := json.Marshal(map[string][]Endpoint{ 144 | "endpoints": endpoints, 145 | }) 146 | if err != nil { 147 | return 148 | } 149 | opts := requestOptions{ 150 | method: http.MethodDelete, 151 | path: fmt.Sprintf("/v2/topics/%s/endpoints", urlGroup), 152 | body: string(payload), 153 | header: contentTypeJson, 154 | } 155 | _, _, err = u.client.fetchWith(opts) 156 | return 157 | } 158 | 159 | // Get retrieves the url group by its name. 160 | func (u *UrlGroups) Get(urlGroup string) (result UrlGroup, err error) { 161 | opts := requestOptions{ 162 | method: http.MethodGet, 163 | path: fmt.Sprintf("/v2/topics/%s", urlGroup), 164 | } 165 | response, _, err := u.client.fetchWith(opts) 166 | if err != nil { 167 | return 168 | } 169 | result, err = parse[UrlGroup](response) 170 | if err != nil { 171 | return 172 | } 173 | return 174 | } 175 | 176 | // List retrieves all the url groups. 177 | func (u *UrlGroups) List() (result []UrlGroup, err error) { 178 | opts := requestOptions{ 179 | method: http.MethodGet, 180 | path: "/v2/topics", 181 | } 182 | response, _, err := u.client.fetchWith(opts) 183 | if err != nil { 184 | return 185 | } 186 | result, err = parse[[]UrlGroup](response) 187 | if err != nil { 188 | return 189 | } 190 | return 191 | } 192 | 193 | // Delete deletes the url group and all its endpoints. 
194 | func (u *UrlGroups) Delete(urlGroup string) (err error) { 195 | opts := requestOptions{ 196 | method: http.MethodDelete, 197 | path: fmt.Sprintf("/v2/topics/%s", urlGroup), 198 | } 199 | _, _, err = u.client.fetchWith(opts) 200 | return 201 | } 202 | -------------------------------------------------------------------------------- /client.go: -------------------------------------------------------------------------------- 1 | package qstash 2 | 3 | import ( 4 | "encoding/json" 5 | "errors" 6 | "fmt" 7 | "io" 8 | "net/http" 9 | "net/url" 10 | "os" 11 | "strings" 12 | ) 13 | 14 | const ( 15 | tokenEnvProperty = "QSTASH_TOKEN" 16 | urlEnvProperty = "QSTASH_URL" 17 | currentSigningKeyEnvProperty = "QSTASH_CURRENT_SIGNING_KEY" 18 | nextSigningKeyEnvProperty = "QSTASH_NEXT_SIGNING_KEY" 19 | 20 | upstashMethodHeader = "Upstash-Method" 21 | upstashRetriesHeader = "Upstash-Retries" 22 | upstashCallbackHeader = "Upstash-Callback" 23 | upstashFailureCallbackHeader = "Upstash-Failure-Callback" 24 | upstashForwardHeader = "Upstash-Forward" 25 | upstashCronHeader = "Upstash-Cron" 26 | upstashDelayHeader = "Upstash-Delay" 27 | upstashTimeoutHeader = "Upstash-Timeout" 28 | upstashDeduplicationId = "Upstash-Deduplication-Id" 29 | upstashNotBefore = "Upstash-Not-Before" 30 | upstashContentBasedDeduplication = "Upstash-Content-Based-Deduplication" 31 | ) 32 | 33 | var ( 34 | contentTypeJson = http.Header{"Content-Type": []string{"application/json"}} 35 | ) 36 | 37 | type Options struct { 38 | // Url is the base address of QStash, it's set to https://qstash.upstash.io by default. 39 | Url string 40 | // Token is the authorization token from the Upstash console. 41 | Token string 42 | // Client is the HTTP client used for sending requests. 
43 | Client *http.Client 44 | } 45 | 46 | func (o *Options) init() { 47 | if o.Url == "" { 48 | o.Url = "https://qstash.upstash.io" 49 | } 50 | if o.Client == nil { 51 | o.Client = http.DefaultClient 52 | } 53 | if o.Token == "" { 54 | panic("Missing QStash Token") 55 | } 56 | } 57 | 58 | // NewClient initializes a client instance with the given token and the default HTTP client. 59 | func NewClient(token string) *Client { 60 | return NewClientWith(Options{ 61 | Token: token, 62 | }) 63 | } 64 | 65 | // NewClientWithEnv initializes a client with the token from the QSTASH_TOKEN environment variable and the default HTTP client. 66 | func NewClientWithEnv() *Client { 67 | return NewClientWith(Options{ 68 | Token: os.Getenv(tokenEnvProperty), 69 | }) 70 | } 71 | 72 | // NewClientWith initializes a client with the given token and HTTP client. 73 | func NewClientWith(options Options) *Client { 74 | options.init() 75 | header := http.Header{} 76 | header.Set("Authorization", "Bearer "+options.Token) 77 | base := os.Getenv(urlEnvProperty) 78 | if base == "" { 79 | base = options.Url 80 | } 81 | index := &Client{ 82 | token: options.Token, 83 | client: options.Client, 84 | url: base, 85 | headers: header, 86 | } 87 | 88 | return index 89 | } 90 | 91 | type Client struct { 92 | token string 93 | client *http.Client 94 | url string 95 | headers http.Header 96 | } 97 | 98 | func (c *Client) Schedules() *Schedules { 99 | return &Schedules{client: c} 100 | } 101 | 102 | func (c *Client) Dlq() *Dlq { 103 | return &Dlq{client: c} 104 | } 105 | 106 | func (c *Client) Events() *Events { 107 | return &Events{client: c} 108 | } 109 | 110 | func (c *Client) UrlGroups() *UrlGroups { 111 | return &UrlGroups{client: c} 112 | } 113 | 114 | func (c *Client) Keys() *Keys { 115 | return &Keys{client: c} 116 | } 117 | 118 | func (c *Client) Messages() *Messages { 119 | return &Messages{client: c} 120 | } 121 | 122 | func (c *Client) Queues() *Queues { 123 | return &Queues{client: c} 124 | } 
125 | 126 | type requestOptions struct { 127 | method string 128 | path string 129 | body string 130 | header http.Header 131 | params url.Values 132 | } 133 | 134 | func (c *Client) fetchWith(opts requestOptions) ([]byte, int, error) { 135 | request, err := http.NewRequest(opts.method, fmt.Sprintf("%s%s", c.url, opts.path), strings.NewReader(opts.body)) 136 | if err != nil { 137 | return nil, -1, err 138 | } 139 | if opts.params != nil { 140 | request.URL.RawQuery = opts.params.Encode() 141 | } 142 | hc := c.headers.Clone() 143 | for k, v := range opts.header { 144 | hc.Set(k, v[0]) 145 | } 146 | request.Header = hc 147 | res, err := c.client.Do(request) 148 | if err != nil { 149 | return nil, -1, err 150 | } 151 | response, err := io.ReadAll(res.Body) 152 | if err != nil { 153 | return nil, -1, err 154 | } 155 | if res.StatusCode >= http.StatusBadRequest { 156 | var rErr restError 157 | if err = json.Unmarshal(response, &rErr); err != nil { 158 | return response, res.StatusCode, err 159 | } 160 | return response, res.StatusCode, errors.New(rErr.Error) 161 | } 162 | return response, res.StatusCode, nil 163 | } 164 | 165 | type restError struct { 166 | Error string `json:"error"` 167 | } 168 | 169 | func parse[T any](data []byte) (t T, err error) { 170 | err = json.Unmarshal(data, &t) 171 | return 172 | } 173 | 174 | func getDestination(url string, urlGroup string, api string) (string, error) { 175 | destination := "" 176 | count := 0 177 | if url != "" { 178 | destination = url 179 | count++ 180 | } 181 | if urlGroup != "" { 182 | destination = urlGroup 183 | count++ 184 | } 185 | if api != "" { 186 | destination = fmt.Sprintf("api/%s", api) 187 | count++ 188 | } 189 | if count != 1 { 190 | return "", fmt.Errorf("multiple destinations found, configure only one of Url, UrlGroup or Api") 191 | } 192 | return destination, nil 193 | } 194 | 195 | func prepareHeaders( 196 | contentType string, 197 | method string, 198 | headers map[string]string, 199 | retries *int, 
200 | callback string, 201 | failureCallback string, 202 | delay string, 203 | notBefore string, 204 | deduplicationId string, 205 | contentBasedDeduplication bool, 206 | timeout string, 207 | cron string, 208 | ) http.Header { 209 | header := http.Header{} 210 | if contentType != "" { 211 | header.Set("Content-Type", contentType) 212 | } 213 | if method != "" { 214 | header.Set(upstashMethodHeader, method) 215 | } 216 | for k, v := range headers { 217 | if !strings.HasPrefix(strings.ToLower(k), "upstash-forward-") { 218 | k = fmt.Sprintf("%s-%s", upstashForwardHeader, k) 219 | } 220 | header.Set(k, v) 221 | } 222 | if retries != nil { 223 | header.Set(upstashRetriesHeader, fmt.Sprintf("%d", *retries)) 224 | } 225 | if callback != "" { 226 | header.Set(upstashCallbackHeader, callback) 227 | } 228 | if failureCallback != "" { 229 | header.Set(upstashFailureCallbackHeader, failureCallback) 230 | } 231 | if delay != "" { 232 | header.Set(upstashDelayHeader, delay) 233 | } 234 | if notBefore != "" { 235 | header.Set(upstashNotBefore, notBefore) 236 | } 237 | if deduplicationId != "" { 238 | header.Set(upstashDeduplicationId, deduplicationId) 239 | } 240 | if contentBasedDeduplication { 241 | header.Set(upstashContentBasedDeduplication, "true") 242 | } 243 | if timeout != "" { 244 | header.Set(upstashTimeoutHeader, timeout) 245 | } 246 | if cron != "" { 247 | header.Set(upstashCronHeader, cron) 248 | } 249 | return header 250 | } 251 | -------------------------------------------------------------------------------- /messages.go: -------------------------------------------------------------------------------- 1 | package qstash 2 | 3 | import ( 4 | "encoding/json" 5 | "fmt" 6 | "net/http" 7 | ) 8 | 9 | type Messages struct { 10 | client *Client 11 | } 12 | 13 | type Message struct { 14 | // MessageId is the unique identifier of the message. 
15 | MessageId string `json:"messageId"` 16 | // Endpoint is the endpoint name of the message if the endpoint is given a name within the url group. 17 | Endpoint string `json:"endpointName,omitempty"` 18 | // Url is the address to which the message should be delivered. 19 | Url string `json:"url,omitempty"` 20 | // UrlGroup is the url group name if this message was sent to an url group, empty otherwise. 21 | UrlGroup string `json:"urlGroup,omitempty"` 22 | // Method is the HTTP method to use for the message. 23 | Method string `json:"method"` 24 | // Header is the HTTP headers sent the endpoint. 25 | Header http.Header `json:"header"` 26 | // Body is the body of the message if it is composed of UTF-8 characters only, empty otherwise. 27 | Body string `json:"body,omitempty"` 28 | // BodyBase64 is the base64 encoded body if the body contains non-UTF-8 characters, empty otherwise. 29 | BodyBase64 string `json:"bodyBase64,omitempty"` 30 | // MaxRetries is the number of retries that should be attempted in case of delivery failure. 31 | MaxRetries int32 `json:"maxRetries"` 32 | // NotBefore is the unix timestamp in milliseconds before which the message should not be delivered. 33 | NotBefore int64 `json:"notBefore"` 34 | // CreatedAt is the unix timestamp in milliseconds when the message was created. 35 | CreatedAt int64 `json:"createdAt"` 36 | // Callback is the url which is called each time the message is attempted to be delivered. 37 | Callback string `json:"callback,omitempty"` 38 | // FailureCallback is the url which is called after the message is failed. 39 | FailureCallback string `json:"failureCallback,omitempty"` 40 | // ScheduleId is the id of scheduled job of the message if the message is triggered by a schedule. 41 | ScheduleId string `json:"scheduleId,omitempty"` 42 | // CallerIP is IP address of the publisher of this message. 43 | CallerIP string `json:"callerIP,omitempty"` 44 | // Queue is the queue name if this message was enqueued to a queue. 
45 | Queue string `json:"queueName,omitempty"` 46 | // Api is the api name if this message was sent to an api. 47 | Api string `json:"api,omitempty"` 48 | } 49 | 50 | type PublishOrEnqueueResponse struct { 51 | // MessageId is the unique identifier of new message. 52 | MessageId string `json:"messageId"` 53 | // Deduplicated indicates whether the message is a duplicate that was not sent to the destination. 54 | Deduplicated bool `json:"deduplicated,omitempty"` 55 | // Url is the target address of the message if it was sent to a URL group, empty otherwise. 56 | Url string `json:"url,omitempty"` 57 | } 58 | 59 | type batchResponse struct { 60 | responses [][]PublishOrEnqueueResponse 61 | } 62 | 63 | func (b *batchResponse) UnmarshalJSON(data []byte) (err error) { 64 | switch { 65 | case len(data) == 0 || string(data) == `null`: 66 | return nil 67 | case data[0] == '[': 68 | var t []interface{} 69 | if err = json.Unmarshal(data, &t); err != nil { 70 | return 71 | } 72 | for _, v := range t { 73 | if _, ok := v.([]interface{}); ok { 74 | var result []PublishOrEnqueueResponse 75 | response, jErr := json.Marshal(v) 76 | if jErr != nil { 77 | return 78 | } 79 | if err = json.Unmarshal(response, &result); err != nil { 80 | return 81 | } 82 | b.responses = append(b.responses, result) 83 | } else { 84 | var result PublishOrEnqueueResponse 85 | response, jErr := json.Marshal(v) 86 | if jErr != nil { 87 | return 88 | } 89 | if err = json.Unmarshal(response, &result); err != nil { 90 | return 91 | } 92 | b.responses = append(b.responses, []PublishOrEnqueueResponse{result}) 93 | } 94 | } 95 | default: 96 | return fmt.Errorf("unsupported json type") 97 | } 98 | return nil 99 | } 100 | 101 | // Publish publishes a message to QStash. 
102 | func (c *Client) Publish(options PublishOptions) (result PublishOrEnqueueResponse, err error) { 103 | destination, err := getDestination(options.Url, "", options.Api) 104 | if err != nil { 105 | return 106 | } 107 | opts := requestOptions{ 108 | method: http.MethodPost, 109 | path: fmt.Sprintf("/v2/publish/%s", destination), 110 | header: options.headers(), 111 | body: options.Body, 112 | } 113 | response, _, err := c.fetchWith(opts) 114 | if err != nil { 115 | return 116 | } 117 | result, err = parse[PublishOrEnqueueResponse](response) 118 | return 119 | } 120 | 121 | // PublishJSON publishes a message to QStash, automatically serializing the body as JSON string, 122 | // and setting content type to `application/json`. 123 | func (c *Client) PublishJSON(options PublishJSONOptions) (result PublishOrEnqueueResponse, err error) { 124 | destination, err := getDestination(options.Url, "", options.Api) 125 | if err != nil { 126 | return 127 | } 128 | payload, err := json.Marshal(options.Body) 129 | if err != nil { 130 | return 131 | } 132 | opts := requestOptions{ 133 | method: http.MethodPost, 134 | path: fmt.Sprintf("/v2/publish/%s", destination), 135 | header: options.headers(), 136 | body: string(payload), 137 | } 138 | response, _, err := c.fetchWith(opts) 139 | if err != nil { 140 | return 141 | } 142 | result, err = parse[PublishOrEnqueueResponse](response) 143 | return 144 | } 145 | 146 | // Enqueue enqueues a message, after creating the queue if it does not exist. 
147 | func (c *Client) Enqueue(options EnqueueOptions) (result PublishOrEnqueueResponse, err error) { 148 | destination, err := getDestination(options.Url, "", options.Api) 149 | if err != nil { 150 | return 151 | } 152 | opts := requestOptions{ 153 | method: http.MethodPost, 154 | header: options.headers(), 155 | path: fmt.Sprintf("/v2/enqueue/%s/%s", options.Queue, destination), 156 | body: options.Body, 157 | } 158 | response, _, err := c.fetchWith(opts) 159 | if err != nil { 160 | return 161 | } 162 | result, err = parse[PublishOrEnqueueResponse](response) 163 | return 164 | } 165 | 166 | // EnqueueJSON enqueues a message, after creating the queue if it does not exist. 167 | // It automatically serializes the body as JSON string, and setting content type to `application/json`. 168 | func (c *Client) EnqueueJSON(options EnqueueJSONOptions) (result PublishOrEnqueueResponse, err error) { 169 | destination, err := getDestination(options.Url, "", options.Api) 170 | if err != nil { 171 | return 172 | } 173 | payload, err := json.Marshal(options.Body) 174 | if err != nil { 175 | return 176 | } 177 | opts := requestOptions{ 178 | method: http.MethodPost, 179 | path: fmt.Sprintf("/v2/enqueue/%s/%s", options.Queue, destination), 180 | body: string(payload), 181 | header: options.headers(), 182 | } 183 | response, _, err := c.fetchWith(opts) 184 | if err != nil { 185 | return 186 | } 187 | result, err = parse[PublishOrEnqueueResponse](response) 188 | return 189 | } 190 | 191 | // Batch publishes or enqueues multiple messages in a single request. 
192 | func (c *Client) Batch(options []BatchOptions) (results [][]PublishOrEnqueueResponse, err error) { 193 | messages := make([]map[string]interface{}, len(options)) 194 | for idx, option := range options { 195 | destination, err := getDestination(option.Url, option.UrlGroup, option.Api) 196 | if err != nil { 197 | return nil, err 198 | } 199 | messages[idx] = map[string]interface{}{ 200 | "destination": destination, 201 | "headers": option.headers(), 202 | "body": option.Body, 203 | "queue": option.Queue, 204 | } 205 | } 206 | payload, err := json.Marshal(messages) 207 | if err != nil { 208 | return 209 | } 210 | opts := requestOptions{ 211 | method: http.MethodPost, 212 | path: "/v2/batch", 213 | body: string(payload), 214 | header: map[string][]string{"Content-Type": {"application/json"}}, 215 | } 216 | response, _, err := c.fetchWith(opts) 217 | if err != nil { 218 | return 219 | } 220 | result, err := parse[batchResponse](response) 221 | if err != nil { 222 | return nil, err 223 | } 224 | return result.responses, err 225 | } 226 | 227 | // BatchJSON publishes or enqueues multiple messages in a single request, 228 | // automatically serializing the message bodies as JSON strings, and setting content type to `application/json`. 
229 | func (c *Client) BatchJSON(options []BatchJSONOptions) (results [][]PublishOrEnqueueResponse, err error) { 230 | messages := make([]map[string]interface{}, len(options)) 231 | 232 | for idx, option := range options { 233 | destination, err := getDestination(option.Url, option.UrlGroup, option.Api) 234 | if err != nil { 235 | return nil, err 236 | } 237 | body, err := json.Marshal(option.Body) 238 | if err != nil { 239 | return nil, err 240 | } 241 | messages[idx] = map[string]interface{}{ 242 | "destination": destination, 243 | "headers": option.headers(), 244 | "body": string(body), 245 | "queue": option.Queue, 246 | } 247 | } 248 | payload, err := json.Marshal(messages) 249 | if err != nil { 250 | return 251 | } 252 | opts := requestOptions{ 253 | method: http.MethodPost, 254 | path: "/v2/batch", 255 | body: string(payload), 256 | header: contentTypeJson, 257 | } 258 | response, _, err := c.fetchWith(opts) 259 | if err != nil { 260 | return 261 | } 262 | result, err := parse[batchResponse](response) 263 | if err != nil { 264 | return nil, err 265 | } 266 | return result.responses, err 267 | } 268 | 269 | // Get gets the message by its id. 270 | func (m *Messages) Get(messageId string) (message Message, err error) { 271 | opts := requestOptions{ 272 | method: http.MethodGet, 273 | path: fmt.Sprintf("/v2/messages/%s", messageId), 274 | } 275 | response, _, err := m.client.fetchWith(opts) 276 | if err != nil { 277 | return Message{}, err 278 | } 279 | message, err = parse[Message](response) 280 | return 281 | } 282 | 283 | // Cancel cancels delivery of an existing message. 284 | // 285 | // Cancelling a message will remove it from QStash and stop it from being 286 | // delivered in the future. If a message is in flight to your API, 287 | // it might be too late to cancel. 
288 | func (m *Messages) Cancel(messageId string) error { 289 | opts := requestOptions{ 290 | method: http.MethodDelete, 291 | path: fmt.Sprintf("/v2/messages/%s", messageId), 292 | } 293 | _, _, err := m.client.fetchWith(opts) 294 | return err 295 | } 296 | 297 | // CancelMany cancels delivery of given messages. 298 | func (m *Messages) CancelMany(messageIds []string) (int, error) { 299 | payload, err := json.Marshal(map[string][]string{"messageIds": messageIds}) 300 | if err != nil { 301 | return 0, err 302 | } 303 | opts := requestOptions{ 304 | method: http.MethodDelete, 305 | path: "/v2/messages", 306 | body: string(payload), 307 | header: contentTypeJson, 308 | } 309 | response, _, err := m.client.fetchWith(opts) 310 | if err != nil { 311 | return 0, err 312 | } 313 | deleted, err := parse[map[string]int](response) 314 | if err != nil { 315 | return 0, err 316 | } 317 | return deleted["cancelled"], nil 318 | } 319 | 320 | // CancelAll cancels delivery of all existing messages. 321 | func (m *Messages) CancelAll() (int, error) { 322 | opts := requestOptions{ 323 | method: http.MethodDelete, 324 | path: "/v2/messages", 325 | header: contentTypeJson, 326 | } 327 | response, _, err := m.client.fetchWith(opts) 328 | if err != nil { 329 | return 0, err 330 | } 331 | deleted, err := parse[map[string]int](response) 332 | if err != nil { 333 | return 0, err 334 | } 335 | return deleted["cancelled"], nil 336 | } 337 | -------------------------------------------------------------------------------- /messages_test.go: -------------------------------------------------------------------------------- 1 | package qstash 2 | 3 | import ( 4 | "fmt" 5 | "github.com/stretchr/testify/assert" 6 | "net/http" 7 | "testing" 8 | "time" 9 | ) 10 | 11 | func TestPublishToUrl(t *testing.T) { 12 | client := NewClientWithEnv() 13 | 14 | res, err := client.Publish(PublishOptions{ 15 | Body: "test-body", 16 | Url: "http://example.com", 17 | Headers: map[string]string{ 18 | "test-header": 
"test-value", 19 | }, 20 | Retries: RetryCount(0), 21 | }) 22 | assert.NoError(t, err) 23 | assert.NotEmpty(t, res.MessageId) 24 | 25 | AssertDeliveredEventually(t, client, res.MessageId) 26 | } 27 | 28 | func TestPublishToUrlWithDelay(t *testing.T) { 29 | client := NewClientWithEnv() 30 | 31 | res, err := client.Publish(PublishOptions{ 32 | Body: "test-body", 33 | Url: "http://example.com", 34 | Headers: map[string]string{ 35 | "test-header": "test-value", 36 | }, 37 | Retries: RetryCount(0), 38 | Delay: "10s", 39 | }) 40 | assert.NoError(t, err) 41 | assert.NotEmpty(t, res.MessageId) 42 | 43 | message, err := client.Messages().Get(res.MessageId) 44 | assert.NoError(t, err) 45 | assert.Equal(t, "test-body", message.Body) 46 | assert.Equal(t, "http://example.com", message.Url) 47 | assert.Equal(t, http.Header{"Test-Header": []string{"test-value"}}, message.Header) 48 | 49 | } 50 | 51 | func TestPublishToJson(t *testing.T) { 52 | client := NewClientWithEnv() 53 | 54 | res, err := client.PublishJSON(PublishJSONOptions{ 55 | Body: map[string]any{ 56 | "ex_key": "ex_value", 57 | }, 58 | Url: "https://example.com", 59 | Headers: map[string]string{ 60 | "test-header": "test-value", 61 | }, 62 | }) 63 | assert.NoError(t, err) 64 | assert.NotEmpty(t, res.MessageId) 65 | 66 | AssertDeliveredEventually(t, client, res.MessageId) 67 | } 68 | 69 | func TestDisallowMultipleDestinations(t *testing.T) { 70 | client := NewClientWithEnv() 71 | 72 | _, err := client.Publish(PublishOptions{ 73 | Url: "https://example.com", 74 | Api: "llm", 75 | }) 76 | assert.ErrorContains(t, err, "multiple destinations found") 77 | } 78 | 79 | func TestBatch(t *testing.T) { 80 | client := NewClientWithEnv() 81 | 82 | N := 3 83 | messages := make([]BatchOptions, N) 84 | 85 | for i := 0; i < N; i++ { 86 | messages[i] = BatchOptions{ 87 | Body: fmt.Sprintf("hi %d", i), 88 | Url: "https://example.com", 89 | Retries: RetryCount(0), 90 | Headers: map[string]string{ 91 | fmt.Sprintf("test-header-%d", i): 
fmt.Sprintf("test-value-%d", i), 92 | "Content-Type": "text/plain", 93 | }, 94 | } 95 | } 96 | 97 | responses, err := client.Batch(messages) 98 | assert.NoError(t, err) 99 | assert.Len(t, responses, N) 100 | 101 | for _, response := range responses { 102 | assert.NotEmpty(t, response) 103 | for _, r := range response { 104 | assert.NotEmpty(t, r.MessageId) 105 | } 106 | 107 | } 108 | } 109 | 110 | func TestBatchMixed(t *testing.T) { 111 | client := NewClientWithEnv() 112 | 113 | name := "go_url_group" 114 | 115 | err := client.UrlGroups().Delete(name) 116 | assert.NoError(t, err) 117 | 118 | err = client.UrlGroups().UpsertEndpoints(name, []Endpoint{ 119 | {Url: "https://example.com", Name: "First endpoint"}, 120 | {Url: "https://example.net", Name: "Second endpoint"}, 121 | }) 122 | assert.NoError(t, err) 123 | 124 | urlGroup, err := client.UrlGroups().Get(name) 125 | assert.NoError(t, err) 126 | assert.Equal(t, urlGroup.Name, name) 127 | assert.Len(t, urlGroup.Endpoints, 2) 128 | 129 | N := 3 130 | messages := make([]BatchOptions, N) 131 | 132 | for i := 0; i < N; i++ { 133 | if i%2 == 0 { 134 | messages[i] = BatchOptions{ 135 | UrlGroup: name, 136 | Body: fmt.Sprintf("hi %d", i), 137 | Retries: RetryCount(0), 138 | Headers: map[string]string{ 139 | fmt.Sprintf("test-header-%d", i): fmt.Sprintf("test-value-%d", i), 140 | "Content-Type": "text/plain", 141 | }, 142 | } 143 | } else { 144 | messages[i] = BatchOptions{ 145 | Body: fmt.Sprintf("hi %d", i), 146 | Url: "https://example.com", 147 | Retries: RetryCount(0), 148 | Headers: map[string]string{ 149 | fmt.Sprintf("test-header-%d", i): fmt.Sprintf("test-value-%d", i), 150 | "Content-Type": "text/plain", 151 | }, 152 | } 153 | } 154 | 155 | } 156 | 157 | responses, err := client.Batch(messages) 158 | assert.NoError(t, err) 159 | assert.Len(t, responses, N) 160 | 161 | for _, response := range responses { 162 | for _, r := range response { 163 | assert.NotEmpty(t, r.MessageId) 164 | } 165 | } 166 | 167 | err = 
client.UrlGroups().Delete(name) 168 | assert.NoError(t, err) 169 | } 170 | 171 | func TestBatchJSON(t *testing.T) { 172 | client := NewClientWithEnv() 173 | 174 | N := 3 175 | messages := make([]BatchJSONOptions, N) 176 | 177 | for i := 0; i < N; i++ { 178 | messages[i] = BatchJSONOptions{ 179 | Body: map[string]any{"hi": i}, 180 | Url: "https://example.com", 181 | Retries: RetryCount(0), 182 | Headers: map[string]string{ 183 | fmt.Sprintf("test-header-%d", i): fmt.Sprintf("test-value-%d", i), 184 | }, 185 | } 186 | } 187 | 188 | responses, err := client.BatchJSON(messages) 189 | assert.NoError(t, err) 190 | assert.Len(t, responses, N) 191 | 192 | for _, response := range responses { 193 | for _, r := range response { 194 | assert.NotEmpty(t, r.MessageId) 195 | } 196 | } 197 | } 198 | 199 | func TestBatchJSONMixed(t *testing.T) { 200 | client := NewClientWithEnv() 201 | 202 | name := "go_url_group" 203 | 204 | err := client.UrlGroups().Delete(name) 205 | assert.NoError(t, err) 206 | 207 | err = client.UrlGroups().UpsertEndpoints(name, []Endpoint{ 208 | {Url: "https://example.com", Name: "First endpoint"}, 209 | {Url: "https://example.net", Name: "Second endpoint"}, 210 | }) 211 | assert.NoError(t, err) 212 | 213 | urlGroup, err := client.UrlGroups().Get(name) 214 | assert.NoError(t, err) 215 | assert.Equal(t, urlGroup.Name, name) 216 | assert.Len(t, urlGroup.Endpoints, 2) 217 | 218 | N := 3 219 | messages := make([]BatchJSONOptions, N) 220 | 221 | for i := 0; i < N; i++ { 222 | if i%2 == 0 { 223 | messages[i] = BatchJSONOptions{ 224 | UrlGroup: name, 225 | Body: map[string]any{"hi": i}, 226 | Retries: RetryCount(0), 227 | Headers: map[string]string{ 228 | fmt.Sprintf("test-header-%d", i): fmt.Sprintf("test-value-%d", i), 229 | "Content-Type": "text/plain", 230 | }, 231 | } 232 | } else { 233 | messages[i] = BatchJSONOptions{ 234 | Body: map[string]any{"hi": i}, 235 | Url: "https://example.com", 236 | Retries: RetryCount(0), 237 | Headers: map[string]string{ 238 | 
fmt.Sprintf("test-header-%d", i): fmt.Sprintf("test-value-%d", i), 239 | "Content-Type": "text/plain", 240 | }, 241 | } 242 | } 243 | 244 | } 245 | 246 | responses, err := client.BatchJSON(messages) 247 | assert.NoError(t, err) 248 | assert.Len(t, responses, N) 249 | 250 | for _, response := range responses { 251 | for _, r := range response { 252 | assert.NotEmpty(t, r.MessageId) 253 | } 254 | } 255 | 256 | err = client.UrlGroups().Delete(name) 257 | assert.NoError(t, err) 258 | } 259 | 260 | func TestPublishToLlmApi(t *testing.T) { 261 | client := NewClientWithEnv() 262 | 263 | res, err := client.PublishJSON(PublishJSONOptions{ 264 | Api: "llm", 265 | Body: map[string]any{ 266 | "model": "meta-llama/Meta-Llama-3-8B-Instruct", 267 | "messages": []map[string]string{ 268 | { 269 | "role": "user", 270 | "content": "hello", 271 | }, 272 | }, 273 | }, 274 | Callback: "http://example.com", 275 | }) 276 | assert.NoError(t, err) 277 | assert.NotEmpty(t, res.MessageId) 278 | 279 | AssertDeliveredEventually(t, client, res.MessageId) 280 | } 281 | 282 | func TestBatchLlmApi(t *testing.T) { 283 | client := NewClientWithEnv() 284 | 285 | messages, err := client.BatchJSON([]BatchJSONOptions{ 286 | { 287 | Api: "llm", 288 | Body: map[string]any{ 289 | "model": "meta-llama/Meta-Llama-3-8B-Instruct", 290 | "messages": []map[string]string{ 291 | { 292 | "role": "user", 293 | "content": "hello", 294 | }, 295 | }, 296 | }, 297 | Callback: "http://example.com", 298 | }, 299 | }) 300 | assert.NoError(t, err) 301 | assert.Len(t, messages, 1) 302 | assert.Len(t, messages[0], 1) 303 | assert.NotEmpty(t, messages[0][0].MessageId) 304 | 305 | AssertDeliveredEventually(t, client, messages[0][0].MessageId) 306 | } 307 | 308 | func TestEnqueue(t *testing.T) { 309 | client := NewClientWithEnv() 310 | 311 | res, err := client.Enqueue(EnqueueOptions{ 312 | Queue: "test-queue", 313 | Body: "test-body", 314 | Url: "https://example.com", 315 | Headers: map[string]string{ 316 | "test-header": 
"test-value", 317 | }, 318 | }) 319 | assert.NoError(t, err) 320 | assert.NotEmpty(t, res.MessageId) 321 | } 322 | 323 | func TestEnqueueJSON(t *testing.T) { 324 | client := NewClientWithEnv() 325 | 326 | res, err := client.EnqueueJSON(EnqueueJSONOptions{ 327 | Queue: "test-queue", 328 | Body: map[string]any{"test": "body"}, 329 | Url: "https://example.com", 330 | Headers: map[string]string{ 331 | "test-header": "test-value", 332 | }, 333 | }) 334 | assert.NoError(t, err) 335 | assert.NotEmpty(t, res.MessageId) 336 | } 337 | 338 | func TestEnqueueLlmApi(t *testing.T) { 339 | client := NewClientWithEnv() 340 | 341 | res, err := client.EnqueueJSON(EnqueueJSONOptions{ 342 | Queue: "test-queue", 343 | Api: "llm", 344 | Body: map[string]any{ 345 | "model": "meta-llama/Meta-Llama-3-8B-Instruct", 346 | "messages": []map[string]string{ 347 | { 348 | "role": "user", 349 | "content": "hello", 350 | }, 351 | }, 352 | }, 353 | Callback: "http://example.com", 354 | }) 355 | 356 | assert.NoError(t, err) 357 | assert.NotEmpty(t, res.MessageId) 358 | } 359 | 360 | func TestTimeout(t *testing.T) { 361 | client := NewClientWithEnv() 362 | 363 | res, err := client.PublishJSON(PublishJSONOptions{ 364 | Body: map[string]any{"ex_key": "ex_value"}, 365 | Url: "https://example.com", 366 | Timeout: "1s", 367 | }) 368 | assert.NoError(t, err) 369 | assert.NotEmpty(t, res.MessageId) 370 | 371 | AssertDeliveredEventually(t, client, res.MessageId) 372 | } 373 | 374 | func TestCancelMany(t *testing.T) { 375 | client := NewClientWithEnv() 376 | 377 | messageIds := []string{} 378 | 379 | for i := 0; i < 10; i++ { 380 | res, err := client.PublishJSON(PublishJSONOptions{ 381 | Body: map[string]any{"ex_key": "ex_value"}, 382 | Url: "https://example.com", 383 | Delay: "60s", 384 | }) 385 | assert.NoError(t, err) 386 | if i%2 == 0 { 387 | assert.NotEmpty(t, res.MessageId) 388 | messageIds = append(messageIds, res.MessageId) 389 | } 390 | } 391 | deleted, err := client.Messages().CancelMany(messageIds) 
392 | assert.NoError(t, err) 393 | assert.Equal(t, 5, deleted) 394 | } 395 | 396 | func TestCancelAll(t *testing.T) { 397 | client := NewClientWithEnv() 398 | 399 | for i := 0; i < 10; i++ { 400 | res, err := client.PublishJSON(PublishJSONOptions{ 401 | Body: map[string]any{"ex_key": "ex_value"}, 402 | Url: "http://httpstat.us/400", 403 | Delay: "30s", 404 | Retries: RetryCount(0), 405 | }) 406 | assert.NoError(t, err) 407 | assert.NotEmpty(t, res.MessageId) 408 | } 409 | 410 | time.Sleep(1 * time.Second) 411 | deleted, err := client.Messages().CancelAll() 412 | assert.NoError(t, err) 413 | assert.Greater(t, deleted, 0) 414 | } 415 | 416 | func AssertDeliveredEventually(t *testing.T, client *Client, messageId string) { 417 | assert.Eventually(t, func() bool { 418 | subT := &testing.T{} 419 | 420 | events, _, err := client.Events().List(ListEventsOptions{ 421 | Filter: EventFilter{ 422 | MessageId: messageId, 423 | State: Delivered, 424 | }, 425 | }) 426 | 427 | assert.NoError(subT, err) 428 | assert.Len(subT, events, 1) 429 | 430 | return !subT.Failed() 431 | }, time.Second*30, 100*time.Millisecond) 432 | } 433 | -------------------------------------------------------------------------------- /options.go: -------------------------------------------------------------------------------- 1 | package qstash 2 | 3 | import ( 4 | "fmt" 5 | "net/http" 6 | "net/url" 7 | "strconv" 8 | "strings" 9 | ) 10 | 11 | func RetryCount(val int) *int { 12 | return &val 13 | } 14 | 15 | type PublishOptions struct { 16 | Url string 17 | Api string 18 | Body string 19 | Method string 20 | ContentType string 21 | Headers map[string]string 22 | Retries *int 23 | Callback string 24 | FailureCallback string 25 | Delay string 26 | NotBefore string 27 | DeduplicationId string 28 | ContentBasedDeduplication bool 29 | Timeout string 30 | } 31 | 32 | func (m PublishOptions) headers() http.Header { 33 | return prepareHeaders( 34 | m.ContentType, 35 | m.Method, 36 | m.Headers, 37 | m.Retries, 38 | 
m.Callback, 39 | m.FailureCallback, 40 | m.Delay, 41 | m.NotBefore, 42 | m.DeduplicationId, 43 | m.ContentBasedDeduplication, 44 | m.Timeout, 45 | "", 46 | ) 47 | } 48 | 49 | type PublishUrlGroupOptions struct { 50 | UrlGroup string 51 | Body string 52 | Method string 53 | ContentType string 54 | Headers map[string]string 55 | Retries *int 56 | Callback string 57 | FailureCallback string 58 | Delay string 59 | NotBefore string 60 | DeduplicationId string 61 | ContentBasedDeduplication bool 62 | Timeout string 63 | } 64 | 65 | func (m PublishUrlGroupOptions) headers() http.Header { 66 | return prepareHeaders( 67 | m.ContentType, 68 | m.Method, 69 | m.Headers, 70 | m.Retries, 71 | m.Callback, 72 | m.FailureCallback, 73 | m.Delay, 74 | m.NotBefore, 75 | m.DeduplicationId, 76 | m.ContentBasedDeduplication, 77 | m.Timeout, 78 | "", 79 | ) 80 | } 81 | 82 | type PublishJSONOptions struct { 83 | Url string 84 | Api string 85 | Body map[string]any 86 | Method string 87 | Headers map[string]string 88 | Retries *int 89 | Callback string 90 | FailureCallback string 91 | Delay string 92 | NotBefore string 93 | DeduplicationId string 94 | ContentBasedDeduplication bool 95 | Timeout string 96 | } 97 | 98 | func (m PublishJSONOptions) headers() http.Header { 99 | return prepareHeaders( 100 | "application/json", 101 | m.Method, 102 | m.Headers, 103 | m.Retries, 104 | m.Callback, 105 | m.FailureCallback, 106 | m.Delay, 107 | m.NotBefore, 108 | m.DeduplicationId, 109 | m.ContentBasedDeduplication, 110 | m.Timeout, 111 | "", 112 | ) 113 | } 114 | 115 | type PublishUrlGroupJSONOptions struct { 116 | UrlGroup string 117 | Body map[string]any 118 | Method string 119 | Headers map[string]string 120 | Retries *int 121 | Callback string 122 | FailureCallback string 123 | Delay string 124 | NotBefore string 125 | DeduplicationId string 126 | ContentBasedDeduplication bool 127 | Timeout string 128 | } 129 | 130 | func (m PublishUrlGroupJSONOptions) headers() http.Header { 131 | return 
prepareHeaders( 132 | "application/json", 133 | m.Method, 134 | m.Headers, 135 | m.Retries, 136 | m.Callback, 137 | m.FailureCallback, 138 | m.Delay, 139 | m.NotBefore, 140 | m.DeduplicationId, 141 | m.ContentBasedDeduplication, 142 | m.Timeout, 143 | "", 144 | ) 145 | } 146 | 147 | type EnqueueOptions struct { 148 | Queue string 149 | Url string 150 | Api string 151 | Body string 152 | Method string 153 | ContentType string 154 | Headers map[string]string 155 | Retries *int 156 | Callback string 157 | FailureCallback string 158 | Delay string 159 | NotBefore string 160 | DeduplicationId string 161 | ContentBasedDeduplication bool 162 | Timeout string 163 | } 164 | 165 | func (m *EnqueueOptions) headers() http.Header { 166 | return prepareHeaders( 167 | m.ContentType, 168 | m.Method, 169 | m.Headers, 170 | m.Retries, 171 | m.Callback, 172 | m.FailureCallback, 173 | m.Delay, 174 | m.NotBefore, 175 | m.DeduplicationId, 176 | m.ContentBasedDeduplication, 177 | m.Timeout, 178 | "", 179 | ) 180 | } 181 | 182 | type EnqueueUrlGroupOptions struct { 183 | Queue string 184 | UrlGroup string 185 | Body string 186 | Method string 187 | ContentType string 188 | Headers map[string]string 189 | Retries *int 190 | Callback string 191 | FailureCallback string 192 | Delay string 193 | NotBefore string 194 | DeduplicationId string 195 | ContentBasedDeduplication bool 196 | Timeout string 197 | } 198 | 199 | func (m *EnqueueUrlGroupOptions) headers() http.Header { 200 | return prepareHeaders( 201 | m.ContentType, 202 | m.Method, 203 | m.Headers, 204 | m.Retries, 205 | m.Callback, 206 | m.FailureCallback, 207 | m.Delay, 208 | m.NotBefore, 209 | m.DeduplicationId, 210 | m.ContentBasedDeduplication, 211 | m.Timeout, 212 | "", 213 | ) 214 | } 215 | 216 | type EnqueueJSONOptions struct { 217 | Queue string 218 | Url string 219 | Api string 220 | Body map[string]any 221 | Method string 222 | Headers map[string]string 223 | Retries *int 224 | Callback string 225 | FailureCallback string 226 
| Delay string 227 | NotBefore string 228 | DeduplicationId string 229 | ContentBasedDeduplication bool 230 | Timeout string 231 | } 232 | 233 | func (m *EnqueueJSONOptions) headers() http.Header { 234 | return prepareHeaders( 235 | "application/json", 236 | m.Method, 237 | m.Headers, 238 | m.Retries, 239 | m.Callback, 240 | m.FailureCallback, 241 | m.Delay, 242 | m.NotBefore, 243 | m.DeduplicationId, 244 | m.ContentBasedDeduplication, 245 | m.Timeout, 246 | "", 247 | ) 248 | } 249 | 250 | type EnqueueUrlGroupJSONOptions struct { 251 | Queue string 252 | UrlGroup string 253 | Body map[string]any 254 | Method string 255 | Headers map[string]string 256 | Retries *int 257 | Callback string 258 | FailureCallback string 259 | Delay string 260 | NotBefore string 261 | DeduplicationId string 262 | ContentBasedDeduplication bool 263 | Timeout string 264 | } 265 | 266 | func (m *EnqueueUrlGroupJSONOptions) headers() http.Header { 267 | return prepareHeaders( 268 | "application/json", 269 | m.Method, 270 | m.Headers, 271 | m.Retries, 272 | m.Callback, 273 | m.FailureCallback, 274 | m.Delay, 275 | m.NotBefore, 276 | m.DeduplicationId, 277 | m.ContentBasedDeduplication, 278 | m.Timeout, 279 | "", 280 | ) 281 | } 282 | 283 | type ScheduleOptions struct { 284 | Cron string 285 | ContentType string 286 | Body string 287 | Destination string 288 | Method string 289 | Headers map[string]string 290 | Retries *int 291 | Callback string 292 | FailureCallback string 293 | Delay string 294 | Timeout string 295 | } 296 | 297 | func (m *ScheduleOptions) headers() http.Header { 298 | return prepareHeaders( 299 | m.ContentType, 300 | m.Method, 301 | m.Headers, 302 | m.Retries, 303 | m.Callback, 304 | m.FailureCallback, 305 | m.Delay, 306 | "", 307 | "", 308 | false, 309 | m.Timeout, 310 | m.Cron, 311 | ) 312 | } 313 | 314 | type ScheduleJSONOptions struct { 315 | Cron string 316 | Body map[string]any 317 | Destination string 318 | Method string 319 | Headers map[string]string 320 | Retries 
*int 321 | Callback string 322 | FailureCallback string 323 | Delay string 324 | Timeout string 325 | } 326 | 327 | func (m *ScheduleJSONOptions) headers() http.Header { 328 | return prepareHeaders( 329 | "application/json", 330 | m.Method, 331 | m.Headers, 332 | m.Retries, 333 | m.Callback, 334 | m.FailureCallback, 335 | m.Delay, 336 | "", 337 | "", 338 | false, 339 | m.Timeout, 340 | m.Cron, 341 | ) 342 | } 343 | 344 | type BatchOptions struct { 345 | Queue string 346 | Url string 347 | UrlGroup string 348 | Api string 349 | Body string 350 | Method string 351 | ContentType string 352 | Headers map[string]string 353 | Retries *int 354 | Callback string 355 | FailureCallback string 356 | Delay string 357 | NotBefore string 358 | DeduplicationId string 359 | ContentBasedDeduplication bool 360 | Timeout string 361 | } 362 | 363 | func (m *BatchOptions) headers() map[string]string { 364 | header := make(map[string]string) 365 | if m.ContentType != "" { 366 | header["Content-Type"] = m.ContentType 367 | } 368 | if m.Method != "" { 369 | header[upstashMethodHeader] = m.Method 370 | } 371 | for k, v := range m.Headers { 372 | if !strings.HasPrefix(strings.ToLower(k), "upstash-forward-") { 373 | k = fmt.Sprintf("%s-%s", upstashForwardHeader, k) 374 | } 375 | header[k] = v 376 | } 377 | if m.Retries != nil { 378 | header[upstashRetriesHeader] = fmt.Sprintf("%d", *m.Retries) 379 | } 380 | if m.Callback != "" { 381 | header[upstashCallbackHeader] = m.Callback 382 | } 383 | if m.FailureCallback != "" { 384 | header[upstashFailureCallbackHeader] = m.FailureCallback 385 | } 386 | if m.Delay != "" { 387 | header[upstashDelayHeader] = m.Delay 388 | } 389 | if m.NotBefore != "" { 390 | header[upstashNotBefore] = m.NotBefore 391 | } 392 | if m.DeduplicationId != "" { 393 | header[upstashDeduplicationId] = m.DeduplicationId 394 | } 395 | if m.ContentBasedDeduplication { 396 | header[upstashContentBasedDeduplication] = "true" 397 | } 398 | if m.Timeout != "" { 399 | 
header[upstashTimeoutHeader] = m.Timeout 400 | } 401 | return header 402 | } 403 | 404 | type BatchJSONOptions struct { 405 | Queue string 406 | Url string 407 | UrlGroup string 408 | Api string 409 | Body map[string]any 410 | Method string 411 | Headers map[string]string 412 | Retries *int 413 | Callback string 414 | FailureCallback string 415 | Delay string 416 | NotBefore string 417 | DeduplicationId string 418 | ContentBasedDeduplication bool 419 | Timeout string 420 | } 421 | 422 | func (m *BatchJSONOptions) headers() map[string]string { 423 | header := make(map[string]string) 424 | header["Content-Type"] = "application/json" 425 | if m.Method != "" { 426 | header[upstashMethodHeader] = m.Method 427 | } 428 | for k, v := range m.Headers { 429 | if !strings.HasPrefix(strings.ToLower(k), "upstash-forward-") { 430 | k = fmt.Sprintf("%s-%s", upstashForwardHeader, k) 431 | } 432 | header[k] = v 433 | } 434 | if m.Retries != nil { 435 | header[upstashRetriesHeader] = fmt.Sprintf("%d", *m.Retries) 436 | } 437 | if m.Callback != "" { 438 | header[upstashCallbackHeader] = m.Callback 439 | } 440 | if m.FailureCallback != "" { 441 | header[upstashFailureCallbackHeader] = m.FailureCallback 442 | } 443 | if m.Delay != "" { 444 | header[upstashDelayHeader] = m.Delay 445 | } 446 | if m.NotBefore != "" { 447 | header[upstashNotBefore] = m.NotBefore 448 | } 449 | if m.DeduplicationId != "" { 450 | header[upstashDeduplicationId] = m.DeduplicationId 451 | } 452 | if m.ContentBasedDeduplication { 453 | header[upstashContentBasedDeduplication] = "true" 454 | } 455 | if m.Timeout != "" { 456 | header[upstashTimeoutHeader] = m.Timeout 457 | } 458 | return header 459 | } 460 | 461 | type ListDlqOptions struct { 462 | // Cursor is the starting point for listing Dlq entries. 463 | Cursor string 464 | // Count is the maximum number of Dlq entries to return, default/maximum is 100. 465 | Count int 466 | // Filter is the filter to apply. 
467 | Filter DlqFilter 468 | } 469 | 470 | func (l *ListDlqOptions) params() url.Values { 471 | params := url.Values{} 472 | if l.Cursor != "" { 473 | params.Set("cursor", l.Cursor) 474 | } 475 | if l.Count > 0 { 476 | params.Set("count", strconv.Itoa(l.Count)) 477 | } 478 | if l.Filter.MessageId != "" { 479 | params.Set("messageId", l.Filter.MessageId) 480 | } 481 | if l.Filter.Url != "" { 482 | params.Set("url", l.Filter.Url) 483 | } 484 | if l.Filter.UrlGroup != "" { 485 | params.Set("topicName", l.Filter.UrlGroup) 486 | } 487 | if l.Filter.ScheduleId != "" { 488 | params.Set("scheduleId", l.Filter.ScheduleId) 489 | } 490 | if l.Filter.Queue != "" { 491 | params.Set("queueName", l.Filter.Queue) 492 | } 493 | if l.Filter.Api != "" { 494 | params.Set("api", l.Filter.Api) 495 | } 496 | if !l.Filter.FromDate.IsZero() { 497 | params.Set("fromDate", strconv.FormatInt(l.Filter.FromDate.UnixMilli(), 10)) 498 | } 499 | if !l.Filter.ToDate.IsZero() { 500 | params.Set("toDate", strconv.FormatInt(l.Filter.ToDate.UnixMilli(), 10)) 501 | } 502 | if l.Filter.ResponseStatus != 0 { 503 | params.Set("responseStatus", strconv.Itoa(l.Filter.ResponseStatus)) 504 | } 505 | if l.Filter.CallerIP != "" { 506 | params.Set("callerIp", l.Filter.CallerIP) 507 | } 508 | return params 509 | } 510 | 511 | type ListEventsOptions struct { 512 | // Cursor is the starting point for listing events. 513 | Cursor string 514 | // Count is the maximum number of events to return. 515 | Count int 516 | // Filter is the filter to apply. 
517 | Filter EventFilter 518 | } 519 | 520 | func (l *ListEventsOptions) Params() url.Values { 521 | params := url.Values{} 522 | if l.Cursor != "" { 523 | params.Set("cursor", l.Cursor) 524 | } 525 | if l.Count > 0 { 526 | params.Set("count", strconv.Itoa(l.Count)) 527 | } 528 | if l.Filter.MessageId != "" { 529 | params.Set("messageId", l.Filter.MessageId) 530 | } 531 | if l.Filter.State != "" { 532 | params.Set("state", string(l.Filter.State)) 533 | } 534 | if l.Filter.Url != "" { 535 | params.Set("url", l.Filter.Url) 536 | } 537 | if l.Filter.UrlGroup != "" { 538 | params.Set("topicName", l.Filter.UrlGroup) 539 | } 540 | if l.Filter.ScheduleId != "" { 541 | params.Set("scheduleId", l.Filter.ScheduleId) 542 | } 543 | if l.Filter.Queue != "" { 544 | params.Set("queueName", l.Filter.Queue) 545 | } 546 | if l.Filter.Api != "" { 547 | params.Set("api", l.Filter.Api) 548 | } 549 | if !l.Filter.FromDate.IsZero() { 550 | params.Set("fromDate", strconv.FormatInt(l.Filter.FromDate.UnixMilli(), 10)) 551 | } 552 | if !l.Filter.ToDate.IsZero() { 553 | params.Set("toDate", strconv.FormatInt(l.Filter.ToDate.UnixMilli(), 10)) 554 | } 555 | return params 556 | } 557 | --------------------------------------------------------------------------------