Kafka Stream custom State Store
up vote
1
down vote
favorite
I have been readying the doc about state store but it is still not clear to me if it can fit my purpose. I would like to use some Distributed Graph Database as as a state store that other external application can consume from. Is that possible, what effort does that involve and can anyone point me to the class/code that will need to be extended for that functionality to happen.
apache-kafka-streams
add a comment |
up vote
1
down vote
favorite
I have been readying the doc about state store but it is still not clear to me if it can fit my purpose. I would like to use some Distributed Graph Database as as a state store that other external application can consume from. Is that possible, what effort does that involve and can anyone point me to the class/code that will need to be extended for that functionality to happen.
apache-kafka-streams
add a comment |
up vote
1
down vote
favorite
up vote
1
down vote
favorite
I have been readying the doc about state store but it is still not clear to me if it can fit my purpose. I would like to use some Distributed Graph Database as as a state store that other external application can consume from. Is that possible, what effort does that involve and can anyone point me to the class/code that will need to be extended for that functionality to happen.
apache-kafka-streams
I have been readying the doc about state store but it is still not clear to me if it can fit my purpose. I would like to use some Distributed Graph Database as as a state store that other external application can consume from. Is that possible, what effort does that involve and can anyone point me to the class/code that will need to be extended for that functionality to happen.
apache-kafka-streams
apache-kafka-streams
asked Nov 11 at 12:06
MaatDeamon
1,81522060
1,81522060
add a comment |
add a comment |
1 Answer
1
active
oldest
votes
up vote
3
down vote
accepted
You can implement custom state store using Processor API as described here :
https://docs.confluent.io/current/streams/developer-guide/interactive-queries.html#querying-local-custom-state-stores
- Your custom state store must implement StateStore.
- You must have an interface to represent the operations available on the store.
- You must provide an implementation of StoreBuilder for creating instances of your store.
- It is recommended that you provide an interface that restricts access to read-only operations. This prevents users of this API from mutating the state of your running Kafka Streams application out-of-band.
Implementation will look something like this :
public class MyCustomStore<K,V> implements StateStore, MyWriteableCustomStore<K,V>
// implementation of the actual store
// Read-write interface for MyCustomStore
public interface MyWriteableCustomStore<K,V> extends MyReadableCustomStore<K,V>
void write(K Key, V value);
// Read-only interface for MyCustomStore
public interface MyReadableCustomStore<K,V>
V read(K key);
public class MyCustomStoreBuilder implements StoreBuilder<MyCustomStore<K,V>>
// implementation of the supplier for MyCustomStore
In order to make it queryable;
- Provide an implementation of QueryableStoreType.
- Provide a wrapper class that has access to all of the underlying instances of the store and is used for querying.
Example :
public class MyCustomStoreType<K,V> implements QueryableStoreType<MyReadableCustomStore<K,V>>
// Only accept StateStores that are of type MyCustomStore
public boolean accepts(final StateStore stateStore)
return stateStore instanceOf MyCustomStore;
public MyReadableCustomStore<K,V> create(final StateStoreProvider storeProvider, final String storeName)
return new MyCustomStoreTypeWrapper(storeProvider, storeName, this);
add a comment |
1 Answer
1
active
oldest
votes
1 Answer
1
active
oldest
votes
active
oldest
votes
active
oldest
votes
up vote
3
down vote
accepted
You can implement custom state store using Processor API as described here :
https://docs.confluent.io/current/streams/developer-guide/interactive-queries.html#querying-local-custom-state-stores
- Your custom state store must implement StateStore.
- You must have an interface to represent the operations available on the store.
- You must provide an implementation of StoreBuilder for creating instances of your store.
- It is recommended that you provide an interface that restricts access to read-only operations. This prevents users of this API from mutating the state of your running Kafka Streams application out-of-band.
Implementation will look something like this :
public class MyCustomStore<K,V> implements StateStore, MyWriteableCustomStore<K,V>
// implementation of the actual store
// Read-write interface for MyCustomStore
public interface MyWriteableCustomStore<K,V> extends MyReadableCustomStore<K,V>
void write(K Key, V value);
// Read-only interface for MyCustomStore
public interface MyReadableCustomStore<K,V>
V read(K key);
public class MyCustomStoreBuilder implements StoreBuilder<MyCustomStore<K,V>>
// implementation of the supplier for MyCustomStore
In order to make it queryable;
- Provide an implementation of QueryableStoreType.
- Provide a wrapper class that has access to all of the underlying instances of the store and is used for querying.
Example :
public class MyCustomStoreType<K,V> implements QueryableStoreType<MyReadableCustomStore<K,V>>
// Only accept StateStores that are of type MyCustomStore
public boolean accepts(final StateStore stateStore)
return stateStore instanceOf MyCustomStore;
public MyReadableCustomStore<K,V> create(final StateStoreProvider storeProvider, final String storeName)
return new MyCustomStoreTypeWrapper(storeProvider, storeName, this);
add a comment |
up vote
3
down vote
accepted
You can implement custom state store using Processor API as described here :
https://docs.confluent.io/current/streams/developer-guide/interactive-queries.html#querying-local-custom-state-stores
- Your custom state store must implement StateStore.
- You must have an interface to represent the operations available on the store.
- You must provide an implementation of StoreBuilder for creating instances of your store.
- It is recommended that you provide an interface that restricts access to read-only operations. This prevents users of this API from mutating the state of your running Kafka Streams application out-of-band.
Implementation will look something like this :
public class MyCustomStore<K,V> implements StateStore, MyWriteableCustomStore<K,V>
// implementation of the actual store
// Read-write interface for MyCustomStore
public interface MyWriteableCustomStore<K,V> extends MyReadableCustomStore<K,V>
void write(K Key, V value);
// Read-only interface for MyCustomStore
public interface MyReadableCustomStore<K,V>
V read(K key);
public class MyCustomStoreBuilder implements StoreBuilder<MyCustomStore<K,V>>
// implementation of the supplier for MyCustomStore
In order to make it queryable;
- Provide an implementation of QueryableStoreType.
- Provide a wrapper class that has access to all of the underlying instances of the store and is used for querying.
Example :
public class MyCustomStoreType<K,V> implements QueryableStoreType<MyReadableCustomStore<K,V>>
// Only accept StateStores that are of type MyCustomStore
public boolean accepts(final StateStore stateStore)
return stateStore instanceOf MyCustomStore;
public MyReadableCustomStore<K,V> create(final StateStoreProvider storeProvider, final String storeName)
return new MyCustomStoreTypeWrapper(storeProvider, storeName, this);
add a comment |
up vote
3
down vote
accepted
up vote
3
down vote
accepted
You can implement custom state store using Processor API as described here :
https://docs.confluent.io/current/streams/developer-guide/interactive-queries.html#querying-local-custom-state-stores
- Your custom state store must implement StateStore.
- You must have an interface to represent the operations available on the store.
- You must provide an implementation of StoreBuilder for creating instances of your store.
- It is recommended that you provide an interface that restricts access to read-only operations. This prevents users of this API from mutating the state of your running Kafka Streams application out-of-band.
Implementation will look something like this :
public class MyCustomStore<K,V> implements StateStore, MyWriteableCustomStore<K,V>
// implementation of the actual store
// Read-write interface for MyCustomStore
public interface MyWriteableCustomStore<K,V> extends MyReadableCustomStore<K,V>
void write(K Key, V value);
// Read-only interface for MyCustomStore
public interface MyReadableCustomStore<K,V>
V read(K key);
public class MyCustomStoreBuilder implements StoreBuilder<MyCustomStore<K,V>>
// implementation of the supplier for MyCustomStore
In order to make it queryable;
- Provide an implementation of QueryableStoreType.
- Provide a wrapper class that has access to all of the underlying instances of the store and is used for querying.
Example :
public class MyCustomStoreType<K,V> implements QueryableStoreType<MyReadableCustomStore<K,V>>
// Only accept StateStores that are of type MyCustomStore
public boolean accepts(final StateStore stateStore)
return stateStore instanceOf MyCustomStore;
public MyReadableCustomStore<K,V> create(final StateStoreProvider storeProvider, final String storeName)
return new MyCustomStoreTypeWrapper(storeProvider, storeName, this);
You can implement custom state store using Processor API as described here :
https://docs.confluent.io/current/streams/developer-guide/interactive-queries.html#querying-local-custom-state-stores
- Your custom state store must implement StateStore.
- You must have an interface to represent the operations available on the store.
- You must provide an implementation of StoreBuilder for creating instances of your store.
- It is recommended that you provide an interface that restricts access to read-only operations. This prevents users of this API from mutating the state of your running Kafka Streams application out-of-band.
Implementation will look something like this :
public class MyCustomStore<K,V> implements StateStore, MyWriteableCustomStore<K,V>
// implementation of the actual store
// Read-write interface for MyCustomStore
public interface MyWriteableCustomStore<K,V> extends MyReadableCustomStore<K,V>
void write(K Key, V value);
// Read-only interface for MyCustomStore
public interface MyReadableCustomStore<K,V>
V read(K key);
public class MyCustomStoreBuilder implements StoreBuilder<MyCustomStore<K,V>>
// implementation of the supplier for MyCustomStore
In order to make it queryable;
- Provide an implementation of QueryableStoreType.
- Provide a wrapper class that has access to all of the underlying instances of the store and is used for querying.
Example :
public class MyCustomStoreType<K,V> implements QueryableStoreType<MyReadableCustomStore<K,V>>
// Only accept StateStores that are of type MyCustomStore
public boolean accepts(final StateStore stateStore)
return stateStore instanceOf MyCustomStore;
public MyReadableCustomStore<K,V> create(final StateStoreProvider storeProvider, final String storeName)
return new MyCustomStoreTypeWrapper(storeProvider, storeName, this);
answered Nov 11 at 16:16
Nishu Tayal
11.2k73381
11.2k73381
add a comment |
add a comment |
Thanks for contributing an answer to Stack Overflow!
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.
Some of your past answers have not been well-received, and you're in danger of being blocked from answering.
Please pay close attention to the following guidance:
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53248566%2fkafka-stream-custom-state-store%23new-answer', 'question_page');
);
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown