├── bun.lockb
├── .vscode
├── extensions.json
└── settings.json
├── .husky
├── pre-push
├── pre-commit
└── commit-message
├── src
├── index.ts
├── constants.ts
├── utils.test.ts
├── utils.ts
├── queue.ts
└── queue.test.ts
├── examples
└── nextjs14
│ ├── bun.lockb
│ ├── app
│ ├── favicon.ico
│ ├── globals.css
│ ├── api
│ │ ├── send
│ │ │ └── route.ts
│ │ └── receive
│ │ │ └── route.ts
│ ├── layout.tsx
│ └── page.tsx
│ ├── next.config.mjs
│ ├── postcss.config.js
│ ├── .gitignore
│ ├── tailwind.config.ts
│ ├── public
│ ├── vercel.svg
│ └── next.svg
│ ├── tsconfig.json
│ ├── package.json
│ └── README.md
├── tsup.config.js
├── tsconfig.json
├── .github
└── workflows
│ ├── tests.yaml
│ └── release.yaml
├── biome.json
├── package.json
├── commitlint.config.js
├── .gitignore
└── README.md
/bun.lockb:
--------------------------------------------------------------------------------
https://raw.githubusercontent.com/upstash/queue/master/bun.lockb
--------------------------------------------------------------------------------
/.vscode/extensions.json:
--------------------------------------------------------------------------------
1 | {
2 | "recommendations": ["biomejs.biome"]
3 | }
4 |
--------------------------------------------------------------------------------
/.husky/pre-push:
--------------------------------------------------------------------------------
1 | #!/usr/bin/env sh
2 | . "$(dirname -- "$0")/_/husky.sh"
3 |
4 | bun run build
--------------------------------------------------------------------------------
/src/index.ts:
--------------------------------------------------------------------------------
1 | export { Queue } from "./queue";
2 | export type { QueueConfig } from "./queue";
3 |
--------------------------------------------------------------------------------
/examples/nextjs14/bun.lockb:
--------------------------------------------------------------------------------
https://raw.githubusercontent.com/upstash/queue/master/examples/nextjs14/bun.lockb
--------------------------------------------------------------------------------
/.husky/pre-commit:
--------------------------------------------------------------------------------
1 | #!/usr/bin/env sh
2 | . "$(dirname -- "$0")/_/husky.sh"
3 |
4 | bun run fmt && bun run test
--------------------------------------------------------------------------------
/.husky/commit-message:
--------------------------------------------------------------------------------
1 | #!/usr/bin/env sh
2 | . "$(dirname -- "$0")/_/husky.sh"
3 |
4 | bun --no -- commitlint --edit ""
--------------------------------------------------------------------------------
/examples/nextjs14/app/favicon.ico:
--------------------------------------------------------------------------------
https://raw.githubusercontent.com/upstash/queue/master/examples/nextjs14/app/favicon.ico
--------------------------------------------------------------------------------
/examples/nextjs14/next.config.mjs:
--------------------------------------------------------------------------------
1 | /** @type {import('next').NextConfig} */
2 | const nextConfig = {};
3 |
4 | export default nextConfig;
5 |
--------------------------------------------------------------------------------
/examples/nextjs14/postcss.config.js:
--------------------------------------------------------------------------------
1 | module.exports = {
2 | plugins: {
3 | tailwindcss: {},
4 | autoprefixer: {},
5 | },
6 | };
7 |
--------------------------------------------------------------------------------
/tsup.config.js:
--------------------------------------------------------------------------------
1 | import { defineConfig } from "tsup";
2 |
3 | export default defineConfig({
4 | entry: ["src/index.ts"],
5 | format: ["cjs", "esm"],
6 | sourcemap: false,
7 | clean: true,
8 | bundle: true,
9 | dts: true,
10 | minify: true,
11 | treeshake: true,
12 | });
13 |
--------------------------------------------------------------------------------
/tsconfig.json:
--------------------------------------------------------------------------------
1 | {
2 | "compilerOptions": {
3 | "lib": ["ESNext"],
4 | "module": "esnext",
5 | "target": "esnext",
6 | "moduleResolution": "bundler",
7 | "moduleDetection": "force",
8 | "allowImportingTsExtensions": true,
9 | "noEmit": true,
10 | "strict": true,
11 | "downlevelIteration": true,
12 | "skipLibCheck": true,
13 | "allowSyntheticDefaultImports": true,
14 | "forceConsistentCasingInFileNames": true,
15 | "allowJs": true,
16 | "types": [
17 | "bun-types" // add Bun global
18 | ]
19 | }
20 | }
21 |
--------------------------------------------------------------------------------
/examples/nextjs14/.gitignore:
--------------------------------------------------------------------------------
1 | # See https://help.github.com/articles/ignoring-files/ for more about ignoring files.
2 |
3 | # dependencies
4 | /node_modules
5 | /.pnp
6 | .pnp.js
7 | .yarn/install-state.gz
8 |
9 | # testing
10 | /coverage
11 |
12 | # next.js
13 | /.next/
14 | /out/
15 |
16 | # production
17 | /build
18 |
19 | # misc
20 | .DS_Store
21 | *.pem
22 |
23 | # debug
24 | npm-debug.log*
25 | yarn-debug.log*
26 | yarn-error.log*
27 |
28 | # local env files
29 | .env*.local
30 |
31 | # vercel
32 | .vercel
33 |
34 | # typescript
35 | *.tsbuildinfo
36 | next-env.d.ts
37 |
--------------------------------------------------------------------------------
/examples/nextjs14/app/globals.css:
--------------------------------------------------------------------------------
1 | @tailwind base;
2 | @tailwind components;
3 | @tailwind utilities;
4 |
5 | .message-enter {
6 | transform: translateX(-100%);
7 | opacity: 0;
8 | }
9 | .message-enter-active {
10 | transform: translateX(0);
11 | opacity: 1;
12 | transition: transform 500ms ease-in-out, opacity 500ms ease-in-out;
13 | }
14 | .message-exit {
15 | transform: translateX(0);
16 | opacity: 1;
17 | }
18 | .message-exit-active {
19 | transform: translateX(100%);
20 | opacity: 0;
21 | transition: transform 500ms ease-in-out, opacity 500ms ease-in-out;
22 | }
23 |
--------------------------------------------------------------------------------
/examples/nextjs14/tailwind.config.ts:
--------------------------------------------------------------------------------
1 | import type { Config } from "tailwindcss";
2 |
3 | const config: Config = {
4 | content: [
5 | "./pages/**/*.{js,ts,jsx,tsx,mdx}",
6 | "./components/**/*.{js,ts,jsx,tsx,mdx}",
7 | "./app/**/*.{js,ts,jsx,tsx,mdx}",
8 | ],
9 | theme: {
10 | extend: {
11 | backgroundImage: {
12 | "gradient-radial": "radial-gradient(var(--tw-gradient-stops))",
13 | "gradient-conic":
14 | "conic-gradient(from 180deg at 50% 50%, var(--tw-gradient-stops))",
15 | },
16 | },
17 | },
18 | plugins: [],
19 | };
20 | export default config;
21 |
--------------------------------------------------------------------------------
/.vscode/settings.json:
--------------------------------------------------------------------------------
1 | {
2 | "editor.formatOnSave": true,
3 | "git.autofetch": true,
4 | "editor.codeActionsOnSave": {
5 | "source.fixAll": "explicit"
6 | },
7 | "[html]": {
8 | "editor.defaultFormatter": "biomejs.biome"
9 | },
10 | "[json]": {
11 | "editor.defaultFormatter": "biomejs.biome"
12 | },
13 | "[javascript]": {
14 | "editor.defaultFormatter": "biomejs.biome"
15 | },
16 | "[jsonc]": {
17 | "editor.defaultFormatter": "biomejs.biome"
18 | },
19 | "[typescript]": {
20 | "editor.defaultFormatter": "biomejs.biome"
21 | },
22 | "editor.defaultFormatter": "biomejs.biome"
23 | }
24 |
--------------------------------------------------------------------------------
/examples/nextjs14/public/vercel.svg:
--------------------------------------------------------------------------------
1 |
--------------------------------------------------------------------------------
/examples/nextjs14/app/api/send/route.ts:
--------------------------------------------------------------------------------
1 | import { Queue } from "@upstash/queue";
2 | import { Redis } from "@upstash/redis";
3 |
4 | const redis = new Redis({
5 | url: process.env.UPSTASH_REDIS_REST_URL as string,
6 | token: process.env.UPSTASH_REDIS_REST_TOKEN as string,
7 | });
8 |
9 | export async function POST(req: Request) {
10 | const { queueName, message } = await req.json();
11 |
12 | const queue = new Queue({
13 | redis: redis,
14 | concurrencyLimit: 5,
15 | queueName: queueName,
16 | });
17 |
18 | const messageId = await queue.sendMessage({ message: message });
19 |
20 | return Response.json({ messageId });
21 | }
22 |
--------------------------------------------------------------------------------
/src/constants.ts:
--------------------------------------------------------------------------------
1 | export const DEFAULT_CONSUMER_GROUP_NAME = "Messages";
2 | export const DEFAULT_CONSUMER_PREFIX = "Consumer";
3 | export const DEFAULT_QUEUE_NAME = "Queue";
4 | export const DEFAULT_CONCURRENCY_LIMIT = 0;
5 | export const DEFAULT_AUTO_VERIFY = true;
6 | export const DEFAULT_VISIBILITY_TIMEOUT_IN_MS = 1000 * 30;
7 |
8 | export const MAX_CONCURRENCY_LIMIT = 5;
9 |
10 | export const ERROR_MAP = {
11 | CONCURRENCY_LIMIT_EXCEEDED: `Cannot receive more than ${MAX_CONCURRENCY_LIMIT}`,
12 | CONCURRENCY_DEFAULT_LIMIT_EXCEEDED: `Cannot receive more than ${
13 | DEFAULT_CONCURRENCY_LIMIT + 1
14 | }, due to concurrency limit option not being set`,
15 | } as const;
16 |
--------------------------------------------------------------------------------
/examples/nextjs14/app/layout.tsx:
--------------------------------------------------------------------------------
1 | import type { Metadata } from "next";
2 | import { Inter } from "next/font/google";
3 | import "./globals.css";
4 |
5 | const inter = Inter({ subsets: ["latin"] });
6 |
7 | export const metadata: Metadata = {
8 | title: "Create Next App",
9 | description: "Generated by create next app",
10 | };
11 |
12 | export default function RootLayout({
13 | children,
14 | }: Readonly<{
15 | children: React.ReactNode;
16 | }>) {
17 | return (
18 |
19 |
20 | {children}
21 |
22 |
23 | );
24 | }
25 |
--------------------------------------------------------------------------------
/examples/nextjs14/tsconfig.json:
--------------------------------------------------------------------------------
1 | {
2 | "compilerOptions": {
3 | "lib": ["dom", "dom.iterable", "esnext"],
4 | "allowJs": true,
5 | "skipLibCheck": true,
6 | "strict": true,
7 | "noEmit": true,
8 | "esModuleInterop": true,
9 | "module": "esnext",
10 | "moduleResolution": "bundler",
11 | "resolveJsonModule": true,
12 | "isolatedModules": true,
13 | "jsx": "preserve",
14 | "incremental": true,
15 | "plugins": [
16 | {
17 | "name": "next"
18 | }
19 | ],
20 | "paths": {
21 | "@/*": ["./*"]
22 | }
23 | },
24 | "include": ["next-env.d.ts", "**/*.ts", "**/*.tsx", ".next/types/**/*.ts"],
25 | "exclude": ["node_modules"]
26 | }
27 |
--------------------------------------------------------------------------------
/examples/nextjs14/package.json:
--------------------------------------------------------------------------------
1 | {
2 | "name": "nextjs14",
3 | "version": "0.1.0",
4 | "private": true,
5 | "scripts": {
6 | "dev": "next dev",
7 | "build": "next build",
8 | "start": "next start",
9 | "lint": "next lint"
10 | },
11 | "dependencies": {
12 | "@types/react-transition-group": "^4.4.10",
13 | "@upstash/queue": "^1.0.0",
14 | "next": "14.1.0",
15 | "react": "^18",
16 | "react-dom": "^18",
17 | "react-transition-group": "^4.4.5"
18 | },
19 | "devDependencies": {
20 | "typescript": "^5",
21 | "@types/node": "^20",
22 | "@types/react": "^18",
23 | "@types/react-dom": "^18",
24 | "autoprefixer": "^10.0.1",
25 | "postcss": "^8",
26 | "tailwindcss": "^3.3.0"
27 | }
28 | }
29 |
--------------------------------------------------------------------------------
/examples/nextjs14/app/api/receive/route.ts:
--------------------------------------------------------------------------------
1 | import { Queue } from "@upstash/queue";
2 | import { Redis } from "@upstash/redis";
3 |
4 | const redis = new Redis({
5 | url: process.env.UPSTASH_REDIS_REST_URL as string,
6 | token: process.env.UPSTASH_REDIS_REST_TOKEN as string,
7 | });
8 |
9 | type MessageBody = {
10 | message: string;
11 | };
12 | export async function POST(req: Request) {
13 | const { queueName } = await req.json();
14 |
15 | const queue = new Queue({
16 | redis: redis,
17 | concurrencyLimit: 5,
18 | queueName: queueName,
19 | });
20 |
21 | const receiveResponse = await queue.receiveMessage();
22 |
23 | return Response.json({
24 | id: receiveResponse?.streamId,
25 | message: receiveResponse?.body.message,
26 | });
27 | }
28 |
--------------------------------------------------------------------------------
/src/utils.test.ts:
--------------------------------------------------------------------------------
1 | import { describe, expect, test } from "bun:test";
2 | import { parseXclaimAutoResponse, parseXreadGroupResponse } from "./utils";
3 |
4 | describe("Response Parsers", () => {
5 | test("should parse XCLAIMAUTO response", () => {
6 | const input = ["0-0", [["1703754659687-0", ["messageBody", '{"hello":"world"}']]], []];
7 |
8 | expect(parseXclaimAutoResponse(input)).toEqual({
9 | streamId: "1703754659687-0",
10 | body: '{"hello":"world"}',
11 | });
12 | });
13 |
14 | test("should parse XREADGROUP response", () => {
15 | const input = [
16 | ["UpstashMQ:1251e0e7", [["1703755686316-0", ["messageBody", '{"hello":"world"}']]]],
17 | ];
18 |
19 | expect(parseXreadGroupResponse(input)).toEqual({
20 | streamId: "1703755686316-0",
21 | body: '{"hello":"world"}',
22 | });
23 | });
24 | });
25 |
--------------------------------------------------------------------------------
/.github/workflows/tests.yaml:
--------------------------------------------------------------------------------
1 | name: Tests
2 | on:
3 | push:
4 | branches:
5 | - master
6 | pull_request:
7 | schedule:
8 | - cron: "0 0 * * *" # daily
9 |
10 | env:
11 | UPSTASH_REDIS_REST_URL: ${{ secrets.UPSTASH_REDIS_REST_URL }}
12 | UPSTASH_REDIS_REST_TOKEN: ${{ secrets.UPSTASH_REDIS_REST_TOKEN }}
13 | jobs:
14 | test:
15 | runs-on: ubuntu-latest
16 | concurrency: test
17 |
18 | name: Tests
19 | steps:
20 | - name: Setup repo
21 | uses: actions/checkout@v3
22 |
23 | - name: Setup Bun
24 | uses: oven-sh/setup-bun@v1
25 | with:
26 | bun-version: latest
27 |
28 | - name: Install Dependencies
29 | run: bun install
30 |
31 | - name: Run Lint
32 | run: bun run fmt
33 |
34 | - name: Run tests
35 | run: bun run test
36 |
37 | - name: Run Build
38 | run: bun run build
39 |
--------------------------------------------------------------------------------
/biome.json:
--------------------------------------------------------------------------------
1 | {
2 | "$schema": "https://biomejs.dev/schemas/1.6.0/schema.json",
3 |
4 | "linter": {
5 | "enabled": true,
6 | "rules": {
7 | "recommended": true,
8 | "a11y": {
9 | "noSvgWithoutTitle": "off"
10 | },
11 | "correctness": {
12 | "noUnusedVariables": "error"
13 | },
14 | "security": {
15 | "noDangerouslySetInnerHtml": "off"
16 | },
17 | "style": {
18 | "useBlockStatements": "error",
19 | "noNonNullAssertion": "off"
20 | },
21 | "performance": {
22 | "noDelete": "off"
23 | },
24 | "suspicious": {
25 | "noExplicitAny": "off"
26 | }
27 | },
28 | "ignore": ["node_modules", "dist"]
29 | },
30 | "formatter": {
31 | "indentStyle": "space",
32 | "indentWidth": 2,
33 | "enabled": true,
34 | "lineWidth": 100,
35 | "ignore": ["node_modules", "dist"]
36 | },
37 | "organizeImports": {
38 | "enabled": true
39 | },
40 | "javascript": {
41 | "formatter": {
42 | "trailingComma": "es5"
43 | }
44 | }
45 | }
46 |
--------------------------------------------------------------------------------
/.github/workflows/release.yaml:
--------------------------------------------------------------------------------
1 | name: Release
2 |
3 | on:
4 | release:
5 | types:
6 | - published
7 |
8 | jobs:
9 | release:
10 | name: Release
11 | runs-on: ubuntu-latest
12 | steps:
13 | - name: Checkout Repo
14 | uses: actions/checkout@v3
15 |
16 | - name: Setup Bun
17 | uses: oven-sh/setup-bun@v1
18 | with:
19 | bun-version: latest
20 |
21 | - name: Set env
22 | run: echo "VERSION=${GITHUB_REF#refs/*/}" >> $GITHUB_ENV
23 |
24 | - name: Set package version
25 | run: echo $(jq --arg v "${{ env.VERSION }}" '(.version) = $v' package.json) > package.json
26 |
27 | - name: Install dependencies
28 | run: bun install
29 |
30 | - name: Build
31 | run: bun run build
32 |
33 | - name: Add npm token
34 | run: echo "//registry.npmjs.org/:_authToken=${{secrets.NPM_TOKEN}}" > .npmrc
35 |
36 | - name: Publish release candidate
37 | if: "github.event.release.prerelease"
38 | run: npm publish --access public --tag=canary --no-git-checks
39 |
40 | - name: Publish
41 | if: "!github.event.release.prerelease"
42 | run: npm publish --access public --no-git-checks
43 |
--------------------------------------------------------------------------------
/package.json:
--------------------------------------------------------------------------------
1 | {
2 | "name": "@upstash/queue",
3 | "description": "A message queue based on Redis streams, without dependencies.",
4 | "module": "./dist/index.mjs",
5 | "main": "./dist/index.js",
6 | "types": "./dist/index.d.ts",
7 | "version": "1.0.0",
8 | "keywords": ["redis", "stream", "upstash", "message", "queue"],
9 | "author": "Oguzhan Olguncu ",
10 | "repository": {
11 | "type": "git",
12 | "url": "https://github.com/upstash/queue"
13 | },
14 | "license": "MIT",
15 | "files": ["dist"],
16 | "bugs": {
17 | "url": "https://github.com/upstash/queue/issues"
18 | },
19 | "scripts": {
20 | "test": "bun test src --coverage --bail --timeout 20000",
21 | "fmt": "bunx biome check --apply ./src",
22 | "build": "tsup",
23 | "prepare": "husky install | chmod ug+x .husky/*"
24 | },
25 | "dependencies": {
26 | "@upstash/redis": "1.29.0",
27 | "uuid": "^9.0.1"
28 | },
29 | "devDependencies": {
30 | "@types/uuid": "^9.0.8",
31 | "typescript": "latest",
32 | "@biomejs/biome": "1.6.0",
33 | "bun-types": "latest",
34 | "husky": "^9.0.11",
35 | "tsup": "latest",
36 | "@commitlint/cli": "^18.6.0",
37 | "@commitlint/config-conventional": "^18.6.0"
38 | }
39 | }
40 |
--------------------------------------------------------------------------------
/examples/nextjs14/public/next.svg:
--------------------------------------------------------------------------------
1 |
--------------------------------------------------------------------------------
/examples/nextjs14/README.md:
--------------------------------------------------------------------------------
1 | This is a [Next.js](https://nextjs.org/) project bootstrapped with [`create-next-app`](https://github.com/vercel/next.js/tree/canary/packages/create-next-app).
2 |
3 | ## Getting Started
4 |
5 | First, run the development server:
6 |
7 | ```bash
8 | npm run dev
9 | # or
10 | yarn dev
11 | # or
12 | pnpm dev
13 | # or
14 | bun dev
15 | ```
16 |
17 | Open [http://localhost:3000](http://localhost:3000) with your browser to see the result.
18 |
19 | You can start editing the page by modifying `app/page.tsx`. The page auto-updates as you edit the file.
20 |
21 | This project uses [`next/font`](https://nextjs.org/docs/basic-features/font-optimization) to automatically optimize and load Inter, a custom Google Font.
22 |
23 | ## Learn More
24 |
25 | To learn more about Next.js, take a look at the following resources:
26 |
27 | - [Next.js Documentation](https://nextjs.org/docs) - Learn about Next.js features and API.
28 | - [Learn Next.js](https://nextjs.org/learn) - An interactive Next.js tutorial.
29 |
30 | You can check out [the Next.js GitHub repository](https://github.com/vercel/next.js/) - Your feedback and contributions are welcome!
31 |
32 | ## Deploy on Vercel
33 |
34 | The easiest way to deploy your Next.js app is to use the [Vercel Platform](https://vercel.com/new?utm_medium=default-template&filter=next.js&utm_source=create-next-app&utm_campaign=create-next-app-readme) from the creators of Next.js.
35 |
36 | Check out our [Next.js deployment documentation](https://nextjs.org/docs/deployment) for more details.
--------------------------------------------------------------------------------
/commitlint.config.js:
--------------------------------------------------------------------------------
1 | // build: Changes that affect the build system or external dependencies (example scopes: gulp, broccoli, npm)
2 | // ci: Changes to our CI configuration files and scripts (example scopes: Travis, Circle, BrowserStack, SauceLabs)
3 | // docs: Documentation only changes
4 | // feat: A new feature
5 | // fix: A bug fix
6 | // perf: A code change that improves performance
7 | // refactor: A code change that neither fixes a bug nor adds a feature
8 | // style: Changes that do not affect the meaning of the code (white-space, formatting, missing semi-colons, etc)
9 | // test: Adding missing tests or correcting existing tests
10 |
11 | module.exports = {
12 | extends: ["@commitlint/config-conventional"],
13 | rules: {
14 | "body-leading-blank": [1, "always"],
15 | "body-max-line-length": [2, "always", 100],
16 | "footer-leading-blank": [1, "always"],
17 | "footer-max-line-length": [2, "always", 100],
18 | "header-max-length": [2, "always", 100],
19 | "scope-case": [2, "always", "lower-case"],
20 | "subject-case": [
21 | 2,
22 | "never",
23 | ["sentence-case", "start-case", "pascal-case", "upper-case"],
24 | ],
25 | "subject-empty": [2, "never"],
26 | "subject-full-stop": [2, "never", "."],
27 | "type-case": [2, "always", "lower-case"],
28 | "type-empty": [2, "never"],
29 | "type-enum": [
30 | 2,
31 | "always",
32 | [
33 | "build",
34 | "chore",
35 | "ci",
36 | "docs",
37 | "feat",
38 | "fix",
39 | "perf",
40 | "refactor",
41 | "revert",
42 | "style",
43 | "test",
44 | "translation",
45 | "security",
46 | "changeset",
47 | ],
48 | ],
49 | },
50 | };
51 |
--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------
1 | # Based on https://raw.githubusercontent.com/github/gitignore/main/Node.gitignore
2 |
3 | # Logs
4 |
5 | logs
6 | _.log
7 | npm-debug.log_
8 | yarn-debug.log*
9 | yarn-error.log*
10 | lerna-debug.log*
11 | .pnpm-debug.log*
12 |
13 | # Diagnostic reports (https://nodejs.org/api/report.html)
14 |
15 | report.[0-9]_.[0-9]_.[0-9]_.[0-9]_.json
16 |
17 | # Runtime data
18 |
19 | pids
20 | _.pid
21 | _.seed
22 | \*.pid.lock
23 |
24 | # Directory for instrumented libs generated by jscoverage/JSCover
25 |
26 | lib-cov
27 |
28 | # Coverage directory used by tools like istanbul
29 |
30 | coverage
31 | \*.lcov
32 |
33 | # nyc test coverage
34 |
35 | .nyc_output
36 |
37 | # Grunt intermediate storage (https://gruntjs.com/creating-plugins#storing-task-files)
38 |
39 | .grunt
40 |
41 | # Bower dependency directory (https://bower.io/)
42 |
43 | bower_components
44 |
45 | # node-waf configuration
46 |
47 | .lock-wscript
48 |
49 | # Compiled binary addons (https://nodejs.org/api/addons.html)
50 |
51 | build/Release
52 |
53 | # Dependency directories
54 |
55 | node_modules/
56 | jspm_packages/
57 |
58 | # Snowpack dependency directory (https://snowpack.dev/)
59 |
60 | web_modules/
61 |
62 | # TypeScript cache
63 |
64 | \*.tsbuildinfo
65 |
66 | # Optional npm cache directory
67 |
68 | .npm
69 |
70 | # Optional eslint cache
71 |
72 | .eslintcache
73 |
74 | # Optional stylelint cache
75 |
76 | .stylelintcache
77 |
78 | # Microbundle cache
79 |
80 | .rpt2_cache/
81 | .rts2_cache_cjs/
82 | .rts2_cache_es/
83 | .rts2_cache_umd/
84 |
85 | # Optional REPL history
86 |
87 | .node_repl_history
88 |
89 | # Output of 'npm pack'
90 |
91 | \*.tgz
92 |
93 | # Yarn Integrity file
94 |
95 | .yarn-integrity
96 |
97 | # dotenv environment variable files
98 |
99 | .env
100 | .env.development.local
101 | .env.test.local
102 | .env.production.local
103 | .env.local
104 |
105 | # parcel-bundler cache (https://parceljs.org/)
106 |
107 | .cache
108 | .parcel-cache
109 |
110 | # Next.js build output
111 |
112 | .next
113 | out
114 |
115 | # Nuxt.js build / generate output
116 |
117 | .nuxt
118 | dist
119 |
120 | # Gatsby files
121 |
122 | .cache/
123 |
124 | # Comment in the public line in if your project uses Gatsby and not Next.js
125 |
126 | # https://nextjs.org/blog/next-9-1#public-directory-support
127 |
128 | # public
129 |
130 | # vuepress build output
131 |
132 | .vuepress/dist
133 |
134 | # vuepress v2.x temp and cache directory
135 |
136 | .temp
137 | .cache
138 |
139 | # Docusaurus cache and generated files
140 |
141 | .docusaurus
142 |
143 | # Serverless directories
144 |
145 | .serverless/
146 |
147 | # FuseBox cache
148 |
149 | .fusebox/
150 |
151 | # DynamoDB Local files
152 |
153 | .dynamodb/
154 |
155 | # TernJS port file
156 |
157 | .tern-port
158 |
159 | # Stores VSCode versions used for testing VSCode extensions
160 |
161 | .vscode-test
162 |
163 | # yarn v2
164 |
165 | .yarn/cache
166 | .yarn/unplugged
167 | .yarn/build-state.yml
168 | .yarn/install-state.gz
169 | .pnp.\*
170 |
171 | # IntelliJ based IDEs
172 | .idea
173 |
174 | # Finder (MacOS) folder config
175 | .DS_Store
176 |
177 |
--------------------------------------------------------------------------------
/src/utils.ts:
--------------------------------------------------------------------------------
1 | import { v4 as generateRandomUUID } from "uuid";
2 |
3 | export const MQ_PREFIX = "UpstashMQ";
4 |
5 | export const formatMessageQueueKey = (queueName: string) => {
6 | return `${MQ_PREFIX}:${queueName}`;
7 | };
8 |
9 | export const delay = (duration: number): Promise => {
10 | return new Promise((resolve) => {
11 | setTimeout(resolve, duration);
12 | });
13 | };
14 |
15 | export const generateRandomConsumerName = (): string => {
16 | return `consumer-${generateRandomUUID()}`;
17 | };
18 |
19 | export type ParsedStreamMessage = {
20 | streamId: string;
21 | body: TStreamResult;
22 | } | null;
23 |
24 | type StartId = string;
25 | type GroupName = string;
26 | type StreamId = string;
27 | type MessageBody = [string, string];
28 |
29 | type XreadGroupStreamArray = [[GroupName, Array<[StreamId, MessageBody]>]];
30 |
31 | /**
32 | * Parses the result of a Redis XREADGROUP response, extracting relevant information.
33 | *
34 | * @param {unknown[]} streamArray - The array representing the Redis XREADGROUP response.
35 | * Example structure: [["UpstashMQ:1251e0e7",[["1703755686316-0", ["messageBody", '{"hello":"world"}']]]]]
36 | *
37 | * @returns {ParsedStreamMessage | null} - Parsed result containing the stream ID
38 | * and parsed message body, or null if parsing fails.
39 | */
40 | export const parseXreadGroupResponse = (
41 | streamArray: unknown[]
42 | ): ParsedStreamMessage => {
43 | const typedStream = streamArray as XreadGroupStreamArray;
44 |
45 | const streamData = typedStream?.[0];
46 | const firstMessage = streamData?.[1]?.[0];
47 | const streamId = firstMessage?.[0];
48 | const messageBodyString = firstMessage?.[1]?.[1];
49 |
50 | if (!streamId || !messageBodyString) {
51 | return null;
52 | }
53 |
54 | return {
55 | streamId: streamId,
56 | body: messageBodyString as any,
57 | };
58 | };
59 |
60 | type XclaimAutoRedisStreamArray = [StartId, Array<[StreamId, MessageBody]>];
61 |
62 | /**
63 | * Parses the result of a Redis XCLAIM response with automatic stream creation,
64 | * extracting relevant information.
65 | *
66 | * @param {unknown[]} streamArray - The array representing the Redis XCLAIM response.
67 | * Example structure: ["0-0",[["1703754659687-0", ["messageBody", '{"hello":"world"}']]],[]]
68 | *
69 | * @returns {ParsedStreamMessage | null} - Parsed result containing the stream ID
70 | * and parsed message body, or null if parsing fails.
71 | */
72 | export const parseXclaimAutoResponse = (
73 | streamArray: unknown[]
74 | ): ParsedStreamMessage => {
75 | const typedStream = streamArray as XclaimAutoRedisStreamArray;
76 | const firstMessage = typedStream?.[1]?.[0];
77 | const streamId = firstMessage?.[0];
78 | const messageBodyString = firstMessage?.[1]?.[1];
79 |
80 | if (!streamId || !messageBodyString) {
81 | return null;
82 | }
83 |
84 | return {
85 | streamId,
86 | body: messageBodyString as any,
87 | };
88 | };
89 |
90 | /**
91 | * Asserts that the provided data is non-null and non-undefined.
92 | * If the assertion fails, an error with the specified message is thrown.
93 | *
94 | * @param {T} data - The data to assert as non-nullable.
95 | * @param {string} message - The error message to throw if the assertion fails.
96 | * @throws {Error} Throws an error if the assertion fails.
97 | * @returns {asserts data is NonNullable} - Type assertion indicating that the data is non-nullable.
98 | */
99 | export function invariant(data: T, message: string): asserts data is NonNullable {
100 | if (!data) {
101 | throw new Error(message);
102 | }
103 | }
104 |
--------------------------------------------------------------------------------
/examples/nextjs14/app/page.tsx:
--------------------------------------------------------------------------------
1 | "use client";
2 | import { useState } from "react";
3 |
4 | import { CSSTransition, TransitionGroup } from "react-transition-group";
5 | import * as uuid from "uuid";
6 |
7 | type MessageBody = {
8 | content: string;
9 | };
10 |
11 | type MessageWithId = {
12 | id: string;
13 | body: MessageBody;
14 | };
15 |
16 | const queueName = `queue-${uuid.v4().substring(0, 18)}`;
17 |
18 | export default function Home() {
19 | const [messageInput, setMessageInput] = useState(uuid.v4().substring(0, 18));
20 | const [queueMessages, setQueueMessages] = useState([]);
21 |
22 | const send = async () => {
23 | fetch("/api/send", {
24 | method: "POST",
25 | body: JSON.stringify({ message: messageInput, queueName: queueName }),
26 | }).then(async (res) => {
27 | const data = await res.json();
28 | if (data.messageId) {
29 | setQueueMessages([
30 | ...queueMessages,
31 | {
32 | id: data.messageId,
33 | body: { content: messageInput },
34 | } as MessageWithId,
35 | ]);
36 | }
37 | });
38 | };
39 |
40 | const receive = async () => {
41 | fetch("/api/receive", {
42 | method: "POST",
43 | body: JSON.stringify({ queueName: queueName }),
44 | }).then(async (res) => {
45 | const data = await res.json();
46 |
47 | if (data.id && data.message) {
48 | setQueueMessages(queueMessages.filter((msg) => msg.id !== data.id));
49 | }
50 | });
51 | };
52 | return (
53 |
54 |
55 |
56 | Welcome to @upstash/queue
57 |
58 |
59 |
60 | This is an example of how to use @upstash/queue as a FIFO queue in your Next.js
61 | application.
62 |
63 |
64 |
65 | You can create and consume messages from the queue using the buttons.
66 |
67 |
68 |
69 |
70 |
71 |
72 |
Queue
73 |
74 | {queueMessages.length > 0 &&
75 | queueMessages.map((message, index) => {
76 | return (
77 |
78 |
79 |
80 | {index}
81 |
82 |
83 |
84 |
85 | ID: {message.id}
86 |
87 |
88 |
89 | Content:{" "}
90 | {message.body.content}
91 |
92 |
93 |
94 |
95 | );
96 | })}
97 |
98 | {queueMessages.length === 0 &&
No messages in the queue...
}
99 |
100 |
101 |
102 |
103 |
104 |
105 |
106 | setMessageInput(e.target.value)}
112 | />
113 |
123 |
124 |
125 |
134 |
135 |
136 |
145 |
146 |
147 | );
148 | }
149 |
--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------
1 | # Upstash Queue
2 | ·  [](https://github.com/upstash/queue/actions/workflows/tests.yaml)   
3 |
4 |
5 | > [!NOTE]
6 | > **This project is a Community Project.**
7 | >
8 | > The project is maintained and supported by the community. Upstash may contribute but does not officially support or assume responsibility for it.
9 |
10 |
11 |
12 | A simple, fast, robust, stream-based message queue for Node.js, backed by Upstash Redis, inspired by AWS SQS.
13 |
14 | - Simple: ~350 LOC, and single dependency.
15 | - Lightweight: Under ~5kb zipped.
16 | - Fast: Maximizes throughput by minimizing Redis and network overhead. Benchmarks well.
17 | - Robust: Designed with concurrency and failure in mind; full code coverage.
18 |
19 | ```ts
20 | import { Redis } from "@upstash/redis";
21 | import { Queue } from "@upstash/queue";
22 |
23 | const queue = new Queue({ redis: new Redis() });
24 |
25 | await queue.sendMessage({ hello: "world1" });
26 |
27 | const message1 = await queue.receiveMessage<{ hello: "world1" }>();
28 | expect(message1?.body).toEqual({ hello: "world1" });
29 | ```
30 |
31 | ## Introduction
32 |
33 | `@upstash/queue` is a Node.js library that provides a simple and efficient way to implement a message queue system using Redis streams. It offers features such as message sending, receiving, automatic message verification, and concurrency control. This library is particularly useful for building distributed systems and background job processing.
34 |
35 | ## Why Upstash Queue?
36 |
37 | Upstash Queue brings the simplicity and performance of Redis streams to Node.js developers, making it easy to integrate a robust message queue system into their applications. Whether you're working on distributed systems, background job processing, or other asynchronous workflows, Upstash Queue provides essential features such as message sending, receiving, automatic verification, and concurrency control.
38 |
39 | **Key Features:**
40 |
41 | - **Efficiency:** Leverages the speed and reliability of Redis streams for message handling.
42 | - **Simplicity:** Dead simple implementation of distributed systems and background job processing with a clean and intuitive API.
43 | - **Concurrency Control:** Easily manages concurrent message processing to optimize system performance.
44 | - **Automatic Verification:** Benefits from built-in message verification mechanisms for reliable and secure communication.
45 |
46 | ## Table of Contents
47 |
48 | - [Installation](#installation)
49 | - [Getting Started](#getting-started)
50 | - [Sending a Message](#sending-a-message)
51 | - [Receiving a Message](#receiving-a-message)
52 | - [Verifying a Message](#verifying-a-message)
53 | - [Configuration Options](#configuration-options)
54 | - [Examples](#examples)
55 | - [FIFO Example](#fifo-example)
56 | - [Sending Message with Delay then Poll](#sending-message-with-delay-then-poll)
57 | - [Manual Verification](#manual-verification)
58 | - [Concurrent Message Processing/Consuming](#concurrent-message-processingconsuming)
59 |
60 | ## Installation
61 |
62 | To start using Upstash Queue, install the library via npm:
63 |
64 | ```sh
65 | npm install @upstash/queue
66 | ```
67 |
68 | ## Getting Started
69 |
70 | ```typescript
71 | import Redis from "@upstash/redis";
72 | import { Queue } from "@upstash/queue";
73 |
74 | const redis = new Redis();
75 | const queue = new Queue({ redis });
76 | ```
77 |
78 | ### Sending a Message
79 |
80 | ```typescript
81 | const payload = { key: "value" };
82 | const delayInSeconds = 0; // Set to 0 for immediate delivery.
83 | const streamId = await queue.sendMessage(payload, delayInSeconds);
84 | ```
85 |
86 | ### Receiving a Message
87 |
88 | ```typescript
89 | const pollingForNewMessages = 1000; // Set to 0 for non-blocking, otherwise, it will try to get a message then fail if none is available.
90 | const receivedMessage = await queue.receiveMessage(pollingForNewMessages);
91 | console.log("Received Message:", receivedMessage);
92 | ```
93 |
94 | ### Verifying a Message
95 |
96 | ```typescript
97 | const streamId = "some_stream_id";
98 | const verificationStatus = await queue.verifyMessage(streamId);
99 | console.log("Verification Status:", verificationStatus);
100 | ```
101 |
102 | ### Configuration Options
103 |
104 | When initializing the Queue instance, you can provide various configuration options:
105 |
106 | - redis: Redis client instance (required).
107 | - queueName: Name of the Redis stream (default: "UpstashMQ:Queue").
108 | - concurrencyLimit: Maximum concurrent message processing allowed (default: 1).
109 | - autoVerify: Auto-verify received messages (default: true).
110 | - consumerGroupName: Group that holds consumers when automatically created (default: "Messages").
111 | - visibilityTimeout: Time until recently sent messages become visible to other consumers (default: 30 seconds).
112 |
113 | #### Verifying a Message
114 |
115 | ```typescript
116 | const queueConfig = {
117 | redis: new Redis(),
118 | queueName: "MyCustomQueue",
119 | concurrencyLimit: 2,
120 | autoVerify: false,
121 | consumerGroupName: "MyConsumers",
122 | visibilityTimeout: 60000, // 1 minute.
123 | };
124 |
125 | const customQueue = new Queue(queueConfig);
126 | ```
127 |
128 | ## Examples
129 |
130 | ### FIFO Example
131 |
132 | ```typescript
133 | const queue = new Queue({ redis });
134 | await queue.sendMessage({ hello: "world1" });
135 | await delay(100);
136 | await queue.sendMessage({ hello: "world2" });
137 | await delay(100);
138 | await queue.sendMessage({ hello: "world3" });
139 |
140 | const message1 = await queue.receiveMessage<{ hello: "world1" }>(); // Logs out { hello: "world1" }.
141 |
142 | const message2 = await queue.receiveMessage<{ hello: "world2" }>(); // Logs out { hello: "world2" }.
143 |
144 | const message3 = await queue.receiveMessage<{ hello: "world3" }>(); // Logs out { hello: "world3" }.
145 | ```
146 |
147 | ### Sending Message with Delay then Poll
148 |
149 | ```typescript
150 | const fakeValue = randomValue();
151 | const queueName = "app-logs";
152 |
153 | const producer = new Queue({ redis, queueName });
154 | const consumer = new Queue({
155 | redis: new Redis(),
156 | queueName,
157 | });
158 | await producer.sendMessage(
159 | {
160 | dev: fakeValue,
161 | },
162 | 2000
163 | );
164 |
165 | const receiveMessageRes = await consumer.receiveMessage<{
166 | dev: string;
167 | }>(5000);
168 | ```
169 |
170 | ### Manual Verification
171 |
172 | ```typescript
173 | const queue = new Queue({ redis, autoVerify: false });
174 | await queue.sendMessage({ hello: "world" });
175 |
176 | const message = await queue.receiveMessage<{ hello: "world" }>(); // Logs out { hello: "world" }.
177 | if (message) {
178 | await queue.verifyMessage(message.streamId); //Logs out "VERIFIED" or "NOT VERIFIED".
179 | }
180 | ```
181 |
182 | ### Concurrent Message Processing/Consuming
183 |
184 | If `concurrencyLimit` is not set, one of the `receiveMessage()` will throw. You need to explicitly set the concurrencyLimit. Default is 1.
185 |
186 | ```typescript
187 | const queue = new Queue({
188 | redis: new Redis(),
189 | queueName: randomValue(),
190 | concurrencyLimit: 2,
191 | });
192 |
193 | await queue.sendMessage({
194 | dev: randomValue(),
195 | });
196 |
197 | await queue.sendMessage({
198 | dev: randomValue(),
199 | });
200 |
201 | await Promise.all([queue.receiveMessage(), queue.receiveMessage()]);
202 | ```
203 |
--------------------------------------------------------------------------------
/src/queue.ts:
--------------------------------------------------------------------------------
1 | import { v4 as generateRandomUUID } from "uuid";
2 |
3 | import {
4 | DEFAULT_AUTO_VERIFY,
5 | DEFAULT_CONCURRENCY_LIMIT,
6 | DEFAULT_CONSUMER_GROUP_NAME,
7 | DEFAULT_QUEUE_NAME,
8 | DEFAULT_VISIBILITY_TIMEOUT_IN_MS,
9 | ERROR_MAP,
10 | MAX_CONCURRENCY_LIMIT,
11 | } from "./constants";
12 | import {
13 | type ParsedStreamMessage,
14 | formatMessageQueueKey,
15 | invariant,
16 | parseXclaimAutoResponse,
17 | parseXreadGroupResponse,
18 | } from "./utils";
19 |
20 | import type { Redis } from "@upstash/redis";
21 |
22 | export type QueueConfig = {
23 | redis: Redis;
24 | /**
25 | * Queue name for the redis stream
26 | * @default "UpstashMQ:Queue"
27 | */
28 | queueName?: string;
29 | /**
30 | * The maximum number of concurrent message processing allowed.
31 | * @default 1
32 | */
33 | concurrencyLimit?: 0 | 1 | 2 | 3 | 4 | 5;
34 | /**
35 | * Auto verifies received messages. If not set message will be picked up by some other consumer after visiblityTimeout.
36 | * @default true
37 | */
38 | autoVerify?: boolean;
39 | /**
40 | * This is the group that holds every other consumer when automatically created.
41 | * @default "Messages"
42 | */
43 | consumerGroupName?: string;
44 | /**
45 | * Recently sent messages won't be visible to other consumers until this period of time. If no one else acknowledges it it will be picked up by others.
46 | * @default "30 seconds"
47 | */
48 | visibilityTimeout?: number;
49 | };
50 |
51 | export class Queue {
52 | config: QueueConfig;
53 | concurrencyCounter = DEFAULT_CONCURRENCY_LIMIT;
54 | hasConsumerGroupInitialized = false;
55 |
56 | private messageTimeouts = new Set();
57 |
58 | constructor(config: QueueConfig) {
59 | this.config = {
60 | redis: config.redis,
61 |
62 | concurrencyLimit: config.concurrencyLimit ?? DEFAULT_CONCURRENCY_LIMIT,
63 | autoVerify: config.autoVerify ?? DEFAULT_AUTO_VERIFY,
64 | consumerGroupName: config.consumerGroupName ?? DEFAULT_CONSUMER_GROUP_NAME,
65 | queueName: config.queueName
66 | ? this.appendPrefixTo(config.queueName)
67 | : this.appendPrefixTo(DEFAULT_QUEUE_NAME),
68 | visibilityTimeout: config.visibilityTimeout ?? DEFAULT_VISIBILITY_TIMEOUT_IN_MS,
69 | };
70 | }
71 |
72 | private appendPrefixTo(key: string) {
73 | return formatMessageQueueKey(key);
74 | }
75 |
76 | private async initializeConsumerGroup() {
77 | if (this.hasConsumerGroupInitialized) {
78 | return;
79 | }
80 |
81 | invariant(
82 | this.config.consumerGroupName,
83 | "Consumer group name cannot be empty when initializing consumer group"
84 | );
85 | invariant(this.config.queueName, "Queue name cannot be empty when initializing consumer group");
86 |
87 | try {
88 | await this.config.redis.xgroup(this.config.queueName, {
89 | type: "CREATE",
90 | group: this.config.consumerGroupName,
91 | id: "$",
92 | options: { MKSTREAM: true },
93 | });
94 | this.hasConsumerGroupInitialized = true;
95 | } catch (e) {
96 | if (e instanceof Error) {
97 | if (e.message.includes("BUSYGROUP Consumer Group name already exists")) {
98 | this.hasConsumerGroupInitialized = true;
99 | return;
100 | }
101 | }
102 |
103 | return;
104 | }
105 | }
106 |
107 | async sendMessage(payload: T, delayMs = 0) {
108 | const { redis } = this.config;
109 | await this.initializeConsumerGroup();
110 | try {
111 | const streamKey = this.config.queueName;
112 | invariant(streamKey, "Queue name cannot be empty when sending a message");
113 |
114 | const _sendMessage = () =>
115 | redis.xadd(streamKey, "*", {
116 | messageBody: payload,
117 | });
118 |
119 | if (delayMs > 0) {
120 | let streamIdResult: string | null = null;
121 |
122 | const timeoutId = setTimeout(() => {
123 | _sendMessage().then((res) => {
124 | streamIdResult = res;
125 | this.messageTimeouts.delete(timeoutId);
126 | });
127 | }, delayMs);
128 | this.messageTimeouts.add(timeoutId);
129 | return streamIdResult;
130 | }
131 | return await _sendMessage();
132 | } catch (error) {
133 | console.error("Error in sendMessage:", error);
134 | return null;
135 | }
136 | }
137 |
138 | async receiveMessage(blockTimeMs = 0) {
139 | this.checkIfReceiveMessageAllowed();
140 | await this.initializeConsumerGroup();
141 |
142 | const xclaimParsedMessage = await this.claimStuckPendingMessageAndVerify();
143 | if (xclaimParsedMessage) {
144 | return xclaimParsedMessage;
145 | }
146 |
147 | // Claiming failed, fallback to default read message
148 | return await this.readAndVerifyPendingMessage(blockTimeMs);
149 | }
150 |
151 | private checkIfReceiveMessageAllowed() {
152 | const { concurrencyLimit } = this.config;
153 |
154 | const concurrencyNotSetAndAboveDefaultLimit =
155 | concurrencyLimit === DEFAULT_CONCURRENCY_LIMIT &&
156 | this.concurrencyCounter >= DEFAULT_CONCURRENCY_LIMIT + 1;
157 |
158 | const concurrencyAboveTheMaxLimit = this.concurrencyCounter > MAX_CONCURRENCY_LIMIT;
159 |
160 | if (concurrencyNotSetAndAboveDefaultLimit) {
161 | throw new Error(ERROR_MAP.CONCURRENCY_DEFAULT_LIMIT_EXCEEDED);
162 | }
163 | this.incrementConcurrencyCount();
164 |
165 | if (concurrencyAboveTheMaxLimit) {
166 | throw new Error(ERROR_MAP.CONCURRENCY_LIMIT_EXCEEDED);
167 | }
168 | }
169 |
170 | private async claimStuckPendingMessageAndVerify(): Promise<
171 | ParsedStreamMessage
172 | > {
173 | const { autoVerify } = this.config;
174 | const consumerName = this.generateRandomConsumerName();
175 |
176 | const xclaimParsedMessage = await this.claimAndParseMessage(consumerName);
177 |
178 | if (xclaimParsedMessage && autoVerify) {
179 | await this.verifyMessage(xclaimParsedMessage.streamId);
180 | }
181 |
182 | if (xclaimParsedMessage == null) {
183 | await this.removeEmptyConsumer(consumerName);
184 | }
185 |
186 | return xclaimParsedMessage;
187 | }
188 |
189 | private async removeEmptyConsumer(consumer: string) {
190 | const { redis, consumerGroupName, queueName } = this.config;
191 | invariant(consumerGroupName, "Consumer group name cannot be empty when removing a consumer");
192 | invariant(queueName, "Queue name cannot be empty when removing a consumer");
193 |
194 | await redis.xgroup(queueName, {
195 | type: "DELCONSUMER",
196 | consumer,
197 | group: consumerGroupName,
198 | });
199 | }
200 |
201 | private async claimAndParseMessage(
202 | consumerName: string
203 | ): Promise> {
204 | const xclaimRes = await this.autoClaim(consumerName);
205 | return parseXclaimAutoResponse(xclaimRes);
206 | }
207 |
208 | private async autoClaim(consumerName: string) {
209 | const { redis, consumerGroupName, queueName, visibilityTimeout } = this.config;
210 | invariant(consumerGroupName, "Consumer group name cannot be empty when receiving a message");
211 | invariant(queueName, "Queue name cannot be empty when receving a message");
212 | invariant(visibilityTimeout, "Visibility timeout name cannot be empty when receving a message");
213 |
214 | return await redis.xautoclaim(
215 | queueName,
216 | consumerGroupName,
217 | consumerName,
218 | visibilityTimeout,
219 | "0-0",
220 | { count: 1 }
221 | );
222 | }
223 |
224 | private async readAndVerifyPendingMessage(
225 | blockTimeMs: number
226 | ): Promise> {
227 | const { autoVerify } = this.config;
228 |
229 | const parsedXreadMessage = await this.readAndParseMessage(blockTimeMs);
230 |
231 | if (parsedXreadMessage && autoVerify) {
232 | await this.verifyMessage(parsedXreadMessage.streamId);
233 | }
234 |
235 | return parsedXreadMessage;
236 | }
237 |
238 | async readAndParseMessage(
239 | blockTimeMs: number
240 | ): Promise | null> {
241 | const consumerName = this.generateRandomConsumerName();
242 |
243 | const xreadRes =
244 | blockTimeMs > 0
245 | ? await this.receiveBlockingMessage(blockTimeMs, consumerName)
246 | : await this.receiveNonBlockingMessage(consumerName);
247 |
248 | return parseXreadGroupResponse(xreadRes);
249 | }
250 |
251 | private async receiveBlockingMessage(blockTimeMs: number, consumerName: string) {
252 | const { redis, consumerGroupName, queueName } = this.config;
253 | invariant(consumerGroupName, "Consumer group name cannot be empty when receiving a message");
254 | invariant(queueName, "Queue name cannot be empty when receving a message");
255 |
256 | return await redis.xreadgroup(consumerGroupName, consumerName, queueName, ">", {
257 | count: 1,
258 | blockMS: blockTimeMs,
259 | });
260 | }
261 |
262 | private async receiveNonBlockingMessage(consumerName: string) {
263 | const { redis, consumerGroupName, queueName } = this.config;
264 | invariant(consumerGroupName, "Consumer group name cannot be empty when receiving a message");
265 | invariant(queueName, "Queue name cannot be empty when receving a message");
266 |
267 | const data = await redis.xreadgroup(consumerGroupName, consumerName, queueName, ">", {
268 | count: 1,
269 | });
270 | return data;
271 | }
272 |
273 | async verifyMessage(streamId: string): Promise<"VERIFIED" | "NOT VERIFIED"> {
274 | const { redis } = this.config;
275 | await this.initializeConsumerGroup();
276 | this.decrementConcurrencyCount();
277 |
278 | try {
279 | invariant(
280 | this.config.consumerGroupName,
281 | "Consumer group name cannot be empty when verifying a message"
282 | );
283 |
284 | invariant(this.config.queueName, "Queue name cannot be empty when verifying a message");
285 |
286 | const xackRes = await redis.xack(
287 | this.config.queueName,
288 | this.config.consumerGroupName,
289 | streamId
290 | );
291 | if (typeof xackRes === "number" && xackRes > 0) {
292 | return "VERIFIED";
293 | }
294 | return "NOT VERIFIED";
295 | } catch (finalError) {
296 | console.error(
297 | `Final attempt to acknowledge message failed: ${(finalError as Error).message}`
298 | );
299 | return "NOT VERIFIED";
300 | }
301 | }
302 |
303 | private generateRandomConsumerName = () => {
304 | if (this.concurrencyCounter > MAX_CONCURRENCY_LIMIT) {
305 | throw new Error(ERROR_MAP.CONCURRENCY_LIMIT_EXCEEDED);
306 | }
307 | const randomUUID = generateRandomUUID();
308 | return this.appendPrefixTo(randomUUID);
309 | };
310 |
311 | private incrementConcurrencyCount() {
312 | this.concurrencyCounter++;
313 | }
314 |
315 | private decrementConcurrencyCount() {
316 | this.concurrencyCounter--;
317 | }
318 | }
319 |
--------------------------------------------------------------------------------
/src/queue.test.ts:
--------------------------------------------------------------------------------
1 | import { afterAll, describe, expect, test } from "bun:test";
2 | import { Redis } from "@upstash/redis";
3 |
4 | import { Queue } from ".";
5 | import {
6 | DEFAULT_AUTO_VERIFY,
7 | DEFAULT_CONCURRENCY_LIMIT,
8 | DEFAULT_CONSUMER_GROUP_NAME,
9 | DEFAULT_QUEUE_NAME,
10 | ERROR_MAP,
11 | } from "./constants";
12 | import { delay, formatMessageQueueKey } from "./utils";
13 |
14 | const randomValue = () => crypto.randomUUID().slice(0, 8);
15 | const redis = Redis.fromEnv();
16 |
17 | describe("Queue", () => {
18 | afterAll(async () => await redis.flushdb());
19 |
20 | describe("Queue name default option", () => {
21 | test("should return the default queue name", () => {
22 | const queue = new Queue({ redis });
23 | expect(queue.config.queueName).toEqual(formatMessageQueueKey(DEFAULT_QUEUE_NAME));
24 | });
25 |
26 | test("should return the customized name", () => {
27 | const queueName = "cookie-jar";
28 | const queue = new Queue({ redis, queueName });
29 | expect(queue.config.queueName).toEqual(formatMessageQueueKey(queueName));
30 | });
31 | });
32 |
33 | describe("Consumer group name default option", () => {
34 | test("should return the default customerGroupName", () => {
35 | const queue = new Queue({ redis, queueName: randomValue() });
36 | expect(queue.config.consumerGroupName).toEqual(DEFAULT_CONSUMER_GROUP_NAME);
37 | });
38 |
39 | test("should return the customized customerGroupName", () => {
40 | const consumerGroupName = "bigger-cookie-jar";
41 | const queue = new Queue({
42 | redis,
43 | consumerGroupName,
44 | queueName: randomValue(),
45 | });
46 | expect(queue.config.consumerGroupName).toEqual(consumerGroupName);
47 | });
48 | });
49 |
50 | describe("Concurrency limit default option", () => {
51 | test("should return 0 when concurrency is default", () => {
52 | const queue = new Queue({ redis, queueName: randomValue() });
53 | expect(queue.config.concurrencyLimit).toEqual(DEFAULT_CONCURRENCY_LIMIT);
54 | });
55 |
56 | test("should return the customized concurrency limit", () => {
57 | const concurrencyLimit = 5;
58 | const queue = new Queue({
59 | redis,
60 | concurrencyLimit,
61 | queueName: randomValue(),
62 | });
63 | expect(queue.config.concurrencyLimit).toEqual(concurrencyLimit);
64 | });
65 | });
66 |
67 | describe("Auto verify default option", () => {
68 | test("should return 0 when concurrency is default", () => {
69 | const queue = new Queue({ redis, queueName: randomValue() });
70 | expect(queue.config.autoVerify).toEqual(DEFAULT_AUTO_VERIFY);
71 | });
72 |
73 | test("should return the customized concurrency limit", () => {
74 | const autoVerify = false;
75 | const queue = new Queue({ redis, autoVerify, queueName: randomValue() });
76 | expect(queue.config.autoVerify).toEqual(autoVerify);
77 | });
78 | });
79 |
80 | describe("Concurrency", () => {
81 | test("should allow only specified amount of receiveMessages concurrently", async () => {
82 | const consumerCount = 4;
83 |
84 | const consumer = new Queue({
85 | redis,
86 | concurrencyLimit: consumerCount,
87 | queueName: randomValue(),
88 | });
89 |
90 | for (let i = 0; i < consumerCount; i++) {
91 | await consumer.receiveMessage();
92 | }
93 |
94 | expect(consumer.concurrencyCounter).toEqual(consumerCount);
95 | });
96 |
97 | test("should throw when try to consume more than 5 at the same time", async () => {
98 | let errorMessage = "";
99 | let iterationCount = 0;
100 | try {
101 | const consumer = new Queue({
102 | redis,
103 | queueName: randomValue(),
104 | concurrencyLimit: 5,
105 | });
106 |
107 | for (let i = 0; i < 10; i++) {
108 | await consumer.receiveMessage();
109 | iterationCount++;
110 | }
111 | } catch (error) {
112 | errorMessage = (error as Error).message;
113 | }
114 | expect(iterationCount).toBe(5);
115 | expect(errorMessage).toEqual(ERROR_MAP.CONCURRENCY_LIMIT_EXCEEDED);
116 | });
117 |
118 | test("should give us 0 since all the consumers are available after successful verify", async () => {
119 | const queue = new Queue({
120 | redis,
121 | queueName: randomValue(),
122 | concurrencyLimit: 2,
123 | });
124 |
125 | await queue.sendMessage({
126 | dev: randomValue(),
127 | });
128 |
129 | await queue.sendMessage({
130 | dev: randomValue(),
131 | });
132 |
133 | await Promise.all([queue.receiveMessage(), queue.receiveMessage()]);
134 |
135 | expect(queue.concurrencyCounter).toEqual(0);
136 | });
137 |
138 | test("should throw since default receive messages exceeds default limit: 1", async () => {
139 | let errorMessage = "";
140 | try {
141 | const queue = new Queue({
142 | redis,
143 | queueName: randomValue(),
144 | });
145 |
146 | await queue.sendMessage({
147 | dev: randomValue(),
148 | });
149 | await queue.sendMessage({
150 | dev: randomValue(),
151 | });
152 |
153 | await Promise.all([
154 | queue.receiveMessage(),
155 | queue.receiveMessage(),
156 | queue.receiveMessage(),
157 | queue.receiveMessage(),
158 | ]);
159 | } catch (error) {
160 | errorMessage = (error as Error).message;
161 | }
162 |
163 | expect(errorMessage).toEqual(ERROR_MAP.CONCURRENCY_DEFAULT_LIMIT_EXCEEDED);
164 | });
165 | });
166 |
167 | describe("Auto verify", () => {
168 | test("should auto verify the message and decrement concurrency counter", async () => {
169 | const queue = new Queue({
170 | redis,
171 | queueName: randomValue(),
172 | });
173 |
174 | await queue.sendMessage({
175 | dev: randomValue(),
176 | });
177 |
178 | await queue.receiveMessage();
179 |
180 | expect(queue.concurrencyCounter).toBe(0);
181 | });
182 |
183 | test("should verify manually and decrement the counter", async () => {
184 | const queue = new Queue({
185 | redis,
186 | queueName: randomValue(),
187 | autoVerify: false,
188 | });
189 |
190 | await queue.sendMessage({
191 | dev: randomValue(),
192 | });
193 |
194 | const receiveRes = await queue.receiveMessage();
195 | if (receiveRes) {
196 | const { streamId } = receiveRes;
197 | await queue.verifyMessage(streamId);
198 | }
199 |
200 | expect(queue.concurrencyCounter).toBe(0);
201 | });
202 |
203 | test("should not release concurrency since auto verify is disabled and no verifyMessage present", async () => {
204 | const queue = new Queue({
205 | redis,
206 | queueName: randomValue(),
207 | autoVerify: false,
208 | });
209 |
210 | await queue.sendMessage({
211 | dev: randomValue(),
212 | });
213 |
214 | await queue.receiveMessage();
215 |
216 | expect(queue.concurrencyCounter).not.toBe(0);
217 | });
218 | });
219 |
220 | describe("Autoclaim orphans", () => {
221 | test(
222 | "should left nothing in pending list",
223 | async () => {
224 | const queue = new Queue({
225 | redis,
226 | autoVerify: false,
227 | queueName: randomValue(),
228 | concurrencyLimit: 2,
229 | visibilityTimeout: 10000,
230 | });
231 |
232 | await queue.sendMessage({ hello: "world" });
233 | await queue.receiveMessage<{ hello: "world" }>();
234 | await delay(10000);
235 | const ackedReceive = await queue.receiveMessage<{ hello: "world" }>();
236 | await queue.verifyMessage(ackedReceive?.streamId!);
237 |
238 | const xpendingRes = await queue.config.redis.xpending(
239 | queue.config.queueName!,
240 | queue.config.consumerGroupName!,
241 | "-",
242 | "+",
243 | 1
244 | );
245 |
246 | expect(xpendingRes).toBeEmpty();
247 | },
248 | { timeout: 20000 }
249 | );
250 |
251 | test(
252 | "should return at least 1 pending since they are not claimed",
253 | async () => {
254 | const queue = new Queue({
255 | redis,
256 | autoVerify: false,
257 | queueName: randomValue(),
258 | concurrencyLimit: 2,
259 | });
260 |
261 | await queue.sendMessage({ hello: "world" });
262 | await queue.receiveMessage<{ hello: "world" }>();
263 | await delay(10000);
264 | await queue.receiveMessage<{ hello: "world" }>();
265 |
266 | const xpendingRes = await queue.config.redis.xpending(
267 | queue.config.queueName!,
268 | queue.config.consumerGroupName!,
269 | "-",
270 | "+",
271 | 1
272 | );
273 |
274 | expect(xpendingRes).not.toBeEmpty();
275 | },
276 | { timeout: 20000 }
277 | );
278 | });
279 |
280 | describe("Queue with delays", () => {
281 | test(
282 | "should enqueue with a delay",
283 | async () => {
284 | const fakeValue = randomValue();
285 | const queue = new Queue({
286 | redis,
287 | queueName: "app-logs",
288 | concurrencyLimit: 2,
289 | });
290 | await queue.sendMessage(
291 | {
292 | dev: fakeValue,
293 | },
294 | 2000
295 | );
296 | const res = await queue.receiveMessage();
297 | expect(res?.body).not.toEqual({
298 | dev: fakeValue,
299 | });
300 | await delay(5000);
301 | const res1 = await queue.receiveMessage<{ dev: string }>();
302 | expect(res1?.body).toEqual({
303 | dev: fakeValue,
304 | });
305 | },
306 | { timeout: 10000 }
307 | );
308 |
309 | test(
310 | "should poll until data arives",
311 | () => {
312 | const throwable = async () => {
313 | const fakeValue = randomValue();
314 | const producer = new Queue({ redis, queueName: "app-logs" });
315 | const consumer = new Queue({
316 | redis,
317 | queueName: "app-logs",
318 | });
319 | await producer.sendMessage(
320 | {
321 | dev: fakeValue,
322 | },
323 | 2
324 | );
325 | await consumer.receiveMessage<{
326 | dev: string;
327 | }>(5000);
328 | };
329 | expect(throwable).toThrow();
330 | },
331 | { timeout: 10000 }
332 | );
333 | });
334 |
335 | describe("Reliability tests", () => {
336 | test("should handle multi-threaded message processing in batches", async () => {
337 | const queue = new Queue({
338 | redis,
339 | queueName: randomValue(),
340 | concurrencyLimit: 5,
341 | });
342 |
343 | // First batch of sending 5 messages
344 | const firstBatchSendPromises = [];
345 | for (let i = 0; i < 5; i++) {
346 | firstBatchSendPromises.push(queue.sendMessage({ data: `message-${i}` }));
347 | }
348 | await Promise.all(firstBatchSendPromises);
349 |
350 | // First batch of receiving 5 messages
351 | const firstBatchReceivePromises = [];
352 | for (let i = 0; i < 5; i++) {
353 | firstBatchReceivePromises.push(queue.receiveMessage());
354 | }
355 | await Promise.all(firstBatchReceivePromises);
356 |
357 | // Second batch of sending 5 messages
358 | const secondBatchSendPromises = [];
359 | for (let i = 5; i < 10; i++) {
360 | secondBatchSendPromises.push(queue.sendMessage({ data: `message-${i}` }));
361 | }
362 | await Promise.all(secondBatchSendPromises);
363 |
364 | // Second batch of receiving 5 messages
365 | const secondBatchReceivePromises = [];
366 | for (let i = 5; i < 10; i++) {
367 | secondBatchReceivePromises.push(queue.receiveMessage());
368 | }
369 | await Promise.all(secondBatchReceivePromises);
370 |
371 | expect(queue.concurrencyCounter).toBe(0); // Assuming autoVerify is enabled
372 | }, 30000);
373 |
374 | test("should handle high volume of messages in batches and track received messages", async () => {
375 | const batchSize = 5; // Size of each batch, based on the concurrency limit
376 | const queue = new Queue({
377 | redis,
378 | queueName: randomValue(),
379 | concurrencyLimit: batchSize,
380 | });
381 |
382 | const totalMessages = 100; // High volume of messages
383 | const numberOfBatches = totalMessages / batchSize;
384 | let totalNumberOfReceivedMessages = 0; // Counter for received messages
385 |
386 | for (let batch = 0; batch < numberOfBatches; batch++) {
387 | const sendMessagePromises = [];
388 | for (let i = 0; i < batchSize; i++) {
389 | sendMessagePromises.push(queue.sendMessage({ data: `message-${batch * batchSize + i}` }));
390 | }
391 | await Promise.all(sendMessagePromises);
392 |
393 | const receiveMessagePromises = [];
394 | for (let i = 0; i < batchSize; i++) {
395 | const promise = queue.receiveMessage().then((message) => {
396 | if (message) {
397 | totalNumberOfReceivedMessages++;
398 | }
399 | return message;
400 | });
401 | receiveMessagePromises.push(promise);
402 | }
403 | await Promise.all(receiveMessagePromises);
404 | }
405 |
406 | expect(totalNumberOfReceivedMessages).toBe(totalMessages);
407 | expect(queue.concurrencyCounter).toBe(0);
408 | }, 30000);
409 | });
410 |
411 | describe("FIFO queue", () => {
412 | test("should do a FIFO queue", async () => {
413 | const queue = new Queue({ redis });
414 |
415 | await queue.sendMessage({ hello: "world1" });
416 | await queue.sendMessage({ hello: "world2" });
417 | await queue.sendMessage({ hello: "world3" });
418 |
419 | const message1 = await queue.receiveMessage<{ hello: "world1" }>();
420 | expect(message1?.body).toEqual({ hello: "world1" });
421 |
422 | const message2 = await queue.receiveMessage<{ hello: "world2" }>();
423 | expect(message2?.body).toEqual({ hello: "world2" });
424 |
425 | const message3 = await queue.receiveMessage<{ hello: "world3" }>();
426 | expect(message3?.body).toEqual({ hello: "world3" });
427 | });
428 | });
429 | });
430 |
--------------------------------------------------------------------------------