Egyidejű egyesítés Rendezés megosztott memóriában
Adott egy 'n' és egy n szám, rendezze a számokat a segítségével Egyidejű Összevonási rendezés. (Tipp: Próbáljon shmget shmat rendszerhívásokat használni).
1. rész: Az algoritmus (HOGYAN?)
Rekurzív módon állítson be két gyermek folyamatot a bal felének egyike a jobb felének. Ha egy folyamat elemeinek száma a tömbben 5-nél kevesebb, hajtsa végre a Beszúrás rendezése . A két gyermek szülője ezután összevonja az eredményt, és visszatér a szülőhöz és így tovább. De hogyan teheted egyidejűvé?
2. rész: A logikai (MIÉRT?)
A probléma megoldásának fontos része nem algoritmikus, hanem az operációs rendszer és a kernel fogalmának magyarázata.
Az egyidejű rendezés eléréséhez olyan módszerre van szükségünk, hogy két folyamatot egyszerre dolgozzunk ugyanazon a tömbön. A dolgok megkönnyítése érdekében a Linux sok rendszerhívást biztosít egyszerű API-végpontokon keresztül. Közülük kettő van shmget() (megosztott memóriakiosztáshoz) és shmat() (megosztott memória műveletekhez). Megosztott memóriateret hozunk létre a gyermekfolyamat között, amelyet elágazunk. Minden szegmens bal és jobb oldali gyermekre van felosztva, amelyek az érdekes rész szerint vannak rendezve, mivel párhuzamosan dolgoznak! Az shmget() arra kéri a rendszermagot, hogy allokálja a megosztott oldal mindkét folyamat számára.
Miért nem működik a hagyományos fork()?
A válasz abban rejlik, hogy a fork() mit csinál valójában. A dokumentációból a 'fork() új folyamatot hoz létre a hívási folyamat megkettőzésével'. A gyermekfolyamat és a szülőfolyamat külön memóriaterületen fut. A fork() idején mindkét memóriaterületnek ugyanaz a tartalma. A memória írja a fájlleíró(fd) változtatásokat stb., amelyeket az egyik folyamat hajt végre, nem érintik a másikat. Ezért szükségünk van egy megosztott memória szegmensre.
#include #include #include #include #include #include #include #include void insertionSort ( int arr [] int n ); void merge ( int a [] int l1 int h1 int h2 ); void mergeSort ( int a [] int l int h ) { int i len = ( h - l + 1 ); // Using insertion sort for small sized array if ( len <= 5 ) { insertionSort ( a + l len ); return ; } pid_t lpid rpid ; lpid = fork (); if ( lpid < 0 ) { // Lchild proc not created perror ( 'Left Child Proc. not created n ' ); _exit ( -1 ); } else if ( lpid == 0 ) { mergeSort ( a l l + len / 2 - 1 ); _exit ( 0 ); } else { rpid = fork (); if ( rpid < 0 ) { // Rchild proc not created perror ( 'Right Child Proc. not created n ' ); _exit ( -1 ); } else if ( rpid == 0 ) { mergeSort ( a l + len / 2 h ); _exit ( 0 ); } } int status ; // Wait for child processes to finish waitpid ( lpid & status 0 ); waitpid ( rpid & status 0 ); // Merge the sorted subarrays merge ( a l l + len / 2 - 1 h ); } /* Function to sort an array using insertion sort*/ void insertionSort ( int arr [] int n ) { int i key j ; for ( i = 1 ; i < n ; i ++ ) { key = arr [ i ]; j = i - 1 ; /* Move elements of arr[0..i-1] that are greater than key to one position ahead of their current position */ while ( j >= 0 && arr [ j ] > key ) { arr [ j + 1 ] = arr [ j ]; j = j - 1 ; } arr [ j + 1 ] = key ; } } // Method to merge sorted subarrays void merge ( int a [] int l1 int h1 int h2 ) { // We can directly copy the sorted elements // in the final array no need for a temporary // sorted array. int count = h2 - l1 + 1 ; int sorted [ count ]; int i = l1 k = h1 + 1 m = 0 ; while ( i <= h1 && k <= h2 ) { if ( a [ i ] < a [ k ]) sorted [ m ++ ] = a [ i ++ ]; else if ( a [ k ] < a [ i ]) sorted [ m ++ ] = a [ k ++ ]; else if ( a [ i ] == a [ k ]) { sorted [ m ++ ] = a [ i ++ ]; sorted [ m ++ ] = a [ k ++ ]; } } while ( i <= h1 ) sorted [ m ++ ] = a [ i ++ ]; while ( k <= h2 ) sorted [ m ++ ] = a [ k ++ ]; int arr_count = l1 ; for ( i = 0 ; i < count ; i ++ l1 ++ ) a [ l1 ] = sorted [ i ]; } // To check if array is actually sorted or not void isSorted ( int arr [] int len ) { if ( len == 1 ) { std :: cout < < 'Sorting Done Successfully' < < std :: endl ; return ; } int i ; for ( i = 1 ; i < len ; i ++ ) { if ( arr [ i ] < arr [ i - 1 ]) { std :: cout < < 'Sorting Not Done' < < std :: endl ; return ; } } std :: cout < < 'Sorting Done Successfully' < < std :: endl ; return ; } // To fill random values in array for testing // purpose void fillData ( int a [] int len ) { // Create random arrays int i ; for ( i = 0 ; i < len ; i ++ ) a [ i ] = rand (); return ; } // Driver code int main () { int shmid ; key_t key = IPC_PRIVATE ; int * shm_array ; int length = 128 ; // Calculate segment length size_t SHM_SIZE = sizeof ( int ) * length ; // Create the segment. if (( shmid = shmget ( key SHM_SIZE IPC_CREAT | 0666 )) < 0 ) { perror ( 'shmget' ); _exit ( 1 ); } // Now we attach the segment to our data space. if (( shm_array = ( int * ) shmat ( shmid NULL 0 )) == ( int * ) -1 ) { perror ( 'shmat' ); _exit ( 1 ); } // Create a random array of given length srand ( time ( NULL )); fillData ( shm_array length ); // Sort the created array mergeSort ( shm_array 0 length - 1 ); // Check if array is sorted or not isSorted ( shm_array length ); /* Detach from the shared memory now that we are done using it. */ if ( shmdt ( shm_array ) == -1 ) { perror ( 'shmdt' ); _exit ( 1 ); } /* Delete the shared memory segment. */ if ( shmctl ( shmid IPC_RMID NULL ) == -1 ) { perror ( 'shmctl' ); _exit ( 1 ); } return 0 ; }
Java import java.util.Arrays ; import java.util.Random ; import java.util.concurrent.ForkJoinPool ; import java.util.concurrent.RecursiveAction ; public class ConcurrentMergeSort { // Method to merge sorted subarrays private static void merge ( int [] a int low int mid int high ) { int [] temp = new int [ high - low + 1 ] ; int i = low j = mid + 1 k = 0 ; while ( i <= mid && j <= high ) { if ( a [ i ] <= a [ j ] ) { temp [ k ++] = a [ i ++] ; } else { temp [ k ++] = a [ j ++] ; } } while ( i <= mid ) { temp [ k ++] = a [ i ++] ; } while ( j <= high ) { temp [ k ++] = a [ j ++] ; } System . arraycopy ( temp 0 a low temp . length ); } // RecursiveAction for fork/join framework static class SortTask extends RecursiveAction { private final int [] a ; private final int low high ; SortTask ( int [] a int low int high ) { this . a = a ; this . low = low ; this . high = high ; } @Override protected void compute () { if ( high - low <= 5 ) { Arrays . sort ( a low high + 1 ); } else { int mid = low + ( high - low ) / 2 ; invokeAll ( new SortTask ( a low mid ) new SortTask ( a mid + 1 high )); merge ( a low mid high ); } } } // Method to check if array is sorted private static boolean isSorted ( int [] a ) { for ( int i = 0 ; i < a . length - 1 ; i ++ ) { if ( a [ i ] > a [ i + 1 ] ) { return false ; } } return true ; } // Method to fill array with random numbers private static void fillData ( int [] a ) { Random rand = new Random (); for ( int i = 0 ; i < a . length ; i ++ ) { a [ i ] = rand . nextInt (); } } public static void main ( String [] args ) { int length = 128 ; int [] a = new int [ length ] ; fillData ( a ); ForkJoinPool pool = new ForkJoinPool (); pool . invoke ( new SortTask ( a 0 a . length - 1 )); if ( isSorted ( a )) { System . out . println ( 'Sorting Done Successfully' ); } else { System . out . println ( 'Sorting Not Done' ); } } }
Python3 import numpy as np import multiprocessing as mp import time def insertion_sort ( arr ): n = len ( arr ) for i in range ( 1 n ): key = arr [ i ] j = i - 1 while j >= 0 and arr [ j ] > key : arr [ j + 1 ] = arr [ j ] j -= 1 arr [ j + 1 ] = key def merge ( arr l mid r ): n1 = mid - l + 1 n2 = r - mid L = arr [ l : l + n1 ] . copy () R = arr [ mid + 1 : mid + 1 + n2 ] . copy () i = j = 0 k = l while i < n1 and j < n2 : if L [ i ] <= R [ j ]: arr [ k ] = L [ i ] i += 1 else : arr [ k ] = R [ j ] j += 1 k += 1 while i < n1 : arr [ k ] = L [ i ] i += 1 k += 1 while j < n2 : arr [ k ] = R [ j ] j += 1 k += 1 def merge_sort ( arr l r ): if l < r : if r - l + 1 <= 5 : insertion_sort ( arr ) else : mid = ( l + r ) // 2 p1 = mp . Process ( target = merge_sort args = ( arr l mid )) p2 = mp . Process ( target = merge_sort args = ( arr mid + 1 r )) p1 . start () p2 . start () p1 . join () p2 . join () merge ( arr l mid r ) def is_sorted ( arr ): for i in range ( 1 len ( arr )): if arr [ i ] < arr [ i - 1 ]: return False return True def fill_data ( arr ): np . random . seed ( 0 ) arr [:] = np . random . randint ( 0 1000 size = len ( arr )) if __name__ == '__main__' : length = 128 shm_array = mp . Array ( 'i' length ) fill_data ( shm_array ) start_time = time . time () merge_sort ( shm_array 0 length - 1 ) end_time = time . time () if is_sorted ( shm_array ): print ( 'Sorting Done Successfully' ) else : print ( 'Sorting Not Done' ) print ( 'Time taken:' end_time - start_time )
JavaScript // Importing required modules const { Worker isMainThread parentPort workerData } = require ( 'worker_threads' ); // Function to merge sorted subarrays function merge ( a low mid high ) { let temp = new Array ( high - low + 1 ); let i = low j = mid + 1 k = 0 ; while ( i <= mid && j <= high ) { if ( a [ i ] <= a [ j ]) { temp [ k ++ ] = a [ i ++ ]; } else { temp [ k ++ ] = a [ j ++ ]; } } while ( i <= mid ) { temp [ k ++ ] = a [ i ++ ]; } while ( j <= high ) { temp [ k ++ ] = a [ j ++ ]; } for ( let p = 0 ; p < temp . length ; p ++ ) { a [ low + p ] = temp [ p ]; } } // Function to check if array is sorted function isSorted ( a ) { for ( let i = 0 ; i < a . length - 1 ; i ++ ) { if ( a [ i ] > a [ i + 1 ]) { return false ; } } return true ; } // Function to fill array with random numbers function fillData ( a ) { for ( let i = 0 ; i < a . length ; i ++ ) { a [ i ] = Math . floor ( Math . random () * 1000 ); } } // Function to sort the array using merge sort function sortArray ( a low high ) { if ( high - low <= 5 ) { a . sort (( a b ) => a - b ); } else { let mid = low + Math . floor (( high - low ) / 2 ); sortArray ( a low mid ); sortArray ( a mid + 1 high ); merge ( a low mid high ); } } // Main function function main () { let length = 128 ; let a = new Array ( length ); fillData ( a ); sortArray ( a 0 a . length - 1 ); if ( isSorted ( a )) { console . log ( 'Sorting Done Successfully' ); } else { console . log ( 'Sorting Not Done' ); } } main ();
Kimenet:
Sorting Done Successfully
Időbonyolultság :O(N log N )
Segédtér: O(N)
Teljesítményjavítások?
Próbálja meg időzíteni a kódot, és hasonlítsa össze a teljesítményét a hagyományos szekvenciális kóddal. Meglepődne, ha tudná, hogy a szekvenciális rendezés jobb!
Amikor mondjuk a bal oldali gyermek hozzáfér a bal tömbhöz, a tömb betöltődik a processzor gyorsítótárába. Most, amikor a jobb oldali tömbhöz hozzáférünk (egyidejű hozzáférések miatt), gyorsítótár hiányzik, mivel a gyorsítótár meg van töltve bal oldali szegmenssel, majd a jobb oldali szegmens a gyorsítótár memóriájába másolódik. Ez az ide-oda folyamat folytatódik, és olyan szintre rontja a teljesítményt, hogy gyengébb teljesítményt nyújt, mint a szekvenciális kód.
Vannak módok a gyorsítótár hiányosságainak csökkentésére a kód munkafolyamatának vezérlésével. De ezeket nem lehet teljesen elkerülni!