package spark
Package Members
- package api
Provides classes and code snippets to simplify usage of Spatial APIs in Location Intelligence SDK For Big Data.
For more information, see the Usage Guide and API docs via the links below:
Geo Spatial SDKs
Location Intelligence SDK For Big Data User Guide
- Starting a Spark session:
import org.apache.spark.sql.SparkSession

// The config is required to run the legacy UDFs used in the Spatial APIs;
// the key below is an assumption (Spark 3's flag for legacy untyped Scala UDFs).
val session = SparkSession.builder
  .appName("Example")
  .master("yarn")
  .config("spark.sql.legacy.allowUntypedScalaUDF", true)
  .getOrCreate()
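The operations below each take a DataFrame as input. As a minimal sketch, an input DataFrame with longitude/latitude columns could be loaded from CSV like this; the path and column layout are hypothetical:

// Hypothetical source: a headered CSV whose X and Y columns hold
// longitude and latitude values.
val inputDF = session.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("/home/hadoop/data/input.csv")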
- Using the DownloadManager, which can download remote resources (files such as TAB, SHAPE, etc.) to a node-local path.
Supports downloading from HDFS, S3, Google Storage, or the local filesystem (the default).
NOTE: To download from S3 or Google Storage, you must either set the respective environment variables (such as AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY) or provide the corresponding configuration in the Spark session, as in the sketch after the builder example below.
import com.pb.downloadmanager.api.DownloadManagerBuilder
import com.pb.downloadmanager.api.downloaders.LocalFilePassthroughDownloader
import com.pb.downloadmanager.api.downloaders.hadoop.{GoogleDownloader, HDFSDownloader, S3Downloader}

// Downloads resolve to the node-local directory passed to the builder.
val downloadManager = new DownloadManagerBuilder("/home/hadoop/data")
  .addDownloader(new S3Downloader(session.sparkContext.hadoopConfiguration))
  .addDownloader(new GoogleDownloader(session.sparkContext.hadoopConfiguration))
  .addDownloader(new HDFSDownloader(session.sparkContext.hadoopConfiguration))
  .addDownloader(new LocalFilePassthroughDownloader())
  .build()
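As noted above, the S3 and Google downloaders need credentials. One common way to supply them through the Spark session is via the standard Hadoop connector properties; a minimal sketch for S3, assuming the S3A connector is on the classpath (these keys are Hadoop settings, not part of this SDK):

// Standard Hadoop S3A credential properties; the values are placeholders.
val hadoopConf = session.sparkContext.hadoopConfiguration
hadoopConf.set("fs.s3a.access.key", "<your-access-key-id>")
hadoopConf.set("fs.s3a.secret.key", "<your-secret-access-key>")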
- PointInPolygon Operation:
import com.precisely.bigdata.li.spark.api.SpatialAPI

val pointInPolygonDF = SpatialAPI.pointInPolygon(
  inputDF = inputDF,
  tableFileType = tabFileType,
  tableFilePath = tableFilePath,
  tableFileName = tableFileName,
  libraries = libraries,
  longitude = longitude,
  latitude = latitude,
  includeEmptySearchResults = true,
  outputFields = outputFields,
  downloadManager = downloadManager)

// Defaults:
// downloadManager = null, libraries = null, includeEmptySearchResults = true
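The result is an ordinary Spark DataFrame, so the usual operations apply; for example, previewing and persisting it (the output path is a placeholder):

// Preview a few enriched rows, then write the result out as Parquet.
pointInPolygonDF.show(5)
pointInPolygonDF.write.mode("overwrite").parquet("/home/hadoop/output/point-in-polygon")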
- SearchNearest Operation:
import com.precisely.bigdata.li.spark.api.SpatialAPI

val searchNearestDF = SpatialAPI.searchNearest(
  inputDF = fabricDF,
  tableFileType = tableFileType,
  tableFilePath = tableFilePath,
  tableFileName = tableFileName,
  libraries = libraries,
  maxCandidates = maxCandidates,
  distanceValue = distanceValue,
  distanceUnit = distanceUnit,
  distanceColumnName = distanceColumnName,
  geometryStringType = geometryStringType,
  geometryColumnName = geometryColumnName,
  includeEmptySearchResults = includeEmptySearchResults,
  outputFields = outputFields,
  downloadManager = downloadManager)

// Defaults:
// distanceColumnName = "distance", downloadManager = null, libraries = null,
// includeEmptySearchResults = true, maxCandidates = 1000
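Since distanceColumnName defaults to "distance", the closest candidates can be inspected by sorting on that column; a minimal sketch, assuming the default name was kept:

import org.apache.spark.sql.functions.col

// Nearest matches first; assumes the default "distance" column name.
searchNearestDF.orderBy(col("distance")).show(10)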
- JoinByDistance Operation:
import com.precisely.bigdata.li.spark.api.SpatialAPI
import com.precisely.bigdata.li.spark.api.util.DistanceJoinOption.DistanceJoinOption
import com.precisely.bigdata.li.spark.api.util.LimitMethods

val joinedDF = SpatialAPI.joinByDistance(
  df1 = df1,
  df2 = df2,
  df1Longitude = longitude1,
  df1Latitude = latitude1,
  df2Longitude = longitude2,
  df2Latitude = latitude2,
  searchRadius = searchRadius,
  distanceUnit = distanceUnit,
  geoHashPrecision = geoHashPrecision,
  options = Map(
    DistanceJoinOption.DistanceColumnName -> distanceColumnName,
    DistanceJoinOption.LimitMatches -> limit,
    DistanceJoinOption.LimitMethod -> LimitMethods.RowNumber))

// Defaults:
// geoHashPrecision = 7, options = null
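Because geoHashPrecision and options carry defaults, they can be omitted for a basic join; a minimal hedged sketch using the same hypothetical inputs:

// Minimal distance join relying on the defaults (geoHashPrecision = 7, options = null).
val basicJoinDF = SpatialAPI.joinByDistance(
  df1 = df1,
  df2 = df2,
  df1Longitude = longitude1,
  df1Latitude = latitude1,
  df2Longitude = longitude2,
  df2Latitude = latitude2,
  searchRadius = searchRadius,
  distanceUnit = distanceUnit)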
- HexagonGeneration Operation:
import com.precisely.bigdata.li.spark.api.SpatialAPI

val hexGenDF = SpatialAPI.generateHexagon(
  sparkSession = session,
  minLongitude = minLongitude,
  minLatitude = minLatitude,
  maxLongitude = maxLongitude,
  maxLatitude = maxLatitude,
  hexLevel = hexLevel,
  containerLevel = containerLevel,
  numOfPartitions = numOfPartitions,
  maximumNumOfRowsPerPartition = maxNumberOfRows)

// Defaults:
// hexLevel = 1, containerLevel = 1, numOfPartitions = 1, maximumNumOfRowsPerPartition = 1
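The generated hexagons also come back as a DataFrame; a quick sanity check (no column names are assumed, since the output schema is SDK-defined):

// Inspect the SDK-defined output schema and the number of generated hexagons.
hexGenDF.printSchema()
println(hexGenDF.count())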
- Registering the SQL Functions as UDFs:
import com.precisely.bigdata.li.spark.api.udf.SQLRegistrator

SQLRegistrator.registerAll()

inputDF.createOrReplaceTempView("inputTable")
val pointGeometry = session.sql("SELECT ST_Point(X, Y, 'epsg:4326') AS point_geom, * FROM inputTable")
val wktGeometry = session.sql("SELECT ST_GeomFromWKT(WKT) AS geom, * FROM inputTable")

// Currently available SQL functions:
// ST_Point, ST_GeomFromWKT, ST_GeomFromWKB, ST_GeomFromKML, ST_GeomFromGeoJSON,
// ST_ToGeoJSON, ST_ToKML, ST_ToWKB, ST_ToWKT, ST_Buffer, ST_Union, ST_Transform,
// ST_Intersection, ST_ConvexHull, ST_Within, ST_Disjoint, ST_Intersects,
// ST_IsNullGeom, ST_Overlaps, ST_GeoHash, ST_GeoHashBoundary, ST_HexHash, ST_HexHashBoundary,
// ST_SquareHash, ST_SquareHashBoundary, ST_X, ST_XMax, ST_XMin, ST_Y, ST_YMax, ST_YMin,
// ST_Area, ST_Distance, ST_Length, ST_Perimeter
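Once registered, these functions compose inside SQL like any others. A hedged sketch that converts the constructed points back to WKT; the single-argument ST_ToWKT call is an assumption inferred from the function list, not a confirmed signature:

// Expose the point DataFrame as a view and round-trip geometries to WKT text.
// ST_ToWKT(geometry) is an assumed signature.
pointGeometry.createOrReplaceTempView("points")
val pointsAsWkt = session.sql("SELECT ST_ToWKT(point_geom) AS wkt FROM points")
pointsAsWkt.show(5)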