Kafka Streams custom State Store

I have been reading the documentation on state stores, but it is still not clear to me whether they fit my purpose. I would like to use a distributed graph database as a state store that other external applications can consume from. Is that possible, what effort does it involve, and can anyone point me to the classes or code that would need to be extended to make this work?

apache-kafka-streams

asked Nov 11 at 12:06
MaatDeamon

1 Answer

You can implement a custom state store using the Processor API, as described here:
https://docs.confluent.io/current/streams/developer-guide/interactive-queries.html#querying-local-custom-state-stores

• Your custom state store must implement StateStore.
• You must have an interface to represent the operations available on the store.
• You must provide an implementation of StoreBuilder for creating instances of your store.
• It is recommended that you provide an interface that restricts access to read-only operations. This prevents users of this API from mutating the state of your running Kafka Streams application out-of-band.

The implementation will look something like this:

    import org.apache.kafka.streams.processor.StateStore;
    import org.apache.kafka.streams.state.StoreBuilder;

    public class MyCustomStore<K,V> implements StateStore, MyWriteableCustomStore<K,V> {
        // implementation of the actual store
    }

    // Read-write interface for MyCustomStore
    public interface MyWriteableCustomStore<K,V> extends MyReadableCustomStore<K,V> {
        void write(K key, V value);
    }

    // Read-only interface for MyCustomStore
    public interface MyReadableCustomStore<K,V> {
        V read(K key);
    }

    public class MyCustomStoreBuilder<K,V> implements StoreBuilder<MyCustomStore<K,V>> {
        // implementation of the supplier for MyCustomStore
    }
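
To make the read-only versus read-write split a bit more concrete, here is a minimal, self-contained sketch in plain Java. It deliberately does not touch the Kafka Streams API: InMemoryCustomStore and its HashMap backing are assumptions made purely for illustration, and a real store would additionally implement StateStore and delegate to your database.

```java
import java.util.HashMap;
import java.util.Map;

// Demo entry point, placed first so the file can be launched directly.
class ReadOnlyViewDemo {
    public static void main(String[] args) {
        InMemoryCustomStore<String, Integer> store = new InMemoryCustomStore<>();
        store.write("vertex-1", 42);

        // Code outside the topology is handed only the read-only type,
        // so write() is not reachable through this reference.
        MyReadableCustomStore<String, Integer> view = store;
        System.out.println(view.read("vertex-1"));
        System.out.println(view.read("missing"));
    }
}

// Read-only interface: the only type exposed to external readers.
interface MyReadableCustomStore<K, V> {
    V read(K key);
}

// Read-write interface: used by the processors that own the store.
interface MyWriteableCustomStore<K, V> extends MyReadableCustomStore<K, V> {
    void write(K key, V value);
}

// Hypothetical in-memory backing, standing in for the real storage engine.
class InMemoryCustomStore<K, V> implements MyWriteableCustomStore<K, V> {
    private final Map<K, V> data = new HashMap<>();

    @Override
    public void write(K key, V value) {
        data.put(key, value);
    }

    @Override
    public V read(K key) {
        return data.get(key); // null when the key is absent
    }
}
```

Because the caller only sees MyReadableCustomStore, any attempt to call write() through that reference fails at compile time, which is the out-of-band protection the last bullet above is after.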



To make it queryable:

• Provide an implementation of QueryableStoreType.
• Provide a wrapper class that has access to all of the underlying instances of the store and is used for querying.

Example:

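    import org.apache.kafka.streams.processor.StateStore;
    import org.apache.kafka.streams.state.QueryableStoreType;
    import org.apache.kafka.streams.state.internals.StateStoreProvider;

    public class MyCustomStoreType<K,V> implements QueryableStoreType<MyReadableCustomStore<K,V>> {

        // Only accept StateStores that are of type MyCustomStore
        @Override
        public boolean accepts(final StateStore stateStore) {
            return stateStore instanceof MyCustomStore;
        }

        @Override
        public MyReadableCustomStore<K,V> create(final StateStoreProvider storeProvider, final String storeName) {
            return new MyCustomStoreTypeWrapper<>(storeProvider, storeName, this);
        }
    }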
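
The wrapper class itself is not shown above. One way it could work, sketched in plain Java without any Kafka dependency, is to try each underlying store instance in turn and return the first value found. Everything here is an assumption for illustration: ReadOnlyView stands in for MyReadableCustomStore, and the Supplier stands in for whatever the StateStoreProvider passed to create() would return for the given store name.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Demo entry point, placed first so the file can be launched directly.
class CompositeStoreDemo {
    public static void main(String[] args) {
        // Two hypothetical local store instances, e.g. one per stream task.
        Map<String, String> instance0 = new HashMap<>();
        instance0.put("a", "alpha");
        Map<String, String> instance1 = new HashMap<>();
        instance1.put("b", "beta");

        List<ReadOnlyView<String, String>> stores = new ArrayList<>();
        stores.add(instance0::get);
        stores.add(instance1::get);

        ReadOnlyView<String, String> composite =
                new CompositeReadOnlyView<String, String>(() -> stores);
        System.out.println(composite.read("b"));
    }
}

// Stand-in for the read-only store interface.
interface ReadOnlyView<K, V> {
    V read(K key);
}

// Wrapper that has access to all underlying store instances.
class CompositeReadOnlyView<K, V> implements ReadOnlyView<K, V> {
    private final Supplier<List<ReadOnlyView<K, V>>> storeSupplier;

    CompositeReadOnlyView(Supplier<List<ReadOnlyView<K, V>>> storeSupplier) {
        this.storeSupplier = storeSupplier;
    }

    @Override
    public V read(K key) {
        // Look the stores up on every call, since the set of local
        // instances may change over time (an assumption of this sketch).
        for (ReadOnlyView<K, V> store : storeSupplier.get()) {
            V value = store.read(key);
            if (value != null) {
                return value;
            }
        }
        return null; // no local instance holds the key
    }
}
```

Note that a wrapper like this only sees the store instances local to one application instance; it says nothing about how an external application would reach them.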








answered Nov 11 at 16:16
Nishu Tayal