ورود

View Full Version : استفاده از کلاس فوق العاده کارآمد ForkJoinTask برای برنامه نویسی وظیفه ای در جاوا 7 به بعد



farhad_shiri_ex
جمعه 17 فروردین 1397, 16:09 عصر
private static AtomicLong atomicLong = new AtomicLong(0);

private static AtomicBoolean isTaskLastRun = new AtomicBoolean(false) ;
/**
* Asynchronous process by ForkJoin FrameWork in JDK1.7
* @param <T>
* @param <E>
*/
private static class RecursiveConcurrent<T extends Function ,E ,V > extends RecursiveTask<E> {

private static final long serialVersionUID = 5232443952276485270L;

private final int seqThreshHold = NUMBER_OF_CORES > 1
? NUMBER_OF_CORES * 500 > 10000
? 2000 : NUMBER_OF_CORES * 1000 : 2000 ;// threshold level must between 100 / 10000 in fork join parallel frameWork

private static final int MAX_CAP = 0x7fff; // 32767 max #workers - 1

private ForkJoinPool fjp = new ForkJoinPool(Math.min(MAX_CAP, NUMBER_OF_CORES));

private E[] data;

private List<V> collection;

private T func ;

private int start , end ;

private Observer<AtomicLong> observed;

RecursiveConcurrent(E[] data, T action, int start, int end, Observer<AtomicLong> observed) {
this.data = data;
this.func = action;
this.start = start;
this.end = end;
this.observed = observed;
}

RecursiveConcurrent(List<V> data, T action, int start, int end, Observer<AtomicLong> observed) {
this.collection = data;
this.func = action;
this.start = start;
this.end = end;
this.observed = observed;
}

@SuppressWarnings("unchecked")
@Override
protected E compute() {

//double sum =0;
if (this.collection == null && this.data != null && this.data.length > 0) {
if ((end - start) < seqThreshHold) {
for (int i = start; i < end; i++) {
data[i] = (E) func.apply(data[i]);
atomicLong.incrementAndGet();
}
observed.update(atomicLong);
} else {
int middle = (start + end) / 2 ; // Divide and Conquer Strategy in ForkJoin FrameWork
//----Synchronized
//invokeAll(new RecursiveConcurrent<>(data,func,start,middle),
// new RecursiveConcurrent<>(data,func,middle,end));

//----ASynchronized
RecursiveConcurrent t1 = new RecursiveConcurrent<Function<Double,Double>,Object,Object>(data,func,start,middle, observed);
RecursiveConcurrent t2 = new RecursiveConcurrent<Function<Double,Double>,Object,Object>(data,func,middle,end, observed);
t1.fork();
t2.fork();

//sum = (double)t1.join() + (double)t2.join();

}
}

return null;

}

private void execute(RecursiveConcurrent recurSiveCon) {
//RecursiveConcurrent recurSiveCon = new RecursiveConcurrent(data,func,start,end,observed);
//fjp.invoke(recurSiveCon);//Synchronized
fjp.execute(recurSiveCon);//ASynchronized
}

private boolean cancel() {
if(!fjp.isShutdown())
fjp.shutdown();

return fjp.isShutdown();
}
}

public static class ActionTask {

private static RecursiveConcurrent<Function<Double,Double>,Double,Object> rec;

private Observer<AtomicLong> observed;

private Function<Double,Double> myFunction = Math::sqrt ;

private int mainForIndex ;

private static Double[] data;

public ActionTask(Observer<AtomicLong> observed, int mainForIndex) {
this.observed = observed;
this.mainForIndex = mainForIndex;
}

public static boolean cancel() {
if (rec.cancel()) {
atomicLong.set(0);
isTaskLastRun.set(false);
}
return rec.isDone();
}

public void execute() {
data = new Double[mainForIndex];
for (double i = 0; i < data.length; i++) {
data[(int) i] = i;
}
if (!isTaskLastRun.get()) {
isTaskLastRun.set(true);
rec = new RecursiveConcurrent<Function<Double,Double>,Double,Object>(data, myFunction, 0, data.length, observed);
rec.execute(rec);
}
}

}

public void registerTask(View v) {
HandleThreadService.Observer<AtomicLong> observer = this::upDateUiThread;
new HandleThreadService.ActionTask(observer,5000000).e xecute();
}

private void upDateUiThread(AtomicLong atomicLong) {
runOnUiThread(() -> {
long aa = atomicLong.get();
if(aa > 0)
tvDate.setText("counter : "+aa);
if (aa >= 4999999) {
if(HandleThreadService.ActionTask.cancel())
tvDate.setText("Compelet.!");
}
});
}

@FunctionalInterface
public interface Observer<T> {
void update(T value);
}

farhad_shiri_ex
جمعه 17 فروردین 1397, 16:14 عصر
این آبجکتها رو خودم نوشتم اگر نیاز به اصلاح دارند خوشحال میشم راهنمایی کنید
در هر بخشی هم که کدها نامفهوم هستن بفرمایید توضیح بدم
کلاس HandleThreadService هم کلاس والد هست.

farhad_shiri_ex
یک شنبه 19 فروردین 1397, 10:01 صبح
جالبه 50 نفر این مطلب رو بازدید کردن دریغ از یه نظر ؟!!!
دوستان نخواستیم تشکر کنید لطفا در بحث شرکت کنید ؟
آیا کسی از این روش تا حالا کسی استفاده کرده برای برنامه نویسی وظیفه ای؟
نظرتون راجه به این مباجث چی هستش؟
آیا کسی راجع به ساختمان نخ ها در جاوا تجربه ای داره؟ مثل apartment in COM

vahid-p
یک شنبه 19 فروردین 1397, 16:12 عصر
بهتر بود کد رو توضیح میدادید. نه خط به خط ولی اول هدف از این کد رو میگفتید بعد اینکه برای رسیدن به اون از چه چیزهایی استفاده کردید و ForkJoinTask چه کمکی به ما میکنه. در غیر این صورت ممکنه جز کسی که با مشکلی قبلا رو برو شده و از طریق جستجو این مطلب رو دیده، برای فردی که مطلب رو میبینه مفید واقع نشه چون خوندن کد دیگری بدون پیش زمینه مناسب، کار ساده ای نیست و گاهی طاقت فرساست.

farhad_shiri_ex
یک شنبه 19 فروردین 1397, 19:31 عصر
واقعیتش چون دیدم منطق تقریبا ساده ای داره توضیح ندادم .
ولی حرف شما هم منطقی
چون خوندن کد دیگری بدون پیش زمینه مناسب، کار ساده ای نیست و گاهی طاقت فرساست. و اینکه بازهم دیدم کلا از این 50 60 نفری که خوندن پست رو حتی یکنفر هم نبود راجع به منطق اش سوالی داشته باشه و نحوه کارکردش بیشتر تصور کردم خیلی مفید نبوده.
چون خودم خیلی رو نوشتن این طور کلاسها تو سی شارپ ویا تو جاوا روی مفاهیم نخ و موازی سازی و هم زمانی کار کردم گفتم شاید مطرح کردنش دردی از کسی دوا کنه.! و اینکه بهینه تر هم بتونیم بنویسیم.
ابتدا یک آبجکت از روی کلاس Fork join می سازم که در اینجا من از کلاس خواستم که حوضچه ای تعریف کنه با حداکثر coreهای پردازشگر یعنی با این کار از حداکثر بازدهی سیستم استفاده میکنم پس در سیستم های مختلف زمانهای مختلفی خواهیم داشت البته امروزه دیگه بعید پردازشگرهایی داشته باشیم که تک هسته ای باشند ولی به هرحال باز تو این سیستمها هم سیستم عامل این حالت تا حدودی پیاده سازی میکنه به هرحال تعداد هسته ها روهم 4 بایت در نظر گرفتم.
private static final int MAX_CAP = 0x7fff; // 32767 max #workers - 1

private ForkJoinPool fjp = new ForkJoinPool(Math.min(MAX_CAP, NUMBER_OF_CORES));

