Flink 配置自定义的日志输出 logback.xml

maven 配置

    <!-- Versions shared by the dependency declarations below. -->
    <properties>
        <flink-version>1.12.2</flink-version>
        <logback.version>1.2.3</logback.version>
        <slf4j.version>1.7.25</slf4j.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink-version}</version>
            <exclusions>
                <!-- Exclude the bundled log4j artifacts -->
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
                <!-- Exclude the SLF4J-to-log4j binding -->
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink-version}</version>
            <exclusions>
                <!-- Exclude the bundled log4j artifacts -->
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
                <!-- Exclude the SLF4J-to-log4j binding -->
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- NOTE(review): unlike the two dependencies above, flink-clients and
             flink-connector-kafka declare no log4j exclusions; confirm they do
             not bring a log4j binding onto the classpath transitively. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink-version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.12</artifactId>
            <version>${flink-version}</version>
        </dependency>
        
        <!-- Required: Logback core -->
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
            <version>${logback.version}</version>
        </dependency>
        <!-- Required: Logback classic -->
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>${logback.version}</version>
        </dependency>
        <!-- Required: log4j-over-slf4j bridge -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>log4j-over-slf4j</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        
    </dependencies>

    <build>
        <plugins>
            <!-- Pin the JDK source/target level -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

logback.xml 文件配置

<?xml version="1.0" encoding="UTF-8"?>
<!-- Logback configuration: a single console appender, DEBUG for the
     application's own package, ERROR for everything else. -->
<configuration>

    <contextName>logback</contextName>

    <!-- Custom conversion words (clr / wex / wEx) used by CONSOLE_LOG_PATTERN.
         NOTE(review): the converter classes live in Spring Boot
         (org.springframework.boot.logging.logback); the Maven dependencies
         shown alongside this file do not declare Spring Boot, so confirm it
         is on the classpath. -->
    <conversionRule conversionWord="clr" converterClass="org.springframework.boot.logging.logback.ColorConverter" />
    <conversionRule conversionWord="wex" converterClass="org.springframework.boot.logging.logback.WhitespaceThrowableProxyConverter" />
    <conversionRule conversionWord="wEx" converterClass="org.springframework.boot.logging.logback.ExtendedWhitespaceThrowableProxyConverter" />

    <!-- Output pattern; the ":-" forms supply a default when the variable is not set -->
    <property name="CONSOLE_LOG_PATTERN" value="${CONSOLE_LOG_PATTERN:-%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}}"/>

    <!-- Console appender -->
    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <!-- Threshold filter set to the debug level -->
        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
            <level>debug</level>
        </filter>
        <encoder>
            <Pattern>${CONSOLE_LOG_PATTERN}</Pattern>
            <charset>UTF-8</charset>
        </encoder>
    </appender>

    <!-- Only the application's own package logs at DEBUG level -->
    <logger name="com.giant.RtCalc" level="DEBUG" />

    <!-- The root logger only accepts ERROR-level output, which suppresses
         messages from Flink itself and from other packages -->
    <root level="error">
        <appender-ref ref="CONSOLE" />
    </root>

</configuration>

这样配置之后,Flink 启动时的日志信息都不会显示,只有报错(error 级别)时才能看到。以上是输出到 console,我们也可以自定义输出位置,例如 MySQL、Kafka、Redis 等,用于做 Flink 程序的日志收集。按以上配置后,DataStream 的 print 就无法正常使用,需要自定义 sink 来替代 print 函数。

自定义print

package com.giant.RtCalc.sink;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

/**
 * Sink that writes every record it receives to the SLF4J debug log, intended
 * as a replacement for {@code DataStream#print()}.
 *
 * <p>Lombok's {@code @Slf4j} supplies the {@code log} field and {@code @Data}
 * generates the accessors for {@link #str}.
 *
 * @param <IN> type of the records received by this sink
 */
@Slf4j
@Data
public class PrintSink<IN> implements SinkFunction<IN> {

    /** Label written in front of every logged record; defaults to {@code "print"}. */
    private String str = "print";

    /**
     * Creates a sink that prefixes each logged record with the given label.
     *
     * @param str label written in front of every record
     */
    public PrintSink(String str) {
        this.str = str;
    }

    /** Creates a sink that uses the default label {@code "print"}. */
    public PrintSink() {
    }

    /**
     * Logs the record at DEBUG level as {@code <label>-><record>} and then
     * delegates to the interface's default implementation.
     *
     * @param value   record to log
     * @param context sink context, forwarded unchanged to the default implementation
     * @throws Exception if the default implementation throws
     */
    @Override
    public void invoke(IN value, Context context) throws Exception {
        // The label is passed as an argument instead of being concatenated into
        // the format string: a label containing "{}" can no longer swallow the
        // record placeholder, and no string is built when DEBUG is disabled.
        log.debug("{}->{}", str, value);
        SinkFunction.super.invoke(value, context);
    }
}

Flink 配置自定义的日志输出 logback.xml

用 addSink 来取代 DataStream 的 print 功能。

这种方式虽然可以打印自己喜爱的日志格式,但存在的问题是:提交到 flink-web 运行时会报错,因为集群 lib 目录下自带的 log4j-slf4j-impl 已经作为 SLF4J 的绑定实现,与 jar 包中的 Logback 冲突,报错信息如下:

LoggerFactory is not a Logback LoggerContext but Logback is on the classpath. Either remove Logback or the competing implementation (class org.apache.logging.slf4j.Log4jLoggerFactory loaded from file:/opt/software/flink-1.12.2/lib/log4j-slf4j-impl-2.12.1.jar). If you are using WebLogic you will need to add  org.slf4j  to prefer-application-packages in WEB-INF/weblogic.xml: org.apache.logging.slf4j.Log4jLoggerFactory

官方文档说用以下方式解决

Flink 配置自定义的日志输出 logback.xml

© 版权声明
THE END
如果内容对您有所帮助,就支持一下吧!
点赞0 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容