Home > java > Concorrenza in Java: Thread, Executor e Fork/Join

Concorrenza in Java: Thread, Executor e Fork/Join

19 giugno 2013

Thread, Executor e Fork/Join
La concorrenza è la capacità di eseguire in parallelo una parte o diverse parti di un programma. Nei moderni computer forniti di diverse CPU o diversi CORE è possibile sfruttare la parallelizzazione dei processi. Ciò consente di impiegare al meglio l’hardware in nostro possesso, di ridurre il tempo di elaborazione e di servire un numero sempre più elevato di utenze. Il miglioramento dell’esecuzione può essere valutato in base alla legge di Amhdal da cui si ricava che il massimo ricavo lo si ottiene ottimizzando il processo più frequente.

In Java storicamente la concorrenza è resa possibile tramite l’utilizzo dei Thread ma questi non hanno riscontrato un grande favore da parte degli sviluppatori a causa della loro complessità e spesso sono stati evitati per non incorrere in problemi di gestione sbagliata della concorrenza.
A partire da java 1.5 è stato introdotto il framework Executor che ha visto la luce di nuove librerie tali da consentire un utilizzo più agevole dei Thread.
Mentre invece a partire da java 1.7 è stato introdotto il framework Fork/Join che ha introdotto altre librerie tali da ottimizzare il framework Executor.
L’applicazione della concorrenza risulta molto utile in tutti quei casi in cui non si ha la necessità di serializzare, cioè accodare i processi, ma è anche possibile parallelizzarli. La necessità di serializzare due processi avviene quando l’output del primo processo costituisce, in tutto o in parte, l’input del secondo processo, una situazione del tipo: x=f(z) e y=f(x). Ma questa situazione non si verifica spesso, o almeno non sempre.
Prendiamo il caso di una serie di liste di scelte indipendenti, una lista di prodotti e la propria citta di nascita, per ogni lista abbiamo query diverse, ed in questo caso possiamo lanciare le query in parallelo.
Se invece le liste sono tra loro dipendenti, per esempio selezionare regione poi provincia e poi comune di nascita, allora la prima scelta sarà determinante per la scenda scelta quindi dovremo serializzare, anche se useremo altre tecniche per ottimizzare la ricerca.

Implementazione
Spesso ci si ritrova a sviluppare delle funzionalità che richiedono la composizione di diverse logiche: estrarre dal database delle liste, invocare un webservice, invocare un programma tramite CICS ecc. E spesso queste operazione sono tra loro indipendenti.
L’utilizzo della concorrenza consente di eseguire in parallelo le logiche di business e solo al termine aggregare i dati al fine di effettuare dei controlli sui dati, trascodifiche e visualizzazione del risultato.

In questo articolo la mia intenzione è di esporre in modo sintetico le 3 modalità di utilizzo della concorrenza in java partendo dai Thread, per poi passare agli Executor ed infine completare con il Fork/Join.
L’esempio che tratto, si sofferma su un semplice “Hello World!” parallelo, tale da potermi concentrare sulle librerie della concorrenza e non sulle logiche di business. Le classi Hello e World rappresentano simbolicamente le classi di business che contengono le logiche parallelizzate delle attività.

1.Utilizzo dei Thread
Per creare un Thread sappiamo che dobbiamo creare una classe che estenda Thread oppure implementi Runnable. In entrami i casi dobbiamo implementare il metodo run(), dobbiamo invocarlo chiamando il metodo start() e restare in attesa del completamento con il metodo join().
Il metodo run() non prevede parametri in ingresso e non restituisce valore (tipo void), questo significa che per passare e ricevere valori dobbiamo procedere in altri modi. Infatti avviene spesso che l’inizializzazione del Thread avvenga passando parametri al costruttore mentre il recupero del risultato avvenga tramite la definizione di un metodo get…() che legga le proprietà incapsulate nel Thread.

package concorrenza.threads;

public class Client {
	public static void main(String[] args) throws InterruptedException {
		new ParallelFacade().execute();
	}
}
package concorrenza.threads;

import java.util.LinkedList;
import java.util.List;

public class ParallelFacade {
	public void execute() throws InterruptedException {
		Hello hello = new Hello("Hello ");
		World world = new World("World!");

		List<Thread> list = new LinkedList<Thread>();
		list.add(hello);
		list.add(world);

		// Avvia i Threads
		for (Thread t : list) {
			t.start();
		}

		// Attende completamento
		for (Thread t : list) {
			t.join();
		}

		// Recupera risultato
		String helloWorld = hello.get() + world.get();

		// Stampa il risultato
		System.out.println(helloWorld);
	}
}
package concorrenza.threads;

public class Hello extends Thread {

	private String hello;

	public Hello(String hello) {
		this.hello = hello;
	}

	public String get() {
		return this.hello;
	}

	@Override
	public void run() {
		this.hello = this.hello.toUpperCase();
	}

}
package concorrenza.threads;

public class World extends Thread {

	private String world;

	public World(String world) {
		this.world = world;
	}

	public String get() {
		return this.world;
	}

	@Override
	public void run() {
		this.world = this.world.toLowerCase();
	}

}

L’esecuzione produce l’output:

$JAVA_HOME/bin/java concorrenza.threads.Client
HELLO world!

2.Utilizzo degli Executor
Da java 1.5 è possibile utilizzare il framework Executor che consente di semplificare l’invocazione dei Thread. In questo caso occorre ottenere l’istanza di un ExecutorService tramite la classe factory Executors, invocare i Thread di tipo Callable, ossia che ritorna valore, per ottenere una lista di Future, ricavare il valore di ritorno delle singole invocazioni.
In questo caso l’inizializzazione degli oggetti Callable, come nel caso dei Thread, avviene tramite il costruttore mentre il recupero del risultato avviene tramite gli oggetti Future che espongono il metodo get() che ritorna il tipo generic definito nelle classi Callable.

package concorrenza.executors;

import java.util.concurrent.ExecutionException;

public class Client {
	public static void main(String[] args) throws InterruptedException, ExecutionException {
		new ParallelFacade().execute();
	}
}
package concorrenza.executors;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelFacade {
	public void execute() throws InterruptedException, ExecutionException {

		List<Callable<String>> callables = new LinkedList<Callable<String>>();
		callables.add(new Hello("Hello "));
		callables.add(new World("World!"));

		ExecutorService executor = Executors.newSingleThreadExecutor();
		//Avvia i Threads ed attende il completamento
		List<Future<String>> list = executor.invokeAll(callables);

		//Recupera risultato
		String helloWorld = "";
		for (Future<String> future : list) {
			helloWorld += future.get();
		}
		//Chiude il pool
		executor.shutdown();

		//Stampa il risultato
		System.out.println( helloWorld );
	}
}
package concorrenza.executors;

import java.util.concurrent.Callable;

public class Hello implements Callable<String> {

	private String hello;

	public Hello(String hello) {
		this.hello = hello;
	}

	@Override
	public String call() throws Exception {
		return this.hello.toUpperCase();
	}
}
package concorrenza.executors;

import java.util.concurrent.Callable;

public class World implements Callable<String> {

	private String world;

	public World(String world) {
		this.world = world;
	}

	@Override
	public String call() throws Exception {
		return this.world.toLowerCase();
	}
}

L’esecuzione produce l’output:

$JAVA_HOME/bin/java concorrenza.executors.Client
HELLO world!

3.Utilizzo del Fork/Join
Da java 1.7, anche se è possibile in java 1.6 importando la libreria http://gee.cs.oswego.edu/dl/jsr166/dist/jsr166y.jar , sono disponibili delle nuove classi che implementano l’algoritmo Fork/Join. In realtà si tratta di una specializzazione del framework Executor introdotti in java 1.5 che presenta delle ottimizzazione.
Tali librerie sono accessibili tramite la classe ForkJoinPool che consente, tramite il metodo invoke(), di invocare un task di tipo ForkJoinTask. Il task implementa il tipo RecursiveTask o RecursiveAction, a seconda se deve o meno tornare un valore, ed implementa il metodo compute() che si occupa di gestire le invocazioni, al termine viene recuperato il risultato tramite il metodo join().

package concorrenza.forkJoin;

public class Client {
	public static void main(String[] args) {
		new ParallelFacade().execute();
	}
}
package concorrenza.forkJoin;

import java.util.Collection;
import java.util.LinkedList;
import java.util.List;

import jsr166y.ForkJoinPool;
import jsr166y.ForkJoinTask;
import jsr166y.RecursiveTask;

public class ParallelFacade extends RecursiveTask<String> {

	public void execute() {

		ForkJoinPool forkJoinPool = new ForkJoinPool();
		// Avvia i Threads, attende il completamento e recupera il risultato
		String helloWorld = forkJoinPool.invoke(new ParallelFacade());
		
		forkJoinPool.shutdown();
		// Stampa il risultato
		System.out.println(helloWorld);
	}

	@Override
	protected String compute() {
		List<ForkJoinTask<?>> ts = new LinkedList<ForkJoinTask<?>>();
		ts.add(new Hello("Hello "));
		ts.add(new World("World!"));

		// Avvia i Threads
		Collection<ForkJoinTask<?>> list = invokeAll(ts);
		
		//Recupera il risultato
		String helloWorld = "";
		for (ForkJoinTask<?> rt : list) {
			helloWorld += rt.join();
		}

		return helloWorld;
	}

}
package concorrenza.forkJoin;

import jsr166y.RecursiveTask;

public class Hello extends RecursiveTask<String> {
	private String hello;

	public Hello(String hello) {
		this.hello = hello;
	}

	@Override
	protected String compute() {
		return this.hello.toUpperCase();
	}

}
package concorrenza.forkJoin;

import jsr166y.RecursiveTask;

public class World extends RecursiveTask<String> {
	private String world;

	public World(String world) {
		this.world = world;
	}

	@Override
	protected String compute() {
		return this.world.toLowerCase();
	}

}

L’esecuzione produce l’output:

$JAVA_HOME/bin/java concorrenza.forkJoin.Client
HELLO world!
Categorie:java
%d blogger cliccano Mi Piace per questo: