RDD koristeći Spark: Građevinski blok Apache Spark



Ovaj blog o RDD-u koji koristi Spark pružit će vam detaljno i sveobuhvatno znanje o RDD-u, što je temeljna jedinica Sparka i koliko je korisno.

, Sama riječ dovoljna je da generira iskru u umu svakog inženjera Hadoop-a. DO n u memoriji alat za obradu koji je munjevit u klaster računanju. U usporedbi s MapReduceom, dijeljenje podataka u memoriji čini RDD-ove 10-100x brže nego dijeljenje mreže i diska, a sve je to moguće zbog RDD-ova (elastičnih distribuiranih skupova podataka). Ključne točke na koje se danas fokusiramo u ovom članku o istraživanju razvoja pomoću članka Spark su:

Trebate RDD-ove?

Zašto nam treba RDD? -RDD pomoću Sparka





Svijet se razvija s i Znanost o podacima zbog napretka u . Algoritmi na temelju Regresija , , i koja traje dalje Distribuirano Iterativni postupak acija moda koja uključuje ponovnu upotrebu i dijeljenje podataka između više računarskih jedinica.

Tradicionalno tehnikama potrebna stabilna srednja i distribuirana pohrana poput HDFS obuhvaćajući ponavljajuće proračune s replikacijama podataka i serializacijom podataka, što je puno usporilo proces. Pronaći rješenje nikad nije bilo lako.



Ovo je gdje RDD-ovi (Otporni distribuirani skupovi podataka) dolazi do velike slike.

RDD Jednostavni su za upotrebu i bez napora za stvaranje jer se podaci uvoze iz izvora podataka i ubacuju u RDD-ove. Nadalje, primjenjuju se operacije za njihovu obradu. Oni su a distribuirana kolekcija memorije s dozvolama kao Samo za čitanje i što je najvažnije, jesu Otporan na greške .



Ako ijedan particija podataka od RDD je izgubljeno , može se obnoviti primjenom istog preobrazba operacija na toj izgubljenoj particiji u loza , umjesto da obrađuje sve podatke od nule. Ovakav pristup u scenarijima u stvarnom vremenu može stvoriti čuda u situacijama gubitka podataka ili kada sustav ne radi.

Što su RDD-ovi?

RDD ili ( Skup elastičnih distribuiranih podataka ) je temeljna struktura podataka u Sparku. Uvjet Otporan definira sposobnost koja automatski generira podatke ili podatke kotrljajući se natrag prema izvorno stanje kada se dogodi neočekivana nesreća s vjerojatnošću gubitka podataka.

Podaci zapisani u RDD-ove su pregrađena i pohranjeni u više izvršnih čvorova . Ako je izvršni čvor ne uspije u vremenu izvođenja, zatim odmah dobiva sigurnosnu kopiju s sljedeći izvršni čvor . Zbog toga se RDD-ovi smatraju naprednom vrstom podatkovnih struktura u usporedbi s drugim tradicionalnim strukturama podataka. RDD-ovi mogu pohraniti strukturirane, nestrukturirane i polustrukturirane podatke.

Krenimo dalje s našim RDD-om pomoću bloga Spark i upoznajmo jedinstvene značajke RDD-a što mu daje prednost nad ostalim vrstama podatkovnih struktura.

Značajke RDD-a

  • U sjećanju (RADNA MEMORIJA) Računanja : Koncept izračunavanja u memoriji odvodi obradu podataka u bržu i učinkovitiju fazu u kojoj cjelokupno izvođenje sustava je nadograđena.
  • L njegova Procjena : Pojam lijena procjena kaže transformacije primjenjuju se na podatke u RDD-u, ali izlaz se ne generira. Umjesto toga, primijenjene transformacije su prijavljeni.
  • Upornost : Dobiveni RDD-ovi su uvijek višekratnu upotrebu.
  • Grubozrnate operacije : Korisnik može primijeniti transformacije na sve elemente u skupovima podataka karta, filtar ili grupirati prema operacijama.
  • Tolerantan na kvarove : Ako postoji gubitak podataka, sustav to može otkotrljati se natrag na svoje izvorno stanje pomoću prijavljenog transformacije .
  • Nepromjenljivost : Podaci definirani, dohvaćeni ili stvoreni ne mogu biti promijenio nakon što se prijavi u sustav. U slučaju da trebate pristupiti i izmijeniti postojeći RDD, morate stvoriti novi RDD primjenom skupa Transformacija funkcionira na trenutnom ili prethodnom RDD-u.
  • Pregrađivanje : To je presudna jedinica paralelizma u Varnici RDD. Prema zadanim postavkama, broj stvorenih particija temelji se na vašem izvoru podataka. Možete čak odrediti broj particija koje želite napraviti pomoću prilagođena particija funkcije.

