C Spark Knowledge Modules
This appendix provides information about the Spark knowledge modules.
This appendix includes the following sections:
- LKM File to Spark
- LKM Spark to File
- LKM Hive to Spark
- LKM Spark to Hive
- LKM HDFS to Spark
- LKM Spark to HDFS
- LKM Kafka to Spark
- LKM Spark to Kafka
- LKM SQL to Spark
- LKM Spark to SQL
- LKM Spark to Cassandra
- RKM Cassandra
- XKM Spark Aggregate
- XKM Spark Distinct
- XKM Spark Filter
- XKM Spark Join
- XKM Spark Lookup
- XKM Spark Pivot
- XKM Spark Sort
- XKM Spark Split
- XKM Spark Table Function
- IKM Spark Table Function
- XKM Spark Unpivot
LKM File to Spark
This KM loads data from a file into a Spark Python variable. It can be defined on the AP between the execution units, with File as the source technology and Spark Python as the target technology.
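For orientation, the following is a minimal PySpark sketch of the kind of load this KM performs. It is not the code the KM generates; the file path and option values are hypothetical and correspond to the inferSchema and dateFormat options described in Table C-1.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("file_to_spark_sketch").getOrCreate()

# Load a delimited file into a DataFrame; inferSchema and dateFormat mirror
# the KM options of the same names (path and formats are hypothetical).
df = (spark.read
      .option("delimiter", ",")
      .option("inferSchema", "true")
      .option("dateFormat", "yyyy-MM-dd")
      .csv("/tmp/src/customers.csv"))
df.show(10)
```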
Note:
This KM also supports loading HDFS files, although it is preferable to use LKM HDFS to Spark for that purpose.
The following tables describe the options for LKM File to Spark.
Table C-1 LKM File to Spark
Option | Description
---|---
Storage Function | The storage function to be used to load/store data.
streamingContext | Name of the Streaming Context.
InputFormatClass | Class for reading the format of the input data.
KeyClass | Fully qualified class name for keys.
ValueClass | Fully qualified class name for values.
KeyConverter | Fully qualified class name of the key converter class.
ValueConverter | Fully qualified class name of the value converter class.
Job Configuration | Hadoop configuration. For example, {'hbase.zookeeper.quorum': 'HOST', 'hbase.mapreduce.inputtable': 'TAB'}
inferSchema | Infer the DataFrame schema from the data. If set to True (default), the column names and types are inferred from the source data and the DataFrame is created with default options. If set to False, the DataFrame schema is specified based on the source data store definition.
dateFormat | Format for Date or Timestamp input fields.
Delete Spark Mapping Files | Delete temporary objects at the end of the mapping.
Cache | Cache the RDD/DataFrame across operations after computation.
Storage Level | The storage level to be used to cache data.
Repartition | Repartition the RDD/DataFrame after transformation of this component.
Level of Parallelism | Number of partitions.
Sort Partitions | Sort partitions by a key function when repartitioning.
Partition Sort Order | Sort order for partitions.
Partition Keys | Define keys for partitions.
Partition Function | Customized partitioning function.
This LKM uses the StreamingContext.textFileStream() method to transfer file content as a data stream. The directory is monitored while the Spark application is running; any files copied into this directory from other locations are detected.
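A minimal sketch of this streaming pattern is shown below. It is not the KM-generated code; the batch interval and monitored directory are assumptions.

```python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="file_stream_sketch")
ssc = StreamingContext(sc, 10)  # 10-second batch interval; the streamingContext option names this object

# Any new files copied into the monitored directory are picked up as a stream of lines.
lines = ssc.textFileStream("/tmp/landing")
lines.pprint()

ssc.start()
ssc.awaitTermination()
```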
Table C-2 LKM File to Spark for Streaming
Option | Description
---|---
Storage Function | If STREAMING_MODE is set to true, the load function is changed to textFileStream automatically. The default is textFile.
Source Data store | The source data store is a directory, and a field separator should be defined.
LKM Spark to File
This KM stores data from a Spark Python variable into a file. It can be defined on the AP between the execution units, with Spark Python as the source technology and File as the target technology.
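As a rough illustration (not the KM-generated code), the store step amounts to a DataFrame or RDD write such as the following; the output path is hypothetical.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "val"])  # stands in for the upstream result

# Write the result as a delimited file; saveAsTextFile on an RDD is another
# storage function this KM can select.
df.write.mode("overwrite").option("delimiter", ",").csv("/tmp/target_dir")
```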
Note:
This KM also supports writing to an HDFS file, although LKM Spark to HDFS is preferable.
The following tables describe the options for LKM Spark to File.
Table C-3 LKM Spark to File
Option | Description
---|---
Storage Function | The storage function to be used to store data. Note: When spark.useDataFrames is set to True, the data will be saved as an RDD of JSON strings for saveAsNewAPIHadoopFile, saveAsHadoopFile, and saveAsSequenceFile.
OutputFormatClass | Fully qualified class name for writing the data.
KeyClass | Fully qualified class name for keys.
ValueClass | Fully qualified class name for values.
KeyConverter | Fully qualified class name of the key converter class.
ValueConverter | Fully qualified class name of the value converter class.
Job Configuration | Allows adding or overriding Hadoop configuration properties. For example, {'hbase.zookeeper.quorum': 'HOST', 'hbase.mapreduce.inputtable': 'TAB'}
SQL_EXPRESSIONS | Use SQL Expressions.
Delete Spark Mapping Files | Delete temporary objects at the end of the mapping.
Cache | Cache the RDD/DataFrame across operations after computation.
Storage Level | The storage level to be used to cache data.
Repartition | Repartition the RDD/DataFrame after transformation of this component.
Level of Parallelism | Number of partitions.
Sort Partitions | Sort partitions by a key function when repartitioning.
Partition Sort Order | Sort order for partitions.
Partition Keys | Define keys for partitions.
Partition Function | Customized partitioning function.
Table C-4 LKM Spark to File for streaming
Option | Description
---|---
Storage Function | If STREAMING_MODE is set to true, the load function is changed to textFileStream automatically. The default is textFile.
LKM Hive to Spark
This KM loads data from a Hive table into a Spark Python variable. It can be defined on the AP between the execution units, with Hive as the source technology and Spark Python as the target technology.
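Conceptually, the load is equivalent to reading the Hive table into a DataFrame, as in this minimal sketch (the table name is hypothetical and this is not the KM-generated code):

```python
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("hive_to_spark_sketch")
         .enableHiveSupport()
         .getOrCreate())

src_df = spark.table("default.src_customers")  # hypothetical Hive table
src_df.show(10)
```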
The following table describes the options for LKM Hive to Spark:
Table C-5 LKM Hive to Spark
Option | Description
---|---
Delete Spark Mapping Files | Delete temporary objects at the end of the mapping.
Cache | Cache the RDD/DataFrame across operations after computation.
Storage Level | The storage level to be used to cache data.
Repartition | Repartition the RDD/DataFrame after transformation of this component.
Level of Parallelism | Number of partitions.
Sort Partitions | Sort partitions by a key function when repartitioning.
Partition Sort Order | Sort order for partitions.
Partition Keys | Define keys for partitions.
Partition Function | Customized partitioning function.
LKM Spark to Hive
This KM stores data from a Spark Python variable into a Hive table. It can be defined on the AP between the execution units, with Spark as the source technology and Hive as the target technology.
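A minimal sketch of the equivalent write is shown below (not the KM-generated code); mode("overwrite") plays the role of OVERWRITE_TARGET_TABLE, and the table name is hypothetical.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()
df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "val"])  # stands in for the upstream result

df.write.mode("overwrite").saveAsTable("default.tgt_table")
```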
The following table describes the options for LKM Spark to Hive.
Table C-6 LKM Spark to Hive
Option | Description
---|---
OVERWRITE_TARGET_TABLE | Overwrite the target table.
INFER_SCHEMA | Infer the target DataFrame schema from the RDD data. Note: This option is set to True by default. When set to True, the column names and types are inferred from the RDD data and the DataFrame is created with default options. If set to False, the DataFrame schema is specified based on the target datastore definition. Set this option to False if you get errors such as: ValueError: Some types cannot be determined by the first X rows, please try again with sampling. This usually happens if one or more target columns receive NULL values. When using False, there might be execution errors if a source column datatype differs from the target column datatype; in such a case, it is recommended to add a conversion function to the attribute mapping.
SAMPLING_RATIO | The sample ratio of rows used for inferring.
SQL_EXPRESSIONS | Use SQL Expressions.
Delete Spark Mapping Files | Delete temporary objects at the end of the mapping.
Cache | Cache the RDD/DataFrame across operations after computation.
Storage Level | The storage level to be used to cache data.
Repartition | Repartition the RDD/DataFrame after transformation of this component.
Level of Parallelism | Number of partitions.
Sort Partitions | Sort partitions by a key function when repartitioning.
Partition Sort Order | Sort order for partitions.
Partition Keys | Define keys for partitions.
Partition Function | Customized partitioning function.
LKM HDFS to Spark
This KM loads data from an HDFS file into Spark.
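Conceptually this is an ordinary Spark read against an hdfs:// URI, as in this sketch (the NameNode address and path are hypothetical; this is not the KM-generated code):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# JSON shown here; delimited files are read the same way with spark.read.csv().
df = spark.read.json("hdfs://namenode:8020/data/input/*.json")
df.printSchema()
```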
Table C-7 LKM HDFS to Spark
Option | Description
---|---
streamingContext | Name of the Streaming Context.
inferSchema | Infer the DataFrame schema from the data.
Delete Spark Mapping Files | Delete temporary objects at the end of the mapping.
Cache | Cache the RDD/DataFrame across operations after computation.
Storage Level | The storage level to be used to cache data.
Repartition | Repartition the RDD/DataFrame after transformation of this component.
Level of Parallelism | Number of partitions.
Sort Partitions | Sort partitions by a key function when repartitioning.
Partition Sort Order | Sort order for partitions.
Partition Keys | Define keys for partitions.
Partition Function | Customized partitioning function.
Note:
Streaming is enabled when the streaming check box is selected in the physical schema. Streaming is only supported for the Delimited and JSON formats.
LKM Spark to HDFS
This KM loads data from Spark into an HDFS file.
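The write side is the mirror image: a DataFrame write against an hdfs:// URI, sketched below (NameNode address and path are hypothetical; not the KM-generated code).

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "val"])  # stands in for the upstream result

df.write.mode("overwrite").json("hdfs://namenode:8020/data/output")
```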
Table C-8 LKM Spark to HDFS
Option | Description
---|---
SQL_EXPRESSIONS | Use SQL Expressions.
Delete Spark Mapping Files | Delete temporary objects at the end of the mapping.
Cache | Cache the RDD/DataFrame across operations after computation.
Storage Level | The storage level to be used to cache data.
Repartition | Repartition the RDD/DataFrame after transformation of this component.
Level of Parallelism | Number of partitions.
Sort Partitions | Sort partitions by a key function when repartitioning.
Partition Sort Order | Sort order for partitions.
Partition Keys | Define keys for partitions.
Partition Function | Customized partitioning function.
Note:
Streaming is enabled when the streaming check box is selected in the physical schema. Streaming is supported for all formats.
LKM Kafka to Spark
This KM loads data from a Kafka source into a Spark target. It can be defined on an AP node that exists in a Spark execution unit and has a Kafka upstream node.
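The options in Table C-9 (fromOffsets, the key/value decoders, groupId, messageHandler) correspond to parameters of the Spark Streaming Kafka direct-stream API. The sketch below assumes the pyspark.streaming.kafka (Kafka 0-8) integration is available in the Spark version in use; the broker and topic names are hypothetical, and this is not the KM-generated code.

```python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="kafka_to_spark_sketch")
ssc = StreamingContext(sc, 10)

stream = KafkaUtils.createDirectStream(
    ssc,
    topics=["orders"],
    kafkaParams={"metadata.broker.list": "broker:9092",
                 "group.id": "odi_consumer"})  # groupId option

stream.map(lambda kv: kv[1]).pprint()  # keep only the message values

ssc.start()
ssc.awaitTermination()
```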
Table C-9 LKM Kafka to Spark for streaming
Option | Description
---|---
Storage Function | The storage function to be used to load data.
fromOffsets | Per-topic/partition Kafka offsets defining the (inclusive) starting point of the stream.
KeyDecoder | Converts the message key.
ValueDecoder | Converts the message value.
groupId | The group id for this consumer.
storageLevel | RDD storage level.
numPartitions | Number of partitions for each consumer.
offsetRanges | List of offsetRange objects specifying topic:partition:[start, end) to consume.
leaders | Kafka brokers for each TopicAndPartition in offsetRanges.
messageHandler | A function used to convert KafkaMessageAndMetadata.
avroSchema | Contains the content of the .avsc file that is associated with the .avro data file.
Delete Spark Mapping Files | Delete temporary objects at the end of the mapping.
Cache | Cache the RDD/DataFrame across operations after computation.
Storage Level | The storage level to be used to cache data.
Repartition | Repartition the RDD/DataFrame after transformation of this component.
Level of Parallelism | Number of partitions.
Sort Partitions | Sort partitions by a key function when repartitioning.
Partition Sort Order | Sort order for partitions.
Partition Keys | Define keys for partitions.
Partition Function | Customized partitioning function.
LKM Spark to Kafka
LKM Spark to Kafka works in both streaming and batch mode. It can be defined on the AP between the execution units and has a Kafka downstream node.
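One way to picture the store step is a per-partition producer, as in the sketch below. This is only an illustration using the kafka-python client with hypothetical broker and topic names; the code the KM actually generates may use a different mechanism.

```python
from pyspark.sql import SparkSession

def send_partition(rows):
    # Requires the kafka-python package on the executors (illustrative only).
    from kafka import KafkaProducer
    producer = KafkaProducer(bootstrap_servers="broker:9092")
    for row in rows:
        producer.send("orders_out", value=str(row.asDict()).encode("utf-8"))
    producer.flush()

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "val"])  # stands in for the upstream result
df.foreachPartition(send_partition)
```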
Table C-10 LKM Spark to Kafka
Option | Description
---|---
avroSchema | Contains the content of the .avsc file that is associated with the .avro data file.
Delete Spark Mapping Files | Delete temporary objects at the end of the mapping.
Cache | Cache the RDD/DataFrame across operations after computation.
Storage Level | The storage level to be used to cache data.
Repartition | Repartition the RDD/DataFrame after transformation of this component.
Level of Parallelism | Number of partitions.
Sort Partitions | Sort partitions by a key function when repartitioning.
Partition Sort Order | Sort order for partitions.
Partition Keys | Define keys for partitions.
Partition Function | Customized partitioning function.
LKM SQL to Spark
This KM is designed to load data from Cassandra into Spark, but it can work with other JDBC sources. It can be defined on an AP node that has a SQL source and a Spark target.
To use this KM, it is mandatory to configure the Hadoop Credential Provider and define the password. For more information, see Password Handling in Hadoop.
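The partitioning options in Table C-11 map directly onto Spark's JDBC reader arguments, as in this sketch (URL, table, and credentials are hypothetical; this is not the KM-generated code):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.read.jdbc(
    url="jdbc:oracle:thin:@//dbhost:1521/pdb1",
    table="SRC.ORDERS",
    column="ORDER_ID",      # PARTITION_COLUMN
    lowerBound=1,           # LOWER_BOUND
    upperBound=1000000,     # UPPER_BOUND
    numPartitions=8,        # NUMBER_PARTITIONS
    properties={"user": "odi_user", "password": "********"})
```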
Table C-11 LKM SQL to Spark
Option | Description
---|---
PARTITION_COLUMN | Column used for partitioning.
LOWER_BOUND | Lower bound of the partition column.
UPPER_BOUND | Upper bound of the partition column.
NUMBER_PARTITIONS | Number of partitions.
PREDICATES | List of predicates.
Delete Spark Mapping Files | Delete temporary objects at the end of the mapping.
Cache | Cache the RDD/DataFrame across operations after computation.
Storage Level | The storage level to be used to cache data.
Repartition | Repartition the RDD/DataFrame after transformation of this component.
Level of Parallelism | Number of partitions.
Sort Partitions | Sort partitions by a key function when repartitioning.
Partition Sort Order | Sort order for partitions.
Partition Keys | Define keys for partitions.
Partition Function | Customized partitioning function.
LKM Spark to SQL
This KM loads data from Spark into JDBC targets. It can be defined on an AP node that has a Spark source and a SQL target.
To use this KM, it is mandatory to configure the Hadoop Credential Provider and define the password. For more information, see Password Handling in Hadoop.
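The corresponding write is a JDBC DataFrame write, sketched below with hypothetical URL, table, and credentials (not the KM-generated code):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "val"])  # stands in for the upstream result

df.write.jdbc(
    url="jdbc:oracle:thin:@//dbhost:1521/pdb1",
    table="TGT.ORDERS_COPY",
    mode="append",
    properties={"user": "odi_user", "password": "********"})
```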
Table C-12 LKM Spark to SQL
Option | Description
---|---
CREATE_TARG_TABLE | Create target table.
TRUNCATE_TARG_TABLE | Truncate target table.
DELETE_TARG_TABLE | Delete target table.
INFER_SCHEMA | Infer the target DataFrame schema from the RDD data. Note: This option is set to True by default. When set to True, the column names and types are inferred from the RDD data and the DataFrame is created with default options. If set to False, the DataFrame schema is specified based on the target datastore definition. Set this option to False if you get errors such as: ValueError: Some types cannot be determined by the first X rows, please try again with sampling. This usually happens if one or more target columns receive NULL values. When using False, there might be execution errors if a source column datatype differs from the target column datatype; in such a case, it is recommended to add a conversion function to the attribute mapping.
SAMPLING_RATIO | The sample ratio of rows used for inferring.
SQL_EXPRESSIONS | Use SQL Expressions.
Delete Spark Mapping Files | Delete temporary objects at the end of the mapping.
Cache | Cache the RDD/DataFrame across operations after computation.
Storage Level | The storage level to be used to cache data.
Repartition | Repartition the RDD/DataFrame after transformation of this component.
Level of Parallelism | Number of partitions.
Sort Partitions | Sort partitions by a key function when repartitioning.
Partition Sort Order | Sort order for partitions.
Partition Keys | Define keys for partitions.
Partition Function | Customized partitioning function.
LKM Spark to Cassandra
To use this KM, it is mandatory to configure the Hadoop Credential Provider and define the password. For more information, see Password Handling in Hadoop.
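As an illustration of the target side (not the KM-generated code), writing a DataFrame to Cassandra typically goes through the Spark Cassandra Connector; the sketch below assumes that connector is on the classpath and uses hypothetical keyspace and table names.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "val"])  # stands in for the upstream result

(df.write
   .format("org.apache.spark.sql.cassandra")
   .options(keyspace="odi_ks", table="tgt_table")
   .mode("append")
   .save())
```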
Table C-13 LKM Spark to Cassandra
Option | Description
---|---
CREATE_TARG_TABLE | Create target table.
TRUNCATE_TARG_TABLE | Truncate target table.
DELETE_TARG_TABLE | Delete target table.
INFER_SCHEMA | Infer the target DataFrame schema from the RDD data.
SAMPLING_RATIO | The sample ratio of rows used for inferring.
SQL_EXPRESSIONS | Use SQL Expressions.
Delete Spark Mapping Files | Delete temporary objects at the end of the mapping.
Cache | Cache the RDD/DataFrame across operations after computation.
Storage Level | The storage level to be used to cache data.
Repartition | Repartition the RDD/DataFrame after transformation of this component.
Level of Parallelism | Number of partitions.
Sort Partitions | Sort partitions by a key function when repartitioning.
Partition Sort Order | Sort order for partitions.
Partition Keys | Define keys for partitions.
Partition Function | Customized partitioning function.
RKM Cassandra
RKM Cassandra reverse-engineers the following Cassandra metadata:
- Cassandra tables as data stores. The Mask field in the Reverse Engineer tab filters reverse-engineered objects based on their names. The Mask field cannot be empty and must contain at least the percent sign (%).
- Cassandra columns as attributes with their data types.
XKM Spark Aggregate
Summarize rows, for example, using SUM and GROUP BY.
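A minimal sketch of the kind of aggregation this component expresses (illustrative data and column names; not the KM-generated code):

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [("c1", 10.0), ("c1", 5.0), ("c2", 7.5)],
    ["customer_id", "amount"])

# GROUP BY customer_id with a SUM aggregate.
agg_df = df.groupBy("customer_id").agg(F.sum("amount").alias("total_amount"))
agg_df.show()
```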
The following tables describe the options for XKM Spark Aggregate.
Table C-14 XKM Spark Aggregate
Option | Description
---|---
CACHE_DATA | Persist the data with the default storage level.
NUMBER_OF_TASKS | Task number.
Table C-15 XKM Spark Aggregate for streaming
Option | Description
---|---
WINDOW_AGGREGATION | Enable window aggregation.
WINDOW_LENGTH | Number of batch intervals.
SLIDING_INTERVAL | The interval at which the window operation is performed.
STATEFUL_AGGREGATION | Enables stateful aggregation.
STATE_RETENTION_PERIOD | Time in seconds to retain a key or value aggregate in the Spark state object.
FORWARD_ONLY_UPDATED_ROWS | Modified aggregate values forwarded to downstream components.
XKM Spark Distinct
Eliminates duplicates in the data. The functionality is identical to the existing batch processing.
XKM Spark Filter
Produce a subset of data by a filter condition.
The following table describes the options for XKM Spark Filter.
Table C-16 XKM Spark Filter
Option | Description
---|---
CACHE_DATA | Persist the data with the default storage level.
XKM Spark Join
Joins two or more input sources based on the join condition.
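A minimal sketch of such a join (illustrative data, join key, and join type; not the KM-generated code):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
orders = spark.createDataFrame([(1, 100.0), (2, 200.0)], ["cust_id", "amount"])
customers = spark.createDataFrame([(1, "Ann"), (2, "Bob")], ["cust_id", "name"])

joined = orders.join(customers, on="cust_id", how="inner")
joined.show()
```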
The following table describes the options for XKM Spark Join.
Table C-17 XKM Spark Join
Option | Description
---|---
CACHE_DATA | Persist the data with the default storage level.
NUMBER_OF_TASKS | Task number.
XKM Spark Lookup
Lookup data for a driving data source.
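Conceptually, a map-side lookup (MAP_SIDE=true in the tables below) behaves like a broadcast join of a small lookup table against the driving source, as in this sketch (illustrative data; not the KM-generated code):

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
driving = spark.createDataFrame([(1, 100.0), (2, 200.0)], ["cust_id", "amount"])
lookup = spark.createDataFrame([(1, "GOLD"), (2, "SILVER")], ["cust_id", "tier"])

# Broadcasting ships the small lookup data to every task (the map-side pattern).
result = driving.join(F.broadcast(lookup), on="cust_id", how="left")
result.show()
```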
The following tables describe the options for XKM Spark Lookup.
Table C-18 XKM Spark Lookup
Option | Description
---|---
CACHE_DATA | Persist the data with the default storage level.
NUMBER_OF_TASKS | Task number.
MAP_SIDE | Defines whether the KM will do a map-side lookup or a reduce-side lookup and significantly impacts lookup performance.
KEY_BASED_LOOKUP | Only data corresponding to the lookup keys are retrieved.
Table C-19 XKM Spark Lookup for streaming
Option | Description
---|---
MAP_SIDE | MAP_SIDE=true: Suitable for small lookup data sets that fit into memory. This setting provides better performance by broadcasting the lookup data to all Spark tasks.
KEY_BASED_LOOKUP | For any incoming lookup key, the Spark cache is checked.
CACHE_RELOAD | Defines when the lookup source data is loaded and refreshed.
CACHE_RELOAD_INTERVAL | Defines how long data is retained in the Spark cache. After this time, expired records are removed from the cache.
XKM Spark Pivot
Takes data in separate rows, aggregates it, and converts it into columns.
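A minimal sketch of a pivot (illustrative data and pivot values; not the KM-generated code):

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
sales = spark.createDataFrame(
    [("c1", "Q1", 10.0), ("c1", "Q2", 5.0), ("c2", "Q1", 7.5)],
    ["customer_id", "quarter", "amount"])

# Rows per quarter become columns, with SUM applied per pivoted value.
pivoted = sales.groupBy("customer_id").pivot("quarter", ["Q1", "Q2"]).agg(F.sum("amount"))
pivoted.show()
```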
The following table describes the options for XKM Spark Pivot.
Table C-20 XKM Spark Pivot
Option | Description
---|---
CACHE_DATA | Persist the data with the default storage level.
Note:
XKM Spark Pivot does not support streaming.
XKM Spark Sort
Sort data using an expression.
The following table describes the options for XKM Spark Sort.
Table C-21 XKM Spark Sort
Option | Description
---|---
CACHE_DATA | Persist the data with the default storage level.
NUMBER_OF_TASKS | Task number.
XKM Spark Split
Split data into multiple paths with multiple conditions.
The following table describes the options for XKM Spark Split.
Table C-22 XKM Spark Split
Option | Description
---|---
CACHE_DATA | Persist the data with the default storage level.
XKM Spark Table Function
This KM allows you to apply custom transformations by executing arbitrary Spark/Python code as part of the overall Spark Python script.
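As a rough illustration, a SPARK_SCRIPT snippet might look like the following. The DataFrame names and the transformation itself are hypothetical; the real variable names are produced by the mapping, so this example builds its own stand-in input.

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
input_df = spark.createDataFrame([(1, 10.0), (2, -3.0)], ["id", "amount"])  # stand-in for the upstream DataFrame

# Arbitrary custom transformation of the kind SPARK_SCRIPT could contain.
output_df = (input_df
             .withColumn("amount_usd", F.col("amount") * 1.1)
             .filter(F.col("amount_usd") > 0))
output_df.show()
```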
The following table describes the options for XKM Spark Table Function.
Table C-23 XKM Spark Table Function
Option | Description
---|---
SPARK_SCRIPT | User specifies the customized code content.
SPARK_SCRIPT_FILE | User specifies the path of the Spark script file.
CACHE_DATA | Persist the data with the default storage level.
Note:
Only one of the options, SPARK_SCRIPT or SPARK_SCRIPT_FILE, must be set.
- If SPARK_SCRIPT_FILE is set, then the specified file will be dynamically executed.
- If SPARK_SCRIPT is set, its content will be inserted into the main Spark script.
- If neither SPARK_SCRIPT nor SPARK_SCRIPT_FILE is set, a validation error is generated stating that at least one of the options must be specified.
- If both SPARK_SCRIPT and SPARK_SCRIPT_FILE are set, a validation error is generated stating that only one of the options must be specified.
IKM Spark Table Function
Spark table function as target.
The following table describes the options for IKM Spark Table Function.
Table C-24 IKM Spark Table Function
Option | Description
---|---
SPARK_SCRIPT_FILE | User specifies the path of the Spark script file.
CACHE_DATA | Persist the data with the default storage level.
XKM Spark Unpivot
Transform a single row of attributes into multiple rows in an efficient manner.
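A minimal sketch of an unpivot using the SQL stack() expression (illustrative data and column names; not the KM-generated code):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
wide = spark.createDataFrame([("c1", 10.0, 5.0)], ["customer_id", "q1_amount", "q2_amount"])

# Two value columns become (quarter, amount) rows.
unpivoted = wide.selectExpr(
    "customer_id",
    "stack(2, 'Q1', q1_amount, 'Q2', q2_amount) as (quarter, amount)")
unpivoted.show()
```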
The following table describes the options for XKM Spark Unpivot.
Table C-25 XKM Spark Unpivot
Option | Description
---|---
CACHE_DATA | Persist the data with the default storage level.
Note:
XKM Spark Unpivot does not support streaming.