DBInputFormat za prijenos podataka iz SQL-a u NoSQL bazu podataka



Cilj ovog bloga je naučiti kako prenijeti podatke iz SQL baza podataka u HDFS, kako prenijeti podatke iz SQL baza podataka u NoSQL baze podataka.

Na ovom ćemo blogu istražiti mogućnosti i mogućnosti jedne od najvažnijih komponenti Hadoop tehnologije, tj. MapReduce.

Danas tvrtke usvajaju Hadoop framework kao svoj prvi izbor za pohranu podataka zbog njegovih mogućnosti učinkovitog rukovanja velikim podacima. Ali također znamo da su podaci svestrani i da postoje u raznim strukturama i formatima. Za kontrolu tako velike raznolikosti podataka i njihovih različitih formata trebao bi postojati mehanizam koji će prilagoditi sve sorte, a opet dati učinkovit i dosljedan rezultat.





Najmoćnija komponenta u Hadoop okviru je MapReduce koja može pružiti kontrolu nad podacima i njihovom strukturom bolje od ostalih kolega. Iako to zahtijeva općenite krivulje učenja i složenost programiranja, ako se možete nositi s tim složenostima, sigurno možete obraditi bilo koju vrstu podataka s Hadoop-om.

MapReduce okvir dijeli sve svoje zadatke obrade u osnovi u dvije faze: Mapiranje i Smanjenje.



Priprema vaših sirovih podataka za ove faze zahtijeva razumijevanje nekih osnovnih klasa i sučelja. Super klasa za ovu preradu je InputFormat.

The InputFormat class je jedna od osnovnih klasa u Hadoop MapReduce API-ju. Ova je klasa odgovorna za definiranje dvije glavne stvari:

  • Podaci se dijele
  • Čitač zapisa

Podjela podataka je temeljni koncept u Hadoop MapReduce okviru koji definira veličinu pojedinačnih zadataka karte i potencijalni poslužitelj izvršenja. The Čitač zapisa odgovoran je za stvarne zapise o čitanju iz ulazne datoteke i predaje ih (kao parove ključ / vrijednost) mapiranju.



Broj mapiranja određuje se na temelju broja razdvajanja. Zadatak je InputFormat-a da kreira dijeljenja. Većina vremena podijeljene veličine ekvivalentna je veličini bloka, ali ne uvijek će se dijeljenja stvarati na temelju veličine bloka HDFS. To u potpunosti ovisi o tome kako je metoda getSplits () vašeg InputFormat-a nadjačana.

Postoji temeljna razlika između MR split i HDFS bloka. Blok je fizički komad podataka, dok je dijeljenje samo logični komad koji mapper čita. Podjela ne sadrži ulazne podatke, ona samo sadrži referencu ili adresu podataka. Podjela u osnovi ima dvije stvari: duljinu u bajtovima i skup mjesta za pohranu, koja su samo nizovi.

Da bismo ovo bolje razumjeli, uzmimo jedan primjer: Obrada podataka pohranjenih u vašem MySQL-u pomoću MR-a. Budući da u ovom slučaju ne postoji koncept blokova, teorija: 'dijeljenja se uvijek stvaraju na temelju HDFS bloka',ne uspije. Jedna od mogućnosti je stvaranje dijeljenja na temelju raspona redaka u vašoj MySQL tablici (a to je ono što DBInputFormat čini, ulazni format za čitanje podataka iz relacijskih baza podataka). Možemo imati k broj podjela koje se sastoje od n redova.

Jedino se za InputFormats temeljene na FileInputFormat (InputFormat za rukovanje podacima pohranjenim u datotekama) dijeljenja kreiraju na temelju ukupne veličine ulaznih datoteka u bajtovima. Međutim, veličina bloka FileSystem ulaznih datoteka tretira se kao gornja granica za ulazne podjele. Ako imate datoteku manju od veličine bloka HDFS, dobit ćete samo 1 mapper za tu datoteku. Ako želite imati drugačije ponašanje, možete koristiti mapred.min.split.size. Ali to opet ovisi isključivo o getSplits () vašeg InputFormat-a.

Imamo toliko već postojećih formata unosa dostupnih u paketu org.apache.hadoop.mapreduce.lib.input.

CombineFileInputFormat.html

CombineFileRecordReader.html

CombineFileRecordReaderWrapper.html

CombineFileSplit.html

CombineSequenceFileInputFormat.html

CombineTextInputFormat.html

FileInputFormat.html

FileInputFormatCounter.html

FileSplit.html

FixedLengthInputFormat.html

InvalidInputException.html

KeyValueLineRecordReader.html

KeyValueTextInputFormat.html

MultipleInputs.html

NLineInputFormat.html

SequenceFileAsBinaryInputFormat.html

SequenceFileAsTextInputFormat.html

SequenceFileAsTextRecordReader.html

SequenceFileInputFilter.html

SequenceFileInputFormat.html

SequenceFileRecordReader.html

TextInputFormat.html

java kako završiti program

Zadana vrijednost je TextInputFormat.

Slično tome, imamo toliko izlaznih formata koji čita podatke s reduktora i pohranjuje ih u HDFS:

FileOutputCommitter.html

FileOutputFormat.html

FileOutputFormatCounter.html

FilterOutputFormat.html

LazyOutputFormat.html

MapFileOutputFormat.html

MultipleOutputs.html

NullOutputFormat.html

PartialFileOutputCommitter.html

PartialOutputCommitter.html

SequenceFileAsBinaryOutputFormat.html

SequenceFileOutputFormat.html

TextOutputFormat.html

Zadano je TextOutputFormat.

Kad pročitate ovaj blog, naučili biste:

  • Kako napisati program za smanjenje karte
  • O različitim vrstama InputFormat-a dostupnih u Mapreduceu
  • Koja je potreba za InputFormats
  • Kako napisati prilagođene InputFormats
  • Kako prenijeti podatke iz SQL baza podataka u HDFS
  • Kako prenijeti podatke iz SQL (ovdje MySQL) baza podataka u NoSQL baze podataka (ovdje Hbase)
  • Kako prenijeti podatke iz jedne SQL baze podataka u drugu tablicu u SQL bazama podataka (Možda ovo možda nije toliko važno ako to radimo u istoj bazi podataka SQL. Međutim, nema ništa loše u poznavanju iste. Nikad se ne zna kako se može koristiti)

Preduvjet:

  • Unaprijed instaliran Hadoop
  • Unaprijed instaliran SQL
  • Unaprijed instaliran Hbase
  • Osnovno razumijevanje Java
  • MapSmanji znanje
  • Osnovno znanje Hadoop okvira

Razumijemo izjavu problema koju ćemo ovdje riješiti:

U našoj relacijskoj bazi podataka Edureka imamo tablicu zaposlenika u MySQL DB-u. Sada prema poslovnom zahtjevu moramo prebaciti sve podatke dostupne u relacijskom DB-u u datotečni sustav Hadoop, tj. HDFS, NoSQL DB poznat kao Hbase.

Imamo mnogo mogućnosti za izvršenje ovog zadatka:

  • Sqoop
  • Žlijeb
  • MapReduce

Sada ne želite instalirati i konfigurirati nijedan drugi alat za ovu operaciju. Preostala vam je samo jedna opcija, a to je Hadoopov okvir za obradu MapReduce. MapReduce okvir pružit će vam potpunu kontrolu nad podacima tijekom prijenosa. Možete manipulirati stupcima i staviti ih izravno na bilo koje od dva ciljna mjesta.

Bilješka:

  • Moramo preuzeti i staviti MySQL konektor u stazu klase Hadoop da dohvatimo tablice iz MySQL tablice. Da biste to učinili, preuzmite konektor com.mysql.jdbc_5.1.5.jar i zadržite ga u direktoriju Hadoop_home / share / Hadoop / MaPreduce / lib.
cp Downloads / com.mysql.jdbc_5.1.5.jar $ HADOOP_HOME / share / hadoop / mapreduce / lib /
  • Također, stavite sve staklenke Hbase pod Hadoop stazu kako bi vaš MR program pristupio Hbaseu. Da biste to učinili, izvršite sljedeću naredbu :
cp $ HBASE_HOME / lib / * $ HADOOP_HOME / share / hadoop / mapreduce / lib /

Verzije softvera koje sam koristio u izvršavanju ovog zadatka su:

  • Hadooop-2.3.0
  • HBaza 0,98,9-Hadoop2
  • Pomrčina Mjesec

Kako bih izbjegao program u bilo kojem problemu kompatibilnosti, preporučujem čitateljima da pokrenu naredbu sa sličnim okruženjem.

Prilagođeni DBInputWritable:

paket com.inputFormat.copy import java.io.DataInput import java.io.DataOutput import java.io.IOException import java.sql.ResultSet import java.sql.PreparedStatement import java.sql.SQLException import org.apache.hadoop.iodo.io .Writable import org.apache.hadoop.mapreduce.lib.db.DBWritable javna klasa DBInputWritable implementira Writable, DBWritable {private int id private String name, dept public void readFields (DataInput in) baca IOException {} public void readFields (ResultSet) baca SQLException // Resultset objekt predstavlja podatke vraćene iz SQL izraza {id = rs.getInt (1) name = rs.getString (2) dept = rs.getString (3)} public void write (DataOutput out) baca IOException { } public void write (PreparedStatement ps) baca SQLException {ps.setInt (1, id) ps.setString (2, name) ps.setString (3, dept)} public int getId () {return id} public String getName () {return ime} javni niz getDept () {return dept}}

Prilagođeni DBOutputWritable:

paket com.inputFormat.copy import java.io.DataInput import java.io.DataOutput import java.io.IOException import java.sql.ResultSet import java.sql.PreparedStatement import java.sql.SQLException import org.apache.hadoop.iodo.io .Writable import org.apache.hadoop.mapreduce.lib.db.DBWritable javna klasa DBOutputWritable implementira Writable, DBWritable {private String name private int id private String dept public DBOutputWritable (String name, int id, String dept) {this.name = name this.id = id this.dept = dept} public void readFields (DataInput in) baca IOException {} public void readFields (ResultSet rs) baca SQLException {} public void write (DataOutput out) baca IOException {} public void write (PreparedStatement ps) baca SQLException {ps.setString (1, name) ps.setInt (2, id) ps.setString (3, dept)}}

Ulazna tablica:

stvoriti bazu podataka edureka
izradi tablicu emp (empid int nije null, ime varchar (30), dept varchar (20), primarni ključ (empid))
umetnuti u vrijednosti emp (1, 'abhay', 'developement'), (2, 'brundesh', 'test')
odaberite * iz emp

Slučaj 1: Prijenos s MySQL-a na HDFS

paket com.inputFormat.copy import java.net.URI import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce .Job import org.apache.hadoop.mapreduce.lib.db.DBConfiguration import org.apache.hadoop.mapreduce.lib.db.DBInputFormat import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat import org.apache.hadoop .io.Text import org.apache.hadoop.io.IntWritable javna klasa MainDbtohdfs {public static void main (String [] args) baca izuzetak {Configuration conf = new Configuration () DBConfiguration.configureDB (conf, 'com.mysql.jdbc .Driver ', // klasa vozača' jdbc: mysql: // localhost: 3306 / edureka ', // db url' root ', // korisničko ime' root ') // lozinka Posao = novi posao (conf) .setJarByClass (MainDbtohdfs.class) job.setMapperClass (Map.class) job.setMapOutputKeyClass (Text.class) job.setMapOutputValueClass (IntWritable.class) job.setInputFormatClass (DBInputFormat.clamat). novi put (args [0])) DBInputFormat.setInput (posao, DBInputWritable.class, 'emp', // naziv ulazne tablice null, null, novi String [] {'empid', 'name', 'dept'} / / stupci tablice) Put p = novi put (args [0]) FileSystem fs = FileSystem.get (novi URI (args [0]), conf) fs.delete (p) System.exit (job.waitForCompletion (true)? 0: 1)}}

Ovaj dio koda omogućuje nam da pripremimo ili konfiguriramo ulazni format za pristup našem izvornom SQL DB-u. Parametar uključuje klasu upravljačkog programa, URL ima adresu SQL baze podataka, korisničko ime i lozinku.

DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // klasa upravljačkog programa 'jdbc: mysql: // localhost: 3306 / edureka', // db url 'root', // korisničko ime 'root') //zaporka

Ovaj dio koda omogućuje nam prosljeđivanje detalja tablica u bazu podataka i postavljanje u objekt posla. Parametri uključuju, naravno, instancu posla, prilagođenu klasu za upisivanje koja mora implementirati DBWritable sučelje, naziv izvorne tablice, uvjet ako je još nešto null, bilo koji parametar sortiranja else null, popis stupaca tablice.

DBInputFormat.setInput (posao, DBInputWritable.class, 'emp', // unos imena tablice null, null, novi String [] {'empid', 'name', 'dept'} // stupci tablice)

Mapper

paket com.inputFormat.copy import java.io.IOException import org.apache.hadoop.mapreduce.Mapper import org.apache.hadoop.io.LongWritable import org.apache.hadoop.io.Text import org.apache.hadoop.io .IntWritable javna klasa Map proširuje Mapper {
zaštićena void karta (LongWritable ključ, DBInputWritable vrijednost, Context ctx) {try {String name = value.getName () IntWritable id = new IntWritable (value.getId ()) String dept = value.getDept ()
ctx.write (novi Tekst (ime + '' + id + '' + odjel), id)
} catch (IOException e) {e.printStackTrace ()} catch (InterruptedException e) {e.printStackTrace ()}}}

Reduktor: koristi se reduktor identiteta

Naredba za izvođenje:

hadoop jar dbhdfs.jar com.inputFormat.copy.MainDbtohdfs / dbtohdfs

Izlaz: MySQL tablica prenesena na HDFS

hadoop dfs -ls / dbtohdfs / *

Slučaj 2: Prijenos s jedne tablice u MySQL-u na drugu u MySQL-u

stvaranje izlazne tablice u MySQL-u

stvoriti tablicu zaposlenik1 (ime varchar (20), id int, dept varchar (20))

paket com.inputFormat.copy import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.db.DBConfiguration import org.apache.hadoop.mapreduce.lib .db.DBInputFormat import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable import org.apache.hadoop.io.NullWritable public class Mainonetable_to_other_table {javna statička void glavna (String [] args) baca izuzetak {Configuration conf = new Configuration () DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // class driver 'jdbc: mysql: // localhost : 3306 / edureka ', // db url' root ', // korisničko ime' root ') // lozinka Posao posao = novi Posao (conf) job.setJarByClass (Mainonetable_to_other_table.class) job.setMapperClass (Map.class) posao .setReducerClass (Reduce.class) job.setMapOutputKeyClass (Text.class) job.setMapOutputValueClass (IntWritable.class) job.setOutputKeyClass (DBOutputWritable.class) job.setOutputValueClass (NET). lWritable.class) job.setInputFormatClass (DBInputFormat.class) job.setOutputFormatClass (DBOutputFormat.class) DBInputFormat.setInput (job, DBInputWritable.class, 'emp', // naziv ulazne tablice null, null, new String [] {'empid ',' name ',' dept '} // stupci tablice) DBOutputFormat.setOutput (posao,' zaposlenik1 ', // izlazni naziv tablice novi String [] {' name ',' id ',' dept '} // tablica stupci) System.exit (job.waitForCompletion (true)? 0: 1)}}

Ovaj dio koda omogućuje nam konfiguriranje imena izlazne tablice u SQL DB-u. Parametri su primjerice posao, naziv izlazne tablice i izlazni stupac.

DBOutputFormat.setOutput (posao, 'zaposlenik1', // ime izlazne tablice novi String [] {'ime', 'id', 'dept'} // stupci tablice)

Mapper: Isto kao slučaj 1

Reduktor:

paket com.inputFormat.copy import java.io.IOException import org.apache.hadoop.mapreduce.Reducer import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable import org.apache.hadoop.io .NullWritable javna klasa Reduce proširuje Reduktor {zaštićena void redukcija (Tekst ključ, Iterable vrijednosti, Context ctx) {int sum = 0 Red niza [] = key.toString (). Split ('') try {ctx.write (new DBOutputWritable (linija [0] .toString (), Integer.parseInt (linija [1] .toString ()), linija [2] .toString ()), NullWritable.get ())} ulov (IOException e) {e.printStackTrace ()} catch (InterruptedException e) {e.printStackTrace ()}}}

Naredba za pokretanje:

hadoop jar dbhdfs.jar com.inputFormat.copy.Mainonetable_to_other_table

Izlaz: Preneseni podaci iz EMP tablice u MySQL-u na drugog zaposlenika tablice1 u MySQL-u

Slučaj 3: Prijenos iz tablice u MySQL-u u tablicu NoSQL (Hbase)

Izrada Hbase tablice za smještaj izlaza iz SQL tablice:

stvori 'zaposlenik', 'službena_informacija'

Klasa vozača:

paket Dbtohbase import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.db.DBConfiguration import org.apache.hadoop.mapreduce.lib.db.DBInputFormat import org.apache.hadoop.hbase.mapreduce.TableOutputFormat import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.client.HTable import org.apache.hadoop.hbase.client.HTableInterface import org.apache .hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil import org.apache.hadoop.io.Text javna klasa MainDbToHbase {public static void main (String [] args) baca izuzetak {Configuration conf = config HBaseConfiguration.create () HTableInterface mytable = novi HTable (conf, 'emp') DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // klasa vozača 'jdbc: mysql: // localhost: 3306 / edureka' , // db url 'root', // korisničko ime 'root') // lozinka Posao = novi posao (conf, 'dbtohbase') job.setJarByClass (MainDbToHbase.class) job.s etMapperClass (Map.class) job.setMapOutputKeyClass (ImmutableBytesWritable.class) job.setMapOutputValueClass (Text.class) TableMapReduceUtil.initTableReducerJob ('zaposlenik', Reduce.class. klasa) DBInputFormat.setInput (posao, DBInputWritable.class, 'emp', // naziv ulazne tablice null, null, novi String [] {'empid', 'name', 'dept'} // stupci tablice) System.exit (job.waitForCompletion (istina)? 0: 1)}}

Ovaj dio koda omogućuje vam konfiguriranje klase izlaznog ključa koja je u slučaju hbase ImmutableBytesWritable

job.setMapOutputKeyClass (ImmutableBytesWritable.class) job.setMapOutputValueClass (Text.class)

Ovdje prosljeđujemo ime tablice hbase i reduktor da djeluju na tablici.

TableMapReduceUtil.initTableReducerJob ('zaposlenik', Reduce.class, posao)

Maper:

paket Dbtohbase import java.io.IOException import org.apache.hadoop.mapreduce.Mapper import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.io .LongWritable import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable javna klasa Map proteže Mapper {private IntWritable one = new IntWritable (1) zaštićena void karta (LongWritable id, DBInputWritable value, Context context). {try {String line = value.getName () String cd = value.getId () + '' String dept = value.getDept () context.write (new ImmutableBytesWritable (Bytes.toBytes (cd)), new Text (line + ' '+ dept))} catch (IOException e) {e.printStackTrace ()} catch (InterruptedException e) {e.printStackTrace ()}}}

U ovom dijelu koda uzimamo vrijednosti od getera iz klase DBinputwritable i zatim ih prosljeđujemo
ImmutableBytesWritable tako da dođu do reduktora u bytewriatble obliku koji Hbase razumije.

String line = value.getName () String cd = value.getId () + '' String dept = value.getDept () context.write (new ImmutableBytesWritable (Bytes.toBytes (cd)), new Text (line + '' + dept ))

Reduktor:

paket Dbtohbase import java.io.IOException import org.apache.hadoop.hbase.client.Put import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableReducer import org.apache.hadoop .hbase.util.Bytes import org.apache.hadoop.io.Text javna klasa Reduce produžuje TableReducer {javna praznina smanjuje (ImmutableBytesWritable ključ, Iterable vrijednosti, kontekst konteksta) baca IOException, InterruptedException {String [] uzrok = null // Vrijednosti petlje za (Tekst val: vrijednosti) {uzrok = val.toString (). split ('')} // Stavi na HBase Put stavi = novi Put (key.get ()) put.add (Bytes.toBytes ('official_info') ), Bytes.toBytes ('ime'), Bytes.toBytes (uzrok [0])) put.add (Bytes.toBytes ('official_info'), Bytes.toBytes ('odjel'), Bytes.toBytes (uzrok [1 ])) context.write (ključ, put)}}

Ovaj dio koda omogućuje nam da odredimo točan redak i stupac u koji bismo spremali vrijednosti iz reduktora. Ovdje pohranjujemo svaki empid u zasebni redak kao što smo emid napravili kao ključ retka koji bi bio jedinstven. U svaki redak pohranjujemo službene podatke o zaposlenicima pod obitelj stupaca „official_info“ pod stupce „ime“, odnosno „odjel“.

Put put = novo Put (key.get ()) put.add (Bytes.toBytes ('official_info'), Bytes.toBytes ('name'), Bytes.toBytes (uzrok [0])) put.add (Bytes. toBytes ('official_info'), Bytes.toBytes ('odjel'), Bytes.toBytes (uzrok [1])) context.write (ključ, put)

Preneseni podaci u Hbase:

skenirati zaposlenika

Kao što vidimo uspjeli smo dovršiti zadatak migriranja naših poslovnih podataka iz relacijskog SQL DB-a u NoSQL DB.

U sljedećem blogu naučit ćemo kako pisati i izvršavati kodove za druge ulazne i izlazne formate.

Nastavite objavljivati ​​svoje komentare, pitanja ili bilo kakve povratne informacije. Volio bih čuti vaše mišljenje.

Imate pitanje za nas? Molimo spomenite to u odjeljku za komentare i javit ćemo vam se.

Vezane objave: