3

[WAYANG-25] SQL API seed · apache/incubator-wayang@55e972c · GitHub

 2 years ago
source link: https://github.com/apache/incubator-wayang/commit/55e972c4966d4e060e545d41b609217b2acf2d36
Go to the source link to view the original article, including images, later updates, and better formatting. If the link is broken, click the button below to view the snapshot taken at that time.
neoserver,ios ssh client

[WAYANG-25] SQL API seed · apache/incubator-wayang@55e972c · GitHub

Permalink

Browse files

[WAYANG-25] SQL API seed
Signed-off-by: bertty <[email protected]>

berttty committed 13 days ago

Verified

1 parent a91bd36 commit 55e972c4966d4e060e545d41b609217b2acf2d36
Showing with 1,721 additions and 0 deletions.

@@ -38,6 +38,7 @@

<modules>

<module>wayang-api-scala-java</module>

<module>wayang-api-python</module>

<module>wayang-api-sql</module>

</modules>

</project>

@@ -0,0 +1,43 @@

<?xml version="1.0" encoding="UTF-8"?>

<!--

~ Licensed to the Apache Software Foundation (ASF) under one

~ or more contributor license agreements. See the NOTICE file

~ distributed with this work for additional information

~ regarding copyright ownership. The ASF licenses this file

~ to you under the Apache License, Version 2.0 (the

~ "License"); you may not use this file except in compliance

~ with the License. You may obtain a copy of the License at

~ http://www.apache.org/licenses/LICENSE-2.0

~ Unless required by applicable law or agreed to in writing,

~ software distributed under the License is distributed on an

~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

~ KIND, either express or implied. See the License for the

~ specific language governing permissions and limitations

~ under the License.

-->

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>wayang-api</artifactId>
        <groupId>org.apache.wayang</groupId>
        <version>0.6.1-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>wayang-api-sql</artifactId>

    <properties>
        <!-- Single place to bump Calcite: calcite-core and calcite-linq4j
             are declared with the same version and should move together. -->
        <calcite.version>1.29.0</calcite.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.calcite</groupId>
            <artifactId>calcite-core</artifactId>
            <version>${calcite.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.calcite</groupId>
            <artifactId>calcite-linq4j</artifactId>
            <version>${calcite.version}</version>
        </dependency>
    </dependencies>
</project>

@@ -0,0 +1,22 @@

/*

* Licensed to the Apache Software Foundation (ASF) under one

* or more contributor license agreements. See the NOTICE file

* distributed with this work for additional information

* regarding copyright ownership. The ASF licenses this file

* to you under the Apache License, Version 2.0 (the

* "License"); you may not use this file except in compliance

* with the License. You may obtain a copy of the License at

* http://www.apache.org/licenses/LICENSE-2.0

* Unless required by applicable law or agreed to in writing,

* software distributed under the License is distributed on an

* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

* KIND, either express or implied. See the License for the

* specific language governing permissions and limitations

* under the License.

*/

package org.apache.wayang.api.sql;

/**
 * Empty class in the {@code org.apache.wayang.api.sql} package; it declares
 * no fields or methods.
 *
 * <p>NOTE(review): presumably a placeholder for the future entry point of the
 * SQL API -- confirm before relying on it.
 */
public class WayangSQL {

}

@@ -0,0 +1,81 @@

/*

* Licensed to the Apache Software Foundation (ASF) under one

* or more contributor license agreements. See the NOTICE file

* distributed with this work for additional information

* regarding copyright ownership. The ASF licenses this file

* to you under the Apache License, Version 2.0 (the

* "License"); you may not use this file except in compliance

* with the License. You may obtain a copy of the License at

* http://www.apache.org/licenses/LICENSE-2.0

* Unless required by applicable law or agreed to in writing,

* software distributed under the License is distributed on an

* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

* KIND, either express or implied. See the License for the

* specific language governing permissions and limitations

* under the License.

*/

package org.apache.wayang.api.sql.converter;

import org.apache.calcite.adapter.enumerable.EnumerableRel;

import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor;

import org.apache.calcite.adapter.enumerable.JavaRowFormat;

import org.apache.calcite.adapter.enumerable.PhysType;

import org.apache.calcite.adapter.enumerable.PhysTypeImpl;

import org.apache.calcite.linq4j.tree.BlockBuilder;

import org.apache.calcite.linq4j.tree.Expressions;

import org.apache.calcite.plan.ConventionTraitDef;

import org.apache.calcite.plan.RelOptCluster;

import org.apache.calcite.plan.RelTraitSet;

import org.apache.calcite.rel.RelNode;

import org.apache.calcite.rel.convert.ConverterImpl;

import org.apache.calcite.runtime.Hook;

import org.apache.calcite.util.BuiltInMethod;

import java.util.List;

import org.apache.wayang.api.sql.relation.node.WayangRel;

/**
 * Converter relational expression that implements {@link EnumerableRel} on
 * top of a single input, which it hands to a {@link WayangRel.Implementor}.
 *
 * <p>NOTE(review): the input is presumably expected to be in
 * {@code WayangRel.CONVENTION}; this class does not check that -- confirm
 * against the rule that creates it.
 */
public class WayangToEnumerableConverter extends ConverterImpl
    implements EnumerableRel {

  /**
   * Creates a WayangToEnumerableConverter.
   *
   * @param cluster cluster passed through to {@link ConverterImpl}
   * @param traits  trait set of this converter
   * @param input   the single input relational expression
   */
  protected WayangToEnumerableConverter(
      RelOptCluster cluster,
      RelTraitSet traits,
      RelNode input) {
    super(cluster, ConventionTraitDef.INSTANCE, traits, input);
  }

  /**
   * Returns a new converter over the sole element of {@code inputs}, using
   * this converter's cluster and the given trait set.
   */
  @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
    return new WayangToEnumerableConverter(
        getCluster(), traitSet, sole(inputs));
  }

  /**
   * {@inheritDoc}
   *
   * <p>This implementation does not execute the input plan and return its
   * results. It visits the input with a {@link WayangRel.Implementor},
   * publishes the implementor's statement count through
   * {@link Hook#QUERY_PLAN}, and then returns a
   * {@link org.apache.calcite.adapter.enumerable.EnumerableRel.Result}
   * whose generated block simply returns an empty enumerable.
   *
   * <p>NOTE(review): actual execution of the visited plan still has to be
   * wired in here.
   */
  @Override public Result implement(EnumerableRelImplementor implementor, Prefer pref) {
    final BlockBuilder builder = new BlockBuilder();
    final PhysType physType =
        PhysTypeImpl.of(implementor.getTypeFactory(), rowType,
            pref.prefer(JavaRowFormat.ARRAY));

    WayangRel.Implementor impl = new WayangRel.Implementor();
    impl.visitChild(0, getInput());
    // Exposes the statement count to QUERY_PLAN hook listeners (used for validation in tests).
    Hook.QUERY_PLAN.run(impl.getStatements().count());

    // The generated code ignores the input and yields no rows.
    builder.add(
        Expressions.return_(null,
            Expressions.call(
                BuiltInMethod.EMPTY_ENUMERABLE.method)));
    return implementor.result(physType, builder.toBlock());
  }
}

@@ -0,0 +1,40 @@

/*

* Licensed to the Apache Software Foundation (ASF) under one

* or more contributor license agreements. See the NOTICE file

* distributed with this work for additional information

* regarding copyright ownership. The ASF licenses this file

* to you under the Apache License, Version 2.0 (the

* "License"); you may not use this file except in compliance

* with the License. You may obtain a copy of the License at

* http://www.apache.org/licenses/LICENSE-2.0

* Unless required by applicable law or agreed to in writing,

* software distributed under the License is distributed on an

* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

* KIND, either express or implied. See the License for the

* specific language governing permissions and limitations

* under the License.

*/

package org.apache.wayang.api.sql.converter;

import org.apache.calcite.adapter.enumerable.EnumerableConvention;

import org.apache.calcite.plan.RelTraitSet;

import org.apache.calcite.rel.RelNode;

import org.apache.calcite.rel.convert.ConverterRule;

import org.apache.wayang.api.sql.relation.node.WayangRel;

/**
 * Converter rule registered for {@link RelNode} that goes from
 * {@link WayangRel#CONVENTION} to {@link EnumerableConvention#INSTANCE} by
 * wrapping the matched node in a {@link WayangToEnumerableConverter}.
 */
public class WayangToEnumerableConverterRule extends ConverterRule {

  /** Shared rule instance; the constructor is private. */
  public static final ConverterRule INSTANCE =
      new WayangToEnumerableConverterRule();

  private WayangToEnumerableConverterRule() {
    super(
        RelNode.class,
        WayangRel.CONVENTION,
        EnumerableConvention.INSTANCE,
        "WayangToEnumerableConverterRule");
  }

  /**
   * Wraps {@code rel} in a converter whose trait set is {@code rel}'s trait
   * set with the convention replaced by this rule's output convention.
   *
   * @param rel the relational expression to convert
   * @return a new {@link WayangToEnumerableConverter} with {@code rel} as input
   */
  @Override
  public RelNode convert(RelNode rel) {
    final RelTraitSet convertedTraits =
        rel.getTraitSet().replace(getOutConvention());
    return new WayangToEnumerableConverter(
        rel.getCluster(),
        convertedTraits,
        rel);
  }
}

@@ -0,0 +1,53 @@

/*

* Licensed to the Apache Software Foundation (ASF) under one

* or more contributor license agreements. See the NOTICE file

* distributed with this work for additional information

* regarding copyright ownership. The ASF licenses this file

* to you under the Apache License, Version 2.0 (the

* "License"); you may not use this file except in compliance

* with the License. You may obtain a copy of the License at

* http://www.apache.org/licenses/LICENSE-2.0

* Unless required by applicable law or agreed to in writing,

* software distributed under the License is distributed on an

* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

* KIND, either express or implied. See the License for the

* specific language governing permissions and limitations

* under the License.

*/

package org.apache.wayang.api.sql.executor;

import java.net.URL;

import java.net.URLDecoder;

import java.sql.*;

import java.util.Properties;

/**
 * Command-line demo: opens a Calcite JDBC connection configured with the
 * classpath resource {@code /WayangModel.json}, runs one fixed query and
 * prints the first column's name followed by the first column of every row.
 */
public class Client {

  /**
   * Runs the demo query. Any exception is caught and its stack trace printed;
   * nothing is rethrown.
   *
   * @param args ignored
   */
  public static void main(String[] args) {
    try {
      URL url = Client.class.getResource("/WayangModel.json");
      if (url == null) {
        // Without this check the failure shows up as a bare NullPointerException.
        System.err.println("Resource /WayangModel.json not found on the classpath");
        return;
      }
      String str = URLDecoder.decode(url.toString(), "UTF-8");
      System.out.println(str);

      Properties info = new Properties();
      // The "model" property is given the decoded URL with its "file:" prefix removed.
      info.put("model", str.replace("file:", ""));

      // try-with-resources closes the result set, statement and connection
      // (in that order) even when the query fails.
      try (Connection connection = DriverManager.getConnection("jdbc:calcite:", info);
           Statement statement = connection.createStatement();
           ResultSet resultSet = statement.executeQuery(
               "select * from (values (1), (2)) where EXPR$0 > 1")) {
        System.out.println(resultSet.getMetaData().getColumnName(1));
        while (resultSet.next()) {
          System.out.println("data => ");
          System.out.println(resultSet.getObject(1));
        }
      }
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}

@@ -0,0 +1,79 @@

/*

* Licensed to the Apache Software Foundation (ASF) under one

* or more contributor license agreements. See the NOTICE file

* distributed with this work for additional information

* regarding copyright ownership. The ASF licenses this file

* to you under the Apache License, Version 2.0 (the

* "License"); you may not use this file except in compliance

* with the License. You may obtain a copy of the License at

* http://www.apache.org/licenses/LICENSE-2.0

* Unless required by applicable law or agreed to in writing,

* software distributed under the License is distributed on an

* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

* KIND, either express or implied. See the License for the

* specific language governing permissions and limitations

* under the License.

*/

package org.apache.wayang.api.sql.executor;

import org.apache.calcite.linq4j.Enumerator;

import org.apache.calcite.util.Source;

import java.io.BufferedReader;

import java.io.IOException;

/**
 * {@link Enumerator} that reads a {@link Source} line by line and exposes
 * each line as a one-element {@code Object[]} row.
 *
 * @param <E> element type reported to callers. NOTE(review): every row is an
 *     {@code Object[]} cast to {@code E}, so {@code E} presumably has to be
 *     {@code Object[]} (or {@code Object}) -- confirm against the callers.
 */
public class CustomEnumerator<E> implements Enumerator<E> {

  /** Row produced by the most recent successful {@link #moveNext()}; {@code null} before that. */
  private E current;

  private final BufferedReader br;

  /**
   * Opens a buffered reader over {@code source}.
   *
   * @param source where the lines are read from
   * @throws java.io.UncheckedIOException if the reader cannot be opened
   */
  public CustomEnumerator(Source source) {
    try {
      this.br = new BufferedReader(source.reader());
    } catch (IOException e) {
      // Fail here instead of leaving the reader null and hitting an NPE in moveNext().
      throw new java.io.UncheckedIOException("Cannot open reader for " + source, e);
    }
  }

  @Override
  public E current() {
    return current;
  }

  /**
   * Reads the next line and makes it the current row.
   *
   * @return {@code true} if a line was read, {@code false} at end of input
   * @throws java.io.UncheckedIOException if reading fails, so that a broken
   *     input is not mistaken for a normal end of data
   */
  @Override
  public boolean moveNext() {
    final String line;
    try {
      line = br.readLine();
    } catch (IOException e) {
      throw new java.io.UncheckedIOException("Error reading next line", e);
    }
    if (line == null) {
      return false;
    }
    // One column per row; a multi-column source would put several values in this array.
    // The cast cannot be checked at runtime because E is erased.
    @SuppressWarnings("unchecked")
    E row = (E) new Object[] {line};
    current = row;
    return true;
  }

  /**
   * Not supported: the underlying reader cannot be rewound.
   *
   * @throws UnsupportedOperationException always
   */
  @Override
  public void reset() {
    throw new UnsupportedOperationException("reset is not supported");
  }

  /**
   * Closes the underlying reader.
   *
   * @throws java.io.UncheckedIOException if closing fails
   */
  @Override
  public void close() {
    try {
      br.close();
    } catch (IOException e) {
      throw new java.io.UncheckedIOException("Error closing reader", e);
    }
  }
}

@@ -0,0 +1,49 @@

/*

* Licensed to the Apache Software Foundation (ASF) under one

* or more contributor license agreements. See the NOTICE file

* distributed with this work for additional information

* regarding copyright ownership. The ASF licenses this file

* to you under the Apache License, Version 2.0 (the

* "License"); you may not use this file except in compliance

* with the License. You may obtain a copy of the License at

* http://www.apache.org/licenses/LICENSE-2.0

* Unless required by applicable law or agreed to in writing,

* software distributed under the License is distributed on an

* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

* KIND, either express or implied. See the License for the

* specific language governing permissions and limitations

* under the License.

*/

package org.apache.wayang.api.sql.executor;

import com.google.common.collect.ImmutableMap;

import org.apache.calcite.schema.*;

import org.apache.calcite.schema.impl.AbstractSchema;

import org.apache.calcite.util.Source;

import org.apache.calcite.util.Sources;

import java.net.URL;

import java.util.Map;

/**
 * Schema exposing a single table named {@code TEST01}, backed by the
 * classpath resource {@code /data.csv}.
 */
public class CustomSchema extends AbstractSchema {

  /** Table map built on the first call to {@link #getTableMap()} and reused afterwards. */
  private Map<String, Table> tableMap;

  /**
   * Returns the table map, building it on first use.
   *
   * @return a map with the single entry {@code TEST01}
   * @throws IllegalStateException if {@code /data.csv} is not on the classpath
   */
  @Override
  protected Map<String, Table> getTableMap() {
    if (tableMap == null) {
      // Resolve the resource only when the map actually has to be built.
      URL url = CustomSchema.class.getResource("/data.csv");
      if (url == null) {
        throw new IllegalStateException("Resource /data.csv not found on the classpath");
      }
      Source source = Sources.of(url);

      final ImmutableMap.Builder<String, Table> builder = ImmutableMap.builder();
      // Table names are registered here; note the upper-case name TEST01.
      builder.put("TEST01", new CustomTable(source));
      tableMap = builder.build();
    }
    return tableMap;
  }
}


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK