以下内容来自 尚硅谷,写这一系列的文章,主要是为了方便后续自己的查看,不用带着个PDF找来找去的,太麻烦!

第 11 章 JAVA操作InfluxDB

1、InfluxDB客户端可以参考:https://github.com/influxdata/influxdb-client-java

11.1 创建一个maven项目

1、这里我创建了一个名为java4influx的maven项目

11.2 导入maven依赖

1、在pom.xml里加入如下依赖。

<dependencies> <dependency> <groupId>com.influxdb</groupId> <artifactId>influxdb-client-java</artifactId> <version>6.5.0</version> </dependency></dependencies>

刷新一下maven,下载依赖。

11.3 创建一个package

1、在src/main/java下创建一个package。这里名为com.atguigu.influxdb.client。最后,项目结构如图所示

11.4 示例:查看InfluxDB健康状态

11.4.1 创建ExampleHealthy类

1、如图所示

11.4.2 创建客户端对象

1、influxdb-client-java内部其实封装的是各种HTTP请求,所以 token,org什么 HTTPAPI上需要的东西,在创建客户端的时候都需要考虑。


2、从上图可以看到InfluxDBClientFactory.create方法其实有多种重载。这是因为不同的接口它需要的权限和操作的范围不同。比如一个读写权限的token,它只能对某个存储桶进行操作,那么建立连接时就应该指定bucket,也就是使用下图的重载。


3、但如果你用的是操作员token,希望完成一些创建组织,删除用户的操作,那么就不应该在创建连接时指定存储桶。此时,应该使用下图所示的重载。


4、不过、检查InfluxDB的健康状态不需要任何权限和token。此时,我们只需指定一个URL,那就可以使用下图所示的重载了。


5、我这里是在Ubuntu上演示,目标URL是http://localhost:8086 。所以最终代码如下。

InfluxDBClient对象就是我们的客户端对象。

InfluxDBClient可以返回各种Api对象。

InfluxDBClient influxDBClient =InfluxDBClientFactory.create("http://localhost:8086");

6、如下图所示。这体现了java对InfluxDB HTTP API的封装。

11.4.3 调用API

1、一些简单的api,也可以通过InfluxDBClient对象直接调用。比如我们的检查InfluxDB健康状态,就可以直接调用InfluxDBClient对象的ping方法。如下所示。

System.out.println(influxDBClient.ping());

11.4.4 运行

1、ping方法会返回一个布尔值。如果InfluxDB可以ping通,那么就会得到true,否则返回flase,并记录一条失败日志。

11.4.5 补充

1、在之前的版本,有一个测试InfluxDB是否健康的API叫做health,不过现在这个接口已经被标记为废弃。health方法返回一个HealthCheck对象,相对而言,对这个对象的处理比直接处理布尔值要麻烦很多。在以后的版本,提倡用ping方法检查健康状态。

11.5 示例:查询InfluxDB中的数据

11.5.1 创建一个JAVA类

1、在com.atguigu.influxdb.client下创建一个新的java类,ExampleQuery。

11.5.2 加入一个main方法

1、稍后main方法里面会写我们的查询逻辑。

11.5.3 创建InfluxDB客户端对象

1、这次我们要操作InfluxDB中具体存储桶的数据,建立连接时,推荐选择图中的重载方法。

2、这个方法需要 4 个参数。

参数说明
urlInfluxDB服务的URL,在老师这里就是http://localhost:8086。
token授权的token,而且类型还必须得是char[ ]。
org指定要访问的组织。
bucket要访问的存储桶。

3、现在,我们在ExampleQuery下声明 4 个静态变量。如下图所示:

11.5.4 获取查询API对象

1、使用InfluxDBClient对象点一下,可以看到InfluxDBClient其实提供了两种API。这也是为了兼容性来考虑的。InfluxQLQueryAPI是InfluxDB2.x做的向前兼容。这里我们选择第一个方法,也就是getQueryApi,这意味着我们使用v2 api进行查询。

11.5.5 了解查询API

1、概括性地说,QueryApi对象下有两个方法query和queryRaw。


2、两个方法都需要传入一个FLUX脚本作为查询语句。但主要的不同点在于返回的结果上。

  • queryRaw 方法返回API中的CSV格式数据(String类型)。
  • query 方法视图将查询后的结果封装为各种对象(可以自己指定也可以使用influxdb-client-java提供的FluxTable)。

3、两个方法各有很多不同的实现,其中一大部分是用来制定连接参数的,比如你创建连接对象的时候没有制定org和bucket,那么可以延迟到调用具体api的时候再指定。

11.5.6 query

1、现在,我们先用query方法去查询一下InfluxDB中的数据,现在我们要查询 test_init 存储桶最近 2 分钟的数据。代码如下:

List query = queryApi.query("from(bucket:\"test_init\") |> range(start:-2m)");


2、我们的查询结果List 其实对应了FLUX查询语言中的表流概念。我们可以打印一下query变量。

3、现在,我们只取第一个FluxTable,看看里面有什么。

4、之前我们讲FLUX的时候,有讲过表流和groupKey之间的关系。现在可以打印一下groupKey,看看里面有什么东西。
代码如下:


5、输出结果如下:


6、这其实可以说明我们的整个表流是以_start,_stop,_field,_measurement 4 列为groupKey分组的结果。现在,我们可以尝试打印一下数据。代码如下:


7、结果如下:

8、剩下的功能大家可以自己探索。

11.5.7 queryRaw

1、我在这里创建了一个新类,叫ExampleQueryRaw。代码复制的都是ExampleQuery的。唯一不同的地方就是把queryApi.query改为了queryApi.queryRaw。同时,query变量的类型也从List变成了String

2、现在,我们将查询的结果打印一下。


3、可以看到,我们打印出了CSV格式的数据,这是因为InfluxDB HTTP API本来在请求体中放的就是CSV格式的数据。所以QueryRaw方法其实就是返回原始的CSV。

11.6 同步写和异步写的区别

1、同步写,就是当我调用写入方法时,立刻向InfluxDB发起一个请求,将数据传送过去,而且当前线程会一直阻塞等待写入操作完成。

2、异步写,其实是我调用写入方法的时候,先不执行写入这个操作,而是将数据放入一个缓冲区。当缓冲区满了,我再真正地将数据发送给InfluxDB,这样相当于实现了一个攒批的效果。在后面的示例中,我会向建议先在InfluxDB中创建一个名为example_java的存储桶,再了解后面的写入示例。

11.7 示例:同步写入InfluxDB

11.7.1 创建一个类

1、这次我们创建一个名为ExampleWriteSync的类,org,bucket,url,token什么的还是复制之前例子中的,但是此处我们把bucket改为example_java。总体代码如下:

public class ExampleWriteSync {private static String org = "atguigu";private static String bucket = "example_java";private static String url = "http://localhost:8086";private static char[] token = "ZA8uWTSRFflhKhFvNW4TcZwwvd2NHFW1YIVlcj9Am5iJ4ueHawWh49_jszoKybEymHqgR5mAWg4XMv4tb9TP3w==".toCharArray();public static void main(String[] args) {InfluxDBClient influxDBClient = InfluxDBClientFactory.create(url, token, org, bucket);}}

11.7.2 获取API对象

1、我们可以看到,InfluxDBClient 上有多种方法获取写操作 API 对象。其中WriteApiBlocking是同步写入,WriteApi是异步写入。


2、我们现在先使用getWriteApiBlocking方法获取同步写入的API。

11.7.3 有哪些写入方法

1、简单归纳,写入API给用户提供了 3 类方法写入数据。另外,这三类方法都有与之对应的带s后缀的版本,表示可以一次写入多条。

项目Value
writeMeasuremen用户可以写入自己的POJO类
writePointinfluxdb-client-java提供了一个Point类,用户可以将一条条数据封装为一个个Point,写入InfluxDB。
writeRecord用户可以用符合InfluxDB行协议的字符串向InfluxDB写入数据。

11.7.4 通过Point对象写入InfluxDB

11.7.4.1 构建Point对象

1、使用下面的代码创建一个point对象。

Point point = Point.measurement("temperature").addTag("location", "west").addField("value", 55D).time(Instant.now(), WritePrecision.MS);

2、这是典型的构造器设计模式,measurement是一个静态方法,它会帮我们 new一个Point。addTag和addField不再解释。最终的time,我们通过第二个参数指定写入时间戳的精度。这里是将写入的时间精度确定为了毫秒,如果你传入了一个纳秒时间戳,但精度指明了毫秒,那超出毫秒的部分会被直接截断。

11.7.4.2 将point写出

1、使用下面的代码,直接将point写到InfluxDB中。记得在此之前创建example_java存储桶。

writeApiBlocking.writePoint(point);
11.7.4.3 验证写入结果

1、执行程序后,在InfluxDB DataExplorer查看example_java里面有没有新的数据,并将它展示出来。

11.7.5 通过行协议写入InfluxDB

11.7.5.1 注释上一个示例的代码

1、现在,我们将前面通过Point写数据的代码注释掉。

11.7.5.2 编写代码

1、在main方法中追下下面的代码

此处我们在行协议中省略时间戳,让InfluxDB自动帮我们把时间补上。

writeApiBlocking.writeRecord(WritePrecision.NS,"temperature,location=west value=60.0");
11.7.5.3 验证写入结果

1、运行代码后,一样还是去InfluxDB上查看数据。如图所示,第二条数据已经成功进入InfluxDB了。

11.7.6 通过POJO类写入InfluxDB

11.7.6.1 注释上一个示例的代码

1、同样,我们还是先注释掉上一次用InfluxDB行协议写入数据的代码。

11.7.6.2 添加一个静态内部类
private static class Temperature {}
11.7.6.3 @Measurement注解

1、给静态内部类加一个注解。

@Measuremet注解必须加到类上,表示这个类对应InfluxDB中的哪个测量名称。

@Measurement(name = "temperature")private static class Temperature {}
11.7.6.4 添加成员变量
@Measurement(name = "temperature")private static class Temperature {String location;Double value;Instant time;} 
11.7.6.5 @Column注解

1、@Column注解只能用在成员变量上。

2、 @Column有 4 种实现,如下图所示。


3、你可以将一个成员变量指定为tag、measurement、timestamp还是field。最终代码如下:

@Measurement(name = "temperature")private static class Temperature {@Column(tag = true)String location;@ColumnDouble value;@Column(timestamp = true)Instant time;}
11.7.6.6 创建一个Temperature对象并给其属性赋值

1、代码如下

Temperature temperature = new Temperature();temperature.location = "west";temperature.value = 40D;temperature.time = Instant.now();
11.7.6.7 写到InfluxDB

现在,我们将这个POJO类的对象写到InfluxDB

writeApiBlocking.writeMeasurement(WritePrecision.NS,temperature);

最终的代码如下图所示:

11.7.6.8 验证写入效果

1、运行main方法,去DataExplorer上查看输出效果。如图所示,数据已经成功进入InfluxDB。

11.8 示例:异步写入InfluxDB

11.8.1 创建一个类

1、创建一个名为ExampleWriteAsync的类,一样还是复用我们之前的org、bucket、url和token。并创建InfluxDBClient对象,基础代码如下图所示。

11.8.2 获取API对象

1、可以看到getWriteApi方法已经被标记为弃用了

2、现在鼓励使用的是makeWriteApi方法。实际上,在目前版本(2.4.0),getWriteApi的内部实现已经是直接调用makeWriteApi了。

11.8.3 编写写入代码

1、和之前的同步写入一样,writeApi对象也有writeRecord、writePoint、wirteMeasurement多种写入方法,这里不再赘述。这里,我们只用最简单的writeRecord方法插入一条数据,把代码跑通。

writeApi.writeRecord(WritePrecision.NS,"temperature,location=north value=60.0");

11.8.4 验证写入结果(写入失败)

1、此时,我们运行的代码如下图所示。


2、在Web UI上打开Data Explorer查看写入结果。可以看到,这里没有出现我们刚才的数据,这表示我们刚才的写入失败了.

3、但是,我们的java程序没有报错。这是因为WriteApi会使用一个守护线程,帮我们管理缓冲区,它会在缓冲区满或者距离上次写出数据过 1 秒时将数据写出去。我们刚才就放了一条数据,缓冲区没满、write方法调用完程序就立刻退出了,所以后台线程压根就没有做写的操作。

11.8.5 修改代码

1、现在,有两种方式让守护线程执行写的操作。

  • 手动触发缓冲区刷写
 writeApi.flush();
  • 关闭InfluxDBClient
influxDBClient.close();

2、这里,我们先用第二种。修改后的代码如下

3、运行,之后去Data Explorer上查看结果。

11.8.6 验证写入结果(写入成功)

1、如果能看到location Tag上出现的north标签,而且能够查出来一条数据,那么写入操作就成功了!

11.8.7 小结:异步写入工作逻辑

  • writeApi里有一个缓冲区,这个缓冲区的大小默认是 10000 条数据。
  • 虽然有缓冲区但是writeApi写出数据并不是一次把整个缓冲区都写出去,而是按照批次(默认是 1000 条)的单位来写。
  • 当产生被压或者写入失败时,守护线程会自动重试写入数据。

11.8.8 异步写入的配置

1、异步攒批的操作的守护线程隐式进行的,好在它的行为我们可以进行具体的配置。


2、influxdb-client-java为我们提供了一个WriteOption对象,调用makeWriteApi时可以传入这个对象,通过上图的提示我们可以看到,缓冲区的大小,批的大小,刷写的间隔我们都是可以进行明确指定的。

11.8.9 默认配置

11.9 兼容V1 Api

1、这里只做简单的介绍。

2、使用InfluxDBClientFactory创建Client对象时,调用createV1方法。这个时候你获取的就是兼容V1 Api 的Client对象。