# Sorting revisited #3

1. Aug 25, 2016

### rcgldr

Bottom-up merge sort for a linked list, using a small array (26 to 32 entries) of pointers to nodes, where array[i] is either null or points to a list of 2^i nodes.

Code (C):

/* singly linked list node carrying one int key */
typedef struct NODE_{
    struct NODE_ * next;
    int data;
}NODE;

/* forward declaration: SortList is defined ahead of MergeLists */
NODE * MergeLists(NODE *pSrc1, NODE *pSrc2);

/* SortList - bottom up merge sort of a singly linked list.        */
/* aList[i] is either NULL or the head of a sorted list holding    */
/* 2 to the power i nodes; the last slot absorbs anything larger.  */
/* Returns the head of the sorted list (NULL for an empty list).   */

#define NUMLISTS 32                 /* number of slots in aList */
NODE * SortList(NODE *pList)
{
    NODE * aList[NUMLISTS];         /* one pending sorted list per slot */
    NODE * pRun;                    /* list being carried up the slots  */
    NODE * pRest;                   /* input nodes not yet consumed     */
    NODE * pSorted = NULL;          /* final merged result              */
    int slot;

    if(pList == NULL)               /* nothing to sort */
        return NULL;

    for(slot = 0; slot < NUMLISTS; slot++)
        aList[slot] = NULL;

    /* feed the nodes in one at a time, like adding 1 to a binary counter */
    for(pRest = pList; pRest != NULL; ){
        pRun = pRest;               /* detach the next node as a 1 node list */
        pRest = pRest->next;
        pRun->next = NULL;

        slot = 0;                   /* carry through every occupied slot */
        while((slot < NUMLISTS) && (aList[slot] != NULL)){
            pRun = MergeLists(aList[slot], pRun);
            aList[slot] = NULL;
            slot++;
        }
        if(slot == NUMLISTS)        /* ran off the end: reuse the last slot */
            slot = NUMLISTS - 1;
        aList[slot] = pRun;
    }

    /* fold all slots together, smallest lists first */
    for(slot = 0; slot < NUMLISTS; slot++)
        pSorted = MergeLists(aList[slot], pSorted);
    return pSorted;
}

/* MergeLists - merge two sorted lists into one sorted list.       */
/* A node is taken from pSrc2 only when it is strictly less than   */
/* the head of pSrc1 (src2 < src1, the C++ STL convention), so on  */
/* equal keys the pSrc1 node comes first. Either list may be NULL. */

NODE * MergeLists(NODE *pSrc1, NODE *pSrc2)
{
    NODE *pHead = NULL;             /* head of the merged list */
    NODE **ppLink = &pHead;         /* where the next node gets attached */

    while((pSrc1 != NULL) && (pSrc2 != NULL)){
        if(pSrc2->data < pSrc1->data){
            *ppLink = pSrc2;        /* src2 < src1: take from src2 */
            ppLink = &(pSrc2->next);
            pSrc2 = *ppLink;
        } else {
            *ppLink = pSrc1;        /* src1 <= src2: take from src1 */
            ppLink = &(pSrc1->next);
            pSrc1 = *ppLink;
        }
    }
    /* one side is exhausted (or was NULL to begin with): append the other */
    *ppLink = (pSrc1 != NULL) ? pSrc1 : pSrc2;
    return pHead;
}

Last edited by a moderator: Aug 25, 2016
2. Aug 25, 2016

### rcgldr

C++ example of a 4 way bottom up merge sort using pointers. On a system with 16 registers (like X86 in 64 bit mode), this is about 15% faster than 2 way bottom up merge sort, and about as fast as quicksort.

Code (C++):

// 4 way bottom up merge sort.
//
// Sorts the n ints in a[] into ascending order, using b[] (which must also
// have room for n ints) as the second buffer. Each pass merges groups of up
// to four adjacent runs from one buffer into the other, then the buffers
// trade roles and the run size is quadrupled.
//
// Returns a pointer to whichever buffer holds the sorted data when the last
// pass finishes: this is a or b depending on the number of passes, so the
// caller must use the return value. For n < 4 the data is sorted in place
// and a is returned. On equal keys the element from the lower numbered run
// is emitted first.
//
// Run boundaries are derived from a count of remaining elements, so no
// pointer is ever formed beyond one past the end of the buffer.
int * BottomUpMergeSort(int a[], int b[], size_t n)
{
    int *p0r;       // ptr to      run 0
    int *p0e;       // ptr to end  run 0
    int *p1r;       // ptr to      run 1
    int *p1e;       // ptr to end  run 1
    int *p2r;       // ptr to      run 2
    int *p2e;       // ptr to end  run 2
    int *p3r;       // ptr to      run 3
    int *p3e;       // ptr to end  run 3
    int *pax;       // ptr to set of runs in a
    int *pbx;       // ptr for merged output to b
    size_t rsz = 1; // run size
    size_t rem;     // elements of a not yet assigned to a run in this pass
    if(n < 2)
        return a;
    if(n == 2){
        if(a[0] > a[1])std::swap(a[0],a[1]);
        return a;
    }
    if(n == 3){
        if(a[0] > a[2])std::swap(a[0],a[2]);
        if(a[0] > a[1])std::swap(a[0],a[1]);
        if(a[1] > a[2])std::swap(a[1],a[2]);
        return a;
    }
    while(rsz < n){
        pbx = &b[0];
        pax = &a[0];
        rem = n;
        while(rem != 0){
            // carve off up to four runs; a short (or missing) trailing run
            // drops straight into the matching narrower merge
            p0r = pax;
            if(rem <= rsz){
                p0e = p0r + rem;
                rem = 0;
                goto cpy10;
            }
            p0e = p0r + rsz;
            rem -= rsz;
            p1r = p0e;
            if(rem <= rsz){
                p1e = p1r + rem;
                rem = 0;
                goto mrg201;
            }
            p1e = p1r + rsz;
            rem -= rsz;
            p2r = p1e;
            if(rem <= rsz){
                p2e = p2r + rem;
                rem = 0;
                goto mrg3012;
            }
            p2e = p2r + rsz;
            rem -= rsz;
            p3r = p2e;
            if(rem <= rsz){
                p3e = p3r + rem;
                rem = 0;
            } else {
                p3e = p3r + rsz;
                rem -= rsz;
            }
            pax = p3e;                      // next set of runs starts here
            // 4 way merge
            while(1){
                if(*p0r <= *p1r){
                    if(*p2r <= *p3r){
                        if(*p0r <= *p2r){
mrg40:                      *pbx++ = *p0r++;    // run 0 smallest
                            if(p0r < p0e)       // if not end run continue
                                continue;
                            goto mrg3123;       // merge 1,2,3
                        } else {
mrg42:                      *pbx++ = *p2r++;    // run 2 smallest
                            if(p2r < p2e)       // if not end run continue
                                continue;
                            goto mrg3013;       // merge 0,1,3
                        }
                    } else {
                        if(*p0r <= *p3r){
                            goto mrg40;         // run 0 smallest
                        } else {
mrg43:                      *pbx++ = *p3r++;    // run 3 smallest
                            if(p3r < p3e)       // if not end run continue
                                continue;
                            goto mrg3012;       // merge 0,1,2
                        }
                    }
                } else {
                    if(*p2r <= *p3r){
                        if(*p1r <= *p2r){
mrg41:                      *pbx++ = *p1r++;    // run 1 smallest
                            if(p1r < p1e)       // if not end run continue
                                continue;
                            goto mrg3023;       // merge 0,2,3
                        } else {
                            goto mrg42;         // run 2 smallest
                        }
                    } else {
                        if(*p1r <= *p3r){
                            goto mrg41;         // run 1 smallest
                        } else {
                            goto mrg43;         // run 3 smallest
                        }
                    }
                }
            }
            // 3 way merge
            // the labels below fall through on purpose: each one shifts the
            // surviving runs down so they end up numbered 0,1,2
mrg3123:    p0r = p1r;
            p0e = p1e;
mrg3023:    p1r = p2r;
            p1e = p2e;
mrg3013:    p2r = p3r;
            p2e = p3e;
mrg3012:    while(1){
                if(*p0r <= *p1r){
                    if(*p0r <= *p2r){
                        *pbx++ = *p0r++;        // run 0 smallest
                        if(p0r < p0e)           // if not end run continue
                            continue;
                        goto mrg212;            // merge 1,2
                    } else {
mrg32:                  *pbx++ = *p2r++;        // run 2 smallest
                        if(p2r < p2e)           // if not end run continue
                            continue;
                        goto mrg201;            // merge 0,1
                    }
                } else {
                    if(*p1r <= *p2r){
                        *pbx++ = *p1r++;        // run 1 smallest
                        if(p1r < p1e)           // if not end run continue
                            continue;
                        goto mrg202;            // merge 0,2
                    } else {
                        goto mrg32;             // run 2 smallest
                    }
                }
            }
            // same fall through renumbering, down to runs 0,1
mrg212:     p0r = p1r;
            p0e = p1e;
mrg202:     p1r = p2r;
            p1e = p2e;
            // 2 way merge
mrg201:     while(1){
                if(*p0r <= *p1r){
                    *pbx++ = *p0r++;            // run 0 smallest
                    if(p0r < p0e)               // if not end run continue
                        continue;
                    goto cpy11;
                } else {
                    *pbx++ = *p1r++;            // run 1 smallest
                    if(p1r < p1e)               // if not end run continue
                        continue;
                    goto cpy10;
                }
            }
cpy11:      p0r = p1r;
            p0e = p1e;
            // 1 way copy (the run is never empty when control gets here)
cpy10:      do {
                *pbx++ = *p0r++;                // copy element
            } while (p0r < p0e);                // until end of run
        }
        std::swap(a, b);                // swap ptrs
        rsz <<= 2;                      // quadruple run size
    }
    return a;                           // return sorted array
}

