Link utili
Algoritmi
Mutua esclusione
Test and set
// Mutual exclusion with a spin lock built on a test-and-set primitive.
// NOTE(review): TS is not defined in these notes; presumably it atomically
// copies the current value of `lock` into `old` and sets `lock` to 1 —
// confirm against the course definition.
shared {
lock = 0
}
thread i {
loop {
old = 0
do {
TS(lock, old)
// keep retrying for as long as the value left in `old` is 1
} while(old == 1)
// critical section
// release: reset the shared lock to 0
lock = 0
}
}
Compare and swap
// Mutual exclusion with a spin lock built on the CAS primitive.
// NOTE(review): CAS is not defined in these notes; with only two operands it
// presumably exchanges `lock` and `old` atomically — confirm.
shared {
lock = 0
}
thread i {
loop {
old = 1
do {
// `old` is reset to 1 before every attempt
old = 1
CAS(lock, old)
// keep retrying for as long as the value left in `old` is 1
} while(old == 1)
// critical section
// release: reset the shared lock to 0
lock = 0
}
}
Alternanza stretta
// Strict alternation between two threads using a single shared `turn` variable.
shared {
turn = 0
}
thread 0 {
// busy-wait while turn is 1
while turn == 1 {}
// critical section
// hand the turn over to thread 1
turn = 1
}
thread 1 {
// busy-wait while turn is 0
while turn == 0 {}
// critical section
// hand the turn back to thread 0
turn = 0
}
// PROGRESS IS NOT GUARANTEED for threads running at different speeds
Peterson per 2 thread
// Peterson's algorithm for two threads: one `interested` flag per thread
// plus a shared `turn` variable.
shared {
interested0 = false
interested1 = false
turn = 0
}
thread 0 {
loop {
// announce the intention to enter
interested0 = true
// write this thread's own id into turn
turn = 0
// spin while thread 1 is interested and turn still holds this thread's id
while interested1 and turn == 0 {}
// critical section
// leave: withdraw interest
interested0 = false
}
}
thread 1 {
loop {
// announce the intention to enter
interested1 = true
// write this thread's own id into turn
turn = 1
// spin while thread 0 is interested and turn still holds this thread's id
while interested0 and turn == 1 {}
// critical section
// leave: withdraw interest
interested1 = false
}
}
Lamport per N thread
// Lamport's algorithm for N threads: each thread takes a number and waits
// for every thread holding a smaller (number, index) pair.
// NOTE(review): the initial contents of `choosing` and `numbers` are not
// shown; presumably all false / all 0 — confirm.
shared {
choosing: boolean[N]
numbers: int[N]
}
thread i {
loop {
// choosing[i] is true for the whole time thread i is picking its number
choosing[i] = true
numbers[i] = max(numbers) + 1
choosing[i] = false
for k in 0..N {
// wait until thread k has finished picking its number
while choosing[k] {}
// wait while thread k holds a number (non-zero) that comes before ours
while numbers[k] != 0 and (numbers[k], k) << (numbers[i], i) {}
// where (a,b) << (c,d) <==> a < c or (a==c and b < d)
}
// Critical section
// leave: 0 means "no number taken"
numbers[i] = 0
}
}
Producer e consumer
Vanilla:
// Producer/consumer over a circular buffer of N slots, synchronised only by
// busy-waiting on the shared `counter`.
buffer = int[N]
int in, out, counter = 0
thread producer {
next_item = produce_item()
// busy-wait while counter says all N slots are in use
while counter == N {}
buffer[in] = next_item
// advance the write index, wrapping around at N
in = (in + 1) % N
counter++
}
thread consumer {
// busy-wait while counter says there is nothing to read
while counter == 0 {}
item = buffer[out]
// advance the read index, wrapping around at N
out = (out + 1) % N
counter--
consume_item(item)
}
// This scheme works if and only if counter-- and counter++ are atomic instructions
Con semafori:
// Producer/consumer on a bounded buffer, synchronised with two counting
// semaphores and a mutex.
shared {
    mutex: mutex = 1          // held around every insert_item / remove_item call
    empty = semaphore(N)      // starts at N: downed before an insert, upped after a remove
    full = semaphore(0)       // starts at 0: upped after an insert, downed before a remove
}
produttore {
    item = produce_item()
    down(empty)               // blocks while there is no free slot
    lock(mutex)
    insert_item(item)         // store the item that was just produced
    unlock(mutex)
    up(full)                  // one more item is available
}
consumatore {
    down(full)                // blocks while there is no item to take
    lock(mutex)
    item = remove_item()
    unlock(mutex)
    up(empty)                 // one more slot is free
    consume_item(item)        // consume outside the mutex
}
Con monitor
// Producer/consumer implemented as a monitor with two condition variables.
// NOTE(review): waits are guarded by `if`, not `while`; this presumably
// relies on monitor semantics where the signalled thread runs immediately
// after the signal — confirm against the course definition.
monitor ProducerConsumer {
condition full
condition empty
// number of items currently counted in the buffer
int count = 0
function insert(item) {
// count has reached N: wait on `full`
if count == N wait(full)
insert_item(item)
count++
// count just went from 0 to 1: signal `empty`
if count == 1 signal(empty)
}
function remove() {
// count is 0: wait on `empty`
if count == 0 wait(empty)
item = remove_item()
count--
// count just went from N to N-1: signal `full`
if count == N-1 signal(full)
return item
}
}
thread Producer {
loop {
item = produce_item()
ProducerConsumer.insert(item)
}
}
thread Consumer {
loop {
item = ProducerConsumer.remove()
consume_item(item)
}
}
In java
/**
 * Single-slot buffer shared by a producer and a consumer.
 *
 * <p>{@code writable} tells whose turn it is: the producer waits until it is
 * {@code true}, the consumer waits until it is {@code false}.
 */
class Cell {
    /** The value currently stored in the cell. */
    int value;

    /**
     * Starts as {@code true} so that the producer goes first; with the default
     * {@code false} the consumer would read the cell before anything was produced.
     */
    boolean writable = true;
}
/**
 * Producer side of the single-cell handshake.
 *
 * <p>Ten times in a row it waits on the monitor of {@code cell} until the cell
 * is writable, increments the stored value, marks the cell as not writable
 * and notifies one thread waiting on the same monitor.
 *
 * <p>NOTE(review): {@code cell} is not declared in this snippet; presumably a
 * shared {@code Cell} visible from an enclosing scope — confirm.
 */
class Produttore extends Thread {
    @Override
    public void run() {
        try {
            for (int round = 0; round < 10; round++) {
                synchronized (cell) {
                    // Re-check the flag after every wake-up.
                    while (!cell.writable) {
                        cell.wait();
                    }
                    cell.value++;
                    cell.writable = false;
                    cell.notify();
                }
            }
        } catch (InterruptedException interrupted) {
            // Interrupted while waiting: stop producing.
        }
    }
}
/**
 * Consumer side of the single-cell handshake.
 *
 * <p>Ten times in a row it waits on the monitor of {@code cell} until the cell
 * is no longer writable, copies the stored value, marks the cell as writable
 * again and notifies one thread waiting on the same monitor.
 *
 * <p>NOTE(review): {@code cell} is not declared in this snippet; presumably a
 * shared {@code Cell} visible from an enclosing scope — confirm.
 */
class Consumer extends Thread {
    @Override
    public void run() {
        int read; // last value taken from the cell (not used further here)
        try {
            for (int round = 0; round < 10; round++) {
                synchronized (cell) {
                    // Re-check the flag after every wake-up.
                    while (cell.writable) {
                        cell.wait();
                    }
                    read = cell.value;
                    cell.writable = true;
                    cell.notify();
                }
            }
        } catch (InterruptedException interrupted) {
            // Interrupted while waiting: stop consuming.
        }
    }
}
In java con BlockingQueue
/**
 * Entry point of the BlockingQueue example: wires one {@code Producer} and one
 * {@code Consumer} to the same bounded queue and runs each on its own thread.
 */
class Main {
    public static void main(String[] args) throws Exception {
        // Bounded queue with room for 1024 elements, shared by both sides.
        BlockingQueue queue = new ArrayBlockingQueue(1024);
        Producer producer = new Producer(queue);
        Consumer consumer = new Consumer(queue);
        new Thread(producer).start();
        new Thread(consumer).start();
        // The main thread then sleeps for four seconds before returning.
        Thread.sleep(4000);
    }
}
class Producer implements Runnable {
protected BlockingQueue queue;
public Producer(BlockingQueue queue) {
this.queue = queue;
}
public void run() {
try {
queue.put("1");
queue.put("2");
queue.put("3");
} catch(InterruptedException e) {
return;
}
}
}
/**
 * Runnable that takes a single element from the queue it was constructed with
 * and passes it to {@code consume_item}.
 *
 * <p>NOTE(review): {@code consume_item} is not defined in this snippet.
 */
class Consumer implements Runnable {
    /** Source queue. */
    protected BlockingQueue queue;

    public Consumer(BlockingQueue queue) {
        this.queue = queue;
    }

    /** Blocks until one element is available, then consumes it. */
    @Override
    public void run() {
        try {
            Object taken = this.queue.take();
            consume_item(taken);
        } catch (InterruptedException interrupted) {
            // Interrupted while blocked: nothing to consume.
        }
    }
}
Strutture
Thread pool
da: thread pool in java
import java.util.concurrent.LinkedBlockingQueue;
public class ThreadPool {
private final int nThreads;
private final PoolWorker[] threads;
private final LinkedBlockingQueue queue;
public ThreadPool(int nThreads) {
this.nThreads = nThreads;
queue = new LinkedBlockingQueue();
threads = new PoolWorker[nThreads];
for (int i = 0; i < nThreads; i++) {
threads[i] = new PoolWorker();
threads[i].start();
}
}
public void execute(Runnable task) {
synchronized (queue) {
queue.add(task);
queue.notify();
}
}
private class PoolWorker extends Thread {
public void run() {
Runnable task;
while (true) {
synchronized (queue) {
while (queue.isEmpty()) {
try {
queue.wait();
} catch (InterruptedException e) {
System.out.println("An error occurred while queue is waiting: " + e.getMessage());
}
}
task = queue.poll();
}
try {
task.run();
} catch (RuntimeException e) {
System.out.println("Thread pool is interrupted due to an issue: " + e.getMessage());
}
}
}
}
}
Barrier con mutex e condvar
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
/**
 * Reusable barrier for a fixed number of threads, built on a mutex and a
 * condition variable.
 */
typedef struct mybarrier {
    pthread_mutex_t m;   /* protects every field below */
    pthread_cond_t c;    /* broadcast when the last thread of a round arrives */
    unsigned count;      /* number of threads that must arrive to open the barrier */
    unsigned waiting;    /* threads that have arrived in the current round */
    unsigned generation; /* incremented every time the barrier opens */
} mybarrier_t;

/* Barrier instance shared by the threads of this program. */
mybarrier_t barrier;

/**
 * Initialise a barrier.
 *
 * @param b     barrier to initialise
 * @param count number of threads that must call mybarrier_wait() before any
 *              of them is released
 */
void mybarrier_init(mybarrier_t *b, unsigned count) {
    b->count = count;
    b->waiting = 0;
    b->generation = 0;
    pthread_mutex_init(&b->m, NULL);
    pthread_cond_init(&b->c, NULL);
}

/**
 * Block until `count` threads have called this function, then release them
 * all. The barrier resets itself and can be used again right away.
 *
 * @param b barrier to wait on
 */
void mybarrier_wait(mybarrier_t *b) {
    pthread_mutex_lock(&b->m);

    /* Remember which round this thread belongs to. */
    const unsigned my_generation = b->generation;

    if (++b->waiting < b->count) {
        /*
         * pthread_cond_wait() may return spuriously, so sleep until the round
         * has actually changed instead of trusting a single wake-up.
         */
        while (my_generation == b->generation) {
            pthread_cond_wait(&b->c, &b->m);
        }
    } else {
        /* Last thread to arrive: start a new round and release everybody. */
        b->waiting = 0;
        b->generation++;
        pthread_cond_broadcast(&b->c);
    }

    pthread_mutex_unlock(&b->m);
}
/**
 * Thread body for the barrier demo: prints a message, waits on the global
 * barrier, prints again, waits a second time and prints a final message.
 *
 * @param data thread identifier, passed by value inside the pointer
 * @return always a null pointer
 */
void *thread_main(void *data) {
    const long id = (long)data;

    printf("Hello from thread %ld before barriers\n", id);
    mybarrier_wait(&barrier);

    printf("Hello from thread %ld after first barrier\n", id);
    mybarrier_wait(&barrier);

    printf("Hello from thread %ld after second barrier\n", id);
    return NULL;
}
/* Number of threads taking part in the barrier demo. */
#define THREAD_COUNT 4

/**
 * Start THREAD_COUNT threads that synchronise on the global barrier and wait
 * for all of them to finish.
 *
 * @return 0 on success, 1 if a thread could not be created or joined
 */
int main(void) {
    pthread_t tid[THREAD_COUNT];

    mybarrier_init(&barrier, THREAD_COUNT);

    for (long i = 0; i < THREAD_COUNT; i++) {
        /* The thread index travels by value inside the void * argument. */
        int err = pthread_create(&tid[i], NULL, thread_main, (void *)i);
        if (err != 0) {
            /*
             * With fewer than THREAD_COUNT threads the barrier would never
             * open; returning from main ends the whole process, including
             * the threads already started.
             */
            fprintf(stderr, "pthread_create failed for thread %ld (error %d)\n", i, err);
            return 1;
        }
    }

    for (long i = 0; i < THREAD_COUNT; i++) {
        int err = pthread_join(tid[i], NULL);
        if (err != 0) {
            fprintf(stderr, "pthread_join failed for thread %ld (error %d)\n", i, err);
            return 1;
        }
    }

    return 0;
}
Semafori con mutex e condvar
#include <pthread.h>
/**
 * Counting semaphore built on a mutex and a condition variable.
 * `count` is the number of available permits and never goes below zero.
 */
typedef struct {
    pthread_mutex_t m; /* protects count */
    pthread_cond_t c;  /* signalled whenever a permit is released */
    int count;         /* permits currently available */
} mysem_t;

/**
 * Initialise a semaphore.
 *
 * @param s     semaphore to initialise
 * @param count initial number of permits
 */
void mysem_init(mysem_t *s, int count) {
    s->count = count;
    pthread_mutex_init(&s->m, NULL);
    pthread_cond_init(&s->c, NULL);
}

/**
 * Acquire one permit, blocking until one is available.
 *
 * The permit is taken only after the wait has succeeded. Decrementing first
 * and waiting on a negative count loses wake-ups as soon as two threads are
 * blocked: an up() then wakes a thread that still sees a negative count and
 * goes back to sleep.
 *
 * @param s semaphore to acquire
 */
void mysem_down(mysem_t *s) {
    pthread_mutex_lock(&s->m);
    while (s->count <= 0) {
        pthread_cond_wait(&s->c, &s->m);
    }
    --s->count;
    pthread_mutex_unlock(&s->m);
}

/**
 * Release one permit and wake up one waiting thread, if any.
 *
 * @param s semaphore to release
 */
void mysem_up(mysem_t *s) {
    pthread_mutex_lock(&s->m);
    ++s->count;
    pthread_cond_signal(&s->c);
    pthread_mutex_unlock(&s->m);
}
/* Empty entry point: the program does nothing when run.
 * NOTE(review): presumably present only so that this snippet builds as a
 * standalone program — confirm. */
int main() {}
RWLock con mutex
// Readers/writer lock built from two mutexes and a counter of active readers.
// mutex_resource is held by a writer, or by the group of readers as a whole;
// mutex_counter protects `counter`.
mutex_resource: mutex
mutex_counter: mutex
counter = 0
read()
{
mutex_counter.lock()
counter++
if(counter == 1) // if I am the first reader
mutex_resource.lock()
mutex_counter.unlock()
// Critical section
mutex_counter.lock()
counter--
if(counter == 0) // if I am the last reader
mutex_resource.unlock()
mutex_counter.unlock()
}
write(){
mutex_resource.lock()
// Critical section
mutex_resource.unlock()
}