-
Notifications
You must be signed in to change notification settings - Fork 334
mcp: improve http transports error handling and make transport work with any size message #734
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
findleyr
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's discuss this change in #726. I'm not sure we shouldn't just remove the buffering.
91f93af to
ee3d418
Compare
ee3d418 to
85a4340
Compare
|
@findleyr current proposal (no fixed buffer size) approach is inline with most sdks. Can we get this merged? |
|
@alexbumbacea sorry for the delay, will review tomorrow. |
mcp/event.go
Outdated
| yield(Event{}, fmt.Errorf("context done: %w", ctx.Err())) | ||
| return | ||
| } else { | ||
| yield(Event{}, fmt.Errorf("error reading event: %w", err)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we want to expose these errors in the API: we can just format them with %v.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not clear why we don't wat to wrap it and instead we just want to print it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you wrap an error, then callers can extract the error. That means the error type becomes part of your effective API. Which means you can't change it.
If you just include the error text, you don't have that problem. If later, someone needs to distinguish particular errors, we can design the proper API for them.
mcp/event.go
Outdated
| const maxTokenSize = 1 * 1024 * 1024 // 1 MiB max line size | ||
| scanner.Buffer(nil, maxTokenSize) | ||
| func scanEvents(ctx context.Context, r io.Reader) iter.Seq2[Event, error] { | ||
| scanner := bufio.NewReader(r) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
'reader'
mcp/event.go
Outdated
| scanner := bufio.NewScanner(r) | ||
| const maxTokenSize = 1 * 1024 * 1024 // 1 MiB max line size | ||
| scanner.Buffer(nil, maxTokenSize) | ||
| func scanEvents(ctx context.Context, r io.Reader) iter.Seq2[Event, error] { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are we adding a ctx parameter? Does not appear to be used?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Used on L117 to stop the iteration on cancellation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ack, ok... but is that necessary? Wouldn't a context cancellation close the response body?
|
|
||
| case data := <-c.incoming: | ||
| case m := <-c.incoming: | ||
| if m.err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Returning an error from Read breaks the connection. I don't think that's actually what we want to happen here, is it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given that this method return either an error or a jsonrpc.Message, I think this is an acceptable behaviour.
Considering previous behaviour when there was an error during scan events, parsing the body would stop as the loop over scanEvents would break in case there was an error, but without bubbling up the error.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, I don't think I sufficiently conveyed the consequences.
The JSON-RPC library expects to operate on a logical stream. Any failure to read from that stream indicates that the logical connection is broken.
So as a consequence of this change, a context cancellation calling a tool will break the entire MCP session.
I think I know the desired behavior: you want to get an error from the CallTool request, but this change doesn't achieve that result.
The jsonrpc2 library doesn't really support this: we'd need to add something like jsonrpc2.Connection.Fail(jsonrpc2.ID, error) to allow this type of layering traversal.
If you'd like to land this PR, I suggest not returning an error here, and I can make the change to achieve what you want.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How is this error different from the error returned a few lines bellow by jsonrpc2.DecodeMessage?
Your assumption is correct, I want to bubble up any errors from lower levels, so that for the caller the problem would be clear enough, but this implementation doesn't seem to be using a jsonrpc2 connection, just the encode/decode part. (this is sse implementation, not jsonrpc).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The jsonrpc2.Connection calls Read, which must have stream semantics: an error from Read breaks the stream.
I think there's an argument to be made that a malformed payload should break the stream: if the server sends something that we can't even parse as a jsonrpc2.Message, then the stream is corrupt.
But if the error is due to a network error or client cancellation, we don't want to terminate the session.
See also #683.
Let's focus on fixing the size limit. We can leave the bubbling up of errors, but should not return an error here. I'll make the change to surface the error to the application layer:it's a subtle change to the jsonrpc2 connection.
mcp/event.go
Outdated
| yield(Event{}, fmt.Errorf("context done: %w", ctx.Err())) | ||
| return | ||
| } else { | ||
| yield(Event{}, fmt.Errorf("error reading event: %w", err)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you wrap an error, then callers can extract the error. That means the error type becomes part of your effective API. Which means you can't change it.
If you just include the error text, you don't have that problem. If later, someone needs to distinguish particular errors, we can design the proper API for them.
mcp/event.go
Outdated
| return true | ||
| } | ||
| for { | ||
| line, err := scanner.ReadBytes('\n') |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we think this is a potential DOS attack vector? True, the client is reading from the server, which it presumably trusts in many other ways. But a malicious server could make the client crash by feeding it a large event.
What about a high and configurable upper limit?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it was already discussed in #726 that having this kind of limit would not protect against this in the event the server would present that as a multi "data" event.
Looking into other implementations of the MCP client (other languages and even other written in go) they do not impose such a limitation.
mcp/event.go
Outdated
| scanner := bufio.NewScanner(r) | ||
| const maxTokenSize = 1 * 1024 * 1024 // 1 MiB max line size | ||
| scanner.Buffer(nil, maxTokenSize) | ||
| func scanEvents(ctx context.Context, r io.Reader) iter.Seq2[Event, error] { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Used on L117 to stop the iteration on cancellation.
85a4340 to
7ba1c9a
Compare
|
@findleyr all comments have been either replied to or addressed. are there any other concerns? |
mcp/event.go
Outdated
| if !bytes.Equal(before, dataKey) { | ||
| flushData() | ||
| // If we're starting a new event field and we already have an event, emit it first | ||
| if bytes.Equal(before, eventKey) && !evt.Empty() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This doesn't look right: the event: field does not need to be the first field in the event. Maybe add a failing test for this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed and added tests the scenario you mentioned
mcp/event.go
Outdated
| dataBuf *bytes.Buffer // if non-nil, preceding field was also data | ||
| ) | ||
| flushData := func() { | ||
| emitEvent := func() bool { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/emitEvent/yieldEvent
|
|
||
| case data := <-c.incoming: | ||
| case m := <-c.incoming: | ||
| if m.err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, I don't think I sufficiently conveyed the consequences.
The JSON-RPC library expects to operate on a logical stream. Any failure to read from that stream indicates that the logical connection is broken.
So as a consequence of this change, a context cancellation calling a tool will break the entire MCP session.
I think I know the desired behavior: you want to get an error from the CallTool request, but this change doesn't achieve that result.
The jsonrpc2 library doesn't really support this: we'd need to add something like jsonrpc2.Connection.Fail(jsonrpc2.ID, error) to allow this type of layering traversal.
If you'd like to land this PR, I suggest not returning an error here, and I can make the change to achieve what you want.
mcp/event.go
Outdated
| scanner := bufio.NewScanner(r) | ||
| const maxTokenSize = 1 * 1024 * 1024 // 1 MiB max line size | ||
| scanner.Buffer(nil, maxTokenSize) | ||
| func scanEvents(ctx context.Context, r io.Reader) iter.Seq2[Event, error] { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ack, ok... but is that necessary? Wouldn't a context cancellation close the response body?
3577e96 to
377a108
Compare
377a108 to
8f634ce
Compare
We have a use case for messages larger than 1MB, hence making this configurable would help.
In order to discover the source of our problems we had to spend some time investigating, as errors were silently discarded. So we also included changes to improving surfacing errors .