PLINQ paralelno procesuiranje LINQ upita

PLINQ paralelno procesuiranje LINQ upita

Nakon upoznavanja sa TPL bibliotekom, posvetit ćemo se korištenju PLINQ-a, odnosno paralelnim LINQ upitima.


U prethodnim dijelovima upoznali smo se sa TPL - bibliotekom za paralelno programiranje pod .NET Frameworkom. U ovom dijelu govorit ćemo o korištenju PLINQ odnosno paralelnim LINQ upitima. Ukoliko želite saznati više informacije o LINQ pogledajte blog autora na ili reference koje su pobrojane u tim člancima. Paralelno izvršavanje LINQ upita implementirano je na dva od 4 osnovna izvora podataka i to:

- LINQ to Objects i
- LINQ to XML.

Promjene implementacije upita iz LINQ u PLINQ su minorne i ne zahtijevaju drastično prilagođavanje. Pri korištenju PLINQ upita nad objektima potrebno je implementiratiParallelEnumerable interfejs koji čini sastavni dio PLINQ, a koji je implementiran preko proširenih metoda. PLINQ ima nekoliko prednosti pri korištenju i to:

- PLINQ - iskorištava novu generaciju hardvera korištenjem LINQ upita
- PLINQ podržava sve .NET upite koji se koriste u LINQ
- Minimalna promjena u implementaciji u odnosu na klasičnu implementaciju LINQ

Nešto ranije vidjeli smo sastav System.Threading.dll u smislu korištenja TPL, dok je  PLINQ implementiran u osnovnom imenskom prostoru System.LINQ. Da bi se što prirodnije upoznali sa PLINQ započnimo sa jednim jednostavnim primjerom. Prvo ćemo primjer implementirati u sekvencijalnom odnosno LINQ obliku, a zatim tako konstruiran upit paralelizirati, odnosno implementirati u PLINQ-u.

Pretpostavimo da imamo polje prirodnih brojeva od 1-10.000.000, i želimo da nad tim poljem izvršimo nekoliko LINQ upita. Pretpostavimo i da želimo izračunati koliko prostih brojeva se nalazi u intervalu od 1 do 10.000.000.

Implementacija sa klasičnim LINQ upitom izgleda kao na:
 

static void Main(string[] args)
{
    //skup brojava od 1- 10.000.000
    var skupBrojeva = Enumerable.Range(2,10000000);
    Stopwatch st = new Stopwatch();
    st.Start();
 
    //Formiranje običnog LINQ upita
    var q_seq = from p in skupBrojeva
                where JeProst(p)
                select p;
    int ukupno = q_seq.Count();
    Console.WriteLine("Prostih brojeva od 1-10.000.000 ima:{0}",ukupno);
    //Završetak programa
    Console.WriteLine("Operacija završila za: {0} sec.", st.Elapsed.TotalSeconds);
    //Press any key to continue...
    Console.ReadKey();
}
 

U LINQ upitu korištena je metoda JeProst(int n), koja za dati prirodni broja vraća true ako je broj prost, inače vraća false. Kasnije u tekstu biće prikazana ova metoda.

Rezultat ove implementacije prikazan je na sljedećoj slici:
 

      
 

Vidimo da je traženje prostih brojeva u određenom intervalu poprilično zahtjevna operacija i ona iznosi oko 16 sekundi. Ako prilikom izvršavanja ovog primjera otvorimo TaskManager i pogledamo opterećenje procesora vidimo da se primjer vrtio samo na jednoj jezgri, odnosno zauzimao je oko 25% resursa procesora.
 

          
 

Kako ovu implementaciju učiniti bržom, ako posjedujemo višejezgreni procesor? Paralelnom implementacijom LINQ upita, odnosno korištenjem PLINQ, implementacija na quad-core procesoru trebala bi približno da bude 4 puta brža. Da bi smo implementirali paralelnu verziju jedino je potrebno da na našoj implementaciji  pozovemo samo jednu proširenu metodu i to q_seq.AsParallel(). Sa ovim pozivom cijela naša implementacija je prilagođena paralelnom izvršavanju LINQ upita odnosno procesuiranju PLINQ. Sljedeći kod implementira PLINQ:

 

