How to add a column with the Kafka message timestamp in a Kafka sink connector

I am configuring my connectors using properties/JSON files. I am trying to add a timestamp column containing the Kafka timestamp of the message as written by the source connector, but without any success.



I have tried to add transforms, but the field is always null, and my BigQuery sink connector returns an error:




Failed to update table schema




I put these configurations in the BigQuery connector properties:



transforms=InsertField
transforms.InsertField.timestamp.field=fieldtime
transforms.InsertField.type=org.apache.kafka.connect.transforms.InsertField$Value


My source config (SAP connector):


{
  "name": "sap",
  "config": {
    "connector.class": "com.sap.kafka.connect.source.hana.HANASourceConnector",
    "tasks.max": "10",
    "topics": "mytopic",
    "connection.url": "jdbc:sap://IP:30015/",
    "connection.user": "user",
    "connection.password": "pass",
    "group.id": "589f5ff5-1c43-46f4-bdd3-66884d61m185",
    "mytopic.table.name": "\"schema\".\"mytable\""
  }
}




My sink config (BigQuery connector):



name=bigconnect
connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
tasks.max=1

sanitizeTopics=true

autoCreateTables=true
autoUpdateSchemas=true

schemaRetriever=com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever
schemaRegistryLocation=http://localhost:8081

bufferSize=100000
maxWriteSize=10000
tableWriteWait=1000

project=kafka-test-217517
topics=mytopic
datasets=.*=sap_dataset
keyfile=/opt/bgaccess.json
transforms=InsertField
transforms.InsertField.timestamp.field=fieldtime
transforms.InsertField.type=org.apache.kafka.connect.transforms.InsertField$Value
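
For reference, one quick way to check whether the records in the topic actually carry a Kafka timestamp is the console consumer's print.timestamp option (a minimal sketch, assuming a broker on localhost:9092; adjust for your environment):

# Prints the record timestamp (CreateTime/LogAppendTime) before each value
kafka-console-consumer --bootstrap-server localhost:9092 \
  --topic mytopic \
  --property print.timestamp=true \
  --from-beginning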









apache-kafka google-bigquery apache-kafka-connect

asked Nov 15 '18 at 10:26 by Sano (edited Nov 20 '18 at 11:45)

  • Which sink connector are you using?

    – Nishu Tayal
    Nov 15 '18 at 13:06











  • BigQuery. I have tried transforms.InsertSource.timestamp.field, but it gives an error saying the schema can't be modified.

    – Sano
    Nov 15 '18 at 13:16






  • Can you share your full Kafka Connect config (worker and connector), as well as the log from the Kafka Connect worker?

    – Robin Moffatt
    Nov 15 '18 at 13:32











  • @RobinMoffatt The sources are multiple different databases; the destination connector is BigQuery. I updated my question with my config.

    – Sano
    Nov 15 '18 at 14:56











  • That's not what I asked :) Can you share the full contents of your Kafka Connect configuration files (worker and connector), as well as the log from the Kafka Connect worker?

    – Robin Moffatt
    Nov 15 '18 at 16:19
2 Answers
I would guess your error is coming from BigQuery, not Kafka Connect.



For example, if you run a console (FileStream) sink connector in standalone mode, you will see messages like:



Struct...,fieldtime=Fri Nov 16 07:38:19 UTC 2018




Tested with connect-standalone ./connect-standalone.properties ./connect-console-sink.properties



I have an input topic with Avro data; update your own settings accordingly.



connect-standalone.properties



bootstrap.servers=kafka:9092

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://schema-registry:8081
key.converter.schemas.enable=true

value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8081
value.converter.schemas.enable=true

offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

plugin.path=/usr/share/java


connect-console-sink.properties



name=local-console-sink
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
topics=input-topic

transforms=InsertField
transforms.InsertField.timestamp.field=fieldtime
transforms.InsertField.type=org.apache.kafka.connect.transforms.InsertField$Value
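
To exercise this end to end you can publish a test record to the input topic. A sketch using Confluent's Avro console producer (the schema here is illustrative, not taken from the original setup):

# Produce one Avro record to the sink's input topic
kafka-avro-console-producer \
  --broker-list kafka:9092 \
  --topic input-topic \
  --property schema.registry.url=http://schema-registry:8081 \
  --property value.schema='{"type":"record","name":"test","fields":[{"name":"id","type":"int"}]}'
# then type a record, e.g. {"id": 1}

Since the FileStream sink has no file property set, it writes to stdout, so the worker output should show the record with fieldtime appended, as in the Struct line above.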





answered Nov 16 '18 at 8:24 by cricket_007

  • In my case the problem is present in both the source connector and the sink connector. For the BigQuery sink, the problem was that the connector doesn't handle transforms.InsertField.timestamp well. For the source connector, if you use transforms.InsertField.timestamp the value will always be NULL. All topics have a ROWTIME column, but I can't use it.

    – Sano
    Nov 19 '18 at 10:10












  • KSQL has a "ROWTIME column"... the actual message probably does not, because there's no such thing as columns in a Kafka message. That being said, it's record.timestamp() in the Java API, which is the field fetched by timestamp.field=fieldtime. And all connectors support the same transforms; they're not specific to a particular one. In any case, Confluent has a blog post that covers BigQuery: confluent.io/blog/data-wrangling-apache-kafka-ksql

    – cricket_007
    Nov 19 '18 at 14:12
OLD ANSWER
I think I've come to understand the underlying problem.



First of all, you can't use the InsertField transform in any source connector, because the timestamp value for the message is assigned at write time into the topic, so it's not something the connector can know in advance.

For the JDBC connector there is this ticket:
https://github.com/confluentinc/kafka-connect-jdbc/issues/311



and it doesn't work in the SAP source connector either.



Second, the BigQuery connector has a bug that prevents using InsertField to add the timestamp to every table, as mentioned here:



https://github.com/wepay/kafka-connect-bigquery/issues/125#issuecomment-439102994



So if you want to use BigQuery as your output, the only solution right now is to manually edit the schema of each table to add the column before loading the sink connector.
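
With the bq CLI, that manual edit could look like this (a sketch, assuming the table the connector created from mytopic in sap_dataset; the column definition is illustrative):

# Dump the current schema, append the new column, then update the table
bq show --schema --format=prettyjson kafka-test-217517:sap_dataset.mytopic > schema.json
# edit schema.json and append: {"name": "fieldtime", "type": "TIMESTAMP", "mode": "NULLABLE"}
bq update kafka-test-217517:sap_dataset.mytopic schema.json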



UPDATE 2018-12-03
Here is the final solution to always add the message timestamp in the SINK connector. Let's assume you want to add the timestamp to EVERY table of the sink connector.



In your SOURCE connector, put this configuration:


"transforms":"InsertField",
"transforms.InsertField.timestamp.field":"fieldtime",
"transforms.InsertField.type":"org.apache.kafka.connect.transforms.InsertField$Value"


This will add a column called "fieldtime" to every source table.



In your SINK connector, put this configuration:


"transforms":"DropField,InsertField",
"transforms.DropField.type":"org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.DropField.blacklist":"fieldtime",
"transforms.InsertField.timestamp.field":"fieldtime",
"transforms.InsertField.type":"org.apache.kafka.connect.transforms.InsertField$Value"


This will effectively remove the fieldtime column and re-add it with the timestamp of the message (transforms run in the order listed, so DropField runs before InsertField).



This solution automatically adds the column with the right value without any additional operation.
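
The same sink-side transform chain can also be applied through the Connect REST API (a sketch, assuming a worker listening on localhost:8083 and reusing the connector name from the question; the other BigQuery settings are omitted here for brevity but are still required):

curl -X PUT http://localhost:8083/connectors/bigconnect/config \
  -H "Content-Type: application/json" \
  -d '{
    "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
    "topics": "mytopic",
    "transforms": "DropField,InsertField",
    "transforms.DropField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.DropField.blacklist": "fieldtime",
    "transforms.InsertField.timestamp.field": "fieldtime",
    "transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value"
  }'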






answered Nov 20 '18 at 13:20 by Sano (edited Dec 3 '18 at 16:59)
