Как стать автором
Обновить

TPL + DLR = Многопоточный скриптинг

Время на прочтение14 мин
Количество просмотров4.5K
Я давно хотел поизучать «TPL» (Task Parallel Library) и «DLR» (Dynamic Languages Runtime). Для этого мне нужна была конкретная и, желательно, достаточно актуальная задача. В одном из моих переводов рассказывалось о так называемых «игровых циклах». Рассмотренная там тема для меня довольно интересна сама по себе и к тому же связка TPL+DLR подходит для той задачи как нельзя лучше, на мой взгляд. Так я пришел к идее о реализации легковесного асинхронного скриптового движка, который можно было бы относительно легко прикрутить к разным приложениям (в том числе к играм). Ядро движка я решил реализовать на C#. Выбор между динамическими языками в моем случае даже и не стоял. Я для этих целей уже давно облюбовал Ruby. Какое-то время я вынашивал идею, время от времени размышляя о ней на досуге.

Постановка задачи


Итак, хочется поиметь возможность запустить асинхронно или на каком-то конкретном потоке несколько скриптов. Для этого мне понадобится некий сервис, который будет отвечать за следующие задачи:
  • хранение ссылок на работающие задачи со скриптами
  • запрос останова конкретной скриптовой задачи
  • отслеживание состояния задач
  • логгирование вывода в консоль (Stdout и Stderr)
  • нотификация о текстовом выводе в консоль широковещательным событием

В общем, примерно такая схема

Что там изображено? Корневой узел — "IRE.Instance" — это синглтон, который будет выполнять роль того самого сервиса, отвечающего за координацию работы скриптов. Для этого в экземпляре синглтона будет храниться Dictionary с записями для каждой задачи, добавляемой через функции "RunScriptWithScheduler" и "RunScriptAsync". Как можно догадаться из названия, отличаться эти функции будут планировщиком, под управлением которого будет запущена задача. С помощью «RunScriptWithScheduler» можно будет запустить задачу, к примеру, на потоке GUI. Методы "WriteMessage" и "WriteError" будут доступны отовсюду (в том числе из скриптов) и предназначены для вывода в лог сообщений. На стороне скриптов можно будет переопределить стандартные методы вывода в консоль, чтобы перенаправлять текст в лог сервиса.
Что из себя будет представлять запись в словаре задач сервиса? Ключом пусть будет уникальный GUID, который будет идентифицировать запущенный скрипт. Этот GUID можно будет легко передать как стороне, запросившей запуск скрипта, так и внедрить в контекст самого сценария. Значение записи должно как минимум хранить ссылки на CancelationToukenSource и Task. CancelationToukenSource, с которым создается Task скрипта, позволит в любой момент сигнализировать задачу о том, что надо бы закругляться. А ссылка на саму Task'у позволит нам навесить на нее Continuation (подробно TPL тут расписывать не буду).

Сервис IRE


Начну пожалуй с описания ядра движка, а именно сервиса, выполненного в виде синглтона. Реализация синглтона взята с MSDN. Там же есть ссылка на довольно подробный разбор разных реализация синглтона на ЯП JAVA. Очень рекомендую почитать, если кто не читал еще.
Итак, сервис я реализовал в виде мультипоточного «Double-Checked Locking» синглтона. Код базовой реализации будет примерно таким:
Синглтона код комментированный
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using IronRuby;
using Microsoft.Scripting;
using Microsoft.Scripting.Hosting;
using System.IO;
using System.Threading.Tasks;
using System.Diagnostics;
using Microsoft.Scripting.Runtime;

namespace IREngine
{
    public sealed class IRE
    {
        #region Singleton

        private static volatile IRE _instance;
        private static readonly object SyncRoot = new Object();

        private IRE()
        {
            // Позже здесь будет многабукв
            // Выполнится только при самом первом обращении к IRE.Instance
            // Можете проверить, поставив тут брейк и прописав в каком-нибудь обработчике (на кнопке, например) следующий код
            // var instance = IRE.Instance;
        }

        public static IRE Instance
        {
            get
            {
                if (_instance == null)
                {
                    lock (SyncRoot)
                    {
                        if (_instance == null)
                            _instance = new IRE();
                    }
                }

                return _instance;
            }
        }

        #endregion

        #region Consts

        // Тут будут объявлены константы

        #endregion

        #region Fields

        // Здесь будут приватные поля

        #endregion

        #region Properties

        // Здесь разместим свойства и события

        #endregion

        #region Private Methods

        // Эта секция зарезервирована под приватные методы

        #endregion

        #region Public Methods

        // В этой секции будут размещены публичные методы

        #endregion

    }
}


Думаю, из коментариев структура понятна. По коду разъяснять особо нечего. Дам лишь пару коментариев по сути синглтона. Вместо синглтона можно было бы использовать просто статический хэлпер. Но жизненный цикл синглтона легче контролировать. К примеру, момент вызова статического конструктора недетерменирован, а вызов же нестатичного конструктора произойдет лишь при первом обращении к инстансу. Благодаря этому, если до начала работы синглтона в будущем понадобится проинициализировать что-либо, мы сможем чувствовать себя в относительной безопасности. Еще одна интересная вещь — двойная проверка на «null». Два потока могут одновременно оказаться внутри первого условия, но «lock» выполнится лишь на одном из них. Второй будет ждать, когда первый поток выйдет из критической секции. После освобождения критической секции второй поток должен снова проверить инстанс на «null». Таким образом, мы не допустим ситуации создания двух экземпляров синглтона параллельно запущенными потоками.

Кратко об использовании TPL


Теперь кратко опишу азы TPL. Подробнее можете почитать на CodeProject.
Первые асинхронные задачи, которые мы создадим, будут выполнять слежение за буферами консольного вывода. Как я писал вначале, одной и функций сервиса будет логирование вывода сообщений и ошибок в консоль. Итак, что нам понадобится?
  • Action проверки наличия изменений в буфере Output
  • Action обработки ошибок при проверке буфера Output
  • Action проверки наличия изменений в буфере Error
  • Action обработки ошибок при проверке буфера Error
  • Публичный метод, который будет создавать экземляры Task для выполнения Action'ов

Разместим инициализацию Action'ов в приватный конструктор синглтона
Action-ов инициализация

private IRE()
{
	// Позже сюда добавится инициализация Ruby
        _outStringBuilder = new StringBuilder();
        _errStringBuilder = new StringBuilder();

	_outWatchAction = () =>
	{
	  int i = 0;
	  while (IsConsoleOutputWatchingEnabled)
	  {
		  string msg = string.Format("***\t_outWatchTask >> tick ({0})\t***", i++);
		  Debug.WriteLine(msg);
		  WriteMessage(msg);
		  
		  Task.Factory.CancellationToken.ThrowIfCancellationRequested();

		  int currentLength = OutputBuilder.Length;
		  if (OutputUpdated != null && currentLength != _lastOutSize)
		  {
			  OutputUpdated.Invoke(_outWatchTask,
								   new StringEventArgs(OutputBuilder.
														   ToString(_lastOutSize,
																	currentLength - _lastOutSize)));

			  _lastOutSize = currentLength;
		  }
		  Thread.Sleep(TIME_BETWEEN_CONSOLE_OUTPUT_UPDATES);
	  }
	};
	_outWatchExcHandler = (t) =>
	{
	  if (t.Exception == null)
		  return;

	  Instance.WriteError(
		  string.Format(
			  "!!!\tException raised in Output Watch Task\t!!!\n{0}",
			  t.Exception.InnerException.Message));
	};
	_errWatchAction = () =>
	{
	  int i = 0;
	  while (IsConsoleErrorWatchingEnabled)
	  {
		  string msg = string.Format("***\t_errWatchTask >> tick ({0})\t***", i++);
		  Debug.WriteLine(msg);
		  WriteError(msg);

		  Task.Factory.CancellationToken.ThrowIfCancellationRequested();

		  int currentLength = ErrorBuilder.Length;
		  if (ErrorUpdated != null && currentLength != _lastErrSize)
		  {
				ErrorUpdated.Invoke(_errWatchTask,
									new StringEventArgs(ErrorBuilder.
														ToString(_lastErrSize, currentLength - _lastErrSize)));
			  
			  _lastErrSize = currentLength;
		  }
		  Thread.Sleep(TIME_BETWEEN_CONSOLE_OUTPUT_UPDATES);
	  }
	};
	_errWatchExcHandler = (t) =>
	{
	  if (t.Exception == null)
		  return;

	  Instance.WriteError(
		  string.Format(
			  "!!!\tException raised in Error Watch Task\t!!!{0}",
			  t.Exception.InnerException.Message));
	};
}


Ниже представлены объявления нужных свойств, констант и приватных полей.
Полей, свойств, констант инициализация

        #region Consts

        public readonly int TIME_BETWEEN_CONSOLE_OUTPUT_UPDATES = 1000;

        #endregion

        #region Fields

        private bool _outWatchEnabled;
        private bool _errWatchEnabled;

        private Task _outWatchTask;
        private readonly CancellationTokenSource _outWatchTaskToken = new CancellationTokenSource();
        private int _lastOutSize = 0;
        private Task _errWatchTask;
        private readonly CancellationTokenSource _errWatchTaskToken = new CancellationTokenSource();
        private int _lastErrSize = 0;
        private readonly StringBuilder _outStringBuilder;
        private readonly StringBuilder _errStringBuilder;

        private readonly Action _outWatchAction;
        private readonly Action<Task> _outWatchExcHandler;
        private readonly Action _errWatchAction;
        private readonly Action<Task> _errWatchExcHandler;

        private readonly CancellationTokenSource _scriptsToken = new CancellationTokenSource();
        #endregion
        #region Properties

        public StringBuilder OutputBuilder
        {
            get
            {
                lock (SyncRoot)
                {
                    return _outStringBuilder;
                }
            }
        }

        public StringBuilder ErrorBuilder
        {
            get
            {
                lock (SyncRoot)
                {
                    return _errStringBuilder;
                }
            }
        }

        public bool IsConsoleOutputWatchingEnabled
        {
            get
            {
                lock (SyncRoot)
                {
                    return _outWatchEnabled;
                }
            }
            set
            {
                lock (SyncRoot)
                {
                    _outWatchEnabled = value;
                }
            }
        }

        public bool IsConsoleErrorWatchingEnabled
        {
            get
            {
                lock (SyncRoot)
                {
                    return _errWatchEnabled;
                }
            }
            set
            {
                lock (SyncRoot)
                {
                    _errWatchEnabled = value;
                }
            }
        }

        public event EventHandler<StringEventArgs> OutputUpdated;
        public event EventHandler<StringEventArgs> ErrorUpdated;

        #endregion


Что делает этот код? Периодически производится сравнение длины StringBuilder'ов, и если длины изменились, то дергаются соответствующие события. В хэндлерах этого события (на стороне GUI) выполняется вывод на экран добавленного тектса сообщений. Нужно заметить, что на стороне графического интерфейса код в обработчике будет выполнен также внутри Task, но с явным указанием для таски планировщика, полученного от графического интерфейса. Т.к. событие дергается в асинхронной задаче, выполняемой НЕ в потоке GUI, то код в обработчике не будет иметь прямого доступа к элементам интерфейса. Мы должны создать Action и запустить его на потоке интерфейса, явно указав Scheduler главного окна приложения. Ниже представлен код обработчика события на стороне графического интерфейса.
Обработчика события код

IRE.Instance.OutputUpdated += (s, args) =>
{
    string msg = string.Format("***\tIRE >> Out Updated callback\t***\nResult: {0}\n***\tEND\t***\n",args.Data);
    Debug.WriteLine(msg);

    var uiUpdateTask = Task.Factory.StartNew(() =>
                                                {
                                                    OutputLogList.Items.Add(msg);
                                                },
                                            Task.Factory.CancellationToken,
                                            TaskCreationOptions.None,
                                            _uiScheduler);
    uiUpdateTask.Wait();
};
IRE.Instance.ErrorUpdated += (s, args) =>
{
    string msg = string.Format("!!!\tIRE >> Err Updated callback\t!!!\nResult: {0}\n!!!\tEND\t!!!\n", args.Data);
    Debug.WriteLine(msg);
                                                 
    var uiUpdateTask = Task.Factory.StartNew(() =>
                                                {
                                                    ErrorLogList.Items.Add(msg);
                                                },
                                            Task.Factory.CancellationToken,
                                            TaskCreationOptions.None,
                                            _uiScheduler);
    uiUpdateTask.Wait();
};
IRE.Instance.StartWatching();


Этот код добавляет подписку на событие обновления буферов консольного вывода. Главное, на что следует обратить внимание в нем, это "_uiScheduler". Это созданная в конструкторе главного окна ссылка на его (окна) планировщика. Создается эта ссылка следующим образом.

public MainWindow()
{
    InitializeComponent();

    _uiScheduler = TaskScheduler.FromCurrentSynchronizationContext();
}

Все Task'и, создаваемые с указанием этого планировщика будут запущены на потоке графического интерфейса, независимо от того, на каком потоке они инстанциируются. Замыкания на графические элементы интерфейса не вызовут исключения кроспоточного доступа.
Теперь о строке "uiUpdateTask.Wait();". Эту строку желательно обрамить в "try — catch". Все исключения, возникающие внутри Task'и передаются в создавший задачу поток не сразу. Одним из способов получить к исключениям доступ в вызывающем потоке является именно вызов функции «Wait()». Также можно довесить на таску «Continuation» и уже в нем похэндлить исключения. Не важно как, но обработать исключения вы обязаны. Иначе исключения будут переданы в вызывающий поток, когда до таски доберется «GarbageCollector». В какой момент это произойдет — неизвестно. Поэтому для приложения это может быть фатальным.
Я в данном случае для простоты не стал добавлять "try — catch", но позже скорее всего добавлю. Сейчас код здесь прост достаточно, а в будущем все может поменяться.
Теперь осталось привести код создания задач слежения за консольным выводом.

public void StartWatching()
{
    StopWatching();
    if (_outWatchTask != null)
        _outWatchTask.Wait();

    if (_errWatchTask != null)
        _errWatchTask.Wait();
            
    IsConsoleOutputWatchingEnabled = IsConsoleErrorWatchingEnabled = true;

    _outWatchTask = Task.Factory.StartNew(_outWatchAction, _outWatchTaskToken.Token);
    _outWatchTask.ContinueWith(_outWatchExcHandler, TaskContinuationOptions.OnlyOnFaulted);
    _errWatchTask = Task.Factory.StartNew(_errWatchAction, _errWatchTaskToken.Token);
    _errWatchTask.ContinueWith(_errWatchExcHandler, TaskContinuationOptions.OnlyOnFaulted);
}

Здесь все тоже просто. На всякий случай вызываем останов задач слежения ("StopWatching();"). Вызов этой функции просто выставляет в false флаги "IsConsoleOutputWatchingEnabled" и "IsConsoleErrorWatchingEnabled", а также запращивает останов через "CancelationToken". Я мог бы ограничиться одним лишь токеном. Но вообще запрос останова через токен считается аварийным остановом. Есть даже специальная функция "Task.Factory.CancellationToken.ThrowIfCancellationRequested();", вызов которой заставить таску выбросить исключение, если во время исполнения задачи будет получен запрос отмены через токен. Тут стоит оговориться, что вызов этой функции и вообще проверка состояния CancelationToken'а достаточно затратная процедура. Поэтому ее желательно делать как можно реже.
Следующее, на что хочу обратить внимание, это конструкция вида "_outWatchTask.ContinueWith(_outWatchExcHandler, TaskContinuationOptions.OnlyOnFaulted);". Этот код навешивает на задачу так называемое «продолжение» (Continuation). Причем продолжение создается с опцией "TaskContinuationOptions.OnlyOnFaulted". Такое продолжение вызовется только в случае, когда «AggregatedException» задачи содержит хотя бы одно исключение. Попросту говоря, если задача завершилась штатно, то это продолжение проигнорируется. Еще нужно заметить, что исключений может быть несколько. Ведь мы можем внутри этой задачи создать вложенные подзадачи. Когда вложенные задачи будут собираться "GC" все не обработанные исключения всплывут к родителю в виде того же «AggregatedException». Таким образом, можно получить целое дерево вложенных исключений. Для превращения этого дерева исключений в плоский список имеется специальный метод "AggregatedException.Flatten()".

Основы встраивания скриптов


Теперь поговорим о самом скриптовании. Для начала нам нужно создать «ScriptEngine» и загрузить в него необходимые сборки .Net нашего приложения. Делается это просто:

_defaultEngine = Ruby.CreateEngine((setup) => {
    setup.ExceptionDetail = true;
});
_defaultEngine.Runtime.LoadAssembly(typeof(IRE).Assembly);

Этот код я добавил в начало приватного конструктора синглтона. Первая часть кода, собственно, создает экземпляр «ScriptEngine». Переданная в качестве параметра ламбда позволяет произвести настройку движка. Делать это необязательно. Но с помощью такого подхода можно запретить ScriptEngine компилировать код Ruby, чтобы скрипты каждый раз интерпретировались. Это полезно, к примеру, на платформе WindowsPhone 7, т.к. сэкономит память. Я же просто включил подробный вывод информации об исключениях.
Вторая часть просто загружает сборку с нашим сервисом в рантайм движка Ruby. Без этого мы не сможем общаться с нашим приложением из скриптов. Таким же способом можно загрузить и другие нужные сборки. Желательно вообще вынести эту загрузку сборок в отдельный метод, чтоб легче было добавлять новые сборки и при этом не мозолила глаза простыня однотипных строк кода.
Осталось привести код функции добавления асинхронной таски скрипта и нескольких сервисных методов.
Добавленья скрипта задачи асинхронной код
public Guid RunScriptAsync(string code)
{
    var scriptScope = _defaultEngine.CreateScope();
    CompiledCode compiledCode = null;
    try
    {
        ScriptSource scriptSource = _defaultEngine.CreateScriptSourceFromString(code, SourceCodeKind.AutoDetect);

        var errListner = new ErrorSinkProxyListener(ErrorSink.Default);
        compiledCode = scriptSource.Compile(errListner);
    }
    catch (Exception ex)
    {
        WriteError(ex.Message);
        return Guid.Empty;
    }
            
    var action = new Action(() => compiledCode.Execute(scriptScope));
    var tokenSource = new CancellationTokenSource();
    var task = new Task(action, tokenSource.Token, TaskCreationOptions.LongRunning);
    var guid = Guid.NewGuid();
    AddTask(guid, new TaskRecord { TokenSource = tokenSource, Task = task });

    task.Start();
    task.ContinueWith((t) =>
                            {
                                // Actually we don't needed this due to "TaskContinuationOptions.OnlyOnFaulted" is set
                                if (t.Exception == null)
                                    return;

                                t.Exception.Flatten().Handle((ex) =>
                                                        {
                                                            Instance.WriteError(t.Exception.InnerException.Message);
                                                            return true;
                                                        });
                            }, TaskContinuationOptions.OnlyOnFaulted);
    task.ContinueWith((t) =>
                        {
                            Instance.RemoveTask(guid);
                        });
    return guid;
}

public void AddTask(Guid key, TaskRecord record)
{
    _tasks.Add(key, record);
}

public void RemoveTask(Guid key)
{
    _tasks.Remove(key);
}

public Task GetTask(Guid key)
{
    return _tasks.ContainsKey(key) ? _tasks[key].Task : null;
}

public void RequestCancelation(Guid key)
{
    var tokenSource = _tasks.ContainsKey(key) ? _tasks[key].TokenSource : null;

    if (tokenSource == null)
        return;

    tokenSource.Cancel();
}

Что мы тут имеем? Вначале создаем ScriptScope. Движок Ruby будем использовать дефолтный, а вот ScriptScope у каждого сценария будет свой. Область определения всех переменных и результатов исполнения сценария будет ограничена этими скоупами.
Далее внутри «try — catch» создаются ScriptSource и CompiledCode. «try — catch» необходим, т.к. при создании скомпилированного кода могут возникнуть исключения. В этом случае мы возвращаем пустой Guid, а вызывающая сторона должна адекватно обработать эту ситуацию. Если же скрипт нормально скомпилировался, то можно приступать к созданию задачи выполнения сценария.
Итак, создаем Action, выполняющий скрипт, создаем CancelationTokenSource, новую Task'у с опцией «TaskCreationOptions.LongRunning» (ведь скрипт, потенциально, будет выполняться длительное время). Затем создаем новый Guid для задачи и, наконец, пакуем это все в нувую запись в Dictionary скриптовых задач. Как я и планировал вначале.
После создания мы запускаем задачу и навешиваем на нее продолжение с обработкой исключений. В продолжении задачи мы производим «уплощение» AggregatedException и обрабатываем все исключения (return true;), предварительно записав в лог сообщение об ошибке.
Кроме указанного выше продолжения, мы навешиваем еще одно, которое вызовется при любом завершении задачи. В этом продолжении мы удаляем отработавшую Task'у из словаря, т.к. она нам больше не пригодится.
С сервисными функциями вроде все просто.

