├── bun.lockb ├── .vscode ├── extensions.json └── settings.json ├── .husky ├── pre-push ├── pre-commit └── commit-message ├── src ├── index.ts ├── constants.ts ├── utils.test.ts ├── utils.ts ├── queue.ts └── queue.test.ts ├── examples └── nextjs14 │ ├── bun.lockb │ ├── app │ ├── favicon.ico │ ├── globals.css │ ├── api │ │ ├── send │ │ │ └── route.ts │ │ └── receive │ │ │ └── route.ts │ ├── layout.tsx │ └── page.tsx │ ├── next.config.mjs │ ├── postcss.config.js │ ├── .gitignore │ ├── tailwind.config.ts │ ├── public │ ├── vercel.svg │ └── next.svg │ ├── tsconfig.json │ ├── package.json │ └── README.md ├── tsup.config.js ├── tsconfig.json ├── .github └── workflows │ ├── tests.yaml │ └── release.yaml ├── biome.json ├── package.json ├── commitlint.config.js ├── .gitignore └── README.md /bun.lockb: -------------------------------------------------------------------------------- https://raw.githubusercontent.com/upstash/queue/master/bun.lockb -------------------------------------------------------------------------------- /.vscode/extensions.json: -------------------------------------------------------------------------------- 1 | { 2 | "recommendations": ["biomejs.biome"] 3 | } 4 | -------------------------------------------------------------------------------- /.husky/pre-push: -------------------------------------------------------------------------------- 1 | #!/usr/bin/env sh 2 | . 
"$(dirname -- "$0")/_/husky.sh" 3 | 4 | bun run build -------------------------------------------------------------------------------- /src/index.ts: -------------------------------------------------------------------------------- 1 | export { Queue } from "./queue"; 2 | export type { QueueConfig } from "./queue"; 3 | -------------------------------------------------------------------------------- /examples/nextjs14/bun.lockb: -------------------------------------------------------------------------------- https://raw.githubusercontent.com/upstash/queue/master/examples/nextjs14/bun.lockb -------------------------------------------------------------------------------- /.husky/pre-commit: -------------------------------------------------------------------------------- 1 | #!/usr/bin/env sh 2 | . "$(dirname -- "$0")/_/husky.sh" 3 | 4 | bun run fmt && bun run test -------------------------------------------------------------------------------- /.husky/commit-message: -------------------------------------------------------------------------------- 1 | #!/usr/bin/env sh 2 | . 
"$(dirname -- "$0")/_/husky.sh" 3 | 4 | bun --no -- commitlint --edit "" -------------------------------------------------------------------------------- /examples/nextjs14/app/favicon.ico: -------------------------------------------------------------------------------- https://raw.githubusercontent.com/upstash/queue/master/examples/nextjs14/app/favicon.ico -------------------------------------------------------------------------------- /examples/nextjs14/next.config.mjs: -------------------------------------------------------------------------------- 1 | /** @type {import('next').NextConfig} */ 2 | const nextConfig = {}; 3 | 4 | export default nextConfig; 5 | -------------------------------------------------------------------------------- /examples/nextjs14/postcss.config.js: -------------------------------------------------------------------------------- 1 | module.exports = { 2 | plugins: { 3 | tailwindcss: {}, 4 | autoprefixer: {}, 5 | }, 6 | }; 7 | -------------------------------------------------------------------------------- /tsup.config.js: -------------------------------------------------------------------------------- 1 | import { defineConfig } from "tsup"; 2 | 3 | export default defineConfig({ 4 | entry: ["src/index.ts"], 5 | format: ["cjs", "esm"], 6 | sourcemap: false, 7 | clean: true, 8 | bundle: true, 9 | dts: true, 10 | minify: true, 11 | treeshake: true, 12 | }); 13 | -------------------------------------------------------------------------------- /tsconfig.json: -------------------------------------------------------------------------------- 1 | { 2 | "compilerOptions": { 3 | "lib": ["ESNext"], 4 | "module": "esnext", 5 | "target": "esnext", 6 | "moduleResolution": "bundler", 7 | "moduleDetection": "force", 8 | "allowImportingTsExtensions": true, 9 | "noEmit": true, 10 | "strict": true, 11 | "downlevelIteration": true, 12 | "skipLibCheck": true, 13 | "allowSyntheticDefaultImports": true, 14 | "forceConsistentCasingInFileNames": true, 15 | 
"allowJs": true, 16 | "types": [ 17 | "bun-types" // add Bun global 18 | ] 19 | } 20 | } 21 | -------------------------------------------------------------------------------- /examples/nextjs14/.gitignore: -------------------------------------------------------------------------------- 1 | # See https://help.github.com/articles/ignoring-files/ for more about ignoring files. 2 | 3 | # dependencies 4 | /node_modules 5 | /.pnp 6 | .pnp.js 7 | .yarn/install-state.gz 8 | 9 | # testing 10 | /coverage 11 | 12 | # next.js 13 | /.next/ 14 | /out/ 15 | 16 | # production 17 | /build 18 | 19 | # misc 20 | .DS_Store 21 | *.pem 22 | 23 | # debug 24 | npm-debug.log* 25 | yarn-debug.log* 26 | yarn-error.log* 27 | 28 | # local env files 29 | .env*.local 30 | 31 | # vercel 32 | .vercel 33 | 34 | # typescript 35 | *.tsbuildinfo 36 | next-env.d.ts 37 | -------------------------------------------------------------------------------- /examples/nextjs14/app/globals.css: -------------------------------------------------------------------------------- 1 | @tailwind base; 2 | @tailwind components; 3 | @tailwind utilities; 4 | 5 | .message-enter { 6 | transform: translateX(-100%); 7 | opacity: 0; 8 | } 9 | .message-enter-active { 10 | transform: translateX(0); 11 | opacity: 1; 12 | transition: transform 500ms ease-in-out, opacity 500ms ease-in-out; 13 | } 14 | .message-exit { 15 | transform: translateX(0); 16 | opacity: 1; 17 | } 18 | .message-exit-active { 19 | transform: translateX(100%); 20 | opacity: 0; 21 | transition: transform 500ms ease-in-out, opacity 500ms ease-in-out; 22 | } 23 | -------------------------------------------------------------------------------- /examples/nextjs14/tailwind.config.ts: -------------------------------------------------------------------------------- 1 | import type { Config } from "tailwindcss"; 2 | 3 | const config: Config = { 4 | content: [ 5 | "./pages/**/*.{js,ts,jsx,tsx,mdx}", 6 | "./components/**/*.{js,ts,jsx,tsx,mdx}", 7 | 
"./app/**/*.{js,ts,jsx,tsx,mdx}", 8 | ], 9 | theme: { 10 | extend: { 11 | backgroundImage: { 12 | "gradient-radial": "radial-gradient(var(--tw-gradient-stops))", 13 | "gradient-conic": 14 | "conic-gradient(from 180deg at 50% 50%, var(--tw-gradient-stops))", 15 | }, 16 | }, 17 | }, 18 | plugins: [], 19 | }; 20 | export default config; 21 | -------------------------------------------------------------------------------- /.vscode/settings.json: -------------------------------------------------------------------------------- 1 | { 2 | "editor.formatOnSave": true, 3 | "git.autofetch": true, 4 | "editor.codeActionsOnSave": { 5 | "source.fixAll": "explicit" 6 | }, 7 | "[html]": { 8 | "editor.defaultFormatter": "biomejs.biome" 9 | }, 10 | "[json]": { 11 | "editor.defaultFormatter": "biomejs.biome" 12 | }, 13 | "[javascript]": { 14 | "editor.defaultFormatter": "biomejs.biome" 15 | }, 16 | "[jsonc]": { 17 | "editor.defaultFormatter": "biomejs.biome" 18 | }, 19 | "[typescript]": { 20 | "editor.defaultFormatter": "biomejs.biome" 21 | }, 22 | "editor.defaultFormatter": "biomejs.biome" 23 | } 24 | -------------------------------------------------------------------------------- /examples/nextjs14/public/vercel.svg: -------------------------------------------------------------------------------- 1 | -------------------------------------------------------------------------------- /examples/nextjs14/app/api/send/route.ts: -------------------------------------------------------------------------------- 1 | import { Queue } from "@upstash/queue"; 2 | import { Redis } from "@upstash/redis"; 3 | 4 | const redis = new Redis({ 5 | url: process.env.UPSTASH_REDIS_REST_URL as string, 6 | token: process.env.UPSTASH_REDIS_REST_TOKEN as string, 7 | }); 8 | 9 | export async function POST(req: Request) { 10 | const { queueName, message } = await req.json(); 11 | 12 | const queue = new Queue({ 13 | redis: redis, 14 | concurrencyLimit: 5, 15 | queueName: queueName, 16 | }); 17 | 18 | const 
messageId = await queue.sendMessage({ message: message }); 19 | 20 | return Response.json({ messageId }); 21 | } 22 | -------------------------------------------------------------------------------- /src/constants.ts: -------------------------------------------------------------------------------- 1 | export const DEFAULT_CONSUMER_GROUP_NAME = "Messages"; 2 | export const DEFAULT_CONSUMER_PREFIX = "Consumer"; 3 | export const DEFAULT_QUEUE_NAME = "Queue"; 4 | export const DEFAULT_CONCURRENCY_LIMIT = 0; 5 | export const DEFAULT_AUTO_VERIFY = true; 6 | export const DEFAULT_VISIBILITY_TIMEOUT_IN_MS = 1000 * 30; 7 | 8 | export const MAX_CONCURRENCY_LIMIT = 5; 9 | 10 | export const ERROR_MAP = { 11 | CONCURRENCY_LIMIT_EXCEEDED: `Cannot receive more than ${MAX_CONCURRENCY_LIMIT}`, 12 | CONCURRENCY_DEFAULT_LIMIT_EXCEEDED: `Cannot receive more than ${ 13 | DEFAULT_CONCURRENCY_LIMIT + 1 14 | }, due to concurrency limit option not being set`, 15 | } as const; 16 | -------------------------------------------------------------------------------- /examples/nextjs14/app/layout.tsx: -------------------------------------------------------------------------------- 1 | import type { Metadata } from "next"; 2 | import { Inter } from "next/font/google"; 3 | import "./globals.css"; 4 | 5 | const inter = Inter({ subsets: ["latin"] }); 6 | 7 | export const metadata: Metadata = { 8 | title: "Create Next App", 9 | description: "Generated by create next app", 10 | }; 11 | 12 | export default function RootLayout({ 13 | children, 14 | }: Readonly<{ 15 | children: React.ReactNode; 16 | }>) { 17 | return ( 18 | 19 | 20 |
{children}
21 | 22 | 23 | ); 24 | } 25 | -------------------------------------------------------------------------------- /examples/nextjs14/tsconfig.json: -------------------------------------------------------------------------------- 1 | { 2 | "compilerOptions": { 3 | "lib": ["dom", "dom.iterable", "esnext"], 4 | "allowJs": true, 5 | "skipLibCheck": true, 6 | "strict": true, 7 | "noEmit": true, 8 | "esModuleInterop": true, 9 | "module": "esnext", 10 | "moduleResolution": "bundler", 11 | "resolveJsonModule": true, 12 | "isolatedModules": true, 13 | "jsx": "preserve", 14 | "incremental": true, 15 | "plugins": [ 16 | { 17 | "name": "next" 18 | } 19 | ], 20 | "paths": { 21 | "@/*": ["./*"] 22 | } 23 | }, 24 | "include": ["next-env.d.ts", "**/*.ts", "**/*.tsx", ".next/types/**/*.ts"], 25 | "exclude": ["node_modules"] 26 | } 27 | -------------------------------------------------------------------------------- /examples/nextjs14/package.json: -------------------------------------------------------------------------------- 1 | { 2 | "name": "nextjs14", 3 | "version": "0.1.0", 4 | "private": true, 5 | "scripts": { 6 | "dev": "next dev", 7 | "build": "next build", 8 | "start": "next start", 9 | "lint": "next lint" 10 | }, 11 | "dependencies": { 12 | "@types/react-transition-group": "^4.4.10", 13 | "@upstash/queue": "^1.0.0", 14 | "next": "14.1.0", 15 | "react": "^18", 16 | "react-dom": "^18", 17 | "react-transition-group": "^4.4.5" 18 | }, 19 | "devDependencies": { 20 | "typescript": "^5", 21 | "@types/node": "^20", 22 | "@types/react": "^18", 23 | "@types/react-dom": "^18", 24 | "autoprefixer": "^10.0.1", 25 | "postcss": "^8", 26 | "tailwindcss": "^3.3.0" 27 | } 28 | } 29 | -------------------------------------------------------------------------------- /examples/nextjs14/app/api/receive/route.ts: -------------------------------------------------------------------------------- 1 | import { Queue } from "@upstash/queue"; 2 | import { Redis } from "@upstash/redis"; 3 | 4 | const 
redis = new Redis({ 5 | url: process.env.UPSTASH_REDIS_REST_URL as string, 6 | token: process.env.UPSTASH_REDIS_REST_TOKEN as string, 7 | }); 8 | 9 | type MessageBody = { 10 | message: string; 11 | }; 12 | export async function POST(req: Request) { 13 | const { queueName } = await req.json(); 14 | 15 | const queue = new Queue({ 16 | redis: redis, 17 | concurrencyLimit: 5, 18 | queueName: queueName, 19 | }); 20 | 21 | const receiveResponse = await queue.receiveMessage(); 22 | 23 | return Response.json({ 24 | id: receiveResponse?.streamId, 25 | message: receiveResponse?.body.message, 26 | }); 27 | } 28 | -------------------------------------------------------------------------------- /src/utils.test.ts: -------------------------------------------------------------------------------- 1 | import { describe, expect, test } from "bun:test"; 2 | import { parseXclaimAutoResponse, parseXreadGroupResponse } from "./utils"; 3 | 4 | describe("Response Parsers", () => { 5 | test("should parse XCLAIMAUTO response", () => { 6 | const input = ["0-0", [["1703754659687-0", ["messageBody", '{"hello":"world"}']]], []]; 7 | 8 | expect(parseXclaimAutoResponse(input)).toEqual({ 9 | streamId: "1703754659687-0", 10 | body: '{"hello":"world"}', 11 | }); 12 | }); 13 | 14 | test("should parse XREADGROUP response", () => { 15 | const input = [ 16 | ["UpstashMQ:1251e0e7", [["1703755686316-0", ["messageBody", '{"hello":"world"}']]]], 17 | ]; 18 | 19 | expect(parseXreadGroupResponse(input)).toEqual({ 20 | streamId: "1703755686316-0", 21 | body: '{"hello":"world"}', 22 | }); 23 | }); 24 | }); 25 | -------------------------------------------------------------------------------- /.github/workflows/tests.yaml: -------------------------------------------------------------------------------- 1 | name: Tests 2 | on: 3 | push: 4 | branches: 5 | - master 6 | pull_request: 7 | schedule: 8 | - cron: "0 0 * * *" # daily 9 | 10 | env: 11 | UPSTASH_REDIS_REST_URL: ${{ secrets.UPSTASH_REDIS_REST_URL }} 12 | 
UPSTASH_REDIS_REST_TOKEN: ${{ secrets.UPSTASH_REDIS_REST_TOKEN }} 13 | jobs: 14 | test: 15 | runs-on: ubuntu-latest 16 | concurrency: test 17 | 18 | name: Tests 19 | steps: 20 | - name: Setup repo 21 | uses: actions/checkout@v3 22 | 23 | - name: Setup Bun 24 | uses: oven-sh/setup-bun@v1 25 | with: 26 | bun-version: latest 27 | 28 | - name: Install Dependencies 29 | run: bun install 30 | 31 | - name: Run Lint 32 | run: bun run fmt 33 | 34 | - name: Run tests 35 | run: bun run test 36 | 37 | - name: Run Build 38 | run: bun run build 39 | -------------------------------------------------------------------------------- /biome.json: -------------------------------------------------------------------------------- 1 | { 2 | "$schema": "https://biomejs.dev/schemas/1.6.0/schema.json", 3 | 4 | "linter": { 5 | "enabled": true, 6 | "rules": { 7 | "recommended": true, 8 | "a11y": { 9 | "noSvgWithoutTitle": "off" 10 | }, 11 | "correctness": { 12 | "noUnusedVariables": "error" 13 | }, 14 | "security": { 15 | "noDangerouslySetInnerHtml": "off" 16 | }, 17 | "style": { 18 | "useBlockStatements": "error", 19 | "noNonNullAssertion": "off" 20 | }, 21 | "performance": { 22 | "noDelete": "off" 23 | }, 24 | "suspicious": { 25 | "noExplicitAny": "off" 26 | } 27 | }, 28 | "ignore": ["node_modules", "dist"] 29 | }, 30 | "formatter": { 31 | "indentStyle": "space", 32 | "indentWidth": 2, 33 | "enabled": true, 34 | "lineWidth": 100, 35 | "ignore": ["node_modules", "dist"] 36 | }, 37 | "organizeImports": { 38 | "enabled": true 39 | }, 40 | "javascript": { 41 | "formatter": { 42 | "trailingComma": "es5" 43 | } 44 | } 45 | } 46 | -------------------------------------------------------------------------------- /.github/workflows/release.yaml: -------------------------------------------------------------------------------- 1 | name: Release 2 | 3 | on: 4 | release: 5 | types: 6 | - published 7 | 8 | jobs: 9 | release: 10 | name: Release 11 | runs-on: ubuntu-latest 12 | steps: 13 | - name: Checkout 
Repo 14 | uses: actions/checkout@v3 15 | 16 | - name: Setup Bun 17 | uses: oven-sh/setup-bun@v1 18 | with: 19 | bun-version: latest 20 | 21 | - name: Set env 22 | run: echo "VERSION=${GITHUB_REF#refs/*/}" >> $GITHUB_ENV 23 | 24 | - name: Set package version 25 | run: echo $(jq --arg v "${{ env.VERSION }}" '(.version) = $v' package.json) > package.json 26 | 27 | - name: Install dependencies 28 | run: bun install 29 | 30 | - name: Build 31 | run: bun run build 32 | 33 | - name: Add npm token 34 | run: echo "//registry.npmjs.org/:_authToken=${{secrets.NPM_TOKEN}}" > .npmrc 35 | 36 | - name: Publish release candidate 37 | if: "github.event.release.prerelease" 38 | run: npm publish --access public --tag=canary --no-git-checks 39 | 40 | - name: Publish 41 | if: "!github.event.release.prerelease" 42 | run: npm publish --access public --no-git-checks 43 | -------------------------------------------------------------------------------- /package.json: -------------------------------------------------------------------------------- 1 | { 2 | "name": "@upstash/queue", 3 | "description": "A message queue based on Redis streams, without dependencies.", 4 | "module": "./dist/index.mjs", 5 | "main": "./dist/index.js", 6 | "types": "./dist/index.d.ts", 7 | "version": "1.0.0", 8 | "keywords": ["redis", "stream", "upstash", "message", "queue"], 9 | "author": "Oguzhan Olguncu ", 10 | "repository": { 11 | "type": "git", 12 | "url": "https://github.com/upstash/queue" 13 | }, 14 | "license": "MIT", 15 | "files": ["dist"], 16 | "bugs": { 17 | "url": "https://github.com/upstash/queue/issues" 18 | }, 19 | "scripts": { 20 | "test": "bun test src --coverage --bail --timeout 20000", 21 | "fmt": "bunx biome check --apply ./src", 22 | "build": "tsup", 23 | "prepare": "husky install | chmod ug+x .husky/*" 24 | }, 25 | "dependencies": { 26 | "@upstash/redis": "1.29.0", 27 | "uuid": "^9.0.1" 28 | }, 29 | "devDependencies": { 30 | "@types/uuid": "^9.0.8", 31 | "typescript": "latest", 32 | 
"@biomejs/biome": "1.6.0", 33 | "bun-types": "latest", 34 | "husky": "^9.0.11", 35 | "tsup": "latest", 36 | "@commitlint/cli": "^18.6.0", 37 | "@commitlint/config-conventional": "^18.6.0" 38 | } 39 | } 40 | -------------------------------------------------------------------------------- /examples/nextjs14/public/next.svg: -------------------------------------------------------------------------------- 1 | -------------------------------------------------------------------------------- /examples/nextjs14/README.md: -------------------------------------------------------------------------------- 1 | This is a [Next.js](https://nextjs.org/) project bootstrapped with [`create-next-app`](https://github.com/vercel/next.js/tree/canary/packages/create-next-app). 2 | 3 | ## Getting Started 4 | 5 | First, run the development server: 6 | 7 | ```bash 8 | npm run dev 9 | # or 10 | yarn dev 11 | # or 12 | pnpm dev 13 | # or 14 | bun dev 15 | ``` 16 | 17 | Open [http://localhost:3000](http://localhost:3000) with your browser to see the result. 18 | 19 | You can start editing the page by modifying `app/page.tsx`. The page auto-updates as you edit the file. 20 | 21 | This project uses [`next/font`](https://nextjs.org/docs/basic-features/font-optimization) to automatically optimize and load Inter, a custom Google Font. 22 | 23 | ## Learn More 24 | 25 | To learn more about Next.js, take a look at the following resources: 26 | 27 | - [Next.js Documentation](https://nextjs.org/docs) - Learn about Next.js features and API. 28 | - [Learn Next.js](https://nextjs.org/learn) - An interactive Next.js tutorial. 29 | 30 | You can check out [the Next.js GitHub repository](https://github.com/vercel/next.js/) - Your feedback and contributions are welcome! 
31 | 32 | ## Deploy on Vercel 33 | 34 | The easiest way to deploy your Next.js app is to use the [Vercel Platform](https://vercel.com/new?utm_medium=default-template&filter=next.js&utm_source=create-next-app&utm_campaign=create-next-app-readme) from the creators of Next.js. 35 | 36 | Check out our [Next.js deployment documentation](https://nextjs.org/docs/deployment) for more details. -------------------------------------------------------------------------------- /commitlint.config.js: -------------------------------------------------------------------------------- 1 | // build: Changes that affect the build system or external dependencies (example scopes: gulp, broccoli, npm) 2 | // ci: Changes to our CI configuration files and scripts (example scopes: Travis, Circle, BrowserStack, SauceLabs) 3 | // docs: Documentation only changes 4 | // feat: A new feature 5 | // fix: A bug fix 6 | // perf: A code change that improves performance 7 | // refactor: A code change that neither fixes a bug nor adds a feature 8 | // style: Changes that do not affect the meaning of the code (white-space, formatting, missing semi-colons, etc) 9 | // test: Adding missing tests or correcting existing tests 10 | 11 | module.exports = { 12 | extends: ["@commitlint/config-conventional"], 13 | rules: { 14 | "body-leading-blank": [1, "always"], 15 | "body-max-line-length": [2, "always", 100], 16 | "footer-leading-blank": [1, "always"], 17 | "footer-max-line-length": [2, "always", 100], 18 | "header-max-length": [2, "always", 100], 19 | "scope-case": [2, "always", "lower-case"], 20 | "subject-case": [ 21 | 2, 22 | "never", 23 | ["sentence-case", "start-case", "pascal-case", "upper-case"], 24 | ], 25 | "subject-empty": [2, "never"], 26 | "subject-full-stop": [2, "never", "."], 27 | "type-case": [2, "always", "lower-case"], 28 | "type-empty": [2, "never"], 29 | "type-enum": [ 30 | 2, 31 | "always", 32 | [ 33 | "build", 34 | "chore", 35 | "ci", 36 | "docs", 37 | "feat", 38 | "fix", 39 | "perf", 
40 | "refactor", 41 | "revert", 42 | "style", 43 | "test", 44 | "translation", 45 | "security", 46 | "changeset", 47 | ], 48 | ], 49 | }, 50 | }; 51 | -------------------------------------------------------------------------------- /.gitignore: -------------------------------------------------------------------------------- 1 | # Based on https://raw.githubusercontent.com/github/gitignore/main/Node.gitignore 2 | 3 | # Logs 4 | 5 | logs 6 | *.log 7 | npm-debug.log* 8 | yarn-debug.log* 9 | yarn-error.log* 10 | lerna-debug.log* 11 | .pnpm-debug.log* 12 | 13 | # Diagnostic reports (https://nodejs.org/api/report.html) 14 | 15 | report.[0-9]*.[0-9]*.[0-9]*.[0-9]*.json 16 | 17 | # Runtime data 18 | 19 | pids 20 | *.pid 21 | *.seed 22 | *.pid.lock 23 | 24 | # Directory for instrumented libs generated by jscoverage/JSCover 25 | 26 | lib-cov 27 | 28 | # Coverage directory used by tools like istanbul 29 | 30 | coverage 31 | *.lcov 32 | 33 | # nyc test coverage 34 | 35 | .nyc_output 36 | 37 | # Grunt intermediate storage (https://gruntjs.com/creating-plugins#storing-task-files) 38 | 39 | .grunt 40 | 41 | # Bower dependency directory (https://bower.io/) 42 | 43 | bower_components 44 | 45 | # node-waf configuration 46 | 47 | .lock-wscript 48 | 49 | # Compiled binary addons (https://nodejs.org/api/addons.html) 50 | 51 | build/Release 52 | 53 | # Dependency directories 54 | 55 | node_modules/ 56 | jspm_packages/ 57 | 58 | # Snowpack dependency directory (https://snowpack.dev/) 59 | 60 | web_modules/ 61 | 62 | # TypeScript cache 63 | 64 | *.tsbuildinfo 65 | 66 | # Optional npm cache directory 67 | 68 | .npm 69 | 70 | # Optional eslint cache 71 | 72 | .eslintcache 73 | 74 | # Optional stylelint cache 75 | 76 | .stylelintcache 77 | 78 | # Microbundle cache 79 | 80 | .rpt2_cache/ 81 | .rts2_cache_cjs/ 82 | .rts2_cache_es/ 83 | .rts2_cache_umd/ 84 | 85 | # Optional REPL history 86 | 87 | .node_repl_history 88 | 89 | # Output of 'npm pack' 90 | 91 | *.tgz 92 | 93 | # Yarn Integrity 
file 94 | 95 | .yarn-integrity 96 | 97 | # dotenv environment variable files 98 | 99 | .env 100 | .env.development.local 101 | .env.test.local 102 | .env.production.local 103 | .env.local 104 | 105 | # parcel-bundler cache (https://parceljs.org/) 106 | 107 | .cache 108 | .parcel-cache 109 | 110 | # Next.js build output 111 | 112 | .next 113 | out 114 | 115 | # Nuxt.js build / generate output 116 | 117 | .nuxt 118 | dist 119 | 120 | # Gatsby files 121 | 122 | .cache/ 123 | 124 | # Comment in the public line in if your project uses Gatsby and not Next.js 125 | 126 | # https://nextjs.org/blog/next-9-1#public-directory-support 127 | 128 | # public 129 | 130 | # vuepress build output 131 | 132 | .vuepress/dist 133 | 134 | # vuepress v2.x temp and cache directory 135 | 136 | .temp 137 | .cache 138 | 139 | # Docusaurus cache and generated files 140 | 141 | .docusaurus 142 | 143 | # Serverless directories 144 | 145 | .serverless/ 146 | 147 | # FuseBox cache 148 | 149 | .fusebox/ 150 | 151 | # DynamoDB Local files 152 | 153 | .dynamodb/ 154 | 155 | # TernJS port file 156 | 157 | .tern-port 158 | 159 | # Stores VSCode versions used for testing VSCode extensions 160 | 161 | .vscode-test 162 | 163 | # yarn v2 164 | 165 | .yarn/cache 166 | .yarn/unplugged 167 | .yarn/build-state.yml 168 | .yarn/install-state.gz 169 | .pnp.* 170 | 171 | # IntelliJ based IDEs 172 | .idea 173 | 174 | # Finder (MacOS) folder config 175 | .DS_Store 176 | 177 | -------------------------------------------------------------------------------- /src/utils.ts: -------------------------------------------------------------------------------- 1 | import { v4 as generateRandomUUID } from "uuid"; 2 | 3 | export const MQ_PREFIX = "UpstashMQ"; 4 | 5 | export const formatMessageQueueKey = (queueName: string) => { 6 | return `${MQ_PREFIX}:${queueName}`; 7 | }; 8 | 9 | export const delay = (duration: number): Promise => { 10 | return new Promise((resolve) => { 11 | setTimeout(resolve, duration); 12 | }); 13 | }; 
14 | 15 | export const generateRandomConsumerName = (): string => { 16 | return `consumer-${generateRandomUUID()}`; 17 | }; 18 | 19 | export type ParsedStreamMessage = { 20 | streamId: string; 21 | body: TStreamResult; 22 | } | null; 23 | 24 | type StartId = string; 25 | type GroupName = string; 26 | type StreamId = string; 27 | type MessageBody = [string, string]; 28 | 29 | type XreadGroupStreamArray = [[GroupName, Array<[StreamId, MessageBody]>]]; 30 | 31 | /** 32 | * Parses the result of a Redis XREADGROUP response, extracting relevant information. 33 | * 34 | * @param {unknown[]} streamArray - The array representing the Redis XREADGROUP response. 35 | * Example structure: [["UpstashMQ:1251e0e7",[["1703755686316-0", ["messageBody", '{"hello":"world"}']]]]] 36 | * 37 | * @returns {ParsedStreamMessage | null} - Parsed result containing the stream ID 38 | * and parsed message body, or null if parsing fails. 39 | */ 40 | export const parseXreadGroupResponse = ( 41 | streamArray: unknown[] 42 | ): ParsedStreamMessage => { 43 | const typedStream = streamArray as XreadGroupStreamArray; 44 | 45 | const streamData = typedStream?.[0]; 46 | const firstMessage = streamData?.[1]?.[0]; 47 | const streamId = firstMessage?.[0]; 48 | const messageBodyString = firstMessage?.[1]?.[1]; 49 | 50 | if (!streamId || !messageBodyString) { 51 | return null; 52 | } 53 | 54 | return { 55 | streamId: streamId, 56 | body: messageBodyString as any, 57 | }; 58 | }; 59 | 60 | type XclaimAutoRedisStreamArray = [StartId, Array<[StreamId, MessageBody]>]; 61 | 62 | /** 63 | * Parses the result of a Redis XCLAIM response with automatic stream creation, 64 | * extracting relevant information. 65 | * 66 | * @param {unknown[]} streamArray - The array representing the Redis XCLAIM response. 
67 | * Example structure: ["0-0",[["1703754659687-0", ["messageBody", '{"hello":"world"}']]],[]] 68 | * 69 | * @returns {ParsedStreamMessage | null} - Parsed result containing the stream ID 70 | * and parsed message body, or null if parsing fails. 71 | */ 72 | export const parseXclaimAutoResponse = ( 73 | streamArray: unknown[] 74 | ): ParsedStreamMessage => { 75 | const typedStream = streamArray as XclaimAutoRedisStreamArray; 76 | const firstMessage = typedStream?.[1]?.[0]; 77 | const streamId = firstMessage?.[0]; 78 | const messageBodyString = firstMessage?.[1]?.[1]; 79 | 80 | if (!streamId || !messageBodyString) { 81 | return null; 82 | } 83 | 84 | return { 85 | streamId, 86 | body: messageBodyString as any, 87 | }; 88 | }; 89 | 90 | /** 91 | * Asserts that the provided data is non-null and non-undefined. 92 | * If the assertion fails, an error with the specified message is thrown. 93 | * 94 | * @param {T} data - The data to assert as non-nullable. 95 | * @param {string} message - The error message to throw if the assertion fails. 96 | * @throws {Error} Throws an error if the assertion fails. 97 | * @returns {asserts data is NonNullable} - Type assertion indicating that the data is non-nullable. 
98 | */ 99 | export function invariant(data: T, message: string): asserts data is NonNullable { 100 | if (!data) { 101 | throw new Error(message); 102 | } 103 | } 104 | -------------------------------------------------------------------------------- /examples/nextjs14/app/page.tsx: -------------------------------------------------------------------------------- 1 | "use client"; 2 | import { useState } from "react"; 3 | 4 | import { CSSTransition, TransitionGroup } from "react-transition-group"; 5 | import * as uuid from "uuid"; 6 | 7 | type MessageBody = { 8 | content: string; 9 | }; 10 | 11 | type MessageWithId = { 12 | id: string; 13 | body: MessageBody; 14 | }; 15 | 16 | const queueName = `queue-${uuid.v4().substring(0, 18)}`; 17 | 18 | export default function Home() { 19 | const [messageInput, setMessageInput] = useState(uuid.v4().substring(0, 18)); 20 | const [queueMessages, setQueueMessages] = useState([]); 21 | 22 | const send = async () => { 23 | fetch("/api/send", { 24 | method: "POST", 25 | body: JSON.stringify({ message: messageInput, queueName: queueName }), 26 | }).then(async (res) => { 27 | const data = await res.json(); 28 | if (data.messageId) { 29 | setQueueMessages([ 30 | ...queueMessages, 31 | { 32 | id: data.messageId, 33 | body: { content: messageInput }, 34 | } as MessageWithId, 35 | ]); 36 | } 37 | }); 38 | }; 39 | 40 | const receive = async () => { 41 | fetch("/api/receive", { 42 | method: "POST", 43 | body: JSON.stringify({ queueName: queueName }), 44 | }).then(async (res) => { 45 | const data = await res.json(); 46 | 47 | if (data.id && data.message) { 48 | setQueueMessages(queueMessages.filter((msg) => msg.id !== data.id)); 49 | } 50 | }); 51 | }; 52 | return ( 53 |
54 |
55 |

56 | Welcome to @upstash/queue 57 |

58 | 59 |

60 | This is an example of how to use @upstash/queue as a FIFO queue in your Next.js 61 | application. 62 |

63 | 64 |

65 | You can create and consume messages from the queue using the buttons. 66 |

67 |
68 | 69 |
70 |
71 |
72 |

Queue

73 | 74 | {queueMessages.length > 0 && 75 | queueMessages.map((message, index) => { 76 | return ( 77 | 78 |
79 |
80 | {index} 81 |
82 | 83 |
84 |

85 | ID: {message.id} 86 |

87 | 88 |

89 | Content:{" "} 90 | {message.body.content} 91 |

92 |
93 |
94 |
95 | ); 96 | })} 97 |
98 | {queueMessages.length === 0 &&

No messages in the queue...

} 99 |
100 |
101 |
102 | 103 |
104 |
105 |
106 | setMessageInput(e.target.value)} 112 | /> 113 | 123 |
124 |
125 | 134 |
135 |
136 | 145 |
146 |
147 | ); 148 | } 149 | -------------------------------------------------------------------------------- /README.md: -------------------------------------------------------------------------------- 1 | # Upstash Queue 2 | · ![license](https://img.shields.io/npm/l/%40upstash%2Fqueue) [![Tests](https://github.com/upstash/queue/actions/workflows/tests.yaml/badge.svg)](https://github.com/upstash/queue/actions/workflows/tests.yaml) ![npm (scoped)](https://img.shields.io/npm/v/@upstash/queue) ![npm bundle size](https://img.shields.io/bundlephobia/minzip/@upstash/queue) ![npm weekly download](https://img.shields.io/npm/dw/%40upstash%2Fqueue) 3 | 4 | 5 | > [!NOTE] 6 | > **This project is a Community Project.** 7 | > 8 | > The project is maintained and supported by the community. Upstash may contribute but does not officially support or assume responsibility for it. 9 | 10 | 11 | 12 | A simple, fast, robust, stream-based message queue for Node.js, backed by Upstash Redis, inspired by AWS SQS. 13 | 14 | - Simple: ~350 LOC, and single dependency. 15 | - Lightweight: Under ~5kb zipped. 16 | - Fast: Maximizes throughput by minimizing Redis and network overhead. Benchmarks well. 17 | - Robust: Designed with concurrency and failure in mind; full code coverage. 18 | 19 | ```ts 20 | import { Redis } from "@upstash/redis"; 21 | import { Queue } from "@upstash/queue"; 22 | 23 | const queue = new Queue({ redis: new Redis() }); 24 | 25 | await queue.sendMessage({ hello: "world1" }); 26 | 27 | const message1 = await queue.receiveMessage<{ hello: "world1" }>(); 28 | expect(message1?.body).toEqual({ hello: "world1" }); 29 | ``` 30 | 31 | ## Introduction 32 | 33 | `@upstash/queue` is a Node.js library that provides a simple and efficient way to implement a message queue system using Redis streams. It offers features such as message sending, receiving, automatic message verification, and concurrency control. 
This library is particularly useful for building distributed systems and background job processing. 34 | 35 | ## Why Upstash Queue? 36 | 37 | Upstash Queue brings the simplicity and performance of Redis streams to Node.js developers, making it easy to integrate a robust message queue system into their applications. Whether you're working on distributed systems, background job processing, or other asynchronous workflows, Upstash Queue provides essential features such as message sending, receiving, automatic verification, and concurrency control. 38 | 39 | **Key Features:** 40 | 41 | - **Efficiency:** Leverages the speed and reliability of Redis streams for message handling. 42 | - **Simplicity:** Dead simple implementation of distributed systems and background job processing with a clean and intuitive API. 43 | - **Concurrency Control:** Easily manages concurrent message processing to optimize system performance. 44 | - **Automatic Verification:** Benefits from built-in message verification mechanisms for reliable and secure communication. 
45 | 46 | ## Table of Contents 47 | 48 | - [Installation](#installation) 49 | - [Getting Started](#getting-started) 50 | - [Sending a Message](#sending-a-message) 51 | - [Receiving a Message](#receiving-a-message) 52 | - [Verifying a Message](#verifying-a-message) 53 | - [Configuration Options](#configuration-options) 54 | - [Examples](#examples) 55 | - [FIFO Example](#fifo-example) 56 | - [Sending Message with Delay then Poll](#sending-message-with-delay-then-poll) 57 | - [Manual Verification](#manual-verification) 58 | - [Concurrent Message Processing/Consuming](#concurrent-message-processingconsuming) 59 | 60 | ## Installation 61 | 62 | To start using Upstash Queue, install the library via npm: 63 | 64 | ```sh 65 | npm install @upstash/queue 66 | ``` 67 | 68 | ## Getting Started 69 | 70 | ```typescript 71 | import { Redis } from "@upstash/redis"; 72 | import { Queue } from "@upstash/queue"; 73 | 74 | const redis = new Redis(); 75 | const queue = new Queue({ redis }); 76 | ``` 77 | 78 | ### Sending a Message 79 | 80 | ```typescript 81 | const payload = { key: "value" }; 82 | const delayInMs = 0; // Delay in milliseconds. Set to 0 for immediate delivery. 83 | const streamId = await queue.sendMessage(payload, delayInMs); 84 | ``` 85 | 86 | ### Receiving a Message 87 | 88 | ```typescript 89 | const pollingForNewMessages = 1000; // Set to 0 for a non-blocking read; a positive value blocks for up to that many milliseconds waiting for a message. 
90 | const receivedMessage = await queue.receiveMessage(pollingForNewMessages); 91 | console.log("Received Message:", receivedMessage); 92 | ``` 93 | 94 | ### Verifying a Message 95 | 96 | ```typescript 97 | const streamId = "some_stream_id"; 98 | const verificationStatus = await queue.verifyMessage(streamId); 99 | console.log("Verification Status:", verificationStatus); 100 | ``` 101 | 102 | ### Configuration Options 103 | 104 | When initializing the Queue instance, you can provide various configuration options: 105 | 106 | - redis: Redis client instance (required). 107 | - queueName: Name of the Redis stream (default: "UpstashMQ:Queue"). 108 | - concurrencyLimit: Maximum concurrent message processing allowed (default: 1). 109 | - autoVerify: Auto-verify received messages (default: true). 110 | - consumerGroupName: Group that holds consumers when automatically created (default: "Messages"). 111 | - visibilityTimeout: Time in milliseconds before a received but unverified message can be picked up by another consumer (default: 30 seconds). 112 | 113 | #### Custom Configuration Example 114 | 115 | ```typescript 116 | const queueConfig = { 117 | redis: new Redis(), 118 | queueName: "MyCustomQueue", 119 | concurrencyLimit: 2, 120 | autoVerify: false, 121 | consumerGroupName: "MyConsumers", 122 | visibilityTimeout: 60000, // 1 minute. 123 | }; 124 | 125 | const customQueue = new Queue(queueConfig); 126 | ``` 127 | 128 | ## Examples 129 | 130 | ### FIFO Example 131 | 132 | ```typescript 133 | const queue = new Queue({ redis }); 134 | await queue.sendMessage({ hello: "world1" }); 135 | await delay(100); 136 | await queue.sendMessage({ hello: "world2" }); 137 | await delay(100); 138 | await queue.sendMessage({ hello: "world3" }); 139 | 140 | const message1 = await queue.receiveMessage<{ hello: "world1" }>(); // Logs out { hello: "world1" }. 141 | 142 | const message2 = await queue.receiveMessage<{ hello: "world2" }>(); // Logs out { hello: "world2" }. 
143 | 144 | const message3 = await queue.receiveMessage<{ hello: "world3" }>(); // Logs out { hello: "world3" }. 145 | ``` 146 | 147 | ### Sending Message with Delay then Poll 148 | 149 | ```typescript 150 | const fakeValue = randomValue(); 151 | const queueName = "app-logs"; 152 | 153 | const producer = new Queue({ redis, queueName }); 154 | const consumer = new Queue({ 155 | redis: new Redis(), 156 | queueName, 157 | }); 158 | await producer.sendMessage( 159 | { 160 | dev: fakeValue, 161 | }, 162 | 2000 163 | ); 164 | 165 | const receiveMessageRes = await consumer.receiveMessage<{ 166 | dev: string; 167 | }>(5000); 168 | ``` 169 | 170 | ### Manual Verification 171 | 172 | ```typescript 173 | const queue = new Queue({ redis, autoVerify: false }); 174 | await queue.sendMessage({ hello: "world" }); 175 | 176 | const message = await queue.receiveMessage<{ hello: "world" }>(); // Logs out { hello: "world" }. 177 | if (message) { 178 | await queue.verifyMessage(message.streamId); //Logs out "VERIFIED" or "NOT VERIFIED". 179 | } 180 | ``` 181 | 182 | ### Concurrent Message Processing/Consuming 183 | 184 | If `concurrencyLimit` is not set, one of the `receiveMessage()` will throw. You need to explicitly set the concurrencyLimit. Default is 1. 
185 | 186 | ```typescript 187 | const queue = new Queue({ 188 | redis: new Redis(), 189 | queueName: randomValue(), 190 | concurrencyLimit: 2, 191 | }); 192 | 193 | await queue.sendMessage({ 194 | dev: randomValue(), 195 | }); 196 | 197 | await queue.sendMessage({ 198 | dev: randomValue(), 199 | }); 200 | 201 | await Promise.all([queue.receiveMessage(), queue.receiveMessage()]); 202 | ``` 203 | -------------------------------------------------------------------------------- /src/queue.ts: -------------------------------------------------------------------------------- 1 | import { v4 as generateRandomUUID } from "uuid"; 2 | 3 | import { 4 | DEFAULT_AUTO_VERIFY, 5 | DEFAULT_CONCURRENCY_LIMIT, 6 | DEFAULT_CONSUMER_GROUP_NAME, 7 | DEFAULT_QUEUE_NAME, 8 | DEFAULT_VISIBILITY_TIMEOUT_IN_MS, 9 | ERROR_MAP, 10 | MAX_CONCURRENCY_LIMIT, 11 | } from "./constants"; 12 | import { 13 | type ParsedStreamMessage, 14 | formatMessageQueueKey, 15 | invariant, 16 | parseXclaimAutoResponse, 17 | parseXreadGroupResponse, 18 | } from "./utils"; 19 | 20 | import type { Redis } from "@upstash/redis"; 21 | 22 | export type QueueConfig = { 23 | redis: Redis; 24 | /** 25 | * Queue name for the redis stream 26 | * @default "UpstashMQ:Queue" 27 | */ 28 | queueName?: string; 29 | /** 30 | * The maximum number of concurrent message processing allowed. 31 | * @default 1 32 | */ 33 | concurrencyLimit?: 0 | 1 | 2 | 3 | 4 | 5; 34 | /** 35 | * Auto verifies received messages. If not set message will be picked up by some other consumer after visiblityTimeout. 36 | * @default true 37 | */ 38 | autoVerify?: boolean; 39 | /** 40 | * This is the group that holds every other consumer when automatically created. 41 | * @default "Messages" 42 | */ 43 | consumerGroupName?: string; 44 | /** 45 | * Recently sent messages won't be visible to other consumers until this period of time. If no one else acknowledges it it will be picked up by others. 
46 | * @default "30 seconds" 47 | */ 48 | visibilityTimeout?: number; 49 | }; 50 | 51 | export class Queue { 52 | config: QueueConfig; 53 | concurrencyCounter = DEFAULT_CONCURRENCY_LIMIT; 54 | hasConsumerGroupInitialized = false; 55 | 56 | private messageTimeouts = new Set(); 57 | 58 | constructor(config: QueueConfig) { 59 | this.config = { 60 | redis: config.redis, 61 | 62 | concurrencyLimit: config.concurrencyLimit ?? DEFAULT_CONCURRENCY_LIMIT, 63 | autoVerify: config.autoVerify ?? DEFAULT_AUTO_VERIFY, 64 | consumerGroupName: config.consumerGroupName ?? DEFAULT_CONSUMER_GROUP_NAME, 65 | queueName: config.queueName 66 | ? this.appendPrefixTo(config.queueName) 67 | : this.appendPrefixTo(DEFAULT_QUEUE_NAME), 68 | visibilityTimeout: config.visibilityTimeout ?? DEFAULT_VISIBILITY_TIMEOUT_IN_MS, 69 | }; 70 | } 71 | 72 | private appendPrefixTo(key: string) { 73 | return formatMessageQueueKey(key); 74 | } 75 | 76 | private async initializeConsumerGroup() { 77 | if (this.hasConsumerGroupInitialized) { 78 | return; 79 | } 80 | 81 | invariant( 82 | this.config.consumerGroupName, 83 | "Consumer group name cannot be empty when initializing consumer group" 84 | ); 85 | invariant(this.config.queueName, "Queue name cannot be empty when initializing consumer group"); 86 | 87 | try { 88 | await this.config.redis.xgroup(this.config.queueName, { 89 | type: "CREATE", 90 | group: this.config.consumerGroupName, 91 | id: "$", 92 | options: { MKSTREAM: true }, 93 | }); 94 | this.hasConsumerGroupInitialized = true; 95 | } catch (e) { 96 | if (e instanceof Error) { 97 | if (e.message.includes("BUSYGROUP Consumer Group name already exists")) { 98 | this.hasConsumerGroupInitialized = true; 99 | return; 100 | } 101 | } 102 | 103 | return; 104 | } 105 | } 106 | 107 | async sendMessage(payload: T, delayMs = 0) { 108 | const { redis } = this.config; 109 | await this.initializeConsumerGroup(); 110 | try { 111 | const streamKey = this.config.queueName; 112 | invariant(streamKey, "Queue name cannot 
be empty when sending a message"); 113 | 114 | const _sendMessage = () => 115 | redis.xadd(streamKey, "*", { 116 | messageBody: payload, 117 | }); 118 | 119 | if (delayMs > 0) { 120 | let streamIdResult: string | null = null; 121 | 122 | const timeoutId = setTimeout(() => { 123 | _sendMessage().then((res) => { 124 | streamIdResult = res; 125 | this.messageTimeouts.delete(timeoutId); 126 | }); 127 | }, delayMs); 128 | this.messageTimeouts.add(timeoutId); 129 | return streamIdResult; 130 | } 131 | return await _sendMessage(); 132 | } catch (error) { 133 | console.error("Error in sendMessage:", error); 134 | return null; 135 | } 136 | } 137 | 138 | async receiveMessage(blockTimeMs = 0) { 139 | this.checkIfReceiveMessageAllowed(); 140 | await this.initializeConsumerGroup(); 141 | 142 | const xclaimParsedMessage = await this.claimStuckPendingMessageAndVerify(); 143 | if (xclaimParsedMessage) { 144 | return xclaimParsedMessage; 145 | } 146 | 147 | // Claiming failed, fallback to default read message 148 | return await this.readAndVerifyPendingMessage(blockTimeMs); 149 | } 150 | 151 | private checkIfReceiveMessageAllowed() { 152 | const { concurrencyLimit } = this.config; 153 | 154 | const concurrencyNotSetAndAboveDefaultLimit = 155 | concurrencyLimit === DEFAULT_CONCURRENCY_LIMIT && 156 | this.concurrencyCounter >= DEFAULT_CONCURRENCY_LIMIT + 1; 157 | 158 | const concurrencyAboveTheMaxLimit = this.concurrencyCounter > MAX_CONCURRENCY_LIMIT; 159 | 160 | if (concurrencyNotSetAndAboveDefaultLimit) { 161 | throw new Error(ERROR_MAP.CONCURRENCY_DEFAULT_LIMIT_EXCEEDED); 162 | } 163 | this.incrementConcurrencyCount(); 164 | 165 | if (concurrencyAboveTheMaxLimit) { 166 | throw new Error(ERROR_MAP.CONCURRENCY_LIMIT_EXCEEDED); 167 | } 168 | } 169 | 170 | private async claimStuckPendingMessageAndVerify(): Promise< 171 | ParsedStreamMessage 172 | > { 173 | const { autoVerify } = this.config; 174 | const consumerName = this.generateRandomConsumerName(); 175 | 176 | const 
xclaimParsedMessage = await this.claimAndParseMessage(consumerName); 177 | 178 | if (xclaimParsedMessage && autoVerify) { 179 | await this.verifyMessage(xclaimParsedMessage.streamId); 180 | } 181 | 182 | if (xclaimParsedMessage == null) { 183 | await this.removeEmptyConsumer(consumerName); 184 | } 185 | 186 | return xclaimParsedMessage; 187 | } 188 | 189 | private async removeEmptyConsumer(consumer: string) { 190 | const { redis, consumerGroupName, queueName } = this.config; 191 | invariant(consumerGroupName, "Consumer group name cannot be empty when removing a consumer"); 192 | invariant(queueName, "Queue name cannot be empty when removing a consumer"); 193 | 194 | await redis.xgroup(queueName, { 195 | type: "DELCONSUMER", 196 | consumer, 197 | group: consumerGroupName, 198 | }); 199 | } 200 | 201 | private async claimAndParseMessage( 202 | consumerName: string 203 | ): Promise> { 204 | const xclaimRes = await this.autoClaim(consumerName); 205 | return parseXclaimAutoResponse(xclaimRes); 206 | } 207 | 208 | private async autoClaim(consumerName: string) { 209 | const { redis, consumerGroupName, queueName, visibilityTimeout } = this.config; 210 | invariant(consumerGroupName, "Consumer group name cannot be empty when receiving a message"); 211 | invariant(queueName, "Queue name cannot be empty when receving a message"); 212 | invariant(visibilityTimeout, "Visibility timeout name cannot be empty when receving a message"); 213 | 214 | return await redis.xautoclaim( 215 | queueName, 216 | consumerGroupName, 217 | consumerName, 218 | visibilityTimeout, 219 | "0-0", 220 | { count: 1 } 221 | ); 222 | } 223 | 224 | private async readAndVerifyPendingMessage( 225 | blockTimeMs: number 226 | ): Promise> { 227 | const { autoVerify } = this.config; 228 | 229 | const parsedXreadMessage = await this.readAndParseMessage(blockTimeMs); 230 | 231 | if (parsedXreadMessage && autoVerify) { 232 | await this.verifyMessage(parsedXreadMessage.streamId); 233 | } 234 | 235 | return 
parsedXreadMessage; 236 | } 237 | 238 | async readAndParseMessage( 239 | blockTimeMs: number 240 | ): Promise | null> { 241 | const consumerName = this.generateRandomConsumerName(); 242 | 243 | const xreadRes = 244 | blockTimeMs > 0 245 | ? await this.receiveBlockingMessage(blockTimeMs, consumerName) 246 | : await this.receiveNonBlockingMessage(consumerName); 247 | 248 | return parseXreadGroupResponse(xreadRes); 249 | } 250 | 251 | private async receiveBlockingMessage(blockTimeMs: number, consumerName: string) { 252 | const { redis, consumerGroupName, queueName } = this.config; 253 | invariant(consumerGroupName, "Consumer group name cannot be empty when receiving a message"); 254 | invariant(queueName, "Queue name cannot be empty when receving a message"); 255 | 256 | return await redis.xreadgroup(consumerGroupName, consumerName, queueName, ">", { 257 | count: 1, 258 | blockMS: blockTimeMs, 259 | }); 260 | } 261 | 262 | private async receiveNonBlockingMessage(consumerName: string) { 263 | const { redis, consumerGroupName, queueName } = this.config; 264 | invariant(consumerGroupName, "Consumer group name cannot be empty when receiving a message"); 265 | invariant(queueName, "Queue name cannot be empty when receving a message"); 266 | 267 | const data = await redis.xreadgroup(consumerGroupName, consumerName, queueName, ">", { 268 | count: 1, 269 | }); 270 | return data; 271 | } 272 | 273 | async verifyMessage(streamId: string): Promise<"VERIFIED" | "NOT VERIFIED"> { 274 | const { redis } = this.config; 275 | await this.initializeConsumerGroup(); 276 | this.decrementConcurrencyCount(); 277 | 278 | try { 279 | invariant( 280 | this.config.consumerGroupName, 281 | "Consumer group name cannot be empty when verifying a message" 282 | ); 283 | 284 | invariant(this.config.queueName, "Queue name cannot be empty when verifying a message"); 285 | 286 | const xackRes = await redis.xack( 287 | this.config.queueName, 288 | this.config.consumerGroupName, 289 | streamId 290 | ); 
291 | if (typeof xackRes === "number" && xackRes > 0) { 292 | return "VERIFIED"; 293 | } 294 | return "NOT VERIFIED"; 295 | } catch (finalError) { 296 | console.error( 297 | `Final attempt to acknowledge message failed: ${(finalError as Error).message}` 298 | ); 299 | return "NOT VERIFIED"; 300 | } 301 | } 302 | 303 | private generateRandomConsumerName = () => { 304 | if (this.concurrencyCounter > MAX_CONCURRENCY_LIMIT) { 305 | throw new Error(ERROR_MAP.CONCURRENCY_LIMIT_EXCEEDED); 306 | } 307 | const randomUUID = generateRandomUUID(); 308 | return this.appendPrefixTo(randomUUID); 309 | }; 310 | 311 | private incrementConcurrencyCount() { 312 | this.concurrencyCounter++; 313 | } 314 | 315 | private decrementConcurrencyCount() { 316 | this.concurrencyCounter--; 317 | } 318 | } 319 | -------------------------------------------------------------------------------- /src/queue.test.ts: -------------------------------------------------------------------------------- 1 | import { afterAll, describe, expect, test } from "bun:test"; 2 | import { Redis } from "@upstash/redis"; 3 | 4 | import { Queue } from "."; 5 | import { 6 | DEFAULT_AUTO_VERIFY, 7 | DEFAULT_CONCURRENCY_LIMIT, 8 | DEFAULT_CONSUMER_GROUP_NAME, 9 | DEFAULT_QUEUE_NAME, 10 | ERROR_MAP, 11 | } from "./constants"; 12 | import { delay, formatMessageQueueKey } from "./utils"; 13 | 14 | const randomValue = () => crypto.randomUUID().slice(0, 8); 15 | const redis = Redis.fromEnv(); 16 | 17 | describe("Queue", () => { 18 | afterAll(async () => await redis.flushdb()); 19 | 20 | describe("Queue name default option", () => { 21 | test("should return the default queue name", () => { 22 | const queue = new Queue({ redis }); 23 | expect(queue.config.queueName).toEqual(formatMessageQueueKey(DEFAULT_QUEUE_NAME)); 24 | }); 25 | 26 | test("should return the customized name", () => { 27 | const queueName = "cookie-jar"; 28 | const queue = new Queue({ redis, queueName }); 29 | 
expect(queue.config.queueName).toEqual(formatMessageQueueKey(queueName)); 30 | }); 31 | }); 32 | 33 | describe("Consumer group name default option", () => { 34 | test("should return the default customerGroupName", () => { 35 | const queue = new Queue({ redis, queueName: randomValue() }); 36 | expect(queue.config.consumerGroupName).toEqual(DEFAULT_CONSUMER_GROUP_NAME); 37 | }); 38 | 39 | test("should return the customized customerGroupName", () => { 40 | const consumerGroupName = "bigger-cookie-jar"; 41 | const queue = new Queue({ 42 | redis, 43 | consumerGroupName, 44 | queueName: randomValue(), 45 | }); 46 | expect(queue.config.consumerGroupName).toEqual(consumerGroupName); 47 | }); 48 | }); 49 | 50 | describe("Concurrency limit default option", () => { 51 | test("should return 0 when concurrency is default", () => { 52 | const queue = new Queue({ redis, queueName: randomValue() }); 53 | expect(queue.config.concurrencyLimit).toEqual(DEFAULT_CONCURRENCY_LIMIT); 54 | }); 55 | 56 | test("should return the customized concurrency limit", () => { 57 | const concurrencyLimit = 5; 58 | const queue = new Queue({ 59 | redis, 60 | concurrencyLimit, 61 | queueName: randomValue(), 62 | }); 63 | expect(queue.config.concurrencyLimit).toEqual(concurrencyLimit); 64 | }); 65 | }); 66 | 67 | describe("Auto verify default option", () => { 68 | test("should return 0 when concurrency is default", () => { 69 | const queue = new Queue({ redis, queueName: randomValue() }); 70 | expect(queue.config.autoVerify).toEqual(DEFAULT_AUTO_VERIFY); 71 | }); 72 | 73 | test("should return the customized concurrency limit", () => { 74 | const autoVerify = false; 75 | const queue = new Queue({ redis, autoVerify, queueName: randomValue() }); 76 | expect(queue.config.autoVerify).toEqual(autoVerify); 77 | }); 78 | }); 79 | 80 | describe("Concurrency", () => { 81 | test("should allow only specified amount of receiveMessages concurrently", async () => { 82 | const consumerCount = 4; 83 | 84 | const consumer 
= new Queue({ 85 | redis, 86 | concurrencyLimit: consumerCount, 87 | queueName: randomValue(), 88 | }); 89 | 90 | for (let i = 0; i < consumerCount; i++) { 91 | await consumer.receiveMessage(); 92 | } 93 | 94 | expect(consumer.concurrencyCounter).toEqual(consumerCount); 95 | }); 96 | 97 | test("should throw when try to consume more than 5 at the same time", async () => { 98 | let errorMessage = ""; 99 | let iterationCount = 0; 100 | try { 101 | const consumer = new Queue({ 102 | redis, 103 | queueName: randomValue(), 104 | concurrencyLimit: 5, 105 | }); 106 | 107 | for (let i = 0; i < 10; i++) { 108 | await consumer.receiveMessage(); 109 | iterationCount++; 110 | } 111 | } catch (error) { 112 | errorMessage = (error as Error).message; 113 | } 114 | expect(iterationCount).toBe(5); 115 | expect(errorMessage).toEqual(ERROR_MAP.CONCURRENCY_LIMIT_EXCEEDED); 116 | }); 117 | 118 | test("should give us 0 since all the consumers are available after successful verify", async () => { 119 | const queue = new Queue({ 120 | redis, 121 | queueName: randomValue(), 122 | concurrencyLimit: 2, 123 | }); 124 | 125 | await queue.sendMessage({ 126 | dev: randomValue(), 127 | }); 128 | 129 | await queue.sendMessage({ 130 | dev: randomValue(), 131 | }); 132 | 133 | await Promise.all([queue.receiveMessage(), queue.receiveMessage()]); 134 | 135 | expect(queue.concurrencyCounter).toEqual(0); 136 | }); 137 | 138 | test("should throw since default receive messages exceeds default limit: 1", async () => { 139 | let errorMessage = ""; 140 | try { 141 | const queue = new Queue({ 142 | redis, 143 | queueName: randomValue(), 144 | }); 145 | 146 | await queue.sendMessage({ 147 | dev: randomValue(), 148 | }); 149 | await queue.sendMessage({ 150 | dev: randomValue(), 151 | }); 152 | 153 | await Promise.all([ 154 | queue.receiveMessage(), 155 | queue.receiveMessage(), 156 | queue.receiveMessage(), 157 | queue.receiveMessage(), 158 | ]); 159 | } catch (error) { 160 | errorMessage = (error as 
Error).message; 161 | } 162 | 163 | expect(errorMessage).toEqual(ERROR_MAP.CONCURRENCY_DEFAULT_LIMIT_EXCEEDED); 164 | }); 165 | }); 166 | 167 | describe("Auto verify", () => { 168 | test("should auto verify the message and decrement concurrency counter", async () => { 169 | const queue = new Queue({ 170 | redis, 171 | queueName: randomValue(), 172 | }); 173 | 174 | await queue.sendMessage({ 175 | dev: randomValue(), 176 | }); 177 | 178 | await queue.receiveMessage(); 179 | 180 | expect(queue.concurrencyCounter).toBe(0); 181 | }); 182 | 183 | test("should verify manually and decrement the counter", async () => { 184 | const queue = new Queue({ 185 | redis, 186 | queueName: randomValue(), 187 | autoVerify: false, 188 | }); 189 | 190 | await queue.sendMessage({ 191 | dev: randomValue(), 192 | }); 193 | 194 | const receiveRes = await queue.receiveMessage(); 195 | if (receiveRes) { 196 | const { streamId } = receiveRes; 197 | await queue.verifyMessage(streamId); 198 | } 199 | 200 | expect(queue.concurrencyCounter).toBe(0); 201 | }); 202 | 203 | test("should not release concurrency since auto verify is disabled and no verifyMessage present", async () => { 204 | const queue = new Queue({ 205 | redis, 206 | queueName: randomValue(), 207 | autoVerify: false, 208 | }); 209 | 210 | await queue.sendMessage({ 211 | dev: randomValue(), 212 | }); 213 | 214 | await queue.receiveMessage(); 215 | 216 | expect(queue.concurrencyCounter).not.toBe(0); 217 | }); 218 | }); 219 | 220 | describe("Autoclaim orphans", () => { 221 | test( 222 | "should left nothing in pending list", 223 | async () => { 224 | const queue = new Queue({ 225 | redis, 226 | autoVerify: false, 227 | queueName: randomValue(), 228 | concurrencyLimit: 2, 229 | visibilityTimeout: 10000, 230 | }); 231 | 232 | await queue.sendMessage({ hello: "world" }); 233 | await queue.receiveMessage<{ hello: "world" }>(); 234 | await delay(10000); 235 | const ackedReceive = await queue.receiveMessage<{ hello: "world" }>(); 236 | await 
queue.verifyMessage(ackedReceive?.streamId!); 237 | 238 | const xpendingRes = await queue.config.redis.xpending( 239 | queue.config.queueName!, 240 | queue.config.consumerGroupName!, 241 | "-", 242 | "+", 243 | 1 244 | ); 245 | 246 | expect(xpendingRes).toBeEmpty(); 247 | }, 248 | { timeout: 20000 } 249 | ); 250 | 251 | test( 252 | "should return at least 1 pending since they are not claimed", 253 | async () => { 254 | const queue = new Queue({ 255 | redis, 256 | autoVerify: false, 257 | queueName: randomValue(), 258 | concurrencyLimit: 2, 259 | }); 260 | 261 | await queue.sendMessage({ hello: "world" }); 262 | await queue.receiveMessage<{ hello: "world" }>(); 263 | await delay(10000); 264 | await queue.receiveMessage<{ hello: "world" }>(); 265 | 266 | const xpendingRes = await queue.config.redis.xpending( 267 | queue.config.queueName!, 268 | queue.config.consumerGroupName!, 269 | "-", 270 | "+", 271 | 1 272 | ); 273 | 274 | expect(xpendingRes).not.toBeEmpty(); 275 | }, 276 | { timeout: 20000 } 277 | ); 278 | }); 279 | 280 | describe("Queue with delays", () => { 281 | test( 282 | "should enqueue with a delay", 283 | async () => { 284 | const fakeValue = randomValue(); 285 | const queue = new Queue({ 286 | redis, 287 | queueName: "app-logs", 288 | concurrencyLimit: 2, 289 | }); 290 | await queue.sendMessage( 291 | { 292 | dev: fakeValue, 293 | }, 294 | 2000 295 | ); 296 | const res = await queue.receiveMessage(); 297 | expect(res?.body).not.toEqual({ 298 | dev: fakeValue, 299 | }); 300 | await delay(5000); 301 | const res1 = await queue.receiveMessage<{ dev: string }>(); 302 | expect(res1?.body).toEqual({ 303 | dev: fakeValue, 304 | }); 305 | }, 306 | { timeout: 10000 } 307 | ); 308 | 309 | test( 310 | "should poll until data arives", 311 | () => { 312 | const throwable = async () => { 313 | const fakeValue = randomValue(); 314 | const producer = new Queue({ redis, queueName: "app-logs" }); 315 | const consumer = new Queue({ 316 | redis, 317 | queueName: "app-logs", 
318 | }); 319 | await producer.sendMessage( 320 | { 321 | dev: fakeValue, 322 | }, 323 | 2 324 | ); 325 | await consumer.receiveMessage<{ 326 | dev: string; 327 | }>(5000); 328 | }; 329 | expect(throwable).toThrow(); 330 | }, 331 | { timeout: 10000 } 332 | ); 333 | }); 334 | 335 | describe("Reliability tests", () => { 336 | test("should handle multi-threaded message processing in batches", async () => { 337 | const queue = new Queue({ 338 | redis, 339 | queueName: randomValue(), 340 | concurrencyLimit: 5, 341 | }); 342 | 343 | // First batch of sending 5 messages 344 | const firstBatchSendPromises = []; 345 | for (let i = 0; i < 5; i++) { 346 | firstBatchSendPromises.push(queue.sendMessage({ data: `message-${i}` })); 347 | } 348 | await Promise.all(firstBatchSendPromises); 349 | 350 | // First batch of receiving 5 messages 351 | const firstBatchReceivePromises = []; 352 | for (let i = 0; i < 5; i++) { 353 | firstBatchReceivePromises.push(queue.receiveMessage()); 354 | } 355 | await Promise.all(firstBatchReceivePromises); 356 | 357 | // Second batch of sending 5 messages 358 | const secondBatchSendPromises = []; 359 | for (let i = 5; i < 10; i++) { 360 | secondBatchSendPromises.push(queue.sendMessage({ data: `message-${i}` })); 361 | } 362 | await Promise.all(secondBatchSendPromises); 363 | 364 | // Second batch of receiving 5 messages 365 | const secondBatchReceivePromises = []; 366 | for (let i = 5; i < 10; i++) { 367 | secondBatchReceivePromises.push(queue.receiveMessage()); 368 | } 369 | await Promise.all(secondBatchReceivePromises); 370 | 371 | expect(queue.concurrencyCounter).toBe(0); // Assuming autoVerify is enabled 372 | }, 30000); 373 | 374 | test("should handle high volume of messages in batches and track received messages", async () => { 375 | const batchSize = 5; // Size of each batch, based on the concurrency limit 376 | const queue = new Queue({ 377 | redis, 378 | queueName: randomValue(), 379 | concurrencyLimit: batchSize, 380 | }); 381 | 382 | const 
totalMessages = 100; // High volume of messages 383 | const numberOfBatches = totalMessages / batchSize; 384 | let totalNumberOfReceivedMessages = 0; // Counter for received messages 385 | 386 | for (let batch = 0; batch < numberOfBatches; batch++) { 387 | const sendMessagePromises = []; 388 | for (let i = 0; i < batchSize; i++) { 389 | sendMessagePromises.push(queue.sendMessage({ data: `message-${batch * batchSize + i}` })); 390 | } 391 | await Promise.all(sendMessagePromises); 392 | 393 | const receiveMessagePromises = []; 394 | for (let i = 0; i < batchSize; i++) { 395 | const promise = queue.receiveMessage().then((message) => { 396 | if (message) { 397 | totalNumberOfReceivedMessages++; 398 | } 399 | return message; 400 | }); 401 | receiveMessagePromises.push(promise); 402 | } 403 | await Promise.all(receiveMessagePromises); 404 | } 405 | 406 | expect(totalNumberOfReceivedMessages).toBe(totalMessages); 407 | expect(queue.concurrencyCounter).toBe(0); 408 | }, 30000); 409 | }); 410 | 411 | describe("FIFO queue", () => { 412 | test("should do a FIFO queue", async () => { 413 | const queue = new Queue({ redis }); 414 | 415 | await queue.sendMessage({ hello: "world1" }); 416 | await queue.sendMessage({ hello: "world2" }); 417 | await queue.sendMessage({ hello: "world3" }); 418 | 419 | const message1 = await queue.receiveMessage<{ hello: "world1" }>(); 420 | expect(message1?.body).toEqual({ hello: "world1" }); 421 | 422 | const message2 = await queue.receiveMessage<{ hello: "world2" }>(); 423 | expect(message2?.body).toEqual({ hello: "world2" }); 424 | 425 | const message3 = await queue.receiveMessage<{ hello: "world3" }>(); 426 | expect(message3?.body).toEqual({ hello: "world3" }); 427 | }); 428 | }); 429 | }); 430 | --------------------------------------------------------------------------------