
Handling JSON, Schema Issues, and Doris Quirks in an Apache SeaTunnel Pipeline

Because our projects need to integrate data from multiple data sources into our data warehouse, we evaluated many options and chose Apache SeaTunnel (Comparison Reference).

Currently, the interface we are using does not require authentication. If authentication is needed in the future, we will discuss and test that as well.

Actual Usage

Apache SeaTunnel Version: 2.3.4

Without further ado, here is the final configuration file. Since I submit the job as `json` via the `rest-api` method, the result is shown below:

The difference between `rest` and `conf` lies in the job execution environment: `conf` uses `ClientJobExecutionEnvironment` (which, upon testing, also supports the JSON format), while `rest` uses `RestJobExecutionEnvironment`.
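For context, here is a minimal sketch of what a REST submission can look like. The endpoint and default port 5801 are taken from the SeaTunnel 2.3.x REST API docs (submit-job on the Hazelcast REST port); the file name and `jobName` parameter are placeholders, so adjust everything to your cluster:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Files;
import java.nio.file.Path;

public class SubmitJobSketch {
    public static void main(String[] args) throws Exception {
        // The job config is the JSON shown under "Integration Configuration" below.
        String jobConfig = Files.readString(Path.of("job.json"));
        HttpRequest request = HttpRequest.newBuilder()
                // Endpoint/port assumed from the SeaTunnel 2.3.x REST API docs;
                // adjust host, port, and jobName to your deployment.
                .uri(URI.create("http://localhost:5801/hazelcast/rest/maps/submit-job?jobName=SeaTunnel_Job"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(jobConfig))
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}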

Data Format Returned by the Interface

{
  "code": "0000",
  "msg": "Success",
  "data": {
    "records": \[
      {
        "id": "1798895733824393218",
        "taskContent": "License02",
        "taskType": "License"
      }
    ]
  }
}
// The actual data is paginated; the above is a sample.

Integration Configuration

{
  "env": {
    "job.mode": "BATCH",
    "job.name": "SeaTunnel_Job"
  },
  "source": \[
    {
      "result\_table\_name": "Table13367210156032",
      "plugin_name": "Http",
      "url": "http://*.*.*.*:*/day\_plan\_repair/page",
      "method": "GET",  
      "format": "json",
      "json_field": {   
        "id": "$.data.records\[*\].id",
        "taskContent": "$.data.records\[*\].taskContent",
        "taskType": "$.data.records\[*\].taskType"
      },
      // "pageing": {
      //   "page_field": "current", 
      //   "batch_size": 10 
      // },
      "schema": {
        "fields": {
          "id": "BIGINT",
          "taskContent": "STRING",
          "taskType": "STRING"
        }
      }
    }
  \],
  "transform": \[
    {
      "field_mapper": {
        "id": "id", 
        "taskContent": "task_content",
        "taskType": "task_type"
      },
      "result\_table\_name": "Table13367210156033",
      "source\_table\_name": "Table13367210156032",
      "plugin_name": "FieldMapper"
    }
  \],
  "sink": \[
    {
      "source\_table\_name": "Table13367210156033",
      "plugin_name": "Doris",
      "fenodes ": "*.*.*.*:*",
      "database": "test",
      "password": "****",
      "username": "****",
      "table": "ods\_day\_plan",
      "sink.label-prefix": "test-ods\_day\_plan",
      "sink.enable-2pc": false,
      "data\_save\_mode": "APPEND_DATA",
      "schema\_save\_mode": "CREATE\_SCHEMA\_WHEN\_NOT\_EXIST",
      "save\_mode\_create_template": "CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` (\\n ${rowtype_fields}\\n ) ENGINE=OLAP\\n UNIQUE KEY (id)\\n DISTRIBUTED BY HASH (id)\\n PROPERTIES (\\n \\"replication_allocation\\" = \\"tag.location.default: 1\\",\\n \\"in_memory\\" = \\"false\\",\\n  \\"storage_format\\" = \\"V2\\",\\n \\"disable\_auto\_compaction\\" = \\"false\\"\\n )",
      "sink.enable-delete": true,
      "doris.config": {
        "format": "json",
        "read\_json\_by_line": "true"
      }
    }
  \]
}
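If you submit via the `conf` method instead, save the configuration to a file and pass it to the client, e.g. `./bin/seatunnel.sh --config job.json`. The exact engine flags depend on your deployment; this is the general form from the SeaTunnel 2.3.x docs.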

Issues Encountered During Usage

Handle Save Mode Failed

Caused by: java.sql.SQLException: errCode = 2, detailMessage = Syntax error in line 21:
 UNIQUE KEY ()
             ^
Encountered: )
Expected: IDENTIFIER

Solution: see this [issue](https://github.com/apache/seatunnel/issues/6646).

This issue was resolved by using the `save_mode_create_template` field in the configuration file, which can be customized according to business needs.
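For reference, with the schema above (after the FieldMapper renames), the customized template should render to roughly the following DDL. The exact `${rowtype_fields}` expansion here is my assumption of what the connector generates, not a captured statement:

CREATE TABLE IF NOT EXISTS `test`.`ods_day_plan` (
 `id` BIGINT,
 `task_content` STRING,
 `task_type` STRING
 ) ENGINE=OLAP
 UNIQUE KEY (id)
 DISTRIBUTED BY HASH (id)
 PROPERTIES (
 "replication_allocation" = "tag.location.default: 1",
 "in_memory" = "false",
 "storage_format" = "V2",
 "disable_auto_compaction" = "false"
 )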

NoSuchMethodError

java.lang.NoSuchMethodError: retrofit2.Retrofit$Builder.client(Lshaded/okhttp3/OkHttpClient;)Lretrofit2/Retrofit$Builder;
	at org.influxdb.impl.InfluxDBImpl.<init>(InfluxDBImpl.java:179) ~[connector-influxdb-2.3.4.jar:2.3.4]
	at org.influxdb.impl.InfluxDBImpl.<init>(InfluxDBImpl.java:120) ~[connector-influxdb-2.3.4.jar:2.3.4]
	at org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient.getInfluxDB(InfluxDBClient.java:72) ~[connector-influxdb-2.3.4.jar:2.3.4]

When using the InfluxDB connector, I ran into a jar conflict: the `retrofit2` dependency used to create the HTTP connection conflicted with the version bundled in the DataHub connector. Since I was not using `datahub`, removing the `datahub` connector jar solved the issue.

Apache Doris BIGINT Type Precision Loss Issue

See the post for details.
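The post is not reproduced here, but a typical trigger (my assumption, not a claim about the post's content) is that 64-bit Snowflake ids exceed the 2^53 precision of a double, so any step that round-trips the id through a floating-point or JavaScript-style JSON number silently corrupts the trailing digits:

public class BigintPrecisionDemo {
    public static void main(String[] args) {
        long id = 1798895733824393218L;            // Snowflake id from the sample payload
        double asDouble = (double) id;             // what a careless numeric conversion does
        System.out.println(id);                    // 1798895733824393218
        System.out.printf("%.0f%n", asDouble);     // 1798895733824393216 -> tail digits lost
        System.out.println(id == (long) asDouble); // false: the round trip is lossy
    }
}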

Configuring Primary Key

When configuring the `save_mode_create_template` for Doris, the primary key column must be a numeric or date type.

The `id` field in the source schema is returned as a string, but it is an all-numeric value generated by the Snowflake algorithm, so the `BIGINT` type is used for automatic conversion.

The reason is that the `UNIQUE KEY` in the `save_mode_create_template` of the sink configuration uses `id` as the primary key, and Doris requires that the key column type must be a numeric or date type!

Personal Experience

  1. When there is only one source, transform, or sink, you can omit the `result_table_name` and `source_table_name` configuration items.

  2. Download the source code, modify it, and add log statements; then repackage and replace the jar in the SeaTunnel runtime. This makes it easier to understand the code and get the results you want from the logs.

  3. Building on the previous point, once you are familiar with the code you can do secondary development, for example to handle interfaces that require token authentication.

  4. Note that the JsonPath values in the `json_field` of the source configuration do not support extracting values from complex types nested in lists (arrays of objects). Consider secondary development to resolve this; see the example and the sketch that follow.

// Example:
{
  "code": "0000",
  "msg": "Success",
  "data": {
    "records": [
      {
        "id": "1798895733824393218",
        "taskContent": "License02",
        "taskType": "License",
        "region_list": [ // region_list in this form cannot be parsed and synced; $.data.records[*].region_list[*].id causes a mismatch between per-field data counts and the total record count
          {
            "id": "1",
            "name": "11"
          },
          {
            "id": "1",
            "name": "11"
          }
        ]
      }
    ]
  }
}
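One possible direction for that secondary development (a minimal sketch of mine, not SeaTunnel code; the class name is made up) is to keep one value per record by serializing each record's `region_list` to a JSON string, so every configured JsonPath still yields exactly one value per record:

import com.jayway.jsonpath.JsonPath;

import java.util.List;

public class NestedListFlattenSketch {
    public static void main(String[] args) {
        String body =
                "{\"data\":{\"records\":[{\"id\":\"1\",\"region_list\":"
                        + "[{\"id\":\"1\",\"name\":\"11\"},{\"id\":\"1\",\"name\":\"11\"}]}]}}";
        // $.data.records[*].region_list yields one element per record (the whole
        // nested array), so the count matches the other fields, unlike
        // $.data.records[*].region_list[*].id, which explodes the list.
        List<Object> regionLists = JsonPath.read(body, "$.data.records[*].region_list");
        for (Object regionList : regionLists) {
            // With JsonPath's default json-smart provider, toString() renders valid
            // JSON; store it in a STRING column and parse it downstream.
            System.out.println(regionList.toString());
        }
    }
}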

Testing Code (Using JDK17)

// Required imports (JsonPath, JUnit, and SeaTunnel HTTP connector classes;
// package paths may vary slightly by version). `log` comes from Lombok's @Slf4j.
import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Option;
import com.jayway.jsonpath.ReadContext;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.connectors.seatunnel.http.config.JsonField;
import org.apache.seatunnel.connectors.seatunnel.http.exception.HttpConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.http.exception.HttpConnectorException;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

    private static final Option[] DEFAULT_OPTIONS = {
            Option.SUPPRESS_EXCEPTIONS, Option.ALWAYS_RETURN_LIST, Option.DEFAULT_PATH_LEAF_TO_NULL
    };
    private JsonPath[] jsonPaths;
    private final Configuration jsonConfiguration =
            Configuration.defaultConfiguration().addOptions(DEFAULT_OPTIONS);

    @Test
    public void test5() {
        String data = """
                {
                    "code": "0000",
                    "msg": "Success",
                    "data": {
                        "records": [
                            {
                                "id": "1798895733824393218",
                                "taskContent": "12312312313"
                            }
                        ]
                    }
                }
                """;
        Map<String, String> map = new HashMap<>();
        map.put("id", "$.data.records[*].id");
        map.put("taskContent", "$.data.records[*].taskContent");
        JsonField jsonField = JsonField.builder().fields(map).build();
        initJsonPath(jsonField);
        data = JsonUtils.toJsonNode(parseToMap(decodeJSON(data), jsonField)).toString();
        log.error(data);
    }

    // The following code is from HttpSourceReader
    private void initJsonPath(JsonField jsonField) {
        jsonPaths = new JsonPath[jsonField.getFields().size()];
        for (int index = 0; index < jsonField.getFields().keySet().size(); index++) {
            jsonPaths[index] =
                    JsonPath.compile(
                            jsonField.getFields().values().toArray(new String[] {})[index]);
        }
    }

    private List<Map<String, String>> parseToMap(List<List<String>> datas, JsonField jsonField) {
        List<Map<String, String>> decodeDatas = new ArrayList<>(datas.size());
        String[] keys = jsonField.getFields().keySet().toArray(new String[] {});

        for (List<String> data : datas) {
            Map<String, String> decodeData = new HashMap<>(jsonField.getFields().size());
            final int[] index = {0};
            data.forEach(
                    field -> {
                        decodeData.put(keys[index[0]], field);
                        index[0]++;
                    });
            decodeDatas.add(decodeData);
        }

        return decodeDatas;
    }

    private List<List<String>> decodeJSON(String data) {
        ReadContext jsonReadContext = JsonPath.using(jsonConfiguration).parse(data);
        List<List<String>> results = new ArrayList<>(jsonPaths.length);
        for (JsonPath path : jsonPaths) {
            List<String> result = jsonReadContext.read(path);
            results.add(result);
        }
        for (int i = 1; i < results.size(); i++) {
            List<String> result0 = results.get(0);
            List<String> result = results.get(i);
            if (result0.size() != result.size()) {
                throw new HttpConnectorException(
                        HttpConnectorErrorCode.FIELD_DATA_IS_INCONSISTENT,
                        String.format(
                                "[%s](%d) and [%s](%d) the number of parsing records is inconsistent.",
                                jsonPaths[0].getPath(),
                                result0.size(),
                                jsonPaths[i].getPath(),
                                result.size()));
            }
        }

        return dataFlip(results);
    }

    private List<List<String>> dataFlip(List<List<String>> results) {
        List<List<String>> datas = new ArrayList<>();
        for (int i = 0; i < results.size(); i++) {
            List<String> result = results.get(i);
            if (i == 0) {
                for (Object o : result) {
                    String val = o == null ? null : o.toString();
                    List<String> row = new ArrayList<>(jsonPaths.length);
                    row.add(val);
                    datas.add(row);
                }
            } else {
                for (int j = 0; j < result.size(); j++) {
                    Object o = result.get(j);
                    String val = o == null ? null : o.toString();
                    List<String> row = datas.get(j);
                    row.add(val);
                }
            }
        }
        return datas;
    }
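For reference, running test5 against the sample payload should log a one-element array along the lines of [{"id":"1798895733824393218","taskContent":"12312312313"}]; the field order may differ, since parseToMap stores the fields in a HashMap.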

I hope this experience sharing will be helpful to everyone!
