-
Notifications
You must be signed in to change notification settings - Fork 416
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[#6233] feat(flink): flink jdbc catalog #6543
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall looks good.
...ctor/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/GravitinoJdbcCatalog.java
Show resolved
Hide resolved
Thanks for the PR, could you add a related document? And besides Mysql, does Flink JDBC connector support other RDMS? |
...-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java
Outdated
Show resolved
Hide resolved
...ctor/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/GravitinoJdbcCatalog.java
Outdated
Show resolved
Hide resolved
...java/org/apache/gravitino/flink/connector/integration/test/jdbc/FlinkJdbcMysqlCatalogIT.java
Show resolved
Hide resolved
...r/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/JdbcPropertiesConstants.java
Show resolved
Hide resolved
The FlinkJdbcCatalog supports only MySQL and PostgreSQL databases. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall looks pretty good.
Left some picky comments for consideration.
...ctor/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/GravitinoJdbcCatalog.java
Show resolved
Hide resolved
...ink/src/main/java/org/apache/gravitino/flink/connector/jdbc/GravitinoJdbcCatalogFactory.java
Outdated
Show resolved
Hide resolved
Bumps [axios](https://github.com/axios/axios) from 1.7.2 to 1.7.4. <details> <summary>Release notes</summary> <p><em>Sourced from <a href="https://github.com/axios/axios/releases">axios's releases</a>.</em></p> <blockquote> <h2>Release v1.7.4</h2> <h2>Release notes:</h2> <h3>Bug Fixes</h3> <ul> <li><strong>sec:</strong> CVE-2024-39338 (<a href="https://redirect.github.com/axios/axios/issues/6539">#6539</a>) (<a href="https://redirect.github.com/axios/axios/issues/6543">#6543</a>) (<a href="https://github.com/axios/axios/commit/6b6b605eaf73852fb2dae033f1e786155959de3a">6b6b605</a>)</li> <li><strong>sec:</strong> disregard protocol-relative URL to remediate SSRF (<a href="https://redirect.github.com/axios/axios/issues/6539">#6539</a>) (<a href="https://github.com/axios/axios/commit/07a661a2a6b9092c4aa640dcc7f724ec5e65bdda">07a661a</a>)</li> </ul> <h3>Contributors to this release</h3> <ul> <li><!-- raw HTML omitted --> <a href="https://github.com/levpachmanov" title="+47/-11 ([apache#6543](axios/axios#6543) )">Lev Pachmanov</a></li> <li><!-- raw HTML omitted --> <a href="https://github.com/hainenber" title="+49/-4 ([apache#6539](axios/axios#6539) )">Đỗ Trọng Hải</a></li> </ul> <h2>Release v1.7.3</h2> <h2>Release notes:</h2> <h3>Bug Fixes</h3> <ul> <li><strong>adapter:</strong> fix progress event emitting; (<a href="https://redirect.github.com/axios/axios/issues/6518">#6518</a>) (<a href="https://github.com/axios/axios/commit/e3c76fc9bdd03aa4d98afaf211df943e2031453f">e3c76fc</a>)</li> <li><strong>fetch:</strong> fix withCredentials request config (<a href="https://redirect.github.com/axios/axios/issues/6505">#6505</a>) (<a href="https://github.com/axios/axios/commit/85d4d0ea0aae91082f04e303dec46510d1b4e787">85d4d0e</a>)</li> <li><strong>xhr:</strong> return original config on errors from XHR adapter (<a href="https://redirect.github.com/axios/axios/issues/6515">#6515</a>) (<a href="https://github.com/axios/axios/commit/8966ee7ea62ecbd6cfb39a905939bcdab5cf6388">8966ee7</a>)</li> </ul> <h3>Contributors to this release</h3> <ul> <li><!-- raw HTML omitted --> <a href="https://github.com/DigitalBrainJS" title="+211/-159 ([apache#6518](axios/axios#6518) [apache#6519](axios/axios#6519) )">Dmitriy Mozgovoy</a></li> <li><!-- raw HTML omitted --> <a href="https://github.com/ValeraS" title="+3/-3 ([apache#6515](axios/axios#6515) )">Valerii Sidorenko</a></li> <li><!-- raw HTML omitted --> <a href="https://github.com/prianyu" title="+2/-2 ([apache#6505](axios/axios#6505) )">prianYu</a></li> </ul> </blockquote> </details> <details> <summary>Changelog</summary> <p><em>Sourced from <a href="https://github.com/axios/axios/blob/v1.x/CHANGELOG.md">axios's changelog</a>.</em></p> <blockquote> <h2><a href="https://github.com/axios/axios/compare/v1.7.3...v1.7.4">1.7.4</a> (2024-08-13)</h2> <h3>Bug Fixes</h3> <ul> <li><strong>sec:</strong> CVE-2024-39338 (<a href="https://redirect.github.com/axios/axios/issues/6539">#6539</a>) (<a href="https://redirect.github.com/axios/axios/issues/6543">#6543</a>) (<a href="https://github.com/axios/axios/commit/6b6b605eaf73852fb2dae033f1e786155959de3a">6b6b605</a>)</li> <li><strong>sec:</strong> disregard protocol-relative URL to remediate SSRF (<a href="https://redirect.github.com/axios/axios/issues/6539">#6539</a>) (<a href="https://github.com/axios/axios/commit/07a661a2a6b9092c4aa640dcc7f724ec5e65bdda">07a661a</a>)</li> </ul> <h3>Contributors to this release</h3> <ul> <li><!-- raw HTML omitted --> <a href="https://github.com/levpachmanov" title="+47/-11 ([apache#6543](axios/axios#6543) )">Lev Pachmanov</a></li> <li><!-- raw HTML omitted --> <a href="https://github.com/hainenber" title="+49/-4 ([apache#6539](axios/axios#6539) )">Đỗ Trọng Hải</a></li> </ul> <h2><a href="https://github.com/axios/axios/compare/v1.7.2...v1.7.3">1.7.3</a> (2024-08-01)</h2> <h3>Bug Fixes</h3> <ul> <li><strong>adapter:</strong> fix progress event emitting; (<a href="https://redirect.github.com/axios/axios/issues/6518">#6518</a>) (<a href="https://github.com/axios/axios/commit/e3c76fc9bdd03aa4d98afaf211df943e2031453f">e3c76fc</a>)</li> <li><strong>fetch:</strong> fix withCredentials request config (<a href="https://redirect.github.com/axios/axios/issues/6505">#6505</a>) (<a href="https://github.com/axios/axios/commit/85d4d0ea0aae91082f04e303dec46510d1b4e787">85d4d0e</a>)</li> <li><strong>xhr:</strong> return original config on errors from XHR adapter (<a href="https://redirect.github.com/axios/axios/issues/6515">#6515</a>) (<a href="https://github.com/axios/axios/commit/8966ee7ea62ecbd6cfb39a905939bcdab5cf6388">8966ee7</a>)</li> </ul> <h3>Contributors to this release</h3> <ul> <li><!-- raw HTML omitted --> <a href="https://github.com/DigitalBrainJS" title="+211/-159 ([apache#6518](axios/axios#6518) [apache#6519](axios/axios#6519) )">Dmitriy Mozgovoy</a></li> <li><!-- raw HTML omitted --> <a href="https://github.com/ValeraS" title="+3/-3 ([apache#6515](axios/axios#6515) )">Valerii Sidorenko</a></li> <li><!-- raw HTML omitted --> <a href="https://github.com/prianyu" title="+2/-2 ([apache#6505](axios/axios#6505) )">prianYu</a></li> </ul> </blockquote> </details> <details> <summary>Commits</summary> <ul> <li><a href="https://github.com/axios/axios/commit/abd24a7367726616e60dfc04cb394b4be37cf597"><code>abd24a7</code></a> chore(release): v1.7.4 (<a href="https://redirect.github.com/axios/axios/issues/6544">#6544</a>)</li> <li><a href="https://github.com/axios/axios/commit/6b6b605eaf73852fb2dae033f1e786155959de3a"><code>6b6b605</code></a> fix(sec): CVE-2024-39338 (<a href="https://redirect.github.com/axios/axios/issues/6539">#6539</a>) (<a href="https://redirect.github.com/axios/axios/issues/6543">#6543</a>)</li> <li><a href="https://github.com/axios/axios/commit/07a661a2a6b9092c4aa640dcc7f724ec5e65bdda"><code>07a661a</code></a> fix(sec): disregard protocol-relative URL to remediate SSRF (<a href="https://redirect.github.com/axios/axios/issues/6539">#6539</a>)</li> <li><a href="https://github.com/axios/axios/commit/c6cce43cd94489f655f4488c5a50ecaf781c94f2"><code>c6cce43</code></a> chore(release): v1.7.3 (<a href="https://redirect.github.com/axios/axios/issues/6521">#6521</a>)</li> <li><a href="https://github.com/axios/axios/commit/e3c76fc9bdd03aa4d98afaf211df943e2031453f"><code>e3c76fc</code></a> fix(adapter): fix progress event emitting; (<a href="https://redirect.github.com/axios/axios/issues/6518">#6518</a>)</li> <li><a href="https://github.com/axios/axios/commit/85d4d0ea0aae91082f04e303dec46510d1b4e787"><code>85d4d0e</code></a> fix(fetch): fix withCredentials request config (<a href="https://redirect.github.com/axios/axios/issues/6505">#6505</a>)</li> <li><a href="https://github.com/axios/axios/commit/92cd8ed94362f929d3d0ed85ca84296c0ac8fd6d"><code>92cd8ed</code></a> chore(github): update ISSUE_TEMPLATE.md (<a href="https://redirect.github.com/axios/axios/issues/6519">#6519</a>)</li> <li><a href="https://github.com/axios/axios/commit/8966ee7ea62ecbd6cfb39a905939bcdab5cf6388"><code>8966ee7</code></a> fix(xhr): return original config on errors from XHR adapter (<a href="https://redirect.github.com/axios/axios/issues/6515">#6515</a>)</li> <li>See full diff in <a href="https://github.com/axios/axios/compare/v1.7.2...v1.7.4">compare view</a></li> </ul> </details> <br /> [](https://docs.github.com/en/github/managing-security-vulnerabilities/about-dependabot-security-updates#about-compatibility-scores) Dependabot will resolve any conflicts with this PR as long as you don't alter it yourself. You can also trigger a rebase manually by commenting `@dependabot rebase`. [//]: # (dependabot-automerge-start) [//]: # (dependabot-automerge-end) --- <details> <summary>Dependabot commands and options</summary> <br /> You can trigger Dependabot actions by commenting on this PR: - `@dependabot rebase` will rebase this PR - `@dependabot recreate` will recreate this PR, overwriting any edits that have been made to it - `@dependabot merge` will merge this PR after your CI passes on it - `@dependabot squash and merge` will squash and merge this PR after your CI passes on it - `@dependabot cancel merge` will cancel a previously requested merge and block automerging - `@dependabot reopen` will reopen this PR if it is closed - `@dependabot close` will close this PR and stop Dependabot recreating it. You can achieve the same result by closing it manually - `@dependabot show <dependency name> ignore conditions` will show all of the ignore conditions of the specified dependency - `@dependabot ignore this major version` will close this PR and stop Dependabot creating any more for this major version (unless you reopen the PR or upgrade to it yourself) - `@dependabot ignore this minor version` will close this PR and stop Dependabot creating any more for this minor version (unless you reopen the PR or upgrade to it yourself) - `@dependabot ignore this dependency` will close this PR and stop Dependabot creating any more for this dependency (unless you reopen the PR or upgrade to it yourself) You can disable automated security fix PRs for this repo from the [Security Alerts page](https://github.com/apache/gravitino/network/alerts). </details> Signed-off-by: dependabot[bot] <[email protected]> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
...r/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/JdbcPropertiesConstants.java
Show resolved
Hide resolved
...ink/src/main/java/org/apache/gravitino/flink/connector/jdbc/GravitinoJdbcCatalogFactory.java
Outdated
Show resolved
Hide resolved
...ink/src/main/java/org/apache/gravitino/flink/connector/jdbc/GravitinoJdbcCatalogFactory.java
Show resolved
Hide resolved
...ctor/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/GravitinoJdbcCatalog.java
Show resolved
Hide resolved
...ctor/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/GravitinoJdbcCatalog.java
Outdated
Show resolved
Hide resolved
...-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java
Outdated
Show resolved
Hide resolved
...ink/src/main/java/org/apache/gravitino/flink/connector/jdbc/GravitinoJdbcCatalogFactory.java
Show resolved
Hide resolved
|
||
@Override | ||
public String driverName() { | ||
return "com.mysql.jdbc.Driver"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There may multi Mysql drivers with different names, seems we need a more general solution to handle jdbc-driver
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
...ink/src/test/java/org/apache/gravitino/flink/connector/integration/test/utils/TestUtils.java
Outdated
Show resolved
Hide resolved
...r/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/JdbcPropertiesConstants.java
Outdated
Show resolved
Hide resolved
@@ -136,7 +137,8 @@ default Map<String, String> toFlinkDatabaseProperties(Map<String, String> gravit | |||
* @param gravitinoProperties The table properties provided by Gravitino. | |||
* @return The table properties for the Flink connector. | |||
*/ | |||
default Map<String, String> toFlinkTableProperties(Map<String, String> gravitinoProperties) { | |||
default Map<String, String> toFlinkTableProperties( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could you add tablePath
to Java doc and explain why add this parameters?
...tor/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalogFactory.java
Show resolved
Hide resolved
...r/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/JdbcPropertiesConstants.java
Show resolved
Hide resolved
flinkToGravitinoMap.put(FLINK_JDBC_URL, GRAVITINO_JDBC_URL); | ||
flinkToGravitinoMap.put(FLINK_JDBC_USER, GRAVITINO_JDBC_USER); | ||
flinkToGravitinoMap.put(FLINK_JDBC_PASSWORD, GRAVITINO_JDBC_PASSWORD); | ||
flinkToGravitinoMap.put(FLINK_JDBC_DEFAULT_DATABASE, GRAVITINO_JDBC_DEFAULT_DATABASE); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no need to transform FLINK_JDBC_DEFAULT_DATABASE
between GRAVITINO_JDBC_DEFAULT_DATABASE
, Flink properties converter will do it automaticly.
public Map<String, String> toFlinkTableProperties( | ||
Map<String, String> gravitinoProperties, ObjectPath tablePath) { | ||
Map<String, String> tableOptions = new HashMap<>(); | ||
tableOptions.put( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
any reason to append database to FLINK_JDBC_URL? the URL may contains database information.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+ "/" | ||
+ tablePath.getDatabaseName()); | ||
tableOptions.put("table-name", tablePath.getObjectName()); | ||
tableOptions.put("username", catalogOptions.get(JdbcPropertiesConstants.FLINK_JDBC_USER)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how about reuse the properties defined in JdbcPropertiesConstants
or define new properties ?
|
||
@Override | ||
public org.apache.flink.table.catalog.Catalog createCatalog(Context context) { | ||
context.getOptions().remove(JdbcPropertiesConstants.FLINK_DRIVER); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why removing FLINK_DRIVER ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
public static final String GRAVITINO_JDBC_PASSWORD = "jdbc-password"; | ||
public static final String GRAVITINO_JDBC_URL = "jdbc-url"; | ||
public static final String GRAVITINO_JDBC_DRIVER = "jdbc-driver"; | ||
public static final String GRAVITINO_JDBC_DEFAULT_DATABASE = "flink.bypass.default-database"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's odd to place GRAVITINO_JDBC_DEFAULT_DATABASE
here, could you remove it?
@Override | ||
public Map<String, String> toFlinkTableProperties( | ||
Map<String, String> gravitinoProperties, ObjectPath tablePath) { | ||
Map<String, String> tableOptions = new HashMap<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If gravitinoProperties doesn't contain driver
, should we return the driver from catalog properties?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The flink JdbcCatalog will automatically infer the driver based on the URL.
protected abstract JdbcPropertiesConverter getConverter(Map<String, String> catalogOptions); | ||
|
||
@Test | ||
public void testToPaimonFileSystemCatalog() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
testToPaimonFileSystemCatalog
wrong name?
import org.junit.jupiter.api.Test; | ||
|
||
/** Test for {@link JdbcPropertiesConverter} */ | ||
public abstract class AbstractJdbcPropertiesConverter { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you add sepcific test for driver? as the transform logic is complicated.
import org.junit.jupiter.api.Test; | ||
|
||
/** Test for {@link JdbcPropertiesConverter} */ | ||
public abstract class AbstractJdbcPropertiesConverter { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about rename to AbstractJdbcPropertiesConverterTestSuite
?
What changes were proposed in this pull request?
Provide flink jdbc catalog
Why are the changes needed?
close #6233
Does this PR introduce any user-facing change?
None
How was this patch tested?
org.apache.gravitino.flink.connector.integration.test.jdbc.FlinkJdbcMysqlCatalogIT
org.apache.gravitino.flink.connector.jdbc.TestJdbcPropertiesConverter