Sorting Revisited #3: Linked List Merge Sort

  • Thread starter Thread starter rcgldr
  • Start date Start date
  • Tags Tags
    Sorting
Click For Summary
SUMMARY

The discussion focuses on implementing a bottom-up merge sort algorithm specifically designed for linked lists, utilizing an array of pointers to optimize the merging process. The algorithm employs a structure defined as NODE and incorporates a function MergeLists to facilitate the merging of two lists. Additionally, a C++ implementation demonstrates a four-way merge sort that outperforms traditional two-way merges by approximately 15% on systems with 16 registers. The conversation also touches on polyphase merge sort techniques, particularly relevant for optimizing sorting operations across multiple stacks.

PREREQUISITES
  • Understanding of linked list data structures
  • Familiarity with C/C++ programming language
  • Knowledge of sorting algorithms, particularly merge sort
  • Experience with pointer manipulation in programming
NEXT STEPS
  • Study the implementation of MergeLists in detail to understand merging logic
  • Explore the performance differences between two-way and four-way merge sorts
  • Investigate polyphase merge sort techniques and their applications
  • Review the use of arrays of pointers for optimizing linked list operations
USEFUL FOR

Software developers, algorithm enthusiasts, and computer science students interested in advanced sorting techniques and linked list optimizations.

rcgldr
Homework Helper
Messages
8,948
Reaction score
687
Links to prior threads
https://www.physicsforums.com/threads/sorting-revisited-again.728448
https://www.physicsforums.com/threads/sorting-revisited.218369
https://www.physicsforums.com/threads/efficient-sorting.181589
https://www.physicsforums.com/threads/merge-sort-using-linked-indexes.786487

Bottom up merge sort for linked list, using a small (26 to 32 ) array of pointers to nodes, where array[ i ] points to a list of size 2 to the power i (or null).

C:
typedef struct NODE_{
struct NODE_ * next;
int data;
}NODE;

/* prototype */
NODE * MergeLists(NODE *pSrc1, NODE *pSrc2);

/* sort list using array of pointers to first nodes of list   */
/* aList[i] = NULL or ptr to list with 2 to the power i nodes */

#define NUMLISTS 32                 /* size of array */
NODE * SortList(NODE *pList)
{
NODE * aList[NUMLISTS];             /* array of lists */
NODE * pNode;
NODE * pNext;
int i;
    if(pList == NULL)               /* check for empty list */
        return NULL;
    for(i = 0; i < NUMLISTS; i++)   /* zero array */
        aList[i] = NULL;
    pNode = pList;                  /* merge nodes into array */
    while(pNode != NULL){
        pNext = pNode->next;
        pNode->next = NULL;
        for(i = 0; (i < NUMLISTS) && (aList[i] != NULL); i++){
            pNode = MergeLists(aList[i], pNode);
            aList[i] = NULL;
        }
        if(i == NUMLISTS)           /* don't go past end of array */
            i--;
        aList[i] = pNode;
        pNode = pNext;
    }
    pNode = NULL;                   /* merge array into one list */
    for(i = 0; i < NUMLISTS; i++)
        pNode = MergeLists(aList[i], pNode);
    return pNode;
}

/* mergelists -  compare uses src2 < src1           */
/* instead of src1 <= src2 to be similar to C++ STL */

NODE * MergeLists(NODE *pSrc1, NODE *pSrc2)
{
NODE *pDst = NULL;                  /* destination head ptr */
NODE **ppDst = &pDst;               /* ptr to head or prev->next */
    if(pSrc1 == NULL)
        return pSrc2;
    if(pSrc2 == NULL)
        return pSrc1;
    while(1){
        if(pSrc2->data < pSrc1->data){  /* if src2 < src1 */
            *ppDst = pSrc2;
            pSrc2 = *(ppDst = &(pSrc2->next));
            if(pSrc2 == NULL){
                *ppDst = pSrc1;
                break;
            }
        } else {                        /* src1 <= src2 */
            *ppDst = pSrc1;
            pSrc1 = *(ppDst = &(pSrc1->next));
            if(pSrc1 == NULL){
                *ppDst = pSrc2;
                break;
            }
        }
    }
    return pDst;
}
 
Last edited by a moderator:
Technology news on Phys.org
C++ example of a 4 way bottom up merge sort using pointers. On a system with 16 registers (like X86 in 64 bit mode), this is about 15% faster than 2 way bottom up merge sort, and about as fast as quicksort.

C:
int * BottomUpMergeSort(int a[], int b[], size_t n)
{
int *p0r;       // ptr to      run 0
int *p0e;       // ptr to end  run 0
int *p1r;       // ptr to      run 1
int *p1e;       // ptr to end  run 1
int *p2r;       // ptr to      run 2
int *p2e;       // ptr to end  run 2
int *p3r;       // ptr to      run 3
int *p3e;       // ptr to end  run 3
int *pax;       // ptr to set of runs in a
int *pbx;       // ptr for merged output to b
size_t rsz = 1; // run size
    if(n < 2)
        return a;
    if(n == 2){
        if(a[0] > a[1])std::swap(a[0],a[1]);
        return a;
    }
    if(n == 3){
        if(a[0] > a[2])std::swap(a[0],a[2]);
        if(a[0] > a[1])std::swap(a[0],a[1]);
        if(a[1] > a[2])std::swap(a[1],a[2]);
        return a;
    }
    while(rsz < n){
        pbx = &b[0];
        pax = &a[0];
        while(pax < &a[n]){
            p0e = rsz + (p0r = pax);
            if(p0e >= &a[n]){
                p0e = &a[n];
                goto cpy10;}
            p1e = rsz + (p1r = p0e);
            if(p1e >= &a[n]){
                p1e = &a[n];
                goto mrg201;}
            p2e = rsz + (p2r = p1e);
            if(p2e >= &a[n]){
                p2e = &a[n];
                goto mrg3012;}
            p3e = rsz + (p3r = p2e);
            if(p3e >= &a[n])
                p3e = &a[n];
            // 4 way merge
            while(1){
                if(*p0r <= *p1r){
                    if(*p2r <= *p3r){
                        if(*p0r <= *p2r){
mrg40:                      *pbx++ = *p0r++;    // run 0 smallest
                            if(p0r < p0e)       // if not end run continue
                                continue;
                            goto mrg3123;       // merge 1,2,3
                        } else {
mrg42:                      *pbx++ = *p2r++;    // run 2 smallest
                            if(p2r < p2e)       // if not end run continue
                                continue;
                            goto mrg3013;       // merge 0,1,3
                        }
                    } else {
                        if(*p0r <= *p3r){
                            goto mrg40;         // run 0 smallext
                        } else {
mrg43:                      *pbx++ = *p3r++;    // run 3 smallest
                            if(p3r < p3e)       // if not end run continue
                                continue;
                            goto mrg3012;       // merge 0,1,2
                        }
                    }
                } else {
                    if(*p2r <= *p3r){
                        if(*p1r <= *p2r){
mrg41:                      *pbx++ = *p1r++;    // run 1 smallest
                            if(p1r < p1e)       // if not end run continue
                                continue;
                            goto mrg3023;       // merge 0,2,3
                        } else {
                            goto mrg42;         // run 2 smallest
                        }
                    } else {
                        if(*p1r <= *p3r){
                            goto mrg41;         // run 1 smallest
                        } else {
                            goto mrg43;         // run 3 smallest
                        }
                    }
                }
            }
            // 3 way merge
mrg3123:    p0r = p1r;
            p0e = p1e;
mrg3023:    p1r = p2r;
            p1e = p2e;
mrg3013:    p2r = p3r;
            p2e = p3e;
mrg3012:    while(1){
                if(*p0r <= *p1r){
                    if(*p0r <= *p2r){
                        *pbx++ = *p0r++;        // run 0 smallest
                        if(p0r < p0e)           // if not end run continue
                            continue;
                        goto mrg212;            // merge 1,2
                    } else {
mrg32:                  *pbx++ = *p2r++;        // run 2 smallest
                        if(p2r < p2e)           // if not end run continue
                            continue;
                        goto mrg201;            // merge 0,1
                    }
                } else {
                    if(*p1r <= *p2r){
                        *pbx++ = *p1r++;        // run 1 smallest
                        if(p1r < p1e)           // if not end run continue
                            continue;
                        goto mrg202;            // merge 0,2
                    } else {
                        goto mrg32;             // run 2 smallest
                    }
                }
            }
mrg212:     p0r = p1r;
            p0e = p1e;
mrg202:     p1r = p2r;
            p1e = p2e;
            // 2 way merge
mrg201:     while(1){
                if(*p0r <= *p1r){
                    *pbx++ = *p0r++;            // run 0 smallest
                    if(p0r < p0e)               // if not end run continue
                        continue;
                    goto cpy11;
                } else {
                    *pbx++ = *p1r++;            // run 1 smallest
                    if(p1r < p1e)               // if not end run continue
                        continue;
                    goto cpy10;
                }
            }
cpy11:      p0r = p1r;
            p0e = p1e;
            // 1 way copy
cpy10:      while (1) {
                *pbx++ = *p0r++;                // copy element
                if (p0r < p0e)                  // if not end of run continue
                    continue;
                break;
            }
            pax += rsz << 2;            // setup for next set of runs
        }
        std::swap(a, b);                // swap ptrs
        rsz <<= 2;                      // quadruple run size
    }
    return a;                           // return sorted array
}
 
Fastest 3 stack sort is polyphase merge sort. Polyphase merge sort was optimal for 3 to 7 tape drives (the stack version is similar to using tape drives with a read reverse option), but these days, it's mostly a solution to a programming puzzle (3 stack sort). The tricky bits are determining the initial ascending versus descending merge (it alternates after that), and keeping track of when run sizes change if the number of elements isn't an exact Fibonacci number.

Wiki article:

http://en.wikipedia.org/wiki/Polyphase_merge_sort

Example Java code using custom stack class that includes a swap member:

C:
class xstack{
    int []ar;                               // array
    int sz;                                 // size
    int sp;                                 // stack pointer
    public xstack(int sz){                  // constructor with size
        this.ar = new int[sz];
        this.sz = sz;
        this.sp = sz;
    }
    public void push(int d){
        this.ar[--sp] = d;
    }
    public int pop(){
        return this.ar[sp++];
    }
    public int peek(){
        return this.ar[sp];
    }
    public boolean empty(){
        return sp == sz;
    }
    public int size(){
        return sz-sp;
    }
    public void swap(xstack othr){
        int []tempar = othr.ar;
        int tempsz = othr.sz;
        int tempsp = othr.sp;
        othr.ar = this.ar;
        othr.sz = this.sz;
        othr.sp = this.sp;
        this.ar = tempar;
        this.sz = tempsz;
        this.sp = tempsp;
    }
}

public class ppmrg3s {
    static final int[] FIBTBL =
   {         0,         1,         1,         2,         3,         5,
             8,        13,        21,        34,        55,        89,
           144,       233,       377,       610,       987,      1597,
          2584,      4181,      6765,     10946,     17711,     28657,
         46368,     75025,    121393,    196418,    317811,    514229,
        832040,   1346269,   2178309,   3524578,   5702887,   9227465,
      14930352,  24157817,  39088169,  63245986, 102334155, 165580141,
     267914296, 433494437, 701408733,1134903170,1836311903};

    // return index of largest fib() <= n
    static int flfib(int n)
    {
    int lo = 0;
    int hi = 47;
        while((hi - lo) > 1){
            int i = (lo + hi)/2;
            if(n < FIBTBL[i]){
                hi = i;
                continue;
            }
            if(n > FIBTBL[i]){
                lo = i;
                continue;
            }
            return i;
        }
        return lo;
    }

    // poly phase merge sort using 3 stacks
    static void ppmrg3s(xstack a, xstack b, xstack c)
    {
        if(a.size() < 2)
            return;
        int ars = 1;                        // init run sizes
        int brs = 1;
        int asc = 0;                        // no size change
        int bsc = 0;
        int csc = 0;
        int scv = 0-1;                      // size change value
        boolean dsf;                        // == 1 if descending sequence
        {                                   // block for local variable scope
            int f = flfib(a.size());        // FIBTBL[f+1] > size >= FIBTBL[f]
            dsf = ((f%3) == 0);             // init compare flag
            if(FIBTBL[f] == a.size()){      // if exact fibonacci size,
                for (int i = 0; i < FIBTBL[f - 1]; i++) { //  move to b
                    b.push(a.pop());
                }
            } else {                        // else move to b, c
                // update compare flag
                dsf ^= 1 == ((a.size() - FIBTBL[f]) & 1);
                // i = excess run count
                int i = a.size() - FIBTBL[f];
                // j = dummy run count
                int j = FIBTBL[f + 1] - a.size();
                // move excess elements to b
                do{
                    b.push(a.pop());
                }while(0 != --i);
                // move dummy count elements to c
                do{
                    c.push(a.pop());
                }while(0 != --j);
                csc = c.size();
            }
        }                                   // end block
        while(true){                        // setup to merge pair of runs
            if(asc == a.size()){            // check for size count change
                ars += scv;                 //   (due to dummy run size == 0)
                scv = 0-scv;
                asc = 0;
                csc = c.size();
            }
            if(bsc == b.size()){
                brs += scv;
                scv = 0-scv;
                bsc = 0;
                csc = c.size();
            }
            int arc = ars;                  // init run counters
            int brc = brs;
            while(true){                    // merging pair of runs
                if(dsf ^ (a.peek() <= b.peek())){
                    c.push(a.pop());        // move a to c
                    if(--arc != 0)          // if not end a
                        continue;           //   continue back to compare
                    do{                     // else move rest of b run to c
                        c.push(b.pop());
                    }while(0 != --brc);
                    break;                  //   and break
                } else {
                    c.push(b.pop());        // move b to c
                    if(0 != --brc)          // if not end b
                        continue;           //   continue back to compare
                    do{                     // else move rest of a run to c
                        c.push(a.pop());
                    }while(0 != --arc);
                    break;                  //   and break
                }
            }                               // end merge pair of runs
            dsf ^= true;                    // toggle compare flag
            if(b.empty()){                  // if end b
                if(a.empty())               //   if end a, done
                    break;
                b.swap(c);                  //   swap b, c
                brs += ars;
                if (0 == asc)
                    bsc = csc;
            } else {                        // else not end b
                if(!a.empty())              //   if not end a
                    continue;               //     continue back to setup next merge
                a.swap(c);                  //   swap a, c
                ars += brs;
                if (0 == bsc)
                    asc = csc;
            }
        }
        a.swap(c);                          // return sorted stack in a
    }
}

Example C++ code using arrays with pointers as stacks:

Code:
typedef unsigned int uint32_t;

static size_t fibtbl[48] =
    {        0,         1,         1,         2,         3,         5,
             8,        13,        21,        34,        55,        89,
           144,       233,       377,       610,       987,      1597,
          2584,      4181,      6765,     10946,     17711,     28657,
         46368,     75025,    121393,    196418,    317811,    514229,
        832040,   1346269,   2178309,   3524578,   5702887,   9227465,
      14930352,  24157817,  39088169,  63245986, 102334155, 165580141,
     267914296, 433494437, 701408733,1134903170,1836311903,2971215073};

// binary search: return index of largest fib() <= n
size_t flfib(size_t n)
{
size_t lo = 0;
size_t hi = sizeof(fibtbl)/sizeof(fibtbl[0]);
    while((hi - lo) > 1){
        size_t i = (lo + hi)/2;
        if(n < fibtbl[i]){
            hi = i;
            continue;
        }
        if(n > fibtbl[i]){
            lo = i;
            continue;
        }
        return i;
    }
    return lo;
}

// poly phase merge sort array using 3 arrays as stacks, a is source
uint32_t * ppmrg3(uint32_t a[], uint32_t b[], uint32_t c[], size_t n)
{
    if(n < 2)
        return a+n;
    uint32_t *asp = a;                  // a stack ptr
    uint32_t *aer;                      // a end run
    uint32_t *aes = a + n;              // a end
    uint32_t *bsp = b + n;              // b stack ptr
    uint32_t *ber;                      // b end run
    uint32_t *bes = b + n;              // b end
    uint32_t *csp = c + n;              // c stack ptr
    uint32_t *ces = c + n;              // c end
    uint32_t *rsp = 0;                  // run size change stack ptr
    size_t ars = 1;                     // run sizes
    size_t brs = 1;
    size_t scv = 0-1;                   // size change increment / decrement
    bool dsf;                           // == 1 if descending sequence
    {                                   // block for local variable scope
        size_t f = flfib(n);            // fibtbl[f+1] > n >= fibtbl[f]
        dsf = ((f%3) == 0);             // init compare flag
        if(fibtbl[f] == n){             // if exact fibonacci size, move to b
            for(size_t i = 0; i < fibtbl[f-1]; i++)
                *(--bsp) = *(asp++);
        } else {                        // else move to b, c
            // update compare flag
            dsf ^= (n - fibtbl[f]) & 1;
            // move excess elements to b
            aer = a + n - fibtbl[f];
            while (asp < aer)
                *(--bsp) = *(asp++);
            // move dummy count elements to c
            aer = a + fibtbl[f - 1];
            while (asp < aer)
                *(--csp) = *(asp++);
            rsp = csp;                  // set run size change ptr
        }
    }                                   // end local variable block
    while(1){                           // setup to merge pair of runs
        if(asp == rsp){                 // check for size count change
            ars += scv;                 //   (due to dummy run size == 0)
            scv = 0-scv;
            rsp = csp;
        }
        if(bsp == rsp){
            brs += scv;
            scv = 0-scv;
            rsp = csp;
        }
        aer = asp + ars;                // init end run ptrs
        ber = bsp + brs;
        while(1){                       // merging pair of runs
            if(dsf ^ (*asp <= *bsp)){   // if a <= b
                *(--csp) = *(asp++);    //   move a to c
                if(asp != aer)          //   if not end a run
                    continue;           //     continue back to compare
                do                      //   else move rest of b run to c
                    *(--csp) = *(bsp++);
                while (bsp < ber);
                break;                  //     and break
            } else {                    // else a > b
                *(--csp) = *(bsp++);    //   move b to c
                if(bsp != ber)          //   if not end b run
                    continue;           //     continue back to compare
                do                      //   else move rest of a run to c
                    *(--csp) = *(asp++);
                while (asp < aer);
                break;                  //     and break
            }
        }                               // end merging pair of runs
        dsf ^= 1;                       // toggle compare flag
        if(bsp == bes){                 // if b empty
            if(asp == aes)              //   if a empty, done
                break;
            std::swap(bsp, csp);        //   swap b, c
            std::swap(bes, ces);
            brs += ars;                 //   update run size
        } else {                        // else b not empty
            if(asp != aes)              //   if a not empty
                continue;               //     continue back to setup next merge
            std::swap(asp, csp);        //   swap a, c
            std::swap(aes, ces);
            ars += brs;                 //   update run size
        }
    }
    return csp;                          // return ptr to sorted stack
}
 

Similar threads

  • · Replies 1 ·
Replies
1
Views
2K
  • · Replies 8 ·
Replies
8
Views
2K
  • · Replies 20 ·
Replies
20
Views
5K
  • · Replies 8 ·
Replies
8
Views
2K
  • · Replies 7 ·
Replies
7
Views
3K
  • · Replies 5 ·
Replies
5
Views
4K
  • · Replies 3 ·
Replies
3
Views
2K
  • · Replies 1 ·
Replies
1
Views
2K
  • · Replies 2 ·
Replies
2
Views
2K
  • · Replies 1 ·
Replies
1
Views
11K