• Anasayfa
  • Eğitimler
    • JavaScript Eğitimi
    • Angular 2 Eğitimi
    • React.js Eğitimi
    • Java 8 Eğitimi
    • Java EE 7 Eğitimi
    • Spring Framework Eğitimi
    • Git Eğitimi
  • Online Eğitimler
    • Online React.js Eğitimi
    • Online Angular 2 Eğitimi
    • Online Spring Boot Eğitimi
  • Referanslar
  • Hakkında
  • İletişim
KodEdu
  • Anasayfa
  • Eğitimler
    • JavaScript Eğitimi
    • Angular 2 Eğitimi
    • React.js Eğitimi
    • Java 8 Eğitimi
    • Java EE 7 Eğitimi
    • Spring Framework Eğitimi
    • Git Eğitimi
  • Online Eğitimler
    • Online React.js Eğitimi
    • Online Angular 2 Eğitimi
    • Online Spring Boot Eğitimi
  • Referanslar
  • Hakkında
  • İletişim

Çok Büyük Sayıların Faktöryel Hesaplamasında İşbirlikçi Dağıtık Mimari Modeli

  • Posted by Kodedu
  • Categories backend, Eğitim, Genel, Uncategorized, Yazılar
  • Date 14 Kasım 2014

Merhaba arkadaşlar;

Bu yazıda Kocaeli Üniversitesinde sürdürmekte olduğum Yüksek Lisans eğitiminde almakta olduğum Distributed System dersine dönük ödev çalışmasını paylaşmak istedim. Keyifli okumalar..

Giriş

Faktöryel en basit haliyle, bir N tamsayının 1 → N boyunca tüm tamsayı değerlikleriyle çarpılması sonucu ortaya çıkarılan sonuca denmektedir.

Dağıtık olarak işletilecek iş birimlerinde faktöryel işlemi kullanılmasının sebebi ise, uygulanmasının basit oluşu ve N sayısı büyüdükçe CPU meşgul etme süresinin yüksek olmasından kaynaklanmaktadır.

Hazırlanan Dağıtık Model

Uygulamanın dağıtık modeli içerisinde Master olarak adlandırılan WebSocket sunucusu ve Node olarak isimlendirilen WebSocket istemcileri bulunmaktadır. Çift yönlü haberleşme imkanı sağlamasından ötürü WebSocket protokolü haberleşmede tercih edilmiştir. Fakat model haberleşme protokolünden bağımsızdır.

Master ve Node(Worker) birimleri arasındaki işgüdüm aşağıdaki adımlarda anlatıldığı gibidir;

Faktöryeli hesaplanacak büyük sayının liste parçaları halinde Node ‘lara iletilmesi

Example 1. Örneğin;

600000 sayısının faktöryelinin hesaplanacağını varsayalım.

1

2

3

4

5

6

7

..

..

600000

Öncelikli işlem 600000 sayısına kadar tüm tamsayılar için bir liste hazırlanır. Hazırlanan liste Node sayısı kadar parçalara bölünür ve her Node için listenin bir alt parçası iletilir.

Faktöryel alma işlemi için görev alacak birim bir Lambda ifadesi olarak düzenlenmiştir. Master tarafında hazırlanan Lambda fonksiyonu, Java Serialization API ile Node birimlerine iletilmektedir.

dispatch.png
Figure 1. Veri parçası ve iş mantığının transferi

WebSocket protokolü üzerinden binary veri olarak transfer edilen Lambda fonksiyonu, her bir Node üzerinde de-serialize edilir ve mevcut Node üzerinde koşturulur.

partial result.png
Figure 2. Ara sonuçların Master’a iletimi

Her bir Node üzerinde koşan Lambda fonksiyonu, ara faktöryel sonuclarını hesapladıktan sonra Master birimine iletir. Master birimi ise Node sayısı kadar aldığı ara faktöryel sonuçlarını tekrar hesaplayarak sonuç faktöryel değeri üretilir.

collector.png
Figure 3. Sonuç faktöryel değerinin üretimi

Uygulama Bileşenleri

Uygulama içerisinde 6 adet sınıf yer almaktadır. Bunların görev veya kullanım alanı aşağıdaki gibidir.

diagram.png
Figure 4. Uygulama bileşenleri
FaktoryelServerSoket
Master olarak davranır. Büyük bir listeyi Node’lar arasında pay eder. Parçalı listelerin nasıl işleneceğini belirler. Node’lardan dönen ara sonuçları biriktirir. İş bitiminde ise son hesapları yaparak Node’lara sonuçları bildirir.
@ServerEndpoint(
        value = "/soket",
        encoders = ObjectSerializer.class,
        decoders = ObjectSerializer.class)
