Kudu 1.8 没有提供可直接使用的 Linux 命令行客户端,所以这里只能通过 Java API 进行测试。

在 pom.xml 中引入 kudu-client 依赖:

<dependency>
    <groupId>org.apache.kudu</groupId>
    <artifactId>kudu-client</artifactId>
    <version>1.8.0</version>
</dependency>

Java 代码操作示例:

package com.catcher92.demo.kududemo;

import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.*;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

/**
 * End-to-end walkthrough of the Kudu Java client: lists tablet servers,
 * (re)creates a table, adds a column, then inserts, scans and deletes one row.
 *
 * <p>The Kudu master address is taken from the first command-line argument and
 * falls back to {@value #DEFAULT_MASTER} when no argument is given.
 *
 * <p>WARNING: the demo drops the table named {@value #TABLE_NAME} if it already exists.
 */
public class KuduDemo {

    /** Master address used when none is supplied on the command line. */
    private static final String DEFAULT_MASTER = "hadoop07";

    /** Name of the table the demo drops, creates and operates on. */
    private static final String TABLE_NAME = "test";

    /** Nullable column added to the table after creation via ALTER TABLE. */
    private static final String EXTRA_COLUMN = "remark";

    /**
     * Runs the demo against a Kudu cluster.
     *
     * @param args optional; {@code args[0]} is the Kudu master address
     */
    public static void main(String[] args) {
        String master = (args != null && args.length > 0) ? args[0] : DEFAULT_MASTER;
        // Obtain the client.
        KuduClient client = new KuduClient.KuduClientBuilder(master).build();
        try {
            // List the tablet servers.
            List<String> tabletServersList = client.listTabletServers().getTabletServersList();
            if (null != tabletServersList) {
                tabletServersList.forEach(System.out::println);
            }

            recreateTable(client);

            // List the tables known to the master.
            List<String> tablesList = client.getTablesList().getTablesList();
            if (null != tablesList) {
                tablesList.forEach(System.out::println);
            }

            KuduTable kuduTable = ensureExtraColumn(client, client.openTable(TABLE_NAME));
            insertRow(client, kuduTable);
            scanRows(client, kuduTable);
            deleteRow(client, kuduTable);
        } catch (KuduException e) {
            e.printStackTrace();
        } finally {
            try {
                client.shutdown();
            } catch (KuduException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Drops the demo table if present and creates it again.
     *
     * <p>Layout: columns (grade int, name string, age int), primary key (grade, name),
     * hash-partitioned on {@code name} into 4 buckets and range-partitioned on
     * {@code grade} with split points 1..6, one replica per tablet.
     */
    private static void recreateTable(KuduClient client) throws KuduException {
        if (client.tableExists(TABLE_NAME)) {
            client.deleteTable(TABLE_NAME);
        }

        List<ColumnSchema> columnSchemas = new ArrayList<>(3);
        columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("grade", Type.INT32).key(true).build());
        columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("name", Type.STRING).key(true).build());
        columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("age", Type.INT32).build());
        Schema schema = new Schema(columnSchemas);

        CreateTableOptions options = new CreateTableOptions();
        // Hash partitioning: 4 buckets.
        options.addHashPartitions(Collections.singletonList("name"), 4);
        // Range partitioning: 6 split rows yield 7 range partitions,
        // so the table ends up with 4 * 7 = 28 tablets.
        options.setRangePartitionColumns(Collections.singletonList("grade"));
        int[] splitPoints = new int[]{1, 2, 3, 4, 5, 6};
        for (int splitPoint : splitPoints) {
            PartialRow splitRow = schema.newPartialRow();
            splitRow.addInt("grade", splitPoint);
            options.addSplitRow(splitRow);
        }
        // Replication factor.
        options.setNumReplicas(1);
        client.createTable(TABLE_NAME, schema, options);
    }

    /**
     * Adds the nullable string column {@value #EXTRA_COLUMN} when the table lacks it.
     *
     * @return a handle carrying the current schema; the table is re-opened after an
     *         alter because the previous handle still holds the old schema
     */
    private static KuduTable ensureExtraColumn(KuduClient client, KuduTable table) throws KuduException {
        boolean missing = table.getSchema().getColumns().stream()
                .noneMatch(c -> c.getName().equals(EXTRA_COLUMN));
        if (!missing) {
            return table;
        }
        AlterTableOptions alterTableOptions = new AlterTableOptions();
        alterTableOptions.addNullableColumn(EXTRA_COLUMN, Type.STRING);
        AlterTableResponse alterTableResponse = client.alterTable(TABLE_NAME, alterTableOptions);
        System.out.println(alterTableResponse.getTsUUID());
        return client.openTable(TABLE_NAME);
    }

    /** Inserts the single demo row (grade=3, name=catcher92). */
    private static void insertRow(KuduClient client, KuduTable table) throws KuduException {
        Insert insert = table.newInsert();
        PartialRow row = insert.getRow();
        row.addString("name", "catcher92");
        row.addInt("grade", 3);
        row.addInt("age", 9);
        row.addString(EXTRA_COLUMN, "备注....");
        applyAndReport(client, insert, "插入完成", "插入出错");
    }

    /** Scans for the demo row by name and prints its columns. */
    private static void scanRows(KuduClient client, KuduTable table) throws KuduException {
        // Use the table's own column definition for the predicate instead of a hand-built one.
        ColumnSchema nameColumn = table.getSchema().getColumn("name");
        KuduScanner scanner = client.newScannerBuilder(table)
                .addPredicate(KuduPredicate.newComparisonPredicate(
                        nameColumn, KuduPredicate.ComparisonOp.EQUAL, "catcher92"))
                .setProjectedColumnNames(Arrays.asList("grade", "name", EXTRA_COLUMN, "age"))
                .limit(1)
                .build();
        try {
            while (scanner.hasMoreRows()) {
                for (RowResult rowResult : scanner.nextRows()) {
                    System.out.println("name=" + rowResult.getString("name"));
                    System.out.println("age=" + rowResult.getInt("age"));
                    System.out.println("grade=" + rowResult.getInt("grade"));
                    // The column is nullable, so guard the read.
                    String remark = rowResult.isNull(EXTRA_COLUMN) ? null : rowResult.getString(EXTRA_COLUMN);
                    System.out.println("remark=" + remark);
                }
            }
        } finally {
            // Release the scanner even when the loop exits early or throws.
            scanner.close();
        }
    }

    /** Deletes the demo row identified by its full primary key (grade, name). */
    private static void deleteRow(KuduClient client, KuduTable table) throws KuduException {
        Delete delete = table.newDelete();
        PartialRow key = delete.getRow();
        key.addString("name", "catcher92");
        key.addInt("grade", 3);
        applyAndReport(client, delete, "删除完成", "删除出错");
    }

    /**
     * Applies one operation in its own session, prints the outcome, and always
     * closes the session afterwards.
     *
     * @param successMessage printed to stdout when the row was applied without error
     * @param failureMessage printed to stdout when the response carries a row error;
     *                       the error detail itself is written to stderr
     */
    private static void applyAndReport(KuduClient client, Operation operation,
                                       String successMessage, String failureMessage) throws KuduException {
        KuduSession session = client.newSession();
        try {
            OperationResponse response = session.apply(operation);
            if (!response.hasRowError()) {
                System.out.println(successMessage);
            } else {
                System.out.println(failureMessage);
                System.err.println(response.getRowError());
            }
        } finally {
            session.close();
        }
    }

}

 通过浏览器页面看到的表信息如下: 

 

Logo

更多推荐