ورود

View Full Version : یه معماری خوب برای نوشتن و خوندن رو لیست‌های تعداد بالا



TheKlaus
پنج شنبه 11 تیر 1394, 01:05 صبح
سلام همگی

یه HashTable دارم که معمولا توش بین ۱۰۰۰۰۰ تا ۱۰۰۰۰۰۰ آیتم هست. هدف این لیست کش کردن آیتم ها برای پیدا کردن سریعشونه. می‌خوام نظرهاتون رو بدونم راجع به اینکه:
اضافه‌کردن و حذف‌کردن تو این لیست رو چطوری پیاده‌سازی کنم که سرعت جستجو تو لیست تا جایی‌که ممکنه دست نخوره و چندین thread بتونن همزمان به این لیست دسترسی داشته باشن.

اگه فکر می‌کنین که HashTable برای این کار مناسب نیست پیشنهاد خودتون رو بدین. هم ساختمان‌داده مناسب رو معرفی کنین هم الگوریتمی که با اون ساختمان‌داده کار کنه.

ahmad.mo74
پنج شنبه 11 تیر 1394, 11:58 صبح
سلام.

HashTable خیلی برای اینکار مناسب نیست، چون synchronized هست و با اینکه thread-safe هست اما performance خوبی برای کار شما نداره.

به جاش میشه از ConcurrentHashMap استفاده کرد که فوق العاده سریع و همچنین thread-safe هست. بای دیفالت اطلاعات رو تو 16 بخش جدا نگه داری میکنه و اصلا از lock و synchronization خبری نیست.

