Open
Description
Environment
Latest Node.js
Latest Hono
Latest crossws
Reproduction
A new Hono app that uses the crossws SSE adapter, configured as follows:
// Reproduction: SSE adapter configured with bidirectional messaging and all four hooks.
sseAdapter({
  // Enable bidirectional messaging support.
  bidir: true,
  hooks: {
    // Reads the `userContext` request header (expected to hold JSON) and stores the
    // resulting object on `request.context.userContext`. A header that fails to parse
    // is answered with a 400 response.
    upgrade: (request) => {
      let resolvedContext: UserContext | undefined;
      try {
        const rawHeader = request.headers.get('userContext');
        if (rawHeader) {
          console.log('userContextHeader', rawHeader);
          resolvedContext = JSON.parse(rawHeader);
        }
      } catch (error) {
        console.error('Error parsing user context:', error);
        return new Response('Invalid user context', { status: 400 });
      }

      // When no usable context was supplied, a placeholder user is substituted
      // instead of rejecting the request with 401 "Authentication required".
      const placeholderContext: UserContext = {
        userId: '1',
        role: 'user',
        labels: [],
        teams: [],
        permissions: [],
      };
      request.context.userContext = resolvedContext || placeholderContext;
      // Author's note: in bidirectional mode, extra auth based on the request is
      // recommended; no extra response headers are returned from this hook.
    },

    // Greets every newly connected peer with a welcome message carrying its id.
    open: (peer) => {
      const welcome = {
        type: 'welcome',
        message: `Welcome ${peer.id}`,
      };
      peer.send(JSON.stringify(welcome));
    },

    // Logs each incoming message, then checks that its text is valid JSON.
    // On failure the peer is sent an `error` message.
    message: async (peer, message) => {
      console.log('Received message on sse:', message.text());
      try {
        // Read the payload as text, log the raw text, then parse it as JSON.
        const rawText = await message.text();
        console.log('Received message text:', rawText);
        // The parsed value is not used any further in this snippet.
        const parsed = JSON.parse(rawText) as ClientMessage;
      } catch (error) {
        console.error('Error handling message:', error);
        const failure = {
          type: 'error',
          message: 'Invalid message format',
        };
        peer.send(JSON.stringify(failure));
      }
    },

    // Only logs the disconnect. Per the author's note, subscriptions are not cleaned
    // up manually here because crossws is expected to handle that.
    close: (peer) => {
      console.log(`Client ${peer.id} disconnected`);
    },
  },
});
/**
 * Forwards SSE traffic to the SSE adapter and answers everything else with 404.
 *
 * A request counts as SSE traffic when its `accept` header is exactly
 * `text/event-stream` or when it carries an `x-crossws-id` header.
 *
 * @param request - The incoming request.
 * @returns The adapter's response for SSE traffic, otherwise a 404 response.
 */
public async handleRequest(request: Request): Promise<Response> {
  // Trigger initialization if the adapter has not been created yet.
  if (!this.sseAdapter) {
    console.warn(
      'SSE adapter not initialized yet. Waiting for initialization...'
    );
    await this.initialize();
  }

  const acceptsEventStream =
    request.headers.get('accept') === 'text/event-stream';
  const isSseTraffic =
    acceptsEventStream || request.headers.has('x-crossws-id');

  // Anything that is not SSE traffic is rejected up front.
  if (!isSseTraffic) {
    return new Response('Not found', { status: 404 });
  }

  return this.sseAdapter.fetch(request);
}
Hono side (route registration):
// GET `${prefix}/sse`: hands the raw request to the realtime adapter and returns
// whatever response it produces.
app.get(`${prefix}/sse`, async (c) => {
  const request = c.req.raw;
  // The handleRequest from the sseAdapter handles the underlying request/response.
  // User context is typically handled during the 'upgrade' hook within the SSEManager
  // by reading headers, not passed directly here.
  const response = await realtimeAdapter.handleRequest(request);
  return response;
});
// POST `${prefix}/sse`: the raw request is delegated unchanged to the realtime
// adapter's handleRequest, and its response is returned as-is. User context is not
// passed here; it is read from the request headers in the adapter's 'upgrade' hook.
app.post(`${prefix}/sse`, async (c) =>
  realtimeAdapter.handleRequest(c.req.raw)
);
Describe the bug
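Sending a message from the client causes the error shown under "Logs" below. On the server, `message.text()` yields the string `[object ReadableStream]` instead of the message payload, so `JSON.parse` fails with a `SyntaxError`. Client code: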
// Once the SSE connection opens, send a subscription request for the `posts` table.
this.sseClient.addEventListener('open', async () => {
  console.log('Connected to SSE server');

  // Serialize the subscription payload first, then send it over the SSE client.
  const subscribeRequest = JSON.stringify({
    type: 'subscribe',
    tableName: 'posts',
  });
  await this.sseClient.send(subscribeRequest);

  console.log('Sent subscription request');
});
Additional context
No response
Logs
Received message on sse: [object ReadableStream]
Received message text: [object ReadableStream]
Error handling message: SyntaxError: Unexpected token 'o', "[object Rea"... is not valid JSON
at JSON.parse (<anonymous>)
at message (/Users/rou/Desktop/bass/forgebase-ts/dist/apps/hono-test/webpack:/libs/database/src/websocket/SSEManager.ts:105:30)
at processTicksAndRejections (node:internal/process/task_queues:105:5)