середу, 2 березня 2016 р.

AWS Key Management System (KMS)


AWS KMS
  Key Management System - один з веб сервісів Amazon покликаний для захисту інформації, а саме енкриптання (шифрування) та декриптання (розшифрування) даних за допомогою ID ключа, що зберігається на AWS.
  Використання KMS є дуже рентабельним в тому плані коли вам не хочеться ламати голову над тим як шифрувати дані, як зберігати метадані шифрування (ключі, паролі чи фрази) і як захистити свій алгоритм від декомпіляції або іншого роду взломів. KMS все це інкапсулює в собі, все чим ви оперуєте це внутрішня ID ключа котрий зберігається в KMS і AWS креденшили (AccessKey ID & SecretKey) а останні означають що комусь доведеться в прямому сенсі слова "взламати" Amazon щоб добратися до ваших даних. Іншими словами використовуючи KMS ви перекладаєте обов'язок шифрування та розшифрування даних на Amazon.

Постановка задачі
  В цій статті ми повинні: 
  - Розібратися з AwsKms клієнтом, що входить до пакету AWS SDK;
  - Заміряти його перформенс і створити най оптимальнішу конфігурацію для шифрування і дешифрування 1000 об'єктів типу String, скажемо 1000 паролів користувачів, за один підхід;
  - Розібратися з проблемами збереження шифрованих даних в String форматі.
  
Паралельно / Послідовно
  AWS SDK містить дві реалізації KMS клієнта , послідовну (AWSKMSClient) і паралельну (AWSKMSAsyncClient), відрізняються вони конструкторами, паралельна версія підтримує конструктор для отримання ExecutorService (якщо його не передавати то по замовчуванню його розмір буде 50 потоків) а також паралельна версія клієнта має методи з приставкою Async, наприклад : "decryptAsync" для роботи у розпаралеленому режимі. Інші ж методи у них ідентичні (тобто обидва мають методи для роботи у послідовному режимі).
В цій статті буде використовуватись клієнт з підтримкою паралельних операцій.

Підготовка проекту
  Для того щоб почати працювати з KMS треба підключити AWS SDK до проекту , я використовую наступно залежність у моєму Apache Maven проекті :

<dependency>
      <groupId>com.amazonaws</groupId>
      <artifactId>aws-java-sdk</artifactId>
      <version>1.10.51</version>
</dependency>

Створення клієнта
  Перш ніж створити клієнт і почати роботу потрібно ініціалізувати AWS креденшили, які клієнт приймає в конструкторі. Існує кілька способів отримати ці креденшили, можна імплементувати провайдер, і вже в середині вирішувати звідки отримати дані або ж просто ініціалізувати їх в коді, що ми і зробимо :

AWSCredentials awsCredentials = new AWSCredentials() {
    public String getAWSAccessKeyId() {
        return AWS_ACCESS_KEY_ID;
    }

    public String getAWSSecretKey() {
        return AWS_SECRET_KEY;
    }
};

  Далі потрібно створити конфігурацію для нашого клієнта, звісно можна обійтися без неї (в середині і так використається конфігурація по замовчуванню). Нам вона потрібена лише щоб переконфігурувати політику ретраїв (повторних спроб), оскільки KMS має певні рестрікшени (100 реквестів в секунду на запити  encrypt/decrypt та ін. одночасно) а ми використовуємо асинхронний клієнт, тому нам потрібно обробляти випадки коли виходимо за цей ліміт а також інші випадки при яких в асинхронному режимі варто повторити операцію (спершу я обробляв всі еррори і робив операції повторно в хендлері реквеста без цієї конфігурації, проте це не вірно, є випадки, наприклад: "дешифрування не валідних даних", які повторювати не треба). Сам клієнт вже має можливість повторювати певного роду помилки при роботі але по замовчуванню це вимкнено.
  Отже створюємо конфігурацію і задаємо кількість повторів (далі ми будемо визначати оптимальну кількість потоків в екзекютері нашого клієнта , адже чим більше потоків і даних тим більше треба буде повторів і тим гірший буде перформенс) :

ClientConfiguration clientConfiguration = PredefinedClientConfigurations.defaultConfig();
clientConfiguration.setMaxErrorRetry(1); 

  Наразі встановимо кількість повторів рівною 1.
  Маючи все що описано вище можна ініціалізувати KMS клієнт :

AWSKMSAsyncClient awsKmsClient = new AWSKMSAsyncClient(awsCredentials, clientConfiguration, 
Executors.newFixedThreadPool(50));

  В створеному клієнті ми використали наші AWS креденшили та конфігурацію, а також передали ExecutorService. Розмір пулу 50 є розміром по замовчуванню.

Шлях до AWS
  Amazon має багато дата-центрів по всій планеті і всі вони мають різні зони доступу (availability zone) тому і шлях потібно вказувати додатковим параметром :

awsKmsClient.setEndpoint(AWS_ENDPOINT);

  Для прикладу, шаблон шляху є таким : "https://kms.<availability_zone>.amazonaws.com", де <availability_zone> може бути: "eu-central-1".

