PDA

View Full Version : مقاله: جاوا و الگوی معماری Pipes and Filters



همایون افشاری
دوشنبه 04 شهریور 1392, 08:18 صبح
جاوا و الگوی معماری Pipes and Filters
خب اولش می خواستم یک برنامه ساده بر اساس الگوی معماری Pipes and Filter ایجاد کنم. در واقع ساده ترین پیاده سازی الگوی Pipes and Filter مدنظرم بود. ایده ی ساده ای است اما به محض اینکه دست به کار شدم به این نتیجه رسیدم که سرفصل های مهمی از برنامه نویسی شی گرا و جاوا باید مرور بشود. این نوشته شاید تنها معرفی گذرایی باشد از :

Parallel-Programming
Multi-Threading
Piped-Streams
Ploymorephism
Anonymous-Implementation
Pipes-and-Filters-Pattern

برای اونهایی برنامه نویسی با جاوا را تازه یاد گرفتن و البته بهش علاقه دارن چیز بدی نیست!

برنامه موازی
فرض کنید مجموعه ای داده همسان وجود دارد و می خواهیم روی آنها پردازش انجام داده و نتیجه را ذخیره کنیم. مثلا فایل بزرگی را در نظر بگیرید که شامل 1 میلیون خط است و هر خط آن یک عدد صحیح را نشان می دهد. حالا می خواهیم برنامه ای بنویسیم که مجموعه ای از توابع ریاضی را روی هر خط (عدد صحیح) اعمال کرده و نتیجه را در فایل خروجی ثبت می کند. مثلا



f1(x)=x^2 f1(5) = 25
f2(x)=x-5 f2(25) = 20
f3(x)=x*13 f3(20) = 260
f(x) = f3(f2(f1(x))) f(5) = 260



توجه کنید که این کار برای تمام خط ها باید یک به یک انجام بگیرد و نتیجه در فایل خروجی ثبت شود.




5 -> f1 -> f2 -> f3 -> 260
3 -> f1 -> f2 -> f3 -> 52



ساده ترین راه حل نوشتن برنامه ای است که به شکل ترتیبی توابع را روی خطوط (یکی بعد از دیگری) اجرا کرده و نتیجه را ثبت می کند. اما فرض کنید هر یک از بخش های پردازشی (توابع) دارای پیچیدگی بوده و هزینه زمانی قابل ملاحظه داشته باشند. بدیهی است که در این حالت زمان زیادی برای تولید خروجی توسط برنامه ترتیبی صرف خواهد شد. مثلا اگر هر بخش پردازشی 1 ثانیه زمان بخواهد برای هر خط 3 ثانیه و برای 1 میلیون خط 3 میلیون ثانیه زمان لازم است!

پس باید به دنبال راه حل دیگری باشیم: برنامه ای که به شکل موازی خطوط را پردازش می کند.

البته توجه کنید که داده و برنامه مورد نظر ما این ویژگی ها را دارد:


پردازش هر عنصر داده ای از دیگری مستقل است. یعنی حاصل اعمال توابع روی یک خط در نتیجه خط دیگر تاثیر ندارد.
پردازشی که روی هر عنصر داده ای انجام می گیرد قابل شکستن است. در این مثال تایع f به سه تابع شکسته شده است. ضمن اینکه هر بخش پردازشی زمان محسوسی صرف می کند.
خروجی و ورودی ها همگی از یک نوع هستند و به عبارتی زبان ارتباطی بخش ها (در اینجا توابع) یکسان است. در این مثال تمام ورودی و خروجی ها عدد صحیح است.

موازی سازی با توجه به چنین ویژگی هایی ممکن خواهد بود. برای اطلاعات بیشتر ببینید:

http://en.wikipedia.org/wiki/Parallel_computing
https://computing.llnl.gov/tutorials/parallel_comp/



الگوی Pipes and Filters
این الگوی یکی از الگوهای استاندارد طراحی معماری است که کاربرد عمده آن در موازی سازی و تشکیل خط لوله پردازشی است. طبق این الگو پیکربندی نهایی برنامه شامل تعدادی Filter است که به شکل همزمان با هم در حال اجرا بوده و بوسیله Pipe ها به یکدیگر متصل شده اند. هر Filter در واقع یک واحد پردازشی در برنامه موازی است که عملیات منحصر به فرد خود را انجام می دهد. یک Pipe عنصری است که واحدهای پردازشی را به هم مرتبط ساخته و خروجی یک Filter را به ورودی Filter دیگر متصل می کند. مدیریت تمام دغدغه های ارتباطی و دریافت و ارسال داده به عهده Pipe هاست. در الگوی استاندارد یک سرچشمه داده (ورودی اصلی کل برنامه) به نام Source و یک محل تجمع داده نهایی (خروجی کل برنامه) به نام Sink تعریف شده است.
توجه کنید که این الگو را می توان در سطح بالای طراحی یعنی معماری سیستم به کار گرفت. در این حالت هر یک از Filter ها در واقع یک قطعه (Component) از برنامه کاربردی است و Pipe ها قطعاتی که وظیفه تبدیلات داده خروجی به داده با پروتکل مشترک و داده با زبان مشترک به داده ورودی را نیز بر عهده دارند (به این تبدیلات به ترتیب marshalling و unmarshalling گفته می شود).
مشهورترین مثال برای الگوی Pipes and Filter برنامه های استاندارد Unix است. به مثال زیر توجه کنید.
فرض کنید بخواهیم از فایل input شماره خطوطی که در آنها ERROR رخ داده است را استخراج کنیم:






workspace$ cat input

10 ERROR
24 WARN
27 ERROR
43 INFO
44 INFO
49 ERROR
52 WARN
61 INFO






با لوله کشی (!) سه برنامه یونیکسی این کار انجام خواهد شد:



workspace$ cat input | grep ERROR | cut -d' ' -f1
10
27
49



برنامه cat محتویات input را در خروجی چاپ می کند، برنامه grep فقط خطوطی که شامل ERROR هستند را به خروجی هدایت می کند و برنامه cut اولین ستون هر خط را (با توحه به جداکننده space) چاپ می کند. سه Filter ما در اینجا (cat و grep و cut) به وسیله pipe یا | به هم متصل شده اند.
منابع مفید در این زمینه :

http://www.amazon.com/dp/0471958697
http://www.cs.olemiss.edu/~hcc/csci581oo/notes/pipes.html



پیاده سازی ساده با جاوا
برگردیم به برنامه اول. می خواهیم برنامه جاوایی بر اساس الگوی Pipes and Filters بنویسیم که مجموعه ای از توابع را روی تک تک خط های یک ورودی به ترتیب اجرا کرده و خروجی متناسب با هر خط را تولید می کند. برای سادگی کار فرض می کنیم هر خطِ ورودی یک عدد صحیح است.

کلاس Filter

import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.util.Scanner;

public class Filter extends Thread {

InputStream input;
OutputStream output;

public void setInputStream(InputStream input) {
this.input = input;
}

public void setOutputStream(OutputStream output) {
this.output = output;
}

@Override
public void run() {
Scanner reader = new Scanner(input);
PrintWriter writer = new PrintWriter(output);
while (reader.hasNextInt()) {
int line = reader.nextInt();
int result = function(line);
writer.write(result + "");
writer.flush();
}
reader.close();
writer.close();
}

private int function(int x) {
return x + 10;
}
}


