How to add column with the kafka message timestamp in kafka sink connector
I am configuring my connectors using properties/JSON files. I am trying to add a timestamp column containing the Kafka message timestamp assigned when the source connector writes to the topic, without any success.
I have tried to add transforms, but the field is always null, and my BigQuery sink connector returns an error:
Failed to update table schema
I put this configuration in the BigQuery connector properties:
transforms=InsertField
transforms.InsertField.timestamp.field=fieldtime
transforms.InsertField.type=org.apache.kafka.connect.transforms.InsertField$Value
My source config (SAP connector):
{
  "name": "sap",
  "config": {
    "connector.class": "com.sap.kafka.connect.source.hana.HANASourceConnector",
    "tasks.max": "10",
    "topics": "mytopic",
    "connection.url": "jdbc:sap://IP:30015/",
    "connection.user": "user",
    "connection.password": "pass",
    "group.id": "589f5ff5-1c43-46f4-bdd3-66884d61m185",
    "mytopic.table.name": "\"schema\".\"mytable\""
  }
}
My sink connector (BigQuery):
name=bigconnect
connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
tasks.max=1
sanitizeTopics=true
autoCreateTables=true
autoUpdateSchemas=true
schemaRetriever=com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever
schemaRegistryLocation=http://localhost:8081
bufferSize=100000
maxWriteSize=10000
tableWriteWait=1000
project=kafka-test-217517
topics=mytopic
datasets=.*=sap_dataset
keyfile=/opt/bgaccess.json
transforms=InsertField
transforms.InsertField.timestamp.field=fieldtime
transforms.InsertField.type=org.apache.kafka.connect.transforms.InsertField$Value
Tags: apache-kafka, google-bigquery, apache-kafka-connect
asked Nov 15 '18 at 10:26, edited Nov 20 '18 at 11:45 – Sano
Which sink connector are you using?
– Nishu Tayal
Nov 15 '18 at 13:06
BigQuery. I have tried transforms.InsertSource.timestamp.field, but it gives an error saying the schema can't be modified.
– Sano
Nov 15 '18 at 13:16
can you share your full Kafka Connect config (worker and connector), as well as the log from the Kafka Connect worker?
– Robin Moffatt
Nov 15 '18 at 13:32
@RobinMoffatt the sources are several different databases; the destination connector is BigQuery. I have updated my question with my config.
– Sano
Nov 15 '18 at 14:56
That's not what I asked :) can you share the full contents of your Kafka Connect configuration files (worker and connector), as well as the log from the Kafka Connect worker?
– Robin Moffatt
Nov 15 '18 at 16:19
2 Answers
I would guess your error is coming from BigQuery, not Kafka Connect.
For example, if you run a console sink connector in standalone mode, you will see messages like:
Struct...,fieldtime=Fri Nov 16 07:38:19 UTC 2018
Tested with:
connect-standalone ./connect-standalone.properties ./connect-console-sink.properties
I have an input topic with Avro data; update the settings for your own environment.
connect-standalone.properties
bootstrap.servers=kafka:9092
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://schema-registry:8081
key.converter.schemas.enable=true
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8081
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/usr/share/java
connect-console-sink.properties
name=local-console-sink
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
topics=input-topic
transforms=InsertField
transforms.InsertField.timestamp.field=fieldtime
transforms.InsertField.type=org.apache.kafka.connect.transforms.InsertField$Value
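If you need some Avro test data on input-topic first, here is a quick sketch using the Confluent Avro console producer (the broker address, Schema Registry URL, and the one-field schema are assumptions for illustration):
kafka-avro-console-producer \
  --broker-list kafka:9092 \
  --topic input-topic \
  --property schema.registry.url=http://schema-registry:8081 \
  --property value.schema='{"type":"record","name":"test","fields":[{"name":"id","type":"int"}]}'
# then type one JSON record per line, e.g.
# {"id": 1}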
answered Nov 16 '18 at 8:24 – cricket_007
In my case the problem is present in both the source connector and the sink connector. For the BigQuery sink, the problem is that the connector doesn't handle transforms.InsertField.timestamp well. For the source connector, if you use transforms.InsertField.timestamp the value is always NULL. All topics have a ROWTIME column, but I can't use it.
– Sano
Nov 19 '18 at 10:10
KSQL has a "ROWTIME column"; the actual message probably does not, because there is no such thing as columns in a Kafka message. That said, it's record.timestamp() in the Java API, which is the value picked up by timestamp.field=fieldtime. And all connectors support the same transforms; it's not specific to any one connector. In any case, Confluent has a blog post that covers BigQuery: confluent.io/blog/data-wrangling-apache-kafka-ksql
– cricket_007
Nov 19 '18 at 14:12
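For reference, a minimal consumer sketch (broker address, topic name, and string deserializers are assumptions) that prints the record.timestamp() value InsertField copies into timestamp.field:
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PrintTimestamps {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
        props.put("group.id", "timestamp-check");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("mytopic")); // assumption: topic name
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                // record.timestamp() is the message timestamp that the
                // InsertField SMT writes into the configured timestamp.field
                System.out.println(record.timestamp() + " -> " + record.value());
            }
        }
    }
}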
OLD ANSWER
I think I have finally understood the underlying problem.
First of all, you can't use the InsertField transform in any source connector, because the timestamp of a message is assigned when it is written to the topic, so it is not something the connector can know in advance.
For the JDBC connector there is this ticket:
https://github.com/confluentinc/kafka-connect-jdbc/issues/311
and it doesn't work in the SAP source connector either.
Second, the BigQuery connector has a bug that prevents using InsertField to add the timestamp to every table, as mentioned here:
https://github.com/wepay/kafka-connect-bigquery/issues/125#issuecomment-439102994
So if you want to use BigQuery as your output, the only workaround right now is to manually edit the schema of each table to add the column before starting the sink connector.
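A sketch of that manual step with the bq CLI (the project and dataset names are taken from the configs above, but the exact table name the connector creates is an assumption): dump the schema, append the new column, and upload the edited schema back.
bq show --schema --format=prettyjson kafka-test-217517:sap_dataset.mytopic > schema.json
# edit schema.json and append the new column to the JSON array:
#   {"name": "fieldtime", "type": "TIMESTAMP", "mode": "NULLABLE"}
bq update kafka-test-217517:sap_dataset.mytopic schema.json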
UPDATE 2018-12-03
Here is the final solution to always add the message timestamp in the sink connector. Let's assume you want to add the timestamp to EVERY table written by the sink connector.
In your SOURCE connector, put this configuration:
"transforms":"InsertField"
"transforms.InsertField.timestamp.field":"fieldtime",
"transforms.InsertField.type":"org.apache.kafka.connect.transforms.InsertField$Value"
This will add a column called "fieldtime" to every source table.
In your SINK connector, put this configuration:
"transforms":"InsertField,DropField",
"transforms.DropField.type":"org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.DropField.blacklist":"fieldtime",
"transforms.InsertSource.timestamp.field":"kafka_timestamp",
"transforms.InsertField.timestamp.field":"fieldtime",
"transforms.InsertField.type":"org.apache.kafka.connect.transforms.InsertField$Value"
This effectively drops the fieldtime column and adds it back populated with the timestamp of the Kafka message (note that DropField is listed before InsertField, since transforms run in the order given).
This solution adds the column with the right value automatically, with no extra manual work.
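To see the chain in action outside of Connect, here is a minimal sketch (hypothetical topic, offset, and record values) that applies the same two transforms, DropField first and then InsertField, to a SinkRecord using the standard SMT classes:
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Timestamp;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.transforms.InsertField;
import org.apache.kafka.connect.transforms.ReplaceField;

public class SmtChainDemo {
    public static void main(String[] args) {
        // Value as it arrives from the source: fieldtime exists but is null
        Schema schema = SchemaBuilder.struct()
                .field("id", Schema.INT32_SCHEMA)
                .field("fieldtime", Timestamp.builder().optional().build())
                .build();
        Struct value = new Struct(schema).put("id", 1).put("fieldtime", null);
        SinkRecord record = new SinkRecord("mytopic", 0, null, null, schema, value,
                42L, System.currentTimeMillis(), TimestampType.CREATE_TIME);

        // DropField: remove the null fieldtime column
        ReplaceField.Value<SinkRecord> drop = new ReplaceField.Value<>();
        Map<String, String> dropConfig = new HashMap<>();
        dropConfig.put("blacklist", "fieldtime");
        drop.configure(dropConfig);

        // InsertField: re-add fieldtime from the Kafka message timestamp
        InsertField.Value<SinkRecord> insert = new InsertField.Value<>();
        Map<String, String> insertConfig = new HashMap<>();
        insertConfig.put("timestamp.field", "fieldtime");
        insert.configure(insertConfig);

        SinkRecord out = insert.apply(drop.apply(record));
        // fieldtime now holds the message timestamp as a java.util.Date
        System.out.println(((Struct) out.value()).get("fieldtime"));
    }
}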
answered Nov 20 '18 at 13:20, edited Dec 3 '18 at 16:59 – Sano