Flow - потоковая обработка данных
В разработке используются в основном 2 приема обработки данных: либо загрузка всех данных в оперативную память, либо обработка небольшими порциями по мере чтения. Первый способ прост и годится для небольших объемов, второй - сложнее, но является единственным решением при обработке больших объемов данных.
Оба подхода к обработке данных можно встретить, например, при обработке XML. К первому типу относится библиотека XML::LibXML
. К потоковым относятся XML::Parser
, XML::SAX
, XML::ExtOn
. На основе потоковой библиотеки XML::ExtOn
построена, например, реализация Perl6::Pod
.
Однако все больше приходится использовать потоковую обработку не только в отношении XML. И тут такие сущности, как пространства имен и тэги оказываются лишними. Для подобных случаев я создал библиотеку Flow [1]. По сути FLow
- это XML::SAX::Machines
[2] в максимально упрощенном виде: выброшен XML и SAX, а осталась лишь идея композиции - Machines ( то, что требуется для построения роботов! ).
Простое должно оставаться простым
Концептуально библиотека Flow
похожа на набор привычных утилит: grep, tail, head, которые используются ежедневно в командной строке. Они ориентированы на построчную потоковую обработку и будучи объединенными в pipe реализуют необходимую логику. Например:
cat test_file.txt | grep MyLine
Данная команда выведет строки из файла, содержащие "MyLine".
Идея компоновки простых инструментов в "pipe" используется в Flow. Указанный ниже код реализует аналогичную логику:
use Flow;
my $f = create_flow(
Grep=>qr/MyLine/,
sub {print @_; \@_}
);
open FH, "<test_file.txt";
$p = $f->parser;
$p->begin;
while (my $str = <FH> ) {
$p->flow($str)
}
$p->end;
Особенностью Flow
, является то, что процесс обработки потока позволяет применять логику к буферу, содержащим несколько элементов данных.
my $f = create_flow( Splice=>20, Grep=>qr/MyLine/)
Благодаря этому достигается компромисс в виде блочной обработки данных. Это позволяет снизить процессорные затраты на вызовы методов. К тому же варьируя размер блока данных ( Splice
) можно регулировать использование памяти.
Пример использования библиотеки Flow
Чтобы продемонстрировать возможности библиотеки, рассмотрим небольшую задачу. Пусть нам необходимо найти произведение чисел, хранящихся в файле. Содержимое файла следующее:
1
2
0
3
1
5
Таким образом имеем простой файл, в каждой строке которого находится число. Код, решающий эту задачу, выглядит следующим образом:
use Flow;
my $res = 1;
my $f = create_flow(
#Печатаем каждую прочитанную строку
sub { print "process:",@_,"\n" ;\@_},
#Результат произведения накапливаем в
# переменной $res
sub { $res *= $_ for @_}
);
open FH, "<test_file.txt";
$p = $f->parser;
$p->begin;
while (my $str = <FH> ) {
$p->flow($str)
}
$p->end;
# вывод результата
print "$res \n";
Результат работы скрипта следующий:
process:1
process:2
process:0
process:3
process:1
process:5
result: 0
Результат произведения чисел равен 0. Для этого была прочитана каждая строка файла и произведено действие умножения.
В приведенном примере обработка завершиться как только будет достигнут конец файла. Для нашей задачи результат становится очевидным, если будет встречен 0, так как если среди множителей есть 0, то и произведение будет равно 0. Дальнейшая выборка чисел и обработка не нужна.
Особенностью Flow
является обратная связь, существующая между циклом формирования потока данных и элементами, обрабатывающими этот поток. Возможна ситуация, когда дальнейшая обработка не имеет смысла. Например, необходимая информация была найдена или встречены данные, при которых продолжение вычислений теряет смысл ( как в рассмотренном примере ).
Реализуется данный механизм исходя из следующего соглашения. Если блок кода или метод объекта в последовательности обработчиков возвращает undef
или ссылку на массив, то производится дальнейшая обработка данных. Иначе данный результат рассматривается как особый и возвращается в инициирующий поток цикл. В нашем примере таким циклом является построчное чтение из файла.
C учетом описанных особенностей модифицируем код примера:
use Flow;
my $res = 1;
my $f = create_flow(
sub { print "process:",@_,"\n" ;\@_},
sub {
for (@_) {
#Возращаем 1 как признак особого случая
$res *= $_ or return 1
}
\@_ # передача данных далее "по цепочке"
}
);
open FH, "<test_file.txt";
$p = $f->parser;
$p->begin;
while (my $str = <FH> ) {
# прекращаем дальнейшую обработку
last if $p->flow($str)
}
$p->end;
# вывод результата
print "$res \n";
Результат будет следующим:
process:1
process:2
process:0
result: 0
Как видим возможно управлять процессом формирования потока данных из компонент, обрабатывающих сам поток данных.
Именно наличие подобной управляющей обратной связи [3], позволяет достигать оптимальных результатов при обработке данных.
Данный пример прост, но общее понимание принципа работы библиотеки Flow
поможет при ее использовании в более сложных задачах.
[1]Flow - библиотека для обработки потока данных. http://search.cpan.org/perldoc?Flow
[2]Introducing XML::SAX::Machines. http://www.xml.com/pub/a/2002/02/13/sax-machines.html
[3] Обратная связь. http://ru.wikipedia.org/wiki/Обратная_свзяь. Наиболее полную информацию об обратных связях можно почерпнуть из курса по "Автоматизированным Системам Управления".