Створення ключа для шифрування
  Для шифрування даних нам потрібно використовувати Encryption Key Id (Id ключа шифрування). Щоб створити цей ключ і дізнатися його ID можна скористатися нашим KMS клієнтом або через Веб консоль AWS. Я користувався AWS веб консоллю і для того, щоб це зробити потрібно піти по наступному шляху :


Маючи все попереднє можна сміло приступати до шифрування / розшифрування даних.

Шифруємо (Data Encryption)
  Для шифрування даних потрібно створити EnryptRequest і передати його на виконання нашому AWSKMSAsyncClient'у :

EncryptRequest encryptRequest = new EncryptRequest();
encryptRequest.setKeyId(KMS_KEY_ID);
encryptRequest.setPlaintext(ByteBuffer.wrap("This is text message".getBytes()));

  В методі "setKeyId" використовуємо ID ключа а в метод "setPlaintext" передаємо байт-буфер даних котрі шифруються, в мому випадку це проста текстова стрічка. Також АРІ об'єкта EncryptRequest дозволяє створити його за допомогою білдера :

EncryptRequest encryptRequest = new EncryptRequest().withKeyId(KMS_KEY_ID).withPlaintext(ByteBuffer.wrap("This is text message".getBytes()));

  Після цього передаємо запит шифрування на виконання нашому клієнту :

byte[] encryptedData = null;
awsKmsClient.encryptAsync(encryptRequest,
  new AsyncHandler<EncryptRequest, EncryptResult>() {
    public void onError(Exception e) {
      //error
    }

    public void onSuccess(EncryptRequest request, EncryptResult encryptResult) {
      // get encrypted data bytes from ByteBuffer
      encryptedData = encryptResult.getCiphertextBlob().array();
    }
  });

  Оскільки я використовую клієнт з паралельною обробкою мені потрібно обробити результат виконання обробником або ж зберегти отриманий після виконання методу "encryptAsync" об'єкт Future<EncryptResult> і дочекавшись його значення отримати бажаний результат. Як показано вище, я використав обробник і в методі "onSuccess" вичитую ByteBuffer а вже з нього беру байти закодованого повідомлення. Метод "onFailure" я тут не чіпаю, хоча з даним підходом (коли ми використали Retry Policy клієнта) можна сказати що якщо цей метод викличеться то можна сміло фейлити операцію і логувати екзепшен (раніше я пробував тут повторно виконувати шифрування, та це не є правильним адже спрацювати він може через те, наприклад, що ключа яким шифруєте нема).

Розшифровуємо (Data Decryption)
  Для того щоб розшифрувати наше повідомлення також потрібно створити запит :

DecryptRequest decryptRequest = new DecryptRequest().withCiphertextBlob(ByteBuffer.wrap(encryptedData));

Як бачите, для розшифрування повідомлення не потрібно вказувати ID ключа. А все тому, що зашифровані дані містять в собі мета-дані за допомогою яких відбувається розшифрування.

Запускаємо цей запит на виконання:

String result = null;
awsKmsClient.decryptAsync(decryptRequest,
  new AsyncHandler<DecryptRequest, DecryptResult>() {
    public void onError(Exception e) {
      //error
    }

    public void onSuccess(DecryptRequest request, DecryptResult decryptResult) {
      byte[] data = decryptResult.getPlaintext().array();
      result = new String(data);
    }
  });

  Аналогічно шифруванню, тут я використовує обробник. Як тільки виконається метод "onSuccess" я дістаю байти повідомлення і перетворюю в стрінгу.

  Усе вище працює без проблем. Загалом схему шифрування та дешифрування можна зобрадити так :




Проблеми зі збереженням шифрованих даних у форматі String
  Моя задача передбачала що дані (шифровані чи ні) передаються і зберігаються у форматі String. А String має конструктор що приймає масив байтів, тому паємо щойно зашифровані дані в стрінгу і зберігаємо (чи передаємо). Проте як тільки ми спробуємо перетворити стрінгу назад в байти і розшифрувати, ми отримаємо екзепшен з повідомленням :

Service: AWSKMS; Status Code: 400; Error Code: InvalidCiphertextException; Request ID: <your_request_id>

  А все тому, що байти отримані з об'єкта String є "поломаними" і AWS KMS не може їх провалідувати і розшифрувати. 
  Моє розуміння проблеми наступне: - Коли зашифрований масив байтів (отриманий з відповіді KMS клієнта) приводиться до об'єкту String (за допомогою його конструктора) тоді і відбувається кораптання даних. Всім відомо що String має кодування по замовчуванню рівне "UTF-8" а це від 4 до 6 байтів на символ, реально ж використовуються символи тільки до 2 в 21 сепені (знайшов на вікіпедії), а як веде себе String коли зустрічає код символу що перевищує це число ? Хоча може в цей момент шифровані дані ще не "поломані", проте коли дану String'у привести назад в масив байтів щоб розшифрувати - точно "поломані".
  Нажаль я глибоко в цю проблему не закопувався, тому якщо хтось розуміється краще, я буду дуже вдячний за коментарі. 
  
Що робити ?
  На просторах інтернету був знайдений солюшен - перетворити шифровані дані в формат Base64 :