Stvaranje RDD-a pomoću Sparka

RDD-ovi se mogu stvoriti u tri načina:

  1. Čitanje podataka iz paralelizirane zbirke
val PCRDD = spark.sparkContext.parallelize (Array ('Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat'), 2) val resultRDD = PCRDD.collect () resultRDD.collect ( ) .foreach (println)
  1. Podnošenje zahtjeva preobrazba na prethodnim RDD-ima
val riječi = spark.sparkContext.parallelize (Seq ('Spark', 'je', 'a', 'vrlo', 'moćan', 'jezik')) val wordpair = words.map (w = (w.charAt ( 0), w)) wordpair.collect (). Foreach (println)
  1. Čitanje podataka iz vanjska pohrana ili staze datoteka poput HDFS ili HBase
val Sparkfile = spark.read.textFile ('/ user / edureka_566977 / spark / spark.txt.') Sparkfile.collect ()

Operacije izvedene na RDD-ima:

Postoje uglavnom dvije vrste operacija koje se izvode na RDD-ima, i to:

  • Transformacije
  • Akcije

Transformacije : The operacijama primjenjujemo na RDD-ove za filtar, pristup i preinačiti podaci u roditeljskom RDD-u za generiranje a uzastopni RDD Zove se preobrazba . Novi RDD vraća pokazivač na prethodni RDD osiguravajući ovisnost između njih.

Transformacije su Lijene procjene, drugim riječima, operacije primijenjene na RDD-u na kojima radite bit će zabilježene, ali ne pogubljen. Sustav baca rezultat ili iznimku nakon pokretanja datoteke Akcijski .

Transformacije možemo podijeliti u dvije vrste kao što je prikazano u nastavku:

  • Uske transformacije
  • Široke transformacije

Uske transformacije Primjenjujemo uske transformacije na a pojedinačna particija nadređenog RDD-a za generiranje novog RDD-a jer su podaci potrebni za obradu RDD-a dostupni na jednoj particiji matični ASD . Primjeri za uske transformacije su:

  • karta()
  • filtar()
  • flatMap ()
  • particija ()
  • mapPartitions ()

Široke transformacije: Primjenjujemo široku transformaciju na više particija za generiranje novog RDD-a. Podaci potrebni za obradu RDD-a dostupni su na više particija matični ASD . Primjeri za široke transformacije su:

  • smanjitiBy ()
  • unija()

Akcije : Akcije upućuju Apache Spark da se prijavi računanje i rezultat ili iznimku proslijedite natrag RDD-u vozača. Nekoliko akcija uključuje:

  • skupljati()
  • računati()
  • uzeti()
  • prvi()

Primijenimo praktično operacije na RDD-ima:

IPL (indijska Premier liga) je kriket turnir sa svojim nadanjem na vrhuncu. Dakle, dopustimo danas da se dohvatimo IPL skupa podataka i izvršimo naš RDD pomoću Sparka.

  • Prvo, preuzmimo CSV podatke o podudaranju IPL-a. Nakon preuzimanja počinje izgledati kao EXCEL datoteka s redovima i stupcima.

U sljedećem koraku palimo iskru i učitavamo datoteku match.csv sa svog mjesta, u mom slučaju mogcsvmjesto datoteke je “/User/edureka_566977/test/matches.csv”

niz objekata u Java primjeru programa

Počnimo sada s Transformacija prvi dio:

  • karta():

Koristimo Transformacija karte primijeniti specifičnu operaciju transformacije na svaki element RDD-a. Ovdje kreiramo RDD pod imenom CKfile gdje pohranjujemo našcsvdatoteka. Stvorit ćemo još jedan RDD pozvan Države pohrani detalje o gradu .

