Using Data Streams with Automation

You can access reports from Data Streams in Automation using StreamsLookup. StreamsLookup is available on chains where Data Streams is available. You can find a list of available networks and stream IDs on the Data Feeds Stream IDs page.

StreamsLookup interface

To get started, your upkeep contract must implement the StreamsLookup interface. You can use StreamsLookup with both log trigger and custom logic trigger upkeeps.

Using the StreamsLookup interface

To request data, revert with the StreamsLookup error in your checkUpkeep or checkLog function, following the error specification in the interface. The StreamsLookup error contains the details of the reports you wish to fetch from Data Streams.

You also need to add a checkCallback function. checkCallback is an additional function where you can post-process the fetched reports using verifiable off-chain compute in the Automation Network before performUpkeep executes on-chain.

You can find the StreamsLookupCompatibleInterface.sol contract on GitHub.
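For a custom logic trigger, the same revert goes in checkUpkeep instead of checkLog. The following is a minimal sketch, not a reference implementation. It assumes block.timestamp is an acceptable value for the time argument (this page only shows log.timestamp for log triggers), and the contract name CustomLogicSketch is hypothetical. The feed ID is the one used in the example contract on this page.

```solidity
// SPDX-License-Identifier: MIT
pragma solidity ^0.8.19;

// Hypothetical contract name, for illustration only.
contract CustomLogicSketch {
    // Same error signature as StreamsLookupCompatibleInterface on this page.
    error StreamsLookup(
        string feedParamKey,
        string[] feeds,
        string timeParamKey,
        uint256 time,
        bytes extraData
    );

    // Feed ID copied from the example contract on this page.
    string[] public feedsHex = [
        "0x00023496426b520583ae20a66d80484e0fc18544866a5b0bfee15ec771963274"
    ];

    // Custom logic entry point: always reverts to request a report.
    function checkUpkeep(
        bytes calldata
    ) external view returns (bool upkeepNeeded, bytes memory performData) {
        // ASSUMPTION: block.timestamp is used as the report time.
        revert StreamsLookup(
            "feedIDs",
            feedsHex,
            "timestamp",
            block.timestamp,
            ""
        );
    }
}
```

A complete upkeep would also need checkCallback and performUpkeep, as described in the rest of this page.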

StreamsLookup revert

Here is a snippet of the StreamsLookup revert error used with a log trigger upkeep:

revert StreamsLookup(feedParamKey,feeds,timeParamKey,log.timestamp,extraData);

The feedParamKey parameter can only take the string value feedIDs, and the timeParamKey parameter can only take the string value timestamp. Both values are case-sensitive.

Specify the feeds you want to request in the string array feeds, for example feeds = ["0x0...","0x..."]. Feed IDs are listed on the Stream IDs page.

The time parameter (log.timestamp in the snippet above) specifies the timestamp of the report you are requesting. A request for a future timestamp returns nothing. Historical reports are only stored for as long as Data Streams indicates, which is one month at the time of writing.

extraData is a variable you can use to send data to the checkCallback function. It can be empty, for example "".
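As an illustration of a non-empty extraData, the sketch below forwards two fields of the triggering log to checkCallback. It is only a sketch under stated assumptions: the contract name ExtraDataSketch and the choice of fields (source and txHash) are hypothetical, and the Log struct and error signature are copied from the example contract on this page.

```solidity
// SPDX-License-Identifier: MIT
pragma solidity ^0.8.19;

// Log struct as declared in the example contract on this page.
struct Log {
    uint256 index;
    uint256 timestamp;
    bytes32 txHash;
    uint256 blockNumber;
    bytes32 blockHash;
    address source;
    bytes32[] topics;
    bytes data;
}

// Hypothetical contract name, for illustration only.
contract ExtraDataSketch {
    error StreamsLookup(
        string feedParamKey,
        string[] feeds,
        string timeParamKey,
        uint256 time,
        bytes extraData
    );

    // Feed ID copied from the example contract on this page.
    string[] public feedsHex = [
        "0x00023496426b520583ae20a66d80484e0fc18544866a5b0bfee15ec771963274"
    ];

    function checkLog(
        Log calldata log,
        bytes memory
    ) external view returns (bool upkeepNeeded, bytes memory performData) {
        // Pack context that checkCallback can later decode with
        // abi.decode(extraData, (address, bytes32)).
        bytes memory extraData = abi.encode(log.source, log.txHash);
        revert StreamsLookup(
            "feedIDs",
            feedsHex,
            "timestamp",
            log.timestamp,
            extraData
        );
    }
}
```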

checkCallback

The checkCallback function is an extra verifiable off-chain compute function that the nodes will simulate after fetching your Data Streams report(s). Here you can process the fetched data and create the performData that you want to execute on-chain.

function checkCallback(
  bytes[] memory values,
  bytes memory extraData
) external view returns (bool upkeepNeeded, bytes memory performData);

The values variable is a returned array of bytes containing the reports fetched from Data Streams for each of the feeds requested. extraData is the data you sent from the checkUpkeep or checkLog function where you performed the StreamsLookup revert.

When checkCallback returns upkeepNeeded equal to true, the Automation Network proceeds to execute your performUpkeep function on-chain. Use performData to pass calldata to performUpkeep.
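A checkCallback can also decline to perform. The sketch below returns upkeepNeeded as false when no report bytes were returned, and otherwise forwards the reports and extraData as performData. It assumes an empty array or an empty first entry signals a missing report, which this page does not specify. The contract name CallbackSketch is hypothetical.

```solidity
// SPDX-License-Identifier: MIT
pragma solidity ^0.8.19;

// Hypothetical contract name, for illustration only.
contract CallbackSketch {
    function checkCallback(
        bytes[] calldata values,
        bytes calldata extraData
    ) external pure returns (bool upkeepNeeded, bytes memory performData) {
        // ASSUMPTION: no usable report was fetched, so skip the perform.
        if (values.length == 0 || values[0].length == 0) {
            return (false, new bytes(0));
        }
        // Forward the signed reports and the caller's context unchanged.
        performData = abi.encode(values, extraData);
        upkeepNeeded = true;
    }
}
```

Filtering here costs no gas, because checkCallback is simulated off-chain before any on-chain transaction is sent.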

Example StreamsLookup

Here is a simple example of fetching Data Streams reports using StreamsLookup and verifying the report. Data Streams is available in Early Access. Contact us to talk to an expert about integrating Chainlink Data Streams with your applications.

This code example differs slightly from the example in the Data Streams Getting Started guide.

// SPDX-License-Identifier: MIT
pragma solidity ^0.8.19;

interface StreamsLookupCompatibleInterface {
    error StreamsLookup(
        string feedParamKey,
        string[] feeds,
        string timeParamKey,
        uint256 time,
        bytes extraData
    );

    function checkCallback(
        bytes[] memory values,
        bytes memory extraData
    ) external view returns (bool upkeepNeeded, bytes memory performData);
}

interface ILogAutomation {
    function checkLog(
        Log calldata log,
        bytes memory checkData
    ) external returns (bool upkeepNeeded, bytes memory performData);

    function performUpkeep(bytes calldata performData) external;
}

struct Log {
    uint256 index;
    uint256 timestamp;
    bytes32 txHash;
    uint256 blockNumber;
    bytes32 blockHash;
    address source;
    bytes32[] topics;
    bytes data;
}

interface IVerifierProxy {
    function verify(
        bytes memory signedReport
    ) external payable returns (bytes memory verifierResponse);
}

interface IReportHandler {
    function handleReport(bytes calldata report) external;
}

contract StreamsUpkeep is ILogAutomation, StreamsLookupCompatibleInterface {
    IVerifierProxy public verifier;

    struct BasicReport {
        bytes32 feedId; // The feed ID the report has data for
        uint32 validFromTimestamp; // Earliest timestamp for which price is applicable
        uint32 observationsTimestamp; // Latest timestamp for which price is applicable
        uint192 nativeFee; // Base cost to validate a transaction using the report, denominated in the chain’s native token (WETH/ETH)
        uint192 linkFee; // Base cost to validate a transaction using the report, denominated in LINK
        uint32 expiresAt; // Latest timestamp where the report can be verified on-chain
        int192 price; // DON consensus median price, carried to 8 decimal places
    }

    struct PremiumReport {
        bytes32 feedId; // The feed ID the report has data for
        uint32 validFromTimestamp; // Earliest timestamp for which price is applicable
        uint32 observationsTimestamp; // Latest timestamp for which price is applicable
        uint192 nativeFee; // Base cost to validate a transaction using the report, denominated in the chain’s native token (WETH/ETH)
        uint192 linkFee; // Base cost to validate a transaction using the report, denominated in LINK
        uint32 expiresAt; // Latest timestamp where the report can be verified on-chain
        int192 price; // DON consensus median price, carried to 8 decimal places
        int192 bid; // Simulated price impact of a buy order up to the X% depth of liquidity utilisation
        int192 ask; // Simulated price impact of a sell order up to the X% depth of liquidity utilisation
    }

    struct Quote {
        address quoteAddress;
    }

    event ReportVerified(BasicReport indexed report);
    event PriceUpdate(int192 price);

    address public immutable FEE_ADDRESS;
    string public constant STRING_DATASTREAMS_FEEDLABEL = "feedIDs";
    string public constant STRING_DATASTREAMS_QUERYLABEL = "timestamp";
    string[] public feedsHex = [
        "0x00023496426b520583ae20a66d80484e0fc18544866a5b0bfee15ec771963274"
    ];

    constructor(address _feeAddress, address _verifier) {
        verifier = IVerifierProxy(_verifier); //0xea9B98Be000FBEA7f6e88D08ebe70EbaAD10224c
        FEE_ADDRESS = _feeAddress; // 0xe39Ab88f8A4777030A534146A9Ca3B52bd5D43A3 (WETH)
    }

    function checkLog(
        Log calldata log,
        bytes memory
    ) external view returns (bool upkeepNeeded, bytes memory performData) {
        revert StreamsLookup(
            STRING_DATASTREAMS_FEEDLABEL,
            feedsHex,
            STRING_DATASTREAMS_QUERYLABEL,
            log.timestamp,
            ""
        );
    }

    function checkCallback(
        bytes[] calldata values,
        bytes calldata extraData
    ) external pure returns (bool, bytes memory) {
        return (true, abi.encode(values, extraData));
    }

    function performUpkeep(bytes calldata performData) external override {
        // Decode the payload built in checkCallback; extraData is not used here.
        (bytes[] memory signedReports, ) = abi.decode(
            performData,
            (bytes[], bytes)
        );

        bytes memory report = signedReports[0];

        bytes memory bundledReport = bundleReport(report);

        BasicReport memory unverifiedReport = _getReportData(report);

        bytes memory verifiedReportData = verifier.verify{
            value: unverifiedReport.nativeFee
        }(bundledReport);
        BasicReport memory verifiedReport = abi.decode(
            verifiedReportData,
            (BasicReport)
        );

        emit PriceUpdate(verifiedReport.price);
    }

    function bundleReport(
        bytes memory report
    ) internal view returns (bytes memory) {
        Quote memory quote;
        quote.quoteAddress = FEE_ADDRESS;
        (
            bytes32[3] memory reportContext,
            bytes memory reportData,
            bytes32[] memory rs,
            bytes32[] memory ss,
            bytes32 raw
        ) = abi.decode(
                report,
                (bytes32[3], bytes, bytes32[], bytes32[], bytes32)
            );
        bytes memory bundledReport = abi.encode(
            reportContext,
            reportData,
            rs,
            ss,
            raw,
            abi.encode(quote)
        );
        return bundledReport;
    }

    function _getReportData(
        bytes memory signedReport
    ) internal pure returns (BasicReport memory) {
        (, bytes memory reportData, , , ) = abi.decode(
            signedReport,
            (bytes32[3], bytes, bytes32[], bytes32[], bytes32)
        );

        BasicReport memory report = abi.decode(reportData, (BasicReport));
        return report;
    }

    fallback() external payable {}
}

Validating Data Streams reports

To ensure there is a cryptographic audit trail on-chain for your users, validate your reports on-chain. See the Data Streams Getting Started guide for an example.

Debugging StreamsLookup

Read our debugging section to learn how to identify and resolve common errors when using StreamsLookup.