این Filter دارای یک درگاه ورودی و یک درگاه خروجی است. کار اصلی Filter در متد run انجام می گیرد: تا زمانی که عدد صحیح در ورودی موجود باشد آن را دریافت و بعد از اعمال function در خروجی می نویسد. فعلا function مورد نظر را فقط جمع با 10 در نظر می گیریم. دلیل استفاده از Scanner و PrintWriter ساده تر شدن کد است و اینکه می خواهیم ورودی را خط به خط بخوانیم و بنویسیم.
چون می خواهیم واحدهای پردازشی برنامه به شکل موازی اجرا شوند آنها را به شکل Thread پیاده سازی می کنیم. به طور خلاصه یک Thread در واقع یک نخ اجرایی جداگانه است که می تواند به شکل موازی با دیگر Thread های برنامه به اجرا در بیاید. سیستم هایی که دارای چند پردازشگر هستند می توانند thread ها را به شکل واقعی به صورت همزمان به اجرا در بیاورند. در سیستم های تک پردازنده قابلیت time-sharing در سیستم عامل سبب می شود که گمان کنیم برنامه ها (و در نتیجه thread) ها به شکل همزمان اجرا می شوند. هر برنامه جاوا حداقل یک thread اجرایی به نام main دارد که همان خط اجرایی در تابع main برنامه است. برنامه نویس می تواند Thread های اجرایی مورد نیاز خود را نیز به برنامه بیفزاید. یکی از روش ها (که در اینجا برای پیاده سازی Filter از آن استفاده کرده ایم) ارث بری از کلاس Thread است. برنامه اجرایی Thread در متد run آن قرار می گیرد. با فراخوانی متد start یک Thread شروع به کار می کند. (روش های دیگری برای تعریف Thread ها مثلا با پیاده سازی واسط Runnable وجود دارد که انعطاف بیشتری دارد. در اینجا برای سادگی کد فقط کلاس Thread را extend کرده ایم. به طور کلی بحث multi-threading در جاوا بسیار گسترده و در عین حال شیرین است!).

کلاس Pipe

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

public class Pipe {

private InputStream in;
private OutputStream out;

public Pipe() throws IOException {
in = new PipedInputStream();
out = new PipedOutputStream((PipedInputStream) in);
}

public InputStream getInputStream() {
return in;
}

public OutputStream getOutputStream() {
return out;
}
}


هر Pipe یک درگاه ورودی و یک درگاه خروجی دارد. کلاس ساده ای است! تنها نکته مهم در این کلاس استفاده از PipedInputStream و PipedOutputStream است. این دو کلاس امکان می دهند جریانهای ورودی و خروجی در ارتباط با هم تعریف شوند. در اینجا out جریان خروجی ای است که ورودی خود را از in می گیرد. چنین جریانهایی می تواند کاربردهای مختلف و متنوعی داشته باشند.

پیکربندی و اجرا
فرض کنید می خواهیم برنامه طوری پیکربندی شود که هر خط ورودی را با 20 جمع کرده و در خروجی بنویسد. به دو Filter (هر کدام یکبار با 10 جمع می کنند) و یک Pipe نیاز داریم:


public class Main {

public static void main(String[] args) throws Exception {

Filter f1 = new Filter();
Filter f2 = new Filter();
Pipe p1 = new Pipe();

f1.setInputStream(System.in);
f1.setOutputStream(p1.getOutputStream());
f2.setInputStream(p1.getInputStream());
f2.setOutputStream(System.out);

f1.start();
f2.start();
}
}


اگر برنامه را اجرا کنید خواهید دید که هر عدد ورودی با 20 جمع شده و نمایش داده می شود:


workspace/PipesAndFilters/bin# java Main
10
30


یا:



workspace/PipesAndFilters/bin# cat input
14
15
13
16
24
90
workspace/PipesAndFilters/bin# cat input | java Main
34
35
33
36
44
110


فایل input شامل چند خط از اعداد صحیح است. برنامه تمامی خطوط را پردازش کرده و نتیجه را در خروجی ظاهر می کند.

آزمون کارآیی
برای درک بهتر اثر کارایی باید کمی تاخیر در کار Filter ایجاد کنیم:


import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.util.Scanner;