static void Main(string[] args)
{
    //skup brojava od 1- 10.000.000
    var skupBrojeva = Enumerable.Range(2, 10000000);
    Stopwatch st = new Stopwatch();
    st.Start();
 
    //Formiranje običnog LINQ upita
    var q_seq = from p in skupBrojeva.AsParallel()
                where JeProst(p)
                select p;
    int ukupno = q_seq.Count();
    Console.WriteLine("Prostih brojeva od 1-10.000.000 ima:{0}",ukupno);
    //Završetak programa
    Console.WriteLine("PLINQ upit se izvršio za: {0:0.00} sec.", st.Elapsed.TotalSeconds);
    //Press any key to continue...
    Console.ReadKey();
}
 

Rezultat i vrijeme za koje se izvršila operacija prebrojavanja prostih brojeva:
 

                            
 

Paralelna verzija se izvršila za 4,33 sekunde ili nešto manje od 4 puta. Zamislite koliko vremena bi uštedjeli ako bismo prebrojavali proste brojeve u mnogo većem skupu od 10 miliona. Za vrijeme izvršavanja PLINQ zauzeće svih jezgri procesora bilo je 100% što se može vidjeti na sljedećoj slici prikaza TaskManagera u vrijeme izvršavanja primjera.
 

                                  
 

Zaista, ovo je nevjerojatno na kako jednostavan i očigledan način se implementira paralelno izvršavanje LINQ upita.

PLINQ kao i cjelokupna biblioteka ParallelFX u toku izvršavanje koda formira onoliko niti koliko postoji procesora na dotičnom PC-u. U slučaju prethodnog primjera formirane su 4 niti, obzirom da se primjer izvršavao na quad-core mašini. Međutim, PLINQ dozvoljava i da se specificira tačan broj niti neovisno o broju procesora koji sadrži PC. Ovo možemo implementirati preko poziva metoda WithDegreeOfParallelism(3) kojim definišemo konkretan broj paralelnih niti. Da bi smo prethodni primjer izvršili sa 3 paralelne niti implementacija je prikazana kao::

 

static void Main(string[] args)
{
    Console.Title = "Članak za www.ITPro.ba";
    //skup brojava od 1- 10.000.000
    var skupBrojeva = Enumerable.Range(2, 10000000);
    Stopwatch st = new Stopwatch();
    st.Start();
 
    //Formiranje običnog LINQ upita
    var q_seq = from p in skupBrojeva.AsParallel().WithDegreeOfParallelism(3)
                where JeProst(p)
                select p;
    int ukupno = q_seq.Count();
    Console.WriteLine("Prostih brojeva od 1-10.000.000 ima:{0}",ukupno);
    //Završetak programa
    Console.WriteLine("PLINQ upit se izvršio za: {0:0.00} sec.", st.Elapsed.TotalSeconds);
    //Press any key to continue...
    Console.ReadKey();
}
 

Rezultat pokretanja primjera dat je na sljedećoj slici:
 

                                                                  

Procesuiranje je trajalo nešto duže, a ako pogledamo sljedeću sliku vidimo da su samo 3 jezgre "radile", odnosno upravo onako kako smo specificirali u kodu.
 

                                                                    


Kao što smo i očekivali rezultat je 3 puta brzi u odnosu na klasični LINQ upit.

Postavlja se logično pitanje šta ako za argument stavimo broj veći od broja jezgri procesora koji posjeduje PC. Rezultat će biti približan rezultatu od 4,5 sekundi, jer je biblioteka optimizirana da ne formira dodatne niti ukoliko to nema efekta. Ovo znači da specificiranje argumenta većeg od broja procesora nema značenja. Jedina svrha ovog argument je da se paralelizam smanji sa maksimalnog broja procesora da bi se nekoj drugoj operaciji dala mogućnost paralelnog izvršavanja.

Vrlo važna činjenica je da zbog paralelizma obrada podataka u listama ili kolekcijama se ne izvršava  po indeksu i u tom slučaju paralelna obrada podataka nasumice uzima stavke iz kolekcija i manipuliše s njima. Da bi smo se uvjerili u to u prethodnom primjeru potražimo proste brojeve od 1 do 20 i ispišimo ih onim redom kojim ih je PLINQ upit i pronašao. Listing primjera dat je na sljedeći način:

 

static void Main(string[] args)
{
    Console.Title = "Članak za www.ITPro.ba";
    //skup brojava od 1- 10.000.000
    var skupBrojeva = Enumerable.Range(2, 20);
    Stopwatch st = new Stopwatch();
    st.Start();
 
    //Formiranje običnog LINQ upita
    var q_plinq = (from p in skupBrojeva.AsParallel()
                    where JeProst(p)
                    select p).ToList();
 
    foreach(var i in q_plinq)
    {
        Console.WriteLine(i);
    }
    //Završetak programa
    Console.WriteLine("PLINQ upit se izvršio za: {0:0.00} sec.", st.Elapsed.TotalSeconds);
    //Press any key to continue...
    Console.ReadKey();
}
 

Rezultat je da na sljedećoj slici:
 

                                                                                                

Vidimo da zadnji prosti broj koji je pronađen je 3, a prvi 2, 5  itd. Ovo znači da sortiranje kolekcija u paralelnim izvršavanjima se mora posebno tretirati. Međutim, ParallelFX posjeduje proširenu metodu AsOrdered() koja kontroliše poredak izvršavanja u kolekciji, odnosno imamo mogućnost kontrole sortiranja kolekcije. Ako bi željeli da naši prosti brojevi budi pohranjeni u kolekciju uzlaznim redom po veličini tada bi pozvali metodu na sljedeći način:

 

static void Main(string[] args)
{
    Console.Title = "Članak za www.ITPro.ba";
    //skup brojava od 1- 10.000.000
    var skupBrojeva = Enumerable.Range(2, 20);
    Stopwatch st = new Stopwatch();
    st.Start();
 
    //Formiranje običnog LINQ upita
    var q_plinq = (from p in skupBrojeva.AsParallel().AsOrdered()
                    where JeProst(p)
                    select p).ToList();
 
    foreach(var i in q_plinq)
    {
        Console.WriteLine(i);
    }
    //Završetak programa
    Console.WriteLine("PLINQ upit se izvršio za: {0:0.00} sec.", st.Elapsed.TotalSeconds);
    //Press any key to continue...
    Console.ReadKey();
}
 

Izlaz bi bio kao na sljedećoj slici:
 

                                                                                                                          

U koliko bi povećali skup sa 20 na npr. 200 primijetit ćemo da se korištenjem AsOrdered metode, povećalo vrijeme izvršavanja što ima logike jer se brojevi koji su veći od onog koji se treba ispisati na konzoli na čekanju za red za ispis na konzolu.
 

Particioniranje u ParallelFX i PLINQ

Particioniranje nad izvorom podataka u ParallelFX predstavlja osnovni koncept raspodjele višestrukih sekcija kojim se pristupa paralelno iz više niti. PLINQ i TPL u svom konceptu sadrže interno particioniranje koji za kompleksne slučajeve nije efektivno, i potrebno je definisati posebne vrste particioniranja. Početno particioniranje temelji se na statičkoj raspodjeli količine zadataka na svaku jezgru, što ponekad i nije efektivno.

U ParallelFX moguće je definisati dva načina particioniranja:

Range partitioning se primjenjuje kada govorimo o kolekcijama sa podacima poput liste ili polja sa poznatom veličinom elemenata, intervalno particioniranje (range partitioning)  je najčešće korišten i najjednostavniji način particioniranja. Svaka nit prima određenu količinu podataka iz liste sa početnim i završnim indexom, te ne postoji mogućnost da dvije ili više niti pristupaju istom podatku u listi. Osim početne operacije formiranja particija u procesu obrade podataka nikakve druge operacije nisu potrebne. Zbog toga ovaj način particioniranja je vrlo efikasan ukoliko se radi o konstantnom opterećenju (workload) nad svim podacima u listi. Mana ovog particioniranja predstavlja situaciju ukoliko opterećenje nije konstantno, pa se neke niti završavaju ranije, a cijela operacija mora čekati dok se i zadnja nit izvrši da bi se završila cijela operacija.

Chunk partitioning se primjenjuje ukoliko se radi o kolekcijama u kojima je nepoznat broj elemenata, poput povezanih lista (LinkedList) i sl, mogućnost particioniranja se svodi na to da svaka nit uzme određeni broj elemenata iz liste (chunk size), te ih procesuira. Kada završi vraća se ponovo i uzima drugi komad (chank). U ovakvom particioniranju implementacija se vrši na način da ni jedna nit ne uzme dupli podatak. Veličina komada (chunk)  biva proizvoljna i može biti od 1 do više elemenata, a sve zavisi od prirode i kompleksnosti operacije nad kojom se vrši. Zbog prirode samih kolekcija nad kojim se vrši procesuiranje, veličina komada koji se uzimaju zavisi od procesuiranja i dinamički se vrši sihronizacija uravnoteženja opterećenja (load-balancing) tokom obrade. Količina sihronizacije je obrnuto proporcionalna veličini komada (chunk).

TPL particioneri podržavaju formiranje dinamičkog broja particija, odnosno u vrijeme izvršavanja donosi se odluka o formiranju particije, koja će se izvršavati na određenoj slobodnoj niti.

Klasa Partitioner

Klasa Partitioner dozvoljava formiranje particija za bilo koju kolekciju. Višestruko preklopljenja metoda Create ove klase, dozvoljava formiranje particija za sve petlje implementirane u ParallelFX.

Ova klasa posjeduje samo jednu metodu i to metodu Create koja ima sljedeće preklopljene verzije:

 

namespace System.Collections.Concurrent
{
    public static class Partitioner
    {
        public static OrderablePartitioner<TSource> Create<TSource>(IEnumerable<TSource> source);
        public static OrderablePartitioner<TSource> Create<TSource>(IList<TSource> list, bool loadBalance);
        public static OrderablePartitioner<Tuple<int, int>> Create(int fromInclusive, int toExclusive);
        public static OrderablePartitioner<Tuple<long, long>> Create(long fromInclusive, long toExclusive);
        public static OrderablePartitioner<TSource> Create<TSource>(TSource[] array, bool loadBalance);
        public static OrderablePartitioner<Tuple<int, int>> Create(int fromInclusive, int toExclusive, int rangeSize);
        public static OrderablePartitioner<Tuple<long, long>> Create(long fromInclusive, long toExclusive, long rangeSize);
    }
}


U narednom tekstu vidjet ćemo njenu primjenu i kako efikasno se može upotrijebiti u implementaciji.

Pretpostavimo jednostavan primjer iz prethodne implementacije, odnosno želimo prebrojiti koliko ima prostih brojeva u nekom intervalu. Implementacija ove metode ima više te postoji nekoliko algoritama preko kojih se implementira. Listing metode je sljedeći:

 

static bool JeProst(int n)   
{
    for (int i = 2; i * i <= n; i++)
        if (n % i == 0)
            return false;
    return true;
 
}
 

Ova metoda koristi  svojstvo da svaki broj ima paran broj djelilaca osim potpunog kvadrata koji ima neparan broj, pa se svi djelioci mogu postaviti u uređene dvojke brojeva čiji produkt iznosi dotični broj. U svakoj uređenoj dvojki prvi član je manji ili jednak od korijena broja. Ukoliko ne postoji ni jedan djelilac broja koji je manji ili jednak od korijena datog broja, onda je broj prost. Ovo je jedan od najbržih algoritama za izračunavanje prostih brojeva. Brži algoritam od navedenog koristi svojstvo tzv. Eratostenovog sita.

Vidjet ćemo nekoliko implementacija na koji način možemo prebrojati proste brojeve, a kao zadnja implementacija koristit ćemo klasu Partitioner, odnosno particioniranje paralelnih zadataka. Krenimo od najjednostavnije sekvencijalne implementacije.


I Implementacija sastoji se u definisanju običnog sekvencijalnog LINQ upita, kao na sljedećem listingu odnosno identično prethodnim implementacijama:

 

var count = (from p in skupBrojeva
             where JeProst(p)
             select p).Count();
 

Ovaj kod se izvršava na jednoj jezgri procesora i kao što smo vidjeli izvršava se za nešto više od 16 sekundi.


II Implementacija sastoji se u definisanju klasičnog PLINQ upita, kao na sljedećem listingu:

 var count = (from p in skupBrojeva.AsParallel()

                where JeProst(p)
                select p).Count();
 

Ova implementacija uzima interno particioniranje definisano u samoj biblioteci ParallelFX, a u prethodnom izlaganju vidjeli smo da operacije traje oko 4,63 sekunde.


III Implementacija sastoji se u definisanju posebno definisanog particioniranja, koji u vrijeme izvršavanja definiše veličinu “komada” (chunks) za procesuiranje svake niti.

 

var count = Partitioner.Create(skupBrojeva).AsParallel()
                 .Where(p => JeProst(p))
                 .Count();
 

Primjetit ćemo da ova implementacija ide preko Create metode Partitioner klase, koja interno formira dijelove kolekcije, a teži da ravnomjerno rasporedi opterećenje za svaku nit. Obzirom da je primjer relativno brzo završi ipak imamo blagi pomak brzine, jer ova implementacija je za nijansu brža od početne.
 

                                                                                                                                                        
 

Metoda JeProst(int n) koja igra centralnu ulogu u ovom primjeru, uzima različito procesorsko vrijeme u odnosu na broj n, tj. za veće brojeve potrebno više procesorskog vremena, a koristi se statičko particioniranje koje je ugrađeno u PLINQ upit (implementacija II). Ova implementacija neće moći do kraja efektivno procesuirati, iz tog razloga što će određene niti brže završiti posao i čekat će se druge niti koje izvršavaju veći dio posla. To je i očigledno je ako se posmatra metoda JeProst vidimo da provjera nekog malo broja u odnosu na veći broj vrlo je velika pa standardno statičko particioniranje u PLINQ ipak nije optimizirano na pravi način.

Za razliku od varijante 2, varijanta 3 koja u vrijeme izvršavanja koda formira particije ravnomjerno raspoređuje opterećenja na sve formirane niti, što je rezultiralo kraće vrijeme izvršavanja.

Obzirom da se radi o relativno maloj obradi podataka, pa su vremena posebno u drugoj i trećeoj varijanti  u manjoj mjeri razlikuju. Bez obzira na to, opet imamo znatno poboljšanje performansi između spomenutih varijanti. U stvari kada smo koristili ugrađeni PLINQ particioner vidimo da on za ovaj slučaj nije izbalansiran, te iz tog razloga treća implementacija maksimalno iskorištava sve resurse PC-a. Čitaocu se ostavlja da uradi test za veći opseg, kao i da analizira isti.

Više informacija i demo primjera možete pronaći na autorovom blogu, kao i SkyDrive računu na kojem se nalazi desetak demo primjera sa ovog naslova koji su prezentirani na User Group sastancima u Bihaću, odnosno u Beogradu na Community Tracku konferencije Sinergije 10. Isto tako, na blogu autora može se pronaći desetak blog postova koja se tiču ParallelFx i primjene u raznim poljima poput izračunavanje sistema linearnih jednačina, izračunavanja decimala broja PI, rješavanje Problema sa Project Euler stranice itd.

Komentari (0)

Pošalji svoj komentar

Newsletter

Pretplatite se na naš newsletter