拆封receiveMessages by jingyugao · Pull Request #38 · withlin/canal-go · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

拆封receiveMessages #38

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 1 addition & 48 deletions client/simple_canal_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,54 +306,7 @@ func (c *SimpleCanalConnector) receiveMessages() (*pb.Message, error) {
if err != nil {
return nil, err
}
p := new(pb.Packet)
err = proto.Unmarshal(data, p)
if err != nil {
return nil, err
}
messages := new(pb.Messages)
message := new(pb.Message)

length := len(messages.Messages)
message.Entries = make([]pb.Entry, length)
ack := new(pb.Ack)
var items []pb.Entry
var entry pb.Entry
switch p.Type {
case pb.PacketType_MESSAGES:
if !(p.GetCompression() == pb.Compression_NONE) {
panic("compression is not supported in this connector")
}
err := proto.Unmarshal(p.Body, messages)
if err != nil {
return nil, err
}
if c.LazyParseEntry {
message.RawEntries = messages.Messages
} else {

for _, value := range messages.Messages {
err := proto.Unmarshal(value, &entry)
if err != nil {
return nil, err
}
items = append(items, entry)
}
}
message.Entries = items
message.Id = messages.GetBatchId()
return message, nil

case pb.PacketType_ACK:
err := proto.Unmarshal(p.Body, ack)
if err != nil {
return nil, err
}
panic(errors.New(fmt.Sprintf("something goes wrong with reason:%s", ack.GetErrorMessage())))
default:
panic(errors.New(fmt.Sprintf("unexpected packet type:%s", p.Type)))

}
return pb.Decode(data, c.LazyParseEntry)
}

// Ack acknowledges data to the canal-server (that is, once some processing logic has finished, the data is deleted on the canal-server side).
Expand Down
57 changes: 57 additions & 0 deletions protocol/Message.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@

package com_alibaba_otter_canal_protocol

import (
"errors"
"fmt"

"github.com/gogo/protobuf/proto"
)

type Message struct {
Id int64
Entries []Entry
Expand All @@ -27,3 +34,53 @@ func NewMessage(id int64) *Message {
message := &Message{Id: id, Entries: nil, Raw: false, RawEntries: nil}
return message
}

// Decode parses a protobuf-encoded Packet and converts it into a Message.
//
// data is the serialized Packet. For a MESSAGES packet the body is decoded
// as Messages and the resulting Message carries its batch id. When
// lazyParseEntry is true the individual payloads are stored undecoded in
// Message.RawEntries and Message.Entries is left nil; otherwise every
// payload is unmarshalled into an Entry and collected in Message.Entries.
//
// A non-nil error (and a nil Message) is returned when any protobuf payload
// fails to unmarshal, when the packet declares a compression other than
// Compression_NONE, when the packet is an ACK (the error carries the ack's
// error message), or when the packet type is not recognised. These
// conditions are driven by input data, so they are reported through the
// error result instead of panicking.
func Decode(data []byte, lazyParseEntry bool) (*Message, error) {
	p := new(Packet)
	if err := proto.Unmarshal(data, p); err != nil {
		return nil, err
	}

	switch p.Type {
	case PacketType_MESSAGES:
		if p.GetCompression() != Compression_NONE {
			return nil, errors.New("compression is not supported in this connector")
		}

		messages := new(Messages)
		if err := proto.Unmarshal(p.Body, messages); err != nil {
			return nil, err
		}

		message := new(Message)
		message.Id = messages.GetBatchId()

		if lazyParseEntry {
			// Hand the undecoded payloads to the caller as they are.
			message.RawEntries = messages.Messages
			return message, nil
		}

		// Pre-size only for non-empty batches so that an empty batch still
		// yields a nil Entries slice.
		if n := len(messages.Messages); n > 0 {
			message.Entries = make([]Entry, 0, n)
		}
		for _, value := range messages.Messages {
			// A fresh Entry per payload: no state is shared between
			// iterations.
			var entry Entry
			if err := proto.Unmarshal(value, &entry); err != nil {
				return nil, err
			}
			message.Entries = append(message.Entries, entry)
		}
		return message, nil

	case PacketType_ACK:
		ack := new(Ack)
		if err := proto.Unmarshal(p.Body, ack); err != nil {
			return nil, err
		}
		return nil, fmt.Errorf("something goes wrong with reason:%s", ack.GetErrorMessage())

	default:
		return nil, fmt.Errorf("unexpected packet type:%s", p.Type)
	}
}
0