byte[] encoded = Base64.encodeBase64(data);

  Для цього було використано утиліту з пакету Apache Commons.
  Отриманий після цього масив байтів можна спокійно приводити до об'єкту String і зберігати, надсилати чи що кому треба. Насправді навіть порівнюючи (при виводі) стрінгу створену з масиву байт шифрованих KMS'ом і стрінгу створену з тих же даних тільки перетворених у Base64 було чітко видно що стрінга першого варіанту взагалі була не "валідною" - містила пусті символи (підозрюю просто не відображалися) та якісь ієрогліфи, а друга стрінга була просто хаотичним набором латинських літер та символів, що зарактерно для відображення будь-якої захифрованої інформації.

Звісно треба не забути що в такому випадку для розшиврування даних потрібно їх привезти з Base64 у нормальний вигляд :

byte[] decoded = Base64.decodeBase64(data);

І вже цей набір байтів розшифровувати за допомогою KMS.
  Звісно це лише мій випадок, коли я зберагаю у форматі String, якщо у вас є можливість зберігати це як маисв байтів або blob об'єкт то цього не потрібно.

Тестування в одному потоці та пошук оптимальних значень повторів та розміру пула потоків для клієнта
  Для визначення найоптимальніших налаштуванн KMS клієнта поставимо наступні умови :
- Зашифрувати 1000 повідомлень;
- Розшифрувати 1000 повідомлень.

Отже запускаємо наш тестовий стенд (посилання на вихідний код буде в кінці) з розміром пула потоків 50 та кількісттю повторних спроб 2 :

- 978 повідомлень успішно зашифровані;
- 22 повідомлення не було зашифровано успішно;
- Аплікація не дойшла до розшифрування так як не всі повідомлення зашифрувалися;
- Ось таке повідомлення в консолі :

You have exceeded the rate at which you may call KMS. Reduce the frequency of your calls. (Service: AWSKMS; Status Code: 400; Error Code: ThrottlingException; Request ID: <your_request_id>


Що сталося?
  AWS KMS має ліміти на з'єднання, для encrypt i decrypt операції це 100 запитів в секунду.
Оскільки наш пул потоків в середині клієнта = 50, а дані додаються в пуп без затримки то ми попросту досягли ліміту по з'єднанях, причому такого роду помилка обробляються в середині клієнта і запит буде повторено так як кількість повторів = 2.
Запити були повторені та всеодно 22 повідомлення так і не зашифрувалися , одже 22 реквеста були не вдалі а значить і наша конфігурація теж. (Доречі, 50 це число пулу в клієнті що встановлюється по замовчуванню якщо його не задавати при створенні)

Міняємо налаштування, мабуть всі скажуть що замало повторних спроб, ОК , ставимо більше :

Запускаємо наш тестовий стенд з розміром пула потоків 50 та кількісттю повторних спроб 3 :

- 1000 повідомлень успішно зашифровані;
- 983 повідомлення успішно розшифровано;
- 27 повідомлень не було розшифровано успішно;
- Те саме повідомлення в консолі.

Збільшуємо кількість повторів знову? - Вважаю це хибним, адже повторна спроба це і теоретично погано і практично б'є по швидкодії.

Пропуную зменшити розмір пула потоків і повернути кількість спроб назад:

Запускаємо наш тестовий стенд з розміром пула потоків 20 та кількісттю повторних спроб 2 :

- 999 повідомлень успішно зашифровані;
- 1 повідомлення не було успішно зашифровано;

Зменшуємо далі :

Запускаємо наш тестовий стенд з розміром пула потоків 10 та кількісттю повторних спроб 2 :

- 1000 повідомлень успішно зашифровані;
- 1000 повідомлень успішно розшифровано.

Юххууу... отже конфігурацію знайшли, проте коли я запускав ту ж роботу тільки від кількох потоків (а ми знаємо що пул клієнта буде 1 для всіх) то проблеми виникали але дуже рідко , що змусило підняти кількість спроб до 3, далі усе було ідеально.

Чи впливає маленьких розмір пула на час виконання всіх операції загалом в порівнянні з більшим розміром? - так алетести показали що ця різниця = 10 мілісекунд і не є суттєвою в мому випадку. Для економію часу не буду наводити дані тесту швидкодії проте найоптимальніша на мій погляд конфігурація дає такий результат :

Кількість потоків - 1, Розмір пулу в клієнта - 10, кількість повторів - 3 :

Encryption time for 1000 items (MILLISECONDS) : 8515
Errors happened on encryption cycle : 0
Decryption time for 1000 items (MILLISECONDS) : 10828
Errors happened on decryption cycle : 0
GENERAL TIME TO ENCRYPT 1000 AND DECRYPT 1000 items (MILLISECONDS) : 19343

20 секунд на все про все , думаю хороший результат =) .

Ціни
  Відомо що Amazon заробляє на всіх своїх сервісах. Нижче наведена цінова політика на момент написання статті:

- Перші 20 000 реквестів у місяць безкоштовно (Encrypt / Decrypt / Create Key / Delete Key);
- Настіпні 10 000 реквестів після використаних безкоштовних будуть коштувати 0.3 центи;
- За зберігання ключа стягуватиметься 1 долар в місяць

Сорси проекту тут

Дякую усім за увагу!


понеділок, 1 лютого 2016 р.

Apache Spark for Newbie

Вступ (Лінивим можна пропустити)

    Врамках Big Data тренінгу я почав вивчати роботу з Apach Spark computing framework. В інтернеті є достатня кількість прикладів побудови пайплайнів на різних мовах програмування (наприклад тут), проте я ніде не зміг найти хорошого гайду чи прикладу як написати проект з "нуля" і запустити його виконання на кластері. Усі що я знаходив приклади використовували локальний (тестовий) режим мастера , таким чином ніякої дистрибуції логіки не відбувалося і відчути що таке Apache Spark в повній мірі мені не вдавалося. До дня коли я прослухав тренінг людини, що працювала з цим інструментом в продакшені, саме він вніс ясність в те як Apache Spark працює (дякую тобі Тарас Матяшовський). Отже, після того як я в цьому розібрався я вирішив залишити ці знання на просторах Java User Group для всіх кому цікаво і для самого себе, щоб не забути ;-) Також, надіюся з заголовку статті зрозуміло що тут буде йтись лише про поверхневе ознайомлення з спарком і основна мета це зробити усе правильно. Тут буде використаний Stand Alone Spark Master який не потрібно використувувати в продакшені, для цого ви може використати Hadoop Yarn.

Опис задачі

    Задача взята з документації ApacheSpark - підрахунок слів у текстовому файлі. Проте зробимо це по законам жанру:
1. Дистрибутивна бібліотека (саме та яка розлетиться по воркерах кластеру і буде робити усю роботу);
2. Spark Driver (іншими словами це і буде наш клієнт через який ми доступатимемось до мастера і де ми будемо будувати наші пайплайни);
3. Web application (Наша веб аплікація - ендпоінти які ми будемо викликати щоб виконати ту чи іншу роботу, ну і щоб Spark Application UI був доступний і ми змогли подивитись статистику виконання роботи).

    Надіюся усе зрозуміло , проте якщо й ні , то далі постараюся усе розписати як найкраще. Поїхали !

Створення архітектури проектів

    Архітектура нашого проекту буде складатися з трьох модулів, а саме :
1) Distributed JAR
2) Spark Driver
3) Web Application
Загальна, архітектура зображена нижче :

    Для збірки проектів я буду використовувати Apache Maven, і почнемо з того що створемо батьківський проект який назвемо "spark-for-newbie" і додамо туди pom.xml файл. Батьківський pom.xml файл повинен включати усі вище згадані модулі :

<modules>
        <module>distributed-library</module>
        <module>web-api</module>
        <module>spark-driver</module>
</modules>

    Тепер підключаємо до нього основні залежності :
- Залежності на наші бібліотечні модулі :
<dependency>
        <groupId>org.ar.spark.newbie</groupId>
        <artifactId>spark-client</artifactId>
        <version>${project.version}</version>
</dependency>
<dependency>
        <groupId>org.ar.spark.newbie</groupId>
        <artifactId>distributed-library</artifactId>
        <version>${project.version}</version>
</dependency>

- Також залежність на останню (на момент публікації) версію Apache Spark :
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.6.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>

Тепер створюємо наші модулі в середині батьківського проекту:

distributed-library
    Додаємо до цього проекту лише залежність на Apache Spark , так як ця бібліотека міститиме класи функцій з його пакету, що будуть виконуватись на різних воркерах нашого кластеру.
<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <scope>provided</scope>
</dependency>

spark-driver
    Цей проект також буде бібліотекою яку ми використаємо у наступному web-api модулі. Він міститиме Spark Conext та залежність на функції, що будуть виконувати логіку нашого обчислення, тому нам потрібні наступні залежності :
<dependency>
      <groupId>org.ar.spark.newbie</groupId>
      <artifactId>distributed-library</artifactId>
</dependency>
<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
</dependency>

web-api
    Даний модуль буде нашою основною аплікацією, він повинен містити залежність на наш Spark Driver а також на дуже цікаву бібліотеку "Java Spark", завдяки якій ми зробимо REST Endpoint всього в одну лінійку коду. Spark Java, немає нічого спілького з Big Data та Apache Spark, також вона заслуговує окремої статті щоб описати її можливості.
<dependency>
      <groupId>org.ar.spark.newbie</groupId>
      <artifactId>spark-driver</artifactId>
      <version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
      <groupId>com.sparkjava</groupId>
      <artifactId>spark-core</artifactId>
      <version>2.3</version>
</dependency>

Розробка дистрибутивної бібліотеки (distributed-library)

    Для того, щоб підрахувати кількість слів у текстовому файлі ми повинні зробити наступний пайплайн :


    Отже нам треба буде написати 4 функції, 3 map і 1 reduce а також клас в який ми будемо мапити key-value рузультати і повернемо результати з воркера назад в аплікацію.

    SeparateWordLinesFunction.java - її задача отримати на вхід об'єкт типу String котрий є вичитаною лінією тексту з текстового файлу і розбити на окремі слова , тобто повернути список об'єктів String. Для цього, клас функції робитиме наступне :

