Kumulativna transformacija sa stanjem u strujanju iskrenja Apache

Ovaj post na blogu raspravlja o transformacijama koje su značajne za Spark Streaming. Saznajte sve o kumulativnom praćenju i unapređivanju vještine za karijeru Hadoop Spark.

Doprinos Prithviraj Bose

U svom prethodnom blogu raspravljao sam o transformacijama sa stanjem koristeći koncept prozora Apache Spark Streaming. Možete ga pročitati ovdje .



U ovom postu raspravljat ću o kumulativnim operacijama sa statusom u Apache Spark Streamingu. Ako ste tek upoznati sa Spark Streamingom, toplo vam preporučujem da pročitate moj prethodni blog kako biste razumjeli kako funkcionira otvaranje prozora.

Vrste transformacije sa stanjem u iskrenom strujanju (nastavak ...)

> Kumulativno praćenje

Koristili smo reduceByKeyAndWindow (...) API za praćenje stanja ključeva, no otvaranje prozora postavlja ograničenja za određene slučajeve upotrebe. Što ako želimo akumulirati stanja ključeva cijelo vrijeme, a ne ograničavati ih na vremenski prozor? U tom bismo slučaju trebali koristiti updateStateByKey (…) VATRA.



Ovaj API uveden je u Spark 1.3.0 i bio je vrlo popularan. Međutim, ovaj API ima neke režijske performanse, njegove performanse se pogoršavaju kako se veličina stanja s vremenom povećava. Napisao sam uzorak da pokažem upotrebu ovog API-ja. Možete pronaći kod ovdje .

Spark 1.6.0 predstavio je novi API mapWithState (…) što rješava opće troškove izvedbe koje postavlja updateStateByKey (…) . Na ovom blogu raspravljat ću o ovom konkretnom API-ju koristeći primjer programa koji sam napisao. Možete pronaći kod ovdje .

Prije nego što uđem u prolazak s kodom, poštedimo nekoliko riječi na kontrolnoj točki. Za svaku transformaciju sa statusom, kontrolna točka je obavezna. Kontrolna točka je mehanizam za vraćanje stanja ključeva u slučaju da program upravljačkog programa zakaže. Kada se upravljački program ponovno pokrene, stanje ključeva vraća se iz datoteka kontrolnih točaka. Lokacije kontrolnih točaka obično su HDFS ili Amazon S3 ili bilo koja pouzdana pohrana. Tijekom testiranja koda, također se može pohraniti u lokalni datotečni sustav.



U uzorku programa slušamo tok teksta utičnice na host = localhost i port = 9999. Tokenizira dolazni tok u (riječi, broj pojavljivanja) i prati broj riječi pomoću API-ja 1.6.0 API mapWithState (…) . Uz to se pomoću ključa uklanjaju ključevi bez ažuriranja StateSpec. timeout API. Provjeravamo u HDFS-u, a učestalost provjere je svakih 20 sekundi.

Stvorimo prvo sesiju Spark Streaming,

Spark-streaming-session

Mi stvaramo checkpointDir u HDFS-u, a zatim pozvati objektnu metodu getOrCreate (...) . The getOrCreate API provjerava checkpointDir da bi se utvrdilo postoje li neka prethodna stanja za obnavljanje, ako to postoji, on ponovno stvara sesiju Spark Streaming i ažurira stanja ključeva iz podataka pohranjenih u datotekama prije nego što krene s novim podacima. U suprotnom stvara novu sesiju Spark Streaming.

The getOrCreate uzima naziv direktorija kontrolne točke i funkciju (koju smo imenovali createFunc ) čiji bi potpis trebao biti () => StreamingContext .

Ispitajmo kod iznutra createFunc .

koje su funkcije u sql-u

Redak 2: Stvaramo kontekst strujanja s nazivom posla na „TestMapWithStateJob“ i intervalom batch-a = 5 sekundi.

Redak 5: Postavite direktorij kontrolne točke.

Redak 8: Postavite specifikaciju stanja pomoću klase org.apache.streaming.StateSpec objekt. Prvo postavljamo funkciju koja će pratiti stanje, a zatim postavljamo broj particija za rezultirajuće DStreamove koji će se generirati tijekom naknadnih transformacija. Na kraju smo postavili vremensko ograničenje (na 30 sekundi), ako se bilo kakvo ažuriranje ključa ne primi za 30 sekundi, tada će se ukloniti stanje ključa.

Linija 12 #: Postavite tok utičnice, izravnajte dolazne skupne podatke, stvorite par ključ / vrijednost, nazovite mapWithState , postavite interval provjere na 20s i na kraju ispišite rezultate.

Spark okvir poziva th e createFunc za svaki ključ s prethodnom vrijednošću i trenutnim stanjem. Izračunavamo zbroj i ažuriramo stanje kumulativnim zbrojem i na kraju vraćamo zbroj za ključ.

Izvori Github -> TestMapStateWithKey.scala , TestUpdateStateByKey.scala

Imate pitanje za nas? Molimo spomenite to u odjeljku za komentare i javit ćemo vam se.

Vezane objave:

Započnite s Apache Spark & ​​Scala

Stanovite transformacije s prozorima u iskrenju