
A number of methods need to be implemented to get the processing logic of the step to work. Some methods are different depending on whether it is a Row Step or a Cached Step.

 

Common methods

Following are some of the common methods for defining how a step processes data.

Method | Description & Example
public ETLStepAPIVersion getAPIVersion()

This method declares which version of the Yellowfin Step API the step was written against, so that compatibility can be determined as the step and the API are updated. It usually returns the latest version in the ETLStepAPIVersion enum.


@Override
public ETLStepAPIVersion getAPIVersion() {
    return ETLStepAPIVersion.V1;
}
public Collection<ETLException> validate()

This method performs pre-run validations. The implementation should check whether any mandatory options are missing, whether credentials are incorrect, or whether a host is unreachable. Each error should be captured in an instance of ETLException and added to the collection returned from the method. Alternatively, the convenience method getInvalidConfigETLException() may be used instead of constructing an ETLException.

 

@Override
public Collection<ETLException> validate() {
	List<ETLException> validationErrors = new ArrayList<>();
 	       
	String exampleOption = this.getStepOption("APPEND_VALUE");
	if (exampleOption == null) {
		// Add a generic message "Step not Properly Configured"
		validationErrors.add(this.getInvalidConfigETLException());
	} else {
		// Only parse when a value is present: Integer.parseInt(null) also throws
		// NumberFormatException and would report a second error for the same option
		try {
			Integer.parseInt(exampleOption);
		} catch (NumberFormatException e) {
			ETLException ve = new ETLException(ETLElement.STEP, getUuid(), null, "Option is not a number", e);
			validationErrors.add(ve);
		}
	}
	return validationErrors;
}
public Map<String, String> getValidatedStepOptions()

This method allows step options to be validated before they are saved. The mapping of optionKey to optionValue returned from this method is saved in the Yellowfin config database, so this is where invalid option values can be removed. Manipulating the mapping returned by this.getStepOptions() has no effect unless the modified map is returned from this method.

 

@Override
public Map<String, String> getValidatedStepOptions() {
    Map<String, String> stepOptions = this.getStepOptions();
    String exampleOption = stepOptions.get("APPEND_VALUE");
    if (exampleOption == null) {
        // Remove option if the value is no longer set
        stepOptions.remove("APPEND_VALUE");
    } else {        	
        try {
            Integer.parseInt(exampleOption);
        } catch (NumberFormatException e) {
            // Remove option if the value is not an integer
            stepOptions.remove("APPEND_VALUE");
        }
    }
    // Return the map of valid options
    return stepOptions;
}


public void setupGeneratedFields()

Implement this method when the step is required to output a new field. The data in the new field may be generated using other fields. The new field may also replace an existing field or be a duplicate. Yellowfin provides convenience methods for each operation. This method is expected to create a new instance of ETLStepMetadataFieldBean or duplicate an existing field. It is important that this method runs only if fields were not set up before, or if they should be set up again. If they are to be recreated because of a change in an option, the old field must be removed. Otherwise new fields will be generated every time the step is reconfigured.

 

@Override
public void setupGeneratedFields() throws ETLException {
    if (getStepOption("NEW_FIELD") != null) {
        // The field was already set up.
        return;
    }

    ETLStepMetadataFieldBean newField = new ETLStepMetadataFieldBean();
    newField.setFieldName("Concatenated Field");
    newField.setFieldType(ETLDataType.TEXT.name());

    // The sort order is 0 based, so the new field will be at the end
    newField.setSortOrder(getDefaultMetadataFields().size());
	
    // Ensure that the new field is output from the step
    newField.setStepIncludeField(true);
    newField.setUserIncludeField(true);
	
    // This method assigns the field a new UUID and
    // adds a Step Option to help reference it elsewhere.
    this.addNewGeneratedField(newField, "NEW_FIELD");
}


The above example is for generating a new field. To duplicate a field, use the following statement:

this.addGeneratedField(newFieldBean, ETLFieldLinkType.DUPLICATE, originalFieldUUID)


To replace an existing field use:
this.replaceDefaultField(fieldToReplace)

The above line returns a new “replacement” field which links to the original field. You may modify the object, but should not change the linkFieldUUID and linkType.


Use the following call to restore the original field and delete the replacement field.

this.restoreReplacedField(replacementField)

public Integer getMinInputSteps()

public Integer getMaxInputSteps()

public Integer getMinOutputSteps()

public Integer getMaxOutputSteps()


These methods should be overridden if the step has multiple inputs or outputs. Yellowfin provides default implementations which return min/max values based on the Step Category. These values are defined in the ETLStepCategory enum element for that category.
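
As a rough illustration, a step that must be connected to exactly two input steps might override the input limits as sketched below. StepLimitsBase is a hypothetical stand-in for the framework's base class, included only so that the snippet compiles on its own. Its default values are placeholders, not the values Yellowfin derives from ETLStepCategory. Only the four method signatures are taken from the list above.

```java
// Hypothetical stand-in for the framework base class. The defaults here are
// placeholders, not the values Yellowfin derives from ETLStepCategory.
abstract class StepLimitsBase {
    public Integer getMinInputSteps() { return 1; }
    public Integer getMaxInputSteps() { return 1; }
    public Integer getMinOutputSteps() { return 1; }
    public Integer getMaxOutputSteps() { return 1; }
}

// A step that requires exactly two inputs.
class TwoInputStepSketch extends StepLimitsBase {

    @Override
    public Integer getMinInputSteps() {
        return 2;
    }

    @Override
    public Integer getMaxInputSteps() {
        return 2;
    }

    // The output limits are not overridden, so the inherited defaults apply.
}
```

Overriding only the limits that differ from the category defaults keeps the step consistent with other steps in the same category.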

YFLogger

Although this isn’t a method, it is common to all steps. A step can write to the Data Transformation log using YFLogger, a wrapper for log4j’s Logger class. It should be declared as a field of the step class; the example below uses a private static final field.

 

private static final YFLogger log = YFLogger.getLogger(TestStep.class.getName());

 

 

 


 

Row Step Implementation 

A Row step extends the AbstractETLRowStep class. It requires the implementation of only one method, processWireData().


processWireData();

When the framework invokes processWireData(), data from each column of the current row is already on the correct Wire. Each wire is mapped to a metadata field and may be accessed using this.getWireForField(fieldUUID). Data should be retrieved from the wire, processed, and put back on the same wire or a different one.


  • Method Parameters: The method has one parameter, a List<ETLStepMetadataFieldBean>. These are the fields from the input step. They are provided for convenience. Although they may be used to fetch wires, the preferred way is by using Default Metadata Fields, as shown in the snippet below.
  • Return Value: The return value is a boolean which indicates whether the row of data should be output to the next step. A filter step, for example, would return false when data in a row does not satisfy filter conditions.
  • Exceptions: This method throws ETLException and InterruptedException. Processing errors, if any, must be wrapped in an instance of ETLException and thrown so that Yellowfin can display them to the user. The convenience method this.throwUnhandledETLException(e) may be used for this. Exceptions must not be caught and swallowed. Catching java.lang.Exception is also inadvisable, because it catches InterruptedException as well. If it is unavoidable, InterruptedException should be caught and rethrown in a separate, earlier catch block.

    Code snippet:

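    try {
        // Row processing logic (which may throw InterruptedException) goes here
    } catch (InterruptedException e) {
        // Rethrow so that the interruption is not swallowed
        throw e;
    } catch (Exception e) {
        log.error("Error: " + e, e);
        throwUnhandledETLException(e);
    }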
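
To illustrate the return value described in the list above, here is a minimal sketch of a filter-style row step that drops rows whose value is below a threshold. FilterWire and MinValueFilterSketch are hypothetical stand-ins written so that the snippet runs without the framework. A real step would extend AbstractETLRowStep, fetch its wire with this.getWireForField(fieldUUID), and receive the field list as a parameter.

```java
// Hypothetical stand-in for a framework wire holding the current row's value.
class FilterWire {
    private Object value;

    Object getValue() {
        return value;
    }

    void send(Object newValue) {
        value = newValue;
    }
}

// Hypothetical filter step: keeps rows whose numeric value is >= threshold.
class MinValueFilterSketch {
    private final FilterWire inputWire;
    private final int threshold;

    MinValueFilterSketch(FilterWire inputWire, int threshold) {
        this.inputWire = inputWire;
        this.threshold = threshold;
    }

    // Mirrors the contract of processWireData(): true keeps the row,
    // false means the row is not output to the next step.
    boolean processWireData() {
        Object data = inputWire.getValue();
        if (!(data instanceof Number)) {
            // Null or non-numeric values do not satisfy the filter condition
            return false;
        }
        return ((Number) data).intValue() >= threshold;
    }
}
```

Treating null and non-numeric values as failing the condition is a choice made for this sketch; a real filter might instead report them as an error.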

 

Here’s a sample implementation for appending a number to a specific field.

@Override
protected boolean processWireData(List<ETLStepMetadataFieldBean> fields)
                                  throws ETLException, InterruptedException {
    	
    // The options should've been validated by the validate() method,
    // so no need for further checks here
    String appendFieldUUID = this.getStepOption("APPEND_FIELD");
    String newFieldUUID = this.getStepOption("NEW_FIELD");
    String appendValue = this.getStepOption("APPEND_VALUE");
 
    Wire<Object, String> appendFieldWire = this.getWireForField(appendFieldUUID);
    Wire<Object, String> newFieldWire = this.getWireForField(newFieldUUID);
 
    Object data = appendFieldWire.getValue();
    String newFieldData = null;
    if (data == null) {
        newFieldData = appendValue;
    } else {
        newFieldData = data.toString() + appendValue;
    }
	
    newFieldWire.send(newFieldData);
	
    return true;
}

In this example, processWireData() runs for every row of data. In a real-world implementation, objects which do not change between method calls should be cached in member variables. For example, appendFieldUUID, newFieldUUID, appendValue, appendFieldWire and newFieldWire should be member variables, populated only when processWireData() runs for the first time.
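
The caching advice above could be sketched roughly as follows. CachedWire and CachingAppendStepSketch are hypothetical stand-ins that imitate getStepOption() and getWireForField() with plain maps so the snippet runs on its own; the optionLookups counter exists only to show that options are read once. A real step would keep the same lazily populated member variables inside its AbstractETLRowStep subclass.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a framework wire.
class CachedWire {
    private Object value;

    Object getValue() { return value; }

    void send(Object newValue) { value = newValue; }
}

class CachingAppendStepSketch {
    // Stand-ins for state that the framework would normally manage
    private final Map<String, String> options = new HashMap<>();
    private final Map<String, CachedWire> wires = new HashMap<>();
    int optionLookups = 0; // demonstration only

    // Member variables populated on the first call to processWireData()
    private boolean initialised = false;
    private String appendValue;
    private CachedWire appendFieldWire;
    private CachedWire newFieldWire;

    void setStepOption(String key, String value) {
        options.put(key, value);
    }

    String getStepOption(String key) {
        optionLookups++;
        return options.get(key);
    }

    CachedWire getWireForField(String fieldUuid) {
        return wires.computeIfAbsent(fieldUuid, k -> new CachedWire());
    }

    boolean processWireData() {
        if (!initialised) {
            // Look up options and wires once, not for every row
            appendValue = getStepOption("APPEND_VALUE");
            appendFieldWire = getWireForField(getStepOption("APPEND_FIELD"));
            newFieldWire = getWireForField(getStepOption("NEW_FIELD"));
            initialised = true;
        }

        Object data = appendFieldWire.getValue();
        String newFieldData = (data == null) ? appendValue : data.toString() + appendValue;
        newFieldWire.send(newFieldData);
        return true;
    }
}
```

After the first row, each call touches only the cached wires, so the per-row cost no longer includes option and wire lookups.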






Cached Step Implementation 

A cached step extends AbstractETLCachedStep. Only one method, processEndRows(), needs to be implemented. Cached steps usually have more than one input step. Data extraction steps are also often implemented as cached steps: such a step has no input and generates its own data, for instance by executing an SQL query.


processEndRows();


  • Input Steps: For input steps, the framework invokes processEndRows() as soon as the process starts running. An input step does not need to handle data caching, but the implementation must put data on wires and emit it to its output(s). Here’s a sample implementation of the method for a data generator step:

    @Override
    protected void processEndRows() throws ETLException, InterruptedException {
    
        // Get the first output flow;
        // Useful for most steps which have a single output
        String outFlow = getFirstOutputFlow();
    
        // The step outputs four Generated fields.
        List<String> orderedFieldUUIDs = new ArrayList<>(4);
    
        // The step's implementation of setupGeneratedFields()
        // should set these up. Their UUIDs would've been saved as step options.
        orderedFieldUUIDs.add(getStepOption("FIELD1_UUID"));
        orderedFieldUUIDs.add(getStepOption("FIELD2_UUID"));
        orderedFieldUUIDs.add(getStepOption("FIELD3_UUID"));
        orderedFieldUUIDs.add(getStepOption("FIELD4_UUID"));
    
        // Sample Data
        String[] field1_data = {"Adventure", "Relaxation", "Culture", "Family"};
        int[] field2_data = {30, 32, 11, 44};
        Date[] field3_data = {new Date(103882823L), new Date(10388283323L),
                              new Date(103883232823L), new Date(102323882823L)};
        Timestamp[] field4_data = {new Timestamp(103882823L), new Timestamp(10388283323L),
                                  new Timestamp(103883232823L), new Timestamp(102323882823L)};
    
        // Number of rows to generate. Hard-coded here for simplicity; a real step
        // would read it from a step option, e.g. Integer.parseInt(getStepOption("ROW_COUNT"))
        int rowCount = 10;
        Random random = new Random();
    
        for (int i = 0 ; i < rowCount ; i++) {
            // Data is emitted in packets.
            // This implementation creates a new packet for every row.
            // Data packets can accumulate rows and emit, say, every 20 rows.
            ETLStepResult dataPacket = getFreshDataPacket(outFlow);
    
            Object[] row = new Object[4];
            row[0] = field1_data[random.nextInt(4)];
            row[1] = field2_data[random.nextInt(4)];
            row[2] = field3_data[random.nextInt(4)];
            row[3] = field4_data[random.nextInt(4)];
    
            // Send the row of data from Default Fields to Output Fields
            beginInternalTransmission(row, orderedFieldUUIDs);
    
            // Accumulate transmitted data in a data packet
            endInternalTransmission(dataPacket);
    
            // Emit the packet of data to the next step.
            // This may be done less frequently, after accumulating rows
            emitData(dataPacket);
        }
    }
    
    
  • Transformation steps: For cached transformation steps, the framework invokes processEndRows() when every input has finished sending its data to the cached step. Each input step’s data is stored in a separate memory cache. The step implementation must send data on wires and emit data from the step to its output(s). The example below implements processEndRows() for a Union-All step and shows how data caches are used.


    @Override
    protected void processEndRows() throws ETLException, InterruptedException {
        // Get input flows which feed data to this step
        Set<String> inputFlowUuids = this.getInputFlowUuids();
    
        // Get the output flow as there can be only one
        String outFlowUuid = getFirstOutputFlow();
    
        // Get a data packet
        ETLStepResult dataPacket = getFreshDataPacket(outFlowUuid);
    
        for (String inputFlowUuid : inputFlowUuids) {
            // Get data of each input from its cache
            ETLDataCache inputData = getDataCache(inputFlowUuid);
    
            // Use this to get the Default Metadata Field corresponding to an Input Field.
            // Input Metadata Field is the field in the input step.
            // Cached data will be in the order of input fields.
            Map<String, String> inputToDefaultFieldMap = getInputToDefaultFieldMap();
    
            // The data will match the Input Metadata Fields
            List<ETLStepMetadataFieldBean> inputFieldList = inputData.getMetadataFields();
            List<String> unionResultFields = new ArrayList<String>();
    
            for(ETLStepMetadataFieldBean fieldBean : inputFieldList){
                // Get the Default Metadata Field for an Input Metadata Field
                String inputFieldUuid = fieldBean.getEtlStepMetadataFieldUUID();
                String defaultFieldUuid = inputToDefaultFieldMap.get(inputFieldUuid);
    
                // Get the Generated Default Metadata Field holding the result of the Union.
                // getUnionFieldForDefaultField() is a method defined in the Step.
                // The result of the union operation is sent to new generated fields.
                // It figures out how a Default Field is linked to the generated field.
                String unionResultField = getUnionFieldForDefaultField(defaultFieldUuid);
    
                // This holds the Union field corresponding to the Input Field.
                // Fields which are excluded from the union will have a null entry.
                unionResultFields.add(unionResultField);    
            }
    
            // Iterate through the cached data
            Iterator<Object[]> it = inputData.iterator();
    
            while (it.hasNext()) {
                Object[] row = it.next();
    
                // Transmit data from:
                // input fields -> default -> generated default (union fields) -> output
                // Data excluded from the union will have a "null" field, 
                // so nothing will be transmitted.
                this.beginInternalTransmission(row, unionResultFields);
                this.endInternalTransmission(dataPacket);
            }
        }
    
        // Accumulate all data before emitting to the next step
        emitData(dataPacket);
    }
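
The data generator example above creates and emits one packet per row, and its comments note that rows can instead be accumulated and emitted less often. The sketch below shows only that batching pattern, using plain lists in place of ETLStepResult and emitData(). BatchEmitterSketch and its members are hypothetical names, and the batch size of 20 used in the usage note is simply the figure mentioned in the comment.

```java
import java.util.ArrayList;
import java.util.List;

class BatchEmitterSketch {
    private final int batchSize;

    // Stand-in for the current data packet
    private List<Object[]> packet = new ArrayList<>();

    // Stand-in for the next step: records every packet that was emitted
    final List<List<Object[]>> emitted = new ArrayList<>();

    BatchEmitterSketch(int batchSize) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be at least 1");
        }
        this.batchSize = batchSize;
    }

    // Accumulate a row and emit the packet once it is full
    void addRow(Object[] row) {
        packet.add(row);
        if (packet.size() >= batchSize) {
            flush();
        }
    }

    // Emit whatever has accumulated. Call once after the last row
    // so that a partially filled packet is not lost.
    void flush() {
        if (packet.isEmpty()) {
            return;
        }
        emitted.add(packet);
        packet = new ArrayList<>(); // start a fresh packet
    }
}
```

With a batch size of 20, feeding 45 rows and then calling flush() yields three packets of 20, 20 and 5 rows. In a real step, the fresh packet would presumably come from getFreshDataPacket(outFlow) and the emit from emitData(dataPacket), as in the examples above.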

 

 

 


 


 