public class SeparateWordLinesFunction implements FlatMapFunction<String, String> {
  @Override
  public Iterable<String> call(String s) throws Exception {
    return Arrays.asList(s.split(" "));
  }
};

    MapWordsToKeyValueFunction.java - повинна замапити кожне слово до лічіильника, який по замовчуванню буде "1", таким чином ми отримаємо key-value структуру даних. Для того, щоб тримати таку структуру буде використано об'єкт Tuple2 з пакету Scala бібліотеки Apache Spark : 

public class MapWordsToKeyValueFunction implements PairFunction<String, String, Integer> {
  @Override
  public Tuple2<String, Integer> call(String s) throws Exception {
    return new Tuple2<String, Integer>(s, 1);
  }
};

    ReduceKeyValueWordsByKey.java - reduce операція по ключу нашої структури, тут ми повинні вирішити що будемо робити з значенням і в нашому випадку ми його просумуємо :

public class ReduceKeyValueWordsByKey implements Function2<Integer, Integer, Integer> {
  @Override
  public Integer call(Integer intVal1, Integer intVal2) throws Exception {
    return intVal1 + intVal2;
  }
}

    MapKeyValueWordsToWrapperObject.java - функція що перетворить key-value структуру в більш лояльний для подальшої роботи об'єкт :

public class MapKeyValueWordsToWrapperObject implements Function<Tuple2<String,Integer>, WordResult> {
  @Override
  public WordResult call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
    return new WordResult(stringIntegerTuple2._1, stringIntegerTuple2._2);
  }
}

   WordsResult.java - звичайний POJO об'єкт котрий містить поля для самого слова і його лічильника. Потрібно лише імплементувати інтерфейс Serializable (для надання можливості серіалізувати його при передачі по мережі) і особисто від себе я перевизначив метод toString так як буду виводи об'єкти напряму до користувача.

Розробка  Spark Driver бібліотеки (spark-driver)

    Тут ми створемо SparkContext через який отримаємо доступ до мастера і задекларуємо наш пайплайн.

    Створення  SparkContext :
Для цього необідно створити SparkConf - конфігураційних об'єкт якому ми передамо назву нашої аплікації, шлях до мастера та шлях до скомпільованого JAR файлу який Spark розповсюдить на воркери де будуть виконуватись етапи нашого пайплайну :

private String[] distributedJars = new String[]{"/<path_to_workspace>/distributed-library/target/distributed-library-1.0-SNAPSHOT.jar"};

private JavaSparkContext sparkContext;

public SparkDriver(){
    SparkConf sparkConf = new SparkConf().setAppName("SparkForNewbie")
.setJars(distributedJars).setMaster("spark://127.0.0.1:7077");
    this.sparkContext = new JavaSparkContext(sparkConf);
}

    Імплементація Pipeline :

public List<WordResult> countWordsFromFile(String filePath){
        JavaRDD<String> words = sparkContext.textFile(filePath);
        return words
            .flatMap(new SeparateWordLinesFunction())
            .mapToPair(new MapWordsToKeyValueFunction())
            .reduceByKey(new ReduceKeyValueWordsByKey())
            .map(new MapKeyValueWordsToWrapperObject())
            .collect();
}

    З імплементації пайплайну чітко видно нашу попередню схему , де я показував як він виглядатиме. Усі функції підтянуті через залежність і знаходяться в distributed-library.

Розробка Веб аплікації

    Цей модуль потрібен нам саме для взаємодії нашого проекту з кінцевим користувачем. Тут ми зробимо енд-поінт який користувач зможе викликати в браузері і запустити на виконання Spark Pipeline, проте найцікавіше в цьому модулі це використання Spark Java, за допомогою якої ми реалізуємо енд-поінт :

get("/data", (request, response) -> dataService.countWords())

    Угу, саме так, 1 рядок коду і при старті web-api Spark Java розгорне Embedded Jetty і реалізує "/data" енд-поінт.  Далі, при виклику нашого енд-поінту викликаємо сервіс який містить в собі інстанс SparkDriver класу і виконує наш пайплайн передаючи шлях до піддослідного файлу:

public class DataService {

  private static final String TEXT_FILE_PATH = "/workspace/projects/spark-for-newbie/test.txt";
  private SparkDriver sparkDriver = new SparkDriver();

  public List<WordResult> countWords(){
    return sparkDriver.countWordsFromFile(TEXT_FILE_PATH);
  }

}

    Ось і все , усі модулі нашого проекту розписані , і можна переходити до тестування.

Тестування

Запуск Apache Master (stand alone version)
    Для цього качаємо дистрибутив Apache Spark (я викачував spark-1.6.0-bin-hadoop2.6.tgz) і йдемо у папку "conf" що в середині.
Тут потрібно змінити розширення файлу "spark-env.sh.template" на "spark-env.sh", зайти в середину і додати кілька рядків для конфігурації :

export SPARK_LOCAL_IP=127.0.0.1

export SPARK_MASTER_IP=127.0.0.1
export SPARK_MASTER_PORT=7077
export SPARK_MASTER_WEBUI_PORT=8082

export SPARK_WORKER_MEMORY=2g
export SPARK_WORKER_CORES=2
export SPARK_WORKER_WEBUI_PORT=8083

