Parallel Sort with coarrays

Hello all,

Not sure this is ready, but it is working. This took a LOT of time to figure out because almost every tutorial was just that little bit off. The whole idea was to use multiple threads to shuffle multiple sorted lists into a very long fully sorted list. At first it really didn’t work and took WAY longer than a single threaded sort. Now it works and suspiciously takes far less time… Curious…

  ! Parallel Sort Test by Frank Meyer
  ! Created Dec 2021
  ! Version 0.1a
  ! Updates:
program ParallelSort
  use iso_fortran_env , only: atomic_int_kind, lock_type
  implicit none

  ! Global type and variable declarations
  ! Structure to house the data to be sorted
  type :: t_sortelement
     integer :: i_data
     type(t_sortelement), pointer :: p_next, p_prev
  end type t_sortelement

  ! Structure to organize a list of lists which will be used to perform the merge sorting.
  type :: t_tagelement
     integer :: i_debug
     type(t_sortelement), pointer :: p_list, p_ilist, p_outlist
     type(t_tagelement), pointer :: p_next, p_prev
  end type t_tagelement

  ! Structure to help with moving data.
  type :: t_syncmessage
     integer :: i_small
     integer :: i_large
     integer :: i_iterations
  end type t_syncmessage

  ! Normal variables for each threads to operate.
  integer :: i_elements, i_all, i_me, i_need, i_doing
  integer :: i_loop1, i_loop2, i_rand, i_state, i_left, i_right, i_iterations
  real :: r_rand
  logical :: l_notdone
  integer :: i_ourdebug, i_ourout
  character(50) :: c_filename, c_threadnm, c_temp
  integer :: int_reging
  integer :: i_neighbours, i_neighbourl
  
  ! Coarrays for the number of elements, this image's smallest/largest number and happiness.
  integer(atomic_int_kind) :: i_small[*], i_large[*], i_unhappy[*]
  type(t_syncmessage) :: t_messages[*]

  ! Pointers to hold our lists.
  type(t_sortelement), pointer :: p_listroot, p_listnow, p_listnew, p_listtemp
  type(t_tagelement), pointer :: p_tagroot, p_tagnow, p_tagnew

  ! Pointers to help with merging.
  type(t_sortelement), pointer :: p_righthand, p_lefthand, p_centerhand, p_temphand
  type(t_syncmessage), pointer :: t_localcopy

  ! Assign starting values to variables.
  i_elements = 19
  i_me = this_image()
  i_all = num_images()
  i_need = i_all
  int_reging = i_all
  i_state = 0
  l_notdone = .true.
  i_unhappy = 2
  i_left = i_me - 1
  i_right = i_me + 1
  i_neighbours = 0
  i_neighbourl = 99999999
  

  ! Be sure of null() in our lists.
  p_listroot => null()
  p_listnow => null()
  p_listnew => null()
  p_listtemp => null()

  ! Deny entry if only 1 image!
  if (i_all .eq. 1) then
     print *, "Please restart with more than 1 image!"
     call exit(0)
  end if
  
  
  ! Program code goes here!
  ! Starting with using thread 1 to do some startup tasks.
  if (i_me .eq. 1) then
     print *, "************************************************************************"
     print *, "Welcome to the Parallel Sort Program by Frank Meyer"
     print *, "************************************************************************"
     print "(3/,a)", "We hope this works...."
     print "(a,$)", "How many elements? "
     read *, i_elements
  end if

  call co_broadcast(i_elements, 1)

  sync all  ! Make sure everyone knows the total elements we are to deal with.

  ! If total number of elements does not divide equally by our thread count, then add to the last threads quota.
  i_doing = i_elements / i_all
  if (i_me .eq. i_all) then
     i_doing = i_doing + mod(i_elements, i_all)
  end if

  ! Open our debug file for writing. Use a write statement to create the file name
  write(c_threadnm, "(i4.4)") i_me
  c_threadnm = trim(c_threadnm)
  write(c_filename, "(a5,a4,a4)") "debug",  c_threadnm, ".txt" 
  print *, "Our debug file will be: ", c_filename
  open(newunit=i_ourdebug, file=c_filename, status="replace")

  
  ! Ready list for first creation.
  allocate(p_listroot)
  p_listroot%p_next => null()
  p_listroot%p_prev => null()
  p_listnow => p_listroot
  
  ! Use each thread to create some of the total list elements.
  ! Add each new list item to the bottom of the current list.
  do i_loop1 = 1, i_doing
     call random_number(r_rand)                          ! Get random number,
     r_rand = r_rand * 1000000                           ! make it worthwhile,
     i_rand = int(r_rand)
     p_listnow%i_data = i_rand                           ! Put data into element.
     p_listnew => null()                                 ! Create new element.
     allocate(p_listnew)
     p_listnow%p_next => p_listnew
     p_listnew%p_prev => p_listnow
     p_listnew%p_next => null()
     p_listnow => p_listnew
  end do
  
  ! Burn last because we don't actually need it.
  p_listnow => p_listnow%p_prev
  p_listnow%p_next => null()
  deallocate(p_listnew)

  ! Because I couldn't think of how to do this in a multiple processor way, I've just decided to sort my list
  ! and then shuffle data up and down until 1<2<3<4<x. It's probably not the right way to do this, but
  ! it's what my brain came up with. The end will be very atomic and probably very slow!

  ! Edit: This might be the slowest way to do this! My single threaded version of this can easily beat it!
  
  ! Make each thread perform it's own import sort. I'm not worried at this time to check for already
  ! sorted list as that should be very improbable! When done we should have lists of lists.
  p_listnow => p_listroot
  allocate(p_tagroot)
  p_tagnow => p_tagroot
  do i_loop1 = 1, i_doing
     if (i_state .eq. 0) then          ! This should create the next link in the chain of lists
        allocate(p_tagnew)             ! and put the current list item into it.
        p_tagnow%p_next => p_tagnew
        p_tagnew%p_prev => p_tagnow
        p_tagnew%p_next => null()
        p_tagnow => p_tagnew
        p_tagnow%p_list => p_listnow
        p_tagnow%p_ilist => p_listnow
        p_listnew => p_listnow%p_next
        p_listnow%p_prev => null()     ! Remove it from the list it was in.
        p_listnow%p_next => null()     ! Hint next operation of the loop.
        if (.not. associated(p_listnew)) then
           exit
        end if
        if (p_listnew%i_data .lt. p_listnow%i_data) then
           i_state = 1 ! Next is smaller than first item.
        else
           i_state = 2 ! Next is larger than first item.
        end if
     else
        if (i_state .eq. 1) then          ! Insert the next item in the list to the front of this list.
           p_listtemp =>p_listnew%p_next  ! Save current next!
           p_listnow%p_prev => p_listnew
           p_listnew%p_next => p_listnow
           p_listnew%p_prev => null()
           p_tagnow%p_list => p_listnew   ! Hint next operation of the loop.
           p_listnow => p_listnew
           p_listnew => p_listtemp
           if (.not. associated(p_listnew)) then
              exit
           end if
           if (p_listnew%i_data .gt. p_listnow%i_data) then
              i_state = 0 ! Next item reverses current flow; create new list.
              p_listnow => p_listnew
           end if
        end if
        if (i_state .eq. 2) then          ! Insert the next item in the list to the last of this list.
           p_listtemp =>p_listnew%p_next  ! Save current next!
           p_listnow%p_next => p_listnew
           p_listnew%p_prev => p_listnow
           p_listnew%p_next => null()
           p_tagnow%p_ilist => p_listnew   ! Hint next operation of the loop
           p_listnow => p_listnew
           p_listnew => p_listtemp
           if (.not. associated(p_listnew)) then
              exit
           end if
           if (p_listnew%i_data .lt. p_listnow%i_data) then
              i_state = 0 ! Next item reverses current flow; create new list.
              p_listnow => p_listnew
           end if
        end if
     end if
  end do
  
  ! Lets see what is about to sort. Printing full list.
  p_tagnew => p_tagroot%p_next
  i_loop1 = 1
  do while(associated(p_tagnew))
     i_loop2 = 1
     p_tagnew%i_debug = i_loop1
     p_lefthand => p_tagnew%p_list
     do while(associated(p_lefthand))
        write(i_ourdebug, "(i3, i3, i10)") i_loop1, i_loop2, p_lefthand%i_data
        i_loop2 = i_loop2 + 1
        p_lefthand => p_lefthand%p_next
     end do
     i_loop1 = i_loop1 + 1
     p_tagnew => p_tagnew%p_next
  end do
  
  ! We should now have a list of lists for each image. We do not need to synchronize the images now, just start
  ! the merging and wait for the synchronization until all the images have only one list left. Then, just before
  ! the call to sync all, use the coarray to set my top and bottom numbers.
  ! Cute trick here; I create a circle of lists and take turns steping forward and deallocating
  ! each second list header. Then when we have only one list left it will be linked to itself!

  p_tagnow => p_tagroot%p_next  ! Start at the beginning and work forward.
  p_tagnew => p_tagroot%p_next
  deallocate(p_tagroot)         ! Don't need this pointer right now.
  do while(associated(p_tagnew%p_next))  ! Where's the end?
     p_tagnew => p_tagnew%p_next
  end do
  p_tagnew%p_next => p_tagnow  ! Found! Now create the ring.
  p_tagnow%p_prev => p_tagnew

  p_tagnew => p_tagnow%p_next  ! Set the pointers for the start of the sort.
  do while (.not. associated(p_tagnew, p_tagnow))  ! Start the sort!
     p_lefthand => p_tagnow%p_list  ! Left hand and
     p_righthand => p_tagnew%p_list ! Right hand get added one by one
     if (p_lefthand%i_data < p_righthand%i_data) then
        p_centerhand => p_lefthand  ! to the center hand.
        p_lefthand => p_lefthand%p_next
     else
        p_centerhand => p_righthand ! to the center hand.
        p_righthand => p_righthand%p_next
     end if
     p_tagnow%p_outlist => p_centerhand ! And set the center hand as our outlist.
     l_notdone = .true.
     do while(l_notdone)  ! None, one, or any left in each hand needs to be outlisted.
        if (associated(p_lefthand) .and. associated(p_righthand)) then
           if (p_lefthand%i_data < p_righthand%i_data) then
              p_centerhand%p_next => p_lefthand  ! Left hand adds to center hand.
              p_lefthand%p_prev => p_centerhand
              p_temphand => p_lefthand%p_next
              p_lefthand => p_temphand
           else
              p_centerhand%p_next => p_righthand ! Right hand adds to center hand.
              p_righthand%p_prev => p_centerhand
              p_temphand => p_righthand%p_next
              p_righthand => p_temphand
           end if
           p_centerhand => p_centerhand%p_next
           p_centerhand%p_next => null()
        else

           ! One hand or the other is done. Add the other to center and finish out.
           If (associated(p_lefthand)) then
              p_centerhand%p_next => p_lefthand ! Left hand is remaining.
              p_lefthand%p_prev => p_centerhand
           else
              p_centerhand%p_next => p_righthand ! Right hand is remaining.
              p_righthand%p_prev => p_centerhand
           end if
           l_notdone = .false.
           do while (associated(p_centerhand%p_next)) ! Either way get the inverse
              p_centerhand => p_centerhand%p_next     ! list ready.
           end do
           p_tagnow%p_ilist => p_centerhand
        end if
     end do
     p_tagnow%p_list => p_tagnow%p_outlist   ! Move out to in,
     p_tagnow%p_next => p_tagnew%p_next      ! move right hand out,
     deallocate(p_tagnew)                    ! remove the each second list,
     p_tagnow => p_tagnow%p_next             ! and finish on the next next.
     p_tagnew => p_tagnow%p_next
  end do
  p_tagroot => p_tagnow ! We can only be here when sorted, so make the sorted list our root.

  ! Print the entire sorted list to debug.
  write(i_ourdebug, *) "Our entire sorted list is;", i_me
  p_lefthand => p_tagroot%p_list
  do while(associated(p_lefthand))
     write(i_ourdebug, *) p_lefthand%i_data
     p_lefthand => p_lefthand%p_next
  end do


  ! Before we hit the sync all we should post our up and down numbers. This was the
  ! most rediculous set of issues in the whole build! Errors thrown for no reason
  ! and NO good instructions anywhere! Finally realized it was the lack of good error
  ! reporting or manuals and started asking the community for help... But after
  ! trial and error a _possible_ answer was found. I still don't trust this, but...
  ! It works...

  ! Using the root's forward and inverse lists allows finding the small and large easy!

  ! We should all be ready to swap items that do not make us happy.
  i_iterations = 0
  l_notdone = .true.
  do while (l_notdone)
     
     
     ! Post our new numbers to the local copy then sync.
     i_iterations = i_iterations + 1   ! Added to watch the number of iterations.
     p_lefthand => p_tagroot%p_list
     p_righthand => p_tagroot%p_ilist
     i_small = p_lefthand%i_data
     i_large = p_righthand%i_data
     allocate(t_localcopy)
     t_localcopy%i_small = i_small
     t_localcopy%i_large = i_large
     t_localcopy%i_iterations = i_iterations
     t_messages = t_localcopy
     deallocate(t_localcopy)
     

     ! Sync! Sync! It's bound to help keep everything in line!
     call execute_command_line('') ! And be sure to cause I/O to dump out!
     sync all

     ! Pushing didn't work, how about pulling?     
     ! Depending who we are, we should check our neighbours numbers.
     allocate(t_localcopy)
     if (i_me .gt. 1) then           ! All but first has a left!
        write(i_ourdebug, *) "Pulling left."
        t_localcopy = t_messages[i_left]
        do while(t_localcopy%i_iterations .ne. i_iterations)
           call sleep(1)
           t_localcopy = t_messages[i_left]
           write(i_ourdebug, *) "again."
        end do
        i_neighbourl = t_messages[i_left]%i_large
        if (i_small .ge. i_neighbourl) then
           i_unhappy = 0
        else
           ! Unhappy? Then delete from our list and add the remote data.
           i_unhappy = 2
           call SubNumberLeft(i_neighbourl, p_tagroot, i_ourdebug)
        end if
     end if
     deallocate(t_localcopy)

     allocate(t_localcopy)
     if (i_me .lt. i_all) then       ! All but last has a right!
        write(i_ourdebug, *) "Pulling right."
        t_localcopy = t_messages[i_right]
        do while(t_localcopy%i_iterations .ne. i_iterations)
           call sleep(1)
           t_localcopy = t_messages[i_right]
           write(i_ourdebug, *) "again."
        end do
        i_neighbours = t_messages[i_right]%i_small
        if (i_large .le. i_neighbours) then
           i_unhappy = 0
        else
           ! Do the same here, but from the right.
           i_unhappy = i_unhappy + 1
           call SubNumberRight(i_neighbours, p_tagroot, i_ourdebug)
        end if
     end if
     deallocate(t_localcopy)
     
     write(i_ourdebug, *) "Iterations; ", i_iterations, i_neighbours, i_neighbourl

     sync all ! Be aware of the race condition to these numbers!

     ! Debugging text...
     write(i_ourdebug, *) "Happinesses: ", i_unhappy
      
     ! Everything needs to be synced again, but before we do that we should output some bugcheck
     ! data and check for happiness. If the max unhappy was zero, then we must be done!
     call co_max(i_unhappy)
     
     if (i_unhappy .eq. 0) then
        l_notdone = .false.
     else
        l_notdone = .true.
     end if

     ! Post a new list to our debug either way.
     write(i_ourdebug, *) "Our new sorted list is;", i_me
     p_lefthand => p_tagroot%p_list
     p_righthand => p_lefthand%p_next
     do while(associated(p_lefthand))
        write(i_ourdebug, *) i_me, p_lefthand%i_data
        p_lefthand => p_lefthand%p_next
     end do

     ! Sync! Sync! It's bound to help keep everything in line!
     call execute_command_line('') ! And be sure to cause I/O to dump out!
     sync all
         
  end do

  ! Close our debug and open our final answer!
  close(i_ourdebug)
  write(c_filename, "(a6,a4,a4)") "output", c_threadnm, ".txt"
  print *, "Our output file will be: ", c_filename
  open(newunit=i_ourout, file=c_filename, status="replace")


  ! Ok! All done! Chances are there are a LOT of numbers to display here so if we just left the threads
  ! to throw their numbers down, this would look like number salad. An orderly print of all this will
  ! take time. I could make each thread wait their turn and then print... Sounds like a good idea!

  ! Edit: That was silly! Lets print all this to a set of files.

  write(i_ourout, "(a, i8, a)") "Reporting from image ", i_me, ":"
  write(i_ourout, *) "Final report after ", i_iterations, " iterations!"

  p_lefthand => p_tagroot%p_list
  i_loop2 = 1
  do while (associated(p_lefthand))
     write(i_ourdebug, *) i_loop2, p_lefthand%i_data
     i_loop2 = i_loop2 + 1
     p_lefthand => p_lefthand%p_next
  end do

  ! Close our last file
  close(i_ourout)

  ! That's all folkes! Have a great life!


  ! Now on to the subroutines:

  ! I tried very hard not to separate any work out of the main program, but this just kept causing me
  ! issues so I have separated it out to make it more understandable. What these two almost identical subroutines
  ! do is take something from either the left or the right and add it to our current sorted list in replacement
  ! for what ever we popped off either end.

contains

  subroutine SubNumberLeft(i_newdata, p_tagroot, i_ourdebug)
    implicit none

    ! Declare incoming variables and scope.
    integer, intent(in) :: i_newdata, i_ourdebug
    type(t_tagelement), pointer, intent(inout) :: p_tagroot

    ! Declare local variables
    type(t_sortelement), pointer :: p_list, p_ilist, p_listnew, p_listtemp
    integer :: i_olddata
    logical :: l_working, l_unsure

    ! Initialize local variables
    l_working = .true.

    ! We need to sub the item to the left so remove the current left from the list.
    p_list => p_tagroot%p_list
    p_listtemp => p_list%p_next
    write(i_ourdebug, *) i_me, " is deleting: ", p_list%i_data, " for ", i_newdata
    deallocate(p_list)
    p_tagroot%p_list => p_listtemp
    p_listtemp%p_prev => null()

    ! Now create a new element and try to put it in place in the list.
    ! Try the easy answer first and work backwards using the double linked list.
    allocate(p_listnew)
    p_listnew%i_data = i_newdata
    p_ilist => p_tagroot%p_ilist
    i_olddata = p_ilist%i_data

    ! Is this the new right hand number? Check and link in if so.
    if (i_olddata .lt. i_newdata) then
       p_ilist%p_next => p_listnew
       p_listnew%p_prev => p_ilist
       p_listnew%p_next => null()
       p_tagroot%p_ilist => p_listnew
    else

       ! If not the right most number, then a number before that. Start moving up the list to see
       ! where this goes.
       do while (l_working)
          p_ilist => p_ilist%p_prev
          if (associated(p_ilist)) then      ! p_prev will be null when we reach the first element.
             i_olddata = p_ilist%i_data      ! Otherwise get data for testing.
             if (i_olddata .lt. i_newdata) then    ! If data fits, link into the list.
                p_listtemp => p_ilist%p_next
                p_ilist%p_next => p_listnew
                p_listtemp%p_prev => p_listnew
                p_listnew%p_next => p_listtemp
                p_listnew%p_prev => p_ilist
                l_working = .false.
             end if
          else

             ! We have reached the start of the list, so link it as the new first element.
             p_listtemp => p_tagroot%p_list
             p_tagroot%p_list => p_listnew
             p_listnew%p_prev => null()
             p_listnew%p_next => p_listtemp
             p_listtemp%p_prev => p_listnew
             l_working = .false.
          end if
       end do
    end if
  end subroutine SubNumberLeft
  
    
  subroutine SubNumberRight(i_newdata, p_tagroot, i_ourdebug)
    implicit none

    ! Declare incoming variables and scope.
    integer, intent(in) :: i_newdata, i_ourdebug
    type(t_tagelement), pointer, intent(inout) :: p_tagroot

    ! Declare local variables
    type(t_sortelement), pointer :: p_list, p_ilist, p_listnew, p_listtemp
    integer :: i_olddata
    logical :: l_working
     
    ! Initialize local variables
    l_working = .true.

    ! We need to sub the number to the right so remove the current right from the list.
    p_ilist => p_tagroot%p_ilist
    p_listtemp => p_ilist%p_prev
    write(i_ourdebug, *) i_me, " is deleting: ", p_ilist%i_data, " for ", i_newdata
    deallocate(p_ilist)
    p_tagroot%p_ilist => p_listtemp
    p_listtemp%p_next => null()

    ! Now create a new element and try to put it in place in the list.
    ! Try the easy answer first and work forwards using the double linked list.
    allocate(p_listnew)
    p_listnew%i_data = i_newdata
    p_list => p_tagroot%p_list
    i_olddata = p_list%i_data

    ! Is this the new left hand number? Check and link in if so.
    if (i_olddata .gt. i_newdata) then
       p_list%p_prev => p_listnew
       p_listnew%p_next => p_list
       p_listnew%p_prev => null()
       p_tagroot%p_list => p_listnew
    else

       ! If not the left most number, then a number after that. Start moving down the list to see
       ! where this goes.
       do while (l_working)
          p_list => p_list%p_next
          if (associated(p_list)) then      ! p_next will be null when we reach the last element.
             i_olddata = p_list%i_data      ! Otherwise get data for testing.
             if (i_olddata .gt. i_newdata) then    ! If data fits, link into the list.
                p_listtemp => p_list%p_prev
                p_list%p_prev => p_listnew
                p_listtemp%p_next => p_listnew
                p_listnew%p_prev => p_listtemp
                p_listnew%p_next => p_list
                l_working = .false.
             end if
          else

             ! We have reached the end of the list, so link it as the new last element.
             p_listtemp => p_tagroot%p_ilist
             p_tagroot%p_ilist => p_listnew
             p_listnew%p_next => null()
             p_listnew%p_prev => p_listtemp
             p_listtemp%p_next => p_listnew
             l_working = .false.
          end if
       end do
    end if
  end subroutine SubNumberRight
  
end program

Link to download the file: ParallelSort.f90 - Google Drive

Knarfnarf

2 Likes

Thanks for sharing. I tried to run this code with ifort, but I failed, probably because I did not know how to run it. Several non-standard statements in the code make it non-portable (luckily, ifort recognized the syntax albeit with many warnings). Would it be possible to convert this program to a standalone module so that it can be tested and compared with other routines?

Yeah, sorry. You’ll need CAF to run this. Installing CAF is a story arc from a ways back but suffice to say;

Download coarrays for Fortran from the repository.
Run install once and write down all the dependence’s then quit.
Install the dependency items yourself.
Then try your luck with CAF.

If it works you can run as many parallel tasks as you want with “cafrun -n ### binaryfile” with extra threads being time slices of your actual cores. So that means this code could run on 1000 cores. Could.

Anyone got 1000 cores to try this on? Asking for a friend.

Knarfnarf

The issue does not appear to be CAF. ifort recognizes CAF with /Qcoarray or -coarray flag. The non-portable lines which ifort does not like are exit() sleep(), the formats of the print statements, and similar.
I’d be happy to test it on the supercomputers for which I have allocations, but the program must first become standalone and executable for a successful test on my laptop. Right now, it asks a question from the user. I did not understand it fully, I gave it a number, and it subsequently crashed with the following error,

forrtl: severe (124): Invalid command supplied to EXECUTE_COMMAND_LINE

This happens because,

call execute_command_line('') ! And be sure to cause I/O to dump out!

is a non-portable way of flushing the stdout. One could fix it by replacing the empty string with a whitespace character. But I am not sure if this fixes the problem for all platforms (other than Windows).

Even after fixing this error, the program crashes for other reasons,

forrtl:forrtl: severe (32):forrtl: severe (32):  severe (32): invalid logical unit number, unit -129, file unknown

at which point I gave up on debugging the program.

Actually the shell command statements can be removed. They did and do nothing but help to keep text to the screen in order and I don’t dump to screen any more. The sleep call is a stop. There must be a sleep command you can call in your implementation. Without being about to sleep the thread, we never seem to get the sync to work. It’s what saved the project on my system. You can try it without, but…. And there is an exit command or thread exit in your implementation, there just has to be.

That should be easy for me to fix if you want!

Knarfnarf.

edit for forum reasons;

Ummm… Call sleep (1) is supported by ifort…

Ahh… It’s the space! call sleep(1) no, call sleep (1) yes! AND it still works on my machine. Also the exit was changed to stop.

The new version should now be at the same link;

Have fun!

Knarfnarf

Grrr…. Edit for forum reasons;

Forgot to remove the system statements. Will do that on break today.

Knarfnarf

Oh… Is this that intel stuff…

I had a lot of trouble with that and gave up. The compiler would not install.

I can put another copy of the file into the google drive with replacement lines in it, but I’m less enthusiastic about my chances of getting this running for you.

Let me see what I can do!

Knarfnarf