
Querying Power BI REST API using Fabric Spark SQL

source link: https://blog.gbrueckl.at/2023/08/querying-power-bi-rest-api-using-fabric-spark-sql/

24 Replies to “Querying Power BI REST API using Fabric Spark SQL”

  1. Pingback: Querying the Power BI REST API from Fabric Spark – Curated SQL

  2. Tom on 2023-09-02 at 13:07 said:

    Hi Gerhard, thanks for your post. Where do I put spark.sql(f"SET token={pbi_access_token}")? When I use it in a notebook with PySpark (Python) or SQL, I get error messages.
    Regards,
    Tom

    • Gerhard Brueckl on 2023-09-02 at 20:31 said:

      That's Python/PySpark code; in my example I used a Python/PySpark notebook.
      Whenever I used SQL, I used the %%sql notebook magic.
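
      For illustration, a minimal sketch of that split, assuming the pbi_api_udf registration from the blog post (first a PySpark cell, then a separate SQL cell):

      # PySpark cell: fetch a token and expose it as a Spark SQL variable
      pbi_access_token = mssparkutils.credentials.getToken("https://analysis.windows.net/powerbi/api")
      spark.sql(f"SET pbi_access_token={pbi_access_token}")

      %%sql
      -- separate cell using the %%sql magic, referencing the variable via ${...}
      SELECT explode(pbi_api_udf('/groups', '${pbi_access_token}')) AS workspace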

      • Anthony on 2023-09-05 at 20:20 said:

        I'm running into the same issue. When I run the 2nd line of code, spark.sql(f"SET token={pbi_access_token}"), I get an error:

        NameError: name 'pbi_access_token' is not defined

        • Gerhard Brueckl on 2023-09-06 at 09:51 said:

          Sorry, it seems there was a copy-and-paste error on my side where I mixed up some variable names.
          I fixed the code and it should work again now.

          • Anthony on 2023-09-06 at 18:55 said:

            That did the trick! Thank you so much, Gerhard. As a long-time Power BI user and no-time PySpark user, this is amazing.

  3. David on 2023-09-03 at 00:59 said:

    As a data science student, I'm not sure how I can use this. Is it meant to work with SQL on Power BI data, or vice versa?
    Any tips are appreciated. Thanks for sharing.

    • Gerhard Brueckl on 2023-09-04 at 08:39 said:

      The idea is to query Power BI metadata, which is only exposed via the REST API, using simple SQL statements instead of writing the API calls on your own.
      There are some use cases for monitoring and operations, I would say, and for quick exploration of your PBI tenant.
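
      As a sketch of the kind of exploration this enables (assuming the pbi_api_udf registration from the blog post), listing every dataset in every workspace becomes one SQL statement:

      %%sql
      -- chain the generated apiPath of each workspace into a second API call
      SELECT workspace['name'] AS workspace_name,
             dataset['name']   AS dataset_name
      FROM (SELECT explode(pbi_api_udf('/groups', '${pbi_access_token}')) AS workspace) ws
      LATERAL VIEW explode(pbi_api_udf(concat(workspace['apiPath'], '/datasets'), '${pbi_access_token}')) t AS dataset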

  4. James Dawe on 2023-09-04 at 13:49 said:

    Do you have some examples of how to use the REST API to get datasets outside of your org and pull them in to be used as dataflows?

    • Gerhard Brueckl on 2023-09-04 at 13:58 said:

      So, do you want to query the Power BI metadata from a different org, or e.g. send a DAX query to a dataset that resides in a different org?

      Either way, the most challenging part will probably be authentication. I don't know if this is supported out of the box.
      What you can try, though, is replacing /myorg/ in your API calls with /<tenantId>/ of the remote tenant.
      You will still need to get an access token, though, and to be honest I am not quite sure how to do that for remote tenants.
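
      A rough sketch of what that could look like with a service principal, assuming the remote org has granted your app access (all IDs below are placeholders):

      import requests
      from azure.identity import ClientSecretCredential

      # placeholders: the REMOTE tenant's GUID and an app registration it has consented to
      remote_tenant_id = "<remote-tenant-guid>"
      credential = ClientSecretCredential(tenant_id=remote_tenant_id,
                                          client_id="<app-client-id>",
                                          client_secret="<app-client-secret>")
      token = credential.get_token("https://analysis.windows.net/powerbi/api/.default").token

      # note the tenant GUID in place of /myorg/ in the URL
      url = f"https://api.powerbi.com/v1.0/{remote_tenant_id}/groups"
      resp = requests.get(url, headers={"Authorization": f"Bearer {token}"})
      print(resp.status_code, resp.reason)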

      • Jimmy Dawe on 2023-09-04 at 14:02 said:

        We pull in data from other organisations; normally a read account on the DB is fine, but APIs are being pushed, and I am really struggling with the REST API and how I can use it for those DBs whose owners will not give me DB access and want me to use an API.

        Thanks for the ideas, I will give it a play.

        • Gerhard Brueckl on 2023-09-04 at 14:09 said:

          I guess this differs very much from what this blog post is about.
          If I understood correctly, you want to connect from Power BI to those new REST APIs provided by the other organisations. Then you need to stick to the authentication their REST APIs provide.

  5. Steffen Scholz on 2023-09-20 at 17:45 said:

    Looking at the above, I am thinking it must also be possible to get the activity events this way, but I do not really understand what I need to change in the code to access these. Can you maybe explain? It would be much appreciated!

  6. Scott on 2023-09-27 at 20:27 said:

    Can an App ID/Client Secret be used, or only OAuth?

  7. Christiano Chamma on 2023-10-20 at 21:28 said:

    I am so sorry, the above comment lost indentation and also lost a mention to insert Darren Gosbell's code to be able to call admin APIs, removing only the "pip install" line because it resets the session. Darren Gosbell's code would be inserted above this cell:
    spark.sql(f"SET pbi_admin_token={access_token}")

    • Gerhard Brueckl on 2023-10-23 at 13:58 said:

      Can you update your other comment to also include Darren Gosbell's code so it's easier for others to use?
      I think it's not quite clear which part of his code exactly you are referring to.

  8. Christiano Chamma on 2023-10-20 at 21:36 said:

    Great! Thank you very much!

    If anyone is interested, please see the code below to call activityevents with pagination. Each "---" line below marks a notebook cell boundary, so split the code into multiple cells accordingly.

    %pip install azure.identity

    ---

    pbi_access_token = mssparkutils.credentials.getToken("https://analysis.windows.net/powerbi/api")
    spark.sql(f"SET pbi_access_token={pbi_access_token}")

    ---

    # function pbi_api modified to handle arrays other than "value" and to handle pagination
    import requests

    # make sure to support different versions of the API path passed to the function
    def get_api_path(path: str) -> str:
        base_path = "https://api.powerbi.com/v1.0/myorg/"
        base_items = list(filter(lambda x: x, base_path.split("/")))
        path_items = list(filter(lambda x: x, path.split("/")))
        index = path_items.index(base_items[-1]) if base_items[-1] in path_items else -1
        return base_path + "/".join(path_items[index + 1:])

    # call the api_path with the given token and return the list in the given JSON property
    def pbi_api(api_path: str, token: str, jsonitem: str = "value") -> object:
        continuationUri = ""
        continuationToken = ""
        continua = True
        countloop = 1
        currentpath = get_api_path(api_path)
        values = []

        while continua:
            # print(f"countloop={countloop} ------------------------------------------")

            result = requests.get(currentpath, headers={"authorization": "Bearer " + token})

            if not result.ok:
                return [{"status_code": result.status_code, "error": result.reason}]
            json = result.json()
            # if not jsonitem in json:
            #     return []
            valuestemp = json[jsonitem]
            for value in valuestemp:
                if "id" in value:
                    value["apiPath"] = f"{api_path}/{value['id']}"
                else:
                    value["apiPath"] = f"{api_path}"
            values += valuestemp

            if "continuationUri" in json and json["continuationUri"] is not None:
                continuationUri = json["continuationUri"]
                # print(f"VAR continuationUri={continuationUri}")
                currentpath = continuationUri
            else:
                continua = False

            if "continuationToken" in json and json["continuationToken"] is not None:
                continuationToken = json["continuationToken"]
                # print(f"VAR continuationToken={continuationToken}")
            countloop = countloop + 1
            if countloop > 1000:
                return [{"status_code": -1, "error": "Something is wrong!"}]
        return values

    ---

    import pyspark.sql.functions as F
    import pyspark.sql.types as T
    # schema of the function output - an array of maps to make it work with all API outputs
    schema = T.ArrayType(
        T.MapType(T.StringType(), T.StringType())
    )
    # register the function for PySpark
    pbi_api_udf = F.udf(lambda api_path, token: pbi_api(api_path, token), schema)
    # register the function for SparkSQL
    spark.udf.register("pbi_api_udf", pbi_api_udf)

    ---

    # register another function, modified to handle arrays other than "value"
    import pyspark.sql.functions as F
    import pyspark.sql.types as T
    # schema of the function output - an array of maps to make it work with all API outputs
    schema = T.ArrayType(
        T.MapType(T.StringType(), T.StringType())
    )
    # register the function for PySpark
    pbi_api_udf2 = F.udf(lambda api_path, token, jsonitem: pbi_api(api_path, token, jsonitem), schema)
    # register the function for SparkSQL
    spark.udf.register("pbi_api_udf2", pbi_api_udf2)

    ---

    %%sql
    SELECT explode(pbi_api_udf('/groups', '${pbi_access_token}')) AS workspace

    ---

    Insert here Darren Gosbell's code to be able to call admin APIs, removing only the "pip install" line because it resets the session.

    spark.sql(f"SET pbi_admin_token={access_token}")

    ---

    %%sql
    SELECT explode(pbi_api_udf('admin/capacities/refreshables?$top=200', '${pbi_admin_token}')) AS refreshables

    ---

    %%sql
    -- Note: it supports only 1 day at a time
    SELECT explode(pbi_api_udf2("/admin/activityevents?startDateTime='2023-10-18T00:00:00'&endDateTime='2023-10-18T23:59:59'", '${pbi_admin_token}', 'activityEventEntities')) AS atividades

  9. Christiano Chamma on 2023-10-23 at 16:50 said:

    Great! Thank you very much!

    If anyone is interested, please see the code below to call the activityevents API with pagination.
    Gerhard, could you please delete my previous comments? I am sorry for the multiple comments; I had to adjust some code due to the blog comment engine's automatic formatting.

    Please note in the code below:
    1) Each "---" line represents a notebook cell boundary, so split the code into multiple cells.


    %pip install azure.identity

    ---

    import pyspark.sql.functions as F
    import pyspark.sql.types as T
    import json, requests, pandas as pd
    import datetime
    from azure.identity import ClientSecretCredential

    # if you need to call admin APIs, then you have to obtain an admin token.
    # Darren Gosbell's code to be able to call admin APIs:
    #########################################################################################
    # Read secrets from Azure Key Vault
    #########################################################################################
    key_vault = "https://dgosbellKeyVault.vault.azure.net/"
    tenant = mssparkutils.credentials.getSecret(key_vault, "FabricTenantId")
    client = mssparkutils.credentials.getSecret(key_vault, "FabricClientId")
    client_secret = mssparkutils.credentials.getSecret(key_vault, "FabricClientSecret")
    # if you don't have secrets in Azure Key Vault (which is not recommended, by the way), simply fill the variables below:
    # tenant = "???"
    # client = "???"
    # client_secret = "???"
    #########################################################################################
    # Authentication (for admin APIs)
    #########################################################################################
    # Generates the access token for the Service Principal
    api = 'https://analysis.windows.net/powerbi/api/.default'
    auth = ClientSecretCredential(authority='https://login.microsoftonline.com/', tenant_id=tenant, client_id=client, client_secret=client_secret)
    access_token = auth.get_token(api)
    access_token = access_token.token
    print('\nSuccessfully authenticated.')
    #########################################################################################
    # If you would like to test, then you can call Get Refreshables
    #########################################################################################
    # base_url = 'https://api.powerbi.com/v1.0/myorg/'
    # header = {'Authorization': f'Bearer {access_token}'}
    # refreshables_url = "admin/capacities/refreshables?$top=200"
    # refreshables_response = requests.get(base_url + refreshables_url, headers=header)
    # print(refreshables_response.content)

    # if you don't need to call admin APIs, then you can obtain a regular access token instead of an admin token:
    # access_token = mssparkutils.credentials.getToken("https://analysis.windows.net/powerbi/api")

    spark.sql(f"SET pbi_access_token={access_token}")

    ---

    # function pbi_api, modified from the original article to handle JSON items other than "value" and to handle pagination
    # make sure to support different versions of the API path passed to the function
    def get_api_path(path: str) -> str:
        base_path = "https://api.powerbi.com/v1.0/myorg/"
        base_items = list(filter(lambda x: x, base_path.split("/")))
        path_items = list(filter(lambda x: x, path.split("/")))
        index = path_items.index(base_items[-1]) if base_items[-1] in path_items else -1
        return base_path + "/".join(path_items[index + 1:])

    # call the api_path with the given token and return the list in the given JSON property
    def pbi_api(api_path: str, token: str, jsonitem: str = "value") -> object:
        continuationUri = ""
        continuationToken = ""
        continua = True
        countloop = 1
        currentpath = get_api_path(api_path)
        values = []

        while continua:
            # print(f"countloop={countloop} ------------------------------------------")

            result = requests.get(currentpath, headers={"authorization": "Bearer " + token})

            if not result.ok:
                return [{"status_code": result.status_code, "error": result.reason}]
            json = result.json()
            # if not jsonitem in json:
            #     return []
            valuestemp = json[jsonitem]
            for value in valuestemp:
                if "id" in value:
                    value["apiPath"] = f"{api_path}/{value['id']}"
                else:
                    value["apiPath"] = f"{api_path}"
            values += valuestemp

            if "continuationUri" in json and json["continuationUri"] is not None:
                continuationUri = json["continuationUri"]
                # print(f"VAR continuationUri={continuationUri}")
                currentpath = continuationUri
            else:
                continua = False

            if "continuationToken" in json and json["continuationToken"] is not None:
                continuationToken = json["continuationToken"]
                # print(f"VAR continuationToken={continuationToken}")
            countloop = countloop + 1
            if countloop > 1000:
                return [{"status_code": -1, "error": "Something is wrong!"}]
        return values

    ---

    # schema of the function output - an array of maps to make it work with all API outputs
    schema = T.ArrayType(
        T.MapType(T.StringType(), T.StringType())
    )
    # register the function for PySpark
    pbi_api_udf = F.udf(lambda api_path, token: pbi_api(api_path, token), schema)
    # register the function for SparkSQL
    spark.udf.register("pbi_api_udf", pbi_api_udf)

    ---

    # register another function, modified to handle arrays other than "value"
    # schema of the function output - an array of maps to make it work with all API outputs
    schema = T.ArrayType(
        T.MapType(T.StringType(), T.StringType())
    )
    # register the function for PySpark
    pbi_api_udf2 = F.udf(lambda api_path, token, jsonitem: pbi_api(api_path, token, jsonitem), schema)
    # register the function for SparkSQL
    spark.udf.register("pbi_api_udf2", pbi_api_udf2)

    ---

    %%sql
    -- Test udf with non-admin API:
    SELECT explode(pbi_api_udf('/groups', '${pbi_access_token}')) AS workspace

    ---

    %%sql
    -- Test udf with admin API:
    SELECT explode(pbi_api_udf('admin/capacities/refreshables?$top=200', '${pbi_access_token}')) AS refreshables

    ---

    %%sql
    -- Test udf2 with admin API with pagination, returning activityEventEntities JSON items:
    -- Note: this API supports only 1 day at a time
    SELECT explode(pbi_api_udf2("/admin/activityevents?startDateTime='2023-10-18T00:00:00'&endDateTime='2023-10-18T23:59:59'", '${pbi_access_token}', 'activityEventEntities')) AS atividades
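
    As a possible follow-up, a sketch (assuming the registrations above) of how to pull individual fields out of the returned maps; the keys below are common activity-event properties and may vary by event type:

    %%sql
    -- each exploded row is a map<string,string>; address fields by key
    SELECT activity['Id']           AS id,
           activity['CreationTime'] AS creation_time,
           activity['Operation']    AS operation,
           activity['UserId']       AS user_id
    FROM (
      SELECT explode(pbi_api_udf2("/admin/activityevents?startDateTime='2023-10-18T00:00:00'&endDateTime='2023-10-18T23:59:59'", '${pbi_access_token}', 'activityEventEntities')) AS activity
    ) t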

  10. Jordan on 2023-11-03 at 16:30 said:

    Interesting to see all the different approaches to working with the Power BI REST API in notebooks.

    My attempt at calling the admin APIs (specifically the Activity Events API) and dumping the results into a Delta table is here: https://github.com/klinejordan/fabric-tenant-admin-notebooks/blob/main/Fabric%20Admin%20Activities.ipynb

    I have not had much luck with Spark UDFs calling more complex APIs like the Scanner API, as they don't seem to be able to handle polling very well, but that could just be my inexperience with Spark.
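
    One workaround worth noting here: for long-running APIs like the Scanner API, skip the UDF and poll on the driver instead. A minimal sketch, assuming an admin-scope token in access_token and a placeholder workspace GUID (the Delta table name is hypothetical):

    import time
    import json as jsonlib
    import requests

    base = "https://api.powerbi.com/v1.0/myorg/admin/workspaces"
    headers = {"Authorization": f"Bearer {access_token}"}

    # 1) trigger a scan for one or more workspaces (placeholder GUID below)
    scan = requests.post(f"{base}/getInfo?datasetSchema=true&datasetExpressions=true",
                         headers=headers, json={"workspaces": ["<workspace-guid>"]}).json()

    # 2) poll scanStatus on the driver until the scan succeeds
    while requests.get(f"{base}/scanStatus/{scan['id']}", headers=headers).json()["status"] != "Succeeded":
        time.sleep(5)

    # 3) fetch the scan result and land it in a Delta table
    result = requests.get(f"{base}/scanResult/{scan['id']}", headers=headers).json()
    df = spark.read.json(spark.sparkContext.parallelize([jsonlib.dumps(w) for w in result["workspaces"]]))
    df.write.format("delta").mode("overwrite").saveAsTable("pbi_scan_workspaces")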