3. Aug 25, 2016

### rcgldr

Fastest 3 stack sort is polyphase merge sort. Polyphase merge sort was optimal for 3 to 7 tape drives (the stack version is similar to using tape drives with a read reverse option), but these days, it's mostly a solution to a programming puzzle (3 stack sort). The tricky bits are determining the initial ascending versus descending merge (it alternates after that), and keeping track of when run sizes change if the number of elements isn't an exact Fibonacci number.

Wiki article:

http://en.wikipedia.org/wiki/Polyphase_merge_sort

Example Java code using custom stack class that includes a swap member:

Code (Java):

/**
 * Fixed capacity stack of ints stored in an array.
 * The stack pointer starts at the array length and moves toward index 0
 * as values are pushed, so the stack is empty when sp == sz.
 * No bounds checking is done beyond what the array itself enforces.
 */
class xstack{
    int []ar;                               // backing array
    int sz;                                 // capacity (length of ar)
    int sp;                                 // index of the top element, sz when empty

    /** Creates an empty stack able to hold sz values. */
    public xstack(int sz){
        this.sz = sz;
        this.sp = sz;
        this.ar = new int[sz];
    }

    /** Pushes d on top of the stack. */
    public void push(int d){
        sp -= 1;
        ar[sp] = d;
    }

    /** Removes the top value and returns it. */
    public int pop(){
        final int top = sp;
        sp = top + 1;
        return ar[top];
    }

    /** Returns the top value without removing it. */
    public int peek(){
        return ar[sp];
    }

    /** Returns true when the stack holds no values. */
    public boolean empty(){
        return size() == 0;
    }

    /** Returns the number of values currently on the stack. */
    public int size(){
        return sz - sp;
    }

    /** Exchanges the complete state (array, capacity, stack pointer) with othr. */
    public void swap(xstack othr){
        final int[] mineAr = this.ar;
        final int mineSz = this.sz;
        final int mineSp = this.sp;
        this.ar = othr.ar;
        this.sz = othr.sz;
        this.sp = othr.sp;
        othr.ar = mineAr;
        othr.sz = mineSz;
        othr.sp = mineSp;
    }
}

/**
 * Polyphase merge sort that uses three xstack instances as its only storage.
 * Holds the Fibonacci lookup table, the table search helper, and the sort.
 */
