Concurrency Coursework

Tagged:  

This coursework received a 100% mark. It is a multi-threaded Eratosthenes filter written in Java, which can be shown not to deadlock.

Read more for the source code.

/*
 * Eratosthenes.java
 *
 * Created on 01 November 2002, 01:06
 */

/**
 *
 * @author  Thomas
 */

/* -- Basic Program Life Cycle--

   1) SET-UP

     Get number of primes as x
     Make counter set to x

     make first buffer
     make generator attached to first buffer
     Run generator

     for each of x sieves
       create a new buffer
       make a sieve with
             input  = "buffer made for last sieve"
             output = "the new buffer"
       Run sieve

     make printer linked to buffer of last sieve
     Run printer


   2) FILTERING

     Generator 2 3 4 5 -> Sieve(2) 3 5 -> Sieve(3) 5 ->
... -> Sieve( 5->_ ) 5 -> Printer = 5 ==> [Screen]


   3) CLOSING DOWN

     Printer decrements counter
     Generator checks counter
     When counter = 0
       then generator "poisons" sieve pipe-line and exits
     "Poison" passed down pipeline ending all threads including printer.


 -- Generator Life Cycle --
 * Start initialised with output buffer and limit counter
 * Put consecutive integers onto the output buffer
 * Until the limit counter reaches zero
    then pass poison item to pipeline
 * End

 -- Pipe Life Cycle --
 * Start initialised with input and output buffer
 * Set filter to 1st number received
    then mark that number to be passed through
    put it on the output buffer
 * Filter numbers
 * Until poisoned item received
    then pass poisoned item right
 * End

 -- Printer Life Cycle --
 * Start initialised with input buffer and counter
 * Read from input buffer
     decrement counter
     print item
 * Until reads a poisoned item
 * End



-- Design Compromises --

 Since threading the Sieve algorithm is already very inefficient, I have decided
 to concentrate on writing a clear program that can be easily read and adapted.

 * Numbers are wrapped in Item instances to simplify the Sieve code.  This adds
   extra function calls to the Sieve thread.

 * LinkedLists are used for the buffer class instead of arrays.  This is slower
   but more elegant, eg. easy to adapt, 'room for growth', etc.
 
 * Use of a 'poisoned' item is a far less efficient way to destroy Sieve threads
   than having them watch the counter.  However it can be implemented and
   generalised more easily, and it avoids problems such as threads hanging
   while waiting on threads that have already finished, etc.

 * Did not use dynamic process creation in order to simplify Sieve class.  This
   led to a large number of sieves simply passing values through without filtering
   them.  This slows the whole pipeline down, especially when filtering large
   numbers.

 * I placed all the classes in one file as static nested classes.  I do this on
   the assumption that this system will just be a command-line app, so there is
   no need to create separate class files.  It would in any case be trivial to
   remove the static modifiers and break the classes apart for reuse.


    */


import java.io.*; // IO package for reading keyboard
import java.util.LinkedList; // LinkedList for implementing buffer


/**
 * Multi-threaded Sieve of Eratosthenes built as a pipeline of threads:
 * one {@link Generator}, one {@link Sieve} per requested prime and one
 * {@link Printer}, connected by bounded {@link Buffer}s.
 */
public class Eratosthenes {

    /** Number of primes produced when standard input cannot be read. */
    private static final int DEFAULT_PRIMES = 30;

    /**
     * Prompts for the number of primes, builds the pipeline and starts every
     * thread.  The method returns as soon as the threads are started; the
     * pipeline shuts itself down once the requested primes have been printed.
     *
     * @param args ignored
     * @throws NumberFormatException    if the line read is not an integer
     * @throws IllegalArgumentException if the number read is negative
     */
    public static void main(String[] args) {
        System.out.println("How many prime numbers ?");  // Print prompt

        int input = readPrimeCount();
        if (input < 0) {
            // A negative count can never be reached: the sieve-building loop
            // and the generator would both run without bound.
            throw new IllegalArgumentException(
                    "Number of primes must not be negative: " + input);
        }

        Counter limit = new Counter(input);  // Shared by generator & printer

        // Build thread pipeline.  'link' is always the buffer at the current
        // end of the pipeline, i.e. the one the next stage must read from.
        Buffer link = new Buffer();
        Generator gen = new Generator(link, limit);
        gen.start();

        for (int x = 0; x < input; x++) {
            Buffer next = new Buffer();
            Sieve sieve = new Sieve(link, next);
            sieve.start();
            link = next;  // Shuffle forward
        }

        // Attach the printer to the end of the pipeline.  With zero sieves
        // this is the generator's own buffer, so the poison still arrives.
        Printer print = new Printer(link, limit);
        print.start();
    }

    /**
     * Reads one line from standard input and parses it as the prime count.
     * Falls back to {@link #DEFAULT_PRIMES} when the line cannot be read
     * (I/O error or end of input).
     *
     * @return the parsed number, or the default when nothing could be read
     * @throws NumberFormatException if the line read is not an integer
     */
    private static int readPrimeCount() {
        // Deliberately not closed: closing the reader would close System.in.
        BufferedReader keyboardInput =
                new BufferedReader(new InputStreamReader(System.in));
        String line;
        try {
            line = keyboardInput.readLine();
        } catch (IOException e) {
            System.out.println("Error - Defaulting to 30 primes");
            System.out.println(e.toString());
            return DEFAULT_PRIMES;
        }
        if (line == null) {  // End of input: nothing to parse
            System.out.println("Error - Defaulting to 30 primes");
            return DEFAULT_PRIMES;
        }
        // trim() returns a new string; its result must be the one parsed.
        return Integer.parseInt(line.trim());
    }


    /**
     * Thread for the sieve "filter" algorithm.  Linked to an input and an
     * output buffer; all message passing goes through {@link Buffer}.
     * The first unmarked number received becomes this sieve's filter value.
     */
    public static class Sieve extends Thread {

        private Item filter;       // Number to filter against (null = unset)
        private final Buffer out;  // Buffer to put output onto
        private final Buffer in;   // Buffer to get input from

        /**
         * @param i buffer this sieve reads from
         * @param o buffer this sieve writes to
         */
        public Sieve(Buffer i, Buffer o) {
            out = o;
            in = i;
        }

        /** Filters items until the "poisoned" item arrives, then forwards it and ends. */
        public void run() {
            while (true) {
                Item x = in.get();
                if (x.kill()) {                 // "Poisoned" item ...
                    out.put(x);                 // ... pass it on ...
                    break;                      // ... and die.
                } else if (x.pass()) {          // Already identified as prime
                    out.put(x);
                } else if (null == filter) {    // First unmarked number seen
                    filter = new Item(x.getItem());
                    x.setPass();                // It is prime: mark it ...
                    out.put(x);                 // ... and pass it on.
                } else if (filter.filter(x)) {  // Not a multiple of our prime
                    out.put(x);                 // Next sieve must check it
                }
                // Otherwise x is a multiple of the filter value: drop it.
            }
        }
    }


    /**
     * Bounded FIFO queue acting as a monitor for the message exchange between
     * two pipeline stages.  {@link #put} blocks while the queue is full and
     * {@link #get} blocks while it is empty.
     */
    public static class Buffer {

        /** Maximum number of queued items; caps memory use. */
        private static final int CAPACITY = 10;

        // Raw LinkedList with a cast on removal, matching the pre-generics
        // style of this file.  New items go in at the head and are taken
        // from the tail, giving FIFO order.
        private final LinkedList local = new LinkedList();

        /**
         * Appends an item, waiting while the buffer is full.
         *
         * @param x item to enqueue
         */
        public synchronized void put(Item x) {
            boolean interrupted = false;
            while (local.size() >= CAPACITY) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    // Keep waiting, but remember the interrupt so it is not lost.
                    interrupted = true;
                }
            }
            local.addFirst(x);
            // Producers and consumers wait on the same monitor, so wake all
            // waiters; each one re-checks its own condition in its loop.
            notifyAll();
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Removes and returns the oldest item, waiting while the buffer is empty.
         *
         * @return the item that has been queued longest
         */
        public synchronized Item get() {
            boolean interrupted = false;
            while (local.isEmpty()) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    // Keep waiting, but remember the interrupt so it is not lost.
                    interrupted = true;
                }
            }
            Item oldest = (Item) local.removeLast();
            notifyAll();
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
            return oldest;
        }
    }


    /**
     * Feeds consecutive integers, starting at 2, into the pipeline.  When the
     * limit counter signals that enough primes have been printed, it puts the
     * "poisoned" item into the pipeline and terminates.
     */
    public static class Generator extends Thread {

        private int count = 2;        // Next number to generate
        private final Buffer out;     // Output buffer
        private final Counter limit;  // Link to limit counter

        /**
         * @param o buffer the generated numbers are written to
         * @param l counter that reports when generation should stop
         */
        public Generator(Buffer o, Counter l) {
            out = o;
            limit = l;
        }

        /** Generates numbers until the counter ends, then poisons the pipeline. */
        public void run() {
            while (!limit.end()) {
                out.put(new Item(count++));
            }
            out.put(new Item());  // "Poisoned" item shuts the pipeline down
        }
    }


    /**
     * Collects and prints the primes.  Also decrements the limit counter, so
     * that the generator knows when to close down the pipeline.
     */
    public static class Printer extends Thread {

        private final Buffer in;      // Input buffer
        private final Counter limit;  // Link to limit counter

        /**
         * @param i buffer the printer reads from
         * @param l counter decremented once per printed prime
         */
        public Printer(Buffer i, Counter l) {
            in = i;
            limit = l;
        }

        /** Prints every item marked as prime until the "poisoned" item arrives. */
        public void run() {
            while (true) {
                Item x = in.get();
                if (x.kill()) {
                    break;
                } else if (x.pass()) {
                    System.out.println(x.getItem());
                    limit.dec();
                }
                // Unmarked items that reach the printer are discarded.
            }
        }
    }


    /**
     * Wraps a number as it travels down the pipeline.  An item can be marked
     * to be passed straight through to the end, or be the special "poisoned"
     * item which shuts the pipeline down.
     */
    private static class Item {
        private int local;                   // Value of item
        private boolean passthough = false;  // Pass straight through pipeline?
        private boolean kill = false;        // Is item "poisoned" ?

        /** Creates an ordinary item carrying the value {@code l}. */
        public Item(int l) {
            local = l;
        }

        /** Creates the "poisoned" item. */
        public Item() {
            kill = true;
        }

        /** @return the wrapped value */
        public int getItem() {
            return local;
        }

        /** @return true if this item is marked to be passed through */
        public boolean pass() {
            return passthough;
        }

        /** @return true if this is the "poisoned" item */
        public boolean kill() {
            return kill;
        }

        /** Marks this item to be passed through every later stage. */
        public void setPass() {
            passthough = true;
        }

        /**
         * Tests whether {@code x} survives filtering by this item's value.
         *
         * @return true iff x is NOT a multiple of this item's value
         */
        public boolean filter(Item x) {
            return 0 != x.getItem() % local;
        }
    }


    /**
     * Integer counter shared by two threads: the printer decrements it and
     * the generator polls it to learn when enough primes have been printed.
     * All methods are synchronized so that a decrement made by one thread is
     * guaranteed to become visible to the other; atomic int writes alone give
     * no such visibility guarantee.
     * NOTE : It does NOT matter if the generator does not shut down
     * immediately.  The pipeline continues to drain because the printer keeps
     * removing items, so the generator is not left hanging on a full buffer.
     */
    public static class Counter {
        private int val;  // Remaining count, guarded by this object's monitor

        /** @param v initial count */
        public Counter(int v) {
            val = v;
        }

        /** Decrements the count by one unless it is already zero. */
        public synchronized void dec() {
            if (0 != val) {
                val--;
            }
        }

        /** @return true once the count has reached zero */
        public synchronized boolean end() {
            return 0 == val;
        }

        /** @return the current count */
        public synchronized int get() {
            return val;
        }
    }

}