public class FaktoryelServerSoket {

private static Long workStartTimeMillis = null;
private static List<BigInteger> subFactoriels = Collections.synchronizedList(new ArrayList<>());
private static int workDoneCount = 0;


@OnMessage
public void onmessage(Map message, Session session) throws IOException {

    // if there is a start key, start execution
    if (message.containsKey("start")) {
        dispatchWorks(session);
    }

    // if there is a chunk key, collect chunk results
    if (message.containsKey("chunk")) {
        collectChunks(message, session);
    }

}

/**
 * Liste parçalara ayrılır ve tüm istemcilere bir payı gönderilir,
 * Liste parçaları yanında serileştirme yöntemiyle bir Lambda fonksiyonu da iletilir,
 * Bu sayede sunucu tarafında yazılan iş mantığı, client tarafında koşturulmuş olur.
 * @param session
 * @throws IOException
 */
private void dispatchWorks(Session session) throws IOException {

    if (workStartTimeMillis == null) {
        workStartTimeMillis = System.currentTimeMillis();
    }

    // 1,...,600000 arası Liste hazırlanıyor
    List<Integer> numberList = IntStream.rangeClosed(1, 600_000).boxed().collect(Collectors.toList());

    Set<Session> allSessions = session.getOpenSessions();

    // Liste parçalara ayrılıyor
    List<List<Integer>> numberChunkedList = Lists.partition(numberList, (numberList.size() / allSessions.size()));

    // İşlemin bittiğini anlamak için gerekli sonuç sayısı
    workDoneCount = ((numberChunkedList.size() % allSessions.size()) == 0) ? allSessions.size(): allSessions.size() + 1;

    Iterator<Session> allSessionsIterator = allSessions.iterator();

    // Her parçalı sayı listesi bir lambda fonksiyonu içinde kullanıcılara pay ediliyor
    for (List<Integer> numberChunks : numberChunkedList) {

        ArrayList<Integer> chunk = new ArrayList<>(numberChunks);

        // remoteLambda fonksiyonu Worker'da tanımlanır, Node'larda koşturulur
        RemoteLambda<Session> remoteLambda =  (serverSession) -> {

            BigInteger subFactoriel = chunk
                    .stream()
                    .map(BigInteger::valueOf)
                    .reduce(BigInteger.ONE, (first, second) -> first.multiply(second));

            Map data = new HashMap();
            data.put("chunk", subFactoriel);

            serverSession.getAsyncRemote().sendObject(data);
        };


        Map data = new HashMap();
        data.put("chunk", remoteLambda);

        // Kullanıcılara tek tek iletiliyor.
        if (allSessionsIterator.hasNext()) {
            Session next = allSessionsIterator.next();
            next.getAsyncRemote().sendObject(data);
        } else {
            session.getAsyncRemote().sendObject(data);
        }

    }
}

/**
 * Her bir alt faktöryel sonucu bu metodda biriktirilir,
 * Tüm alt parçalar hesaplandığında ise,
 * Alt sonuçların da kendi içinde çarpımı sağlanır,
 * Ardından toplam koşma süresi gibi bilgiler tüm istemcilere iletilir.
 * @param message
 * @param session
 */
private void collectChunks(Map message, Session session) {

    subFactoriels.add((BigInteger) message.get("chunk"));

    if (subFactoriels.size() == workDoneCount) {

        BigInteger factorielResult = subFactoriels.parallelStream()
                .reduce(BigInteger.ONE, (first, second) -> first.multiply(second));

        long workerEndTimeMillis = System.currentTimeMillis();
        long workerCompleteTime = workerEndTimeMillis - workStartTimeMillis;


        Set<Session> allSessions = session.getOpenSessions();

        Map map = new HashMap();
        map.put("totalWorker", allSessions.size());
        map.put("completeTime", workerCompleteTime);
        map.put("result", factorielResult.toString().substring(0, 10).concat("..."));


        // İşlem tamam, sonuçlar kullanıcılara iletiliyor..
        for (Session e : allSessions) {
            e.getAsyncRemote().sendObject(map);
        }


    }
}

}
RemoteLambda
Master bileşeninin Node bileşenlerine serileştirerek ilettiği Lambda fonksiyonuna temel oluşturur.RemoteLambda bir fonksiyonel arayüz olup, Consumer<T> ve Serializable arayüzleri türündendir. FaktoryelServerSoket RemoteLambda türünden lambda deyimini Java dilinin serileştirme yöntemiyle Node’ lara iletmektedir.
@FunctionalInterface
public interface RemoteLambda<T> extends Consumer<T>, Serializable {
    // RemoteLambda Consumer#accept metodunu miras alır.
}
ObjectSerializer
Master ve Node arasında iletilen tüm veriler binary biçimde iletilmektedir. ObjectSerializer sınıfı Map türünden nesneleri ByteBuffer türüne serileştirirken, aynı zamanda bunun tersini de yapmaktadır. Master ve Node’lar arasındaki WebSocket haberleşmesi ByteBuffer nesneleri ile binary olarak sağlanmaktadır.
public class ObjectSerializer implements Encoder.Binary<Map>, Decoder.Binary<Map> {

@Override
public Map decode(ByteBuffer bytes) throws DecodeException {
    try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes.array());
         ObjectInputStream inputStream = new ObjectInputStream(byteArrayInputStream);) {

        return (Map) inputStream.readObject();

    } catch (Exception e) {
        e.printStackTrace();
    }

    return new HashMap();
}