public class ppmrg3s {
// Fibonacci numbers fib(0) .. fib(46), 47 entries in total
static final int[] FIBTBL =
{         0,         1,         1,         2,         3,         5,
8,        13,        21,        34,        55,        89,
144,       233,       377,       610,       987,      1597,
2584,      4181,      6765,     10946,     17711,     28657,
46368,     75025,    121393,    196418,    317811,    514229,
832040,   1346269,   2178309,   3524578,   5702887,   9227465,
14930352,  24157817,  39088169,  63245986, 102334155, 165580141,
267914296, 433494437, 701408733,1134903170,1836311903};

// return index of largest fib() <= n
// binary search over FIBTBL; on an exact hit the probed index is
// returned, otherwise the lower bound of the final interval
// NOTE(review): the initial hi of 47 has to stay equal to FIBTBL.length
static int flfib(int n)
{
int lo = 0;
int hi = 47;
while((hi - lo) > 1){
int i = (lo + hi)/2;
if(n < FIBTBL[i]){
hi = i;
continue;
}
if(n > FIBTBL[i]){
lo = i;
continue;
}
return i;
}
return lo;
}

// poly phase merge sort using 3 stacks
// a holds the values to sort; b and c are used as work stacks
// (presumably empty on entry and at least as large as a - confirm
// against callers). When the loop finishes the result is in c and
// is swapped into a. Nothing is done if a holds fewer than 2 values.
// NOTE(review): FIBTBL[f + 1] below would index past the table if
// a.size() exceeded FIBTBL[46] without being equal to it - confirm
// that such sizes cannot occur.
static void ppmrg3s(xstack a, xstack b, xstack c)
{
if(a.size() < 2)
return;
int ars = 1;                        // init run sizes
int brs = 1;
int asc = 0;                        // no size change
int bsc = 0;
int csc = 0;
int scv = 0-1;                      // size change value
boolean dsf;                        // true if descending sequence
{                                   // block for local variable scope
int f = flfib(a.size());        // FIBTBL[f+1] > size >= FIBTBL[f]
dsf = ((f%3) == 0);             // init compare flag
if(FIBTBL[f] == a.size()){      // if exact fibonacci size,
for (int i = 0; i < FIBTBL[f - 1]; i++) { //  move to b
b.push(a.pop());
}
} else {                        // else move to b, c
// update compare flag
// (flipped when the excess count is odd)
dsf ^= 1 == ((a.size() - FIBTBL[f]) & 1);
// i = excess run count
int i = a.size() - FIBTBL[f];
// j = dummy run count
int j = FIBTBL[f + 1] - a.size();
// move excess elements to b
// (i >= 1 here because the size is not an exact table entry)
do{
b.push(a.pop());
}while(0 != --i);
// move dummy count elements to c
do{
c.push(a.pop());
}while(0 != --j);
// remember how many values c holds at this point
csc = c.size();
}
}                                   // end block
while(true){                        // setup to merge pair of runs
if(asc == a.size()){            // check for size count change
ars += scv;                 //   (due to dummy run size == 0)
scv = 0-scv;
asc = 0;
csc = c.size();
}
if(bsc == b.size()){
brs += scv;
scv = 0-scv;
bsc = 0;
csc = c.size();
}
int arc = ars;                  // init run counters
int brc = brs;
while(true){                    // merging pair of runs
// dsf inverts the comparison, which selects whether the smaller
// or the larger of the two top values is moved to c first
if(dsf ^ (a.peek() <= b.peek())){
c.push(a.pop());        // move a to c
if(--arc != 0)          // if not end a
continue;           //   continue back to compare
do{                     // else move rest of b run to c
c.push(b.pop());
}while(0 != --brc);
break;                  //   and break
} else {
c.push(b.pop());        // move b to c
if(0 != --brc)          // if not end b
continue;           //   continue back to compare
do{                     // else move rest of a run to c
c.push(a.pop());
}while(0 != --arc);
break;                  //   and break
}
}                               // end merge pair of runs
dsf ^= true;                    // toggle compare flag
if(b.empty()){                  // if end b
if(a.empty())               //   if end a, done
break;
b.swap(c);                  //   swap b, c
brs += ars;                 //   merged runs now live in b
if (0 == asc)
bsc = csc;
} else {                        // else not end b
if(!a.empty())              //   if not end a
continue;               //     continue back to setup next merge
a.swap(c);                  //   swap a, c
ars += brs;                 //   merged runs now live in a
if (0 == bsc)
asc = csc;
}
}
a.swap(c);                          // return sorted stack in a
}
}

Example C++ code using arrays with pointers as stacks:

