feat: Add support for json and protobuf schema types by bakjos · Pull Request #755 · tchiotludo/akhq · GitHub

feat: Add support for json and protobuf schema types #755

Merged · 3 commits · Jul 8, 2021
2 changes: 2 additions & 0 deletions build.gradle
@@ -84,6 +84,8 @@ dependencies {
implementation group: "org.apache.kafka", name: "kafka-clients", version: kafkaVersion
implementation group: "io.confluent", name: "kafka-schema-registry-client", version: confluentVersion
implementation group: "io.confluent", name: "kafka-avro-serializer", version: confluentVersion
implementation group: "io.confluent", name: "kafka-json-schema-serializer", version: confluentVersion
implementation group: "io.confluent", name: "kafka-protobuf-serializer", version: confluentVersion
implementation 'org.sourcelab:kafka-connect-client:3.1.1'

// strimzi
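
The two new Confluent artifacts supply the deserializer classes that the Java changes below rely on: kafka-json-schema-serializer provides KafkaJsonSchemaDeserializer and kafka-protobuf-serializer provides KafkaProtobufDeserializer. A minimal sketch of configuring them against a schema registry (the URL here is a placeholder, not a value from this PR; in AKHQ the configuration comes from the per-cluster connection settings):

import java.util.Map;

import com.google.protobuf.Message;
import io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;

public class TypedDeserializersSketch {
    public static void main(String[] args) {
        // Placeholder registry URL; AKHQ derives this from its cluster configuration.
        Map<String, Object> config = Map.of("schema.registry.url", "http://localhost:8081");

        // Without a configured target class, JSON Schema payloads typically come back as a
        // Jackson JsonNode, which is what the Record changes below check for.
        KafkaJsonSchemaDeserializer<Object> jsonDeserializer = new KafkaJsonSchemaDeserializer<>();
        jsonDeserializer.configure(config, false); // false = value (not key) deserializer

        // Without a generated class, Protobuf payloads come back as a DynamicMessage (a Message subtype).
        KafkaProtobufDeserializer<Message> protoDeserializer = new KafkaProtobufDeserializer<>();
        protoDeserializer.configure(config, false);
    }
}
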
8 changes: 5 additions & 3 deletions client/src/containers/Schema/SchemaList/SchemaList.jsx
@@ -11,6 +11,7 @@ import './styles.scss';
import AceEditor from 'react-ace';
import 'ace-builds/webpack-resolver';
import 'ace-builds/src-noconflict/mode-json';
import 'ace-builds/src-noconflict/mode-protobuf';
import 'ace-builds/src-noconflict/theme-merbivore_soft';
import { toast } from 'react-toastify';
import 'react-toastify/dist/ReactToastify.css';
@@ -106,7 +107,8 @@ class SchemaList extends Root {
subject: schema.subject,
version: schema.version,
exception: schema.exception,
schema: schema.schema ? JSON.stringify(JSON.parse(schema.schema), null, 2) : null
schemaType: schema.schemaType,
schema: schema.schemaType === "PROTOBUF" ? schema.schema : (schema.schema ? JSON.stringify(JSON.parse(schema.schema), null, 2) : null)
});
});
this.setState({ schemasRegistry: tableSchemaRegistry, loading: false });
@@ -217,7 +219,7 @@
extraRowContent: (obj, col, index) => {
return (
<AceEditor
mode="json"
mode={ obj.schemaType === "PROTOBUF"? "protobuf" : "json"}
id={'value' + index}
theme="merbivore_soft"
value={obj[col.accessor]}
@@ -233,7 +235,7 @@
return (
<pre className="mb-0 khq-data-highlight">
<code>
{JSON.stringify(JSON.parse(obj[col.accessor]))}
{ obj.schemaType === "PROTOBUF"? obj[col.accessor] : JSON.stringify(JSON.parse(obj[col.accessor]))}
</code>
</pre>
);
52 changes: 48 additions & 4 deletions src/main/java/org/akhq/models/Record.java
@@ -1,6 +1,14 @@
package org.akhq.models;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.Message;
import com.google.protobuf.util.JsonFormat;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.json.JsonSchema;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import lombok.*;
import org.akhq.configs.SchemaRegistryType;
import org.akhq.utils.AvroToJsonSerializer;
@@ -34,6 +42,14 @@ public class Record {
private Map<String, String> headers = new HashMap<>();
@JsonIgnore
private Deserializer kafkaAvroDeserializer;
@JsonIgnore
private Deserializer kafkaProtoDeserializer;
@JsonIgnore
private Deserializer kafkaJsonDeserializer;

@JsonIgnore
private SchemaRegistryClient client;

private ProtobufToJsonDeserializer protobufToJsonDeserializer;

@Getter(AccessLevel.NONE)
@@ -69,13 +85,15 @@ public Record(RecordMetadata record, SchemaRegistryType schemaRegistryType, byte
this.headers = headers;
}

public Record(ConsumerRecord<byte[], byte[]> record, SchemaRegistryType schemaRegistryType, Deserializer kafkaAvroDeserializer,
public Record(SchemaRegistryClient client, ConsumerRecord<byte[], byte[]> record, SchemaRegistryType schemaRegistryType, Deserializer kafkaAvroDeserializer,
Deserializer kafkaJsonDeserializer, Deserializer kafkaProtoDeserializer,
ProtobufToJsonDeserializer protobufToJsonDeserializer, byte[] bytesValue) {
if (schemaRegistryType == SchemaRegistryType.TIBCO) {
this.MAGIC_BYTE = (byte) 0x80;
} else {
this.MAGIC_BYTE = 0x0;
}
this.client = client;
this.topic = record.topic();
this.partition = record.partition();
this.offset = record.offset();
@@ -91,6 +109,8 @@ public Record(ConsumerRecord<byte[], byte[]> record, SchemaRegistryType schemaRe

this.kafkaAvroDeserializer = kafkaAvroDeserializer;
this.protobufToJsonDeserializer = protobufToJsonDeserializer;
this.kafkaProtoDeserializer = kafkaProtoDeserializer;
this.kafkaJsonDeserializer = kafkaJsonDeserializer;
}

public String getKey() {
@@ -123,15 +143,39 @@ private String convertToString(byte[] payload, Integer schemaId, boolean isKey)
return null;
} else if (schemaId != null) {
try {
Object toType = kafkaAvroDeserializer.deserialize(topic, payload);


Object toType = null;

if (client != null) {
ParsedSchema schema = client.getSchemaById(schemaId);
if ( schema.schemaType().equals(ProtobufSchema.TYPE) ) {
toType = kafkaProtoDeserializer.deserialize(topic, payload);
if (!(toType instanceof Message)) {
return String.valueOf(toType);
}

Message dynamicMessage = (Message)toType;
return AvroToJsonSerializer.getMapper().readTree(JsonFormat.printer().print(dynamicMessage)).toString();
} else if ( schema.schemaType().equals(JsonSchema.TYPE) ) {
toType = kafkaJsonDeserializer.deserialize(topic, payload);
if ( !(toType instanceof JsonNode) ) {
return String.valueOf(toType);
}
JsonNode node = (JsonNode) toType;
return node.toString();
}
}

toType = kafkaAvroDeserializer.deserialize(topic, payload);

//for primitive avro type
if (!(toType instanceof GenericRecord)){
if (!(toType instanceof GenericRecord)) {
return String.valueOf(toType);
}

GenericRecord record = (GenericRecord) toType;
return AvroToJsonSerializer.toJson(record);

} catch (Exception exception) {
this.exceptions.add(exception.getMessage());

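
The heart of the change is the new dispatch in convertToString: the schema id embedded in the record is resolved through the registry client, and the payload is routed to the Protobuf, JSON Schema, or Avro path based on the parsed schema's type. A condensed, standalone sketch of that dispatch (the helper name and parameters are illustrative; the Confluent and protobuf-java-util calls are the ones used in the diff above):

import com.fasterxml.jackson.databind.JsonNode;
import com.google.protobuf.Message;
import com.google.protobuf.util.JsonFormat;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.json.JsonSchema;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import org.apache.kafka.common.serialization.Deserializer;

class SchemaTypeDispatchSketch {
    // Illustrative helper: route a payload to the deserializer matching its registered schema type.
    static String toJsonString(SchemaRegistryClient client, int schemaId, String topic, byte[] payload,
                               Deserializer<?> protoDeserializer, Deserializer<?> jsonDeserializer,
                               Deserializer<?> avroDeserializer) throws Exception {
        ParsedSchema schema = client.getSchemaById(schemaId);

        if (ProtobufSchema.TYPE.equals(schema.schemaType())) {
            Object value = protoDeserializer.deserialize(topic, payload);
            // A DynamicMessage (or any other Message) can be rendered as JSON via protobuf-java-util.
            return value instanceof Message
                ? JsonFormat.printer().print((Message) value)
                : String.valueOf(value);
        }

        if (JsonSchema.TYPE.equals(schema.schemaType())) {
            Object value = jsonDeserializer.deserialize(topic, payload);
            // With no target type configured, the JSON Schema deserializer yields a Jackson JsonNode.
            return value instanceof JsonNode ? value.toString() : String.valueOf(value);
        }

        // Every other schema type falls back to the existing Avro path.
        return String.valueOf(avroDeserializer.deserialize(topic, payload));
    }
}
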
17 changes: 16 additions & 1 deletion src/main/java/org/akhq/models/Schema.java
@@ -1,9 +1,13 @@
package org.akhq.models;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference;
import io.confluent.kafka.schemaregistry.json.JsonSchema;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import lombok.*;
import org.apache.avro.AvroTypeException;
import org.apache.avro.Schema.Parser;
@@ -26,11 +30,15 @@ public class Schema {
private Integer version;
private Config.CompatibilityLevelConfig compatibilityLevel;
private String schema;
private String schemaType;
private List<SchemaReference> references = new ArrayList<>();

@JsonIgnore
private org.apache.avro.Schema avroSchema;

@JsonIgnore
private JsonNode jsonSchema;

private String exception;

public Schema(Schema schema, Schema.Config config) {
@@ -55,7 +63,14 @@ public Schema(io.confluent.kafka.schemaregistry.client.rest.entities.Schema sche
}
this.references = parsedSchema.references();
this.schema = parsedSchema.rawSchema().toString();
this.avroSchema = parser.parse(this.schema);
this.schemaType = schema.getSchemaType();
if (schemaType.equals(AvroSchema.TYPE)) {
this.avroSchema = parser.parse(this.schema);
} else if ( schemaType.equals(JsonSchema.TYPE)) {
this.jsonSchema = ((JsonSchema)parsedSchema).toJsonNode();
} else if ( schemaType.equals(ProtobufSchema.TYPE)) {
this.schema = parsedSchema.canonicalString();
}
} catch (AvroTypeException e) {
this.schema = null;
this.exception = e.getMessage();
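
Schema.java now branches on the registry-reported schema type: Avro subjects are still parsed with the Avro Parser, JSON subjects additionally keep a Jackson tree view, and Protobuf subjects store the canonical .proto text instead of a JSON structure. A small sketch of the three ParsedSchema subtypes and the accessors the model uses (the schema bodies are made-up examples):

import com.fasterxml.jackson.databind.JsonNode;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.json.JsonSchema;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;

public class ParsedSchemaTypesSketch {
    public static void main(String[] args) {
        // AVRO: rawSchema() exposes the underlying org.apache.avro.Schema used by the parser-based path.
        AvroSchema avro = new AvroSchema("{\"type\":\"string\"}");
        Object avroRaw = avro.rawSchema();

        // JSON: toJsonNode() gives the Jackson tree that Schema.java keeps alongside the raw string.
        JsonSchema json = new JsonSchema("{\"type\":\"object\",\"properties\":{\"id\":{\"type\":\"string\"}}}");
        JsonNode jsonTree = json.toJsonNode();

        // PROTOBUF: canonicalString() is the normalized .proto text stored as the schema body.
        ProtobufSchema proto = new ProtobufSchema(
            "syntax = \"proto3\";\nmessage Value {\n  string id = 1;\n}\n");
        String canonical = proto.canonicalString();

        System.out.println(avroRaw + "\n" + jsonTree + "\n" + canonical);
    }
}
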
33 changes: 32 additions & 1 deletion src/main/java/org/akhq/modules/KafkaModule.java
@@ -1,14 +1,17 @@
package org.akhq.modules;

import com.google.common.collect.ImmutableMap;
import io.confluent.kafka.schemaregistry.SchemaProvider;
import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
import io.confluent.kafka.schemaregistry.json.JsonSchemaProvider;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.RestService;
import io.confluent.kafka.schemaregistry.client.security.SslFactory;
import io.confluent.kafka.schemaregistry.client.security.basicauth.BasicAuthCredentialProvider;
import io.confluent.kafka.schemaregistry.client.security.basicauth.BasicAuthCredentialProviderFactory;
import io.confluent.kafka.schemaregistry.client.security.basicauth.UserInfoCredentialProvider;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -25,6 +28,7 @@
import javax.inject.Inject;
import javax.inject.Singleton;
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -146,6 +150,26 @@ public AvroSchemaProvider getAvroSchemaProvider(String clusterId) {
return avroSchemaProvider;
}

public JsonSchemaProvider getJsonSchemaProvider(String clusterId) {
JsonSchemaProvider jsonSchemaProvider = new JsonSchemaProvider();
jsonSchemaProvider.configure(Collections.singletonMap(
"schemaVersionFetcher",
new CachedSchemaRegistryClient(this.getRegistryRestClient(clusterId), 100)
));

return jsonSchemaProvider;
}

public ProtobufSchemaProvider getProtobufSchemaProvider(String clusterId) {
ProtobufSchemaProvider protobufSchemaProvider = new ProtobufSchemaProvider();
protobufSchemaProvider.configure(Collections.singletonMap(
"schemaVersionFetcher",
new CachedSchemaRegistryClient(this.getRegistryRestClient(clusterId), 100)
));

return protobufSchemaProvider;
}

public RestService getRegistryRestClient(String clusterId) {
Connection connection = this.getConnection(clusterId);

@@ -199,10 +223,17 @@ public SchemaRegistryClient getRegistryClient(String clusterId) {
if (!this.registryClient.containsKey(clusterId)) {
Connection connection = this.getConnection(clusterId);

List<SchemaProvider> providers = new ArrayList<>();
providers.add( new AvroSchemaProvider() );
providers.add( new JsonSchemaProvider() );
providers.add( new ProtobufSchemaProvider() );

SchemaRegistryClient client = new CachedSchemaRegistryClient(
this.getRegistryRestClient(clusterId),
Integer.MAX_VALUE,
connection.getSchemaRegistry() != null ? connection.getSchemaRegistry().getProperties() : null
providers,
connection.getSchemaRegistry() != null ? connection.getSchemaRegistry().getProperties() : null,
null
);

this.registryClient.put(clusterId, client);
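
The registry client is now constructed with explicit schema providers so that getSchemaById can return JSON and Protobuf ParsedSchema instances instead of failing on non-Avro subjects. A minimal sketch of that construction outside of AKHQ (the URL is a placeholder; AKHQ builds a RestService from its per-cluster connection settings, as in the diff above):

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import io.confluent.kafka.schemaregistry.SchemaProvider;
import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.json.JsonSchemaProvider;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;

public class RegistryClientWithProvidersSketch {
    public static void main(String[] args) {
        List<SchemaProvider> providers = Arrays.asList(
            new AvroSchemaProvider(),      // existing behaviour for Avro subjects
            new JsonSchemaProvider(),      // enables ParsedSchema instances of type JSON
            new ProtobufSchemaProvider()   // enables ParsedSchema instances of type PROTOBUF
        );

        // Placeholder URL; the identity map capacity mirrors the Integer.MAX_VALUE used in the PR.
        SchemaRegistryClient client = new CachedSchemaRegistryClient(
            Collections.singletonList("http://localhost:8081"),
            Integer.MAX_VALUE,
            providers,
            Collections.emptyMap()
        );

        System.out.println("Registry client ready: " + client);
    }
}
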
18 changes: 15 additions & 3 deletions src/main/java/org/akhq/repositories/RecordRepository.java
@@ -3,13 +3,15 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableMap;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.micronaut.context.annotation.Value;
import io.micronaut.context.env.Environment;
import io.micronaut.core.util.StringUtils;
import io.micronaut.http.sse.Event;
import io.reactivex.Flowable;
import lombok.*;
import lombok.extern.slf4j.Slf4j;
import org.akhq.configs.SchemaRegistryType;
import org.akhq.controllers.TopicController;
import org.akhq.models.Partition;
import org.akhq.models.Record;
@@ -417,23 +419,33 @@ private ConsumerRecords<byte[], byte[]> poll(KafkaConsumer<byte[], byte[]> consu
}

private Record newRecord(ConsumerRecord<byte[], byte[]> record, String clusterId) {
SchemaRegistryType schemaRegistryType = this.schemaRegistryRepository.getSchemaRegistryType(clusterId);
SchemaRegistryClient client = this.kafkaModule.getRegistryClient(clusterId);
return new Record(
client,
record,
this.schemaRegistryRepository.getSchemaRegistryType(clusterId),
this.schemaRegistryRepository.getKafkaAvroDeserializer(clusterId),
schemaRegistryType == SchemaRegistryType.CONFLUENT? this.schemaRegistryRepository.getKafkaJsonDeserializer(clusterId):null,
schemaRegistryType == SchemaRegistryType.CONFLUENT? this.schemaRegistryRepository.getKafkaProtoDeserializer(clusterId):null,
this.customDeserializerRepository.getProtobufToJsonDeserializer(clusterId),
avroWireFormatConverter.convertValueToWireFormat(record, this.kafkaModule.getRegistryClient(clusterId),
avroWireFormatConverter.convertValueToWireFormat(record, client,
this.schemaRegistryRepository.getSchemaRegistryType(clusterId))
);
}

private Record newRecord(ConsumerRecord<byte[], byte[]> record, BaseOptions options) {
SchemaRegistryType schemaRegistryType = this.schemaRegistryRepository.getSchemaRegistryType(options.clusterId);
SchemaRegistryClient client = this.kafkaModule.getRegistryClient(options.clusterId);
return new Record(
client,
record,
this.schemaRegistryRepository.getSchemaRegistryType(options.clusterId),
schemaRegistryType,
this.schemaRegistryRepository.getKafkaAvroDeserializer(options.clusterId),
schemaRegistryType == SchemaRegistryType.CONFLUENT? this.schemaRegistryRepository.getKafkaJsonDeserializer(options.clusterId):null,
schemaRegistryType == SchemaRegistryType.CONFLUENT? this.schemaRegistryRepository.getKafkaProtoDeserializer(options.clusterId):null,
this.customDeserializerRepository.getProtobufToJsonDeserializer(options.clusterId),
avroWireFormatConverter.convertValueToWireFormat(record, this.kafkaModule.getRegistryClient(options.clusterId),
avroWireFormatConverter.convertValueToWireFormat(record, client,
this.schemaRegistryRepository.getSchemaRegistryType(options.clusterId))
);
}
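
Both newRecord overloads now resolve the registry client once, and only pass the JSON and Protobuf deserializers to Record when the registry is Confluent-flavoured; TIBCO registries receive null and keep the previous Avro / protobuf-descriptor behaviour. A hypothetical helper capturing that guard (the accessor methods it would wrap, getKafkaJsonDeserializer and getKafkaProtoDeserializer, are added elsewhere in this PR and are not shown in this excerpt):

import org.apache.kafka.common.serialization.Deserializer;

class DeserializerSelectionSketch {
    // Mirrors org.akhq.configs.SchemaRegistryType, which is not part of this excerpt.
    enum SchemaRegistryType { CONFLUENT, TIBCO }

    // Hypothetical helper: JSON and Protobuf deserializers are only meaningful against a Confluent
    // registry, so any other registry type gets null and Record falls back to its existing paths.
    static Deserializer<?> onlyForConfluent(SchemaRegistryType type, Deserializer<?> deserializer) {
        return type == SchemaRegistryType.CONFLUENT ? deserializer : null;
    }
}
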