Çok Büyük Sayıların Faktöryel Hesaplamasında İşbirlikçi Dağıtık Mimari Modeli
Merhaba arkadaşlar;
Bu yazıda Kocaeli Üniversitesinde sürdürmekte olduğum Yüksek Lisans eğitiminde almakta olduğum Distributed System dersine dönük ödev çalışmasını paylaşmak istedim. Keyifli okumalar..
Giriş
Faktöryel en basit haliyle, bir N
tamsayının 1 → N
boyunca tüm tamsayı değerlikleriyle çarpılması sonucu ortaya çıkarılan sonuca denmektedir.
Dağıtık olarak işletilecek iş birimlerinde faktöryel işlemi kullanılmasının sebebi ise, uygulanmasının basit oluşu ve N
sayısı büyüdükçe CPU meşgul etme süresinin yüksek olmasından kaynaklanmaktadır.
Hazırlanan Dağıtık Model
Uygulamanın dağıtık modeli içerisinde Master
olarak adlandırılan WebSocket sunucusu ve Node
olarak isimlendirilen WebSocket istemcileri bulunmaktadır. Çift yönlü haberleşme imkanı sağlamasından ötürü WebSocket protokolü haberleşmede tercih edilmiştir. Fakat model haberleşme protokolünden bağımsızdır.
Master ve Node(Worker) birimleri arasındaki işgüdüm aşağıdaki adımlarda anlatıldığı gibidir;
Faktöryeli hesaplanacak büyük sayının liste parçaları halinde Node ‘lara iletilmesi
600000 sayısının faktöryelinin hesaplanacağını varsayalım.
1 |
2 |
3 |
4 |
5 |
6 |
7 |
.. |
.. |
600000 |
Öncelikli işlem 600000 sayısına kadar tüm tamsayılar için bir liste hazırlanır. Hazırlanan liste Node sayısı kadar parçalara bölünür ve her Node için listenin bir alt parçası iletilir.
Faktöryel alma işlemi için görev alacak birim bir Lambda ifadesi olarak düzenlenmiştir. Master tarafında hazırlanan Lambda fonksiyonu, Java Serialization API ile Node birimlerine iletilmektedir.
WebSocket protokolü üzerinden binary veri olarak transfer edilen Lambda fonksiyonu, her bir Node üzerinde de-serialize edilir ve mevcut Node üzerinde koşturulur.
Her bir Node üzerinde koşan Lambda fonksiyonu, ara faktöryel sonuclarını hesapladıktan sonra Master birimine iletir. Master birimi ise Node sayısı kadar aldığı ara faktöryel sonuçlarını tekrar hesaplayarak sonuç faktöryel değeri üretilir.
Uygulama Bileşenleri
Uygulama içerisinde 6 adet sınıf yer almaktadır. Bunların görev veya kullanım alanı aşağıdaki gibidir.
- FaktoryelServerSoket
- Master olarak davranır. Büyük bir listeyi Node’lar arasında pay eder. Parçalı listelerin nasıl işleneceğini belirler. Node’lardan dönen ara sonuçları biriktirir. İş bitiminde ise son hesapları yaparak Node’lara sonuçları bildirir.
@ServerEndpoint(
value = "/soket",
encoders = ObjectSerializer.class,
decoders = ObjectSerializer.class)
public class FaktoryelServerSoket {
private static Long workStartTimeMillis = null;
private static List<BigInteger> subFactoriels = Collections.synchronizedList(new ArrayList<>());
private static int workDoneCount = 0;
@OnMessage
public void onmessage(Map message, Session session) throws IOException {
// if there is a start key, start execution
if (message.containsKey("start")) {
dispatchWorks(session);
}
// if there is a chunk key, collect chunk results
if (message.containsKey("chunk")) {
collectChunks(message, session);
}
}
/**
* Liste parçalara ayrılır ve tüm istemcilere bir payı gönderilir,
* Liste parçaları yanında serileştirme yöntemiyle bir Lambda fonksiyonu da iletilir,
* Bu sayede sunucu tarafında yazılan iş mantığı, client tarafında koşturulmuş olur.
* @param session
* @throws IOException
*/
private void dispatchWorks(Session session) throws IOException {
if (workStartTimeMillis == null) {
workStartTimeMillis = System.currentTimeMillis();
}
// 1,...,600000 arası Liste hazırlanıyor
List<Integer> numberList = IntStream.rangeClosed(1, 600_000).boxed().collect(Collectors.toList());
Set<Session> allSessions = session.getOpenSessions();
// Liste parçalara ayrılıyor
List<List<Integer>> numberChunkedList = Lists.partition(numberList, (numberList.size() / allSessions.size()));
// İşlemin bittiğini anlamak için gerekli sonuç sayısı
workDoneCount = ((numberChunkedList.size() % allSessions.size()) == 0) ? allSessions.size(): allSessions.size() + 1;
Iterator<Session> allSessionsIterator = allSessions.iterator();
// Her parçalı sayı listesi bir lambda fonksiyonu içinde kullanıcılara pay ediliyor
for (List<Integer> numberChunks : numberChunkedList) {
ArrayList<Integer> chunk = new ArrayList<>(numberChunks);
// remoteLambda fonksiyonu Worker'da tanımlanır, Node'larda koşturulur
RemoteLambda<Session> remoteLambda = (serverSession) -> {
BigInteger subFactoriel = chunk
.stream()
.map(BigInteger::valueOf)
.reduce(BigInteger.ONE, (first, second) -> first.multiply(second));
Map data = new HashMap();
data.put("chunk", subFactoriel);
serverSession.getAsyncRemote().sendObject(data);
};
Map data = new HashMap();
data.put("chunk", remoteLambda);
// Kullanıcılara tek tek iletiliyor.
if (allSessionsIterator.hasNext()) {
Session next = allSessionsIterator.next();
next.getAsyncRemote().sendObject(data);
} else {
session.getAsyncRemote().sendObject(data);
}
}
}
/**
* Her bir alt faktöryel sonucu bu metodda biriktirilir,
* Tüm alt parçalar hesaplandığında ise,
* Alt sonuçların da kendi içinde çarpımı sağlanır,
* Ardından toplam koşma süresi gibi bilgiler tüm istemcilere iletilir.
* @param message
* @param session
*/
private void collectChunks(Map message, Session session) {
subFactoriels.add((BigInteger) message.get("chunk"));
if (subFactoriels.size() == workDoneCount) {
BigInteger factorielResult = subFactoriels.parallelStream()
.reduce(BigInteger.ONE, (first, second) -> first.multiply(second));
long workerEndTimeMillis = System.currentTimeMillis();
long workerCompleteTime = workerEndTimeMillis - workStartTimeMillis;
Set<Session> allSessions = session.getOpenSessions();
Map map = new HashMap();
map.put("totalWorker", allSessions.size());
map.put("completeTime", workerCompleteTime);
map.put("result", factorielResult.toString().substring(0, 10).concat("..."));
// İşlem tamam, sonuçlar kullanıcılara iletiliyor..
for (Session e : allSessions) {
e.getAsyncRemote().sendObject(map);
}
}
}
}
- RemoteLambda
- Master bileşeninin Node bileşenlerine serileştirerek ilettiği Lambda fonksiyonuna temel oluşturur.RemoteLambda bir fonksiyonel arayüz olup, Consumer<T> ve Serializable arayüzleri türündendir.
FaktoryelServerSoket
RemoteLambda
türünden lambda deyimini Java dilinin serileştirme yöntemiyle Node’ lara iletmektedir.
@FunctionalInterface
public interface RemoteLambda<T> extends Consumer<T>, Serializable {
// RemoteLambda Consumer#accept metodunu miras alır.
}
- ObjectSerializer
- Master ve Node arasında iletilen tüm veriler binary biçimde iletilmektedir.
ObjectSerializer
sınıfı Map türünden nesneleri ByteBuffer türüne serileştirirken, aynı zamanda bunun tersini de yapmaktadır. Master ve Node’lar arasındaki WebSocket haberleşmesi ByteBuffer nesneleri ile binary olarak sağlanmaktadır.
public class ObjectSerializer implements Encoder.Binary<Map>, Decoder.Binary<Map> {
@Override
public Map decode(ByteBuffer bytes) throws DecodeException {
try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes.array());
ObjectInputStream inputStream = new ObjectInputStream(byteArrayInputStream);) {
return (Map) inputStream.readObject();
} catch (Exception e) {
e.printStackTrace();
}
return new HashMap();
}
@Override
public ByteBuffer encode(Map object) throws EncodeException {
try (ByteArrayOutputStream bao = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bao);) {
oos.writeObject(object);
return ByteBuffer.wrap(bao.toByteArray());
} catch (IOException e) {
e.printStackTrace();
}
return ByteBuffer.wrap(new byte[0]);
}
...
}
- FaktoryelWorkerSoket
- Node olarak davranır. Görevi ise Master bileşeninin ilettiği Lambda fonksiyonunu kendi içinde koşturmaktır.
@ClientEndpoint(
encoders = ObjectSerializer.class,
decoders = ObjectSerializer.class
)
public class FaktoryelWorkerSoket {
@OnMessage
public void onmessage(Map message , Session session) throws Exception {
// Eğer Lambda geldiyse
if(message.containsKey("chunk")){
Consumer<Session> task = (Consumer<Session>) message.get("chunk");
// accept metodu lambda'yı o anki istemcide koşturur.
// Ara sonucu ise parametre ile belirilen Session'a (Master) iletir.
task.accept(session);
}
// Eğer sonuç geldiyse
if(message.containsKey("result")){
System.out.println(message);
}
}
}
- ServerApp
- Master’i başlatır.
public class ServerApp {
public static void main(String[] args) throws Exception {
Server server = new Server("0.0.0.0", 8080, "/", null, FaktoryelServerSoket.class);
server.start();
System.out.println("Master Started");
System.in.read();
}
}
- ClientApp
- Her bir çalışmada yeni bir Node’u sisteme ekler.
public class ClientApp {
public static void main(String[] args) throws Exception {
ClientApp.connect();
System.in.read();
}
public static void connect() throws Exception {
WebSocketContainer container = ContainerProvider.getWebSocketContainer();
URI uri = new URI("ws://localhost:8080/soket");
Session client = container.connectToServer(FaktoryelWorkerSoket.class, uri);
Map map = new HashMap<>();
map.put("start", null);
client.getAsyncRemote().sendObject(map);
}
}
Yukarıda açıklanan modeli şimdi iki senaryo ile test edelim. Test için Amazon EC2 Cloud servisinde çalışan aynı sistem özellikli 6 makine kullanılmıştır. 1 makine Master için, diğer 5 makine ise Node olarak kullanışmıştır. Testlerde kullanılan sanal makinelerin özellikleri aşağıdaki gibidir.
OS: Ubuntu Server 14.04 LTS x64
CPU: Intel Xeon E5-2670 2.60 GHz (vCPU 4)
Memory: 15GB
Java version: 1.8.0-b132
Senaryo 1
Node sayısı 1-5 arasında artırılarak 600000 sayısının faktöryel hesaplamasının ne kadar sürdüğü mevcut sistem üzerinde belirlenir.
Bu senaryoda RemoteLambda nesnesi aşağıdaki gibidir.
RemoteLambda<Session> remoteLambda = (serverSession) -> {
BigInteger subFactoriel = chunk
.stream() // Dikkat (1)
.map(BigInteger::valueOf)
.reduce(BigInteger.ONE, (first, second) -> first.multiply(second));
Map data = new HashMap();
data.put("chunk", subFactoriel);
serverSession.getAsyncRemote().sendObject(data);
};
- Ardışık Stream nesnesi
Node sayısı (Worker) artırılarak RemoteLambda fonksiyonu ile hesaplama yapıldığında aşağıdaki sonuçlar elde edilmiştir.
{result=2234878177..., totalWorker=1, completeTime=181949} {result=2234878177..., totalWorker=2, completeTime=52688} {result=2234878177..., totalWorker=3, completeTime=30748} {result=2234878177..., totalWorker=4, completeTime=21870} {result=2234878177..., totalWorker=5, completeTime=15366}
Yukarıdaki çıktıların grafiksel gösterimi aşağıdaki gibidir.
Senaryo 2
Node sayısı 1-5 arasında artırılarak 600000 sayısının faktöryel hesaplamasının ne kadar sürdüğü mevcut sistem üzerinde belirlenir. Fakat bu senaryoda Java 8 Stream API paralel olarak yapılandırılmıştır. Böylece paralel stream nesnesinin performans katkısı ölçülmeye çalışılmıştır.
Bu senaryoda RemoteLambda nesnesi aşağıdaki gibidir.
RemoteLambda<Session> remoteLambda = (serverSession) -> {
BigInteger subFactoriel = chunk
.parallelStream() // Dikkat (1)
.map(BigInteger::valueOf)
.reduce(BigInteger.ONE, (first, second) -> first.multiply(second));
Map data = new HashMap();
data.put("chunk", subFactoriel);
serverSession.getAsyncRemote().sendObject(data);
};
- Paralel Stream nesnesi
{result=2234878177..., totalWorker=1, completeTime=12520} {result=2234878177..., totalWorker=2, completeTime=5044} {result=2234878177..., totalWorker=3, completeTime=4773} {result=2234878177..., totalWorker=4, completeTime=3540} {result=2234878177..., totalWorker=5, completeTime=4047}
Yukarıdaki çıktıların grafiksel gösterimi aşağıdaki gibidir.
Uygulamanın kaynak kodlarına https://github.com/kodcu/FaktoryelCoWorker bağlantısından erişebilirsiniz.
Tekrar görüşmek dileğiyle..