streaming conversation api #8790
base: master
Conversation
@@ -511,7 +509,8 @@ replace (
 // Uncomment for local development for testing with changes in the components-contrib && kit repositories.
 // Don't commit with this uncommented!
 //
-// replace github.com/dapr/components-contrib => ../components-contrib
+replace github.com/dapr/components-contrib => ../components-contrib
Temporary while developing and while the components-contrib dependency is needed. I'll need to update the mod sum after that is merged.
Force-pushed from a0181aa to a6aa6d8
Force-pushed from a6aa6d8 to 2c93287
Signed-off-by: Filinto Duran <1373693+filintod@users.noreply.github.com>
- Changed `contextID` to `context_id` in `dapr.proto` and updated the JSON name mapping.
- Updated all references in the codebase to use the new field name `ContextId`.
- Added the `ConverseStreamAlpha1` endpoint to the gRPC mappings.
- Adjusted test cases to reflect the new naming convention for context ID.
- Changed the package name from `http` to `grpc` in the relevant integration tests.

Signed-off-by: Filinto Duran <1373693+filintod@users.noreply.github.com>
Another pass of review from me: please can you move the processing of the conversation messages out into a separate package?
@@ -220,6 +220,9 @@ service Dapr {

   // Converse with a LLM service
   rpc ConverseAlpha1(ConversationRequest) returns (ConversationResponse) {}
+
+  // Converse with a LLM service using streaming
+  rpc ConverseStreamAlpha1(ConversationRequest) returns (stream ConversationStreamResponse) {}
Why is this API not bi-directional, with the client being able to send more prompts during the same "conversation"?
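For illustration, a bi-directional variant would keep a single stream open so the client can send follow-up prompts without reconnecting. A minimal Go sketch of that shape, using plain interfaces as stand-ins for gRPC-generated code (all type and method names below are hypothetical; this PR only defines a server-streaming RPC):

```go
package main

import "fmt"

// Hypothetical stand-ins for gRPC-generated types. This PR defines a
// server-streaming RPC only, so the bidi surface below is an assumption.
type ConversationRequest struct{ Prompt string }

type ConversationStreamResponse struct {
	Chunk    string
	Complete bool
}

// BidiStream mirrors the Send/Recv surface a bidi gRPC stream would expose.
type BidiStream interface {
	Send(*ConversationRequest) error
	Recv() (*ConversationStreamResponse, error)
}

// converse sends each prompt on the same open stream, draining the replies
// for one prompt before sending the next. That is the property being asked
// about: no reconnect between turns of the same conversation.
func converse(s BidiStream, prompts ...string) error {
	for _, p := range prompts {
		if err := s.Send(&ConversationRequest{Prompt: p}); err != nil {
			return err
		}
		for {
			resp, err := s.Recv()
			if err != nil {
				return err
			}
			fmt.Print(resp.Chunk)
			if resp.Complete {
				break
			}
		}
	}
	return nil
}

func main() {} // placeholder so the sketch compiles standalone
```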
  optional int32 completion_tokens = 2 [json_name = "completionTokens"];
  // Total number of tokens used
  optional int32 total_tokens = 3 [json_name = "totalTokens"];
}
nit: missing newline at EOF
  optional int32 prompt_tokens = 1 [json_name = "promptTokens"];
  // Number of tokens in the completion
  optional int32 completion_tokens = 2 [json_name = "completionTokens"];
  // Total number of tokens used
  optional int32 total_tokens = 3 [json_name = "totalTokens"];
Any reason these are signed and 32-bit?
Suggested change:
-  optional int32 prompt_tokens = 1 [json_name = "promptTokens"];
+  optional uint64 prompt_tokens = 1 [json_name = "promptTokens"];
   // Number of tokens in the completion
-  optional int32 completion_tokens = 2 [json_name = "completionTokens"];
+  optional uint64 completion_tokens = 2 [json_name = "completionTokens"];
   // Total number of tokens used
-  optional int32 total_tokens = 3 [json_name = "totalTokens"];
+  optional uint64 total_tokens = 3 [json_name = "totalTokens"];
// ConversationStreamResponse is the streaming response for Conversation.
message ConversationStreamResponse {
  oneof response_type {
Will this message ever contain other fields outside of the `oneof`? Generally it's good practice to move the `oneof` definition to a separate message to preserve the field number sequence.
if input.GetScrubPII() {
	return true
}
Why is the scrubber enabled for all when only some inputs have it enabled?
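For illustration, the per-input alternative this question points at could look like the following self-contained sketch; `input` and the `scrub` func are simplified stand-ins for the runtime's actual types:

```go
package main

import (
	"fmt"
	"strings"
)

// Simplified stand-ins for the runtime's conversation input and scrubber.
type input struct {
	message  string
	scrubPII bool
}

// scrubInputs applies the scrubber per input, instead of turning it on for
// the whole request when any single input sets the flag.
func scrubInputs(inputs []input, scrub func(string) string) {
	for i := range inputs {
		if inputs[i].scrubPII { // per-input check, not a request-wide switch
			inputs[i].message = scrub(inputs[i].message)
		}
	}
}

func main() {
	msgs := []input{
		{message: "my email is a@b.com", scrubPII: true},
		{message: "leave this one untouched", scrubPII: false},
	}
	scrubInputs(msgs, func(s string) string {
		return strings.ReplaceAll(s, "a@b.com", "<EMAIL>")
	})
	fmt.Println(msgs) // [{my email is <EMAIL> true} {leave this one untouched false}]
}
```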
// Simulate streaming by sending the complete response as chunks
if resp != nil {
	contextID = resp.ConversationContext
	if len(resp.Outputs) > 0 {
Redundant.
Suggested change:
-	if len(resp.Outputs) > 0 {
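For context, `range` over a nil or empty slice simply runs zero iterations, which is why the length guard adds nothing:

```go
package main

import "fmt"

func main() {
	var outputs []string // nil slice
	for _, o := range outputs {
		fmt.Println(o) // never reached: ranging a nil/empty slice is a no-op
	}
	fmt.Println("loop skipped safely, no panic")
}
```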
for _, output := range resp.Outputs {
	// Break the result into chunks to simulate streaming
	content := output.Result
	chunkSize := 50 // Send in 50-character chunks
Why 50?

gRPC by default has a maximum message size of 4 MB (~4,000,000 bytes), so it's incredibly inefficient to be sending small payloads like this. I don't think we need to be chunking at all. But if we must, we should fetch the configured max message size and chunk on a slightly smaller number to account for headers and other fields in the message.
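A minimal sketch of what size-aware chunking could look like, assuming the configured max size is available to the caller; the headroom value is an assumed allowance for the message envelope:

```go
package main

import "fmt"

// chunk splits content based on the configured gRPC max message size rather
// than a hard-coded 50 bytes. maxMsgSize would come from the server/client
// config (gRPC defaults to 4 MB); headroom is an assumed allowance for
// headers and the other fields in the stream response message.
func chunk(content string, maxMsgSize int) []string {
	const headroom = 64 * 1024
	size := maxMsgSize - headroom
	if size <= 0 || len(content) <= size {
		return []string{content}
	}
	var chunks []string
	for len(content) > size {
		// Note: byte-based slicing; a real implementation should avoid
		// splitting a multi-byte UTF-8 rune across chunks.
		chunks = append(chunks, content[:size])
		content = content[size:]
	}
	return append(chunks, content)
}

func main() {
	out := chunk(string(make([]byte, 10*1024*1024)), 4*1024*1024)
	fmt.Println(len(out)) // 3 chunks for a 10 MiB payload
}
```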
// Add AI provider components if their API keys are available
for _, provider := range liveConversationAIProviders {
	if apiKey := os.Getenv(provider.envVar); apiKey != "" {
Move this to an end-to-end test. Integration tests are always self-contained and offline: use the `echo` conversation component type, or use a mock server.
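For the mock-server option, a self-contained offline stub could look like the sketch below; the JSON body shape is an assumption for illustration only:

```go
package conversation_test

import (
	"net/http"
	"net/http/httptest"
	"testing"
)

// TestWithMockLLMServer shows the mock-server option: an offline HTTP stub
// that a conversation component's endpoint could be pointed at, keeping the
// test self-contained with no live provider or API key.
func TestWithMockLLMServer(t *testing.T) {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		_, _ = w.Write([]byte(`{"outputs":[{"result":"hello from mock"}]}`))
	}))
	t.Cleanup(srv.Close)

	// ...configure the component under test with srv.URL instead of a live
	// provider, so the test stays self-contained and offline.
	_ = srv.URL
}
```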
@@ -34,6 +34,14 @@ type basic struct {
 	daprd *daprd.Daprd
 }

+func getEchoEstimatedTokens(msg ...string) int {
Please do not put helper functions at the top of the file.
s.daprd.WaitUntilRunning(t, ctx)

client := s.daprd.GRPCClient(t, ctx)
Is the `oneof` order (completion last) being tested?
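If that property isn't covered yet, a sketch of the assertion, with a stand-in type instead of the generated `oneof`:

```go
package main

import "fmt"

// event stands in for the generated ConversationStreamResponse oneof: each
// stream message is either a content chunk or the final completion.
type event struct {
	chunk      string
	completion bool
}

// completionIsLast checks a drained stream: the completion message must
// appear only as the final element.
func completionIsLast(events []event) bool {
	for i, e := range events {
		if e.completion && i != len(events)-1 {
			return false // completion arrived before the end of the stream
		}
	}
	return len(events) > 0 && events[len(events)-1].completion
}

func main() {
	stream := []event{{chunk: "hel"}, {chunk: "lo"}, {completion: true}}
	fmt.Println(completionIsLast(stream)) // true
}
```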
Description
Add streaming support to the conversation API. Depends on dapr/components-contrib#3847
Issue reference
Please reference the issue this PR will close: #8813
Checklist
Please make sure you've completed the relevant tasks for this PR, out of the following list: