воскресенье, 21 июля 2019 г.

graylog - отладка правил pipeline

Для чего нужны конвейеры в graylog и как их симулировать




Одна из самых полезных (и возможно самых сложных) функций в graylog - это pipelines или конвейеры (или трубопроводы, как кому больше нравится). Конвейеры позволяют как угодно обрабатывать поступающие в graylog сообщения - отбрасывать ненужные, приводить к нужному формату дату и время, добавлять статические новые поля или производить модификацию содержимого полей. Возможности практически бескрайние и ограничены только фантазией или производственными нуждами.



Немного про синтаксис


Синтаксис правил конвейера очень прост.


rule "descriptive_name"
when
       ...
then
       ...
end


Как видно, требуется придумать описательное имя правила, прописать одно или несколько условий срабатывания правила, а затем указать что собственно требуется выполнить.
Список доступных фукнций и типов данных, которые они принимают есть в документации (https://docs.graylog.org/en/3.0/pages/pipelines/functions.html#function-index).

Пишем код

Теперь можно переходить к примерам.
Для этого выбираем System / Pipelines и идем в "Manage Rules", где и создаем первое правило.

rule "flag windows messages"
when
    has_field("source") AND
    to_string($message.source) == "WIN-SRV"
then
    set_field("win2019_message", true);
end


В этом правиле вначале идет проверка на наличие поля "source" И определенного содержимого этого поля. Затем если условие истинно, создается новое поле "win2019_message". Таким нехитрым способом можно создавать что-то вроде тэгов для того чтобы помечать для дальшего использования (или сортировки) часть приходящих сообщений.

Следующее правило будет чуть сложнее:

rule "remove and clean winlogbeat"
when
    has_field("win2019_message") and
    has_field("winlogbeat_event_data_IpAddress") and
    in_private_net(to_string($message.winlogbeat_event_data_IpAddress))
then
    set_field("RFC1918", "true");
    set_field("src_ip", to_string($message.winlogbeat_event_data_IpAddress));
    remove_field("winlogbeat_event_data_IpAddress");
    remove_field("winlogbeat_beats_version");
end

Здесь в начале идет тройная проверка - наличие ранее выставленной метки, наличие определенного поля, а так же принадлежность содержимого этого поля к частным ip-сетям (функция in_private_net()).
Если сообщение будет удовлетворять этом условиям, то будет созданы два новых поля и удалены два старых поля. Можно заметить, что при создании нового поля "src_ip" используется содержимое другого поля.

Ну и создадим третье правило, которое будет менять содержимого полей:

rule "change SubStatus"
when
    cidr_match("192.168.10.0/24", to_ip($message.src_ip)) AND
    has_field("winlogbeat_event_data_SubStatus") AND
    contains(to_string($message.winlogbeat_event_id), "4625")
then

//Change code to text
 set_field("winlogbeat_event_data_SubStatus", "The username is misspelled or does not exist");
end

На этот раз тоже проверка состоит из трех условий, используется встроенная функция cidr_match(), которая в свою очередь умеет проверять пренадлежит ли ip-адрес указанной подсети.
И в случае успеха, значение поля "winlogbeat_event_data_SubStatus" будет изменено с человеконечитаемого кода, на вполне понятный текст.
Кстати, в этом правиле можно увидеть и использование комментариев, это очень важно для неочевидных правил в конвейерах.

Запускаем линию

Отлично, правила готовы, осталось создать конвейер, который и будет их использовать.
Для этого нужно перейти в "Manage Pipelines" и нажать "Add new pipeline".
Имя и описание могут быть любыми, главное же в том, что после создания конвейера нужно во-первых подключить его к потоку и в различные этапы (stages) добавить существующие правила.
Нумерация этапов может быть как 0-1-2-3 так и 100-200-300, лишь бы вам это было понятно и удобно.
В мое случае получилось как на скриншоте ниже:


Видно, что правила распределены по этапам последовательно, как мы их и создавали.
Graylog позволяет добавлять множество правил и в один этап, но это не рекомендуется и может привести к проседанию производительности.

Пусконаладочные работы

Правила написаны, конвейер создан, хотелось бы убедиться, что всё работает как задумано. Или же наоборот, видно что сообщения не обрабатываются и требуется выяснить, где закралась ошибка.
Для этого в разделе "System/Pipelines" есть "Simulator".
Основная проблема симулятора в том, что ему нужны для работы только raw (сырые) сообщения. И не совсем понятно где их взять или как их сформировать.
К счастью есть нативный формат GELF (Graylog Extended Log Format), который представляет из себя что-то вроде параметров с разделителем в виде двоеточия, и всё сообщение целиком нужно обрамить фигурными скобками.
Пример такого сообщения:

{"message":"[UFW BLOCK] IN=eth0 SRC=1.1.2.2","host":"c1","severity":"warning","facility":"kern"}


Обладая этим знанием, не составит труда скомпоновать сообщение для недавно созданного конвейера:

Естественно, нужно убедиться что выбран правильный поток и кодек установлен в GELF.
Жмем "Load message" и если всё правильно получаем следующий результат:


Слева сообщение которое в сыром виде было собрано вручную, а справа результат симуляции прохода через все правила и этапы конвейера.
Наглядно видно, какие поля были добавлены, удалены или модифицированы. Если же интересует какие этапы прошло (или не прошло) сообщение, то можно здесь же нажать кнопку "More results" и выбрать пункт "Simulation trace".

Немного дебага бонусом

И в завершении хотелось бы немного затронуть встроенную функцию debug().
Это может быть полезно, например во время удаления сообщений.
Правило вида:

let debug_message = concat("Dropped message from ",
                          to_string($message.source)
                          );
debug(debug_message);

Приведет к образованию записи
2019-07-21T14:37:59.400Z INFO  [Function] PIPELINE DEBUG: Dropped message from WIN-SRV

в лог файле /var/log/graylog-server/server.log.

Комментариев нет:

Отправить комментарий