rbd: add support for changed block tracking RPCs #5347
base: devel
internal/rbd/controllerserver.go
Outdated
@@ -1709,3 +1709,67 @@ func (cs *ControllerServer) ControllerUnpublishVolume(

return &csi.ControllerUnpublishVolumeResponse{}, nil
}

// GetSnapshotMetadata streams the allocated block metadata for a snapshot.
- // GetSnapshotMetadata streams the allocated block metadata for a snapshot.
+ // GetMetadataAllocated streams the allocated block metadata for a snapshot.
internal/rbd/controllerserver.go
Outdated
req *csi.GetMetadataDeltaRequest,
stream csi.SnapshotMetadata_GetMetadataDeltaServer,
) error {
ctx := context.WithValue(stream.Context(), "Base-Snapshot-ID", req.GetBaseSnapshotId())
Base Snapshot ID
internal/rbd/controllerserver.go
Outdated

targetSnapshotID := req.GetTargetSnapshotId()
if targetSnapshotID == "" {
return status.Error(codes.InvalidArgument, "base snapshot ID cannot be empty")
- return status.Error(codes.InvalidArgument, "base snapshot ID cannot be empty")
+ return status.Error(codes.InvalidArgument, "target snapshot ID cannot be empty")
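The copy-paste slip flagged above is easier to spot when each check names the field it tests. A minimal sketch, assuming a hypothetical `deltaRequest` struct in place of the CSI request type and plain `error` values in place of gRPC `InvalidArgument` statuses, so that it needs only the standard library:

```go
package main

import "errors"

// deltaRequest is a hypothetical stand-in for csi.GetMetadataDeltaRequest.
type deltaRequest struct {
	BaseSnapshotID   string
	TargetSnapshotID string
}

// validateDeltaRequest reports which snapshot ID is missing. In the driver,
// the returned error would be wrapped in a gRPC InvalidArgument status.
func validateDeltaRequest(req deltaRequest) error {
	if req.BaseSnapshotID == "" {
		return errors.New("base snapshot ID cannot be empty")
	}
	if req.TargetSnapshotID == "" {
		return errors.New("target snapshot ID cannot be empty")
	}

	return nil
}
```

Keeping both checks in one validation helper also means the RPC handler does not need to repeat them.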
b60a6f5 to 3892027
internal/rbd/controllerserver.go
Outdated
@@ -1709,3 +1709,67 @@ func (cs *ControllerServer) ControllerUnpublishVolume(

return &csi.ControllerUnpublishVolumeResponse{}, nil
}

Let's move this to a new file since it's new functionality. This file already has 1770 lines of code.
internal/rbd/controllerserver.go
Outdated

rbdSnap, err := genSnapFromSnapID(ctx, snapshotID, cr, req.GetSecrets())
if err != nil {
return status.Error(codes.Internal, err.Error())
what about the NotFound error?
internal/rbd/snap_diff.go
Outdated
maxResults int32) error {
image, err := rbdSnap.open()
if err != nil {
return err
Return a gRPC error message.
Take care of this in other places in this function
internal/rbd/snap_diff.go
Outdated
}
}
// If no error, we successfully completed the diff operation.
if len(changedBlocks) > 0 {
Can we have a function to send the response that can be reused here and above, like:
- if len(changedBlocks) > 0 {
+ send := func() error {
+ 	if len(changedBlocks) == 0 {
+ 		return nil
+ 	}
+ 	log.DebugLog(ctx, "Sending %d changed blocks to the stream", len(changedBlocks))
+ 	if err := stream.Send(&csi.GetMetadataAllocatedResponse{
+ 		BlockMetadataType: csi.BlockMetadataType_VARIABLE_LENGTH,
+ 		VolumeCapacityBytes: rbdSnap.VolSize,
+ 		BlockMetadata: changedBlocks,
+ 	}); err != nil {
+ 		log.ErrorLog(ctx, "failed to send response: %v", err)
+ 		return status.Error(codes.Internal, fmt.Sprintf("failed to send response: %v", err))
+ 	}
+ 	changedBlocks = changedBlocks[:0] // Reset slice
+ 	return nil
+ }
internal/rbd/controllerserver.go
Outdated

baseRBDSnap, err := genSnapFromSnapID(ctx, baseSnapshotID, cr, req.GetSecrets())
if err != nil {
return status.Error(codes.Internal, err.Error())
Need to handle NotFound?
internal/rbd/controllerserver.go
Outdated

targetRBDSnap, err := genSnapFromSnapID(ctx, targetSnapshotID, cr, req.GetSecrets())
if err != nil {
return status.Error(codes.Internal, err.Error())
Need to handle NotFound?
internal/rbd/snap_diff.go
Outdated
return nil
}

func (rbdSnap *rbdSnapshot) getMetadataDelta(
This looks similar to the function above and has a lot of duplication. Can we rewrite this to avoid duplicating the core logic? That will help us keep the outer implementation simple.
85a53a1 to 288a354
56db368 to 6a3d211
Thanks for the quick, detailed review. The main new modifications are:
The failing golang lint CI is being handled in #5351. Please take a look again.
This pull request now has conflicts with the target branch. Could you please resolve conflicts and force push the corrected changes? 🙏
internal/rbd/snap_diff.go
Outdated
defer cr.DeleteCredentials()

snapshotID := req.GetSnapshotId()
if snapshotID == "" {
Not needed? This is already handled in `validateMetadataAllocatedReq`.
1st pass going through this. Needs cleanup and a clear split between internal RBD things and ControllerServer Snapshot usage. Extend the `types.Snapshot` interface with the required functions, so that ControllerServer does not need to use `rbdSnap` types directly.
internal/rbd/snap_diff.go
Outdated

// streamDoneErrCode is a custom error code used to indicate that the
// stream was closed by the client.
streamDoneErrCode = -1
Is there a reason not to use `google.golang.org/grpc/codes` constants? Or maybe base it on return values of `read()`, like `EOF` and so on?
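One way to read this suggestion is to replace the bare `-1` with a named sentinel that callers test with `errors.Is`, in the same spirit as `io.EOF`. The sketch below is only an illustration of that pattern: `errStopIteration`, `iterate`, and `firstN` are hypothetical names, and the callback here returns a Go `error`. If the underlying diff callback only allows an integer return, the sentinel would still have to be mapped to one.

```go
package main

import "errors"

// errStopIteration is a hypothetical sentinel meaning "stop early, this is
// not a failure". It plays the role of the custom -1 code.
var errStopIteration = errors.New("stop iteration")

// iterate calls fn for every extent and returns the first error fn reports.
func iterate(extents []int, fn func(int) error) error {
	for _, e := range extents {
		if err := fn(e); err != nil {
			return err
		}
	}

	return nil
}

// firstN collects at most n extents. Reaching the limit ends the iteration
// through the sentinel, which is then filtered out with errors.Is.
func firstN(extents []int, n int) ([]int, error) {
	var out []int
	err := iterate(extents, func(e int) error {
		if len(out) == n {
			return errStopIteration
		}
		out = append(out, e)

		return nil
	})
	if err != nil && !errors.Is(err, errStopIteration) {
		return nil, err
	}

	return out, nil
}
```

A named sentinel keeps the early-exit case distinguishable from real failures without relying on a magic number.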
internal/rbd/snap_diff.go
Outdated
// internalMaxResults is the maximum number of results to return in a single
// response. This is used to limit the number of changed blocks that needs
// to be stored in memory before sending a response to the stream.
internalMaxResults = 20
Isn't 20 very small?
> Isn't 20 very small?

I'm fine with any value here on the lower side.
The CSI spec gives us the freedom to send a lower number than asked for.
We'll have a follow-up to make this configurable.
Any suggestions?
I expect that the size of a single metadata for a changed block is relatively small. Start with something like 128, so that the number of repeated calls is reduced (will be less efficient?).
A configuration option is definitely good to do at a later point in time. Does the sidecar provide metrics about the number of changed blocks returned per diff? With that, a more educated guess for a default can be made.
> I expect that the size of a single metadata for a changed block is relatively small. Start with something like 128, so that the number of repeated calls is reduced (will be less efficient?).

👍

> A configuration option is definitely good to do at a later point in time. Does the sidecar provide metrics about the number of changed blocks returned per diff? With that, a more educated guess for a default can be made.

Currently, only metrics regarding the start and end time of requests are added.
@Nikhil-Ladha ^
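The interplay between the client's requested limit and the internal cap discussed in this thread could look roughly like the following. This is a sketch under assumptions: `defaultMaxResults` uses the 128 proposed above, `effectiveMaxResults` is a hypothetical helper name, and a zero request is assumed to mean that the client set no limit.

```go
package main

// defaultMaxResults is an assumed internal cap on entries per response,
// using the value of 128 proposed in the review.
const defaultMaxResults int32 = 128

// effectiveMaxResults picks the per-response entry limit. A requested value
// of zero (assumed to mean "no limit requested") or anything above the
// internal cap falls back to the cap; smaller requests are honoured as is.
func effectiveMaxResults(requested int32) int32 {
	if requested <= 0 || requested > defaultMaxResults {
		return defaultMaxResults
	}

	return requested
}
```

Because a plugin may return fewer entries than requested, clamping only affects how many responses are streamed, not the overall result.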
72390e5 to 4ff0a22
@@ -4,6 +4,8 @@

## Features

- rbd: add support for [CSI Snapshot Metadata Service RPCs](https://github.com/container-storage-interface/spec/blob/master/spec.md#snapshot-metadata-service-rpcs)

add something about it to the rbd documentation too?
internal/rbd/errors/errors.go
Outdated
@@ -68,3 +68,8 @@ var (
// ErrGroupNotFound is returned when group is not found in the cluster on the given pool and/or namespace.
ErrGroupNotFound = fmt.Errorf("%w: RBD group not found", librbd.ErrNotFound)
)

type ErrorCode interface {
// Code returns the error code.
Comment `Code` -> `ErrorCode`.
Can you also add a comment for the interface itself, with a small explanation of how (and for which kind of errors) it can be used?
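A documented version of such an interface, together with the way a caller might consume it, could look like the sketch below. The method name `ErrorCode()`, the `codedError` type, and the `codeOf` helper are assumptions made for illustration; only the interface name comes from the diff.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrorCode is implemented by errors that carry a numeric code, for example
// a negative errno reported by a storage library call. Callers can use
// errors.As to recover the code from anywhere in a wrapped error chain.
type ErrorCode interface {
	error
	// ErrorCode returns the numeric code attached to the error.
	ErrorCode() int
}

// codedError is a hypothetical implementation used only for this sketch.
type codedError struct {
	op   string
	code int
}

func (e codedError) Error() string {
	return fmt.Sprintf("%s failed with code %d", e.op, e.code)
}

func (e codedError) ErrorCode() int {
	return e.code
}

// codeOf returns the code of the first error in the chain that implements
// ErrorCode, and false when no such error exists.
func codeOf(err error) (int, bool) {
	var ec ErrorCode
	if errors.As(err, &ec) {
		return ec.ErrorCode(), true
	}

	return 0, false
}
```

With this shape, code that only cares about the numeric value does not need to know the concrete error type.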
// StreamMetadata streams the block metadata for a snapshot.
// If baseRBDSnap is provided, it calculates the delta between the
// current snapshot and the base snapshot.
func (rbdSnap *rbdSnapshot) StreamMetadata(
I have the feeling this function does too much. The `rbdSnapshot` struct should probably not send anything, but instead return `[]*csi.BlockMetadata` to the caller, which can then send it out? Would that not make it a cleaner implementation?
> I have the feeling this function does too much.

Broke this function down into multiple helpers.

> The `rbdSnapshot` struct should probably not send anything, but instead return `[]*csi.BlockMetadata` to the caller, which can then send it out? Would that not make it a cleaner implementation?

It's a streaming API, so we would be sending more than once.
Used this as inspiration to instead pass down a func that only takes `[]*csi.BlockMetadata` as input and sends the stream response internally.
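The design described in this reply, where the snapshot code batches blocks and hands each batch to a caller-supplied function, can be sketched as follows. The `block` type stands in for `*csi.BlockMetadata`, and `streamBlocks` is a hypothetical name; the real code receives extents from a diff iteration rather than from a slice.

```go
package main

// block is a stand-in for *csi.BlockMetadata.
type block struct {
	Offset int64
	Size   int64
}

// streamBlocks groups blocks into batches of at most maxResults entries and
// passes each batch to send. The batch slice is reused between calls, so
// send must not retain it after returning.
func streamBlocks(blocks []block, maxResults int, send func([]block) error) error {
	batch := make([]block, 0, maxResults)

	// flush sends the pending batch, if any, and resets it for reuse.
	flush := func() error {
		if len(batch) == 0 {
			return nil
		}
		if err := send(batch); err != nil {
			return err
		}
		batch = batch[:0]

		return nil
	}

	for _, b := range blocks {
		batch = append(batch, b)
		if len(batch) == maxResults {
			if err := flush(); err != nil {
				return err
			}
		}
	}

	// Send whatever is left after the last full batch.
	return flush()
}
```

Since the callback only sees a slice of blocks, the snapshot type no longer needs to know about `grpc.ServerStream` or the response message types.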
internal/rbd/types/snapshot.go
Outdated
// If baseRBDSnap is provided, the delta between the
// current snapshot and the base snapshot is streamed.
StreamMetadata(ctx context.Context,
stream grpc.ServerStream,
`grpc.ServerStream` really isn't nice here, see my other comment.
internal/rbd/snap_diff.go
Outdated
}

if baseRBDSnap != nil {
fromSnapID, err = baseRBDSnap.GetRBDSnapID(ctx)
Instead of exporting the `GetRBDSnapID()` function in the Snapshot interface, it can be an internal function too. In this case, typecasting `baseRBDSnap` to a `*rbdSnapshot` would be ok.
Consider a helper function for typecasting:
func rbdSnapFromSnapshot(snap types.Snapshot) (*rbdSnapshot, error) {}
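The proposed helper amounts to a checked type assertion. A minimal sketch, assuming a local `Snapshot` interface in place of `types.Snapshot` and hypothetical fields on `rbdSnapshot`:

```go
package main

import "fmt"

// Snapshot is a stand-in for the types.Snapshot interface.
type Snapshot interface {
	GetID() string
}

// rbdSnapshot is a simplified stand-in for the RBD-backed implementation;
// snapID is a hypothetical internal field.
type rbdSnapshot struct {
	id     string
	snapID uint64
}

func (s *rbdSnapshot) GetID() string {
	return s.id
}

// otherSnapshot is a second implementation, used to show the failure path.
type otherSnapshot struct{}

func (otherSnapshot) GetID() string {
	return "other"
}

// rbdSnapFromSnapshot converts a generic Snapshot to the RBD-specific type,
// returning an error instead of panicking when the type does not match.
func rbdSnapFromSnapshot(snap Snapshot) (*rbdSnapshot, error) {
	rs, ok := snap.(*rbdSnapshot)
	if !ok {
		return nil, fmt.Errorf("unexpected snapshot type %T", snap)
	}

	return rs, nil
}
```

This keeps backend-specific accessors off the shared interface while still letting internal code reach them.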
internal/rbd/types/snapshot.go
Outdated
GetVolSize() int64

// GetRBDSnapID retrieves the RBD snapshot ID for the snapshot.
GetRBDSnapID(ctx context.Context) (uint64, error)
This should be a non-backend-specific naming, drop RBD from it. Ideally remove it from the interface, see a previous comment.
Signed-off-by: Rakshith R <rar@redhat.com>
4eeea62 to 9956ddf
Signed-off-by: Rakshith R <rar@redhat.com>
9956ddf to 3cb5e25
9c9afb4 to 755348e
This commit adds support for GetMetadataAllocated and GetMetadataDelta RPCs. Signed-off-by: Rakshith R <rar@redhat.com>
755348e to 0c29e32
some nits, overall LGTM

* Streams metadata about block differences between two snapshots
* Optimal for incremental backups
* Returns only blocks that changed between snapshots
This doc is for someone who wants to deploy and test it out, can we add relevant steps for the same?
caps = append(caps, &snapDiffCaps)
}
if err != nil {
log.DebugLog(ctx, "error checking for snapshot diff by ID support: %s", err)
log as error/warning?
if errors.Is(err, rbderrors.ErrImageNotFound) {
	log.ErrorLog(ctx, "snapshot %q not found: %v", baseSnapshotID, err)

	return status.Errorf(codes.NotFound, "snapshot %q not found: %v", baseSnapshotID, err)
}
if err != nil {
	return status.Error(codes.Internal, err.Error())
}
- if errors.Is(err, rbderrors.ErrImageNotFound) {
- 	log.ErrorLog(ctx, "snapshot %q not found: %v", baseSnapshotID, err)
- 	return status.Errorf(codes.NotFound, "snapshot %q not found: %v", baseSnapshotID, err)
- }
- if err != nil {
- 	return status.Error(codes.Internal, err.Error())
- }
+ if err != nil {
+ 	if errors.Is(err, rbderrors.ErrImageNotFound) {
+ 		log.ErrorLog(ctx, "snapshot %q not found: %v", baseSnapshotID, err)
+ 		return status.Errorf(codes.NotFound, "snapshot %q not found: %v", baseSnapshotID, err)
+ 	}
+ 	return status.Error(codes.Internal, err.Error())
+ }
It would be nice to have one block for the error handling. Take care of it in other places as well.
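Since the same NotFound-versus-Internal decision comes up for every snapshot lookup in this PR, it could also be factored into one small helper. The sketch below uses local stand-ins so that it stays within the standard library: `code` replaces the gRPC status code type, `errImageNotFound` replaces `rbderrors.ErrImageNotFound`, and `lookupSnapshot` and `classify` are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// code is a local stand-in for the gRPC status code type.
type code int

const (
	codeOK code = iota
	codeNotFound
	codeInternal
)

// errImageNotFound is a stand-in for rbderrors.ErrImageNotFound.
var errImageNotFound = errors.New("image not found")

// lookupSnapshot is a hypothetical lookup that fails for the ID "missing"
// with a wrapped not-found error, and for the ID "broken" with another error.
func lookupSnapshot(id string) error {
	switch id {
	case "missing":
		return fmt.Errorf("snapshot %q: %w", id, errImageNotFound)
	case "broken":
		return fmt.Errorf("snapshot %q: connection reset", id)
	default:
		return nil
	}
}

// classify maps a lookup error to a status code in a single block, so each
// call site needs only one "if err != nil" branch.
func classify(err error) code {
	switch {
	case err == nil:
		return codeOK
	case errors.Is(err, errImageNotFound):
		return codeNotFound
	default:
		return codeInternal
	}
}
```

A call site would then check `err` once and build the status from `classify(err)`, which keeps the base and target snapshot paths identical.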
diffIterateErr := image.DiffIterateByID(diffIterateByIDConfig)
err = handleDiffIterateError(ctx, diffIterateErr)
if err != nil {
log.ErrorLog(ctx, "failed to get diff: %s", err)
Log it at the caller? We don't need to log this in multiple places in this function.
// handleDiffIterateError checks the error returned by DiffIterateByID.
// It handles specific error codes and returns a formatted error message
// for unrecognized error codes.
func handleDiffIterateError(ctx context.Context, err error) error {
Let the caller log it, and then we don't need to pass the ctx?
@@ -527,7 +527,7 @@ If custom image is built for the rbd-plugin instance, make sure that it contains

## Changed Block Tracking (CBT)

> **Warning**: Requires Ceph version that supports RBD snap diff by ID feature.
These changes should be part of the previous `doc: add documentation for CBT feature` commit.
// defaultMaxResults is the internal limit for max results in metadata streaming
// This is set to a reasonable value to prevent excessive memory usage
// and ensure efficient processing of metadata requests.
// TODO: Consider making this configurable via a cmdline flag.
Please have an issue created for this, so it's easier to track.
This pull request now has conflicts with the target branch. Could you please resolve conflicts and force push the corrected changes? 🙏
Describe what this PR does
rbd: add support for changed block tracking RPCs
This commit adds support for GetMetadataAllocated and GetMetadataDelta RPCs.
CSI Snapshot Metadata Service RPCs
Updates: #5346
Checklist:
guidelines in the developer
guide.
Request
notes
updated with breaking and/or notable changes for the next major release.
Show available bot commands
These commands are normally not required, but in case of issues, leave any of
the following bot commands in an otherwise empty comment in this PR:
/retest ci/centos/<job-name>: retest the <job-name> after unrelated failure (please report the failure too!)