public class Filter extends Thread {

InputStream input;
OutputStream output;

public void setInputStream(InputStream input) {
this.input = input;
}

public void setOutputStream(OutputStream output) {
this.output = output;
}

@Override
public void run() {
Scanner reader = new Scanner(input);
PrintWriter writer = new PrintWriter(output);
while (reader.hasNextInt()) {
delay();
int line = reader.nextInt();
int result = function(line);
writer.write(result + "\n");
writer.flush();
}
reader.close();
writer.close();
}

private int function(int x) {
return x + 10;
}

private void delay() {
long DELAY_TIME = 1000; // 1 sec
try {
Thread.sleep(DELAY_TIME);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}


در این حالت اجرای function در هر Filter یک ثانیه زمان می برد به این دلیل که Thread اجرایی آن در محل فراخوانی sleep به مدت 1000 میلی ثانیه (1 ثانیه) متوقف می شود.
برنامه را با ده Filter پیکربندی می کنیم (در نتیجه خروجی باید برابر با ورودی به اضافه 10*10=100 باشد).


public class Main {

public static void main(String[] args) throws Exception {

int FILTER_COUNT = 10;
Filter[] filters = new Filter[FILTER_COUNT];
Pipe[] pipes = new Pipe[FILTER_COUNT - 1];

for (int i = 0; i < FILTER_COUNT; i++)
filters[i] = new Filter();

for (int i = 0; i < FILTER_COUNT - 1; i++) {
pipes[i] = new Pipe();
filters[i].setOutputStream(pipes[i].getOutputStream());
filters[i + 1].setInputStream(pipes[i].getInputStream());
}

filters[0].setInputStream(System.in);
filters[FILTER_COUNT - 1].setOutputStream(System.out);

for (int i = 0; i < FILTER_COUNT; i++)
filters[i].start();
}
}


برنامه را برای فایل با 6 ورودی اجرا می کنیم:



/workspace/PipesAndFilters/bin# time cat input | java Main
114
115
113
116
124
190

real 0m15.133s
user 0m0.200s
sys 0m0.020s


زمان اجرای کامل تقریبا برابر 15 ثانیه و کمتر از زمان لازم برای انجام این پردازش به شکل ترتیبی (6*10=60 ثانیه - هر عمل «جمع با 10» یک ثانیه طول می کشد) است.

سفارشی کردن Filter
تا اینجا Filter برنامه تنها قادر است ورودی را با 10 جمع کند. اما چگونه می توانیم عمل function را در Filter سفارشی و به عبارت دیگر مجرد کنیم؟ کافی است از امکانات برنامه نویسی شی گرا استفاده کنیم. یکی از اصول برنامه نویسی شی گرا اصل Polymorephism یا چندریختی است. به طور خیلی خلاصه و ساده طبق این اصل یک رفتار در کلاس می تواند توسط زیر کلاسهای آن و بنابر ویژگی های آن زیرکلاس پیاده سازی شده و در نتیجه یک نام مشترک، چند ریختِ پیاده سازی متفاوت دارد! مثلا اگر کلاس Animal دارای رفتار speak باشد، این رفتار در زیرکلاس Dog به شکل bark-bark و در زیرکلاس Duck به شکل quak-quak پیاده سازی خواهد شد (و مورد انسان پیاده سازی speak خیلی پیچیده تر از اینهاست :لبخندساده: ). اصطلاحاً می گوییم speak در زیرکلاس ها Polymorephic شده است.
برای استفاده از این ویژگی در مورد Filter راه حل های مختلفی وجود دارد. ساده ترین حالت این است که کلاس Filter را به یک کلاس abstract تبدیل کرده و متد function را در آن مجرد کنیم:


import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.util.Scanner;

public abstract class Filter extends Thread {

InputStream input;
OutputStream output;

public void setInputStream(InputStream input) {
this.input = input;
}

public void setOutputStream(OutputStream output) {
this.output = output;
}

@Override
public void run() {
Scanner reader = new Scanner(input);
PrintWriter writer = new PrintWriter(output);
while (reader.hasNextInt()) {
int line = reader.nextInt();
int result = function(line);
writer.write(result + "\n");
writer.flush();
}
reader.close();
writer.close();
}

abstract int function(int x);
}


در این حالت هر کلاسی که از Filter ارث می برد باید function را پیاده سازی کند.
توجه کنید رفتار اصلی Filter یعنی خواندن-تغییر-نوشتن سر جای خود باقی است و فقط function مجرد شده است.
حال برنامه را با دو فیلتر دلخواه پیکربندی و اجرا می کنیم:


public class Main {

public static void main(String[] args) throws Exception {

Filter f1 = new Filter() {
@Override
int function(int x) {
return x + 13;
}
};

Filter f2 = new Filter() {
@Override
int function(int x) {
return x * x;
}
};

Pipe p = new Pipe();
f1.setInputStream(System.in);
f1.setOutputStream(p.getOutputStream());
f2.setInputStream(p.getInputStream());
f2.setOutputStream(System.out);

f1.start();
f2.start();
}
}


دو زیرکلاس برای Filter داریم که هر یک function را به دلخواه پیاده سازی می کند (یکی به اضافه 13 و یکی توان دو). توجه کنید که امکان نمونه سازی از کلاس abstract وجود ندارد. در اینجا برای سادگی دو کلاس را در همان لحظه نمونه سازی تعریف کرده ایم. به این کار Anonymous Implementation گفته می شود. در Anonymous Implementation به جای اینکه در یک کلاس جداگانه، کلاس مجرد (یا واسط) مورد نظر را پیاده سازی کنیم، به محض نمونه سازی از آن متدهای مورد نیاز را override می کنیم.
با اجرای برنامه خروجی مورد انتظار تولید می شود:



workspace/PipesAndFilters/bin# java Main
7
400


می توانیم برای حالت های خاص کلاس های فیلتر پیش فرض تولید کنیم. مثلا:


public class AddFilter extends Filter {

int y = 0;

public AddFilter(int y) {
this.y = y;
}

@Override
int function(int x) {
return x + y;
}
}

و

public class PowerFilter extends Filter {

int power = 1;

public PowerFilter(int power) {
this.power = power;
}

@Override
int function(int x) {
return (int) Math.pow(x, power);
}
}


که در این صورت پیکربندی ساده تری برای مثال قبل خواهیم داشت:


public class Main {

public static void main(String[] args) throws Exception {

Filter f1 = new AddFilter(13);
Filter f2 = new PowerFilter(2);
Pipe p = new Pipe();

f1.setInputStream(System.in);
f1.setOutputStream(p.getOutputStream());
f2.setInputStream(p.getInputStream());
f2.setOutputStream(System.out);

f1.start();
f2.start();
}
}


کارهای آینده
این برنامه فقط یک ایده خام است و در عین حال قابلیت گسترش زیادی دارد. چندتایی که به ذهنم می رسد اینهاست:



اضافه کردن Pipe از نوع T-Junction به خط لوله : اتصال T شکلی که یک ورودی و دو خروجی (یا برعکس) دارند. (مطالعه بیشتر در مورد Stream ها در جاوا)
امکان تعریف یک فیلتر با دیگر فیلترها : یعنی فلیتری داریم که خودش از چند فیلتر تشکیل شده است. (مطالعه بیشتر در مورد ارث بری در جاوا) (راهنمایی: کار اصلی ای که چنین فیلتری باید انجام دهد این است که Pipe های بین فیلترهای خود را ایجاد کند و متد start تمام فیلترها را فراخوانی کند و... الگوی طراحی Composite چیزی است که باید اینجا مورد استفاده قرار بگیرد http://en.wikipedia.org/wiki/Composite_pattern)
خواندن پیکربندی سیستم از فایل xml : یک فایل xml وجود دارد که در آن توابع مورد نیاز آورده شده است. کار Main تنها این است که فایل xml را بخواند و با توجه به توابع در نظر گرفته شده در آن خط لوله مقتضی را بسازد. (چگونه با فایل xml ارتباط برقرار کنیم و چگونه رفتار برنامه در حین اجرا تغییر بدهیم)
نمایش گرافیکی وضعیت خط لوله : این نمایش می تواند شامل وضعیت thread های فیلترها نیز باشد. کسانی که می خواهند در این زمینه بیشتر کار کنند نگاه کنند به http://en.wikipedia.org/wiki/Java_Management_Extensions و http://docs.oracle.com/javase/tutorial/jmx/ و http://docs.oracle.com/javase/6/docs/technotes/guides/management/jconsole.html . (مطالعه در مورد مانیتورینگ و کنترل برنامه. JMX چیز جالبی است!)
تعمیم نوع ورودی به دیگر انواع داده مثلا String (مثلا پیاده سازی برنامه های استاندارد یونیکسی مثل grep و sed و cut و ... این کار یه کم حوصله بیشتر می خواهد!)
تعمیم عملیات انجام شده در هر فیلتر به عملیات پیچیده تر: مثلا یک فیلتر می تواند واحد رمزگذاری داده باشد، یک فیلتر واحد ذخیره سازی در دیتابیس و ... (فکر کنیم به یک برنامه واقعا بزرگ بر اساس الگوی Pipes and Filters).