Для чего нужны конвейеры в graylog и как их симулировать
Одна из самых полезных (и возможно самых сложных) функций в graylog - это pipelines или конвейеры (или трубопроводы, как кому больше нравится). Конвейеры позволяют как угодно обрабатывать поступающие в graylog сообщения - отбрасывать ненужные, приводить к нужному формату дату и время, добавлять статические новые поля или производить модификацию содержимого полей. Возможности практически бескрайние и ограничены только фантазией или производственными нуждами.
Немного про синтаксис
Синтаксис правил конвейера очень прост.
rule "descriptive_name"
when
...
then
...
end
Как видно, требуется придумать описательное имя правила, прописать одно или несколько условий срабатывания правила, а затем указать что собственно требуется выполнить.
Список доступных фукнций и типов данных, которые они принимают есть в документации (https://docs.graylog.org/en/3.0/pages/pipelines/functions.html#function-index).
Пишем код
Теперь можно переходить к примерам.Для этого выбираем System / Pipelines и идем в "Manage Rules", где и создаем первое правило.
rule "flag windows messages"
when
has_field("source") AND
to_string($message.source) == "WIN-SRV"
then
set_field("win2019_message", true);
end
when
has_field("source") AND
to_string($message.source) == "WIN-SRV"
then
set_field("win2019_message", true);
end
В этом правиле вначале идет проверка на наличие поля "source" И определенного содержимого этого поля. Затем если условие истинно, создается новое поле "win2019_message". Таким нехитрым способом можно создавать что-то вроде тэгов для того чтобы помечать для дальшего использования (или сортировки) часть приходящих сообщений.
Следующее правило будет чуть сложнее:
rule "remove and clean winlogbeat"
when
has_field("win2019_message") and
has_field("winlogbeat_event_data_IpAddress") and
in_private_net(to_string($message.winlogbeat_event_data_IpAddress))
then
set_field("RFC1918", "true");
set_field("src_ip", to_string($message.winlogbeat_event_data_IpAddress));
remove_field("winlogbeat_event_data_IpAddress");
remove_field("winlogbeat_beats_version");
end
when
has_field("win2019_message") and
has_field("winlogbeat_event_data_IpAddress") and
in_private_net(to_string($message.winlogbeat_event_data_IpAddress))
then
set_field("RFC1918", "true");
set_field("src_ip", to_string($message.winlogbeat_event_data_IpAddress));
remove_field("winlogbeat_event_data_IpAddress");
remove_field("winlogbeat_beats_version");
end
Здесь в начале идет тройная проверка - наличие ранее выставленной метки, наличие определенного поля, а так же принадлежность содержимого этого поля к частным ip-сетям (функция in_private_net()).
Если сообщение будет удовлетворять этом условиям, то будет созданы два новых поля и удалены два старых поля. Можно заметить, что при создании нового поля "src_ip" используется содержимое другого поля.
Ну и создадим третье правило, которое будет менять содержимого полей:
rule "change SubStatus"
when
cidr_match("192.168.10.0/24", to_ip($message.src_ip)) AND
has_field("winlogbeat_event_data_SubStatus") AND
contains(to_string($message.winlogbeat_event_id), "4625")
then
//Change code to text
set_field("winlogbeat_event_data_SubStatus", "The username is misspelled or does not exist");
end
when
cidr_match("192.168.10.0/24", to_ip($message.src_ip)) AND
has_field("winlogbeat_event_data_SubStatus") AND
contains(to_string($message.winlogbeat_event_id), "4625")
then
//Change code to text
set_field("winlogbeat_event_data_SubStatus", "The username is misspelled or does not exist");
end
На этот раз тоже проверка состоит из трех условий, используется встроенная функция cidr_match(), которая в свою очередь умеет проверять пренадлежит ли ip-адрес указанной подсети.
И в случае успеха, значение поля "winlogbeat_event_data_SubStatus" будет изменено с человеконечитаемого кода, на вполне понятный текст.
Кстати, в этом правиле можно увидеть и использование комментариев, это очень важно для неочевидных правил в конвейерах.
Запускаем линию
Отлично, правила готовы, осталось создать конвейер, который и будет их использовать.Для этого нужно перейти в "Manage Pipelines" и нажать "Add new pipeline".
Имя и описание могут быть любыми, главное же в том, что после создания конвейера нужно во-первых подключить его к потоку и в различные этапы (stages) добавить существующие правила.
Нумерация этапов может быть как 0-1-2-3 так и 100-200-300, лишь бы вам это было понятно и удобно.
В мое случае получилось как на скриншоте ниже:
Видно, что правила распределены по этапам последовательно, как мы их и создавали.
Graylog позволяет добавлять множество правил и в один этап, но это не рекомендуется и может привести к проседанию производительности.
Пусконаладочные работы
Правила написаны, конвейер создан, хотелось бы убедиться, что всё работает как задумано. Или же наоборот, видно что сообщения не обрабатываются и требуется выяснить, где закралась ошибка.Для этого в разделе "System/Pipelines" есть "Simulator".
Основная проблема симулятора в том, что ему нужны для работы только raw (сырые) сообщения. И не совсем понятно где их взять или как их сформировать.
К счастью есть нативный формат GELF (Graylog Extended Log Format), который представляет из себя что-то вроде параметров с разделителем в виде двоеточия, и всё сообщение целиком нужно обрамить фигурными скобками.
Пример такого сообщения:
{"message":"[UFW BLOCK] IN=eth0 SRC=1.1.2.2","host":"c1","severity":"warning","facility":"kern"}
Обладая этим знанием, не составит труда скомпоновать сообщение для недавно созданного конвейера:
Естественно, нужно убедиться что выбран правильный поток и кодек установлен в GELF.
Жмем "Load message" и если всё правильно получаем следующий результат:
Слева сообщение которое в сыром виде было собрано вручную, а справа результат симуляции прохода через все правила и этапы конвейера.
Наглядно видно, какие поля были добавлены, удалены или модифицированы. Если же интересует какие этапы прошло (или не прошло) сообщение, то можно здесь же нажать кнопку "More results" и выбрать пункт "Simulation trace".
Немного дебага бонусом
И в завершении хотелось бы немного затронуть встроенную функцию debug().Это может быть полезно, например во время удаления сообщений.
Правило вида:
let debug_message = concat("Dropped message from ",
to_string($message.source)
);
debug(debug_message);
to_string($message.source)
);
debug(debug_message);
Приведет к образованию записи
2019-07-21T14:37:59.400Z INFO [Function] PIPELINE DEBUG: Dropped message from WIN-SRV
в лог файле /var/log/graylog-server/server.log.
Комментариев нет:
Отправить комментарий