Azure Stream Analytics, prosty przypadek

Azure Stream Analytics to chyba jeden z największych "kombajnów" na Azure, a przynajmniej im głębiej w niego zaglądam tym więcej ciekawostek znajduję. Ograniczę się jednak na razie do przypadków IoT.
 
 

Czym jest Stream Analytics?

Azure Stream Analytics to usługa pozwalająca na skomplikowane przetwarzanie w czasie rzeczywistym zdarzeń. A właściwie jest to silnik do przetwarzania zdarzeń, ponieważ może być uruchamiany również na urządzeniach IoT Edge, nie tylko w Azure jakoś usługa PaaS.  Stream Analytics potrafi przetwarzać duże wolumeny danych streamingowych z wielu równoległych źródeł. Świetnie sprawdza się we wszystkich przypadkach danych IoT czy to będzie telemetria czy dane analityczne z autonomicznych samochodów. Ale również poradzi sobie z danymi streamingowymi np. wykrywania anomalii giełdowych, czy przetwarzaniem logów czy analityk aplikacji.
 

Create new

Stream Analytics jest naszym silnikiem do uruchamiania pojedynczych 'zadań'. Jednostek obliczeniowych. Czyli Stream Analytics Job, czyli taką usługę sobie tworzymy. Wiele nie potrzeba, nazwa, subskrypcja, lokalizacja. Mamy też do wyboru czy jest to usługa w chmurze, czy też będziemy sobie ją uruchamiać bezpośrednio na urządzeniach. 
Streaming units. To dość ważne ustawienie. Stream Analytics zapewnia nam bardzo proste skalowanie każdego joba, po prostu dodajemy Streaming Unit. Jeden SU kosztuje 0.12$/h, czyli około 88$ na miesiąc. Proponowane 6 SU na początek jest dość dużą wartością, a ile jednostek potrzebujemy musimy określić na podstawie ilości danych i sposobu ich obróbki. Jest to na tyle skomplikowane, że w dokumentacji jest osobna sekcja Leverage query parallelization in Azure Stream Analytics 
Wjeżdżają nam tutaj dodatkowo partycjonowania (w końcu wiadomo po co nam partycje i kto to jest reader - w tym wypadku to jeden SU). I tak, jeśli mamy jeden krok w Query Joba to maksymalnie możemy mieć 6 SU, ale jeśli czytamy dane ze źródła, które ma 16 partycji dla tego samego joba możemy mieć 96 SU. Warto się zagłębić w dokumentację.  
 
 

Główne składowe

Na podsumowaniu naszego Joba widzimy najważniejsze składowe: Inputs, Outputs, Query. Tutaj również na szybko job będzie informował, gdy coś jest nie tak z Inputami lub Outputami.
 
W prostym przykładzie odtwórzmy sobie ścieżkę zimną, czyli zapisywanie danych z IoT Huba do Blob Storage tak jak one do nas przychodzą lub delikatnie zmienione.
 

Input

Ponieważ przetwarzamy stream danych to do wyboru mamy tylko usługi, które takie dane zwracają: Event Hub, IoT Hub oraz Blob storage. Dodatkowo możemy używać danych referencyjnych przechowywanych w Blob storage lub bazie SQL
 

Wybieramy więc nowy input IoTHub i wypełniamy.
Pierwsze pole Input alias - to alias, którym potem będziemy się posługiwać w naszym zapytaniu, warto więc aby był znaczący, ale też w miarę prosty. Dalej wyszukujemy sobie nasz IoT Hub, jego endpoint i łączymy po uprawnieniach.
I teraz robi się ciekawie. Consumer group - na wbudowanym endpoincie to grupa, której będzie używać ten Job do czytania danych z naszego IoT Huba. Ja tutaj wybieram "secondgroup", przypominam poniżej również skąd się to wzięło. Ciekawostką jest, że dostajemy tutaj podpowiedź, aby stworzyć osobną grupę per job ponieważ IoT Hub umożliwia tylko 5 readerów per grupę. Oczywiście jest to cenna podpowiedź i warto się do niej stosować, również dlatego że chcemy, aby nasz job przetwarzał wszystkie przychodzące wiadomości, a nie konkurował z innymi readerami o dane.
 

Dalej mamy jeszcze dodatkowe opcje wynikające z wielości danych przyjmowanych. Możemy sobie ustawić typ kompresji oraz posługiwać się innymi formatami danych niż tylko JSON i AVRO. 
 
 


Outputs

Mamy tutaj dużo większy wybór. Usługi obsługujące dalej zdarzenia jak Event Hub, Servis Bus, Servis Bus Topic, Azure Functions oraz wybór usług do przechowywania Power BI, Data Lake Storage, Blob Storage, Table Storage, SQL Database oraz Cosmos DB. Przy Cosmos DB trzeba zaznaczyć, że na razie obsługiwane jest tylko API SQL
My wybieramy Blob Storage. I jedziemy przez konfiguracje. Tworzymy sobie osobny kontener. Dalej możemy wybrać format ścieżki, w której będą zapisywane nasze pliki i rozmiar batcha czy to ilość wierszy czy też czas, po którym partia danych i tak się zapisze. Żadnych nowości. 
 

Query

Mając wejścia oraz wyjścia wystarczy przetworzyć jakoś dane z jednego końca na drugi. 
Stream Analytics ma swój język zapytań bardzo podobny do zwykłego SQLa. W najprostszej wersji mamy co=SELECT, skąd=FROM, dokąd=INTO
Po standardowe wzorce zapytań zajrzyj na Stream analytics query patterns, bo trochę tego jest :) 
 
Okno zapytania. Po lewej mamy dostępne wszystkie wejścia i wyjścia. Jeśli mamy dostępne dane w elemencie wejściowym, jak w tym przypadku w iotHub, mamy dodatkową ikonkę wskazującą na dokument. Możemy te dane przeglądać w postaci tabeli lub w postaci surowych danych, w tym przypadku JSON. Jest to bardzo pomocne, jeśli chcemy sobie ograniczyć pola, które będziemy wybierać lub, jak w tym przypadku, jeśli chcemy spłycić strukturę obiektu. Najpierw biorę wszystkie pola, a potem jeszcze wszystkie pola z obiektu IoTHub.

Gdy napiszemy satysfakcjonujące nas zapytanie możemy go przetestować (na danych przykładowych które wcześniej zostały zaciągnięte). To tylko poglądowy efekt naszej pracy. W taki sposób możemy napisać zapytanie, które da nam wartość. Gdy skończymy zapisujemy zapytanie.

Run Job

Mamy jeszcze jeden krok do zrobienia. Musimy uruchomić nasz Job. Całą konfigurację robimy zawsze na zatrzymanym Jobie. Za zatrzymany Job również nie płacimy. Jak? Klikamy Start. Możemy skonfigurować, kiedy Job ma się uruchomić albo uruchomić go od razu.
 
W naszym przypadku job pobiera dane z IoTHuba i zapisuje do storege. Pliki nazywane są w sposób unikatowy, a że nie wybraliśmy żadnego wzorca to tak po prostu luzem. Działa też batch po 5 minutach, bo mamy w pliku 67 wierszy choć nie widać tego na screenie, ale kto ciekawy może zajrzeć do pliku tutaj
 

Komentarze

Popularne posty