Getting Started with Chainlink Data Streams
Read data from a Data Stream and validate the answer on-chain. This example uses a Chainlink Automation Log Trigger to check for events that require data. For this example, the log trigger comes from a simple emitter contract. Chainlink Automation then uses StreamsLookup
to retrieve a signed report from the Data Streams Engine, return the data in a callback, and run the performUpkeep
function on your registered upkeep contract. The performUpkeep
function calls the verify
function on the verifier contract.
Before you begin
- Data Streams are currently available only in Early Access. Upkeeps for Data Streams will not run without first acquiring access. Contact us to talk to an expert before you deploy the example contracts in this tutorial.
- If you are new to smart contract development, learn how to Deploy Your First Smart Contract so you are familiar with the tools that are necessary for this guide:
- Acquire testnet funds. This guide requires a testnet ETH on Arbitrum Goerli.
- Use the Arbitrum Bridge to transfer testnet ETH from Ethereum Goerli to Arbitrum Goerli. Testnet ETH is available at one of several faucets.
- Testnet LINK is available for Arbitrum Goerli at faucets.chain.link.
- Learn how to Fund your contract with LINK.
Deploy the Chainlink Automation upkeep
Deploy an upkeep that is enabled to retrieve data from Data Streams. For this example, you will read from the ETH/USD stream with ID 0x00023496426b520583ae20a66d80484e0fc18544866a5b0bfee15ec771963274
on Arbitrum Goerli. For a complete list of available assets, IDs, and verifier proxy addresses, see the Stream Identifiers page.
-
Select the Arbitrum Goerli network in MetaMask.
-
Open the example upkeep in Remix.
-
Compile the contract.
-
In the Deploy & Run tab, select Injected Provider as your Environment. For this example, you should be using Arbitrum Goerli.
-
Deploy the contract. Record the contract address.
Deploy the emitter
This contract emits logs that trigger the upkeep. This code can be part of your dApp. For example, you might emit log triggers when your users initiate a trade or other action that requires data retrieval. For this Getting Started guide, we will use a very simple emitter so you can test the upkeep and data retrieval.
-
Select the Arbitrum Goerli network in MetaMask.
-
Open the example emitter in Remix.
-
Compile the contract.
-
In the Deploy & Run tab, select Injected Provider as your Environment.
-
Deploy the contract with the following constructor variables:
- FeeAddress (WETH):
0xe39Ab88f8A4777030A534146A9Ca3B52bd5D43A3
- VerifierAddress:
0xea9B98Be000FBEA7f6e88D08ebe70EbaAD10224c
- FeeAddress (WETH):
-
Record the contract address.
Register the upkeep
Register a new Log Trigger upkeep. See Automation Log Triggers to learn more about how to register Log Trigger upkeeps.
-
Go to the Chainlink Automation UI for Arbitrum Goerli and connect your browser wallet.
-
Click Register new Upkeep.
-
Select the Log Trigger upkeep type and click Next.
-
Specify a name for the upkeep.
-
Specify the upkeep contract address that you saved earlier and click Next.
-
Specify the emitter contract address that you saved earlier. This tells Chainlink Automation what contracts to watch for log triggers.
-
Specify a Starting balance of 1 testnet LINK for this example. You can retrieve unused LINK later.
-
Leave the Check data value and other fields blank for now, and click Register Upkeep. MetaMask prompts you to confirm the transaction.
Emit a log
Now you can use your emitter contract to emit a log and initiate the upkeep, which retrieves data for the specified Data Streams asset ID.
-
Go to goerli.arbiscan.io.
-
Use the search to find your emitter contract using the address you saved earlier.
-
Click the Contract tab to view the contract.
-
Click the Write Contract button to find the
emitLog
function. -
Click Connect to Web3 to connect your wallet to Arbiscan. This allows you to run write functions in Arbiscan. MetaMask will prompt you to accept the connection.
-
Click the
emitLog
function to expand the function details. -
Click the Write button to execute the function that will emit the log. MetaMask prompts you to accept the transaction.
After the transaction is complete, the log is emitted and the upkeep is triggered. You can see the retrieved data on your upkeep.
View the upkeep
View the upkeep and check the on-chain verification.
-
Go to the Chainlink Automation UI for Arbitrum Goerli.
-
On your list of upkeeps, click the upkeep that you crated for this guide. The upkeep details open with a list of upkeeps that were performed. You should see your log triggered upkeep on this list.
-
Click the transaction hash for the upkeep to view the transaction in Arbiscan.
-
On the Logs tab for your transaction, you will see that the
performUpkeep
function emitted the price from the verified report.
Examine the code
The example code that you deployed has all of the interfaces and functions required to work with Chainlink Automation as an upkeep contract. It follows a similar flow to the trading flow in the Architecture documentation, but uses a basic log emitter to simulate the client contract that would initiate a StreamsLookup
. After the contract receives and verifies the report, performUpkeep
emits a PriceUpdate
log message with the price. You could modify this to use the data in a way that works for your specific use case and application.
// SPDX-License-Identifier: MIT
pragma solidity ^0.8.19;
interface StreamsLookupCompatibleInterface {
error StreamsLookup(
string feedParamKey,
string[] feeds,
string timeParamKey,
uint256 time,
bytes extraData
);
function checkCallback(
bytes[] memory values,
bytes memory extraData
) external view returns (bool upkeepNeeded, bytes memory performData);
}
interface ILogAutomation {
function checkLog(
Log calldata log,
bytes memory checkData
) external returns (bool upkeepNeeded, bytes memory performData);
function performUpkeep(bytes calldata performData) external;
}
struct Log {
uint256 index;
uint256 timestamp;
bytes32 txHash;
uint256 blockNumber;
bytes32 blockHash;
address source;
bytes32[] topics;
bytes data;
}
interface IVerifierProxy {
function verify(
bytes memory signedReport
) external payable returns (bytes memory verifierResponse);
}
interface IReportHandler {
function handleReport(bytes calldata report) external;
}
contract StreamsUpkeep is ILogAutomation, StreamsLookupCompatibleInterface {
IVerifierProxy public verifier;
struct BasicReport {
bytes32 feedId; // The feed ID the report has data for
uint32 validFromTimestamp; // Earliest timestamp for which price is applicable
uint32 observationsTimestamp; // Latest timestamp for which price is applicable
uint192 nativeFee; // Base cost to validate a transaction using the report, denominated in the chain’s native token (WETH/ETH)
uint192 linkFee; // Base cost to validate a transaction using the report, denominated in LINK
uint32 expiresAt; // Latest timestamp where the report can be verified on-chain
int192 price; // DON consensus median price, carried to 8 decimal places
}
struct PremiumReport {
bytes32 feedId; // The feed ID the report has data for
uint32 validFromTimestamp; // Earliest timestamp for which price is applicable
uint32 observationsTimestamp; // Latest timestamp for which price is applicable
uint192 nativeFee; // Base cost to validate a transaction using the report, denominated in the chain’s native token (WETH/ETH)
uint192 linkFee; // Base cost to validate a transaction using the report, denominated in LINK
uint32 expiresAt; // Latest timestamp where the report can be verified on-chain
int192 price; // DON consensus median price, carried to 8 decimal places
int192 bid; // Simulated price impact of a buy order up to the X% depth of liquidity utilisation
int192 ask; // Simulated price impact of a sell order up to the X% depth of liquidity utilisation
}
struct Quote {
address quoteAddress;
}
event ReportVerified(BasicReport indexed report);
event PriceUpdate(int192 price);
address public immutable FEE_ADDRESS;
string public constant STRING_DATASTREAMS_FEEDLABEL = "feedIDs";
string public constant STRING_DATASTREAMS_QUERYLABEL = "timestamp";
string[] public feedsHex = [
"0x00023496426b520583ae20a66d80484e0fc18544866a5b0bfee15ec771963274"
];
constructor(address _feeAddress, address _verifier) {
verifier = IVerifierProxy(_verifier); //0xea9B98Be000FBEA7f6e88D08ebe70EbaAD10224c
FEE_ADDRESS = _feeAddress; // 0xe39Ab88f8A4777030A534146A9Ca3B52bd5D43A3 (WETH)
}
function checkLog(
Log calldata log,
bytes memory
) external view returns (bool upkeepNeeded, bytes memory performData) {
revert StreamsLookup(
STRING_DATASTREAMS_FEEDLABEL,
feedsHex,
STRING_DATASTREAMS_QUERYLABEL,
log.timestamp,
""
);
}
function checkCallback(
bytes[] calldata values,
bytes calldata extraData
) external pure returns (bool, bytes memory) {
return (true, abi.encode(values, extraData));
}
function performUpkeep(bytes calldata performData) external override {
(bytes[] memory signedReports, bytes memory extraData) = abi.decode(
performData,
(bytes[], bytes)
);
bytes memory report = signedReports[0];
bytes memory bundledReport = bundleReport(report);
BasicReport memory unverifiedReport = _getReportData(report);
bytes memory verifiedReportData = verifier.verify{
value: unverifiedReport.nativeFee
}(bundledReport);
BasicReport memory verifiedReport = abi.decode(
verifiedReportData,
(BasicReport)
);
emit PriceUpdate(verifiedReport.price);
}
function bundleReport(
bytes memory report
) internal view returns (bytes memory) {
Quote memory quote;
quote.quoteAddress = FEE_ADDRESS;
(
bytes32[3] memory reportContext,
bytes memory reportData,
bytes32[] memory rs,
bytes32[] memory ss,
bytes32 raw
) = abi.decode(
report,
(bytes32[3], bytes, bytes32[], bytes32[], bytes32)
);
bytes memory bundledReport = abi.encode(
reportContext,
reportData,
rs,
ss,
raw,
abi.encode(quote)
);
return bundledReport;
}
function _getReportData(
bytes memory signedReport
) internal pure returns (BasicReport memory) {
(, bytes memory reportData, , , ) = abi.decode(
signedReport,
(bytes32[3], bytes, bytes32[], bytes32[], bytes32)
);
BasicReport memory report = abi.decode(reportData, (BasicReport));
return report;
}
fallback() external payable {}
}