اگر هدفتون کش کردن هست و مثلا میخواید آیتم ها بعد از مدتی حذف بشن، از com.google.common.cache.Cache استفاده کنید. Google Guava (http://mvnrepository.com/artifact/com.google.guava/guava/18.0)

مثال :


Cache<String, Object> cache = CacheBuilder.newBuilder()
.concurrencyLevel(16)
.expireAfterAccess(5, TimeUnit.MINUTES)
.<String, Object>build();


نکته دیگه اینکه با وجود اینکه این دو تا ساختمان داده thread-safe هستن، اما باز هم باید در یه سری موارد حواستون رو جمع کنید.

مثال :


final Map<String, List<String>> cache = CacheBuilder.newBuilder()
.concurrencyLevel(16)
.expireAfterAccess(5, TimeUnit.MINUTES)
.<String, List<String>>build().asMap();


void putMessages(final String key, final List<String> messages) {
if (key == null || messages == null || messages.isEmpty()) {
return;
}
final List<String> list = cache.get(key);
if (list == null) {
cache.put(key, messages);
} else {
list.addAll(messages);
}
}


List<String> removeMessages(final String key) {
return key == null ? null : cache.remove(key);
}


مثلا فرض کنیم 2 تا ترد میخوان روی cache کاری انجام بدن.
ترد اول متد putMessages رو با کلید "a" صدا میزنه و لیست رو از cache میگیره. (فرض میکنیم که از قبل لیستی با کلید "a" توی cache وجود داشته)
وقتی ترد اول به خط 15 میرسه، همزمان ترد دوم متد removeMessages رو با همون کلید "a" صدا میزنه و لیست پیام هارو از cache حذف میکنه و return میکنه.
ترد اول پیام های جدید رو به لست اضافه میکنه و از متد خارج میشه.

خب الان یه مشکل بزرگ داریم. ترد دوم لیست رو remove کرده و پیام های جدیدی که ترد اول به لیست اضافه کرده بوده از بین میره. درسته که رفرنسی از اون لیست در اختیار ترد دوم هست و مسیج های اضافه شده توی لیست وجود داره. اما به هر حال اتفاق خوبی نیست. مثلا فرض کنیم ترد دوم لیست پیام هارو remove کرده و داره روش iterate میکنه و همزمان از ترد اول داره مسیج های جدید اضافه میشه (list.addAll) و اینجا ممکنه ConcurrentModificationException رخ بده یا حتی ممکنه بعد از اینکه ترد دوم کارش رو انجام داد مسیج های جدید تو لیست اضافه بشه...

اگر بخوایم همچین کاری کنیم :


final Map<String, List<String>> cache = CacheBuilder.newBuilder()
.concurrencyLevel(16)
.expireAfterAccess(5, TimeUnit.MINUTES)
.<String, List<String>>build().asMap();


void putMessages(final String key, final List<String> messages) {
if (key == null || messages == null || messages.isEmpty()) {
return;
}
synchronized (cache) {
final List<String> list = cache.get(key);
if (list == null) {
cache.put(key, messages);
} else {
list.addAll(messages);
}
}
}


List<String> removeMessages(final String key) {
synchronized (cache) {
return key == null ? null : cache.remove(key);
}
}


مشکل حل میشه، اما به هدف اصلی ای که داشتیم نمیرسیم و الان دیگه فرقی نمیکنه که از HashMap استفاده کنیم یا ConcurrentHashMap ...

راه دیگه اینه که بیایم برای هر لیست یه lock در نظر بگیریم و به جای اینکه عملیات روی کل داده هامون رو sync کنیم، عملیات فقط روی اون لیست مورد نظر sync بشه.
اینطوری هم مشکل performance به وجود نمیاد و میتونیم همزمان از چنتا ترد به راحتی به Map مون دسترسی داشته باشیم و هم اینکه از صحت داده ها اطمینان داریم.


final Map<String, List<String>> cache = CacheBuilder.newBuilder()
.concurrencyLevel(16)
.expireAfterAccess(5, TimeUnit.MINUTES)
.<String, List<String>>build().asMap();


final SynchronizedTaskExecutor sync = new SynchronizedTaskExecutor();


void putMessages(final String key, final List<String> messages) {
if (key == null || messages == null || messages.isEmpty()) {
return;
}
sync.execute(key, () -> {
final List<String> list = cache.get(key);
if (list == null) {
cache.put(key, messages);
} else {
list.addAll(messages);
}
});
}


List<String> removeMessages(final String key) {
if (key == null) {
return null;
}
return sync.submit(key, () -> cache.remove(key));
}


و SynchronizedTaskExecutor.java :


import com.google.common.cache.CacheBuilder;


import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;


/**
* @author avb
*/
public final class SynchronizedTaskExecutor {


private final Map<Serializable, ReentrantLock> locks;


public SynchronizedTaskExecutor() {
locks = CacheBuilder.newBuilder()
.expireAfterAccess(5, TimeUnit.MINUTES)
.concurrencyLevel(16)
.<Serializable, ReentrantLock>build().asMap();
}


public void execute(final Serializable key, final Runnable task) {
final ReentrantLock lock = lockFor(key);
lock.lock();
try {
task.run();
} finally {
lock.unlock();
}
}


public <R> R submit(final Serializable key, final Callable<R> task) {
final ReentrantLock lock = lockFor(key);
lock.lock();
try {
return task.call();
} catch (Error | RuntimeException err) {
throw err;
} catch (Exception ex) {
throw new RuntimeException(ex);
} finally {
lock.unlock();
}
}


private ReentrantLock lockFor(final Serializable key) {
ReentrantLock lock = locks.get(key);
if (lock == null) {
ReentrantLock r = locks.putIfAbsent(key, lock = new ReentrantLock());
if (r != null) {
lock = r;
}
}
return lock;
}


public void clear() {
locks.clear();
}


}


مثال از SynchronizedTaskExecutor :


public static void main(String[] args) throws InterruptedException {
SynchronizedTaskExecutor sync = new SynchronizedTaskExecutor();
ExecutorService service = Executors.newFixedThreadPool(100);
CountDownLatch latch = new CountDownLatch(100);
IntStream.range(0, 100).forEach(i -> service.execute(() -> {
System.out.println(sync.submit("a", () -> {
sleep(1000);
return i;
}));
latch.countDown();
}));
service.shutdown();
latch.await();
System.exit(0);
}


static void sleep(long timeout) {
try {
Thread.sleep(timeout);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

TheKlaus
جمعه 01 آبان 1394, 00:37 صبح
با تشکر از جواب کامل‌تون و وقتی که گذاشتین. برای کسایی که می‌خوان بیشتر راجع به این موضوع بدونن پیشنهاد می‌کنم این صفحه رو مطالعه کنن :)
http://www.ibm.com/developerworks/java/library/j-jtp07233/index.html