Run PySpark script from command line - Run Hello World Program from command line
In the previous session we developed the Hello World PySpark program and ran it with the pyspark interpreter. The pyspark interpreter lets you type a program directly on the console and have it executed on the Spark cluster.
The pyspark console is useful during application development because programmers can write code and see the results immediately; if there is a code error they can fix it and test again on the spot. This mode is very good for development. For production runs, however, the spark-submit script is used: it connects to the Spark cluster and runs a saved code file (.py).
After developing and testing the code, developers save it into a .py file, which can then be submitted to the Spark cluster with the spark-submit script.
What is the spark-submit Script?
The spark-submit script ships with the Spark distribution and is located in the bin directory of Spark. This tool is used to submit Java, Scala, PySpark and R applications to a distributed Spark cluster.
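On a real multi-node cluster, spark-submit is usually given extra options such as the master URL and the application name. The line below is only an illustrative sketch: the master URL spark://master-host:7077 is a placeholder and not part of this tutorial's setup. In this tutorial we run spark-submit without any options, because the master is set inside the program itself.

./spark-submit --master spark://master-host:7077 --name "Hello World App" helloworld.py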
In our tutorial we have a one-node cluster, so the program will be executed on the same machine.
To run the program with the spark-submit script we have to create the SparkContext object manually in the program. Unlike the pyspark shell, spark-submit does not provide a ready-made sc object; if it is not created manually the program will fail with NameError: name 'sc' is not defined.
The required code is:
from pyspark import SparkContext

sc = SparkContext("local", "Hello World App")
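If you prefer to set the master and application name through a configuration object, the same context can be built with SparkConf. This is just an equivalent sketch and is not required for the tutorial:

from pyspark import SparkConf, SparkContext

# Equivalent setup using SparkConf (optional alternative)
conf = SparkConf().setMaster("local").setAppName("Hello World App")
sc = SparkContext(conf=conf)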
Create a new file (helloworld.py) in the bin directory of Spark and add the following code:
from operator import add
from pyspark import SparkContext

sc = SparkContext("local", "Hello World App")

# Create an RDD of the characters of "Hello World" and count each character
data = sc.parallelize(list("Hello World"))
counts = data.map(lambda x: (x, 1)).reduceByKey(add).sortBy(lambda x: x[1], ascending=False).collect()

for (word, count) in counts:
    print("{}: {}".format(word, count))
Save the file and exit from the vi editor (if you are using vi). You can use any text editor available in Ubuntu.
To run the program, type the following command in the terminal:
deepak@deepak-VirtualBox:~/spark/spark-2.3.0-bin-hadoop2.7/bin$ ./spark-submit helloworld.py
This will submit the job to Spark (running in local mode on our single machine) and display the following result:
deepak@deepak-VirtualBox:~/spark/spark-2.3.0-bin-hadoop2.7/bin$ clear
deepak@deepak-VirtualBox:~/spark/spark-2.3.0-bin-hadoop2.7/bin$ ./spark-submit helloworld.py
2018-04-29 17:56:45 WARN Utils:66 - Your hostname, deepak-VirtualBox resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
2018-04-29 17:56:45 WARN Utils:66 - Set SPARK_LOCAL_IP if you need to bind to another address
2018-04-29 17:56:45 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2018-04-29 17:56:46 INFO SparkContext:54 - Running Spark version 2.3.0
2018-04-29 17:56:46 INFO SparkContext:54 - Submitted application: Hello World App
2018-04-29 17:56:46 INFO SecurityManager:54 - Changing view acls to: deepak
2018-04-29 17:56:46 INFO SecurityManager:54 - Changing modify acls to: deepak
2018-04-29 17:56:46 INFO SecurityManager:54 - Changing view acls groups to:
2018-04-29 17:56:46 INFO SecurityManager:54 - Changing modify acls groups to:
2018-04-29 17:56:46 INFO SecurityManager:54 - SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(deepak); groups with view permissions: Set(); users with modify permissions: Set(deepak); groups with modify permissions: Set()
2018-04-29 17:56:46 INFO Utils:54 - Successfully started service 'sparkDriver' on port 41270.
2018-04-29 17:56:46 INFO SparkEnv:54 - Registering MapOutputTracker
2018-04-29 17:56:46 INFO SparkEnv:54 - Registering BlockManagerMaster
2018-04-29 17:56:46 INFO BlockManagerMasterEndpoint:54 - Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
2018-04-29 17:56:46 INFO BlockManagerMasterEndpoint:54 - BlockManagerMasterEndpoint up
2018-04-29 17:56:46 INFO DiskBlockManager:54 - Created local directory at /tmp/blockmgr-4b2dd654-bff5-415d-81e9-ecb4440fc019
2018-04-29 17:56:46 INFO MemoryStore:54 - MemoryStore started with capacity 366.3 MB
2018-04-29 17:56:46 INFO SparkEnv:54 - Registering OutputCommitCoordinator
2018-04-29 17:56:46 INFO log:192 - Logging initialized @2144ms
2018-04-29 17:56:46 INFO Server:346 - jetty-9.3.z-SNAPSHOT
2018-04-29 17:56:46 INFO Server:414 - Started @2268ms
2018-04-29 17:56:46 INFO AbstractConnector:278 - Started ServerConnector@65fc67fa{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
2018-04-29 17:56:46 INFO Utils:54 - Successfully started service 'SparkUI' on port 4040.
2018-04-29 17:56:46 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@34818b7a{/jobs,null,AVAILABLE,@Spark}
2018-04-29 17:56:46 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@40799694{/jobs/json,null,AVAILABLE,@Spark}
2018-04-29 17:56:46 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@71365737{/jobs/job,null,AVAILABLE,@Spark}
2018-04-29 17:56:46 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@4b666305{/jobs/job/json,null,AVAILABLE,@Spark}
2018-04-29 17:56:46 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@472781cb{/stages,null,AVAILABLE,@Spark}
2018-04-29 17:56:46 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@4ebe9f7{/stages/json,null,AVAILABLE,@Spark}
2018-04-29 17:56:46 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@3503c726{/stages/stage,null,AVAILABLE,@Spark}
2018-04-29 17:56:46 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@6d28fe16{/stages/stage/json,null,AVAILABLE,@Spark}
2018-04-29 17:56:46 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@46392e62{/stages/pool,null,AVAILABLE,@Spark}
2018-04-29 17:56:46 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@24f8a1d9{/stages/pool/json,null,AVAILABLE,@Spark}
2018-04-29 17:56:46 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@2acad461{/storage,null,AVAILABLE,@Spark}
2018-04-29 17:56:46 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@b84760{/storage/json,null,AVAILABLE,@Spark}
2018-04-29 17:56:46 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@ffd232b{/storage/rdd,null,AVAILABLE,@Spark}
2018-04-29 17:56:46 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@6eeaad64{/storage/rdd/json,null,AVAILABLE,@Spark}
2018-04-29 17:56:46 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@12ff1504{/environment,null,AVAILABLE,@Spark}
2018-04-29 17:56:46 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@503ed37c{/environment/json,null,AVAILABLE,@Spark}
2018-04-29 17:56:46 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@36bc462b{/executors,null,AVAILABLE,@Spark}
2018-04-29 17:56:46 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@d064c15{/executors/json,null,AVAILABLE,@Spark}
2018-04-29 17:56:46 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@67bb2f84{/executors/threadDump,null,AVAILABLE,@Spark}
2018-04-29 17:56:46 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@41b8a53c{/executors/threadDump/json,null,AVAILABLE,@Spark}
2018-04-29 17:56:46 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@15e13e3e{/static,null,AVAILABLE,@Spark}
2018-04-29 17:56:46 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@5ff0a0cc{/,null,AVAILABLE,@Spark}
2018-04-29 17:56:46 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@11765f4d{/api,null,AVAILABLE,@Spark}
2018-04-29 17:56:46 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@6e010294{/jobs/job/kill,null,AVAILABLE,@Spark}
2018-04-29 17:56:46 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@3ca086de{/stages/stage/kill,null,AVAILABLE,@Spark}
2018-04-29 17:56:46 INFO SparkUI:54 - Bound SparkUI to 0.0.0.0, and started at http://10.0.2.15:4040
2018-04-29 17:56:47 INFO SparkContext:54 - Added file file:/home/deepak/spark/spark-2.3.0-bin-hadoop2.7/bin/helloworld.py at file:/home/deepak/spark/spark-2.3.0-bin-hadoop2.7/bin/helloworld.py with timestamp 1525004807219
2018-04-29 17:56:47 INFO Utils:54 - Copying /home/deepak/spark/spark-2.3.0-bin-hadoop2.7/bin/helloworld.py to /tmp/spark-a4e5f638-8582-4976-a29f-2fcbc7f1a046/userFiles-bb2ea523-cffd-406c-8a23-d37e238742ac/helloworld.py
2018-04-29 17:56:47 INFO Executor:54 - Starting executor ID driver on host localhost
2018-04-29 17:56:47 INFO Utils:54 - Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 41058.
2018-04-29 17:56:47 INFO NettyBlockTransferService:54 - Server created on 10.0.2.15:41058
2018-04-29 17:56:47 INFO BlockManager:54 - Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
2018-04-29 17:56:47 INFO BlockManagerMaster:54 - Registering BlockManager BlockManagerId(driver, 10.0.2.15, 41058, None)
2018-04-29 17:56:47 INFO BlockManagerMasterEndpoint:54 - Registering block manager 10.0.2.15:41058 with 366.3 MB RAM, BlockManagerId(driver, 10.0.2.15, 41058, None)
2018-04-29 17:56:47 INFO BlockManagerMaster:54 - Registered BlockManager BlockManagerId(driver, 10.0.2.15, 41058, None)
2018-04-29 17:56:47 INFO BlockManager:54 - Initialized BlockManager: BlockManagerId(driver, 10.0.2.15, 41058, None)
2018-04-29 17:56:47 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@411003bc{/metrics/json,null,AVAILABLE,@Spark}
2018-04-29 17:56:48 INFO SparkContext:54 - Starting job: collect at /home/deepak/spark/spark-2.3.0-bin-hadoop2.7/bin/helloworld.py:10
2018-04-29 17:56:48 INFO DAGScheduler:54 - Registering RDD 2 (reduceByKey at /home/deepak/spark/spark-2.3.0-bin-hadoop2.7/bin/helloworld.py:9)
2018-04-29 17:56:48 INFO DAGScheduler:54 - Got job 0 (collect at /home/deepak/spark/spark-2.3.0-bin-hadoop2.7/bin/helloworld.py:10) with 1 output partitions
2018-04-29 17:56:48 INFO DAGScheduler:54 - Final stage: ResultStage 1 (collect at /home/deepak/spark/spark-2.3.0-bin-hadoop2.7/bin/helloworld.py:10)
2018-04-29 17:56:48 INFO DAGScheduler:54 - Parents of final stage: List(ShuffleMapStage 0)
2018-04-29 17:56:48 INFO DAGScheduler:54 - Missing parents: List(ShuffleMapStage 0)
2018-04-29 17:56:48 INFO DAGScheduler:54 - Submitting ShuffleMapStage 0 (PairwiseRDD[2] at reduceByKey at /home/deepak/spark/spark-2.3.0-bin-hadoop2.7/bin/helloworld.py:9), which has no missing parents
2018-04-29 17:56:48 INFO MemoryStore:54 - Block broadcast_0 stored as values in memory (estimated size 7.1 KB, free 366.3 MB)
2018-04-29 17:56:48 INFO MemoryStore:54 - Block broadcast_0_piece0 stored as bytes in memory (estimated size 4.8 KB, free 366.3 MB)
2018-04-29 17:56:48 INFO BlockManagerInfo:54 - Added broadcast_0_piece0 in memory on 10.0.2.15:41058 (size: 4.8 KB, free: 366.3 MB)
2018-04-29 17:56:48 INFO SparkContext:54 - Created broadcast 0 from broadcast at DAGScheduler.scala:1039
2018-04-29 17:56:48 INFO DAGScheduler:54 - Submitting 1 missing tasks from ShuffleMapStage 0 (PairwiseRDD[2] at reduceByKey at /home/deepak/spark/spark-2.3.0-bin-hadoop2.7/bin/helloworld.py:9) (first 15 tasks are for partitions Vector(0))
2018-04-29 17:56:48 INFO TaskSchedulerImpl:54 - Adding task set 0.0 with 1 tasks
2018-04-29 17:56:48 INFO TaskSetManager:54 - Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, PROCESS_LOCAL, 7916 bytes)
2018-04-29 17:56:48 INFO Executor:54 - Running task 0.0 in stage 0.0 (TID 0)
2018-04-29 17:56:48 INFO Executor:54 - Fetching file:/home/deepak/spark/spark-2.3.0-bin-hadoop2.7/bin/helloworld.py with timestamp 1525004807219
2018-04-29 17:56:48 INFO Utils:54 - /home/deepak/spark/spark-2.3.0-bin-hadoop2.7/bin/helloworld.py has been previously copied to /tmp/spark-a4e5f638-8582-4976-a29f-2fcbc7f1a046/userFiles-bb2ea523-cffd-406c-8a23-d37e238742ac/helloworld.py
2018-04-29 17:56:49 INFO PythonRunner:54 - Times: total = 366, boot = 355, init = 10, finish = 1
2018-04-29 17:56:49 INFO Executor:54 - Finished task 0.0 in stage 0.0 (TID 0). 1478 bytes result sent to driver
2018-04-29 17:56:49 INFO TaskSetManager:54 - Finished task 0.0 in stage 0.0 (TID 0) in 787 ms on localhost (executor driver) (1/1)
2018-04-29 17:56:49 INFO TaskSchedulerImpl:54 - Removed TaskSet 0.0, whose tasks have all completed, from pool
2018-04-29 17:56:49 INFO DAGScheduler:54 - ShuffleMapStage 0 (reduceByKey at /home/deepak/spark/spark-2.3.0-bin-hadoop2.7/bin/helloworld.py:9) finished in 1.039 s
2018-04-29 17:56:49 INFO DAGScheduler:54 - looking for newly runnable stages
2018-04-29 17:56:49 INFO DAGScheduler:54 - running: Set()
2018-04-29 17:56:49 INFO DAGScheduler:54 - waiting: Set(ResultStage 1)
2018-04-29 17:56:49 INFO DAGScheduler:54 - failed: Set()
2018-04-29 17:56:49 INFO DAGScheduler:54 - Submitting ResultStage 1 (PythonRDD[5] at collect at /home/deepak/spark/spark-2.3.0-bin-hadoop2.7/bin/helloworld.py:10), which has no missing parents
2018-04-29 17:56:49 INFO MemoryStore:54 - Block broadcast_1 stored as values in memory (estimated size 7.5 KB, free 366.3 MB)
2018-04-29 17:56:49 INFO MemoryStore:54 - Block broadcast_1_piece0 stored as bytes in memory (estimated size 4.8 KB, free 366.3 MB)
2018-04-29 17:56:49 INFO BlockManagerInfo:54 - Added broadcast_1_piece0 in memory on 10.0.2.15:41058 (size: 4.8 KB, free: 366.3 MB)
2018-04-29 17:56:49 INFO SparkContext:54 - Created broadcast 1 from broadcast at DAGScheduler.scala:1039
2018-04-29 17:56:49 INFO DAGScheduler:54 - Submitting 1 missing tasks from ResultStage 1 (PythonRDD[5] at collect at /home/deepak/spark/spark-2.3.0-bin-hadoop2.7/bin/helloworld.py:10) (first 15 tasks are for partitions Vector(0))
2018-04-29 17:56:49 INFO TaskSchedulerImpl:54 - Adding task set 1.0 with 1 tasks
2018-04-29 17:56:49 INFO TaskSetManager:54 - Starting task 0.0 in stage 1.0 (TID 1, localhost, executor driver, partition 0, ANY, 7649 bytes)
2018-04-29 17:56:49 INFO Executor:54 - Running task 0.0 in stage 1.0 (TID 1)
2018-04-29 17:56:49 INFO ShuffleBlockFetcherIterator:54 - Getting 1 non-empty blocks out of 1 blocks
2018-04-29 17:56:49 INFO ShuffleBlockFetcherIterator:54 - Started 0 remote fetches in 5 ms
2018-04-29 17:56:49 INFO BlockManagerInfo:54 - Removed broadcast_0_piece0 on 10.0.2.15:41058 in memory (size: 4.8 KB, free: 366.3 MB)
2018-04-29 17:56:49 INFO PythonRunner:54 - Times: total = 40, boot = -491, init = 530, finish = 1
2018-04-29 17:56:49 INFO Executor:54 - Finished task 0.0 in stage 1.0 (TID 1). 1766 bytes result sent to driver
2018-04-29 17:56:49 INFO TaskSetManager:54 - Finished task 0.0 in stage 1.0 (TID 1) in 139 ms on localhost (executor driver) (1/1)
2018-04-29 17:56:49 INFO TaskSchedulerImpl:54 - Removed TaskSet 1.0, whose tasks have all completed, from pool
2018-04-29 17:56:49 INFO DAGScheduler:54 - ResultStage 1 (collect at /home/deepak/spark/spark-2.3.0-bin-hadoop2.7/bin/helloworld.py:10) finished in 0.195 s
2018-04-29 17:56:49 INFO DAGScheduler:54 - Job 0 finished: collect at /home/deepak/spark/spark-2.3.0-bin-hadoop2.7/bin/helloworld.py:10, took 1.451364 s
l: 3
o: 2
H: 1
e: 1
 : 1
W: 1
r: 1
d: 1
2018-04-29 17:56:49 INFO SparkContext:54 - Invoking stop() from shutdown hook
2018-04-29 17:56:49 INFO AbstractConnector:318 - Stopped Spark@65fc67fa{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
2018-04-29 17:56:49 INFO SparkUI:54 - Stopped Spark web UI at http://10.0.2.15:4040
2018-04-29 17:56:49 INFO MapOutputTrackerMasterEndpoint:54 - MapOutputTrackerMasterEndpoint stopped!
2018-04-29 17:56:49 INFO MemoryStore:54 - MemoryStore cleared
2018-04-29 17:56:49 INFO BlockManager:54 - BlockManager stopped
2018-04-29 17:56:49 INFO BlockManagerMaster:54 - BlockManagerMaster stopped
2018-04-29 17:56:49 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:54 - OutputCommitCoordinator stopped!
2018-04-29 17:56:49 INFO SparkContext:54 - Successfully stopped SparkContext
2018-04-29 17:56:49 INFO ShutdownHookManager:54 - Shutdown hook called
2018-04-29 17:56:49 INFO ShutdownHookManager:54 - Deleting directory /tmp/spark-a4e5f638-8582-4976-a29f-2fcbc7f1a046
2018-04-29 17:56:49 INFO ShutdownHookManager:54 - Deleting directory /tmp/spark-1e4f3443-b12e-427a-a0c5-dc46ed4dc113
2018-04-29 17:56:49 INFO ShutdownHookManager:54 - Deleting directory /tmp/spark-a4e5f638-8582-4976-a29f-2fcbc7f1a046/pyspark-006ca0f1-db48-4f95-9f15-11856131e4dc
deepak@deepak-VirtualBox:~/spark/spark-2.3.0-bin-hadoop2.7/bin$
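Most of the output above is Spark's own INFO logging; the program's actual result is the list of character counts in the middle (l: 3, o: 2, and so on). If you want less noise, one option is to lower the log level from inside the program right after the SparkContext is created. This line is optional and not part of the original example:

sc.setLogLevel("WARN")  # only WARN and ERROR messages from Spark will be printed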
Congratulations, you have successfully executed your program from the command line. If your job is a long-running job, you can watch its progress by opening the URL http://localhost:4040 in a browser while it runs; this shows the Spark web UI with the job, stage and executor details.
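Once the character-count example works, the same map/reduceByKey pattern can count whole words from a text file. The sketch below is an optional extension, not part of the tutorial's files; the input path sample.txt and the app name are just assumed examples:

from operator import add
from pyspark import SparkContext

sc = SparkContext("local", "Word Count From File")

# Assumed example input file; replace with any text file on your machine
lines = sc.textFile("sample.txt")
counts = (lines.flatMap(lambda line: line.split())
               .map(lambda word: (word, 1))
               .reduceByKey(add)
               .sortBy(lambda x: x[1], ascending=False)
               .collect())

for (word, count) in counts:
    print("{}: {}".format(word, count))

sc.stop()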
In this tutorial you learned how to run a PySpark script from the command line.
Check out more tutorials at: PySpark Tutorials - Learning PySpark from the beginning.