Code (C++):

typedef unsigned int uint32_t;

static size_t fibtbl[48] =
{        0,         1,         1,         2,         3,         5,
8,        13,        21,        34,        55,        89,
144,       233,       377,       610,       987,      1597,
2584,      4181,      6765,     10946,     17711,     28657,
46368,     75025,    121393,    196418,    317811,    514229,
832040,   1346269,   2178309,   3524578,   5702887,   9227465,
14930352,  24157817,  39088169,  63245986, 102334155, 165580141,
267914296, 433494437, 701408733,1134903170,1836311903,2971215073};

// binary search: return index of largest fib() <= n
size_t flfib(size_t n)
{
size_t lo = 0;
size_t hi = sizeof(fibtbl)/sizeof(fibtbl[0]);
while((hi - lo) > 1){
size_t i = (lo + hi)/2;
if(n < fibtbl[i]){
hi = i;
continue;
}
if(n > fibtbl[i]){
lo = i;
continue;
}
return i;
}
return lo;
}

// poly phase merge sort array using 3 arrays as stacks, a is source
uint32_t * ppmrg3(uint32_t a[], uint32_t b[], uint32_t c[], size_t n)
{
if(n < 2)
return a+n;
uint32_t *asp = a;                  // a stack ptr
uint32_t *aer;                      // a end run
uint32_t *aes = a + n;              // a end
uint32_t *bsp = b + n;              // b stack ptr
uint32_t *ber;                      // b end run
uint32_t *bes = b + n;              // b end
uint32_t *csp = c + n;              // c stack ptr
uint32_t *ces = c + n;              // c end
uint32_t *rsp = 0;                  // run size change stack ptr
size_t ars = 1;                     // run sizes
size_t brs = 1;
size_t scv = 0-1;                   // size change increment / decrement
bool dsf;                           // == 1 if descending sequence
{                                   // block for local variable scope
size_t f = flfib(n);            // fibtbl[f+1] > n >= fibtbl[f]
dsf = ((f%3) == 0);             // init compare flag
if(fibtbl[f] == n){             // if exact fibonacci size, move to b
for(size_t i = 0; i < fibtbl[f-1]; i++)
*(--bsp) = *(asp++);
} else {                        // else move to b, c
// update compare flag
dsf ^= (n - fibtbl[f]) & 1;
// move excess elements to b
aer = a + n - fibtbl[f];
while (asp < aer)
*(--bsp) = *(asp++);
// move dummy count elements to c
aer = a + fibtbl[f - 1];
while (asp < aer)
*(--csp) = *(asp++);
rsp = csp;                  // set run size change ptr
}
}                                   // end local variable block
while(1){                           // setup to merge pair of runs
if(asp == rsp){                 // check for size count change
ars += scv;                 //   (due to dummy run size == 0)
scv = 0-scv;
rsp = csp;
}
if(bsp == rsp){
brs += scv;
scv = 0-scv;
rsp = csp;
}
aer = asp + ars;                // init end run ptrs
ber = bsp + brs;
while(1){                       // merging pair of runs
if(dsf ^ (*asp <= *bsp)){   // if a <= b
*(--csp) = *(asp++);    //   move a to c
if(asp != aer)          //   if not end a run
continue;           //     continue back to compare
do                      //   else move rest of b run to c
*(--csp) = *(bsp++);
while (bsp < ber);
break;                  //     and break
} else {                    // else a > b
*(--csp) = *(bsp++);    //   move b to c
if(bsp != ber)          //   if not end b run
continue;           //     continue back to compare
do                      //   else move rest of a run to c
*(--csp) = *(asp++);
while (asp < aer);
break;                  //     and break
}
}                               // end merging pair of runs
dsf ^= 1;                       // toggle compare flag
if(bsp == bes){                 // if b empty
if(asp == aes)              //   if a empty, done
break;
std::swap(bsp, csp);        //   swap b, c
std::swap(bes, ces);
brs += ars;                 //   update run size
} else {                        // else b not empty
if(asp != aes)              //   if a not empty
continue;               //     continue back to setup next merge
std::swap(asp, csp);        //   swap a, c
std::swap(aes, ces);
ars += brs;                 //   update run size
}
}
return csp;                          // return ptr to sorted stack
}