spark2-shell val CKfile = sc.textFile ('/ user / edureka_566977 / test / matches.csv') CKfile.collect.foreach (println) val stanja = CKfile.map (_. split (',') (2)) state.collect (). foreach (println)

  • filtar():

Transformacija filtra, sam naziv opisuje njegovu upotrebu. Ovu operaciju transformacije koristimo za filtriranje selektivnih podataka iz zbirke zadanih podataka. Prijavite se rad filtra ovdje da biste dobili zapise o IPL utakmicama godine 2017. godine i pohranite ga u fil RDD.

val fil = CKfile.filter (line => line.contens ('2017')) fil.collect (). foreach (println)

  • flatMap ():

Primjenjujemo flatMap operaciju transformacije za svaki od elemenata RDD-a za stvaranje novogRDD-a. Slična je transformaciji karte. ovdje se prijavljujemoRavna kartado ispljunite šibice grada Hyderabada i pohranite podatke ufilRDDRDD.

val filRDD = fil.flatMap (linija => line.split ('Hyderabad')). collect ()

  • particija ():

Svaki podatak koji upišemo u RDD podijeljen je na određeni broj particija. Koristimo ovu transformaciju za pronalaženje broj particija podaci su zapravo podijeljeni na.

val fil = CKfile.filter (line => line.contains ('2017')) fil.partitions.size

  • mapPartitions ():

MapPatitions smatramo alternativom Map () iza svakoga() zajedno. Ovdje koristimo mapPartitions za pronalaženje broj redova imamo u našem RDD-u.

val fil = CKfile.filter (line => line.contains ('2016')) fil.mapPartitions (idx => Array (idx.size) .iterator) .collect

  • reduceBy ():

KoristimoSmanjiteBy() na Parovi ključ / vrijednost . Koristili smo ovu transformaciju na našemcsvdatoteku za pronalaženje playera s najviši čovjek šibica .

val ManOfTheMatch = CKfile.map (_. split (',') (13)) val MOTMcount = ManOfTheMatch.map (WINcount => (WINcount, 1)) val ManOTH = MOTMcount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) ManOTH.take (10) .foreach (println)

  • unija():

Ime sve objašnjava, Mi koristimo sindikalnu transformaciju je do udružite dva RDD-a zajedno . Ovdje stvaramo dva RDD-a, naime fil i fil2. fil RDD sadrži evidenciju IPL utakmica 2017. godine, a fil2 RDD sadrži evidenciju IPL 2016 utakmice.

val fil = CKfile.filter (linija => line.contains ('2017')) val fil2 = CKfile.filter (line => line.contains ('2016')) val uninRDD = fil.union (fil2)

Krenimo od Akcijski dio u kojem prikazujemo stvarni izlaz:

  • skupljati():

Prikupljanje je radnja na koju naviknemo prikazati sadržaj u RDD-u.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / matches.csv') CKfile.collect.foreach (println)

  • računati():

Računatije akcija koju koristimo za brojanje broj zapisa prisutan u RDD-u.Ovdjekoristimo ovu operaciju za brojanje ukupnog broja zapisa u našoj datoteci matches.csv.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / matches.csv') CKfile.count ()

  • uzeti():

Take je radnja akcije slična sličnoj prikupljanju, ali jedina razlika je u tome što se može ispisati bilo koju selektivni broj redaka prema zahtjevu korisnika. Ovdje primjenjujemo sljedeći kod za ispis deset vodećih izvještaja.

val statecountm = Scount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) statecountm.collect (). foreach (println) statecountm. take (10) .foreach (println)

  • prvi():

First () je akcijska operacija slična skupljanju () i preuzimanju ()tokoristi se za ispis najvišeg izvješća s izlazom Ovdje koristimo prvu () operaciju za pronalaženje maksimalan broj odigranih utakmica u određenom gradu a kao rezultat dobivamo Mumbai.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / matches.csv') val stanja = CKfile.map (_. split (',') (2)) val Scount = state.map (Scount => ( Scount, 1)) scala & gt val statecount = Scount.reduceByKey ((x, y) => x + y) .collect.foreach (println) Scount.reduceByKey ((x, y) => x + y) .collect.foreach (println) val statecountm = Scount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) statecountm.first ()