(Опис усіх цих та інших можливих опцій присутні у цьомуж файлі)

    Тепер йдемо назад і переходимо у папку "sbin", тут виконуємо "./start-master.sh " і переходимо в браузері по шляху "http://localhost:8082/", тут має бути розгорутий ЮІ нашого мастера :


    Далі запускаємо воркер (так, так це не кластер адже все робиться на одній машині, але спарк про це не знає, він має все що необхідно для роботи у розподіленому режимі - мастер а мастер має воркер). Виконуємо "/start-slave.sh 127.0.0.1:7077" і переходимо по шляху "http://localhost:8083/" :

    Тепер у нас є все щоб запустити нашу аплікацію, йдемо в мейн метод модуля web-api та запускаємо його. В логах розгортання проекту ви повинні побачити як спарк розгорає ЮІ нашої аплікації :


    Тут буде відображатися найцікавіша інформація виконання нашого пайплайну.

    Переходимо по цьому шляху і повинні побачити :


    Щож, тепер усе готово, відкриваємо нову вкладку в браузері і переходимо на наш енд-поінт :

http://localhost:4567/data
    
    Якщо ж все було зроблено правильно ви повинні побачити результат нашого обчислення :



    P.S.

    Якщо ж у вас щось не вийшло або не спрацювало, ви можете знайти сорси мого проекту тут . Також ділюся з вами сорсами проекту Тараса, котрі я використовував в довідкових цілях коли знайомився з Apache Spark, вдачі !

P.P.S Стаття перенесена з JUG блогу, ось оригінальний пост.

АОР & Performance tool

Оскільки це перша стаття у моєму блозі, я вирішив скопіювати її з блогу JUG де я її виклав раніше. Ось оригінальний пост.



Що спільного в словах AOP та Performance tool ? -нічого , окрім того що задопомогою АОР ми будемо пиляти власного "валосипеда" і спробуємо ним поміря швидкодію Java аплікацій.

Отож, визначимо цілі нашого проекту :

  • Отримаувати заміри виконання методі задопомогою анотацій
  • Керувати історією викликів мотода
  • Отримувати результат в наступних форматах : String, JSON, HTML
Проект унас буде збиратися Apache Maven'ом, для імплементації АОР використаємо AspectJ а тести напишемо на JUnit.


Отже, створюємо наш проект командою : "mvn archetype:create -DgroupId=org.ar.stat4j  -DartifactId=Stat4J" з цього бидно архітектуру пакетів а також горду назву "Stat4J" (типу заявка на світове визнання 8-D ).

Додаємо залежності в pom.xml :

<dependencies>
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
    <scope>test</scope>
  </dependency>

  <dependency>
    <groupId>org.aspectj</groupId>
    <artifactId>aspectjrt</artifactId>
    <version>${aspectj.version}</version>
  </dependency>
</dependencies>


Створюємо пакет "org.ar.stat4j.annotations" і додаємо в в нього клас "Stat4JPoint.java", це і буде наша анотація котру ставитимемо над методом котрий будемо заміряти. Відкриваємо клас і пишемо в ньому настйпний код :

@Target(ElementType.METHOD) 
@Retention(RetentionPolicy.RUNTIME) 
public @interface Stat4JPoint {
}



@Target - місце над яким можна буде декларувати дану анотацію , у нашому випадку це методи.
@Retention - як довго дана анотація буде зберігатися над анотованим типом, у нашому випадку напротязі усього виконання аплікації.
@interface - вказує що даний клас є анотацією

Отже тепер ми маємо анотацію яка буде вказувати на методи котрі нам треба заміряти, залишилось тільки відслідковувати коли під час виконання аплікацію будуть викликатися анотовані цією анотацією методи. Для цього нам і потрібне АОР.

Створюємо пакет "org.ar.stat4j.aspects" і додаємо туди наш аспект "Stat4JAsp.aj". Даний аспект повинен спрацьовувати коли буде викликано будь який метод анотований нашою анотацією. Для цього в середену аспекту пишемо :

@Aspectpublic class Stat4JAsp 

 @Around("execution(* *(..)) &&@annotation(org.ar.stat4j.annotations.Stat4JPoint)")  public Object around(ProceedingJoinPoint point) throws Throwable {

  } 

}

@Aspect - вказує AspectJ плагіну (який ми пізніше додато до конфігурації мавена) , що даний клас є аспектом.
@Around - вказує що даний аспект має виконатись навколо методу , тобто не перед чи після а саме навко методу (іншими словами його обгортаємо) а як аргумент ми передаємо умову при якій даний аспект повинен виконатись : * - будь який ретурн тип , * - будь яка назва методу , (..) - будь яка кількість аргументів,  && - а також , @annotation(<type>) - анотація типу <type>.

Тепер у нас є анотація і аспект який її відслідковуватиме.

Залишилось лише додати плагін до bild секції в pom.xml який під час компіляції проекту зробить всю ту "магію" яка називається АОР (Обгорне анотовані методи своєю іпмлементацією). Плагін має виглядати так :