کلاس فوق از تکنیک تقسیم وظایف استفاده میکنه طبق توضیحات خود کتابخانه Divide and Conquer Strategy بدین صورت هست که اول یه threshold مقدار آستانه تعریف میکنیم که البته من با توجه به تعدادNUMBER_OF_CORES = Runtime.getRuntime().availableProcessors(); core های cpu این مقدار را تعریف میکنم وبعد اینکه چون گفته این مقدار آستانه بین 100 تا 10،000 میتونه باشه و بعد هر بخشی از آستانه را به یک Task مجزا تقسیم میکنیم int middle = (start + end) / 2 ; // Divide and Conquer Strategy in ForkJoin FrameWork
//----Synchronized
//invokeAll(new RecursiveConcurrent<>(data,func,start,middle),
// new RecursiveConcurrent<>(data,func,middle,end));

//----ASynchronized
RecursiveConcurrent t1 = new RecursiveConcurrent<Function<Double,Double>,Object ,Object>(data,func,start,middle, observed);
RecursiveConcurrent t2 = new RecursiveConcurrent<Function<Double,Double>,Object ,Object>(data,func,middle,end, observed);
t1.fork();
t2.fork();

البته در این کلاس ما فقط منطق موازی سازی را تعیین میکنیم و مدیریت حوضچه های نخ Thread Pool Execute و نخ ها و ناحیه های اشتراکی تماما به عهده فریم ورک Fork Join Task هستش و دراین کلاس چون به صورت جنریک هم تعریف کردم می توان روی یک آرایه یک عملیات که با اینترفیس Function پیاده سازی شده تعریف کرد که من جذر ریشه دوم تمام اعضا رو در خودشون مینویسم این عملیات روهم میشه هم همزمان ASynchronized انجام داد و هم غیر همزمان
Synchronized یعنی توقف تا اتمام پروسه و یا همزمان با دیگر پروسه ها و برای اینکه پیشرفت پروسه روهم مشاهده کنم از یک مشاهده گر Observer اینترفیس استفاده کردم
HandleThreadService.Observer<AtomicLong> observer = this::upDateUiThread;
و داخل نخ اصلی مقدار رو نمایش میدم والبته برای اطلاعات اشتراکی که همون مقدار آرایه های انجام شده است از AtomicLong استفاده کردم تا نخواهم خیلی هم قفل بنویسم.

واینکه کلا این فریم ورک زمانی که محاسبات زمان گیر دارین خیلی عالی مثلا فکر کنین بانکی دارین که اطلاعات چند میلیون قطعه داخلش هست و میخواهید بهای تمام شده تک تک این قطعات رو محاسبه کنید شک نکنید اگر به صورت موازی نخواهید اینکار رو انجام بدید حتی اگر روی یک سرور قدرتمند زنون 16 هسته ای با آدرس دهی 64 بیت هم که اینکار بکنید بازهم زمان الگوریتمی در بهترین حالت شاید به 3 الی 4 دقیقه برسه البته بسته به الگوریتم محاسبه قیمت اونهایی که حسابداری صنعتی کار کردن میدونن بهای تمام شده چقدر فرمول داره ولی با موازی سازی کاربر اصلا متوجه نمیشه که در بک گراند چه خبر هستش و تازه زیباترهم هست اگر همزمان رابط کاربر هم حالا هر کنترلی که هست به روز بشه باز هم میگم در استفاد از این فریم ورک حرف اول رو منطق الگوریتم و منطق موازی سازی میزنه .
امیدوارم خوب توضیح داده باشم

vahid-p
دوشنبه 20 فروردین 1397, 22:51 عصر
خیلی خوب بود. تشکر

farhad_shiri_ex
سه شنبه 01 خرداد 1397, 17:58 عصر
از اونجایی که ژنریک کلاس Stream که در جاوا 8 معرفی شده توانایی انجام پردازش ها را به صورت parallel داره و ار وانجاییکه از همین فریم وورک Fork Join استفاده کرده برای پیاده سازی موازی سازی گفتم بهتره یکبار دیگه توجه دوستان را جلب کنم به این تاپیک و در تصویر زیر یک نمای گرافیکی از تقسیم Task ها را در زمان اجرای این فریم وورک مشاهده نمایید برای درک بهتر موضوع...
148238