Bitcoin

Cybersecurity Giant Supercharges Apache SeaTunnel to Tame Complex Data

With the continuous complexity of data sources and the rapid evolution of business demands, general-purpose data integration frameworks often face many challenges in practical deployment: frequent issues such as irregular data structures, missing fields, mixed sensitive information, and unclear data semantics. To better address these complex scenarios, a leading publicly listed cybersecurity enterprise has performed secondary development based on Apache SeaTunnel, building a scalable, easy-to-maintain data processing and intelligent fault-tolerance mechanism suitable for complex scenarios. This article will comprehensively introduce the relevant technical implementations around actual functional extensions and design concepts.

Background and Pain Points

In practical business scenarios, the data sources we face are highly heterogeneous, including but not limited to log files, FTP/SFTP files, Kafka messages, and database changes. The data itself may be structurally inconsistent or even unstructured text or semi-structured XML format. The following problems are particularly prominent:

  • Insufficient complex data parsing capability: Unable to parse and ingest complex data such as XML, key-value, and irregular data.
  • Lack of data completion and dictionary translation capability: Unable to complete asset information when supplementing raw logs, resulting in incomplete data and missing key information, which leads to insufficient data analysis capability and inability to mine data value.
  • Limited file reading modes: Unable to capture and parse newly added logs in real time, causing delays in security threat detection and loss of real-time analysis and subsequent system alert value.
  • Weak exception handling mechanisms: During task execution, data senders may change logs without notifying receivers, causing task interruption. Without notification of log changes, it is difficult to quickly locate and solve problems.

New Features: Processing and Transformation Capability Extensions Based on SeaTunnel

To address the above complex scenarios, we built multiple Transform plugins based on SeaTunnel for regex transform, XML parsing, Key-Value parsing, dynamic data completion, IP address completion, data masking, dictionary translation, extended Incremental reading for SFTP/FTP, and other processing.

1. Regex Parsing (Regex Transform)

Used for parsing structured or semi-structured text fields. By configuring regular expressions and specifying group mappings, raw text can be split into multiple business fields. This method is widely used in log parsing and field splitting scenarios.

Core Parameters:

  • source_field: The original field to be parsed
  • regex: Regular expression, e.g., (\d+)-(\w+)
  • groupMap: The mapping relationship between parsed result fields and regex capture group indexes

33

2. XML Parsing

Using the VTD-XML parser combined with XPath expressions to precisely extract XML nodes, attributes, and text content, converting them into structured data.

Core Parameters:

  • pathMap: Mapping each result field to the corresponding XPath path of the needed attribute
  • source_field: The XML string field name
  • 55

3. Key-Value Parsing

Parse strings like "key1=value1;key2=value2" into structured fields. Supports configuration of key-value and field delimiters.

Core Parameters:

  • source_field: Upstream key-value string field
  • field_delimiter: Key-value pair delimiter (e.g., ;)
  • kv_delimiter: Key and value delimiter (e.g., =)
  • fields: Set of target mapped field keys

77

4. Dynamic Data Completion (Lookup Enrichment)

Dynamically fill in missing fields using auxiliary data streams or dictionary tables, such as completing device asset information, user attributes, etc.

NameTypeRequiredDescriptionsource_table_join_fieldStringYesSource table join field, used to get the source table field valuedimension_table_join_fieldStringYesDimension table join field, used to get the dimension table datadimension_table_jdbc_urlStringYesJDBC URL path for the dimension tabledriver_class_nameStringYesDriver nameusernameStringYesUsernamepasswordStringYesPassworddimension_table_sqlStringYesSQL statement for the dimension table, the queried fields will be output to the next level processdata_cache_expire_time_minuteslongYesData cache refresh time in minutes

Implementation Highlights:

  • Support external data source lookup based on key fields
  • Local cache to improve lookup performance
  • Configurable timed cache data refresh

99

5. IP Address Completion

Derive geographic information such as country, city, region from IP fields by locally integrating the IP2Location database.

Parameters:

  • field: Source IP field
  • output_fields: Geographic fields to extract (e.g., country, city)

1111

6. Data Masking

Mask sensitive information such as phone numbers, ID cards, emails, IP addresses, supporting various masking rules (mask, fuzzy replacement, etc.) to ensure privacy compliance.

