BlockingCollection multiple consumers per group FIFO
I have a single producer that inserts messages into a BlockingCollection.



Each message in the collection has a groupId. The number of distinct groupIds is dynamic. The messages within a group must be processed in FIFO order.
The work each consumer does is a small calculation and a database insert, sometimes with an additional HTTP request.



I tried creating a new consumer thread for each groupId, but that leads to problems with too many threads.



The messages are processed a lot faster when I create 50 consumer threads like this:



for (int i = 0; i < 50; i++)
{
    Task.Factory.StartNew(() =>
    {
        foreach (var item in _queue.GetConsumingEnumerable())
        {
            var group = item.groupId;
            // do work
        }
    });
}





But with this code the messages per groupId are not processed in serial order.



Is there an easy way to make the consumer "sticky" per groupId?



e.g. once a message with groupId 7 has been processed by consumer 3, all further messages with groupId 7 must be processed by consumer 3.
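One way to get that stickiness without a thread per group is to partition by hashing the groupId into a fixed number of queues, with one long-running consumer per queue. This is only a sketch under assumed types (the `Message` class and its fields are illustrative, not from the question): every message of a given group always hashes to the same queue, so per-group FIFO order is preserved while the thread count stays bounded at `workerCount`.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

class Message
{
    public int GroupId;
    public string Payload;
}

class PartitionedConsumer
{
    private readonly BlockingCollection<Message>[] _partitions;
    private readonly Task[] _workers;

    public PartitionedConsumer(int workerCount, Action<Message> handle)
    {
        _partitions = new BlockingCollection<Message>[workerCount];
        _workers = new Task[workerCount];
        for (int i = 0; i < workerCount; i++)
        {
            var queue = new BlockingCollection<Message>();
            _partitions[i] = queue;
            // One long-running consumer per partition: items in the same
            // partition are processed strictly one after another.
            _workers[i] = Task.Factory.StartNew(() =>
            {
                foreach (var item in queue.GetConsumingEnumerable())
                    handle(item);
            }, TaskCreationOptions.LongRunning);
        }
    }

    public void Add(Message m)
    {
        // Stable mapping groupId -> partition: same group, same worker,
        // so the group's messages stay in FIFO order.
        int index = (m.GroupId & int.MaxValue) % _partitions.Length;
        _partitions[index].Add(m);
    }

    public void CompleteAndWait()
    {
        foreach (var q in _partitions) q.CompleteAdding();
        Task.WaitAll(_workers);
    }
}
```

The trade-off is that a hot group can make its partition a bottleneck while other workers sit idle; if the load per group is skewed, a work-stealing or dynamic-assignment scheme would be needed instead.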
  • Perhaps a ConcurrentDictionary<int, Lazy<Tuple<BlockingCollection, WhatEverYourConsumerTypeIs>>>? This would allow you to have a fixed consumer per groupid (key of the ConcurrentDictionary). Then use TakeFromAny so that each consumer reads messages from all of its BlockingCollections.
    – mjwills
    Nov 12 at 10:52
  • Why not use one BlockingCollection per groupid?
    – Klaus Gütter
    Nov 12 at 10:52
  • I already tried with ConcurrentDictionary<int, Tuple<BlockingCollection<dynamic>, SingleGroupProcessor>> and create a consumer Task for each Dictionary Entry. The problem is when there are over 2000 different groupIds I have as many threads - or am I thinking wrong here?
    – mibiio
    Nov 12 at 10:55
  • I think you should look into TPL DataFlow, this is a bit messy
    – TheGeneral
    Nov 12 at 10:57
  • There is no need to create a new processor per groupid. You can have a single processor processing multiple groupids.
    – mjwills
    Nov 12 at 10:59
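As the last comment notes, a processor per groupId is unnecessary. Since the question states there is a single producer, another sketch (illustrative names, not the poster's code) is to keep a per-group "tail" Task in a ConcurrentDictionary and chain each new message as a continuation of the previous one: the thread pool runs many groups concurrently, while each group stays strictly serial.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

static class GroupSerializer
{
    // Per-group "tail" task: each new work item for a group is chained
    // after the previous one, giving FIFO order within the group.
    private static readonly ConcurrentDictionary<int, Task> Tails =
        new ConcurrentDictionary<int, Task>();

    // NOTE: this plain read-then-write is safe only because a SINGLE
    // producer calls Enqueue; with multiple producers the update of a
    // group's tail would need synchronization.
    public static Task Enqueue(int groupId, Action work)
    {
        Task next = Tails.TryGetValue(groupId, out var tail)
            ? tail.ContinueWith(_ => work())
            : Task.Run(work);
        Tails[groupId] = next;
        return next;
    }
}
```

One caveat: completed tails are never removed, so with very many short-lived groups the dictionary grows unless entries whose tail has completed are pruned.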
Tags: c# queue fifo
asked Nov 12 at 10:39 by mibiio (223)