ورود

View Full Version : Concurrency Management



ahmad.mo74
سه شنبه 11 آذر 1393, 18:58 عصر
سلام،

من تو بخشی از پروژم یک search engine درست کردم که یک keyword میگیره بر اساس اون یه سری پارامترهای دیگه در نظر گرفته میشه و ... سرچ رو انجام میده و یک خروجی json هم برمیگردونه.

اما اگر چند کاربر به طور همزمان یک keyword رو سرچ کنن فرایند سرچ که خیلی هم هزینه داره برای همشون جداگانه انجام میشه، از طرفی ساختار دیتابیسم جوریه که هر keyword فقط یکبار سرچ بشه کافیه و بر اساس سرچ هایی که در آینده انجام میشه (یا قبلا شده) ممکنه به keyword های دیگه هم مرتبط بشه و یا جواب اونا رو کامل تر کنه (مثلا کلمه real madrid و ronaldo میتونن به هم مرتبط بشن و جواب های هر کدوم تکمیل کننده هم باشه)

بخشی از کد :


public String search(Map<String, Object> params) {
String result = /*search*/;
return result;
}


حالا برای جلوگیری از این مسئله 3 تا راه هست :

1- استفاده از synchronized : به دلایلی که میدونید در اینجا استفاده از synchronized کاملا مشکل رو حل نمیکنه، چون هم سرعت رو به شدت پایین میاره و هم اینکه به هر حال برای هر request یکبار فرایند سرچ انجام میشه اما جلوی اینکه یک keyword دوبار تو دیتابیس ذخیره بشه رو میگیره (اگر قبلا ذخیره شده باشه از همون استفاده میشه)

2- استفاده از ReentrantLock : اینجا یه قدم جلوتر میریم در مقایسه با استفاده از synchronized ، با این برتری که میشه فقط سرچ های مشابه رو sync کرد نه همه رو :


private static final Map<Object, ReentrantLock> CONCURRENT_SEARCHES = new ConcurrentHashMap<>();


public String search(Map<String, Object> params) {
Object key = params.get("keyword");
assert key != null;
ReentrantLock lock = CONCURRENT_SEARCHES.get(key);
if (lock == null) {
CONCURRENT_SEARCHES.put(key, (lock = new ReentrantLock()));
}
lock.lock();
try {
String result = /*search*/;
return result;
} finally {
lock.unlock();
}
}


3- استفاده از Shared Objects : تقریبا بهترین راه بود که به ذهنم رسید :


private static final Map<Object, Queue<Locker<Object, String>>> CONCURRENT_SEARCHES = new ConcurrentHashMap<>();


public String search(Map<String, Object> params) {
Object key = params.get("keyword");
assert key != null;
Queue<Locker<Object, String>> queue = CONCURRENT_SEARCHES.get(key);
if (queue != null) {
System.out.println("waiting");
Locker<Object, String> locker = new Locker<>(key);
queue.add(locker);
locker.lock();
String result = locker.getValue();
return result == null ? "[]" : result;
}
System.out.println("new search");
CONCURRENT_SEARCHES.put(key, (queue = new ConcurrentLinkedQueue<>()));
String result = /*search*/;
CONCURRENT_SEARCHES.remove(key);
Locker<Object, String> locker;
while ((locker = queue.poll()) != null) {
locker.setValue(result);
locker.unlock();
}
return result;
}



public final class Locker<K, V> {


private final Object o = new Object();
private final K key;
private V value;


public Locker(K key) {
this.key = key;
}


public K getKey() {
return key;
}


public V getValue() {
return value;
}


public void setValue(V value) {
this.value = value;
}


public void lock() {
synchronized (o) {
try {
o.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}


public void lock(long timeout) {
synchronized (o) {
try {
o.wait(timeout);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}


public void unlock() {
synchronized (o) {
o.notify();
}
}


}


این روش خیلی خوبه چون اگر 100 تا سرچ همزمان با یک keyword هم داشته باشیم فقط یکبار سرچ انجام میشه و همون جواب به همشون داده میشه، اما تست کردم و به دلایلی که نمیدونم این خروجی رو داد :


public class Test {


public static void main(String[] args) throws InterruptedException {
String id = "1";
ProcessSession session = ProcessManager.openSession(id);
ExecutorService service = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
service.submit(() -> session.search("keyword"));
}
}


}


خروجی :


new search
waiting
new search
new search
new search
waiting
new search
waiting
waiting
waiting


دوستان اگر مشکلشو میدونید راهنمایی کنید چون هر جوری فکر میکنم نباید این اتفاق بیفته! اگر هم راه بهتری رو بلدید پیشنهاد کنید، ممنون :)

پ.ن : به دلایلی که بالا توضیح دادم نمیتونم جواب سرچ رو کش کنم چون جواب ها ممکنه تغییر کنه به مرور زمان!

cups_of_java
سه شنبه 11 آذر 1393, 22:59 عصر
من کدهات رو کامل نخوندم فقط چند نکته رو یاد آوری میکنم:

۱) به طوری کلی استفاده از داد های اشتراکی (Shared State/Data) مخصوصن توی کد برنامتون روش مناسبی نیست و بعدن مشکلات زیادی رو در مدیریت همزمانی نوشتن روی اون اطلاعات و تغییرشون ایجاد میکنه برات

۲)‌استفاده از قفل ها و به طور کلی استفاده از مدل Locking توی همروندی رو سعی کن تا جایی که می تونی منسوخ شده بدونی (اگه هدفت آموزش و یادگرفتن نیست) و ازشون استفاده نکنی توی پروژه های تجاری و مخصوصن بزرگ. اولین مشکلی که برات پیش میاد درگیر شدن با یه پیچیدگی زیاده که کجا قفل بزنیم و چه قفلی باشه و کجا باز کنم... بعد که میری جلوتر توی همروندی بالا میبینی باز برات race condition پیش میاد... میای اونو درست کنی کد میشه کثافت کاری خالی! از اینجا هاس که ممکنه وقتی داری تو بار بالا برنامه رو اجرا میکنی ممکنه DeadLock پیش بیاد.... و اونوقته که دیگه دهنت س...سه
بگذریم... فقط خواستم بگم که ته داستان این بازی به کجا ها ممکنه برسه...

۳)‌ وابسته کردن همروندی پاسخگویی کاربران به هم کار درستی نیست. یعنی اینکه درخواست های دیگه به خاطر یه درخواستی که زود تر رسیده منتظر بمونن و بعدش اینکه join بشن به انتهای اون درخواست و بعد همه جواب رو برگردونن اصن کار قشنگی نیست.


راه حل هایی که اگه من جات بودم الان به ذهنم میرسه:

۱)‌ درخواست ها رو بگیری و از یک سرویس Singleton جستجو برای هر query استفاده کنی. اگه پاسخ اماده بود توسط سرویس که جوابو میدی کلاینت - اگه نبود همه درخواست ها مثلن بلاک می مونن تا سرویس جستجوی اون کووری جواب رو آماده کنه و برگردونه بهشون...

۲)‌استفاده از یک دیتابیس که این امکان رو فراهم کنه و خودم درگیر این چیزا نشم اصلن
دیتابیسی مثل CouchDB همین کاری که شما میخواین رو انجام میده براتون و این کارو با امکان Map/Reduce خودش انجام میده. نتایج یک جا پردازش و نگهداری میشه. کاربر در صورت نیازش می تونه درخواستش منتظر بمونه تا آخرین نتایج رو بگیره وقتی آماده شد. اگه هم که آماده باشه که بهش سریع بر میگردونه. (البته شما محدودیت هایی در نوع کووری ها خواهی داشت با map/reduce)

ahmad.mo74
چهارشنبه 12 آذر 1393, 11:06 صبح
سلام، ممنون

اتفاقا منم همینو میگم که اصلا کار قشنگی نیست که هر ریکوئست بخواد منتظر قبلی بمونه، تمام تلاشمم همینه که جلوی اینکارو بگیرم اما به شرطی که یک keyword دوبار تو دیتابیسم ذخیره نشه (امکان unique کردن keyword نیست). مثلا اگر کلمه ای برای بار اول سرچ بشه تو دیتابیس ذخیره میشه و برای دفعات بعد چک میشه که اگر وجود داشت از همون قبلی استفاده بشه، اما اگر اون کلمه رو همزمان چند نفر با هم سرچ کنن به همون تعداد هم تو دیتابیس ذخیره میشه و بی نظمی به وجود میاد تو دیتابیسم...

توی راه حل سوم همه چیز درست به نظر میاد، یعنی اگر یک کلمه به طور همزمان 2 بار یا بیشتر سرچ بشه کسی که زودتر از همه سرچ کرده جواب رو میگیره و بعد به بقیه هم میده، اینطوری هم مشکل بالا پیش نمیاد هم اینکه فقط یکبار سرچ انجام میشه و هم اینکه ریکوئست های بعدی هم زودتر به جوابشون میرسن، مثلا اگر یه سرچ قراره 10 ثانیه طول بکشه، نفر اول سرچ میکنه حالا این بین مثلا از ثانیه 5 ام چند نفر دیگه هم اضافه میشن و وقتی نفر اول جوابش رو گرفت به بقیه هم میده و اینطوری بقیه 5 ثانیه زود تر به جواب رسیدن...

ahmad.mo74
چهارشنبه 12 آذر 1393, 13:17 عصر
ممنون حل شد :


private static final Map<Object, List<Locker<Object, String>>> CONCURRENT_SEARCHES = new ConcurrentHashMap<>();




public String search(Map<String, Object> params) {
Object key = params.get("keyword");
assert key != null;
Locker<Object, String> locker = new Locker<>(key);
if (isInProgress(key, locker)) {
locker.lock();
String result = locker.getValue();
return result == null ? "[]" : result;
}
String result = /*search*/;
finished(key, result);
return result;
}


private static synchronized boolean isInProgress(Object key, Locker<Object, String> locker) {
List<Locker<Object, String>> list = CONCURRENT_SEARCHES.get(key);
if (list != null) {
list.add(locker);
return true;
}
CONCURRENT_SEARCHES.put(key, Collections.synchronizedList(new ArrayList<>()));
return false;
}

private static synchronized void finished(Object key, String result) {
Optional.of(CONCURRENT_SEARCHES)
.map(searches -> searches.remove(key))
.ifPresent(list -> {
list.stream().forEach(locker -> {
locker.setValue(result);
locker.unlock();
});
list.clear();
});
}