NameTypeRequiredDescriptionfieldStringYesThe field that needs to be desensitizedrule_typeStringYesRule type: Positive desensitization, Equal desensitizationdesensitize_typeStringYesDesensitization type: Phone number, ID number, Email, IP address; Required when choosing positive desensitizationequal_contentStringNoEqual content; Required when choosing equal desensitizationdisplay_modeStringYesDisplay mode: Full desensitization, Head-tail desensitization, Middle desensitization

Common Masking Strategies:

  • Mask middle four digits of phone numbers: 138****8888
  • Mask email account name: x***@domain.com
  • Mask IP address: 192.168.*.*

1313

7. Dictionary Translation

Convert encoded values into business semantics (e.g., gender code 1 => Male2 => Female), improving data readability and report quality.

NameTypeRequiredDescriptionfieldsStringYesField list, format: target field name = source field namedict_fieldsArray

YesDictionary field list. Each dictionary object has the following attributes:fieldNameSource field nametypeDictionary source, “FILE” means the dictionary is from a file, “CONFIG” means it is from a string.dictConfigDictionary string, in JSONObject format. Required when type = “CONFIG”.filePathPath to the dictionary file, content is in JSONObject format. Required when type = “FILE”.fileSplitWordDelimiter for dictionary items in the dictionary file. Default is a comma.

Supported Sources:

  • Configured JSON format string data
  • Referenced TEXT files containing dictionary content

1515

8. Extended Incremental Reading for SFTP/FTP

SeaTunnel natively supports reading remote files but has room for optimization in incremental pulling, breakpoint resume, and multithreaded consumption. We extended the following capabilities:

  • Incremental detection based on file modification time: automatically detect new or changed files
  • Thread pool scan triggering: schedule file pulling with timed tasks
  • Concurrent processing with multiple consumers and consumption modes: improve throughput and avoid duplicate consumption
  • Read offset record and breakpoint resume: ensure no data loss on retries
  • Log and health status monitoring: support real-time alerts and logging
  • Historical file cleanup strategy: automatically clean old data according to retention days

Performance Test (Real Data):

  • Data volume: 10 million records, target table fields: 72
  • Kafka throughput: 55,000 records/second
  • Environment configuration:
  • Kafka, SeaTunnel, ClickHouse all deployed on single machines
  • OS: CentOS 7, CPU: 8 cores 16 threads, Memory: 32G
  • SeaTunnel JVM: -Xms1G -Xmx1G

New FeatureProcessing Rate (rows/s)Maximum Processing Rate (rows/s)Positive Rule Parsing55000/s61034/sXML Parsing52000/s60030/sKey-Value Parsing54020/s59010/sDynamic Data Completion53000/s62304/sIP Address Completion50410/s58060/sData Desensitization55100/s63102/sDictionary Translation55000/s61690/sFtp/Sftp Incremental Reading53000/s69000/s

Component Development Case Sharing

SeaTunnel’s data processing and transformation API defines the basic framework and behavior of transformation operations through abstract classes and interfaces. Specific transformation operation classes inherit and implement these abstractions to complete specific data transformation logic. This design grants SeaTunnel’s transformation operations good extensibility and flexibility.

SeaTunnel Transform API Architecture Analysis

1. SeaTunnelTransform (Interface)

  • Type: Interface
  • Role: Top-level interface for transform plugins, defines a unified entry for reading data from source and performing transformations.
  • Core methods:
  • map(): Abstract method to be implemented by subclasses, converts raw data into new structures.
  • getProducedCatalogTable(): Returns the structure information of the transformed data table, ensuring consistency with the data returned by map.

2. AbstractSeaTunnelTransform (Abstract Class)

  • Type: Abstract class
  • Role: Implements SeaTunnelTransform interface and encapsulates common logic.
  • Functionality: Unifies the handling process of transform(), so subclasses only focus on specific transformation logic.

3. AbstractCatalogSupportTransform (Abstract Class)

  • Type: Abstract class
  • Role: On top of the previous abstraction, further abstracts field mapping and data structure transformation.
  • Functionality: Provides unified field mapping and Catalog mapping support, facilitating integration with metadata systems.

4. Concrete classes like RegexParseTransformSplitTransform

  • Type: Concrete implementation classes
  • Role: Implement transformRow() and transformTableSchema() methods, defining specific transformation logic for each data row and output structure.
  • Functionality: Represent user-customized transformation rules, such as regex parsing, field splitting, etc.

1919

Based on Apache SeaTunnel’s efficient and extensible Transform mechanism, we developed the above components. Next, we analyze two cases to illustrate the new capabilities brought by these extensions.

