快捷搜索: 王者荣耀 脱发

Flink 1.14将数据写入InfluxDB 2.1.1

InfluxDB作为时序数据库,在与时间相关的数据记录中,发挥着巨大的作用。下文以flink为例,通过参考Flink第三方扩展(https://github.com/apache/bahir-flink/tree/master/flink-connector-influxdb2).

自定义source将数据写入influxDB 2.1.1中。

在完成以下工作时,请确保您已经安装并配置了InfluxDB 2.1.1,如果您还未安装配置,可参考以下文章(https://lrting-top.blog..net/article/details/122270992):

代码修改

当前版本的 bahir-flink对influxdb的支持为2.0.0,如果直接使用该版本,则会出现认证不通过的情况,此时需要修改部分代码,使用token的认证方式。

具体为,InfluxDBSinkBuilder类中的getInfluxDBClient方法,修改为:

public static InfluxDBClient getInfluxDBClient(final Configuration configuration) {
        final String url = configuration.getString(INFLUXDB_URL);
        final String bucket = configuration.getString(INFLUXDB_BUCKET);
        final String organization = configuration.getString(INFLUXDB_ORGANIZATION);
        final String token = configuration.getString(INFLUXDB_TOKEN);
        final InfluxDBClientOptions influxDBClientOptions =
                InfluxDBClientOptions.builder()
                        .url(url)
                        .authenticateToken(token.toCharArray())
                        .bucket(bucket)
                        .org(organization)
                        .build();
        return InfluxDBClientFactory.create(influxDBClientOptions);
    }
public static InfluxDBClient getInfluxDBClient(final Configuration configuration) { final String url = configuration.getString(INFLUXDB_URL); final String bucket = configuration.getString(INFLUXDB_BUCKET); final String organization = configuration.getString(INFLUXDB_ORGANIZATION); final String token = configuration.getString(INFLUXDB_TOKEN); final InfluxDBClientOptions influxDBClientOptions = InfluxDBClientOptions.builder() .url(url) .authenticateToken(token.toCharArray()) .bucket(bucket) .org(organization) .build(); return InfluxDBClientFactory.create(influxDBClientOptions); }

完整代码可参考(https://git.lrting.top/xiaozhch5/drfix):

经验分享 程序员 微信小程序 职场和发展