MapReduce
Einführung
- MapReduce
- MapReduce ist ein von Google Inc. eingeführtes Framework für nebenläufige Berechnungen über große (mehrere Petabyte) Datenmengen auf Computerclustern. (DeWikipedia:MapReduce)
- es ist
fast
robust
easy to use
scalable
widely applicable
Monitoring - das Framework ist seit 2003 bei Google zunehmend mehr im Einsatz
Namensgebung:
- viele konkrete Probleme können in den Funktionen höherer Ordnung map und reduce ausgedrückt werden MapReduce ist ein Framework, das über die zwei Funktionen map und reduce durch den Anwender parametrisiert werden kann.
- map und reduce Beispiele in python
>>> map(lambda x: x*x,range(1,11))
[1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
>>> map(lambda a: ord(a),"aeioAEIOU")
[97, 101, 105, 111, 117, 65, 69, 73, 79, 85]
>>> reduce(lambda a,b:a*b,range(1,11))
3628800
>>> reduce(lambda a,b:a+" "+b,["Only","for","testing","purpose","."])
'Only for testing purpose .'
Grundidee
- biete ein mächtiges Framework mit den folgenden Eigenschaften an
- automatische Parallelisierung, Koordininierung und Verteilung von Jobs
- Fehlertoleranz gegenüber Hardware- und Softwareausfällen
- automatische Lastverteilung
- Optimierung von Netzwerk und Datentransfer
- Überblick mittels Status und Monitoring
- lasse den Benutzer die Anwendung des Framework über die zwei Funktionen map und reduce parametrisieren
Anwender beschäftigt sich nur noch mit der Anwendungslogik
Anwendungen
- Indizierung grosser Datenmengen (grösser als 20 Terabyte) für die Suchaufbereitung
- mit mehreren MapReduce Berechnungen werden die Daten strukturiert
- Maschinelles Lernen
- künstliches Erzeugung von Wissen aus Erfahrung
- Verteiltes
- Suchen von Pattern
- Sortieren von Daten
- Rechnen (Matrizenmultiplikationen)
- Auswertung von Log Dateien
- Kategorisieren von Daten
- Nachrichtenaufbereitung
- Datenaufbereitung um Reports für häufige Anfrage zu erzeugen
- Erzeugen von Graphen
- Aufrufhäufigkeit von Seiten
- Verlinkung von Webseiten
- Extrahieren von Daten und neue Einsichten zu erlangen
Protokoll
Datenfluß
- Datenfluß MapReduce:
- die Eingabedatei wird auf mehrere map-Prozesse verteilt und die map-Phase beginnt
- die map-Prozesse berechnen parallel die Zwischenergebnisse
- sobald alle map-Prozesse fertig sind, beginnt die reduce-Phase
- die reduce-Prozesse berechnen parallel für bestimmte Zwischenergebnisse die Ausgabedaten
- für jeden reduce-Prozess wird eine Ausgabedatei erzeugt
- sind alle reduce-Prozesse fertig, ist die reduce-Phase vollendet und somit auch die MapReduce Berechnung
Details
Mapper
- Deklaration in Haskell
map: (s1,w1)->[(s2,w2)]
- verarbeitet eine Liste von (Schlüssel,Wert) Paaren (s1,w1)
- erzeugt eine Liste von (neuen Schlüssel,Wert) Paaren (s2,w2) aus jedem Eingabepaar (s1,w1)
- die Liste kann auch leer sein
- der Mapper ist implizit gegebenfalls auch ein Filter
- der Mapper ist mit der Verarbeitung der Liste aller Paare (s1,w1) fertig
Reducer
- Deklaration in Haskell
reduce: (s2,[w2]) -> [w3]
- erhält eine alle Paare mit gleichem Schlüssel s2 in der Form des Paares (s2,w2*)
- wendet reduce auf die Werte w2 an und gibt eine Liste von w3's als Ergebnis aus
map und reduce Besonderheiten
Die Begrifflichkeit von MapReduce lehnt sich nur an ihre funktionalen map und reduce Pendants an (Vgl.: Google's MapReduce Programming Model -- Revisited) . Die map Funktion übernimmt implizit die Funktionalität der dritten elementaren funktionalen Funktion filter.GHCi, version 6.10.2: http://www.haskell.org/ghc/ :? for help
Loading package ghc-prim ... linking ... done.
Loading package integer ... linking ... done.
Loading package base ... linking ... done.
Prelude> :type map
map :: (a -> b) -> [a] -> [b]
Prelude> :type foldl
foldl :: (a -> b -> a) -> a -> [b] -> a
- die map und reduce Funktion aus MapReduce entspricht der Funktionen in der funktionalen Welt, die auf die Liste angewandt wird
Eigenschaft map funktional map Phase MapReduce Eingabe Liste von Werten Listen von Paare (Schlüssel,Wert) Verarbeitung jeder Wert wird auf einen neuen Wert abgebildet (Schlüssel,Wert) Paar kann auf keine oder mehrere (Schlüssel,Wert) Paare abgebildet werden Ausgabe Liste von neuen Werten Liste von neuen Paaren (Schlüssel,Liste von Werten)
Beispiele
- Klassiker: Worthäufigkeit in einer Datei zählen
- map Funktion erhält als Input die Liste aller Paare (Zeilennummer,Zeileninhalt)
- map Funktion produziert zu jedem Paar (Zeilennummer,Zeileninhalt) alle Paare (Wort,1) zu jedem Wort der Zeile
- reduce Funktion erhält zu jedem Wort eine Paar (Wort,[1,..,1]) und berechnet die Häufigkeit von Wort in der Datei, indem es die 1 in [1,..,1] addiert
- als Ergebnis steht eine Liste [(Wort,Häufigkeit des Wortes)] über alle Wörter aus der Datei zu Verfügung
- weitere Anwendungsfälle
Anwendung Eingabe für Mapper Ausgabe Mapper Eingabe Reducer Ausgabe Reducer Wortsuche in mehreren Dateien [(Dateiname,Dateiinhalt)] [(wort,(Dateiname,Zeilenummer)] [(wort,[(Dateiname,Zeilenummer),...,(Dateiname,Zeilenummer)] [(wort,[(Dateiname,Zeile),...,(Dateiname,Zeile)] Am häufigsten referenzierte Web-Pages ermitteln [(url,Inhalt der html-Seite)] [(Referenz einer url,1)] [(Referenz einer url,[1,..,1] ) [(Referenz einer url, Länge von ([1,..,1])] Verteiltes Sortieren (Wörter) [(i-ter Teil ,Wörter des i-ten Teils)] ([Buchstabe == k,( k-Wort )] [Buchstabe k,(Liste über k-Wörter)] [(Buchstabe, sortierte Liste über k-Wörter)] Primzahlen aus n-Zahlen ermitteln [(i-ter Teil, Zahlen des i-ten Teils)] [(Zahlenbereich i,Zahl im Zahlenbereich i)] [Zahlenbereich,(alle Zahlen aus Zahlenbereich i)] [Zahlenbereich i, (Primzahlen aus Zahlenbereich)]
Mitwirkende
- der Eingabe Leser
- teilt die Eingabe in große Blöcke (64MB Google) und weist in einem Mapper zu
- transformiert die Blöcke in (Schlüssel,Wert) Paare für den Mapper
- die Mapper Funktion
- nimmt die Liste von (Schlüssel,Wert) Paare and und erzeugt zu jedem Paar eine neue Listen von (Schlüssel,Wert) Paaren, die auch leer sein kann
- die Verteilerfunktion
- die Ausgabe aller Maps wird den entsprechenden Reducer zur Weiterverarbeitung zugewiesen
- eine typische Zuweisung erfolgt durch den Hashwert des Keys modulo der Anzahl der Reducer
- die Vergleichsfunktion
- sortiert die Werte der Mapper Funktion entsprechend der Schlüssel
- dies ist notwendig, da der Reducer die Werte mehrerer Schlüssel in der Regel bearbeitet
- die Reducer Funktion
- erzeugt zu jedem Schlüssel und deren Werte einen Ausgabeliste, die auch leer sein kann
- der Ausgabe Schreiber
- speichert das Ergebnis in einer Datei ab
Architektur
Ausführung
- MapReduce Framework:
- das MapReduceFramework
- teilt die Eingaben in m Teile auf
- start die Master und Worker Prozesse auf dem Cluster
- Master
- weist die m map Aufgaben und r reduce Aufgaben den entsprechenden Rechner zu
- Mapper
- lesen die Wert als (Schlüssel,Wert) Paare ein
- verarbeiten die Daten
- schreiben die Zwischenergebnisse in den Speicher
- Mapper
- schreiben die Zwischenergebnisse periodisch auf die Platte
- die Zwischenergebnisse werden in r Dateien aufgeteilt
- teilt dem Master die Namen der Dateien mit
- Reducer
- bekommt die Adressen der Zwischenergebnisse mitgeteilt
- liest alle für ihn bestimmten (Schlüssel,Wert) Paare ein
- sortiert die (Schlüssel,Wert) Paare nach dem Schlüssel
- Reducer
- iteriert über die (Schlüssel,Wert) Paare für jeden Schlüssel
- übergibt die (Schlüssel,Wert) Paare an die reduce Funktion
- die reduce Funktion akkumuliert die Ergebnisse des reduce Schrittes zu einem Ausgabewert
- das MapReduce Framework
- warten bis alle map und reduce Prozesse fertig sind
- übergibt die Kontrolle wieder der Applikation
IO Managment
- Kompression der Daten
- Serialisierung der Daten
- Verteilung und Sortierung der Daten
- Verwaltung der Metadaten auf dem Master
verteilte Dateisystem
- Charakteristiken
- sehr große Datenmengen Petabytes
- wenige grosse Dateien (>GB) bestehen aus Datenblöcken, die typischerweise 64 MB groß sind
- die Datenblöcke liegen redundant (typischerweise 3 mal) auf Chunkservern vor
- Datendurchsatz ist wichtiger als die Zugriffszeit
- Streaming Zugriff auf die Daten
- lesen vom Dateianfang
- schreiben ans Dateiende
- write-once, read-many-times Zugriffe
- wenige grosse, statt vieler kleiner Zugriffe
- meist werden die ganzen Daten gelesen
- kann mit hohen Fehlerraten umgehen (Chunkserver sind einfache PCs)
- Beispiele
Status und Monitoring
Mittels eines Webservers kann das MapReduce Framework administriert werden.- Überblick über die Master und Worker
- Status und Fortschritt der einzelnen Jobs
- Zähler über die Map Prozesse
- Zugriff auf die Logdateien
Tiefere Einsichten
- Master
- überwacht den Zustand der Arbeiter (idle, in-progress,completed)
- kennt die r Zwischenergebnisse Adressen jedes Mappers
- Fehlertoleranz
- Arbeiter
- Master pingt in periodischen Zeitabschnitten die Arbeiter
- Ausfall einer map Aufgabe
- werden neu auf andere Rechner verteilt, da sich die Zwischenergebnisse im Arbeitsspeicher oder lokalen Speicher des Rechners befinden
- die reduce Aufgaben werden wiederholt
- Ausfall einer reduce Aufgaben
- fertige reduce Aufgaben sind durch das globalen Filesystem weiter verfügbar
- Master
- bei einem Fehler des Master wird MapReduce neu angestossen
- Umgang mit mehrfachen Ergebnissen
- wird eine map Aufgabe von mehreren Mapper gleichzeitig gelöst, verwendet der Master das Ergebnis des ersten Mappers
- Reducer, die die gleiche Aufgabe lösen, überschreiben ihre Fehler im globalen Filesystem
- Arbeiter
- Datenlokalität
- die Rechner werden zu Mappern erklärt, die den Eingabedaten am nächsten sind
- Entscheidunskriterium ( Knoten, Rack, Datenzenter )
- im Google File Sytem (GFS) und Hadoop File System (HDFS) sind die Datenblöcke mindenstens 3 mal vorhanden
- die Rechner werden zu Mappern erklärt, die den Eingabedaten am nächsten sind
- Aufgabenaufteilung
- die Anzahl m der map und die Anzahl r der reduce Aufgaben sollte deutlich höher als die Anzahl der Rechner sein, damit Aufgaben optimal verteilt werden können
- typische Werte von Google für 2000 Arbeiter
- map Aufgaben über 16 - 64MB
- 20000 map Aufgaben
- 5000 reduce Aufgaben
- Nachzügler
- kurz vor Ende des MapReduce Prozesses werden neue Worker für die nicht vollendeten Aufgaben gestartet
- sobald der ursprüngliche Worker oder der neue Worker fertig, ist dieser Teil des MapReduce Prozesses fertig
Erweiterungen
- die Partitionsierungsfunktion kann vorgegeben werden
- Default: hash(key) mod r
- die (Schlüssel,Wert) Paare einer Partition sind nach dem Schlüssel aufsteigend sortiert
- Combiner Funktion
- die Zwischenergebnisse der Mapper können schon vorab mit dem Reducer vereinfacht werden, wenn dieser asssoziativ und kommunitativ ist
- im klassischen Worthäufigkeit Zähler Problem ermittelt der Mapper statt k Paare (wort,1), ein Paar (wort,k) zu einem gegebenen Wort
- lesen der Eingabe und schreiben der Ausgabe
- damit der Mapper die Daten in der richtigen Form (Liste über (Schlüssel,Wert)) erhält, kann eine Eingabefunktion dem Mapper vorangestellt werden
- die Ausgabefunktion lässt sich ebenfalls parametrisieren, sodass mehrere MapReduce Berechnungen verkettet werden können die Komposition von mehrern MapReduce Jobs ist einem komplexen MapReduce Job vorzuziehen
- Umgang mit fehlerhaften Datensätzen
- fehlerhafte Datensätze, die die Worker zum Abbruch zwingen, werden vom Master ausgesondert
- debugging Unterstützung
- eine lokale, sequentielle MapReduce Implementierung hilft bei der Fehlersuche
Performance
- Beispiel von Google (http://labs.google.com/papers/mapreduce.html)
- 3 Zeichen grep über 1 Terabyte Daten
- 1800 Worker mit 4 GByte Ram, 2GHz Intel Xeon Prozessor mit Hyperthreading und 2 160 GB IDE Festplatten
- das gesuchte Pattern existiert 92.000 mal
- 15000 map Aufgaben und 1 reduce Aufgabe benötigten 150 Sekunden (60 Sekunden startup)
- Eckdaten
- Wer kann am schnellsten 1 Terabyte Daten sortieren?
- Google und Yahoo schaffen dies mit Clustern von 3000 - 4000 Rechner in ca. 60 Sekunden
- MapReduce skaliert linear
- Datengrösse Petabyte (10^15 Byte oder auch 1 Million Gigabyte)
- Wer kann am schnellsten 1 Terabyte Daten sortieren?
Implementierungen
- Google
- Framework in C++
- Interface in Python und Java
- Hadoop(Yahoo)
- Java
- Interface in Java
- spezielles C++ Interface über Hadoop Pipes (Sockets)
- streaming fähig
- im Einsatz bei
- Yahoo
- Amazon
- AOL
- IBM
- The New York Times
- disco(Nokia)
- Framework in Erlang
- Interface in Python
Beispiele
- erster Versuch mit disco scheiterte; Jobs wurden einfach nicht an die Worker geschickt
- zweiter Versucht mit dem Streaming Interface Hadoop klappt sofort, das nur ein Java 1.6 notwendig war
Hadoop
setup
- Eingabedatei: grimm.txt (Grimm's Fairy Tales by Jacob Grimm and Wilhelm Grimm in us-ascii)
- Umgebungsvariablen
export HD=/home/grimm/hadoop/hadoop-0.20.1
export HD_W=/home/grimm/hadoop/examples
export PATH=/home/grimm/python/Python-2.6/bin:$PATH
streaming Interface
- mapper: Mapper Funktion
- reducer: Reducer Funktion
- input: Eingabedatei(en)
- output: Ausgabeverzeichnis
Codebeispiele
- zwei Funktion readInput und outputWriter, die die Eingabe von stdin lesen und nach stdout schreiben
import sys
def readInput(mapper):
for (num,line) in enumerate(sys.stdin):
line=line.strip()
mapper(num,line)
def outputWriter(reducer,sortKey=None):
key2Values={}
for line in sys.stdin:
line= line.strip()
key,value= line.split()
try:
key2Values[key].append(value)
except:
key2Values[key]=[value]
results=[]
for key in key2Values.keys():
results.append((key,reducer(key,key2Values[key])))
results.sort(key=sortKey)
for (key,value)in results:
print "%s %s"% (key,value)
- readInput
- wird über den Mapper parametrisiert
- nimmt den Eingabestrom an
- erzeugt für jedes Zeile ein Paar (Zeilennummer,Zeile)
- füttert mit dem Paar (Zeilennummer,Zeile) den Mapper
- erzeugt aus dem Eingabestrom einen Ausgabestrom (Schlüssel,Wert) und füttert damit den Mapper
- outputWriter
- wird über den Reducer und eine Sortierfunktion parametrisiert
- nimmt den Eingabestrom an
- baut für jeden Schlüssel die Lister aller Werte auf
- wendet die reduce Funktion auf die Elemente dieser Liste an
- sortiert die Ergebnisliste gegebenfalls
- schreibt das Ergebnis aus stdout
MapReduce Aufrufe
Worthäufigkeit
- bestimme zu jedem Wort seine Häufigkeit nach Wörter sortiert
- Aufruf
$HD/bin/hadoop jar $HD/contrib/streaming/hadoop-0.20.1-streaming.jar -mapper $HD_W/mapperWordCount.py
-reducer $HD_W/reducerWordCount.py -input $HD_W/book/grimm.txt -output $HD_W/grimmWordCount.out - Mapper
#!/usr/bin/env python
import ioMapReduce
def mapper(key,value):
words = value.split()
for word in words:
print "%s %s" % (word, 1)
ioMapReduce.readInput(mapper)
- Reducer
#!/usr/bin/env python
import sys
import ioMapReduce
def reducer(key,values):
return len(values)
ioMapReduce.outputWriter(reducer)
Worthäufigkeit invertiert
- bestimme zu jedem Wort seine Häufigkeit nach deren Häufigkeit sortiert
- Aufruf
$HD/bin/hadoop jar $HD/contrib/streaming/hadoop-0.20.1-streaming.jar -mapper $HD_W/mapperFrequency.py
-reducer $HD_W/reducerFrequency.py -input $HD_W/grimmWordCount.out/part-00000 -output $HD_W/grimmFrequency.out - Mapper
#!/usr/bin/env python
import ioMapReduce
def mapper(key,value):
(word,freq) = value.split()
print "%s %s" % (freq,word )
ioMapReduce.readInput(mapper)
- Reducer
#!/usr/bin/env python
import sys
import ioMapReduce
def reducer(key,values):
return values
ioMapReduce.outputWriter(reducer,lambda item: int(item[0]) )
Pattern Matching
- finde ein Wort in einer Datei und gib dessen Zeilennummer aus
- Aufruf
$HD/bin/hadoop jar $HD/contrib/streaming/hadoop-0.20.1-streaming.jar -mapper $HD_W/mapperFrequencyWord.py
-reducer $HD_W/reducerFrequencyWord.py -input $HD_W/book/grimm.txt -output $HD_W/grimmFrequencyWord.out - Mapper
#!/usr/bin/env python
import ioMapReduce
def mapper(key,value):
words = value.split()
for word in words:
if ( word == "witch" ): print "%s %s" % (word,key)
ioMapReduce.readInput(mapper)
- Reducer
#!/usr/bin/env python
import sys
import ioMapReduce
def reducer(key,values):
return sorted(values,key=int)
ioMapReduce.outputWriter(reducer)
Fazit
- die Mapper und Reducer sind in Python einfach zu schreiben
- das Aufsetzen des Frameworks stellt die grösste Hürde dar
- es fehlt jetzt nur noch 2000 Knoten
Weiterlesen...