[WAYANG-25] SQL API seed · apache/incubator-wayang@55e972c · GitHub
source link: https://github.com/apache/incubator-wayang/commit/55e972c4966d4e060e545d41b609217b2acf2d36
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
[WAYANG-25] SQL API seed · apache/incubator-wayang@55e972c · GitHub
Signed-off-by: bertty <[email protected]>
@@ -38,6 +38,7 @@
<modules>
<module>wayang-api-scala-java</module>
<module>wayang-api-python</module>
<module>wayang-api-sql</module>
</modules>
</project>
@@ -0,0 +1,43 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~ http://www.apache.org/licenses/LICENSE-2.0
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>wayang-api</artifactId>
<groupId>org.apache.wayang</groupId>
<version>0.6.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>wayang-api-sql</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-core</artifactId>
<version>1.29.0</version>
</dependency>
<dependency>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-linq4j</artifactId>
<version>1.29.0</version>
</dependency>
</dependencies>
</project>
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.wayang.api.sql;
public class WayangSQL {
}
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.wayang.api.sql.converter;
import org.apache.calcite.adapter.enumerable.EnumerableRel;
import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor;
import org.apache.calcite.adapter.enumerable.JavaRowFormat;
import org.apache.calcite.adapter.enumerable.PhysType;
import org.apache.calcite.adapter.enumerable.PhysTypeImpl;
import org.apache.calcite.linq4j.tree.BlockBuilder;
import org.apache.calcite.linq4j.tree.Expressions;
import org.apache.calcite.plan.ConventionTraitDef;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.convert.ConverterImpl;
import org.apache.calcite.runtime.Hook;
import org.apache.calcite.util.BuiltInMethod;
import java.util.List;
import org.apache.wayang.api.sql.relation.node.WayangRel;
public class WayangToEnumerableConverter extends ConverterImpl
implements EnumerableRel {
/** Creates a PigToEnumerableConverter. */
protected WayangToEnumerableConverter(
RelOptCluster cluster,
RelTraitSet traits,
RelNode input) {
super(cluster, ConventionTraitDef.INSTANCE, traits, input);
}
@Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
return new WayangToEnumerableConverter(
getCluster(), traitSet, sole(inputs));
}
/**
* {@inheritDoc}
*
* <p>This implementation does not actually execute the associated Pig Latin
* script and return results. Instead it returns an empty
* {@link org.apache.calcite.adapter.enumerable.EnumerableRel.Result}
* in order to allow for testing and verification of every step of query
* processing up to actual physical execution and result verification.
*
* <p>Next step is to invoke Pig from here, likely in local mode, have it
* store results in a predefined file so they can be read here and returned as
* a {@code Result} object.
*/
public Result implement(EnumerableRelImplementor implementor, Prefer pref) {
final BlockBuilder list = new BlockBuilder();
final PhysType physType =
PhysTypeImpl.of(implementor.getTypeFactory(), rowType,
pref.prefer(JavaRowFormat.ARRAY));
WayangRel.Implementor impl = new WayangRel.Implementor();
impl.visitChild(0, getInput());
Hook.QUERY_PLAN.run(impl.getStatements().count()); // for script validation in tests
list.add(
Expressions.return_(null,
Expressions.call(
BuiltInMethod.EMPTY_ENUMERABLE.method)));
return implementor.result(physType, list.toBlock());
}
}
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.wayang.api.sql.converter;
import org.apache.calcite.adapter.enumerable.EnumerableConvention;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.convert.ConverterRule;
import org.apache.wayang.api.sql.relation.node.WayangRel;
public class WayangToEnumerableConverterRule extends ConverterRule {
public static final ConverterRule INSTANCE =
new WayangToEnumerableConverterRule();
private WayangToEnumerableConverterRule() {
super(RelNode.class, WayangRel.CONVENTION, EnumerableConvention.INSTANCE,
"WayangToEnumerableConverterRule");
}
@Override public RelNode convert(RelNode rel) {
RelTraitSet newTraitSet = rel.getTraitSet().replace(getOutConvention());
return new WayangToEnumerableConverter(rel.getCluster(), newTraitSet, rel);
}
}
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.wayang.api.sql.executor;
import java.net.URL;
import java.net.URLDecoder;
import java.sql.*;
import java.util.Properties;
public class Client {
public static void main(String[] args) {
try {
/**
* Using files
* */
URL url = Client.class.getResource("/WayangModel.json");
String str = URLDecoder.decode(url.toString(), "UTF-8");
System.out.println(str);
Properties info = new Properties();
info.put("model", str.replace("file:", ""));
Connection connection = DriverManager.getConnection("jdbc:calcite:", info);
Statement statement = connection.createStatement();
//ResultSet resultSet = statement.executeQuery("select count(*) from (values (1), (2))");
//ResultSet resultSet = statement.executeQuery("select count(*) from (values (1), (2))");
ResultSet resultSet = statement.executeQuery("select * from (values (1), (2)) where EXPR$0 > 1");
System.out.println(resultSet.getMetaData().getColumnName(1));
while (resultSet.next()) {
System.out.println("data => ");
System.out.println(resultSet.getObject(1));
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.wayang.api.sql.executor;
import org.apache.calcite.linq4j.Enumerator;
import org.apache.calcite.util.Source;
import java.io.BufferedReader;
import java.io.IOException;
/**
* Data output
*/
public class CustomEnumerator<E> implements Enumerator<E> {
private E current;
private BufferedReader br;
public CustomEnumerator(Source source) {
try {
this.br = new BufferedReader(source.reader());
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public E current() {
return current;
}
@Override
public boolean moveNext() {
try {
String line = br.readLine();
if(line == null){
return false;
}
current = (E)new Object[]{line}; // If there are multiple columns, here are multiple values
} catch (IOException e) {
e.printStackTrace();
return false;
}
return true;
}
/**
* Anomalies go here
* */
@Override
public void reset() {
System.out.println("Reported a wrong brother, does not support this operation");
}
/**
* InputStream stream is closed here
* */
@Override
public void close() {
}
}
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.wayang.api.sql.executor;
import com.google.common.collect.ImmutableMap;
import org.apache.calcite.schema.*;
import org.apache.calcite.schema.impl.AbstractSchema;
import org.apache.calcite.util.Source;
import org.apache.calcite.util.Sources;
import java.net.URL;
import java.util.Map;
/**
* Similar to the database, Schema represents the database
* */
public class CustomSchema extends AbstractSchema {
private Map<String, Table> tableMap;
@Override
protected Map<String, Table> getTableMap() {
URL url = CustomSchema.class.getResource("/data.csv");
Source source = Sources.of(url);
if (tableMap == null) {
final ImmutableMap.Builder<String, Table> builder = ImmutableMap.builder();
// A database has multiple table names, initialized here, the case should be noted, TEST01 is the table name.
builder.put("TEST01", new CustomTable(source));
tableMap = builder.build();
}
return tableMap;
}
}
Recommend
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK