Background

while (true) {
    /* produce an item in next_produced */

    while (count == BUFFER_SIZE); // do nothing

    buffer[in] = next_produced;
    in = (in + 1) % BUFFER_SIZE;
    count++;
}

...

while (true) {
    while (count == 0); // do nothing

    next_consumed = buffer[out];
    out = (out + 1) % BUFFER_SIZE;
    count--;

    /* consume the item in next_consumed */
}
#include <stdio.h>
#include <pthread.h>

int sum = 0;

void *run(void *param)
{
    int i;
    for (i = 0; i < 10000; i++)
        sum++;
    pthread_exit(0);
}

int main()
{
    pthread_t tid1, tid2;
    pthread_create(&tid1, NULL, run, NULL);
    pthread_create(&tid2, NULL, run, NULL);
    pthread_join(tid1, NULL);
    pthread_join(tid2, NULL);
    printf("%d\n", sum);
}
int sum = 0;

void *run1(void *param)
{
    int i;
    for (i = 0; i < 10000; i++)
        sum++;
    pthread_exit(0);
}

void *run2(void *param)
{
    int i;
    for (i = 0; i < 10000; i++)
        sum--;
    pthread_exit(0);
}

int main()
{
    pthread_t tid1, tid2;
    pthread_create(&tid1, NULL, run1, NULL);
    pthread_create(&tid2, NULL, run2, NULL);
    pthread_join(tid1, NULL);
    pthread_join(tid2, NULL);
    printf("%d\n", sum);
}
Example
Example

Race Condition

class RunnableTwo implements Runnable {
    static int count = 0;

    @Override
    public void run() {
        for (int i = 0; i < 10000; i++)
            count++;
    }
}

public class RaceCondition2 {
    public static void main(String[] args) throws Exception {
        RunnableTwo run1 = new RunnableTwo();
        RunnableTwo run2 = new RunnableTwo();
        Thread t1 = new Thread(run1);
        Thread t2 = new Thread(run2);
        t1.start(); t2.start();
        t1.join(); t2.join();
        System.out.println("Result: " + RunnableTwo.count);
    }
}

The Critical Section Problem

while (true) {
    ...
    /*
    * Entry Section
    */

    /*
    * Critical Section
    */

    /*
    * Exit Section
    */

    /*
    * Remainder Section
    */
    ...
}
RaceCondition
Race condition when assigning a pid

Petersonโ€™s Solution

/*
* The structure of process Pi in Perterson's solution
*/

int turn;
boolean flag[2];

while (true) {
    flag[i] = true;
    turn = j;
    while (flag[j] && turn == j);

    /* critical section */

    flag[i] = false;

    /* remainder section */
}
#include <stdio.h>
#include <pthread.h>

#define true 1
#define false 0

int sum = 0;

int turn;
int flag[2];

int main()
{
    pthread_t tid1, tid2;
    pthread_create(&tid1, NULL, producer, NULL);
    pthread_create(&tid2, NULL, consumer, NULL);
    pthread_join(tid1, NULL);
    pthread_join(tid2, NULL);
    printf("sum = %d\n", sum);
}

void *producer(void *param)
{
    int k;
    for (k = 0; k < 10000; k++) {
        /* entry section */
        flag[0] = true;
        turn = 1;
        while (flag[1] && turn == 1);

        /* critical section */
        sum++;

        /* exit section */
        flag[0] = false;

        /* remainder section */
    }
    pthread_exit(0);
}

void *consumer(void *param)
{
    int k;
    for (k = 0; k < 10000; k++) {
        /* entry section */
        flag[1] = true;
        turn = 0;
        while (flag[0] && turn == 0);

        /* critical section */
        sum--;

        /* exit section */
        flag[1] = false;

        /* remainder section */
    }
    pthread_exit(0);
}

Hardware Support for Synchronization

Atomicity

/* Definition of the atomic test_and_set() instruction */

boolean test_and_set (boolean *target) {
    boolean rv = *target;
    *target = true;

    return rv
}

/* Mutual-exclusion implementation with test_and_set() */

do {
    while (test_and_set(&lock)); // do nothing

    /* critical section */

    lock = false;

    /* remainder section */
} while (true);
/* Definition of the atomic compare_and_swap instruction */

int compare_and_swap (int *value, int expected, int new_value) {
    int temp = *value;

    if (*value == expected)
        *value = new_value;

    return temp;
}

/* Mutual-exclusion implementation with compare_and_swap() */

while (true) {
    while (compare_and_swap(&lock, 0, 1)); // do nothing

    /* critical section */

    lock = 0;

    /* remainder section */
}

Atomic Variable

/*
* Java implementation of Petersonโ€™s solution with Atomic variable
*/

import java.util.concurrent.atomic.AtomicBoolean;

public class Peterson2 {

    static int count = 0;

    static int turn = 0;
    static AtomicBoolean[] flag;
    static {
        flag = new AtomicBoolean[2];
        for (int i = 0; i < flag.length; i++)
            flag[i] = new AtomicBoolean();
    }

    static class Producer implements Runnable {
        @Override
        public void run() {
            for (int k = 0; k < 100000; k++) {
                /* entry section */
                flag[0].set(true);
                turn = 1;
                while (flag[1].get() && turn == 1);

                /* critical section */
                count++;

                /* exit section */
                flag[0].set(false);

                /* remainder section */
            }
        }
    }

    static class Consumer implements Runnable {
        @Override
        public void run() {
            for (int k = 0; k < 100000; k++) {
                /* entry section */
                flag[1].set(true);
                turn = 0;
                while (flag[0].get() && turn == 0);
                
                /* critical section */
                count--;

                /* exit section */
                flag[1].set(false);

                /* remainder section */
            }
        }
    }
}

Higher-level software tools to solve the CSP

* **Mutex Locks**: the simplest tools for synchronization.
* **Semaphore**: more robust, convenient, and effective tool.
* **Monitor**: overcomes the demerits of mutex and semaphore.
* **Liveness**: ensures for processes to make progress.

Mutex Locks

while (true) {
    // acquire lock

    /*
    * critical section
    */

    // release lock

    /*
    * remainder section
    */
}
acquire() {
    while (!available); // busy wait

    available = false;
}

realease() {
    available = true;
}

Busy Waiting

Spin lock

#include <stdio.h>
#include <pthread.h>

int sum = 0; // a shared variable

pthread_mutex_t mutex;

void *counter(void *param)
{
    int k;
    for (k = 0; k < 10000; k++) {
        /* entry section */
        pthread_mutex_lock(&mutex);

        /* critical section */
        sum++;

        /* exit section */
        pthread_mutex_unlock(&mutex);

        /* remainder section */
    }
    pthread_exit(0);
}

int main()
{
    pthread_t tid1, tid2;
    pthread_mutex_init(&mutex, NULL);
    pthread_create(&tid1, NULL, counter, NULL);
    pthread_create(&tid2, NULL, counter, NULL);
    pthread_join(tid1, NULL);
    pthread_join(tid2, NULL);
    printf("sum = %d\n", sum);
}

Semaphore

Definition of wait() and signal():

wait(S) {
    while (S <= 0); // busy wait
    S--;
}

signal(S) {
    S++;
}

โ€ข All modifications to the integer value of the semaphore in the wait() and signal() operations must be executed atomically.

Binary and Counting Semaphores

    typedef struct {
        int value;
        struct process *list;
    } semaphore;

    wait(semaphore *S) {
        S->value--;
        if (S -> value < 0) {
            add this process to S->list;
            sleep();
        }
    }

    signal (semaphore *S) {
        S -> value++;
        if (S -> value <= 0) {
            remove a process P from S->list;
            wakeup(P);
        }
    }
#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>

int sum = 0; // a shared variable

sem_t sem;

void *counter(void *param)
{
    int k;
    for (k = 0; k < 10000; k++) {
        /* entry section */
        sem_wait(&sem);

        /* critical section */
        sum++;

        /* exit section */
        sem_post(&sem);

        /* remainder section */
    }
    pthread_exit(0);
}

int main()
{
    pthread_t tid[5]; int i;
    sem_init(&sem, 0, 1); // thread๋Š” 5๊ฐœ์ง€๋งŒ, instance๋Š” sum์œผ๋กœ 1๊ฐœ. ๋”ฐ๋ผ์„œ, binary semaphore๋ฅผ ์“ฐ๋Š” ๊ฒŒ ๋” ์ ํ•ฉ
    for (i = 0; i < 5; i++)
        pthread_create(&tid[i], NULL, counter, NULL);
    for (i = 0; i < 5; i++)
        pthread_join(tid[i], NULL);
    printf("sum = %d\n", sum);
}

Monitor

signal (mutex);
...
critical section
...
wait (mutex);
wait (mutex);
...
critical section
...
wait(mutex)

Monitor Type

monitor monitor_name 
{
    /* shared vairable declarations */

    function P1 (...) {
        ...
    }

    function P2 (...) {
        ...
    }

    ...

    function Pn (...) {
        ...
    }

    initialization_code (...) {
        ...
    }
}
Schema
Schematic view of a monitorl

Condition Variables

condition x, y;

x.wait();

y.wait();
Schema2
Monitor with condition variables

Java Monitors

synchronized (object) {
// critical section
}

public synchronized void add() {
// critical section 
}

Java Synchronization Examples

public class SynchExample1 {

    static class Counter {
        public static int count = 0;
        public static void increment() {
            count++;
            }
    }

    static class MyRunnable implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 10000; i++)
                Counter.increment();
        }
    }

    public static void main(String[] args) throws Exception {
        Thread[] threads = new Thread[5];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(new MyRunnable());
            threads[i].start();
        }
        for (int i = 0; i < threads.length; i++)
            threads[i].join();
        System.out.println("counter = " + Counter.count);
    }
}
public class SynchExample2 {
    static class Counter {
        public static int count = 0;
        synchronized public static void increment() { 
            count++; 
        }
    }

    static class MyRunnable implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 10000; i++)
                Counter.increment();
        }
    }

    public static void main(String[] args) throws Exception {
        Thread[] threads = new Thread[5];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(new MyRunnable());
            threads[i].start();
        }
        for (int i = 0; i < threads.length; i++)
            threads[i].join();
        System.out.println("counter = " + Counter.count);
    }
}
public class SynchExample3 {
    static class Counter {
        private static Object object = new Object();
        public static int count = 0;
        public static void increment() {
            synchronized (object) {
                count++;
            }
        }
    }

    static class MyRunnable implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 10000; i++)
                Counter.increment();
        }
    }

    public static void main(String[] args) throws Exception {
        Thread[] threads = new Thread[5];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(new MyRunnable());
            threads[i].start();
        }
        for (int i = 0; i < threads.length; i++)
            threads[i].join();
        System.out.println("counter = " + Counter.count);
    }
}
public class SynchExample4 {
    static class Counter {
        public static int count = 0;
        public void increment() {
            synchronized (this) {
                Counter.count++;
            }
        }
    }

    static class MyRunnable implements Runnable {
        Counter counter;
        public MyRunnable(Counter counter) {
            this.counter = counter;
        }
        @Override
        public void run() {
            for (int i = 0; i < 10000; i++)
                counter.increment();
        }
    }

    public static void main(String[] args) throws Exception {
        Thread[] threads = new Thread[5];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(new MyRunnable(new Counter()));
            threads[i].start();
        }
        for (int i = 0; i < threads.length; i++)
            threads[i].join();
        System.out.println("counter = " + Counter.count);
    }
}
public class SynchExample5 {
    static class Counter {
        public static int count = 0;
        public void increment() {
            synchronized (this) {
                Counter.count++;
            }
        }
    }

    static class MyRunnable implements Runnable {
        Counter counter;
        public MyRunnable(Counter counter) {
            this.counter = counter;
        }
        @Override
        public void run() {
            for (int i = 0; i < 10000; i++)
                counter.increment();
        }
    }

    public static void main(String[] args) throws Exception {
        Thread[] threads = new Thread[5];
        Counter counter = new Counter();
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(new MyRunnable(counter));
            threads[i].start();
        }
        for (int i = 0; i < threads.length; i++)
            threads[i].join();
        System.out.println("counter = " + Counter.count);
    }
}

Liveness

Deadlock

Deadlock

Priority Inversion