BlockingCollection multiple consumers per group FIFO
I have a single producer that insert messages into a BlockingCollection.
The messages in the collection each have a groupId. The count of different groupIds is dynamic. The messages per group must be processed in FIFO order.
The work that the consumers do is a small calculation and a database insert, sometimes an additional http request.
I tried to create a new consumer thread for each groupId - but there are problems with too many threads.
The messages are processed a lot faster when I create 50 consumer threads like this:
for (int i = 0; i < 50; i++)
Task.Factory.StartNew(() =>
foreach (var item in _queue.GetConsumingEnumerable())
var group = item.groupId;
// do work
But with this code the messages per groupId are not processed in serial order.
Is there an easy way to make the consumer "sticky" per groupId?
e.g.: when a message with groupId 7 was processed by consumer 3 all further messages with groupId 7 must be processed by consumer 3.
c# queue fifo
|
show 1 more comment
I have a single producer that insert messages into a BlockingCollection.
The messages in the collection each have a groupId. The count of different groupIds is dynamic. The messages per group must be processed in FIFO order.
The work that the consumers do is a small calculation and a database insert, sometimes an additional http request.
I tried to create a new consumer thread for each groupId - but there are problems with too many threads.
The messages are processed a lot faster when I create 50 consumer threads like this:
for (int i = 0; i < 50; i++)
Task.Factory.StartNew(() =>
foreach (var item in _queue.GetConsumingEnumerable())
var group = item.groupId;
// do work
But with this code the messages per groupId are not processed in serial order.
Is there an easy way to make the consumer "sticky" per groupId?
e.g.: when a message with groupId 7 was processed by consumer 3 all further messages with groupId 7 must be processed by consumer 3.
c# queue fifo
1
Perhaps aConcurrentDictionary<int, Lazy<Tuple<BlockingCollection, WhatEverYourConsumerTypeIs>>>
? This would allow you to have a fixed consumer per groupid (key of theConcurrentDictionary
). Then useTakeFromAny
so that each consumer reads messages from all of itsBlockingCollection
s.
– mjwills
Nov 12 at 10:52
Why not use one BlockingCollection per groupid?
– Klaus Gütter
Nov 12 at 10:52
I already tried withConcurrentDictionary<int, Tuple<BlockingCollection<dynamic>, SingleGroupProcessor>>
and create a consumer Task for each Dictionary Entry. The problem is when there are over 2000 different groupIds I have as many threads - or am I thinking wrong here?
– mibiio
Nov 12 at 10:55
I think you should look into TPL DataFlow, this is a bit messy
– TheGeneral
Nov 12 at 10:57
There is no need to create a new processor per groupid. You can have a single processor processing multiple groupids.
– mjwills
Nov 12 at 10:59
|
show 1 more comment
I have a single producer that insert messages into a BlockingCollection.
The messages in the collection each have a groupId. The count of different groupIds is dynamic. The messages per group must be processed in FIFO order.
The work that the consumers do is a small calculation and a database insert, sometimes an additional http request.
I tried to create a new consumer thread for each groupId - but there are problems with too many threads.
The messages are processed a lot faster when I create 50 consumer threads like this:
for (int i = 0; i < 50; i++)
Task.Factory.StartNew(() =>
foreach (var item in _queue.GetConsumingEnumerable())
var group = item.groupId;
// do work
But with this code the messages per groupId are not processed in serial order.
Is there an easy way to make the consumer "sticky" per groupId?
e.g.: when a message with groupId 7 was processed by consumer 3 all further messages with groupId 7 must be processed by consumer 3.
c# queue fifo
I have a single producer that insert messages into a BlockingCollection.
The messages in the collection each have a groupId. The count of different groupIds is dynamic. The messages per group must be processed in FIFO order.
The work that the consumers do is a small calculation and a database insert, sometimes an additional http request.
I tried to create a new consumer thread for each groupId - but there are problems with too many threads.
The messages are processed a lot faster when I create 50 consumer threads like this:
for (int i = 0; i < 50; i++)
Task.Factory.StartNew(() =>
foreach (var item in _queue.GetConsumingEnumerable())
var group = item.groupId;
// do work
But with this code the messages per groupId are not processed in serial order.
Is there an easy way to make the consumer "sticky" per groupId?
e.g.: when a message with groupId 7 was processed by consumer 3 all further messages with groupId 7 must be processed by consumer 3.
c# queue fifo
c# queue fifo
asked Nov 12 at 10:39
mibiio
223
223
1
Perhaps aConcurrentDictionary<int, Lazy<Tuple<BlockingCollection, WhatEverYourConsumerTypeIs>>>
? This would allow you to have a fixed consumer per groupid (key of theConcurrentDictionary
). Then useTakeFromAny
so that each consumer reads messages from all of itsBlockingCollection
s.
– mjwills
Nov 12 at 10:52
Why not use one BlockingCollection per groupid?
– Klaus Gütter
Nov 12 at 10:52
I already tried withConcurrentDictionary<int, Tuple<BlockingCollection<dynamic>, SingleGroupProcessor>>
and create a consumer Task for each Dictionary Entry. The problem is when there are over 2000 different groupIds I have as many threads - or am I thinking wrong here?
– mibiio
Nov 12 at 10:55
I think you should look into TPL DataFlow, this is a bit messy
– TheGeneral
Nov 12 at 10:57
There is no need to create a new processor per groupid. You can have a single processor processing multiple groupids.
– mjwills
Nov 12 at 10:59
|
show 1 more comment
1
Perhaps aConcurrentDictionary<int, Lazy<Tuple<BlockingCollection, WhatEverYourConsumerTypeIs>>>
? This would allow you to have a fixed consumer per groupid (key of theConcurrentDictionary
). Then useTakeFromAny
so that each consumer reads messages from all of itsBlockingCollection
s.
– mjwills
Nov 12 at 10:52
Why not use one BlockingCollection per groupid?
– Klaus Gütter
Nov 12 at 10:52
I already tried withConcurrentDictionary<int, Tuple<BlockingCollection<dynamic>, SingleGroupProcessor>>
and create a consumer Task for each Dictionary Entry. The problem is when there are over 2000 different groupIds I have as many threads - or am I thinking wrong here?
– mibiio
Nov 12 at 10:55
I think you should look into TPL DataFlow, this is a bit messy
– TheGeneral
Nov 12 at 10:57
There is no need to create a new processor per groupid. You can have a single processor processing multiple groupids.
– mjwills
Nov 12 at 10:59
1
1
Perhaps a
ConcurrentDictionary<int, Lazy<Tuple<BlockingCollection, WhatEverYourConsumerTypeIs>>>
? This would allow you to have a fixed consumer per groupid (key of the ConcurrentDictionary
). Then use TakeFromAny
so that each consumer reads messages from all of its BlockingCollection
s.– mjwills
Nov 12 at 10:52
Perhaps a
ConcurrentDictionary<int, Lazy<Tuple<BlockingCollection, WhatEverYourConsumerTypeIs>>>
? This would allow you to have a fixed consumer per groupid (key of the ConcurrentDictionary
). Then use TakeFromAny
so that each consumer reads messages from all of its BlockingCollection
s.– mjwills
Nov 12 at 10:52
Why not use one BlockingCollection per groupid?
– Klaus Gütter
Nov 12 at 10:52
Why not use one BlockingCollection per groupid?
– Klaus Gütter
Nov 12 at 10:52
I already tried with
ConcurrentDictionary<int, Tuple<BlockingCollection<dynamic>, SingleGroupProcessor>>
and create a consumer Task for each Dictionary Entry. The problem is when there are over 2000 different groupIds I have as many threads - or am I thinking wrong here?– mibiio
Nov 12 at 10:55
I already tried with
ConcurrentDictionary<int, Tuple<BlockingCollection<dynamic>, SingleGroupProcessor>>
and create a consumer Task for each Dictionary Entry. The problem is when there are over 2000 different groupIds I have as many threads - or am I thinking wrong here?– mibiio
Nov 12 at 10:55
I think you should look into TPL DataFlow, this is a bit messy
– TheGeneral
Nov 12 at 10:57
I think you should look into TPL DataFlow, this is a bit messy
– TheGeneral
Nov 12 at 10:57
There is no need to create a new processor per groupid. You can have a single processor processing multiple groupids.
– mjwills
Nov 12 at 10:59
There is no need to create a new processor per groupid. You can have a single processor processing multiple groupids.
– mjwills
Nov 12 at 10:59
|
show 1 more comment
active
oldest
votes
Your Answer
StackExchange.ifUsing("editor", function ()
StackExchange.using("externalEditor", function ()
StackExchange.using("snippets", function ()
StackExchange.snippets.init();
);
);
, "code-snippets");
StackExchange.ready(function()
var channelOptions =
tags: "".split(" "),
id: "1"
;
initTagRenderer("".split(" "), "".split(" "), channelOptions);
StackExchange.using("externalEditor", function()
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled)
StackExchange.using("snippets", function()
createEditor();
);
else
createEditor();
);
function createEditor()
StackExchange.prepareEditor(
heartbeatType: 'answer',
autoActivateHeartbeat: false,
convertImagesToLinks: true,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: 10,
bindNavPrevention: true,
postfix: "",
imageUploader:
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
,
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
);
);
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53260405%2fblockingcollection-multiple-consumers-per-group-fifo%23new-answer', 'question_page');
);
Post as a guest
Required, but never shown
active
oldest
votes
active
oldest
votes
active
oldest
votes
active
oldest
votes
Thanks for contributing an answer to Stack Overflow!
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.
Some of your past answers have not been well-received, and you're in danger of being blocked from answering.
Please pay close attention to the following guidance:
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53260405%2fblockingcollection-multiple-consumers-per-group-fifo%23new-answer', 'question_page');
);
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
1
Perhaps a
ConcurrentDictionary<int, Lazy<Tuple<BlockingCollection, WhatEverYourConsumerTypeIs>>>
? This would allow you to have a fixed consumer per groupid (key of theConcurrentDictionary
). Then useTakeFromAny
so that each consumer reads messages from all of itsBlockingCollection
s.– mjwills
Nov 12 at 10:52
Why not use one BlockingCollection per groupid?
– Klaus Gütter
Nov 12 at 10:52
I already tried with
ConcurrentDictionary<int, Tuple<BlockingCollection<dynamic>, SingleGroupProcessor>>
and create a consumer Task for each Dictionary Entry. The problem is when there are over 2000 different groupIds I have as many threads - or am I thinking wrong here?– mibiio
Nov 12 at 10:55
I think you should look into TPL DataFlow, this is a bit messy
– TheGeneral
Nov 12 at 10:57
There is no need to create a new processor per groupid. You can have a single processor processing multiple groupids.
– mjwills
Nov 12 at 10:59