Using the Flume Collector to Report Logs to LTS

Flume is a highly available, highly reliable, distributed system for collecting, aggregating, and transporting massive volumes of log data. Flume lets you plug in custom data senders to collect data, and it can perform simple processing on the data before writing it to any of a variety of receivers.

You can collect logs with Flume and report them to LTS over the Kafka protocol that LTS provides. The following are examples of common data collection scenarios:

  1. Collecting text logs with Flume and reporting them to LTS
  2. Collecting database table data with Flume and reporting it to LTS
  3. Collecting logs transported over the syslog protocol with Flume and reporting them to LTS
  4. Collecting logs transported over TCP/UDP with Flume and reporting them to LTS
  5. Collecting device management data reported over SNMP with Flume and sending it to LTS
  6. Processing logs with the built-in interceptors
  7. Processing logs with a custom interceptor
  8. Enriching log content with an external data source and reporting it to LTS

Prerequisites

The JDK has been installed on the host.

Flume has been installed, and the JDK path has been configured in the Flume configuration file.
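A common way to do this is to set JAVA_HOME in FLUME_HOME/conf/flume-env.sh (the JDK path below is illustrative; use your actual installation path):

```shell
# FLUME_HOME/conf/flume-env.sh (copy flume-env.sh.template if the file does not exist)
# Example JDK path; replace with your actual installation path.
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk
```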

Collecting Text Logs with Flume and Reporting Them to LTS

Flume can collect the content of text logs and report it to LTS. Create a .conf file for collecting text logs based on the following example. For the parameters used in the example, see Reporting Logs over the Kafka Protocol.

#Named
a1.sources = r1
a1.channels = c1
a1.sinks = k1

#Source
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /tmp/test.txt
a1.sources.r1.fileHeader = true
a1.sources.r1.maxBatchCount = 1000

#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

#Sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId}
a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port}
a1.sinks.k1.kafka.producer.acks = 0
a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT
a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN
a1.sinks.k1.kafka.producer.compression.type = gzip
a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}";

#Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
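Once the .conf file is saved, the agent can be started with the standard flume-ng command (the configuration file name below is illustrative):

```shell
# Start agent a1 with the configuration created above; run from FLUME_HOME.
bin/flume-ng agent --conf conf --conf-file conf/taildir-to-lts.conf --name a1 -Dflume.root.logger=INFO,console
```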
Collecting Database Table Data with Flume and Reporting It to LTS

Flume can collect database table data and report it to LTS, which lets you monitor changes to the table data. For the parameters used in the example, see Reporting Logs over the Kafka Protocol.

  1. Download the flume-ng-sql-source plugin from https://github.com/keedio/flume-ng-sql-source, build it into a jar named flume-ng-sql-source.jar, and place the jar under the lib directory of the Flume installation path, for example FLUME_HOME/lib (FLUME_HOME stands for the Flume installation path and is for reference only; use your actual installation path). Before building, make sure the flume-ng-core version in the pom file matches the installed Flume version.
    1. Download the MySQL driver.
      wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.35.tar.gz
    2. Decompress the driver package to obtain the jar.
      tar xzf mysql-connector-java-5.1.35.tar.gz
    3. Copy the jar to FLUME_HOME/lib/.
      cp mysql-connector-java-5.1.35-bin.jar  FLUME_HOME/lib/
  2. Create the .conf file for collecting MySQL data.

     

    # a1 is the agent name
    # source is a1's input
    # channels is the buffer
    # sinks is a1's output destination; in this example the sink is Kafka
    a1.channels = c1
    a1.sources = r1
    a1.sinks = k1
    
    #Source
    a1.sources.r1.type = org.keedio.flume.source.SQLSource
    # MySQL connection settings: replace {mysql_host} with your host's IP address
    # (check it with ifconfig or ip addr) and {database_name} with the database name
    # Append ?useUnicode=true&characterEncoding=utf-8&useSSL=false to the URL,
    # otherwise the connection may fail
    a1.sources.r1.hibernate.connection.url = jdbc:mysql://{mysql_host}:3306/{database_name}?useUnicode=true&characterEncoding=utf-8&useSSL=false
    # Hibernate database connection properties
    # MySQL user name, usually root
    a1.sources.r1.hibernate.connection.user = root
    # Your MySQL password
    a1.sources.r1.hibernate.connection.password = xxxxxxxx
    a1.sources.r1.hibernate.connection.autocommit = true
    # MySQL dialect and driver
    a1.sources.r1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
    a1.sources.r1.hibernate.connection.driver_class = com.mysql.jdbc.Driver
    # Where the status file is stored
    a1.sources.r1.status.file.path = FLUME_HOME/bin
    a1.sources.r1.status.file.name = sqlSource.status
    # Custom query
    # Replace {table_name} with the name of the table to collect
    a1.sources.r1.custom.query = select * from {table_name}
    
    
    #Sink
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId}
    a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port}
    a1.sinks.k1.kafka.producer.acks = 0
    a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT
    a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN
    a1.sinks.k1.kafka.producer.compression.type = gzip
    a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}";
    
    
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 10000
    a1.channels.c1.byteCapacityBufferPercentage = 20
    a1.channels.c1.byteCapacity = 800000
  3. After Flume is started, it begins collecting the table data from the database to LTS.

Collecting Syslog Logs with Flume and Reporting Them to LTS

Syslog is a protocol for transporting log messages over IP networks. Flume can collect logs transported over syslog and report them to LTS. For the parameters used in the examples, see Reporting Logs over the Kafka Protocol.

  • To receive UDP logs, create a .conf file for collecting syslog based on the following example.
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    a1.sources.r1.type=syslogudp
    #host_port is the syslog port (the port this Flume source listens on)
    a1.sources.r1.port = {host_port}
    #host_ip is the syslog host IP (the address this Flume source binds to)
    a1.sources.r1.host = {host_ip}
    a1.sources.r1.channels = c1
    
    a1.channels.c1.type = memory
    
    #Sink
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId}
    a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port}
    a1.sinks.k1.kafka.producer.acks = 0
    a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT
    a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN
    a1.sinks.k1.kafka.producer.compression.type = gzip
    a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}";
    a1.sinks.k1.channel = c1
  • To receive TCP logs, create a .conf file for collecting syslog based on the following example.
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    a1.sources.r1.type=syslogtcp
    #host_port is the syslog port (the port this Flume source listens on)
    a1.sources.r1.port = {host_port}
    #host_ip is the syslog host IP (the address this Flume source binds to)
    a1.sources.r1.host = {host_ip}
    a1.sources.r1.channels = c1
    
    a1.channels.c1.type = memory
    
    #Sink
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId}
    a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port}
    a1.sinks.k1.kafka.producer.acks = 0
    a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT
    a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN
    a1.sinks.k1.kafka.producer.compression.type = gzip
    a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}";
    a1.sinks.k1.channel = c1
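To verify either syslog pipeline end to end, a test message can be sent with the Linux logger utility (the address and port below are illustrative):

```shell
# Send a UDP test message to the syslog source; use --tcp for the TCP source.
logger --udp --server 127.0.0.1 --port 514 "syslog test message"
```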

Collecting Logs Transported over TCP/UDP with Flume and Reporting Them to LTS

Flume can collect logs transported over TCP or UDP and report them to LTS. For the parameters used in the examples, see Reporting Logs over the Kafka Protocol.

  • To collect logs from a TCP port, create a .conf file for port collection based on the following example.
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = 0.0.0.0
    a1.sources.r1.port = {host_port}
    
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId}
    a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port}
    a1.sinks.k1.kafka.producer.acks = 0
    a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT
    a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN
    a1.sinks.k1.kafka.producer.compression.type = gzip
    a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}";
    
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
  • To collect logs from a UDP port, create a .conf file for port collection based on the following example.
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    a1.sources.r1.type = netcatudp
    a1.sources.r1.bind = 0.0.0.0
    a1.sources.r1.port = {host_port}
    
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId}
    a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port}
    a1.sinks.k1.kafka.producer.acks = 0
    a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT
    a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN
    a1.sinks.k1.kafka.producer.compression.type = gzip
    a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}";
    
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
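Both port sources can be exercised with the nc tool (replace the address and {host_port} with your actual values):

```shell
# Send a test line to the netcat TCP source
echo "tcp test message" | nc 127.0.0.1 {host_port}
# Send a test line to the netcatudp UDP source
echo "udp test message" | nc -u 127.0.0.1 {host_port}
```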

Collecting SNMP Device Management Data with Flume and Sending It to LTS

Flume can collect device management data reported over SNMP and send it to LTS. For the parameters used in the examples, see Reporting Logs over the Kafka Protocol.

  • To listen on port 161, the standard SNMP communication port, create a .conf file for receiving SNMP data based on the following example.
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    a1.sources.r1.type = netcatudp
    a1.sources.r1.bind = 0.0.0.0
    a1.sources.r1.port = 161
    
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId}
    a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port}
    a1.sinks.k1.kafka.producer.acks = 0
    a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT
    a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN
    a1.sinks.k1.kafka.producer.compression.type = gzip
    a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}";
    
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
  • To listen on port 162, the standard port for SNMP trap communication, create a .conf file for receiving SNMP data based on the following example.
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    
    a1.sources.r1.type = netcatudp
    a1.sources.r1.bind = 0.0.0.0
    a1.sources.r1.port = 162
    
    
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId}
    a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port}
    a1.sinks.k1.kafka.producer.acks = 0
    a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT
    a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN
    a1.sinks.k1.kafka.producer.compression.type = gzip
    a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}";
    
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

Processing Logs with the Built-in Interceptors

In Flume, an interceptor is a simple pluggable component that sits between a source and its channels. Before an event received by the source is written to the channels, the interceptors can transform or drop it. Each interceptor processes only the events received by its own source.

  • Timestamp interceptor

    This interceptor inserts a timestamp into the headers of each Flume event. Without any interceptor, Flume receives only the message itself. The timestamp interceptor has two parameters: type, the type name, which is timestamp (the fully qualified class name can also be used), and preserveExisting, which defaults to false; if set to true, an existing timestamp header in the event is not overwritten. Configuration connecting a source to the timestamp interceptor:

    a1.sources.r1.interceptors = timestamp 
    a1.sources.r1.interceptors.timestamp.type=timestamp 
    a1.sources.r1.interceptors.timestamp.preserveExisting=false
     
  • Regex filtering interceptor

    Some of the collected data may be unneeded. A filtering interceptor can drop the unwanted logs, or collect only the logs that satisfy a regular expression. Its parameters: type, the type name, which is REGEX_FILTER, and excludeEvents, which defaults to false, meaning matched events are collected; if set to true, matched events are dropped and only unmatched events are collected. Configuration connecting a source to the regex filtering interceptor:

    a1.sources.r1.interceptors = regex 
    a1.sources.r1.interceptors.regex.type=REGEX_FILTER 
    a1.sources.r1.interceptors.regex.regex=(today)|(Monday) 
    a1.sources.r1.interceptors.regex.excludeEvents=false
     

    With this configuration, the interceptor accepts only log messages that contain today or Monday.

  • Search-and-replace interceptor

    This interceptor provides simple string search-and-replace based on Java regular expressions. Configure it as follows:

    # Interceptor alias
    a1.sources.r1.interceptors = search-replace
    # Interceptor type; must be search_replace
    a1.sources.r1.interceptors.search-replace.type = search_replace
    
    # Pattern matched against the event body; matched content is removed or replaced
    a1.sources.r1.interceptors.search-replace.searchPattern = today
    # Replacement for the matched content
    a1.sources.r1.interceptors.search-replace.replaceString = yesterday
    # Character set; the default is UTF-8
    a1.sources.r1.interceptors.search-replace.charset = utf8
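The regex filtering and search-and-replace behavior both map onto java.util.regex. The following is a minimal stdlib-only sketch of their semantics (illustrative only, not Flume's actual implementation):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

public class InterceptorSemanticsDemo {
    // REGEX_FILTER semantics: keep bodies that match the pattern when
    // excludeEvents is false, and bodies that do NOT match when it is true.
    static List<String> regexFilter(List<String> bodies, String regex, boolean excludeEvents) {
        Pattern p = Pattern.compile(regex);
        List<String> kept = new ArrayList<>();
        for (String body : bodies) {
            if (p.matcher(body).find() != excludeEvents) {
                kept.add(body);
            }
        }
        return kept;
    }

    // search_replace semantics: replace every match of searchPattern in the
    // body with replaceString, as Matcher.replaceAll does.
    static String searchReplace(String body, String searchPattern, String replaceString) {
        return Pattern.compile(searchPattern).matcher(body).replaceAll(replaceString);
    }

    public static void main(String[] args) {
        List<String> logs = List.of("today is fine", "tuesday log", "Monday report");
        System.out.println(regexFilter(logs, "(today)|(Monday)", false));
        // prints: [today is fine, Monday report]
        System.out.println(searchReplace("today it rained", "today", "yesterday"));
        // prints: yesterday it rained
    }
}
```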

Processing Logs with a Custom Interceptor

The main steps for writing a custom interceptor in Flume are as follows (using Java as an example). FLUME_HOME below stands for the Flume installation path, for example /tools/flume (for reference only); use the actual path where Flume is installed.

  1. Create a Maven project and add the Flume dependency.

    Add the Flume dependency matching the Flume version in your cluster, as shown below:

    <dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.10.1</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    The dependency does not need to be packaged into the final JAR, so its scope is set to provided.

  2. Create a class that implements the Interceptor interface and its methods.

    • initialize(): initializes the interceptor, for example reading configuration and establishing connections.
    • intercept(Event event): intercepts a single event and processes it. It receives an event object as input and returns a modified event object.
    • intercept(List<Event> list): batch processing; intercepts a list of events and processes it.
    • close(): shuts down the interceptor; release resources and close connections here.
    import org.apache.flume.Event;
    import org.apache.flume.interceptor.Interceptor;
    
    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;
    
    public class TestInterceptor implements Interceptor {
        @Override
        public void initialize() {
    
        }
        @Override
        public Event intercept(Event event) {
    
            // Get the event body
            String eventData = new String(event.getBody(), StandardCharsets.UTF_8);
            // Check whether the event body contains the given string
            if (eventData.contains("hello")) {
                // If it does, filter the event out by returning null
                return null;
            }
    
            return event;
        }
    
        @Override
        public List<Event> intercept(List<Event> events) {
            // Create a new list to hold the processed events
            List<Event> interceptedEvents = new ArrayList<>();
            for (Event event : events) {
                Event interceptedEvent = intercept(event);
                if (interceptedEvent != null) {
                    interceptedEvents.add(interceptedEvent);
                }
            }
            return interceptedEvents;
        }
    
        @Override
        public void close() {
    
        }
    
    }
  3. Build the interceptor. Interceptors are usually created and configured through the Builder pattern; the complete code is as follows:

     

    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.interceptor.Interceptor;
    
    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;
    
    public class TestInterceptor implements Interceptor {
        @Override
        public void initialize() {
        }
        @Override
        public Event intercept(Event event) {
            // Get the event body
            String eventData = new String(event.getBody(), StandardCharsets.UTF_8);
            // Check whether the event body contains the given string
            if (eventData.contains("hello")) {
                // If it does, filter the event out by returning null
                return null;
            }
            return event;
        }
        @Override
        public List<Event> intercept(List<Event> events) {
            List<Event> interceptedEvents = new ArrayList<>();
            for (Event event : events) {
                Event interceptedEvent = intercept(event);
                if (interceptedEvent != null) {
                    interceptedEvents.add(interceptedEvent);
                }
            }
            return interceptedEvents;
        }
    
        @Override
        public void close() {
    
        }
    
        // Interceptor builder
        public static class Builder implements Interceptor.Builder {
    
            @Override
            public void configure(Context context) {
    
            }
    
            @Override
            public Interceptor build() {
                return new TestInterceptor();
            }
    
        }
    
    }
  4. Build the project into a jar and upload it to the lib folder under the Flume installation path (use the actual path where Flume is installed).
  5. Write the configuration file and configure the custom interceptor in it.

     

    When configuring the interceptor, note the format of the type value: the interceptor's fully qualified class name followed by $Builder.

    # Interceptor configuration
    # Interceptor definition
    a1.sources.r1.interceptors = i1
    # Fully qualified class name of the interceptor
    a1.sources.r1.interceptors.i1.type = TestInterceptor$Builder
  6. Run Flume.

KV log parsing: split the event body on spaces and convert it to a Map-style string.
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TestInterceptor implements Interceptor {
    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        // Get the event body
        String eventData = new String(event.getBody(), StandardCharsets.UTF_8);
        Map<String, Object> splitMap = new HashMap<>();
        String[] splitList = eventData.split(" ");
        for (int i = 0; i < splitList.length; i++) {
            splitMap.put("field" + i, splitList[i].trim());
        }
        event.setBody(splitMap.toString().getBytes(StandardCharsets.UTF_8));
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> interceptedEvents = new ArrayList<>();
        for (Event event : events) {
            Event interceptedEvent = intercept(event);
            if (interceptedEvent != null) {
                interceptedEvents.add(interceptedEvent);
            }
        }
        return interceptedEvents;
    }

    @Override
    public void close() {
    }
}
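The splitting logic above can be exercised without the Flume classes. A stdlib-only sketch (using LinkedHashMap so the field order is deterministic, unlike the HashMap in the interceptor):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class KvParseDemo {
    // Split a line on spaces and index each token as fieldN.
    static String parse(String line) {
        Map<String, Object> splitMap = new LinkedHashMap<>();
        String[] tokens = line.split(" ");
        for (int i = 0; i < tokens.length; i++) {
            splitMap.put("field" + i, tokens[i].trim());
        }
        return splitMap.toString();
    }

    public static void main(String[] args) {
        System.out.println(parse("GET /index.html 200"));
        // prints: {field0=GET, field1=/index.html, field2=200}
    }
}
```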

Enriching Log Content with an External Data Source and Reporting It to LTS

The basic unit of data transfer in Flume is the event: data moves from source to destination in the form of events. An event consists of a header and a body; the header holds the event's attributes as key-value pairs, and the body holds the data itself as a byte array.

When there is an external data source and you need to enrich the log content, for example by modifying it, adding fields, or deleting content, the modified content must be written into the event body for Flume to report it to LTS. The following example extends the log content with custom Java code. For the parameters used in the example, see Reporting Logs over the Kafka Protocol.

import com.alibaba.fastjson.JSONObject;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class TestInterceptor implements Interceptor {
    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        // Get the event body, convert the original data to a JSON string, and add an extra field
        String eventData = new String(event.getBody(), StandardCharsets.UTF_8);
        JSONObject object = new JSONObject();
        object.put("content", eventData);
        object.put("workLoadType", "ReplicaSet");
        eventData = object.toJSONString();
        event.setBody(eventData.getBytes(StandardCharsets.UTF_8));
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> interceptedEvents = new ArrayList<>();
        for (Event event : events) {
            Event interceptedEvent = intercept(event);
            if (interceptedEvent != null) {
                interceptedEvents.add(interceptedEvent);
            }
        }
        return interceptedEvents;
    }

    @Override
    public void close() {
    }
}
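The enrichment step can be sketched without the fastjson dependency using plain string handling (illustrative only; a real implementation should rely on a JSON library such as fastjson for correct escaping):

```java
public class EnrichDemo {
    // Wrap a raw log line in a JSON object with an extra field.
    // Quotes and backslashes are escaped by hand here for illustration.
    static String enrich(String line, String workLoadType) {
        String escaped = line.replace("\\", "\\\\").replace("\"", "\\\"");
        return "{\"content\":\"" + escaped + "\",\"workLoadType\":\"" + workLoadType + "\"}";
    }

    public static void main(String[] args) {
        System.out.println(enrich("pod started", "ReplicaSet"));
        // prints: {"content":"pod started","workLoadType":"ReplicaSet"}
    }
}
```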