<plugin>
  <groupId>org.codehaus.mojo</groupId>
  <artifactId>aspectj-maven-plugin</artifactId>
  <version>${aj.mvn.pg.version}</version>
  <executions>
    <execution>
      <goals>
        <goal>compile</goal><!-- to weave all your main classes -->
        <goal>test-compile</goal><!-- to weave all your test classes --> 
      </goals>
    </execution>
</executions>
<configuration>
  <sources>
    <source>
      <basedir>src/main/java/org/ar/stat4j/aspects</basedir>
      <includes>
        <include>**/*.aj</include>
      </includes>
    </source>
  </sources>
  <complianceLevel>1.8</complianceLevel>
  <outxml>false</outxml>
  <verbose>true</verbose>
  <showWeaveInfo>true</showWeaveInfo>
  <source>1.8</source>
  <target>1.8</target>
</configuration></plugin>

В даній конфігурації важливо вірно вказати пакет в якому знаходяться ваші аспекти а також версію джави (у нашому випадку 1.8).

Пробуємо білдати наш проект : mvn clean install , і якщо бачимо :

[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------

тоді усе гаразд. 

Тепер давайте зробимо власне нашу утиліту яка буде відповідати за початок заміру, кінець а також вивід статистики. Створюємо в пакеті "org.ar.stat4j" класс Stat4J.java і зробимо його синглтоном щоб там де він буде використовуватись він завжди був єдиним на всю систему.

public class Stat4J { 
  private static Stat4J instance;
  private Stat4J() {} 
  public static Stat4J instance() { 
    if (instance == null) { 
      synchronized (Stat4J.class) { 
        if (instance == null) { 
          instance = new Stat4J();
        } 
      } 
    } 
    return instance;
  }
}


Тепер, маючи головний клас нашої утиліти створюємо пакет "org.ar.atat4j.data" і додаємо два класи , перший для зберігання усієї статистики , а другий для зберігання статистики конкретно по одному виклику метода :

Statistic.java

public class Statistic { 
  private List<Point> stats = new ArrayList<>();
  ...
}

Point.java

public class Point implements Comparable<Point>{ 
  public static final int NANO_IN_MILIS = 1000000;
  private long startTrack;
  private long finishTrack;
  ...
}


Тепер навчимо клас Statistic.java обраховувати інформацію про свої заміри :

(приклад наводжу на одному з методів, повна версія буде доступна за посилання в кінці поста)

public long getMaxExecutionTimeInNano(){ 
  if(!stats.isEmpty()) { 
    Collections.sort(stats);
    return stats.get(stats.size()-1).executionTimeInNanoseconds();
  } 
  return 0;
}


Маючи об'єкт статистики і всю інформацію про виклики методу додамо мапу в головний об'єкт нашої утиліти котра за іменем класу зберігатиме іншу мапу , яка в свою чергу за іменем методу зберігатиме об'єкт статистики :

Stat4J.java

...
private Map<String, Map<String, Statistic>> records;
...

Також додаємо метод який розпочинатиме та закічнуватиме заміри :

public Point startTrack(String componentName, String pointName) { 
  Point point = new Point(System.nanoTime());
  if (!records.containsKey(componentName)) { 
    Statistic statistic = new Statistic();
    statistic.getPoints().add(point);
    Map<String, Statistic> pointToStat = new HashMap<>();
    pointToStat.put(pointName, statistic);
    records.put(componentName, pointToStat);
  } else if (!records.get(componentName).containsKey(pointName)) {
    Statistic statistic = new Statistic();
    statistic.getPoints().add(point);
    records.get(componentName).put(pointName, statistic);
  } else {
    records.get(componentName).get(pointName).getPoints().add(point);
  }
  return point;
}
Перший if ініціалізує цілу групу замірів відштовхуєчись від імені класу. Другий if інішіалізує групу замірів на базі методів у випадку якщо група класу вже ініціалізована. А третій if спрацює якщо група класу і методі уже є , тоді тільки додасть новий замір.


public void stopTrack(Point point) { 
  point.finish(System.nanoTime());
}


Для зупинки заміру просто проставляємо поточну дату (час закінчення) в точку заміру .

Маючи все вище написане залишаться оновити наш аспект і в його тілі вказати що перш ніж виконати метод ми повинні розпочати замір а далі виконати метод і після цього закінчити замір:

Point statisticPoint =   Stat4J.instance().startTrack(point.getTarget().getClass().getCanonicalName(), MethodSignature.class.cast(point.getSignature()).getMethod().getName());

Object result = point.proceed();
Stat4J.instance().stopTrack(statisticPoint);

return result;

У випадку якщо наш метод нічого не повертає , ми всеодно повинні повертати Object як результат виконання методу так як на даному етапі аспект нічого про сам метод не знає.


Компілюємо аплікацію і впевнюємось що унас все зроблено вірно.

Залишилось вивестинаш результат. Для цього створюємо пакет "org.ar.stat4j.printers" і додаємо в нього клас "StringPriner,java". Він повинен приймати наші об"єкти статистики та примати аргументом параметр що буде вказувати треба нам історія викликів методу чи ні . 

Ось код цього прінтера : 

public class StringPrinter implements Printer{

  public static final String COMPONENT_OUTPUT = "%1s.%2s:\tCall times: %3s,\tMax:     %4s (ns)\t|\t%5s(ms),\tMin: %6s(ns)\t|\t%7s(ms),\tAvr: %8s(ns)\t|\t%9s(ms);\n";

  public static final String POINT_OUTPUT = "\t\t%1s:\tExecution date:  
  %2s,\tExecution time: %3s\t(ns)|\t%4s(ms);\n";

  public static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("MM.dd.yy   HH:mm:ss.SSS");
  @Override
  public String print(Map<String, Map<String, Statistic>> statistic, boolean history){
    final StringBuilder strBld = new StringBuilder();

    statistic.forEach((componentName, points) -> points.forEach((pointName, stats) 
    -> strBld.append(generateComponentStatistic(componentName, pointName, stats,
    history))));

   return strBld.toString();
  } 

  private String generateComponentStatistic(String componentName, String pointName,   Statistic statistic, boolean history) {

    StringBuilder strBld = new StringBuilder(); 
    strBld.append(String.format(COMPONENT_OUTPUT, componentName, pointName,
    statistic.getPointSize(), statistic.getMaxExecutionTimeInNano(),
    statistic.getMaxExecutionTimeInMili(),
    statistic.getMinExecutionTimeInNano(),
    statistic.getMinExecutionTimeInMili(),
    statistic.getAverageExecutionTimeInNano(),
    statistic.getAverageExecutionTimeInMili()));
    if (history && statistic.getPoints().size() > 1) { 
      statistic.getPoints().forEach((statPoint) ->
      strBld.append(String.format(POINT_OUTPUT,pointName,
      DATE_FORMAT.format(statPoint.getExecutionDate()),
      statPoint.executionTimeInNanoseconds(),
      statPoint.executionTimeInMiliseconds())));
    } 
    return strBld.toString(); 
  } 
}
Тепер потрібно додати метод в головний клас утиліти для виводу статистики який буде приймати аргументом тип прінтера (String, JSON, HTML) і вмикання / вимикання історії викликів :

private final StringPrinter stringPrinter = new StringPrinter();
...
  public String getStatistic(OutputFormatType outputFormatType, boolean history){ 
    switch(outputFormatType){ 
      case STRING: return stringPrinter.print(records,history);
... 
    } 
}
Після цього напишемо тестовий склас  методи якого нічого не робитимуть але зупинятимуть потік на якийсь час, і заанотуємо ці методи щоб виміряти їх час виконання :

public class PerformanceTestObject {
  @Stat4JPoint
  public void method2MS() throws InterruptedException { 
    Thread.sleep(2);
  }  
  
  @Stat4JPoint
  public void method25MS() throws InterruptedException {
    Thread.sleep(25);
  } 
  
  @Stat4JPoint
  public int method155MS() throws InterruptedException { 
    Thread.sleep(155);
    return 155;
  } 
  
  @Stat4JPoint
  public void method1Sec() throws InterruptedException { 
    Thread.sleep(1000);
  }
}



і відповідно сам тест :

public class Point4JAspTest { 
  @Test
  public void testPerformanceMeassuringStringOutput() throws InterruptedException {     PerformanceTestObject performanceTestObject = new PerformanceTestObject();     
    performanceTestObject.method2MS();
    performanceTestObject.method25MS();
    performanceTestObject.method25MS();
    performanceTestObject.method25MS(); 
    performanceTestObject.method25MS();
    performanceTestObject.method1Sec();
    performanceTestObject.method155MS(); 
    System.out.println(Stat4J.instance().
      getStatistic(Stat4J.OutputFormatType.STRING,true));
  }
}


Запускаємо та дивимось результат :

org.ar.sta4j.PerformanceTestObject.method25MS: Call times:   4, Max: 30011694(ns) |    30(ms), Min: 27759291(ns) |      27(ms), Avr: 28949879(ns) |        28(ms);
method25MS: Execution date: 03.03.76 14:58:04.321, Execution time: 27759291 (ns)|   27(ms);
method25MS: Execution date: 03.03.76 07:00:38.850, Execution time: 28609810 (ns)|   28(ms);
method25MS: Execution date: 03.02.76 14:27:54.968, Execution time: 29418723 (ns)|   29(ms);
method25MS: Execution date: 03.02.76 22:38:51.322, Execution time: 30011694 (ns)|   30(ms);
org.ar.sta4j.PerformanceTestObject.method2MS: Call times:   1, Max: 3253797(ns) |     3(ms), Min: 3253797(ns) |       3(ms), Avr:  3253797(ns) |         3(ms);
org.ar.sta4j.PerformanceTestObject.method1Sec: Call times:   1, Max: 1002833335(ns) |  1002(ms), Min: 1002833335(ns) |    1002(ms), Avr: 1002833335(ns) |      1002(ms);
org.ar.sta4j.PerformanceTestObject.method155MS: Call times:   1, Max: 162245276(ns) |   162(ms), Min: 162245276(ns) |     162(ms), Avr: 162245276(ns) |       162(ms);


Ось і все. Прінтери для JSON та HTML формату не став тут викладати заради економії часу проте все чого тут не вистачає ви зможене знайти тут.


Очікую ваші відгуки та пропозиції щодо даної утиліти.

П.С. Надіюсь було цікаво.