@Override
public ByteBuffer encode(Map object) throws EncodeException {
    try (ByteArrayOutputStream bao = new ByteArrayOutputStream();
         ObjectOutputStream oos = new ObjectOutputStream(bao);) {
        oos.writeObject(object);
        return ByteBuffer.wrap(bao.toByteArray());
    } catch (IOException e) {
        e.printStackTrace();
    }

    return ByteBuffer.wrap(new byte[0]);
}

...
}
FaktoryelWorkerSoket
Node olarak davranır. Görevi ise Master bileşeninin ilettiği Lambda fonksiyonunu kendi içinde koşturmaktır.
@ClientEndpoint(
        encoders = ObjectSerializer.class,
        decoders = ObjectSerializer.class
)
public class FaktoryelWorkerSoket {

@OnMessage
public void onmessage(Map message , Session session) throws Exception {

    // Eğer Lambda geldiyse
    if(message.containsKey("chunk")){
        Consumer<Session> task = (Consumer<Session>) message.get("chunk");
        // accept metodu lambda'yı o anki istemcide koşturur.
        // Ara sonucu ise parametre ile belirilen Session'a (Master) iletir.
        task.accept(session);
    }

    // Eğer sonuç geldiyse
    if(message.containsKey("result")){
        System.out.println(message);
    }

}
}
ServerApp
Master’i başlatır.
public class ServerApp {

public static void main(String[] args) throws Exception {

    Server server = new Server("0.0.0.0", 8080, "/", null, FaktoryelServerSoket.class);
    server.start();

    System.out.println("Master Started");

    System.in.read();


}
}
ClientApp
Her bir çalışmada yeni bir Node’u sisteme ekler.
public class ClientApp {

public static void main(String[] args) throws Exception {

    ClientApp.connect();

    System.in.read();
}

public static void connect() throws Exception {

    WebSocketContainer container = ContainerProvider.getWebSocketContainer();
    URI uri = new URI("ws://localhost:8080/soket");
    Session client = container.connectToServer(FaktoryelWorkerSoket.class, uri);

    Map map = new HashMap<>();
    map.put("start", null);

    client.getAsyncRemote().sendObject(map);

}
}

Yukarıda açıklanan modeli şimdi iki senaryo ile test edelim. Test için Amazon EC2 Cloud servisinde çalışan aynı sistem özellikli 6 makine kullanılmıştır. 1 makine Master için, diğer 5 makine ise Node olarak kullanışmıştır. Testlerde kullanılan sanal makinelerin özellikleri aşağıdaki gibidir.

OS: Ubuntu Server 14.04 LTS x64

CPU: Intel Xeon E5-2670 2.60 GHz (vCPU 4)

Memory: 15GB

Java version: 1.8.0-b132

Senaryo 1

Node sayısı 1-5 arasında artırılarak 600000 sayısının faktöryel hesaplamasının ne kadar sürdüğü mevcut sistem üzerinde belirlenir.

Bu senaryoda RemoteLambda nesnesi aşağıdaki gibidir.

RemoteLambda<Session> remoteLambda =  (serverSession) -> {

BigInteger subFactoriel = chunk
        .stream() // Dikkat (1)
        .map(BigInteger::valueOf)
        .reduce(BigInteger.ONE, (first, second) -> first.multiply(second));

Map data = new HashMap();
data.put("chunk", subFactoriel);

serverSession.getAsyncRemote().sendObject(data);

};
  1. Ardışık Stream nesnesi

Node sayısı (Worker) artırılarak RemoteLambda fonksiyonu ile hesaplama yapıldığında aşağıdaki sonuçlar elde edilmiştir.

{result=2234878177..., totalWorker=1, completeTime=181949}
{result=2234878177..., totalWorker=2, completeTime=52688}
{result=2234878177..., totalWorker=3, completeTime=30748}
{result=2234878177..., totalWorker=4, completeTime=21870}
{result=2234878177..., totalWorker=5, completeTime=15366}

Yukarıdaki çıktıların grafiksel gösterimi aşağıdaki gibidir.

senaryo1.png

Senaryo 2

Node sayısı 1-5 arasında artırılarak 600000 sayısının faktöryel hesaplamasının ne kadar sürdüğü mevcut sistem üzerinde belirlenir. Fakat bu senaryoda Java 8 Stream API paralel olarak yapılandırılmıştır. Böylece paralel stream nesnesinin performans katkısı ölçülmeye çalışılmıştır.

Bu senaryoda RemoteLambda nesnesi aşağıdaki gibidir.

RemoteLambda<Session> remoteLambda =  (serverSession) -> {

BigInteger subFactoriel = chunk
        .parallelStream() // Dikkat (1)
        .map(BigInteger::valueOf)
        .reduce(BigInteger.ONE, (first, second) -> first.multiply(second));

Map data = new HashMap();
data.put("chunk", subFactoriel);

serverSession.getAsyncRemote().sendObject(data);

};
  1. Paralel Stream nesnesi
{result=2234878177..., totalWorker=1, completeTime=12520}
{result=2234878177..., totalWorker=2, completeTime=5044}
{result=2234878177..., totalWorker=3, completeTime=4773}
{result=2234878177..., totalWorker=4, completeTime=3540}
{result=2234878177..., totalWorker=5, completeTime=4047}

Yukarıdaki çıktıların grafiksel gösterimi aşağıdaki gibidir.

senaryo2.png

Uygulamanın kaynak kodlarına https://github.com/kodcu/FaktoryelCoWorker bağlantısından erişebilirsiniz.

Tekrar görüşmek dileğiyle..

Tag:backend, distributed systems, lambda, websocket

  • Share:
author avatar
Kodedu

Previous post

Hadoop Nasıl Kurulur? (Windows için)
14 Kasım 2014

Next post

Distributed Map-Reduce Model using Java 8 and Java EE 7 WebSocket
16 Kasım 2014

You may also like

api-logo
Swagger Nedir? Neden kullanılır?
10 Ekim, 2018
spring-cli-logo
Spring CLI ile Spring Boot Projeleri Hazırlamak
21 Ağustos, 2017
eureka_architecture
Spring Cloud Netflix ve Eureka Service Discovery
3 Temmuz, 2017

Leave A Reply Cevabı iptal et

E-posta hesabınız yayımlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir

E-posta listesine kayıt olun!






Gözde yazılar

JPQL sorgulama dili
13Eki2012
Java EE 7 – Concurrency Utilities
23Eki2013
Java 8 – Tekrarlı Notasyonlar Nasıl Kullanılır?
23Eyl2014
6. Ulusal Yazılım Mühendisliği Sempozyumu (UYMS)
24May2012

Son Yazılar

  • Java’da Record’lar 27 Ocak 2020
  • Swagger Nedir? Neden kullanılır? 10 Ekim 2018
  • Spring CLI ile Spring Boot Projeleri Hazırlamak 21 Ağustos 2017
  • Spring Cloud Netflix ve Eureka Service Discovery 3 Temmuz 2017
  • Online React.js Eğitimi ardından (15-25 Mayıs 2017) 31 Mayıs 2017

Son Yorumlar

  • Coupling ve Cohesion Kavramları Nedir? için Hilal
  • Naïve Bayes Sınıflandırma Algoritması için Rahman Usta
  • Naïve Bayes Sınıflandırma Algoritması için Mete
  • YAML Nedir? Neden YAML Kullanmalıyız? için kara
  • JWT (JSON Web Tokens) Nedir? Ne işe yarar? için Furkan

Get Java Software

Arşivler

Bizi takip edin

React.js Eğitimi Başlıyor
11-22 Eylül, 2017
Eğitmen
Rahman Usta
İletişim

merhaba@kodedu.com

  • Hakkında
  • Gizlilik Politikası
  • İletişim
  • Referanslar
Kodedu Bilişim Danışmanlık
Cemil Meriç mah. Çelebi sok.
No:16/3 Ümraniye/İSTANBUL
Tel: 0850 885 38 65
Alemdağ V.D.: 8960484815

KODEDU © Tüm hakları saklıdır.