Пример использования


Чтобы продемонстрировать работу данного скриптового фреймворка, я создал проект оконного приложения. Разметка окна проста.
<Window x:Class="IREngineTestBed.MainWindow"
        xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
        xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
        Title="MainWindow" Height="350" Width="525">
    <Grid>
        <Grid.ColumnDefinitions>
            <ColumnDefinition />
            <ColumnDefinition />
        </Grid.ColumnDefinitions>
        <Grid.RowDefinitions>
            <RowDefinition />
            <RowDefinition Height="Auto"/>
        </Grid.RowDefinitions>
        <Button x:Name="StartButton" Content="Start" Grid.Row="1" Margin="5" Height="25"
                Click="StartButton_Click"/>
        <Button x:Name="StopButton" Content="Stop" Grid.Row="1" Grid.Column="1" Margin="5" Height="25"
                Click="StopButton_Click"/>

        <ListBox x:Name="OutputLogList" Grid.Column="0" Margin="5"/>
        <ListBox x:Name="ErrorLogList" Grid.Column="1" Margin="5"/>
    </Grid>
</Window>

Заключение


Окно содержит два ListBox'а и две кнопки. Я не стал использовать TextBlock для вывода логов из-за плохой производительности добавления строк (TextBlock.Text += «some string» — есть великое зло, как вы наверняка знаете). Но и ListBox'ы, на самом деле, плохая идея. Просто так текст не скопировать. Вобщем, ListBox'ы лишь для демонстрации положил (на скорую руку). В будущем заменю, наверное, на RichTextBox. В них и виртуализация строк вроде есть, и текст, вроде, аппендится в StringBuilder.
Итак, по кнопке «StartButton» выполняется (ре)старт задач слежения за буферами консольного вывода и задачи выполнения скрипта. Кнопка «StopButton» — все останавливает.
Вот код тестового скрипта.
#!ruby19
# encoding: utf-8

include IREngine

class IRE
    def log(message)
        IRE.Instance.write_message message
    end
    
    def err(message)
        IRE.Instance.write_error message
    end
end

def log(message, is_err = false)
    IRE.Instance.log message unless is_err
    IRE.Instance.err message if is_err
end

5.times{|i|
    log "Hello, Output! (from Ruby Async Task)"
    log "Hello, Error! (from Ruby Async Task)", true
}

raise "\n!!!\tBOOO! Catch super scaring exception from RUBY...\t!!!\n"

В этом коде в класс сервиса добавляются хэлпер-методы вывода сообщений в лог. Причем дополнительные методы добавляются именно в инстанс синглтона. Но если б я поставил перед именами методов префикс «self.» или «IRE», то они были бы созданы как статические методы. Наверное позже так и сделаю (нужно будет отдельно протестить этот кейс).
В теле скрипта мы видим пятикратный вывод строки в лог и проброс исключения. Это исключение будет корректно обработано на стороне сервиса. Вот как выглядит интерфейс после выполнения скрипта.


Заключение


В статье я представил базовую реализацию движка, позволяющего запускать асинхронно несколько скриптов одновременно. На момент написания статьи не все задуманное было реализовано. К примеру, еще нужно проверить корректность обработки ситуации ошибок синтаксиса в скриптах. Также не реализован пока метод запуска скрипта с указанием конкретного планировщика.
За развитием кода можете следить в репозитории на github. Буду рад, если кто-то поучаствует в развитии проекта.

Спасибо за внимание!
Теги:
Хабы:
Всего голосов 9: ↑8 и ↓1+7
Комментарии2

Публикации

Истории

Работа

.NET разработчик
72 вакансии
Ruby on Rails
8 вакансий
Программист Ruby
8 вакансий

Ближайшие события