Case 1: Regex Parsing Capability

  • Requirements and Background:
  • An internet company needs to ingest host login logs for analysis of abnormal user behaviors like brute force attacks, account theft, and borrowing.
  • Sample data: [2023-08-15 14:23:45 [ERROR] 192.168.1.1 - Login failed for user zhangsan].
  • Log features: complex format, multiple key information blended together, inconsistent format with no fixed field delimiters.
  • Solution: Build a regex-based log parsing function to dynamically identify and extract key information from irregular logs (log files, host logs, application logs), converting unstructured log content into structured data.
  • Solution:
  1. Identify the upstream field to be parsed.
  2. Write regular expressions. Define regex patterns in config files using capture groups to mark key data segments to extract.
  3. Define mapping between result fields and regex capture groups (groupMap). SeaTunnel reads raw data and applies predefined regex line by line to match patterns, automatically extracting data for each capture group and mapping extracted results to pre-configured target fields.

2020

Case 2: Dynamic Data Completion Capability

  • Requirements and Background:
  • A bank needs to ingest customer transaction data, but the raw data lacks key information such as transaction purpose and payee name, affecting subsequent risk monitoring and anti-money laundering analysis.
  • Sample data: Transaction ID: TX20250426001, Customer Account: 622202****1234, Transaction Time: 2024-09-10 08:00, Amount: 5,000 CNY, Channel: Mobile Banking.
  • Log features: missing transaction purpose, payee name, and other key information.
  • Solution: Build a function to complete missing data fields based on lookup auxiliary data streams, ensuring data integrity and consistency.
  • Solution:
  1. Identify source fields and dimension (lookup) table fields.
  2. Configure auxiliary data source JDBC connection, driver, username, and password, supporting any JDBC-type database.
  3. Define custom SQL to complete data. The rowjoin component uses configured database connection to query the database and write SQL query results into a Caffeine cache. The cache expiry mechanism refreshes data by re-executing SQL when data expires.

2121

Intelligent Fault Tolerance Mechanism for Dirty Data

In large-scale data processing tasks, a small amount of abnormal data should not cause the entire task to fail. We designed a closed-loop mechanism of “error classification ➡ detection ➡ handling ➡ recording” to ensure handling of all types of exceptions during massive data processing.

22.22.

The principle is: individual dirty data or exceptions should not interrupt the entire task, while all error information is retained for subsequent fixing and auditing.

Core Principles:

  • Do not interrupt main flow: Dirty data can be recorded and skipped without affecting overall task execution.
  • Hierarchical handling strategy:
  • Parsing exceptions (e.g., invalid JSON/XML format)
  • Mapping exceptions (field type mismatch, missing fields)
  • Network/IO exceptions (external source connection failures)
  • Full chain error logging and tracing: Include raw data, exception type, and handling result.
  • Configurable retry mechanism: Allow limited automatic retries for recoverable exceptions.

Future Plans and Evolution Directions

To make Apache SeaTunnel better meet our business scenarios, we will continue to evolve data processing capabilities along the following directions:

  1. JDBC-based time incremental: Use scheduled tasks and timestamp fields to query incremental data from databases, suitable for environments where database configurations cannot be modified.
  2. API incremental collection: Periodically call third-party business system APIs over HTTP/HTTPS to obtain the latest asset qualification data.
  3. Connector-Syslog: Plan to expand collector plugins for Syslog protocol to capture network device and system logs in real-time, supporting high-availability HA and load balancing.
  4. More intelligent parsing: Explore use of NLP, AI-based parsing methods for complex text content.
  5. Enhanced monitoring: Build unified monitoring platform with error alarm, task status tracking, and consumption flow metrics.
  6. More connectors: Continue adding connectors to support more big data, cloud, and enterprise databases.

Conclusion

In summary, secondary development of Apache SeaTunnel provides a powerful and flexible platform for addressing the highly complex, heterogeneous, and large-scale data integration challenges faced by enterprises in cybersecurity and finance. Through self-developed transformation components, advanced incremental reading strategies, and intelligent fault tolerance mechanisms, we have greatly improved the robustness, accuracy, and real-time performance of data pipelines.

SeaTunnel’s extensible architecture and modular plugin design allow us to quickly implement business-specific functions while benefiting from the rich ecosystem and ongoing community innovation. We hope this sharing will provide practical reference and inspiration for other developers and engineers working in complex enterprise data integration scenarios.

Related Articles

Leave a Reply

Your email address will not be published. Required fields are marked *

Back to top button