以下内容来自 尚硅谷,写这一系列的文章,主要是为了方便后续自己的查看,不用带着个PDF找来找去的,太麻烦!
第 11 章 JAVA操作InfluxDB
1、InfluxDB客户端可以参考:https://github.com/influxdata/influxdb-client-java
11.1 创建一个maven项目
1、这里我创建了一个名为java4influx的maven项目
11.2 导入maven依赖
1、在pom.xml里加入如下依赖。
<dependencies> <dependency> <groupId>com.influxdb</groupId> <artifactId>influxdb-client-java</artifactId> <version>6.5.0</version> </dependency></dependencies>
刷新一下maven,下载依赖。
11.3 创建一个package
1、在src/main/java下创建一个package。这里名为com.atguigu.influxdb.client。最后,项目结构如图所示
11.4 示例:查看InfluxDB健康状态
11.4.1 创建ExampleHealthy类
1、如图所示
11.4.2 创建客户端对象
1、influxdb-client-java内部其实封装的是各种HTTP请求,所以 token,org什么 HTTPAPI上需要的东西,在创建客户端的时候都需要考虑。
2、从上图可以看到InfluxDBClientFactory.create方法其实有多种重载。这是因为不同的接口它需要的权限和操作的范围不同。比如一个读写权限的token,它只能对某个存储桶进行操作,那么建立连接时就应该指定bucket,也就是使用下图的重载。
3、但如果你用的是操作员token,希望完成一些创建组织,删除用户的操作,那么就不应该在创建连接时指定存储桶。此时,应该使用下图所示的重载。
4、不过、检查InfluxDB的健康状态不需要任何权限和token。此时,我们只需指定一个URL,那就可以使用下图所示的重载了。
5、我这里是在Ubuntu上演示,目标URL是http://localhost:8086 。所以最终代码如下。
InfluxDBClient对象就是我们的客户端对象。
InfluxDBClient可以返回各种Api对象。
InfluxDBClient influxDBClient =InfluxDBClientFactory.create("http://localhost:8086");
6、如下图所示。这体现了java对InfluxDB HTTP API的封装。
11.4.3 调用API
1、一些简单的api,也可以通过InfluxDBClient对象直接调用。比如我们的检查InfluxDB健康状态,就可以直接调用InfluxDBClient对象的ping方法。如下所示。
System.out.println(influxDBClient.ping());
11.4.4 运行
1、ping方法会返回一个布尔值。如果InfluxDB可以ping通,那么就会得到true,否则返回flase,并记录一条失败日志。
11.4.5 补充
1、在之前的版本,有一个测试InfluxDB是否健康的API叫做health,不过现在这个接口已经被标记为废弃。health方法返回一个HealthCheck对象,相对而言,对这个对象的处理比直接处理布尔值要麻烦很多。在以后的版本,提倡用ping方法检查健康状态。
11.5 示例:查询InfluxDB中的数据
11.5.1 创建一个JAVA类
1、在com.atguigu.influxdb.client下创建一个新的java类,ExampleQuery。
11.5.2 加入一个main方法
1、稍后main方法里面会写我们的查询逻辑。
11.5.3 创建InfluxDB客户端对象
1、这次我们要操作InfluxDB中具体存储桶的数据,建立连接时,推荐选择图中的重载方法。
2、这个方法需要 4 个参数。
参数 | 说明 |
---|---|
url | InfluxDB服务的URL,在老师这里就是http://localhost:8086。 |
token | 授权的token,而且类型还必须得是char[ ]。 |
org | 指定要访问的组织。 |
bucket | 要访问的存储桶。 |
3、现在,我们在ExampleQuery下声明 4 个静态变量。如下图所示:
11.5.4 获取查询API对象
1、使用InfluxDBClient对象点一下,可以看到InfluxDBClient其实提供了两种API。这也是为了兼容性来考虑的。InfluxQLQueryAPI是InfluxDB2.x做的向前兼容。这里我们选择第一个方法,也就是getQueryApi,这意味着我们使用v2 api进行查询。
11.5.5 了解查询API
1、概括性地说,QueryApi对象下有两个方法query和queryRaw。
2、两个方法都需要传入一个FLUX脚本作为查询语句。但主要的不同点在于返回的结果上。
- queryRaw 方法返回API中的CSV格式数据(String类型)。
- query 方法视图将查询后的结果封装为各种对象(可以自己指定也可以使用influxdb-client-java提供的FluxTable)。
3、两个方法各有很多不同的实现,其中一大部分是用来制定连接参数的,比如你创建连接对象的时候没有制定org和bucket,那么可以延迟到调用具体api的时候再指定。
11.5.6 query
1、现在,我们先用query方法去查询一下InfluxDB中的数据,现在我们要查询 test_init 存储桶最近 2 分钟的数据。代码如下:
List query = queryApi.query("from(bucket:\"test_init\") |> range(start:-2m)");
2、我们的查询结果List 其实对应了FLUX查询语言中的表流概念。我们可以打印一下query变量。
3、现在,我们只取第一个FluxTable,看看里面有什么。
4、之前我们讲FLUX的时候,有讲过表流和groupKey之间的关系。现在可以打印一下groupKey,看看里面有什么东西。
代码如下:
5、输出结果如下:
6、这其实可以说明我们的整个表流是以_start,_stop,_field,_measurement 4 列为groupKey分组的结果。现在,我们可以尝试打印一下数据。代码如下:
7、结果如下:
8、剩下的功能大家可以自己探索。
11.5.7 queryRaw
1、我在这里创建了一个新类,叫ExampleQueryRaw。代码复制的都是ExampleQuery的。唯一不同的地方就是把queryApi.query改为了queryApi.queryRaw。同时,query变量的类型也从List变成了String
2、现在,我们将查询的结果打印一下。
3、可以看到,我们打印出了CSV格式的数据,这是因为InfluxDB HTTP API本来在请求体中放的就是CSV格式的数据。所以QueryRaw方法其实就是返回原始的CSV。
11.6 同步写和异步写的区别
1、同步写,就是当我调用写入方法时,立刻向InfluxDB发起一个请求,将数据传送过去,而且当前线程会一直阻塞等待写入操作完成。
2、异步写,其实是我调用写入方法的时候,先不执行写入这个操作,而是将数据放入一个缓冲区。当缓冲区满了,我再真正地将数据发送给InfluxDB,这样相当于实现了一个攒批的效果。在后面的示例中,我会向建议先在InfluxDB中创建一个名为example_java的存储桶,再了解后面的写入示例。
11.7 示例:同步写入InfluxDB
11.7.1 创建一个类
1、这次我们创建一个名为ExampleWriteSync的类,org,bucket,url,token什么的还是复制之前例子中的,但是此处我们把bucket改为example_java。总体代码如下:
public class ExampleWriteSync {private static String org = "atguigu";private static String bucket = "example_java";private static String url = "http://localhost:8086";private static char[] token = "ZA8uWTSRFflhKhFvNW4TcZwwvd2NHFW1YIVlcj9Am5iJ4ueHawWh49_jszoKybEymHqgR5mAWg4XMv4tb9TP3w==".toCharArray();public static void main(String[] args) {InfluxDBClient influxDBClient = InfluxDBClientFactory.create(url, token, org, bucket);}}
11.7.2 获取API对象
1、我们可以看到,InfluxDBClient 上有多种方法获取写操作 API 对象。其中WriteApiBlocking是同步写入,WriteApi是异步写入。
2、我们现在先使用getWriteApiBlocking方法获取同步写入的API。
11.7.3 有哪些写入方法
1、简单归纳,写入API给用户提供了 3 类方法写入数据。另外,这三类方法都有与之对应的带s后缀的版本,表示可以一次写入多条。
项目 | Value |
---|---|
writeMeasuremen | 用户可以写入自己的POJO类 |
writePoint | influxdb-client-java提供了一个Point类,用户可以将一条条数据封装为一个个Point,写入InfluxDB。 |
writeRecord | 用户可以用符合InfluxDB行协议的字符串向InfluxDB写入数据。 |
11.7.4 通过Point对象写入InfluxDB
11.7.4.1 构建Point对象
1、使用下面的代码创建一个point对象。
Point point = Point.measurement("temperature").addTag("location", "west").addField("value", 55D).time(Instant.now(), WritePrecision.MS);
2、这是典型的构造器设计模式,measurement是一个静态方法,它会帮我们 new一个Point。addTag和addField不再解释。最终的time,我们通过第二个参数指定写入时间戳的精度。这里是将写入的时间精度确定为了毫秒,如果你传入了一个纳秒时间戳,但精度指明了毫秒,那超出毫秒的部分会被直接截断。
11.7.4.2 将point写出
1、使用下面的代码,直接将point写到InfluxDB中。记得在此之前创建example_java存储桶。
writeApiBlocking.writePoint(point);
11.7.4.3 验证写入结果
1、执行程序后,在InfluxDB DataExplorer查看example_java里面有没有新的数据,并将它展示出来。
11.7.5 通过行协议写入InfluxDB
11.7.5.1 注释上一个示例的代码
1、现在,我们将前面通过Point写数据的代码注释掉。
11.7.5.2 编写代码
1、在main方法中追下下面的代码
此处我们在行协议中省略时间戳,让InfluxDB自动帮我们把时间补上。
writeApiBlocking.writeRecord(WritePrecision.NS,"temperature,location=west value=60.0");
11.7.5.3 验证写入结果
1、运行代码后,一样还是去InfluxDB上查看数据。如图所示,第二条数据已经成功进入InfluxDB了。
11.7.6 通过POJO类写入InfluxDB
11.7.6.1 注释上一个示例的代码
1、同样,我们还是先注释掉上一次用InfluxDB行协议写入数据的代码。
11.7.6.2 添加一个静态内部类
private static class Temperature {}
11.7.6.3 @Measurement注解
1、给静态内部类加一个注解。
@Measuremet注解必须加到类上,表示这个类对应InfluxDB中的哪个测量名称。
@Measurement(name = "temperature")private static class Temperature {}
11.7.6.4 添加成员变量
@Measurement(name = "temperature")private static class Temperature {String location;Double value;Instant time;}
11.7.6.5 @Column注解
1、@Column注解只能用在成员变量上。
2、 @Column有 4 种实现,如下图所示。
3、你可以将一个成员变量指定为tag、measurement、timestamp还是field。最终代码如下:
@Measurement(name = "temperature")private static class Temperature {@Column(tag = true)String location;@ColumnDouble value;@Column(timestamp = true)Instant time;}
11.7.6.6 创建一个Temperature对象并给其属性赋值
1、代码如下
Temperature temperature = new Temperature();temperature.location = "west";temperature.value = 40D;temperature.time = Instant.now();
11.7.6.7 写到InfluxDB
现在,我们将这个POJO类的对象写到InfluxDB
writeApiBlocking.writeMeasurement(WritePrecision.NS,temperature);
最终的代码如下图所示:
11.7.6.8 验证写入效果
1、运行main方法,去DataExplorer上查看输出效果。如图所示,数据已经成功进入InfluxDB。
11.8 示例:异步写入InfluxDB
11.8.1 创建一个类
1、创建一个名为ExampleWriteAsync的类,一样还是复用我们之前的org、bucket、url和token。并创建InfluxDBClient对象,基础代码如下图所示。
11.8.2 获取API对象
1、可以看到getWriteApi方法已经被标记为弃用了
2、现在鼓励使用的是makeWriteApi方法。实际上,在目前版本(2.4.0),getWriteApi的内部实现已经是直接调用makeWriteApi了。
11.8.3 编写写入代码
1、和之前的同步写入一样,writeApi对象也有writeRecord、writePoint、wirteMeasurement多种写入方法,这里不再赘述。这里,我们只用最简单的writeRecord方法插入一条数据,把代码跑通。
writeApi.writeRecord(WritePrecision.NS,"temperature,location=north value=60.0");
11.8.4 验证写入结果(写入失败)
1、此时,我们运行的代码如下图所示。
2、在Web UI上打开Data Explorer查看写入结果。可以看到,这里没有出现我们刚才的数据,这表示我们刚才的写入失败了.
3、但是,我们的java程序没有报错。这是因为WriteApi会使用一个守护线程,帮我们管理缓冲区,它会在缓冲区满或者距离上次写出数据过 1 秒时将数据写出去。我们刚才就放了一条数据,缓冲区没满、write方法调用完程序就立刻退出了,所以后台线程压根就没有做写的操作。
11.8.5 修改代码
1、现在,有两种方式让守护线程执行写的操作。
- 手动触发缓冲区刷写
writeApi.flush();
- 关闭InfluxDBClient
influxDBClient.close();
2、这里,我们先用第二种。修改后的代码如下
3、运行,之后去Data Explorer上查看结果。
11.8.6 验证写入结果(写入成功)
1、如果能看到location Tag上出现的north标签,而且能够查出来一条数据,那么写入操作就成功了!
11.8.7 小结:异步写入工作逻辑
- writeApi里有一个缓冲区,这个缓冲区的大小默认是 10000 条数据。
- 虽然有缓冲区但是writeApi写出数据并不是一次把整个缓冲区都写出去,而是按照批次(默认是 1000 条)的单位来写。
- 当产生被压或者写入失败时,守护线程会自动重试写入数据。
11.8.8 异步写入的配置
1、异步攒批的操作的守护线程隐式进行的,好在它的行为我们可以进行具体的配置。
2、influxdb-client-java为我们提供了一个WriteOption对象,调用makeWriteApi时可以传入这个对象,通过上图的提示我们可以看到,缓冲区的大小,批的大小,刷写的间隔我们都是可以进行明确指定的。
11.8.9 默认配置
11.9 兼容V1 Api
1、这里只做简单的介绍。
2、使用InfluxDBClientFactory创建Client对象时,调用createV1方法。这个时候你获取的就是兼容V1 Api 的Client对象。