
Thank you so much for the detailed description of how the data processing pipeline is handled!


There are a couple of questions that I would like to clarify:


  1. What is the strategy for error handling? Error handling in data processing is a complex topic that requires special attention. What primitives does the framework provide?
  2. What are the scalability properties of the framework? Can we control the number of workers/threads for each stage? Is it possible to control the memory footprint? And finally, is it possible to distribute data processing across nodes?

Thanks for the interesting questions!


What is the strategy for error handling?

As I said, it's a trivial implementation that lacks some important features. One of them is error handling; another is branching. By branching I mean the ability of a stage handler to return objects of different types. For example, a validation stage could have this prototype:


variant<valid_raw_value, invalid_raw_value> func(const raw_value &);

Then there could be two different stages after the validation stage: one for handling valid_raw_value and another for invalid_raw_value.
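The routing idea can be sketched in plain C++17 with `std::variant` and `std::visit`. This is a hypothetical illustration, not part of the pipeline implementation from the article: the three message types are treated as simple structs, and the validation rule and the names `validate`, `handle_valid`, `handle_invalid`, `route` and `overloaded` are made up for the example.

```cpp
#include <string>
#include <variant>

// Hypothetical stand-ins for the message types named above.
struct raw_value { std::string data; };
struct valid_raw_value { std::string data; };
struct invalid_raw_value { std::string data; std::string reason; };

// Common C++17 helper: builds one visitor out of several lambdas.
template <class... Ts> struct overloaded : Ts... { using Ts::operator()...; };
template <class... Ts> overloaded(Ts...) -> overloaded<Ts...>;

// Validation stage: returns one of two alternatives instead of a single type.
// The "non-empty, digits only" rule is purely illustrative.
std::variant<valid_raw_value, invalid_raw_value> validate(const raw_value & v) {
  if (v.data.empty())
    return invalid_raw_value{v.data, "empty"};
  for (char c : v.data)
    if (c < '0' || c > '9')
      return invalid_raw_value{v.data, "non-digit character"};
  return valid_raw_value{v.data};
}

// Two downstream stages, one per alternative.
std::string handle_valid(const valid_raw_value & v) { return "valid:" + v.data; }
std::string handle_invalid(const invalid_raw_value & v) { return "invalid:" + v.reason; }

// Router: std::visit calls the stage that matches the alternative actually held.
std::string route(const raw_value & v) {
  return std::visit(overloaded{
                        [](const valid_raw_value & x) { return handle_valid(x); },
                        [](const invalid_raw_value & x) { return handle_invalid(x); }},
                    validate(v));
}
```

Adding another alternative, such as an error type, means adding one more lambda to the visitor. If an alternative is left without a matching handler, the `std::visit` call fails to compile, which is a useful safety net for branching pipelines.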


Such a feature makes it possible to add error handling as part of the pipeline, e.g.:


variant<valid_raw_value, invalid_raw_value, validation_error> func(const raw_value &);

Another option is to specify a destination for errors, something like this:


stage(validation, one_error_stream)
   | stage(conversion, one_error_stream)
   | stage(archiving, another_error_stream)
   ... 
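A minimal sketch of per-stage error destinations, assuming that an "error stream" is simply a callable that receives error records and that a failing stage signals the failure by throwing. The names `stage`, `stage_error`, `error_stream` and `run` are hypothetical, and the pipeline here is a plain `std::vector` of stages rather than the `|` composition from the pseudocode.

```cpp
#include <functional>
#include <optional>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Hypothetical error record and "error stream" (any callable that accepts errors).
struct stage_error {
  std::string stage;
  std::string what;
};
using error_stream = std::function<void(const stage_error &)>;

// A stage = a handler plus the error stream that receives its failures.
struct stage {
  std::string name;
  std::function<std::string(const std::string &)> handler;
  error_stream errors;

  // Returns the handler's result, or std::nullopt after reporting the
  // exception to this stage's own error stream.
  std::optional<std::string> operator()(const std::string & in) const {
    try {
      return handler(in);
    } catch (const std::exception & e) {
      errors(stage_error{name, e.what()});
      return std::nullopt;
    }
  }
};

// Runs the stages in order; a failed stage stops processing of this value.
std::optional<std::string> run(const std::vector<stage> & stages, std::string value) {
  for (const auto & s : stages) {
    auto result = s(value);
    if (!result)
      return std::nullopt;
    value = std::move(*result);
  }
  return value;
}
```

With this shape, validation and conversion can share one error stream while archiving reports to another, so each kind of failure ends up where it can be handled appropriately.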

Can we control the number of workers/threads for each stage?

It is possible via dispatchers. You can create as many dispatchers as you wish and bind each stage to the dispatcher you want. I showed an example at the end of the article: only two dispatchers were used there, but that can easily be changed.
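SObjectizer's dispatchers are not reproduced here. As a framework-independent illustration of what a per-stage thread count means, the hypothetical `run_stage` function below processes a batch with exactly `workers` threads, so each stage could be given its own value. It assumes the handler is safe to call from several threads at once.

```cpp
#include <atomic>
#include <cstddef>
#include <thread>
#include <type_traits>
#include <vector>

// Applies `handler` to every element of `inputs` using exactly `workers` threads
// (workers must be >= 1). Each thread claims the next unprocessed index from a
// shared atomic counter, so the thread count is a single per-stage knob.
template <class In, class F>
auto run_stage(const std::vector<In> & inputs, std::size_t workers, F handler) {
  using Out = std::decay_t<std::invoke_result_t<F &, const In &>>;
  // Out must be default-constructible and not bool (std::vector<bool> packs bits,
  // so concurrent writes to different elements would race).
  std::vector<Out> outputs(inputs.size());
  std::atomic<std::size_t> next{0};

  std::vector<std::thread> pool;
  for (std::size_t w = 0; w < workers; ++w)
    pool.emplace_back([&] {
      // Each index is claimed by exactly one thread, so writes never overlap.
      for (std::size_t i = next++; i < inputs.size(); i = next++)
        outputs[i] = handler(inputs[i]);
    });
  for (auto & t : pool)
    t.join();
  return outputs;
}
```

For example, a cheap validation stage might get one worker while an expensive conversion stage gets four; in SObjectizer a similar decision is expressed by binding each stage to an appropriately configured dispatcher.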


Is it possible to control the memory footprint?

In this implementation, no. Messages are delivered to actors, and actors have unbounded message queues. For the same reason, this implementation also has no overload protection.


To limit the number of messages waiting to be processed, it is necessary to switch to a different message-delivery scheme. For example, mchains (aka CSP channels) with limited capacity can be used.


And finally, is it possible to distribute the data processing across the nodes?

SObjectizer doesn't support distributed applications, so distributing messages across nodes has to be implemented manually, for example as a special kind of message box.
