-
-
Notifications
You must be signed in to change notification settings - Fork 89
Add SqliteMessageQueue and related tests #526
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
Summary of ChangesHello @2chanhaeng, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request significantly enhances the message queuing capabilities by introducing a new SQLite-based message queue implementation. Concurrently, it establishes a robust and standardized testing framework for message queues, which has been applied to existing implementations. This dual approach not only expands the available message queue options but also improves the consistency, reliability, and maintainability of message queue tests across the project, making future integrations and testing more efficient. Highlights
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request introduces a new SqliteMessageQueue and a standardized testing utility, testMessageQueue, to unify message queue tests across the repository. The refactoring of existing tests to use this new utility is a great step towards reducing code duplication and ensuring consistent test coverage.
My review focuses on the new implementations. I've found a couple of critical issues in the SqliteMessageQueue related to race conditions and error handling that could lead to message loss and listener crashes. I've also identified areas for improvement in the new testMessageQueue utility to make it more robust and align with the MessageQueue interface. Additionally, I've suggested a minor improvement to the SQLite test to ensure proper cleanup of temporary database files.
Codecov Report❌ Patch coverage is
... and 2 files with indirect coverage changes 🚀 New features to boost your workflow:
|
|
How about adding |
That's a good idea, but it seems to go beyond #477. How about you create a separate issue for it? |
SqliteMessageQueue.listen fedify-dev#526 (comment)
Well, the cleanup method belongs to |
2chanhaeng
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about adding
[Symbol.dispose]()for consistency with other MessageQueue implementations?That's a good idea, but it seems to go beyond #477. How about you create a separate issue for it?
Well, the cleanup method belongs to
sqlite/src/ mq.tsso i don't think that's out of scope if you implement it for only SqliteMessageQueue. It seems quite simple but caring other message queues should be handled in other PR like you said. (Neither amqp has that)
Oh, OK. I added it at 80e11c6!
- Remove `let Temporal ...` - Export `@fedify/sqlite` - Add missing dependencies
SqliteMessageQueue.listen fedify-dev#526 (comment)
| `DELETE FROM "${this.#tableName}" | ||
| WHERE id = ( | ||
| SELECT id FROM "${this.#tableName}" | ||
| WHERE scheduled <= ? | ||
| ORDER BY scheduled | ||
| LIMIT 1 | ||
| ) | ||
| RETURNING id, message`, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While DELETE … RETURNING is atomic, when multiple processes execute the same query simultaneously, there's a potential race condition. Using BEGIN IMMEDIATE ensures proper locking and prevents duplicate message processing.
Also, don't forget you need to handle SQLITE_BUSY error as well!
| static readonly #tableNameRegex = /^[A-Za-z_][A-Za-z0-9_]{0,63}$/; | ||
| // In-memory event emitter for notifying listeners when messages are enqueued. | ||
| // Scoped per table name to allow multiple queues to coexist. | ||
| static readonly #notifyChannels = new Map<string, EventTarget>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The static #notifyChannels map can accumulate EventTarget instances indefinitely. Consider implementing cleanup logic when the last listener for a channel is removed.
This is a minor issue for most use cases, but worth noting for long-running applications with many different table names.
| new EnqueueEvent(delayMs), | ||
| ); | ||
| } catch (error) { | ||
| this.#db.exec("ROLLBACK"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently, errors are caught and re-thrown without logging. Adding error logs would help with debugging:
| this.#db.exec("ROLLBACK"); | |
| this.#db.exec("ROLLBACK"); | |
| logger.error("Failed to enqueue messages: {error}", { error, messages }); |
| this.#db | ||
| .prepare( | ||
| `INSERT INTO "${this.#tableName}" (id, message, created, scheduled) | ||
| VALUES (?, ?, ?, ?)`, | ||
| ) | ||
| .run(id, encodedMessage, now, scheduled); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Even with busy_timeout set, SQLITE_BUSY errors can still occur when the timeout expires. In a multi-process message queue, this is a normal condition that requires retry logic.
Consider also adding retry logic to enqueueMany() and listen() methods!
| if (this.#initialized) { | ||
| return; | ||
| } | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Without journal_mode=WAL, SQLite uses the default rollback journal mode which severely limits concurrent access. This will cause reader/writer conflicts in multi-process environments.
Summary
Introduce the
SqliteMessageQueueclass along with a testing framework formessage queues. Refactor existing tests to utilize the new
testMessageQueueutility for improved test structure and readability.
Related issue
SqliteMessageQueuefor single-node deployments #477Changes
SqliteMessageQueueclass to@fedify/sqlitepackage implementingthe
MessageQueueinterface using SQLite as the backing storetestMessageQueue()utility function to@fedify/testingpackagefor standardized testing of
MessageQueueimplementationswaitFor()andgetRandomKey()helper functions to@fedify/testingtestMessageQueue:@fedify/amqp@fedify/denokv@fedify/postgres@fedify/redisSqliteMessageQueueBenefits
a reusable test harness
MessageQueueimplementationsMessageQueueimplementations in the futurewith standardized testing
Checklist
mise teston your machine?