Da bi naš proces učenja RDD-a pomoću Sparka bio još zanimljiviji, smislio sam zanimljiv slučaj korištenja.

RDD koristeći Spark: Pokemon Use Case

  • Prvo, Preuzmimo datoteku Pokemon.csv i učitajmo je u svjećicu, kao što smo to učinili s datotekom Matches.csv.
val PokemonDataRDD1 = sc.textFile ('/ user / edureka_566977 / PokemonFile / PokemonData.csv') PokemonDataRDD1.collect (). foreach (println)

Pokemoni su zapravo dostupni u velikoj raznolikosti. Pronađimo nekoliko sorti.

  • Uklanjanje sheme iz datoteke Pokemon.csv

Možda nam neće trebati Shema datoteke Pokemon.csv. Stoga ga uklanjamo.

val Head = PokemonDataRDD1.first () val NoHeader = PokemonDataRDD1.filter (line =>! line.equals (Head))

  • Pronalaženje broja pregrade naš se pokemon.csv distribuira u.
println ('No.ofpartitions =' + NoHeader.partitions.size)

  • Vodeni Pokemon

Pronalaženje broj vodenih pokemona

val WaterRDD = PokemonDataRDD1.filter (line => line.contens ('Water')) WaterRDD.collect (). foreach (println)

  • Vatreni Pokemon

Pronalaženje broj Vatrenih pokemona

val FireRDD = PokemonDataRDD1.filter (line => line.contains ('Fire')) FireRDD.collect (). foreach (println)

  • Također možemo otkriti stanovništvo druge vrste pokemona pomoću funkcije count
WaterRDD.count () FireRDD.count ()

  • Budući da mi se sviđa igra obrambena strategija pronađimo pokemon sa maksimalna obrana.
val defenceList = NoHeader.map {x => x.split (',')}. map {x => (x (6) .toDouble)} println ('Highest_Defence:' + defenceList.max ())

  • Znamo maksimum vrijednost obrambene snage ali ne znamo o kojem je pokemonu riječ. pa, pronađimo što je to pokemon.
val defWithPokemonName = NoHeader.map {x => x.split (',')}. map {x => (x (6) .toDouble, x (1))} val MaxDefencePokemon = defWithPokemonName.groupByKey.takeOrdered (1) (Naručivanje [Double] .reverse.on (_._ 1)) MaxDefencePokemon.foreach (println)

  • Sada ćemo riješiti pokemon s najmanje Obrana
val minDefencePokemon = defenceList.distinct.sortBy (x => x.toDouble, true, 1) minDefencePokemon.take (5) .foreach (println)

  • Sada da vidimo Pokemone s manje obrambena strategija.
val PokemonDataRDD2 = sc.textFile ('/ user / edureka_566977 / PokemonFile / PokemonData.csv') val Head2 = PokemonDataRDD2.first () val NoHeader2 = PokemonDataRDD2.filter (line => Head lineHeamN defadWon .map {x => x.split (',')}. map {x => (x (6) .toDouble, x (1))} val MinDefencePokemon2 = defWithPokemonName2.groupByKey.takeOrdered (1) (Naručivanje [Double ] .on (_._ 1)) MinDefencePokemon2.foreach (println)

Dakle, ovim smo došli do kraja ovog RDD-a koristeći članak Spark. Nadam se da smo malo osvijetlili vaše znanje o RDD-ima, njihovim značajkama i raznim vrstama operacija koje se na njima mogu izvoditi.

Ovaj članak zasnovan na osmišljen je kako bi vas pripremio za ispit za certificiranje programera Cloudera Hadoop and Spark (CCA175). Dobit ćete detaljno znanje o Apache Spark i Spark ekosustavu, koji uključuje Spark RDD, Spark SQL, Spark MLlib i Spark Streaming. Dobit ćete sveobuhvatno znanje o programskom jeziku Scala, HDFS, Sqoop, Flume, Spark GraphX ​​i sustavu za razmjenu poruka, kao što je Kafka.