Комментарии 42
А можно увидеть реальный сценарий, когда это нужно?
Что же касается межпроцессорных сигналов то можно применить такой сценарий -> запускаем процесс который что то делает -> ждем от него сигнал -> обрабатываем.
На ум приходит централизованная обработка каких то событий в приложении простой отправкой сигнала.
А чем обычный event broker не подходит?
Что же касается межпроцессорных сигналов то можно применить такой сценарий -> запускаем процесс который что то делает -> ждем от него сигнал -> обрабатываем.
Ну так акторы же, с location transparency. Или очереди. Или вообще обычные сервисы.
Блокировать потоки — плохо, потоки надо отпускать, а потом поднимать по приходу "сигнала".
… опять-таки ж, при межпроцессном взаимодействии немедленно возникают всякие вопросы про то "а какой там код с той стороны", что тоже намекает нам, что лучше использовать типовые сервисные решения.
… и как это сделать с помощью блокирующего Receive
?
(особенно учитывая, что HttpClient — он весь на тасках)
Я хочу заметить, что ваша задача в общем случае решения не имеет, потому что если у нас есть больше одного параллельно работающего http-клиента, то нет никакой гарантии, что в момент получения одним из них ошибки авторизации сколько-то других не находятся в in-flight — останавливать к ним доступ уже поздно, а ошибку авторизации они все равно вернут. Поэтому все равно придется писать код, который обрабатывает ошибки авторизации, а тогда можно уже и не делать блокировку доступа.
Какого поведения ожидаете в случае, если источник эмитит собщение до того, как поток-потребитель обработал предыдущее? И еще вижу в коде привязку к экземпляра AutoResetEvent к ThreadId, это опасно, она как-то используется?
Dictionary<int,AutoResetEvent> events = new Dictionary<int, AutoResetEvent>();
Какого поведения ожидаете в случае, если источник эмитит собщение до того, как поток-потребитель обработал предыдущее?
Это проблемы потока потребителя. Можно в дальнейшем реализовать очередь если необходимо, но если между методами Receive() прошло какое то сообщение то поток соответственно его пропустит.
Использования Monitor вполне интересно.
Если Вы ждете выполнения какого то события и обработку не желательно осуществлять в том потоке которое спровоцировало событие то сигналы это вариант.
Нет, "вариант" — это не сигналы, а асинхронные события в любой их ипостаси.
2) Вообще для всех вариантов — Observable и ReactiveExtension. www.introtorx.com/content/v1.0.10621.0/00_Foreword.html
Основная бизнес логика может выполнять в отдельном потоке. Уведомления от текущем состоянии, о завершении — могут выполняться в UI-потоке.
Пример:
GetChildren()
.ToObservable()
.Select(ReCreateElement)
.SubscribeOn(NewThreadScheduler.Default)
.ObserveOn(SynchronizationContext.Current)
.Subscribe(shaft => { }, OnCompleted, _tokenSource.Token);
while (TryTake(T item)) и обрабатывает.
Вы изобрели Enterprise Service Bus только в рамках одного процесса.
А так же на практике, не без изъянов, реализовали паттерн Producer-Consumer.
Посмотрите в сторону TPL (DataFlow Blocks)
Я всем своим коллегам говорю — вы прикладные программисты, и всё что вы изобрели сегодня, до вас уже изобрели.
В частности если взять BufferBlock при вызове Receive только один поток получит сигнал который отправили в Post. Пример:
var bufferBlock = new BufferBlock<int>();
Task.Factory.StartNew(() => {
for (int i = 0; i < 3; i++)
{
Thread.Sleep(3000);
bufferBlock.Post(i);
}
});
Task.Factory.StartNew(() => {
for (int i = 0; i < 3; i++)
{
Console.WriteLine($"{bufferBlock.Receive()} threadId={Thread.CurrentThread.ManagedThreadId}");
}
});
for (int i = 0; i < 3; i++)
{
Console.WriteLine($"{bufferBlock.Receive()} threadId={Thread.CurrentThread.ManagedThreadId}");
}
Основная моя идея это не создание очереди сообщений а синхронизация потоков в том числе в разных процессах.
Вопрос как раз в том, зачем вам нужна синхронизация потоков (тем более в разных процессах), когда "современный тренд" — это как раз максимально избегать синхронизации.
Текущая модель с Task и IAsyncResult а так же TPL в целом решают все проблемы при должном проектировании
Я не утверждаю что область применения такого подхода широка, нет она даже очень специфична.
В качестве примера можно привести asp.net приложение где может быть несколько процессов (сколько выставили в IIS), в каждом процессе несколько потоков.
Вот к Вам пришел клиент и Вам нужно сделать дорогой запрос к базе, как синхронизировать потоки так что бы был только один запрос?
ISignal про получение сигналов о событиях в другом потоке.
Что бы было наглядно вот тест:
[TestMethod]
public void PingPong()
{
var signal = SignalFactory.GetInstanse<string>();
var task = Task.Factory.StartNew(() => {
for(int i = 0;i<10;i++)
{
Debug.WriteLine(signal.Receive());
Thread.Sleep(30);
signal.Send("pong");
}
});
Task.Factory.StartNew(() => {
Thread.Sleep(10);
for (int i = 0; i < 10; i++)
{
signal.Send("ping");
Debug.WriteLine(signal.Receive());
Thread.Sleep(30);
}
});
task.Wait();
}
pong
ping
pong
ping
pong
ping
pong
ping
pong
ping
pong
ping
pong
ping
pong
ping
pong
ping
IObservable<string>
Ну или ISubject<string>
, если вам вот прямо надо, чтобы оно было двухсторонним, хотя я бы делил на observable и observer, это упрощает зависимости.
В том-то и дело, что они предоставляют не колбэки, а поток событий, над которым определены полезные операции. В частности, на этом потоке можно выбрать первый элемент (в будущем!), сконвертировать этот элемент в таск, и сделать тому await или Wait — вот и ваш Receive, только, на выбор потребителя, блокирующий или асинхронный.
Но Вы подкинули пищу как можно оптимизировать данный механизм.
Создать одну задачу внутри класса с блокировкой, остальные потоки в Receive обращаются к Result этой задачи.
Как итог:
- Нужно блокировать только один поток(внутренний таск)
- У обычных потоков будет такая же блокировка как и сейчас
- Поток из пула освободится для выполнения других задач.
Нужно только прикинуть что будет дороже, примитивы синхронизации или вся эта пляска с тасками.
Смысл не в том что бы вернуть задачу, а заблокировать выполнение текущего потока до получения сигнала.
Говорят же вам: блокировать потоки — плохо. Но, что важнее, таск легко позволяет заблокировать поток до своего выполнения, причем с конфигурируемым таймаутом и кооперативной отменой.
Создать одну задачу внутри класса с блокировкой, остальные потоки в Receive обращаются к Result этой задачи.
Как итог:
- Нужно блокировать только один поток(внутренний таск)
- У обычных потоков будет такая же блокировка как и сейчас
- Поток из пула освободится для выполнения других задач.
Вообще-то, ничего не изменится. Просто раньше у вас потоки блокировались на WaitOne
, а станут блокироваться на Result
.
Пока вы таск не вытащите в публичный интерфейс, ничего не изменится.
Вообще-то, ничего не изменится. Просто раньше у вас потоки блокировались на WaitOne, а станут блокироваться на Result.
Соль то реализации в блокировке исполнения до получения сигнала.
Вообще-то, ничего не изменится. Просто раньше у вас потоки блокировались на WaitOne, а станут блокироваться на Result
По поводу блокировке на Result, немного расскажу в чем Task отличается от обычного потока на примере.
Пример 1:
ThreadPool.SetMaxThreads(1000, 1000);
var task1 = Task.Factory.StartNew(() => {
Console.WriteLine($"Task ThreadId {Thread.CurrentThread.ManagedThreadId}");
Thread.Sleep(10000);
return -1;
});
int mainThreadId = Thread.CurrentThread.ManagedThreadId;
Console.WriteLine($"mainThreadId {mainThreadId}");
var task2 = Task.Factory.StartNew(() => {
for (int i = 0; i < 1000; i++)
{
Task.Factory.StartNew(() =>
{
if (Thread.CurrentThread.ManagedThreadId == mainThreadId)
{
Console.WriteLine($"other thread with id {Thread.CurrentThread.ManagedThreadId }");
}
Console.WriteLine($"thread create {Thread.CurrentThread.ManagedThreadId }");
Thread.Sleep(2000);
});
}
});
int result = task1.Result;
Console.ReadKey();
Пример 2:
ThreadPool.SetMaxThreads(1000, 1000);
Task.Factory.StartNew(() => {
var task1 = Task.Factory.StartNew(() => {
Console.WriteLine($"Task ThreadId {Thread.CurrentThread.ManagedThreadId}");
Thread.Sleep(10000);
return -1;
});
int mainThreadId = Thread.CurrentThread.ManagedThreadId;
Console.WriteLine($"mainThreadId {mainThreadId}");
var task2 = Task.Factory.StartNew(() => {
for (int i = 0; i < 1000; i++)
{
Task.Factory.StartNew(() =>
{
if (Thread.CurrentThread.ManagedThreadId == mainThreadId)
{
Console.WriteLine($"other thread with id {Thread.CurrentThread.ManagedThreadId }");
}
Console.WriteLine($"thread create {Thread.CurrentThread.ManagedThreadId }");
Thread.Sleep(2000);
});
}
});
int result = task1.Result;
Console.ReadKey();
});
Второй пример отличается от первого только в том что обернут Task.Factory.StartNew.
То есть вся логика второго примера будет в потоке из пула потоков, первого примера в обычной потоке.
При выполнении первого примера Вы на строке int result = task1.Result; заблокируете поток, во втором случае Вы его не заблокируете а просто переназначите его на другую задачу.
В первом случае Вы не увидете строку other thread with id никогда, во втором же случае Вы увидите её несколько раз так как поток исполнения переназначен на другие задачи а продолжение после int result = task1.Result; по сути является callBackом.
Является ли текущий поток Taskом или обычным потоком можно узнать свойством
Task.CurrentId
Соль то реализации в блокировке исполнения до получения сигнала.
Практическое отличие-то в чем?
При выполнении первого примера Вы на строке int result = task1.Result; заблокируете поток, во втором случае Вы его не заблокируете а просто переназначите его на другую задачу.
Result
всегда блокирует поток, в котором находится (т.е. после него вы всегда окажетесь в том же потоке). Другое дело, что диспетчер — пользуясь своей собственной внутренней логикой, никак вам не подконтрольной — может решить выполнять таск синхронно (т.е., в том же потоке), или же асинхронно (в другом потоке):
If the current task has not started execution, theWait
[Result
вызывает Wait] method attempts to remove the task from the scheduler and execute it inline on the current thread.
Но это валидно тогда и только тогда, когда таск еще не начал выполнение — и когда его можно перенести в текущий тред (что верно не для всех тасков).
Проще говоря, если у вас есть один таск, который где-то как-то уже выполняется, и десять потоков, в которые вы этот таск передали, то если вы в этих потоках скажете task.Result
(или task.Wait()
), то вы получите десять заблокированных потоков (и это не считая того места, где таск выполняется). И это будет как раз ваш сценарий — все потоки, в которых будет вызван Receive
, будут заблокированы.
А вот если вы скажете await task
, то система вполне может эти потоки переиспользовать для чего-нибудь полезного (а continuation вызвать там, где сочтет нужным).
ThreadPool.SetMaxThreads(1000, 1000);
Task.Factory.StartNew(() => {
var task1 = Task.Factory.StartNew(() => {
Console.WriteLine($"Task ThreadId {Thread.CurrentThread.ManagedThreadId}");
Thread.Sleep(10000);
return -1;
});
int mainThreadId = Thread.CurrentThread.ManagedThreadId;
Console.WriteLine($"mainThreadId {mainThreadId}");
var task2 = Task.Factory.StartNew(() => {
for (int i = 0; i < 1000; i++)
{
Task.Factory.StartNew(() =>
{
if (Thread.CurrentThread.ManagedThreadId == mainThreadId)
{
Console.WriteLine($"other thread with id {Thread.CurrentThread.ManagedThreadId }");
}
Console.WriteLine($"thread create {Thread.CurrentThread.ManagedThreadId }");
Thread.Sleep(2000);
});
}
});
int result = task1.Result;
}).Wait();
Console.ReadKey();
Как можно объяснить что некоторые задачи запускаются с тем же ThreadId что и Task который в блокировке ждет результата?
Цитирую еще раз:
If the current task has not started execution, the Wait method attempts to remove the task from the scheduler and execute it inline on the current thread.
Простой способ проверить, что это не обязательно так:
Console.WriteLine($"Main ThreadId {Thread.CurrentThread.ManagedThreadId}");
Task.Factory.StartNew(() => {
Console.WriteLine($"Processing ThreadId {Thread.CurrentThread.ManagedThreadId}");
var task = Task.Factory.StartNew(() => {
Console.WriteLine($"Task ThreadId {Thread.CurrentThread.ManagedThreadId}");
Thread.Sleep(1000);
Console.WriteLine("Finishing task");
});
var cts1 = new TaskCompletionSource<int>();
var cts2 = new TaskCompletionSource<int>();
ThreadPool.QueueUserWorkItem(_ => {
Console.WriteLine($"Waiting for task on ThreadId {Thread.CurrentThread.ManagedThreadId}");
task.Wait();
Console.WriteLine($"Finished waiting for task on ThreadId {Thread.CurrentThread.ManagedThreadId}");
cts1.SetResult(0);
}
);
ThreadPool.QueueUserWorkItem(_ => {
Console.WriteLine($"Waiting for task on ThreadId {Thread.CurrentThread.ManagedThreadId}");
task.Wait();
Console.WriteLine($"Finished waiting for task on ThreadId {Thread.CurrentThread.ManagedThreadId}");
cts2.SetResult(0);
}
);
Task.WaitAll(cts1.Task, cts2.Task);
}).Wait();
… эээ, а где вы ожидаете результатов task2
? Иными словами, откуда вы знаете, что таск не получает свой десятый поток после того, как закончилось ожидание task1.Result
, и, как следствие, внешний Wait
?
Когда работал с Qt (немного) бывало несколько раз выстреливал себе в ногу сигналами как раз из-за их синхронизации. Легко словить дедлок, мне кажется.
~Signal()
{
if (!isDisposabled)
{
Dispose();
}
}
вроде известная вещь — декструктор/финализатор должен реализовывать только класс, который сам аллоцирует/выделает неуправляемые ресурсы. нет?
Сигналы на c#