Task Scheduler

Create the Scheduler

Create a file named task_scheduler.py and add the following code:

pythonCopy codeimport time
import json
import os
from threading import Thread
from datetime import datetime, timedelta

class TaskScheduler:
    def __init__(self, filename='tasks.json'):
        self.filename = filename
        self.tasks = self.load_tasks()
        self.running = True
        self.thread = Thread(target=self.run_tasks)
        self.thread.start()

    def load_tasks(self):
        if os.path.exists(self.filename):
            with open(self.filename, 'r') as file:
                return json.load(file)
        return []

    def save_tasks(self):
        with open(self.filename, 'w') as file:
            json.dump(self.tasks, file)

    def add_task(self, task_name, run_time):
        task = {
            'name': task_name,
            'run_time': run_time
        }
        self.tasks.append(task)
        self.save_tasks()
        print(f"Task '{task_name}' scheduled for {run_time}.")

    def view_tasks(self):
        if not self.tasks:
            print("No tasks scheduled.")
            return
        print("\nScheduled Tasks:")
        for i, task in enumerate(self.tasks, 1):
            print(f"{i}. {task['name']} - Scheduled for: {task['run_time']}")

    def run_tasks(self):
        while self.running:
            current_time = datetime.now().strftime('%Y-%m-%d %H:%M')
            for task in self.tasks:
                if task['run_time'] == current_time:
                    print(f"Running task: {task['name']}")
                    self.tasks.remove(task)
                    self.save_tasks()
            time.sleep(60)  # Check every minute

    def stop(self):
        self.running = False
        self.thread.join()

def main():
    scheduler = TaskScheduler()
    
    while True:
        print("\nTask Scheduler")
        print("1. Add Task")
        print("2. View Tasks")
        print("3. Quit")
        
        choice = input("Choose an option (1-3): ")

        if choice == '1':
            task_name = input("Enter the task name: ")
            run_time = input("Enter the run time (YYYY-MM-DD HH:MM): ")
            scheduler.add_task(task_name, run_time)
        elif choice == '2':
            scheduler.view_tasks()
        elif choice == '3':
            scheduler.stop()
            print("Exiting the task scheduler.")
            break
        else:
            print("Invalid choice. Please try again.")

if __name__ == "__main__":
    main()

Step 2: Running the Task Scheduler

  1. Open your terminal (or command prompt).
  2. Navigate to the directory where you saved task_scheduler.py.
  3. Run the script using the command:bashCopy codepython task_scheduler.py

How It Works

  • Task Scheduling: You can add tasks by specifying the name and the time to run (in YYYY-MM-DD HH:MM format). Tasks will be stored in a JSON file (tasks.json).
  • Task Execution: The scheduler checks every minute to see if any tasks need to be executed based on the current time.
  • Threading: A separate thread runs the task-checking loop, allowing the main program to accept user input simultaneously.

Example Usage

  1. Add a Task: Choose option 1, enter a task name, and specify the time you want it to run.
  2. View Scheduled Tasks: Choose option 2 to see all your scheduled tasks.
  3. Quit: Choose option 3 to exit the scheduler.

Comments

Leave a Reply

Your email address will not be published. Required fields are marked *